Skip to content

Commit

Permalink
Merge pull request #1693 from ClickHouse/feat_exec_api
Browse files Browse the repository at this point in the history
[client-v2] Added executCommand API
  • Loading branch information
chernser authored Jun 27, 2024
2 parents ae04bf0 + 09dfdba commit 5c8f0aa
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 2 deletions.
41 changes: 39 additions & 2 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.api.command.CommandResponse;
import com.clickhouse.client.api.command.CommandSettings;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
Expand Down Expand Up @@ -672,8 +674,6 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, QuerySettings set
return query(sqlQuery, null, settings);
}



/**
* <p>Sends SQL query to server with parameters. The map `queryParams` should contain keys that
* match the placeholders in the SQL query.</p>
Expand Down Expand Up @@ -848,6 +848,43 @@ public TableSchema getTableSchema(String table, String database) {
}
}

/**
* <p>Executes a SQL command and doesn't care response. Useful for DDL statements, like `CREATE`, `DROP`, `ALTER`.
* Method however returns execution errors from a server or summary in case of successful execution. </p>
*
* @param sql - SQL command
* @param settings - execution settings
* @return {@code CompletableFuture<CommandResponse>} - a promise to command response
*/
public CompletableFuture<CommandResponse> execute(String sql, CommandSettings settings) {
return query(sql, settings)
.thenApplyAsync(response -> {
try {
return new CommandResponse(response);
} catch (Exception e) {
throw new ClientException("Failed to get command response", e);
}
});
}

/**
* <p>Executes a SQL command and doesn't care response. Useful for DDL statements, like `CREATE`, `DROP`, `ALTER`.
* Method however returns execution errors from a server or summary in case of successful execution. </p>
*
* @param sql - SQL command
* @return {@code CompletableFuture<CommandResponse>} - a promise to command response
*/
public CompletableFuture<CommandResponse> execute(String sql) {
return query(sql)
.thenApplyAsync(response -> {
try {
return new CommandResponse(response);
} catch (Exception e) {
throw new ClientException("Failed to get command response", e);
}
});
}

private String startOperation() {
String operationId = UUID.randomUUID().toString();
globalClientStats.put(operationId, new ClientStatisticsHolder());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.clickhouse.client.api.command;

import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.metrics.OperationMetrics;
import com.clickhouse.client.api.metrics.ServerMetrics;
import com.clickhouse.client.api.query.QueryResponse;

public class CommandResponse{

private final QueryResponse response;

public CommandResponse(QueryResponse response) {
this.response = response;
try {
response.close();
} catch (Exception e) {
throw new ClientException("Failed to close underlying resource", e);
}
}

/**
* Returns the metrics of this operation.
*
* @return metrics of this operation
*/
public OperationMetrics getMetrics() {
return response.getMetrics();
}

/**
* Alias for {@link ServerMetrics#NUM_ROWS_READ}
*
* @return number of rows read by server from the storage
*/
public long getReadRows() {
return response.getReadRows();
}

/**
* Alias for {@link ServerMetrics#NUM_BYTES_READ}
*
* @return number of bytes read by server from the storage
*/
public long getReadBytes() {
return response.getReadBytes();
}

/**
* Alias for {@link ServerMetrics#NUM_ROWS_WRITTEN}
*
* @return number of rows written by server to the storage
*/
public long getWrittenRows() {
return response.getWrittenRows();
}

/**
* Alias for {@link ServerMetrics#NUM_BYTES_WRITTEN}
*
* @return number of bytes written by server to the storage
*/
public long getWrittenBytes() {
return response.getWrittenBytes();
}

/**
* Alias for {@link ServerMetrics#ELAPSED_TIME}
*
* @return elapsed time in nanoseconds
*/
public long getServerTime() {
return response.getServerTime();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.clickhouse.client.api.command;

import com.clickhouse.client.api.query.QuerySettings;

public class CommandSettings extends QuerySettings {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.clickhouse.client.command;

import com.clickhouse.client.BaseIntegrationTest;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.command.CommandResponse;
import com.clickhouse.client.api.enums.Protocol;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

public class CommandTests extends BaseIntegrationTest {

private Client client;

@BeforeMethod(groups = {"integration"})
public void setUp() {
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
client = new Client.Builder()
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
.setUsername("default")
.setPassword("")
.build();

System.out.println("Real port: " + node.getPort());
}


@Test(groups = {"integration"})
public void testCreateTable() throws Exception {
client.execute("DROP TABLE IF EXISTS test_table").get(10, TimeUnit.SECONDS);
CommandResponse response =
client.execute("CREATE TABLE IF NOT EXISTS test_table (id UInt32, name String) ENGINE = Memory")
.get(10, TimeUnit.SECONDS);

Assert.assertNotNull(response);
}

@Test(groups = {"integration"})
public void testInvalidCommandExecution() throws Exception {
CommandResponse response = client.execute("ALTER TABLE non_existing_table ADD COLUMN id2 UInt32")
.exceptionally(e -> {

if (!(e.getCause() instanceof ClientException)) {
Assert.fail("Cause should be a ClientException");
}
return null;
}).get(10, TimeUnit.SECONDS);

Assert.assertNull(response);
}
}

0 comments on commit 5c8f0aa

Please sign in to comment.