Skip to content

Commit

Permalink
Merge pull request #1633 from ClickHouse/feat-metrics
Browse files Browse the repository at this point in the history
Operation (Query) Metrics
  • Loading branch information
chernser authored May 22, 2024
2 parents e589c2f + ddc9522 commit bfc4c60
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ public class ClickHouseResponseSummary implements Serializable {
public static final class Progress implements Serializable {
private static final long serialVersionUID = -1447066780591278108L;

static final Progress EMPTY = new Progress(0L, 0L, 0L, 0L, 0L);
static final Progress EMPTY = new Progress(0L, 0L, 0L, 0L, 0L, 0L, 0L);

private final long read_rows;
private final long read_bytes;
private final long total_rows_to_read;
private final long written_rows;
private final long written_bytes;
private final long elapsed_time;
private final long result_rows;

/**
* Default constructor.
Expand All @@ -36,14 +38,17 @@ public static final class Progress implements Serializable {
* @param total_rows_to_read Total number of rows to be read
* @param written_rows Number of rows written
* @param written_bytes Volume of data written in bytes
* @param elapsed_time Query processing time in (ns)
*/
public Progress(long read_rows, long read_bytes, long total_rows_to_read, long written_rows,
long written_bytes) {
long written_bytes, long elapsed_time, long result_rows) {
this.read_rows = read_rows;
this.read_bytes = read_bytes;
this.total_rows_to_read = total_rows_to_read;
this.written_rows = written_rows;
this.written_bytes = written_bytes;
this.elapsed_time = elapsed_time;
this.result_rows = result_rows;
}

public long getReadRows() {
Expand All @@ -66,14 +71,22 @@ public long getWrittenBytes() {
return written_bytes;
}

public long getElapsedTime() {
return elapsed_time;
}

public long getResultRows() {
return result_rows;
}
public Progress add(Progress progress) {
if (progress == null) {
return this;
}

return new Progress(read_rows + progress.read_rows, read_bytes + progress.read_bytes,
total_rows_to_read + progress.total_rows_to_read, written_rows + progress.written_rows,
written_bytes + progress.written_bytes);
written_bytes + progress.written_bytes,elapsed_time + progress.elapsed_time,
result_rows + progress.result_rows);
}

public boolean isEmpty() {
Expand Down Expand Up @@ -301,6 +314,14 @@ public int getUpdateCount() {
return updates.get();
}

public long getElapsedTime() {
return progress.get().getElapsedTime();
}

public long getResultRows() {
return progress.get().getResultRows();
}

public boolean isEmpty() {
return progress.get().isEmpty() && stats.get().isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public void testConsutrctor() {
Assert.assertEquals(summary.getWrittenBytes(), 0L);
Assert.assertEquals(summary.getWrittenRows(), 0L);

Progress progress = new Progress(1L, 2L, 3L, 4L, 5L);
Progress progress = new Progress(1L, 2L, 3L, 4L, 5L,
6L, 7L);
Statistics stats = new Statistics(6L, 7L, 8L, true, 9L);
summary = new ClickHouseResponseSummary(progress, stats);
Assert.assertTrue(summary.getProgress() == progress);
Expand Down Expand Up @@ -58,7 +59,7 @@ public void testAdd() {
Assert.assertEquals(summary.getWrittenBytes(), 0L);
Assert.assertEquals(summary.getWrittenRows(), 0L);

summary.add(new Progress(1L, 2L, 3L, 4L, 5L));
summary.add(new Progress(1L, 2L, 3L, 4L, 5L, 6L, 7L));
Assert.assertEquals(summary.getReadBytes(), 2L);
Assert.assertEquals(summary.getReadRows(), 1L);
Assert.assertEquals(summary.getTotalRowsToRead(), 3L);
Expand Down Expand Up @@ -93,13 +94,15 @@ public void testUpdate() {
Assert.assertEquals(summary.getWrittenBytes(), 0L);
Assert.assertEquals(summary.getWrittenRows(), 0L);

summary.update(new Progress(1L, 2L, 3L, 4L, 5L));
summary.update(new Progress(1L, 2L, 3L, 4L, 5L, 6L, 7L));
Assert.assertEquals(summary.getReadBytes(), 2L);
Assert.assertEquals(summary.getReadRows(), 1L);
Assert.assertEquals(summary.getTotalRowsToRead(), 3L);
Assert.assertEquals(summary.getUpdateCount(), 1L);
Assert.assertEquals(summary.getWrittenBytes(), 5L);
Assert.assertEquals(summary.getWrittenRows(), 4L);
Assert.assertEquals(summary.getProgress().getElapsedTime(), 6L);
Assert.assertEquals(summary.getProgress().getResultRows(), 7L);

summary.update(new Statistics(6L, 7L, 8L, true, 9L));
Assert.assertEquals(summary.getReadBytes(), 2L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected ClickHouseGrpcResponse(ClickHouseConfig config, Map<String, Serializab
if (result.hasProgress()) {
Progress p = result.getProgress();
summary.update(new ClickHouseResponseSummary.Progress(p.getReadRows(), p.getReadBytes(),
p.getTotalRowsToRead(), p.getWrittenRows(), p.getWrittenBytes()));
p.getTotalRowsToRead(), p.getWrittenRows(), p.getWrittenBytes(), 0L, 0L));
}

if (result.hasStats()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected boolean updateStatus(Result result) {
if (result.hasProgress()) {
Progress p = result.getProgress();
summary.update(new ClickHouseResponseSummary.Progress(p.getReadRows(), p.getReadBytes(),
p.getTotalRowsToRead(), p.getWrittenRows(), p.getWrittenBytes()));
p.getTotalRowsToRead(), p.getWrittenRows(), p.getWrittenBytes(), 0L, 0L));
}

if (result.getCancelled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public ClickHouseHttpResponse(ClickHouseHttpConnection connection, ClickHouseInp
this.summary = new ClickHouseResponseSummary(
new ClickHouseResponseSummary.Progress(getLongValue(map, "read_rows"), getLongValue(map, "read_bytes"),
getLongValue(map, "total_rows_to_read"), getLongValue(map, "written_rows"),
getLongValue(map, "written_bytes")),
getLongValue(map, "written_bytes"), getLongValue(map, "elapsed_ns"),
getLongValue(map, "result_rows")),
null);

this.format = format != null ? format : connection.config.getFormat();
Expand Down
45 changes: 36 additions & 9 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHousePipedOutputStream;
import com.clickhouse.data.format.BinaryStreamUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -38,7 +37,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -57,6 +58,8 @@ public class Client {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private ExecutorService queryExecutor;

private Map<String, OperationStatistics.ClientStatistics> globalClientStats = new ConcurrentHashMap<>();

private Client(Set<String> endpoints, Map<String,String> configuration) {
this.endpoints = endpoints;
this.configuration = configuration;
Expand Down Expand Up @@ -280,10 +283,14 @@ public void register(Class<?> clazz, TableSchema schema) {
public InsertResponse insert(String tableName,
List<Object> data,
InsertSettings settings) throws ClientException, IOException {

String operationId = startOperation();
settings.setSetting(INTERNAL_OPERATION_ID, operationId);
globalClientStats.get(operationId).start("serialization");

if (data == null || data.isEmpty()) {
throw new IllegalArgumentException("Data cannot be empty");
}
StopWatch watch = StopWatch.createStarted();

//Add format to the settings
if (settings == null) {
Expand Down Expand Up @@ -318,8 +325,8 @@ public InsertResponse insert(String tableName,
}
}

watch.stop();
LOG.debug("Total serialization time: {}", watch.getTime());
globalClientStats.get(operationId).stop("serialization");
LOG.debug("Total serialization time: {}", globalClientStats.get(operationId).getElapsedTime("serialization"));
return insert(tableName, new ByteArrayInputStream(stream.toByteArray()), settings);
}

Expand All @@ -329,7 +336,13 @@ public InsertResponse insert(String tableName,
public InsertResponse insert(String tableName,
InputStream data,
InsertSettings settings) throws IOException, ClientException {
StopWatch watch = StopWatch.createStarted();
String operationId = (String) settings.getSetting(INTERNAL_OPERATION_ID);
if (operationId == null) {
operationId = startOperation();
settings.setSetting(INTERNAL_OPERATION_ID, operationId);
}
OperationStatistics.ClientStatistics clientStats = globalClientStats.remove(operationId);
clientStats.start("insert");
InsertResponse response;
try (ClickHouseClient client = createClient()) {
ClickHouseRequest.Mutation request = createMutationRequest(client.write(getServerNode()), tableName, settings)
Expand All @@ -346,14 +359,15 @@ public InsertResponse insert(String tableName,
}
}
try {
response = new InsertResponse(client, future.get());
response = new InsertResponse(client, future.get(), clientStats);
} catch (InterruptedException | ExecutionException e) {
throw new ClientException("Operation has likely timed out.", e);
}
}

watch.stop();
LOG.debug("Total insert (InputStream) time: {}", watch.getTime());
clientStats.stop("insert");
LOG.debug("Total insert (InputStream) time: {}",
clientStats.getElapsedTime("insert"));
return response;
}

Expand All @@ -371,6 +385,9 @@ public InsertResponse insert(String tableName,
* @return
*/
public Future<QueryResponse> query(String sqlQuery, Map<String, Object> qparams, QuerySettings settings) {

OperationStatistics.ClientStatistics clientStats = new OperationStatistics.ClientStatistics();
clientStats.start("query");
ClickHouseClient client = createClient();
ClickHouseRequest<?> request = client.read(getServerNode());

Expand All @@ -393,7 +410,9 @@ public Future<QueryResponse> query(String sqlQuery, Map<String, Object> qparams,
MDC.put("queryId", settings.getQueryID());
LOG.debug("Executing request: {}", request);
try {
future.complete(new QueryResponse(client, request.execute(), settings, format));
QueryResponse queryResponse = new QueryResponse(client, request.execute(), settings, format,
clientStats);
future.complete(queryResponse);
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
Expand Down Expand Up @@ -472,4 +491,12 @@ private static Set<String> createFormatWhitelist(String shouldSupport) {
}
return Collections.unmodifiableSet(formats);
}

private static final String INTERNAL_OPERATION_ID = "operationID";

private String startOperation() {
String operationId = UUID.randomUUID().toString();
globalClientStats.put(operationId, new OperationStatistics.ClientStatistics());
return operationId;
}
}
Loading

0 comments on commit bfc4c60

Please sign in to comment.