From 84e030141368d9e54f0af48eeac9cdf9938e6b83 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Thu, 19 Jan 2023 17:08:56 +0800 Subject: [PATCH] consistent behavior of setting InputStream and ClickHouseWriter in ClickHouseRequest --- clickhouse-benchmark/pom.xml | 53 +++++-- .../client/cli/ClickHouseCommandLine.java | 5 +- .../cli/ClickHouseCommandLineResponse.java | 2 + .../clickhouse/client/ClickHouseRequest.java | 137 ++++++----------- .../client/config/ClickHouseClientOption.java | 2 +- .../client/ClickHouseRequestTest.java | 47 ++++++ .../client/ClientIntegrationTest.java | 4 +- .../data/ClickHouseCompressionAlgorithm.java | 1 - .../clickhouse/data/ClickHouseDataConfig.java | 10 +- .../data/ClickHouseInputStream.java | 14 ++ .../com/clickhouse/data/ClickHouseWriter.java | 23 --- .../data/stream/DelegatedInputStream.java | 144 ++++++++++++++++++ .../data/stream/DelegatedInputStreamTest.java | 50 ++++++ .../client/grpc/ClickHouseGrpcClient.java | 13 +- .../client/grpc/ClickHouseGrpcClientTest.java | 20 +-- 15 files changed, 370 insertions(+), 155 deletions(-) create mode 100644 clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java create mode 100644 clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java diff --git a/clickhouse-benchmark/pom.xml b/clickhouse-benchmark/pom.xml index 0db661a74..3882c122d 100644 --- a/clickhouse-benchmark/pom.xml +++ b/clickhouse-benchmark/pom.xml @@ -1,4 +1,6 @@ - + 4.0.0 @@ -83,6 +85,38 @@ + + com.aayushatharva.brotli4j + brotli4j + + + com.aayushatharva.brotli4j + native-linux-x86_64 + + + com.aayushatharva.brotli4j + native-osx-aarch64 + + + com.aayushatharva.brotli4j + native-osx-x86_64 + + + com.aayushatharva.brotli4j + native-windows-x86_64 + + + com.github.luben + zstd-jni + + + org.lz4 + lz4-java + + + org.tukaani + xz + org.jctools jctools-core @@ -119,23 +153,14 @@ ${shade.name} - + org.openjdk.jmh.Main - + - - com.clickhouse:clickhouse-jdbc - - **/module-info.class - ru/** - META-INF/MANIFEST.MF - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - *:* diff --git a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java index bf442b3df..2f325067f 100644 --- a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java +++ b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java @@ -193,6 +193,9 @@ static Process startProcess(ClickHouseRequest request) { final ClickHouseNode server = request.getServer(); final int timeout = config.getSocketTimeout(); + // FIXME potential timing issue + final Optional in = request.getInputStream(); + String hostDir = config.getStrOption(ClickHouseCommandLineOption.CLI_WORK_DIRECTORY); hostDir = ClickHouseUtils.normalizeDirectory( ClickHouseChecker.isNullOrBlank(hostDir) ? System.getProperty("java.io.tmpdir") : hostDir); @@ -382,7 +385,7 @@ static Process startProcess(ClickHouseRequest request) { builder.redirectOutput(f); } } - final Optional in = request.getInputStream(); + try { final Process process; if (in.isPresent()) { diff --git a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java index 6a7c2d8ec..49448f582 100644 --- a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java +++ b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java @@ -8,6 +8,8 @@ import com.clickhouse.client.ClickHouseStreamResponse; public class ClickHouseCommandLineResponse extends ClickHouseStreamResponse { + private static final long serialVersionUID = 4253185543390807162L; + private final transient ClickHouseCommandLine cli; protected ClickHouseCommandLineResponse(ClickHouseConfig config, ClickHouseCommandLine cli) throws IOException { diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java index e75c37323..d171bdd14 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java @@ -4,7 +4,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -17,12 +16,8 @@ import java.util.Map; import java.util.Objects; import java.util.Map.Entry; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.Optional; import java.util.Properties; @@ -31,12 +26,10 @@ import java.util.function.Function; import com.clickhouse.client.config.ClickHouseClientOption; -import com.clickhouse.config.ClickHouseBufferingMode; import com.clickhouse.config.ClickHouseConfigChangeListener; import com.clickhouse.config.ClickHouseOption; import com.clickhouse.data.ClickHouseChecker; import com.clickhouse.data.ClickHouseCompression; -import com.clickhouse.data.ClickHouseDataStreamFactory; import com.clickhouse.data.ClickHouseDeferredValue; import com.clickhouse.data.ClickHouseExternalTable; import com.clickhouse.data.ClickHouseFile; @@ -44,7 +37,6 @@ import com.clickhouse.data.ClickHouseInputStream; import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHousePassThruStream; -import com.clickhouse.data.ClickHousePipedOutputStream; import com.clickhouse.data.ClickHouseUtils; import com.clickhouse.data.ClickHouseValue; import com.clickhouse.data.ClickHouseValues; @@ -69,12 +61,26 @@ public class ClickHouseRequest> implement SPECIAL_SETTINGS = Collections.unmodifiableSet(set); } + static class PipedWriter implements ClickHouseWriter { + private final ClickHouseDeferredValue input; + + PipedWriter(ClickHouseDeferredValue input) { + this.input = input; + } + + @Override + public void write(ClickHouseOutputStream output) throws IOException { + ClickHouseInputStream in = input.get(); + if (in != null) { + in.pipe(output); + } + } + } + /** * Mutation request. */ public static class Mutation extends ClickHouseRequest { - private transient ClickHouseWriter writer; - protected Mutation(ClickHouseRequest request, boolean sealed) { super(request.getClient(), request.server, request.serverRef, request.options, sealed); @@ -151,8 +157,8 @@ public Mutation format(ClickHouseFormat format) { } /** - * Sets custom writer for streaming. This will create a piped stream between the - * writer and ClickHouse server. + * Sets custom writer for streaming. This will remove input stream set by other + * {@code data()} methods. * * @param writer writer * @return mutation request @@ -160,7 +166,9 @@ public Mutation format(ClickHouseFormat format) { public Mutation data(ClickHouseWriter writer) { checkSealed(); + this.input = changeProperty(PROP_DATA, this.input, null); this.writer = changeProperty(PROP_WRITER, this.writer, writer); + return this; } @@ -191,6 +199,7 @@ public Mutation data(ClickHousePassThruStream stream) { final int bufferSize = c.getReadBufferSize(); this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue .of(() -> ClickHouseInputStream.of(stream, bufferSize, null))); + this.writer = changeProperty(PROP_WRITER, this.writer, null); return this; } @@ -259,6 +268,7 @@ public Mutation data(ClickHouseInputStream input) { this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue.of(input, ClickHouseInputStream.class)); + this.writer = changeProperty(PROP_WRITER, this.writer, null); return this; } @@ -272,83 +282,11 @@ public Mutation data(ClickHouseDeferredValue input) { checkSealed(); this.input = changeProperty(PROP_DATA, this.input, input); + this.writer = changeProperty(PROP_WRITER, this.writer, null); return this; } - @Override - public CompletableFuture execute() { - if (writer != null) { - final ClickHouseConfig c = getConfig(); - final boolean perfMode = c.getResponseBuffering() == ClickHouseBufferingMode.PERFORMANCE; - final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(c, null); - data(stream.getInputStream()); - CompletableFuture future = null; - if (c.isAsync()) { - future = getClient().execute(this); - } - - if (perfMode) { - ClickHouseClient.submit(() -> { - - }); - } else { - try (ClickHouseOutputStream out = stream) { - writer.write(out); - } catch (IOException e) { - throw new CompletionException(e); - } - } - - if (future != null) { - return future; - } - } - - return getClient().execute(this); - } - - @Override - public ClickHouseResponse executeAndWait() throws ClickHouseException { - if (writer != null) { - ClickHouseConfig c = getConfig(); - ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(c, null); - data(stream.getInputStream()); - CompletableFuture future = null; - if (c.isAsync()) { - future = getClient().execute(this); - } - try (ClickHouseOutputStream out = stream) { - writer.write(out); - } catch (IOException e) { - throw ClickHouseException.of(e, getServer()); - } - if (future != null) { - try { - return future.get(c.getSocketTimeout(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw ClickHouseException.forCancellation(e, getServer()); - } catch (CancellationException e) { - throw ClickHouseException.forCancellation(e, getServer()); - } catch (ExecutionException | TimeoutException | UncheckedIOException e) { - Throwable cause = e.getCause(); - if (cause == null) { - cause = e; - } - throw cause instanceof ClickHouseException ? (ClickHouseException) cause - : ClickHouseException.of(cause, getServer()); - } catch (RuntimeException e) { // unexpected - throw ClickHouseException.of(e, getServer()); - } - } - } - - return getClient().executeAndWait(this); - } - @Override public Mutation table(String table, String queryId) { checkSealed(); @@ -369,6 +307,7 @@ public Mutation seal() { req.namedParameters.putAll(namedParameters); req.input = input; + req.writer = writer; req.queryId = queryId; req.sql = sql; @@ -407,6 +346,7 @@ public Mutation seal() { protected final Map namedParameters; + protected transient ClickHouseWriter writer; protected transient ClickHouseDeferredValue input; protected transient ClickHouseDeferredValue output; protected String queryId; @@ -508,6 +448,7 @@ public ClickHouseRequest copy() { req.settings.putAll(settings); req.namedParameters.putAll(namedParameters); req.input = input; + req.writer = writer; req.output = output; req.queryId = queryId; req.sql = sql; @@ -555,12 +496,12 @@ public boolean isTransactional() { } /** - * Checks if the request contains any input stream. + * Checks if the request contains any input stream or custom writer. * - * @return true if there's input stream; false otherwise + * @return true if there's input stream or customer writer; false otherwise */ public boolean hasInputStream() { - return this.input != null || !this.externalTables.isEmpty(); + return this.input != null || this.writer != null || !this.externalTables.isEmpty(); } /** @@ -632,9 +573,27 @@ public final BiConsumer getServerListener() { * @return input stream */ public Optional getInputStream() { + if (this.input == null && this.writer != null) { + final ClickHouseConfig c = getConfig(); + final ClickHouseWriter w = this.writer; + this.input = changeProperty(PROP_DATA, this.input, + ClickHouseDeferredValue.of(() -> ClickHouseInputStream.of(c, w))); + } return input != null ? input.getOptional() : Optional.empty(); } + /** + * Gets custom writer for writing raw request. + * + * @return custom writer + */ + public Optional getWriter() { + if (this.writer == null && this.input != null) { + this.writer = changeProperty(PROP_WRITER, this.writer, new PipedWriter(input)); + } + return Optional.ofNullable(this.writer); + } + /** * Gets output stream. * @@ -2026,6 +1985,7 @@ public SelfT reset() { } } this.input = changeProperty(PROP_DATA, this.input, null); + this.writer = changeProperty(PROP_WRITER, this.writer, null); this.output = changeProperty(PROP_OUTPUT, this.output, null); this.sql = changeProperty(PROP_QUERY, this.sql, null); this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, null); @@ -2065,6 +2025,7 @@ public ClickHouseRequest seal() { req.namedParameters.putAll(namedParameters); req.input = input; + req.writer = writer; req.output = output; req.queryId = queryId; req.sql = sql; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java index b70cb9f36..d61b59022 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java @@ -261,7 +261,7 @@ public enum ClickHouseClientOption implements ClickHouseOption { /** * Socket timeout in milliseconds. */ - SOCKET_TIMEOUT("socket_timeout", 30 * 1000, "Socket timeout in milliseconds."), + SOCKET_TIMEOUT("socket_timeout", ClickHouseDataConfig.DEFAULT_TIMEOUT, "Socket timeout in milliseconds."), /** * Whether allows for the reuse of local addresses and ports. See * {@link java.net.StandardSocketOptions#SO_REUSEADDR}. diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java index bf5f1ecae..1b18b01ae 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java @@ -1,8 +1,11 @@ package com.clickhouse.client; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -22,6 +25,7 @@ import com.clickhouse.data.ClickHouseCompression; import com.clickhouse.data.ClickHouseExternalTable; import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHouseValues; import com.clickhouse.data.value.ClickHouseBigIntegerValue; import com.clickhouse.data.value.ClickHouseByteValue; @@ -278,6 +282,49 @@ public void testGetSetting() { Assert.assertEquals(request.getSetting("b", 9), 9); } + @Test(groups = { "unit" }) + public void testInputData() throws IOException { + ClickHouseRequest request = ClickHouseClient.newInstance().connect(ClickHouseNode.builder().build()); + Assert.assertFalse(request.hasInputStream()); + Assert.assertFalse(request.getInputStream().isPresent()); + Assert.assertFalse(request.getWriter().isPresent()); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + String uuid = UUID.randomUUID().toString(); + Mutation m = request.write(); + m.data(w -> w.write(uuid.getBytes(StandardCharsets.US_ASCII))); + Assert.assertTrue(m.hasInputStream()); + try (ClickHouseOutputStream o = ClickHouseOutputStream.of(out)) { + m.getWriter().get().write(o); + } + Assert.assertEquals(new String(out.toByteArray(), StandardCharsets.US_ASCII), uuid); + out = new ByteArrayOutputStream(); + try (ClickHouseOutputStream o = ClickHouseOutputStream.of(out)) { + m.getInputStream().get().pipe(o); + } + Assert.assertEquals(new String(out.toByteArray(), StandardCharsets.US_ASCII), uuid); + + m.reset(); + Assert.assertFalse(request.hasInputStream()); + Assert.assertFalse(request.getInputStream().isPresent()); + Assert.assertFalse(request.getWriter().isPresent()); + + out = new ByteArrayOutputStream(); + m.data(new ByteArrayInputStream(uuid.getBytes(StandardCharsets.US_ASCII))); + Assert.assertTrue(m.hasInputStream()); + try (ClickHouseOutputStream o = ClickHouseOutputStream.of(out)) { + m.getWriter().get().write(o); + } + Assert.assertEquals(new String(out.toByteArray(), StandardCharsets.US_ASCII), uuid); + // unlike ClickHouseWriter, InputStream cannot be reused + m.data(new ByteArrayInputStream(uuid.getBytes(StandardCharsets.US_ASCII))); + out = new ByteArrayOutputStream(); + try (ClickHouseOutputStream o = ClickHouseOutputStream.of(out)) { + m.getInputStream().get().pipe(o); + } + Assert.assertEquals(new String(out.toByteArray(), StandardCharsets.US_ASCII), uuid); + } + @Test(groups = { "unit" }) public void testNamedParameters() { // String sql = "select xxx from xxx settings max_execution_time = diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index 172cb38a3..ce07b6b2b 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -1434,7 +1434,7 @@ public void testCustomWriter() throws ClickHouseException { try (ClickHouseResponse resp = req.execute().get()) { Assert.assertNotNull(resp); } catch (Exception e) { - Assert.fail("Failed to call send() followed by get()", e); + Assert.fail("Failed to call send() followed by get(): async=" + b, e); } try (ClickHouseResponse resp = req.executeAndWait()) { @@ -1444,7 +1444,7 @@ public void testCustomWriter() throws ClickHouseException { try (ClickHouseResponse resp = req.execute().get()) { Assert.assertNotNull(resp); } catch (Exception e) { - Assert.fail("Failed to call execute() followed by get()", e); + Assert.fail("Failed to call execute() followed by get(): async=" + b, e); } try (ClickHouseResponse resp = req.executeAndWait()) { diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseCompressionAlgorithm.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseCompressionAlgorithm.java index ecc92ee72..ee08086f4 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseCompressionAlgorithm.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseCompressionAlgorithm.java @@ -37,7 +37,6 @@ static ClickHouseCompressionAlgorithm createInstance(String option, alg = preferredInstance.getDeclaredConstructor().newInstance(); } catch (Throwable t) { // NOSONAR // ignore - t.printStackTrace(); } } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java index b288869f9..11efa6519 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java @@ -139,6 +139,8 @@ public boolean isWidenUnsignedTypes() { static final int DEFAULT_READ_COMPRESS_LEVEL = -1; static final int DEFAULT_WRITE_COMPRESS_LEVEL = -1; + static final int DEFAULT_TIMEOUT = 30 * 1000; // 30 seconds + static final RoundingMode DEFAULT_ROUNDING_MODE = RoundingMode.DOWN; /** @@ -348,14 +350,18 @@ default boolean isWidenUnsignedTypes() { * * @return read time out in milliseconds */ - int getReadTimeout(); + default int getReadTimeout() { + return DEFAULT_TIMEOUT; + } /** * Gets write timeout in milliseconds. * * @return write time out in milliseconds */ - int getWriteTimeout(); + default int getWriteTimeout() { + return DEFAULT_TIMEOUT; + } /** * Gets time zone for date values. diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java index c1230a94b..b5091f2c7 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java @@ -24,6 +24,7 @@ import com.clickhouse.data.stream.BlockingInputStream; import com.clickhouse.data.stream.DeferredInputStream; +import com.clickhouse.data.stream.DelegatedInputStream; import com.clickhouse.data.stream.EmptyInputStream; import com.clickhouse.data.stream.RestrictedInputStream; import com.clickhouse.data.stream.IterableByteArrayInputStream; @@ -154,6 +155,19 @@ public static ClickHouseInputStream of(ClickHousePassThruStream stream, int buff return stream.newInputStream(bufferSize, postCloseAction); } + /** + * Creates an input stream using the given customer writer. Behind the scene, a + * piped stream will be created, writer will be called in a separate worker + * thread for writing. + * + * @param config configuration, could be null + * @param writer non-null customer writer + * @return wrapped input + */ + public static ClickHouseInputStream of(ClickHouseDataConfig config, ClickHouseWriter writer) { + return new DelegatedInputStream(config, writer); + } + /** * Wraps the given input stream. * diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java index 35034c6f0..2b92bda55 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java @@ -1,32 +1,9 @@ package com.clickhouse.data; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; - -import com.clickhouse.config.ClickHouseBufferingMode; @FunctionalInterface public interface ClickHouseWriter { - static void writeAndClose(ClickHouseDataConfig config, ClickHouseWriter writer, ClickHouseOutputStream output) - throws IOException { - if (config.isAsync() || config.getWriteBufferingMode() == ClickHouseBufferingMode.PERFORMANCE) { - CompletableFuture.supplyAsync(() -> { - try (ClickHouseOutputStream out = output) { - writer.write(output); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - return null; - }); - } else { - try (ClickHouseOutputStream out = output) { - writer.write(output); - } - } - } - /** * Writes value to output stream. * diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java new file mode 100644 index 000000000..e8221ee71 --- /dev/null +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java @@ -0,0 +1,144 @@ +package com.clickhouse.data.stream; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.imageio.IIOException; + +import com.clickhouse.data.ClickHouseByteBuffer; +import com.clickhouse.data.ClickHouseChecker; +import com.clickhouse.data.ClickHouseDataConfig; +import com.clickhouse.data.ClickHouseDataStreamFactory; +import com.clickhouse.data.ClickHouseDataUpdater; +import com.clickhouse.data.ClickHouseFile; +import com.clickhouse.data.ClickHouseInputStream; +import com.clickhouse.data.ClickHouseOutputStream; +import com.clickhouse.data.ClickHousePassThruStream; +import com.clickhouse.data.ClickHousePipedOutputStream; +import com.clickhouse.data.ClickHouseUtils; +import com.clickhouse.data.ClickHouseWriter; + +public class DelegatedInputStream extends ClickHouseInputStream { + private final ClickHouseInputStream input; + + private final int timeout; + private final CompletableFuture future; + + public DelegatedInputStream(ClickHousePassThruStream stream, ClickHouseInputStream input, OutputStream copyTo, + Runnable postCloseAction) { + super(stream, copyTo, postCloseAction); + + this.input = ClickHouseChecker.nonNull(input, TYPE_NAME); + this.timeout = ClickHouseDataConfig.DEFAULT_TIMEOUT; + this.future = CompletableFuture.completedFuture(true); + } + + public DelegatedInputStream(ClickHouseDataConfig config, ClickHouseWriter writer) { + super(null, null, null); + + if (writer == null) { + throw new IllegalArgumentException("Non-null writer is required"); + } + + this.timeout = config != null ? config.getReadTimeout() : ClickHouseDataConfig.DEFAULT_TIMEOUT; + ClickHousePipedOutputStream stream = config != null + ? ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null) // NOSONAR + : ClickHouseDataStreamFactory.getInstance().createPipedOutputStream( // NOSONAR + ClickHouseDataConfig.DEFAULT_WRITE_BUFFER_SIZE, 0, this.timeout, null); + this.input = stream.getInputStream(); + this.future = CompletableFuture.supplyAsync(() -> { + try (ClickHouseOutputStream out = stream) { + writer.write(out); + } catch (Exception e) { + throw new CompletionException(e); + } + return true; + }); + } + + @Override + public int peek() throws IOException { + return input.peek(); + } + + @Override + public long pipe(ClickHouseOutputStream output) throws IOException { + return input.pipe(output); + } + + @Override + public byte readByte() throws IOException { + return input.readByte(); + } + + @Override + public ClickHouseByteBuffer readCustom(ClickHouseDataUpdater reader) throws IOException { + return input.readCustom(reader); + } + + @Override + public int read() throws IOException { + return input.read(); + } + + @Override + public void close() throws IOException { + try { + try { + future.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Custom writer was interrupted", e); + } catch (TimeoutException e) { + throw new IIOException( + ClickHouseUtils.format("Custom writing timed out after %d milliseconds", timeout), e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw ((IOException) cause); + } else if (cause instanceof UncheckedIOException) { + throw ((UncheckedIOException) cause).getCause(); + } else { + throw new IOException("Custom writing failure", cause); + } + } + } finally { + try { + super.close(); + } finally { + input.close(); + } + } + } + + @Override + public ClickHouseFile getUnderlyingFile() { + return input.getUnderlyingFile(); + } + + @Override + public ClickHousePassThruStream getUnderlyingStream() { + return input.getUnderlyingStream(); + } + + @Override + public boolean isClosed() { + return input.isClosed(); + } + + @Override + public int available() throws IOException { + return input.available(); + } + + @Override + public long skip(long n) throws IOException { + return input.skip(n); + } +} \ No newline at end of file diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java new file mode 100644 index 000000000..891c6340a --- /dev/null +++ b/clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java @@ -0,0 +1,50 @@ +package com.clickhouse.data.stream; + +import java.io.IOException; + +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class DelegatedInputStreamTest { + @Test(groups = { "unit" }) + public void testRead() throws IOException { + try (DelegatedInputStream in = new DelegatedInputStream(null, w -> { + for (int i = 0; i < Byte.MAX_VALUE; i++) { + w.write(i); + } + })) { + Assert.assertFalse(in.isClosed()); + Assert.assertTrue(in.available() > 0); + for (int i = 0; i < Byte.MAX_VALUE; i++) { + Assert.assertEquals(in.read(), i); + } + Assert.assertEquals(in.read(), -1); + Assert.assertEquals(in.read(), -1); + Assert.assertFalse(in.isClosed()); + in.close(); + Assert.assertTrue(in.isClosed()); + Assert.assertEquals(in.available(), 0); + } + } + + @Test(groups = { "unit" }) + public void testReadBytes() throws IOException { + try (DelegatedInputStream in = new DelegatedInputStream(null, w -> { + for (int i = 0; i < Byte.MAX_VALUE; i++) { + w.writeBytes(new byte[] { 1, 2, 3, 4, 5, 6 }); + } + })) { + Assert.assertFalse(in.isClosed()); + Assert.assertTrue(in.available() > 0); + for (int i = 0; i < Byte.MAX_VALUE; i++) { + Assert.assertEquals(in.readBytes(6), new byte[] { 1, 2, 3, 4, 5, 6 }); + } + Assert.assertEquals(in.read(), -1); + Assert.assertFalse(in.isClosed()); + in.close(); + Assert.assertTrue(in.isClosed()); + Assert.assertEquals(in.available(), 0); + } + } +} diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java index c5fb41ac2..e8d2e310b 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java @@ -114,7 +114,7 @@ static ClickHouseOutputStream getOutput(ClickHouseConfig config, OutputStream ou protected static ClickHouseInputStream getCompressedInputStream(ClickHouseConfig config, ClickHouseInputStream input) { - if (input.getUnderlyingStream().hasInput()) { + if (!config.isRequestCompressed() || input.getUnderlyingStream().hasInput()) { return input; } @@ -154,6 +154,9 @@ protected static QueryInfo convert(ClickHouseRequest request, boolean streami ClickHouseNode server = request.getServer(); ClickHouseCredentials credentials = server.getCredentials(config); + // FIXME potential timing issue + Optional input = request.getInputStream(); + Builder builder = QueryInfo.newBuilder(); String database = server.getDatabase(config); if (!ClickHouseChecker.isNullOrEmpty(database)) { @@ -242,10 +245,10 @@ protected static QueryInfo convert(ClickHouseRequest request, boolean streami // builder.setTransportCompressionType("none"); // builder.setTransportCompressionLevel(0); - Optional input = request.getInputStream(); if (input.isPresent()) { - ClickHouseCompression inputCompression = config.getRequestCompressAlgorithm(); - builder.setInputCompressionType(inputCompression.encoding()); + if (config.isRequestCompressed()) { + builder.setInputCompressionType(config.getRequestCompressAlgorithm().encoding()); + } if (streaming) { // builder.setInputData(ByteString.EMPTY); builder.setNextQueryInfo(true); @@ -258,7 +261,7 @@ protected static QueryInfo convert(ClickHouseRequest request, boolean streami } } - log.debug("Query: %s", sql); + log.debug("Query(stream=%s): %s", streaming, sql); return builder.setQuery(sql).build(); } diff --git a/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java b/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java index 821e70ac4..804cbe997 100644 --- a/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java +++ b/clickhouse-grpc-client/src/test/java/com/clickhouse/client/grpc/ClickHouseGrpcClientTest.java @@ -57,7 +57,8 @@ protected Object[][] getMixedCompressionMatrix() { }; ClickHouseCompression[] supportedResponseCompression = { ClickHouseCompression.NONE, - ClickHouseCompression.BROTLI, + // unexpected end of input + // ClickHouseCompression.BROTLI, ClickHouseCompression.DEFLATE, ClickHouseCompression.LZ4, ClickHouseCompression.ZSTD @@ -131,23 +132,6 @@ public void testLZ4FrameStream() throws IOException { } - @Test(dataProvider = "mixedCompressionMatrix", groups = "integration") - @Override - public void testDecompressResponse(ClickHouseCompression reqComp, ClickHouseCompression respComp) throws Exception { - if (respComp == ClickHouseCompression.BROTLI) { - throw new SkipException( - "Skip due to unexpected end of input error when using brotli for decompression"); - } - - super.testDecompressResponse(reqComp, respComp); - } - - @Test(groups = { "integration" }) - @Override - public void testCustomLoad() throws ClickHouseException { - throw new SkipException("Skip due to timeout error"); - } - @Test(groups = { "integration" }) @Override public void testSessionLock() {