Skip to content

Commit

Permalink
Merge pull request #1200 from zhicwu/master
Browse files Browse the repository at this point in the history
consistent behavior of setting InputStream and ClickHouseWriter in ClickHouseRequest
  • Loading branch information
zhicwu authored Jan 19, 2023
2 parents 6e689ad + 84e0301 commit 3a53cf1
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 155 deletions.
53 changes: 39 additions & 14 deletions clickhouse-benchmark/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -83,6 +85,38 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>brotli4j</artifactId>
</dependency>
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>native-linux-x86_64</artifactId>
</dependency>
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>native-osx-aarch64</artifactId>
</dependency>
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>native-osx-x86_64</artifactId>
</dependency>
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>native-windows-x86_64</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
Expand Down Expand Up @@ -119,23 +153,14 @@
<configuration>
<finalName>${shade.name}</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
<filters>
<filter>
<artifact>com.clickhouse:clickhouse-jdbc</artifact>
<excludes>
<exclude>**/module-info.class</exclude>
<exclude>ru/**</exclude>
<exclude>META-INF/MANIFEST.MF</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ static Process startProcess(ClickHouseRequest<?> request) {
final ClickHouseNode server = request.getServer();
final int timeout = config.getSocketTimeout();

// FIXME potential timing issue
final Optional<ClickHouseInputStream> in = request.getInputStream();

String hostDir = config.getStrOption(ClickHouseCommandLineOption.CLI_WORK_DIRECTORY);
hostDir = ClickHouseUtils.normalizeDirectory(
ClickHouseChecker.isNullOrBlank(hostDir) ? System.getProperty("java.io.tmpdir") : hostDir);
Expand Down Expand Up @@ -382,7 +385,7 @@ static Process startProcess(ClickHouseRequest<?> request) {
builder.redirectOutput(f);
}
}
final Optional<ClickHouseInputStream> in = request.getInputStream();

try {
final Process process;
if (in.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.clickhouse.client.ClickHouseStreamResponse;

public class ClickHouseCommandLineResponse extends ClickHouseStreamResponse {
private static final long serialVersionUID = 4253185543390807162L;

private final transient ClickHouseCommandLine cli;

protected ClickHouseCommandLineResponse(ClickHouseConfig config, ClickHouseCommandLine cli) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -17,12 +16,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -31,20 +26,17 @@
import java.util.function.Function;

import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.config.ClickHouseBufferingMode;
import com.clickhouse.config.ClickHouseConfigChangeListener;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseChecker;
import com.clickhouse.data.ClickHouseCompression;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseDeferredValue;
import com.clickhouse.data.ClickHouseExternalTable;
import com.clickhouse.data.ClickHouseFile;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseInputStream;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHousePassThruStream;
import com.clickhouse.data.ClickHousePipedOutputStream;
import com.clickhouse.data.ClickHouseUtils;
import com.clickhouse.data.ClickHouseValue;
import com.clickhouse.data.ClickHouseValues;
Expand All @@ -69,12 +61,26 @@ public class ClickHouseRequest<SelfT extends ClickHouseRequest<SelfT>> implement
SPECIAL_SETTINGS = Collections.unmodifiableSet(set);
}

static class PipedWriter implements ClickHouseWriter {
private final ClickHouseDeferredValue<ClickHouseInputStream> input;

PipedWriter(ClickHouseDeferredValue<ClickHouseInputStream> input) {
this.input = input;
}

@Override
public void write(ClickHouseOutputStream output) throws IOException {
ClickHouseInputStream in = input.get();
if (in != null) {
in.pipe(output);
}
}
}

/**
* Mutation request.
*/
public static class Mutation extends ClickHouseRequest<Mutation> {
private transient ClickHouseWriter writer;

protected Mutation(ClickHouseRequest<?> request, boolean sealed) {
super(request.getClient(), request.server, request.serverRef, request.options, sealed);

Expand Down Expand Up @@ -151,16 +157,18 @@ public Mutation format(ClickHouseFormat format) {
}

/**
* Sets custom writer for streaming. This will create a piped stream between the
* writer and ClickHouse server.
* Sets custom writer for streaming. This will remove input stream set by other
* {@code data()} methods.
*
* @param writer writer
* @return mutation request
*/
public Mutation data(ClickHouseWriter writer) {
checkSealed();

this.input = changeProperty(PROP_DATA, this.input, null);
this.writer = changeProperty(PROP_WRITER, this.writer, writer);

return this;
}

Expand Down Expand Up @@ -191,6 +199,7 @@ public Mutation data(ClickHousePassThruStream stream) {
final int bufferSize = c.getReadBufferSize();
this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue
.of(() -> ClickHouseInputStream.of(stream, bufferSize, null)));
this.writer = changeProperty(PROP_WRITER, this.writer, null);
return this;
}

Expand Down Expand Up @@ -259,6 +268,7 @@ public Mutation data(ClickHouseInputStream input) {

this.input = changeProperty(PROP_DATA, this.input,
ClickHouseDeferredValue.of(input, ClickHouseInputStream.class));
this.writer = changeProperty(PROP_WRITER, this.writer, null);
return this;
}

Expand All @@ -272,83 +282,11 @@ public Mutation data(ClickHouseDeferredValue<ClickHouseInputStream> input) {
checkSealed();

this.input = changeProperty(PROP_DATA, this.input, input);
this.writer = changeProperty(PROP_WRITER, this.writer, null);

return this;
}

@Override
public CompletableFuture<ClickHouseResponse> execute() {
if (writer != null) {
final ClickHouseConfig c = getConfig();
final boolean perfMode = c.getResponseBuffering() == ClickHouseBufferingMode.PERFORMANCE;
final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(c, null);
data(stream.getInputStream());
CompletableFuture<ClickHouseResponse> future = null;
if (c.isAsync()) {
future = getClient().execute(this);
}

if (perfMode) {
ClickHouseClient.submit(() -> {

});
} else {
try (ClickHouseOutputStream out = stream) {
writer.write(out);
} catch (IOException e) {
throw new CompletionException(e);
}
}

if (future != null) {
return future;
}
}

return getClient().execute(this);
}

@Override
public ClickHouseResponse executeAndWait() throws ClickHouseException {
if (writer != null) {
ClickHouseConfig c = getConfig();
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(c, null);
data(stream.getInputStream());
CompletableFuture<ClickHouseResponse> future = null;
if (c.isAsync()) {
future = getClient().execute(this);
}
try (ClickHouseOutputStream out = stream) {
writer.write(out);
} catch (IOException e) {
throw ClickHouseException.of(e, getServer());
}
if (future != null) {
try {
return future.get(c.getSocketTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseException.forCancellation(e, getServer());
} catch (CancellationException e) {
throw ClickHouseException.forCancellation(e, getServer());
} catch (ExecutionException | TimeoutException | UncheckedIOException e) {
Throwable cause = e.getCause();
if (cause == null) {
cause = e;
}
throw cause instanceof ClickHouseException ? (ClickHouseException) cause
: ClickHouseException.of(cause, getServer());
} catch (RuntimeException e) { // unexpected
throw ClickHouseException.of(e, getServer());
}
}
}

return getClient().executeAndWait(this);
}

@Override
public Mutation table(String table, String queryId) {
checkSealed();
Expand All @@ -369,6 +307,7 @@ public Mutation seal() {
req.namedParameters.putAll(namedParameters);

req.input = input;
req.writer = writer;
req.queryId = queryId;
req.sql = sql;

Expand Down Expand Up @@ -407,6 +346,7 @@ public Mutation seal() {

protected final Map<String, String> namedParameters;

protected transient ClickHouseWriter writer;
protected transient ClickHouseDeferredValue<ClickHouseInputStream> input;
protected transient ClickHouseDeferredValue<ClickHouseOutputStream> output;
protected String queryId;
Expand Down Expand Up @@ -508,6 +448,7 @@ public ClickHouseRequest<SelfT> copy() {
req.settings.putAll(settings);
req.namedParameters.putAll(namedParameters);
req.input = input;
req.writer = writer;
req.output = output;
req.queryId = queryId;
req.sql = sql;
Expand Down Expand Up @@ -555,12 +496,12 @@ public boolean isTransactional() {
}

/**
* Checks if the request contains any input stream.
* Checks if the request contains any input stream or custom writer.
*
* @return true if there's input stream; false otherwise
* @return true if there's input stream or customer writer; false otherwise
*/
public boolean hasInputStream() {
return this.input != null || !this.externalTables.isEmpty();
return this.input != null || this.writer != null || !this.externalTables.isEmpty();
}

/**
Expand Down Expand Up @@ -632,9 +573,27 @@ public final BiConsumer<ClickHouseNode, ClickHouseNode> getServerListener() {
* @return input stream
*/
public Optional<ClickHouseInputStream> getInputStream() {
if (this.input == null && this.writer != null) {
final ClickHouseConfig c = getConfig();
final ClickHouseWriter w = this.writer;
this.input = changeProperty(PROP_DATA, this.input,
ClickHouseDeferredValue.of(() -> ClickHouseInputStream.of(c, w)));
}
return input != null ? input.getOptional() : Optional.empty();
}

/**
* Gets custom writer for writing raw request.
*
* @return custom writer
*/
public Optional<ClickHouseWriter> getWriter() {
if (this.writer == null && this.input != null) {
this.writer = changeProperty(PROP_WRITER, this.writer, new PipedWriter(input));
}
return Optional.ofNullable(this.writer);
}

/**
* Gets output stream.
*
Expand Down Expand Up @@ -2026,6 +1985,7 @@ public SelfT reset() {
}
}
this.input = changeProperty(PROP_DATA, this.input, null);
this.writer = changeProperty(PROP_WRITER, this.writer, null);
this.output = changeProperty(PROP_OUTPUT, this.output, null);
this.sql = changeProperty(PROP_QUERY, this.sql, null);
this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, null);
Expand Down Expand Up @@ -2065,6 +2025,7 @@ public ClickHouseRequest<SelfT> seal() {
req.namedParameters.putAll(namedParameters);

req.input = input;
req.writer = writer;
req.output = output;
req.queryId = queryId;
req.sql = sql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public enum ClickHouseClientOption implements ClickHouseOption {
/**
* Socket timeout in milliseconds.
*/
SOCKET_TIMEOUT("socket_timeout", 30 * 1000, "Socket timeout in milliseconds."),
SOCKET_TIMEOUT("socket_timeout", ClickHouseDataConfig.DEFAULT_TIMEOUT, "Socket timeout in milliseconds."),
/**
* Whether allows for the reuse of local addresses and ports. See
* {@link java.net.StandardSocketOptions#SO_REUSEADDR}.
Expand Down
Loading

0 comments on commit 3a53cf1

Please sign in to comment.