From a339f543694fd037e53f58c4473b26c49ceeb95f Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Thu, 19 Jan 2023 22:11:46 +0800 Subject: [PATCH 1/8] re-enable a few tests for gRPC --- .../test/java/com/clickhouse/client/ClientIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index ce07b6b2b..89b76f20c 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -1528,7 +1528,7 @@ public void testDump() throws ExecutionException, InterruptedException, IOExcept public void testDumpFile(boolean gzipCompressed, boolean useOneLiner) throws ExecutionException, InterruptedException, IOException { ClickHouseNode server = getServer(); - if (server.getProtocol() != ClickHouseProtocol.HTTP) { + if (server.getProtocol() != ClickHouseProtocol.GRPC && server.getProtocol() != ClickHouseProtocol.HTTP) { throw new SkipException("Skip as only http implementation works well"); } From 68ab0745d302c5131083a72fdfad9c76ce2e7b6b Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 27 Jan 2023 09:56:31 +0800 Subject: [PATCH 2/8] clean up code and remove unnecessary dependency --- .../benchmark/misc/QueueBenchmark.java | 8 +- clickhouse-client/pom.xml | 11 +- .../client/AbstractSocketClient.java | 2 +- .../clickhouse/client/ClickHouseClient.java | 77 +++++++-- .../client/ClickHouseClientBuilder.java | 45 +---- .../client/ClickHouseLoadBalancingPolicy.java | 3 +- .../clickhouse/client/ClickHouseRequest.java | 49 ++++-- .../client/ClickHouseStreamResponse.java | 1 + .../src/main/java9/module-info.java | 3 +- .../client/ClickHouseRequestTest.java | 72 ++++++++ .../client/ClientIntegrationTest.java | 9 +- .../clickhouse/data/ClickHouseDataConfig.java | 10 +- .../data/ClickHouseDataStreamFactory.java | 141 ++++++++++++++++ .../data/ClickHouseDeferredValue.java | 11 +- .../com/clickhouse/data/ClickHouseFile.java | 24 ++- .../data/ClickHouseInputStream.java | 156 +++++++++++++++++- .../data/ClickHouseOutputStream.java | 14 +- .../data/ClickHousePassThruStream.java | 52 +++++- .../data/ClickHousePipedOutputStream.java | 63 +++++++ .../com/clickhouse/data/ClickHouseUtils.java | 25 ++- .../com/clickhouse/data/ClickHouseWriter.java | 4 +- .../data/stream/BlockingInputStream.java | 12 +- .../stream/BlockingPipedOutputStream.java | 39 ++++- .../data/stream/DelegatedInputStream.java | 67 ++------ .../data/stream/NonBlockingInputStream.java | 6 +- .../stream/NonBlockingPipedOutputStream.java | 37 ++++- .../data/ClickHouseDataStreamFactoryTest.java | 24 ++- .../data/ClickHouseDeferredValueTest.java | 58 +++++++ .../clickhouse/data/ClickHouseFileTest.java | 2 +- .../data/ClickHouseInputStreamTest.java | 12 ++ .../data/ClickHouseOutputStreamTest.java | 12 ++ .../data/ClickHousePassThruStreamTest.java | 5 +- .../clickhouse/data/ClickHouseUtilsTest.java | 4 +- .../stream/BlockingPipedOutputStreamTest.java | 13 +- .../data/stream/DelegatedInputStreamTest.java | 2 +- .../NonBlockingPipedOutputStreamTest.java | 20 +-- clickhouse-grpc-client/pom.xml | 18 -- .../client/grpc/ClickHouseGrpcClient.java | 2 +- .../client/grpc/ClickHouseStreamObserver.java | 2 +- clickhouse-http-client/pom.xml | 12 +- .../client/http/HttpClientConnectionImpl.java | 5 +- .../src/main/java11/module-info.java | 2 - 
.../src/main/java9/module-info.java | 2 - clickhouse-jdbc/pom.xml | 36 +--- .../internal/InputBasedPreparedStatement.java | 7 +- .../StreamBasedPreparedStatement.java | 2 +- .../src/main/java9/module-info.java | 12 +- .../jdbc/ClickHousePreparedStatementTest.java | 2 +- clickhouse-r2dbc/pom.xml | 12 -- .../src/main/java9/module-info.java | 6 - 50 files changed, 893 insertions(+), 320 deletions(-) create mode 100644 clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDeferredValueTest.java diff --git a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/QueueBenchmark.java b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/QueueBenchmark.java index 70511a75b..9e0904cd0 100644 --- a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/QueueBenchmark.java +++ b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/QueueBenchmark.java @@ -76,8 +76,8 @@ public void blocking(CompareState state, Blackhole consumer) throws Exception { // options.put(ClickHouseClientOption.SOCKET_TIMEOUT, 0); options.put(ClickHouseClientOption.USE_BLOCKING_QUEUE, false); final ClickHouseConfig config = new ClickHouseConfig(options); - final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream( - config, null); + final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(config); CompletableFuture future = ClickHouseClient.submit(() -> { long range = state.samples; try (ClickHouseOutputStream out = stream) { @@ -99,8 +99,8 @@ public void blocking(CompareState state, Blackhole consumer) throws Exception { public void nonBlocking(CompareState state, Blackhole consumer) throws Exception { final ClickHouseConfig config = new ClickHouseConfig(Collections .singletonMap(ClickHouseClientOption.RESPONSE_BUFFERING, ClickHouseBufferingMode.PERFORMANCE)); - final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream( - config, null); + final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(config); CompletableFuture future = ClickHouseClient.submit(() -> { long range = state.samples; try (ClickHouseOutputStream out = stream) { diff --git a/clickhouse-client/pom.xml b/clickhouse-client/pom.xml index bf88ac598..938d6e4ee 100644 --- a/clickhouse-client/pom.xml +++ b/clickhouse-client/pom.xml @@ -43,11 +43,12 @@ dnsjava dnsjava true - - - org.slf4j - slf4j-api - true + + + org.slf4j + slf4j-api + + diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/AbstractSocketClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/AbstractSocketClient.java index e496c914c..418e4464f 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/AbstractSocketClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/AbstractSocketClient.java @@ -432,7 +432,7 @@ public ClickHouseInputStream send(ClickHouseConfig config, ClickHouseInputStream } ClickHousePipedOutputStream responeStream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(ClickHouseChecker.nonNull(config, ClickHouseConfig.TYPE_NAME), null); + .createPipedOutputStream(ClickHouseChecker.nonNull(config, ClickHouseConfig.TYPE_NAME)); processRequest(config, rawRequest, responeStream); return responeStream.getInputStream(); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java 
b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java index 7cc9caf67..b5eec3612 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java @@ -31,7 +31,9 @@ import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHouseInputStream; import com.clickhouse.data.ClickHouseOutputStream; +import com.clickhouse.data.ClickHousePassThruStream; import com.clickhouse.data.ClickHousePipedOutputStream; +import com.clickhouse.data.ClickHouseUtils; import com.clickhouse.data.ClickHouseValue; import com.clickhouse.data.ClickHouseValues; import com.clickhouse.data.ClickHouseWriter; @@ -71,7 +73,7 @@ static ClickHouseClientBuilder builder() { * @return non-null default executor service */ static ExecutorService getExecutorService() { - return ClickHouseClientBuilder.defaultExecutor; + return ClickHouseDataStreamFactory.getInstance().getExecutor(); } /** @@ -187,12 +189,12 @@ static ClickHouseInputStream getAsyncResponseInputStream(ClickHouseConfig config // raw response -> input final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(config, null); + .createPipedOutputStream(config); final ClickHouseInputStream wrappedInput; // raw response -> decompressed response -> input if (config.isResponseCompressed()) { // one more thread for decompression? final ClickHousePipedOutputStream decompressedStream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(config, null); + .createPipedOutputStream(config); wrappedInput = getResponseInputStream(config, decompressedStream.getInputStream(), postCloseAction); submit(() -> { try (ClickHouseInputStream in = ClickHouseInputStream.of(input, config.getReadBufferSize(), @@ -297,7 +299,7 @@ static CompletableFuture submit(Runnable task) { } run(task); - return CompletableFuture.completedFuture(null); + return ClickHouseUtils.NULL_FUTURE; } /** @@ -356,7 +358,7 @@ static CompletableFuture dump(ClickHouseNode server, */ static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, ClickHouseFormat format, ClickHouseCompression compression, String file) { - return dump(server, tableOrQuery, ClickHouseFile.of(file, compression, 0, format)); + return dump(server, tableOrQuery, ClickHouseFile.of(file, compression, format)); } /** @@ -411,18 +413,39 @@ static CompletableFuture dump(ClickHouseNode server, } /** - * Loads data from given file into a table. + * Loads data from the given pass-thru stream into a table. Pass + * {@link com.clickhouse.data.ClickHouseFile} to load data from a file, which + * may or may not be compressed. 
* * @param server non-null server to connect to * @param table non-null target table - * @param file non-null file + * @param stream non-null pass-thru stream * @return future object to get result * @throws IllegalArgumentException if any of server, table, and input is null * @throws CompletionException when error occurred during execution */ - static CompletableFuture load(ClickHouseNode server, String table, ClickHouseFile file) { - if (server == null || table == null || file == null) { - throw new IllegalArgumentException("Non-null server, table, and file are required"); + static CompletableFuture load(ClickHouseNode server, String table, + ClickHousePassThruStream stream) { + if (server == null || table == null || stream == null) { + throw new IllegalArgumentException("Non-null server, table, and pass-thru stream are required"); + } + + // in case the protocol is ANY + final ClickHouseNode theServer = server.probe(); + + return submit(() -> { + try (ClickHouseClient client = newInstance(theServer.getProtocol()); + ClickHouseResponse response = client.connect(theServer).write().table(table).data(stream) + .executeAndWait()) { + return response.getSummary(); + } + }); + } + + static CompletableFuture load(ClickHouseNode server, String table, + ClickHouseWriter writer, ClickHouseCompression compression, ClickHouseFormat format) { + if (server == null || table == null || writer == null) { + throw new IllegalArgumentException("Non-null server, table, and custom writer are required"); } // in case the protocol is ANY @@ -430,7 +453,8 @@ static CompletableFuture load(ClickHouseNode server, return submit(() -> { try (ClickHouseClient client = newInstance(theServer.getProtocol()); - ClickHouseResponse response = client.connect(theServer).write().table(table).data(file) + ClickHouseResponse response = client.connect(theServer).write().table(table) + .decompressClientRequest(compression).format(format).data(writer) .executeAndWait()) { return response.getSummary(); } @@ -449,10 +473,33 @@ static CompletableFuture load(ClickHouseNode server, * @return future object to get result * @throws IllegalArgumentException if any of server, table, and input is null * @throws CompletionException when error occurred during execution + * @deprecated will be dropped in 0.5, please use + * {@link #load(ClickHouseNode, String, String, ClickHouseCompression, ClickHouseFormat)} + * instead */ + @Deprecated static CompletableFuture load(ClickHouseNode server, String table, ClickHouseFormat format, ClickHouseCompression compression, String file) { - return load(server, table, ClickHouseFile.of(file, compression, 0, format)); + return load(server, table, ClickHouseFile.of(file, compression, format)); + } + + /** + * Loads data from a file into table using specified format and compression + * algorithm. 
Same as + * {@code load(server, table, ClickHouseFile.of(file, compression, format))} + * + * @param server non-null server to connect to + * @param table non-null target table + * @param format input format to use + * @param compression compression algorithm to use + * @param file file to load + * @return future object to get result + * @throws IllegalArgumentException if any of server, table, and input is null + * @throws CompletionException when error occurred during execution + */ + static CompletableFuture load(ClickHouseNode server, String table, + String file, ClickHouseCompression compression, ClickHouseFormat format) { + return load(server, table, ClickHouseFile.of(file, compression, format)); } /** @@ -467,7 +514,11 @@ static CompletableFuture load(ClickHouseNode server, * @return future object to get result * @throws IllegalArgumentException if any of server, table, and writer is null * @throws CompletionException when error occurred during execution + * @deprecated will be dropped in 0.5, please use + * {@link #load(ClickHouseNode, String, ClickHouseWriter, ClickHouseCompression, ClickHouseFormat)} + * instead */ + @Deprecated static CompletableFuture load(ClickHouseNode server, String table, ClickHouseFormat format, ClickHouseCompression compression, ClickHouseWriter writer) { if (server == null || table == null || writer == null) { @@ -484,7 +535,7 @@ static CompletableFuture load(ClickHouseNode server, .nodeSelector(ClickHouseNodeSelector.of(theServer.getProtocol())) .option(ClickHouseClientOption.ASYNC, true).build()) { ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(client.getConfig(), null); + .createPipedOutputStream(client.getConfig()); // execute query in a separate thread(because async is explicitly set to true) CompletableFuture future = client.connect(theServer).write().table(table) .decompressClientRequest(compression).format(format).data(input = stream.getInputStream()) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java index cfc1f532b..8b2f80b96 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java @@ -12,18 +12,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import com.clickhouse.client.ClickHouseNode.Status; -import com.clickhouse.client.config.ClickHouseDefaults; import com.clickhouse.config.ClickHouseOption; import com.clickhouse.data.ClickHouseChecker; -import com.clickhouse.data.ClickHouseThreadFactory; import com.clickhouse.data.ClickHouseUtils; import com.clickhouse.logging.Logger; import com.clickhouse.logging.LoggerFactory; @@ -85,9 +80,9 @@ public boolean ping(ClickHouseNode server, int timeout) { static final class Agent implements ClickHouseClient { private static final Logger log = LoggerFactory.getLogger(Agent.class); - private static final long INITIAL_REPEAT_DELAY = 100; - private static final long MAX_REPEAT_DELAY = 1000; - private static final long 
REPEAT_DELAY_BACKOFF = 100; + private static final long INITIAL_REPEAT_DELAY = 100L; + private static final long MAX_REPEAT_DELAY = 1000L; + private static final long REPEAT_DELAY_BACKOFF = 100L; private final AtomicReference client; @@ -186,7 +181,7 @@ ClickHouseResponse failover(ClickHouseRequest sealedRequest, ClickHouseExcept * @throws CompletionException when error occurred or timed out */ ClickHouseResponse repeat(ClickHouseRequest sealedRequest, ClickHouseException exception, long timeout) { - if (timeout > 0) { + if (timeout > 0L) { final int errorCode = exception.getErrorCode(); final long startTime = System.currentTimeMillis(); @@ -367,38 +362,6 @@ public void close() { private static final Logger log = LoggerFactory.getLogger(ClickHouseClientBuilder.class); - // expose method to change default thread pool in runtime? JMX? - static final ExecutorService defaultExecutor; - static final ScheduledExecutorService defaultScheduler; - - static { - int maxSchedulers = (int) ClickHouseDefaults.MAX_SCHEDULER_THREADS.getEffectiveDefaultValue(); - int maxThreads = (int) ClickHouseDefaults.MAX_THREADS.getEffectiveDefaultValue(); - int maxRequests = (int) ClickHouseDefaults.MAX_REQUESTS.getEffectiveDefaultValue(); - long keepAliveTimeoutMs = (long) ClickHouseDefaults.THREAD_KEEPALIVE_TIMEOUT.getEffectiveDefaultValue(); - - if (maxThreads <= 0) { - maxThreads = Runtime.getRuntime().availableProcessors(); - } - if (maxSchedulers <= 0) { - maxSchedulers = Runtime.getRuntime().availableProcessors(); - } else if (maxSchedulers > maxThreads) { - maxSchedulers = maxThreads; - } - - if (maxRequests <= 0) { - maxRequests = 0; - } - - String prefix = "ClickHouseClientWorker"; - defaultExecutor = ClickHouseUtils.newThreadPool(prefix, maxThreads, maxThreads * 3 + 1, maxRequests, - keepAliveTimeoutMs, false); - prefix = "ClickHouseClientScheduler"; - defaultScheduler = maxSchedulers == 1 ? 
Executors .newSingleThreadScheduledExecutor(new ClickHouseThreadFactory(prefix)) : Executors.newScheduledThreadPool(maxSchedulers, new ClickHouseThreadFactory(prefix)); } - static ServiceLoader loadClients() { return ServiceLoader.load(ClickHouseClient.class, ClickHouseClientBuilder.class.getClassLoader()); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseLoadBalancingPolicy.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseLoadBalancingPolicy.java index 482a78156..c797ec1dd 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseLoadBalancingPolicy.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseLoadBalancingPolicy.java @@ -12,6 +12,7 @@ import com.clickhouse.client.ClickHouseNode.Status; import com.clickhouse.data.ClickHouseChecker; +import com.clickhouse.data.ClickHouseDataStreamFactory; import com.clickhouse.data.ClickHouseUtils; /** @@ -336,7 +337,7 @@ protected void update(ClickHouseNodes manager, ClickHouseNode node, Status statu * health check */ protected ScheduledExecutorService getScheduler() { - return ClickHouseClientBuilder.defaultScheduler; + return ClickHouseDataStreamFactory.getInstance().getScheduler(); } /** diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java index d171bdd14..253e19793 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java @@ -157,8 +157,9 @@ public Mutation format(ClickHouseFormat format) { } /** - * Sets custom writer for streaming. This will remove input stream set by other - * {@code data()} methods. + * Sets custom writer for writing uncompressed data; use + * {@link #data(ClickHousePassThruStream)} when the data is compressed. This + * will remove input stream set by other {@code data(...)} methods. * * @param writer writer * @return mutation request @@ -173,7 +174,9 @@ public Mutation data(ClickHouseWriter writer) { } /** - * Loads data from given pass-thru stream which may or may not be compressed. + * Loads data from the given pass-thru stream which may or may not be + * compressed. This will not only remove custom writer set by + * {@link #data(ClickHouseWriter)}, but may also update compression and format. * * @param stream pass-thru stream, could be a file and may or may not be * compressed * @return mutation request @@ -204,22 +207,28 @@ public Mutation data(ClickHousePassThruStream stream) { } /** - * Loads data from given file which may or may not be compressed. + * Loads data from the given file. It's the same as + * {@code data(ClickHouseFile.of(file))} if the file name implies compression + * algorithm and/or format (e.g. {@code some_file.csv.gz}). It falls back to + * {@link #data(InputStream)} if there is no clue. This will remove custom writer set by + * {@link #data(ClickHouseWriter)}. * * @param file absolute or relative path of the file, file extension will be * used to determine if it's compressed or not * @return mutation request */ public Mutation data(String file) { - return data(ClickHouseFile.of(file)); + ClickHouseFile f = ClickHouseFile.of(file); + return f.isRecognized() ? data(f) : data(f.getInputStream()); } /** - * Loads compressed data from given file. + * Loads compressed data from the given file. This will remove custom writer set + * by {@link #data(ClickHouseWriter)}.
* * @param file absolute or relative path of the file - * @param compression compression algorithm, {@link ClickHouseCompression#NONE} - * means no compression + * @param compression compression algorithm, null or + * {@link ClickHouseCompression#NONE} means no compression * @return mutation request */ public Mutation data(String file, ClickHouseCompression compression) { @@ -228,13 +237,16 @@ public Mutation data(String file, ClickHouseCompression compression) { } /** - * Loads compressed data from given file. + * Loads compressed data from the given file. This will remove custom writer set + * by {@link #data(ClickHouseWriter)}. * * @param file absolute or relative path of the file - * @param compression compression algorithm, - * {@link ClickHouseCompression#NONE} - * means no compression - * @param compressionLevel compression level + * @param compression compression algorithm, null or + * {@link ClickHouseCompression#NONE} means no + * compression + * @param compressionLevel compression level, use + * {@code com.clickhouse.data.ClickHouseDataConfig#DEFAULT_READ_COMPRESS_LEVEL} + * to use recommended level for the algorithm * @return mutation request */ public Mutation data(String file, ClickHouseCompression compression, int compressionLevel) { @@ -242,7 +254,8 @@ public Mutation data(String file, ClickHouseCompression compression, int compres } /** - * Loads data from input stream. + * Loads data from input stream. This will remove custom writer set by + * {@link #data(ClickHouseWriter)}. * * @param input input stream * @return mutation request @@ -252,7 +265,8 @@ public Mutation data(InputStream input) { } /** - * Loads data from input stream. + * Loads data from input stream. This will remove custom writer set by + * {@link #data(ClickHouseWriter)}. * * @param input input stream * @return mutation request @@ -273,7 +287,8 @@ public Mutation data(ClickHouseInputStream input) { } /** - * Loads data from input stream. + * Loads data from deferred input stream. This will remove custom writer set by + * {@link #data(ClickHouseWriter)}. * * @param input input stream * @return mutation request @@ -1804,7 +1819,7 @@ public SelfT transaction() throws ClickHouseException { @SuppressWarnings("unchecked") public SelfT transaction(int timeout) throws ClickHouseException { ClickHouseTransaction tx = txRef.get(); - if (tx != null && tx.getTimeout() == (timeout > 0 ? timeout : 0)) { + if (tx != null && tx.getTimeout() == (timeout < 0 ? 
0 : timeout)) { return (SelfT) this; } return transaction(getManager().getOrStartTransaction(this, timeout)); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseStreamResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseStreamResponse.java index 52c5bcf86..19520fbfd 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseStreamResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseStreamResponse.java @@ -111,6 +111,7 @@ public void close() { // ignore log.debug("Failed to skip reading input stream due to: %s", e.getMessage()); } finally { + // close forcibly without skipping won't help much when network is slow/unstable try { input.close(); } catch (IOException e) { diff --git a/clickhouse-client/src/main/java9/module-info.java b/clickhouse-client/src/main/java9/module-info.java index 69c58c083..a503f3d6a 100644 --- a/clickhouse-client/src/main/java9/module-info.java +++ b/clickhouse-client/src/main/java9/module-info.java @@ -5,8 +5,7 @@ exports com.clickhouse.client; exports com.clickhouse.client.config; - requires static java.logging; - requires static org.slf4j; + requires static org.dnsjava; requires transitive com.clickhouse.data; diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java index 1b18b01ae..5fe71e60a 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java @@ -2,6 +2,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; @@ -23,9 +25,13 @@ import com.clickhouse.config.ClickHouseConfigChangeListener; import com.clickhouse.config.ClickHouseOption; import com.clickhouse.data.ClickHouseCompression; +import com.clickhouse.data.ClickHouseDataConfig; import com.clickhouse.data.ClickHouseExternalTable; import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseInputStream; import com.clickhouse.data.ClickHouseOutputStream; +import com.clickhouse.data.ClickHousePassThruStream; +import com.clickhouse.data.ClickHouseUtils; import com.clickhouse.data.ClickHouseValues; import com.clickhouse.data.value.ClickHouseBigIntegerValue; import com.clickhouse.data.value.ClickHouseByteValue; @@ -284,6 +290,72 @@ public void testGetSetting() { @Test(groups = { "unit" }) public void testInputData() throws IOException { + Mutation request = ClickHouseClient.newInstance().connect(ClickHouseNode.builder().build()).write(); + Assert.assertEquals(request.getConfig().getFormat(), ClickHouseDataConfig.DEFAULT_FORMAT); + Assert.assertEquals(request.getConfig().getRequestCompressAlgorithm(), ClickHouseCompression.NONE); + Assert.assertEquals(request.getConfig().getRequestCompressLevel(), + ClickHouseDataConfig.DEFAULT_WRITE_COMPRESS_LEVEL); + Assert.assertEquals(request.getConfig().getResponseCompressAlgorithm(), ClickHouseCompression.LZ4); + Assert.assertEquals(request.getConfig().getResponseCompressLevel(), + ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL); + Assert.assertFalse(request.hasInputStream()); + + request.data("/non-existing-file/" + UUID.randomUUID().toString()); // unrecognized file + Assert.assertEquals(request.getConfig().getFormat(), 
ClickHouseDataConfig.DEFAULT_FORMAT); + Assert.assertEquals(request.getConfig().getRequestCompressAlgorithm(), ClickHouseCompression.NONE); + Assert.assertEquals(request.getConfig().getRequestCompressLevel(), + ClickHouseDataConfig.DEFAULT_WRITE_COMPRESS_LEVEL); + Assert.assertEquals(request.getConfig().getResponseCompressAlgorithm(), ClickHouseCompression.LZ4); + Assert.assertEquals(request.getConfig().getResponseCompressLevel(), + ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL); + Assert.assertTrue(request.hasInputStream()); + + Assert.assertThrows(IllegalArgumentException.class, + () -> request.data("/non-existing-file/" + UUID.randomUUID().toString() + ".csv.gz")); + + File tmp = ClickHouseUtils.createTempFile(null, ".csv.gz"); + request.data(tmp.getAbsolutePath()); + Assert.assertEquals(request.getConfig().getFormat(), ClickHouseFormat.CSV); + Assert.assertEquals(request.getConfig().getRequestCompressAlgorithm(), ClickHouseCompression.GZIP); + Assert.assertEquals(request.getConfig().getRequestCompressLevel(), + ClickHouseDataConfig.DEFAULT_WRITE_COMPRESS_LEVEL); + Assert.assertEquals(request.getConfig().getResponseCompressAlgorithm(), ClickHouseCompression.LZ4); + Assert.assertEquals(request.getConfig().getResponseCompressLevel(), + ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL); + Assert.assertTrue(request.hasInputStream()); + + request.data(ClickHousePassThruStream.of(ClickHouseInputStream.empty(), ClickHouseCompression.BROTLI, 2, + ClickHouseFormat.Arrow)); + Assert.assertEquals(request.getConfig().getFormat(), ClickHouseFormat.Arrow); + Assert.assertEquals(request.getConfig().getRequestCompressAlgorithm(), ClickHouseCompression.BROTLI); + Assert.assertEquals(request.getConfig().getRequestCompressLevel(), 2); + Assert.assertEquals(request.getConfig().getResponseCompressAlgorithm(), ClickHouseCompression.LZ4); + Assert.assertEquals(request.getConfig().getResponseCompressLevel(), + ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL); + Assert.assertTrue(request.hasInputStream()); + + request.data(new FileInputStream(tmp)); + Assert.assertEquals(request.getConfig().getFormat(), ClickHouseFormat.Arrow); + Assert.assertEquals(request.getConfig().getRequestCompressAlgorithm(), ClickHouseCompression.BROTLI); + Assert.assertEquals(request.getConfig().getRequestCompressLevel(), 2); + Assert.assertEquals(request.getConfig().getResponseCompressAlgorithm(), ClickHouseCompression.LZ4); + Assert.assertEquals(request.getConfig().getResponseCompressLevel(), + ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL); + Assert.assertTrue(request.hasInputStream()); + + request.data(ClickHousePassThruStream.of(ClickHouseInputStream.empty(), ClickHouseCompression.XZ, 3, + ClickHouseFormat.ArrowStream).newInputStream(64, null)); + Assert.assertEquals(request.getConfig().getFormat(), ClickHouseFormat.ArrowStream); + Assert.assertEquals(request.getConfig().getRequestCompressAlgorithm(), ClickHouseCompression.XZ); + Assert.assertEquals(request.getConfig().getRequestCompressLevel(), 3); + Assert.assertEquals(request.getConfig().getResponseCompressAlgorithm(), ClickHouseCompression.LZ4); + Assert.assertEquals(request.getConfig().getResponseCompressLevel(), + ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL); + Assert.assertTrue(request.hasInputStream()); + } + + @Test(groups = { "unit" }) + public void testInputStreamAndCustomWriter() throws IOException { ClickHouseRequest request = ClickHouseClient.newInstance().connect(ClickHouseNode.builder().build()); Assert.assertFalse(request.hasInputStream()); 
Assert.assertFalse(request.getInputStream().isPresent()); diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index 89b76f20c..135ef4f35 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -1532,10 +1532,9 @@ public void testDumpFile(boolean gzipCompressed, boolean useOneLiner) throw new SkipException("Skip as only http implementation works well"); } - File file = Files.createTempFile("chc", ".data").toFile(); + File file = ClickHouseUtils.createTempFile("chc", ".data", false); ClickHouseFile wrappedFile = ClickHouseFile.of(file, - gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, 0, - ClickHouseFormat.CSV); + gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, ClickHouseFormat.CSV); String query = "select number, if(number % 2 = 0, null, toString(number)) str from numbers(10)"; if (useOneLiner) { ClickHouseClient.dump(server, query, wrappedFile).get(); @@ -1683,7 +1682,7 @@ public void testLoadFile(boolean gzipCompressed, boolean useOneLiner) throws Cli sendAndWait(server, "drop table if exists test_load_file", "create table test_load_file(a Int32, b Nullable(String))engine=Memory"); ClickHouseFile wrappedFile = ClickHouseFile.of(file, - gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, -1, ClickHouseFormat.CSV); + gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, ClickHouseFormat.CSV); if (useOneLiner) { try { ClickHouseClient.load(server, "test_load_file", wrappedFile).get(); @@ -1735,7 +1734,7 @@ public void testLoadRawData() throws ClickHouseException, IOException { // single producer → single consumer // important to close the stream *before* retrieving response try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(config, null)) { + .createPipedOutputStream(config)) { // start the worker thread which transfer data from the input into ClickHouse future = request.data(stream.getInputStream()).execute(); // write bytes into the piped stream diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java index 11efa6519..cad11911a 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataConfig.java @@ -17,6 +17,11 @@ public Wrapped(ClickHouseDataConfig config) { this.config = ClickHouseChecker.nonNull(config, TYPE_NAME); } + @Override + public boolean isAsync() { + return config.isAsync(); + } + @Override public ClickHouseFormat getFormat() { return config.getFormat(); @@ -136,8 +141,9 @@ public boolean isWidenUnsignedTypes() { static final boolean DEFAULT_USE_OBJECT_IN_ARRAY = false; static final boolean DEFAULT_WIDEN_UNSIGNED_TYPE = false; - static final int DEFAULT_READ_COMPRESS_LEVEL = -1; - static final int DEFAULT_WRITE_COMPRESS_LEVEL = -1; + static final int DEFAULT_COMPRESS_LEVEL = -1; + static final int DEFAULT_READ_COMPRESS_LEVEL = DEFAULT_COMPRESS_LEVEL; + static final int DEFAULT_WRITE_COMPRESS_LEVEL = DEFAULT_COMPRESS_LEVEL; static final int DEFAULT_TIMEOUT = 30 * 1000; // 30 seconds diff --git 
a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java index 5f8530910..7378325cd 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java @@ -2,8 +2,16 @@ import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import com.clickhouse.config.ClickHouseBufferingMode; import com.clickhouse.data.format.ClickHouseRowBinaryProcessor; @@ -16,6 +24,25 @@ * Factory class for creating objects to handle data stream. */ public class ClickHouseDataStreamFactory { + protected static final class DefaultExecutors { + protected static final ExecutorService executor; + protected static final ScheduledExecutorService scheduler; + + static { + int coreThreads = Runtime.getRuntime().availableProcessors(); + if (coreThreads < ClickHouseUtils.MIN_CORE_THREADS) { + coreThreads = ClickHouseUtils.MIN_CORE_THREADS; + } + + executor = ClickHouseUtils.newThreadPool("ClickHouseWorker-", coreThreads, + coreThreads * 2 + 1, 0, 0, false); + scheduler = Executors.newSingleThreadScheduledExecutor(new ClickHouseThreadFactory("ClickHouseScheduler-")); + } + + private DefaultExecutors() { + } + } + private static final ClickHouseDataStreamFactory instance = ClickHouseUtils .getService(ClickHouseDataStreamFactory.class, new ClickHouseDataStreamFactory()); @@ -23,6 +50,24 @@ public class ClickHouseDataStreamFactory { protected static final String ERROR_NO_SERIALIZER = "No serializer available because format %s does not support output"; protected static final String ERROR_UNSUPPORTED_FORMAT = "Unsupported format: "; + /** + * Handles custom action. + * + * @param postCloseAction post close action, could be null + * @throws IOException when failed to execute post close action + */ + public static void handleCustomAction(Runnable postCloseAction) throws IOException { + if (postCloseAction == null) { + return; + } + + try { + postCloseAction.run(); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } + /** * Gets instance of the factory class. * @@ -32,6 +77,46 @@ public static ClickHouseDataStreamFactory getInstance() { return instance; } + /** + * Gets default executor service for running blocking tasks. + * + * @return non-null executor service + */ + public ExecutorService getExecutor() { + return DefaultExecutors.executor; + } + + /** + * Gets default scheduled executor service for scheduled tasks. + * + * @return non-null scheduled executor service + */ + public ScheduledExecutorService getScheduler() { + return DefaultExecutors.scheduler; + } + + /** + * Executes a blocking task using + * {@link CompletableFuture#supplyAsync(Supplier)} and custom + * {@link ExecutorService}. 
+ * + * @return non-null future to get result + */ + public CompletableFuture runBlockingTask(Supplier supplier) { + return CompletableFuture.supplyAsync(supplier, DefaultExecutors.executor); + } + + /** + * Schedules a task using + * {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)} and + * custom {@link ScheduledExecutorService}. + * + * @return non-null future to get result + */ + public ScheduledFuture scheduleTask(Runnable task, long delay, TimeUnit unit) { + return DefaultExecutors.scheduler.schedule(task, delay, unit); + } + /** * Gets data processor according to given {@link ClickHouseDataConfig} and * settings. @@ -60,6 +145,16 @@ public ClickHouseDataProcessor getProcessor(ClickHouseDataConfig config, ClickHo return processor; } + /** + * Creates a piped output stream. + * + * @param config non-null configuration + * @return piped output stream + */ + public final ClickHousePipedOutputStream createPipedOutputStream(ClickHouseDataConfig config) { + return createPipedOutputStream(config, (Runnable) null); + } + /** * Creates a piped output stream. * @@ -92,9 +187,55 @@ public ClickHousePipedOutputStream createPipedOutputStream(ClickHouseDataConfig : new NonBlockingPipedOutputStream(bufferSize, queue, timeout, policy, postCloseAction); } + /** + * Creates a piped output stream. + * + * @param config non-null configuration + * @param writer non-null custom writer + * @return piped output stream + */ + public ClickHousePipedOutputStream createPipedOutputStream(ClickHouseDataConfig config, ClickHouseWriter writer) { + if (config == null || writer == null) { + throw new IllegalArgumentException("Non-null config and writer are required"); + } + + final int bufferSize = config.getWriteBufferSize(); + final boolean blocking; + final int queue; + final CapacityPolicy policy; + final int timeout; + + if (config.getReadBufferingMode() == ClickHouseBufferingMode.PERFORMANCE) { + blocking = false; + queue = 0; + policy = null; + timeout = 0; // questionable + } else { + blocking = config.isUseBlockingQueue(); + queue = config.getMaxQueuedBuffers(); + policy = config.getBufferQueueVariation() < 1 ? CapacityPolicy.fixedCapacity(queue) + : CapacityPolicy.linearDynamicCapacity(1, queue, config.getBufferQueueVariation()); + timeout = config.getReadTimeout(); + } + + return blocking + ? 
new BlockingPipedOutputStream(bufferSize, queue, timeout, writer) + : new NonBlockingPipedOutputStream(bufferSize, queue, timeout, policy, writer); + } + + public final ClickHousePipedOutputStream createPipedOutputStream(int bufferSize, int queueSize, int timeout) { + return createPipedOutputStream(bufferSize, queueSize, timeout, (Runnable) null); + } + public ClickHousePipedOutputStream createPipedOutputStream(int bufferSize, int queueSize, int timeout, Runnable postCloseAction) { return new BlockingPipedOutputStream(ClickHouseDataConfig.getBufferSize(bufferSize), queueSize, timeout, postCloseAction); } + + public ClickHousePipedOutputStream createPipedOutputStream(int bufferSize, int queueSize, int timeout, + ClickHouseWriter writer) { + return new BlockingPipedOutputStream(ClickHouseDataConfig.getBufferSize(bufferSize), queueSize, timeout, + writer); + } } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java index 7db57d258..3a6689d9f 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java @@ -24,7 +24,7 @@ public final class ClickHouseDeferredValue implements Supplier { * @return deferred value of a future object */ public static ClickHouseDeferredValue of(CompletableFuture future) { - return of(future, 0); + return of(future, 0L); } /** @@ -36,13 +36,14 @@ public static ClickHouseDeferredValue of(CompletableFuture future) { * timeout * @return deferred vaue of a future object */ - public static ClickHouseDeferredValue of(CompletableFuture future, int timeout) { - final CompletableFuture f = future != null ? future : CompletableFuture.completedFuture(null); - final int t = timeout < 0 ? 0 : timeout; + @SuppressWarnings("unchecked") + public static ClickHouseDeferredValue of(CompletableFuture future, long timeout) { + final CompletableFuture f = future != null ? future : (CompletableFuture) ClickHouseUtils.NULL_FUTURE; + final long t = timeout < 0L ? 0L : timeout; Supplier supplier = () -> { try { - return f.get(t, TimeUnit.MILLISECONDS); + return t > 0L ? 
f.get(t, TimeUnit.MILLISECONDS) : f.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); f.cancel(false); diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseFile.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseFile.java index 466b60233..4c90dbf47 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseFile.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseFile.java @@ -25,17 +25,21 @@ public class ClickHouseFile extends ClickHousePassThruStream { ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL, null); public static ClickHouseFile of(File file) { - return of(file, null, 0, null); + return of(file, null, ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL, null); } public static ClickHouseFile of(Path path) { return of(ClickHouseChecker.nonNull(path, "Path").toFile(), null, - ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL, null); + ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL, null); } public static ClickHouseFile of(String file) { return of(new File(ClickHouseChecker.nonEmpty(file, FILE_TYPE_NAME)), null, - ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL, null); + ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL, null); + } + + public static ClickHouseFile of(String file, ClickHouseCompression compression, ClickHouseFormat format) { + return of(file, compression, ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL, format); } public static ClickHouseFile of(String file, ClickHouseCompression compression, int compressionLevel, @@ -43,6 +47,10 @@ public static ClickHouseFile of(String file, ClickHouseCompression compression, return of(new File(ClickHouseChecker.nonEmpty(file, FILE_TYPE_NAME)), compression, compressionLevel, format); } + public static ClickHouseFile of(File file, ClickHouseCompression compression, ClickHouseFormat format) { + return of(file, compression, ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL, format); + } + public static ClickHouseFile of(File file, ClickHouseCompression compression, int compressionLevel, ClickHouseFormat format) { final String name = ClickHouseChecker.nonNull(file, FILE_TYPE_NAME).getName(); @@ -168,6 +176,16 @@ public File getFile() { return file; } + /** + * Checks if the given file is recognized or not. Same as + * {@code hasCompression() || hasFormat()}. + * + * @return true if the file is recognized + */ + public boolean isRecognized() { + return hasCompression() || hasFormat(); + } + @Override public boolean hasInput() { return file != null && file.exists(); diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java index b5091f2c7..998c54c69 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java @@ -155,6 +155,18 @@ public static ClickHouseInputStream of(ClickHousePassThruStream stream, int buff return stream.newInputStream(bufferSize, postCloseAction); } + /** + * Creates an input stream using the given custom writer. Behind the scenes, a + * piped stream will be created, writer will be called in a separate worker + * thread for writing. + * + * @param writer non-null custom writer + * @return wrapped input + */ + public static ClickHouseInputStream of(ClickHouseWriter writer) { + return new DelegatedInputStream(null, writer); + } + /** * Creates an input stream using the given customer writer. 
Behind the scene, a * piped stream will be created, writer will be called in a separate worker @@ -493,10 +505,138 @@ public static long pipe(InputStream input, OutputStream output, byte[] buffer) t return count; } + /** + * Saves data from the given input stream to a temporary file, which will be + * deleted after JVM exited. + * + * @param input non-null input stream + * @return non-null temporary file + */ + public static File save(InputStream input) { + return save(null, input, null); + } + + /** + * Saves data from the given input stream to the specified file. + * + * @param input non-null input stream + * @param file target file, could be null + * @return non-null file + */ + public static File save(InputStream input, File file) { + return save(null, input, file); + } + + /** + * Saves data from the given input stream to a temporary file, which will be + * deleted after JVM exited. + * + * @param config config, could be null + * @param input non-null input stream + * @return non-null temporary file + */ + public static File save(ClickHouseDataConfig config, InputStream input) { + return save(config, input, null); + } + + /** + * Saves data from the given input stream to the specified file. + * + * @param config config, could be null + * @param input non-null input stream + * @param file target file, could be null + * @return non-null file + */ + public static File save(ClickHouseDataConfig config, InputStream input, File file) { + final File tmp; + if (file != null) { + tmp = file; + } else { + try { + tmp = ClickHouseUtils.createTempFile(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create temp file", e); + } + } + + final int bufferSize; + final long timeout; + if (config != null) { + bufferSize = config.getWriteBufferSize(); + timeout = config.getWriteTimeout(); + } else { + bufferSize = ClickHouseDataConfig.DEFAULT_WRITE_BUFFER_SIZE; + timeout = ClickHouseDataConfig.DEFAULT_TIMEOUT; + } + + if (timeout <= 0L) { + try { + try (OutputStream out = new FileOutputStream(tmp)) { + pipe(input, out, bufferSize); + } + return tmp; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + CompletableFuture data = ClickHouseDataStreamFactory.getInstance().runBlockingTask(() -> { + try { + try (OutputStream out = new FileOutputStream(tmp)) { + pipe(input, out, bufferSize); + } + return tmp; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + try { + return data.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (TimeoutException e) { + throw new IllegalStateException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof UncheckedIOException) { + throw ((UncheckedIOException) cause); + } else if (cause instanceof IOException) { + throw new UncheckedIOException((IOException) cause); + } + throw new IllegalStateException(cause); + } + } + } + + /** + * Saves data from the given input stream to a temporary file. + * + * @param in non-null input stream + * @param bufferSize buffer size + * @param timeout timeout in milliseconds + * @return non-null temporary file + * @deprecated will be dropped in 0.5, please use {@link #save(InputStream)} + * instead + */ + @Deprecated public static File save(InputStream in, int bufferSize, int timeout) { return save(null, in, bufferSize, timeout, true); } + /** + * Saves data from the given input stream to the specified file. 
+ * + * @param file target file, could be null + * @param in non-null input stream + * @param bufferSize buffer size + * @param timeout timeout in milliseconds + * @param deleteOnExit whether the file should be deleted after JVM exit + * @return non-null file + * @deprecated will be dropped in 0.5, please use + * {@link #save(InputStream, File)} instead + */ + @Deprecated public static File save(File file, InputStream in, int bufferSize, int timeout, boolean deleteOnExit) { final File tmp; if (file != null) { @@ -511,7 +651,7 @@ public static File save(File file, InputStream in, int bufferSize, int timeout, throw new UncheckedIOException("Failed to create temp file", e); } } - CompletableFuture data = CompletableFuture.supplyAsync(() -> { + CompletableFuture data = ClickHouseDataStreamFactory.getInstance().runBlockingTask(() -> { try { try (OutputStream out = new FileOutputStream(tmp)) { pipe(in, out, bufferSize); @@ -612,6 +752,16 @@ public ClickHousePassThruStream getUnderlyingStream() { return stream; } + /** + * Checks if there's underlying input stream. Same as + * {@code getUnderlyingStream().hasInput()}. + * + * @return true if there's underlying input stream; false otherwise + */ + public boolean hasUnderlyingStream() { + return stream.hasInput(); + } + /** * Gets user data associated with this input stream. * @@ -960,9 +1110,7 @@ public void close() throws IOException { userData.clear(); // don't want to hold the last byte array reference for too long byteBuffer.reset(); - if (postCloseAction != null) { - postCloseAction.run(); - } + ClickHouseDataStreamFactory.handleCustomAction(postCloseAction); } } } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseOutputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseOutputStream.java index 4a69037a5..6e6d5b78a 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseOutputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseOutputStream.java @@ -172,6 +172,16 @@ public ClickHousePassThruStream getUnderlyingStream() { return stream; } + /** + * Checks if there's underlying output stream. Same as + * {@code getUnderlyingStream().hasOutput()}. + * + * @return true if there's underlying output stream; false otherwise + */ + public boolean hasUnderlyingStream() { + return stream.hasOutput(); + } + /** * Transfers bytes into output stream without creating a copy. * @@ -255,9 +265,7 @@ public void close() throws IOException { flush(); } finally { closed = true; - if (postCloseAction != null) { - postCloseAction.run(); - } + ClickHouseDataStreamFactory.handleCustomAction(postCloseAction); } } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePassThruStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePassThruStream.java index 40f5d95c4..864ac4b1f 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePassThruStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePassThruStream.java @@ -20,13 +20,34 @@ public class ClickHousePassThruStream implements Serializable { * Null stream which has no compression and format. 
*/ public static final ClickHousePassThruStream NULL = new ClickHousePassThruStream(null, null, - ClickHouseCompression.NONE, -1, null); + ClickHouseCompression.NONE, ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL, null); + + public static ClickHousePassThruStream of(InputStream in, ClickHouseCompression compression, + ClickHouseFormat format) { + return of(in, null, compression, ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL, format); + } public static ClickHousePassThruStream of(InputStream in, ClickHouseCompression compression, int compressionLevel, ClickHouseFormat format) { return of(in, null, compression, compressionLevel, format); } + public static ClickHousePassThruStream of(ClickHouseWriter writer, ClickHouseCompression compression, + ClickHouseFormat format) { + return of(ClickHouseInputStream.of(writer), null, compression, + ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL, format); + } + + public static ClickHousePassThruStream of(ClickHouseWriter writer, ClickHouseCompression compression, + int compressionLevel, ClickHouseFormat format) { + return of(ClickHouseInputStream.of(writer), null, compression, compressionLevel, format); + } + + public static ClickHousePassThruStream of(OutputStream out, ClickHouseCompression compression, + ClickHouseFormat format) { + return of(null, out, compression, ClickHouseDataConfig.DEFAULT_WRITE_COMPRESS_LEVEL, format); + } + public static ClickHousePassThruStream of(OutputStream out, ClickHouseCompression compression, int compressionLevel, ClickHouseFormat format) { return of(null, out, compression, compressionLevel, format); @@ -34,7 +55,8 @@ public static ClickHousePassThruStream of(OutputStream out, ClickHouseCompressio public static ClickHousePassThruStream of(InputStream in, OutputStream out, ClickHouseCompression compression, int compressionLevel, ClickHouseFormat format) { - if (in == null && out == null && compression == null && compressionLevel == -1 && format == null) { + if (in == null && out == null && compression == null + && compressionLevel == ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL && format == null) { return NULL; } @@ -57,7 +79,9 @@ protected ClickHousePassThruStream(InputStream in, OutputStream out, ClickHouseC } /** - * Gets the input stream for reading. + * Gets the input stream for reading. Please pay attention that the returned + * input stream has nothing to do with this pass-thru stream, as + * {@code getInputStream().getUnderlyingStream()} is always {@link #NULL}. * * @return non-null input stream */ @@ -66,8 +90,13 @@ public ClickHouseInputStream getInputStream() { } /** - * Creates a wrapped input stream for reading. This method is supposed to be - * called once and only once. + * Creates a wrapped input stream for reading. Calling this method multiple + * times will generate multiple {@link ClickHouseInputStream} instances pointing + * to the same input stream, so it does not make sense to call this more than + * once. Unlike {@link #getInputStream()}, the returned input stream is + * associated with this pass-thru stream, so + * {@code newInputStream(...).getUnderlyingStream()} simply returns the current + * pass-thru stream. * * @param bufferSize buffer size which is always greater than zero(usually * 4096 or larger) @@ -83,7 +112,9 @@ public ClickHouseInputStream newInputStream(int bufferSize, Runnable postCloseAc } /** - * Gets the output stream for writing. + * Gets the output stream for writing. 
Please pay attention that the returned + * output stream has nothing to do with this pass-thru stream, as + * {@code getOutputStream().getUnderlyingStream()} is always {@link #NULL}. * * @return non-null output stream */ @@ -92,8 +123,13 @@ public ClickHouseOutputStream getOutputStream() { } /** - * Creates a wrapped output stream for writing. This method is supposed to be - * called once and only once. + * Creates a wrapped output stream for writing. Calling this method multiple + * times will generate multiple {@link ClickHouseOutputStream} instances, so it + * does not make sense to call this more than once. Unlike + * {@link #getOutputStream()}, the returned + * output stream is associated with this pass-thru stream, so + * {@code newOutputStream(...).getUnderlyingStream()} simply returns the current + * pass-thru stream. * * @param bufferSize buffer size which is always greater than zero(usually * 4096 or larger) diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePipedOutputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePipedOutputStream.java index a86d14a0d..80245a518 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePipedOutputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHousePipedOutputStream.java @@ -1,9 +1,72 @@ package com.clickhouse.data; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + /** * SPSC(Single-producer single-consumer) channel for streaming. */ public abstract class ClickHousePipedOutputStream extends ClickHouseOutputStream { + /** + * Handles async write result. + * + * @param future async write result + * @param timeout timeout in milliseconds + * @param postCloseAction post close action, could be null + * @throws UncheckedIOException when writing failed + */ + protected static void handleWriteResult(CompletableFuture future, long timeout, Runnable postCloseAction) + throws UncheckedIOException { + try { + if (future != null) { + future.get(timeout, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedIOException(new IOException("Writing was interrupted", e)); + } catch (TimeoutException e) { + throw new UncheckedIOException( + new IOException(ClickHouseUtils.format("Writing timed out after %d milliseconds", timeout), e)); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw new UncheckedIOException((IOException) cause); + } else if (cause instanceof UncheckedIOException) { + throw (UncheckedIOException) cause; + } else { + throw new UncheckedIOException(new IOException("Writing failed", cause)); + } + } finally { + if (postCloseAction != null) { + postCloseAction.run(); + } + } + } + + /** + * Writes data to the piped output stream in a separate thread. The given piped + * output stream will be closed automatically at the end of writing. 
+ * + * @param writer non-null custom writer + * @param output non-null piped output stream + * @return non-null future + */ + protected static CompletableFuture writeAsync(ClickHouseWriter writer, ClickHousePipedOutputStream output) { + return ClickHouseDataStreamFactory.getInstance().runBlockingTask(() -> { + try (ClickHouseOutputStream out = output) { + writer.write(out); + } catch (Exception e) { + throw new CompletionException(e); + } + return null; + }); + } + protected ClickHousePipedOutputStream(Runnable postCloseAction) { super(null, postCloseAction); } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java index 7a3acc65a..97115e878 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -67,6 +68,9 @@ public final class ClickHouseUtils { */ public static final String DEFAULT_CHARSET = StandardCharsets.UTF_8.name(); + public static final int MIN_CORE_THREADS = 3; + public static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); + public static final String VARIABLE_PREFIX = "{{"; public static final String VARIABLE_SUFFIX = "}}"; @@ -140,24 +144,33 @@ public static File createTempFile(String prefix, String suffix) throws IOExcepti } /** - * Creates a temporary file with given prefix and suffix. + * Creates a temporary file with the given prefix and suffix. The file has only + * read and write access granted to the owner. * - * @param prefix prefix, could be null - * @param suffix suffix, could be null + * @param prefix prefix, null or empty string is taken as {@code "ch"} + * @param suffix suffix, null or empty string is taken as {@code ".data"} * @param deleteOnExit whether the file be deleted on exit * @return non-null temporary file * @throws IOException when failed to create the temporary file */ public static File createTempFile(String prefix, String suffix, boolean deleteOnExit) throws IOException { + if (prefix == null || prefix.isEmpty()) { + prefix = "ch"; + } + if (suffix == null || suffix.isEmpty()) { + suffix = ".data"; + } + final File f; if (IS_UNIX) { FileAttribute> attr = PosixFilePermissions .asFileAttribute(PosixFilePermissions.fromString("rw-------")); f = Files.createTempFile(prefix, suffix, attr).toFile(); } else { - f = Files.createTempFile(prefix == null ? "chc" : prefix, suffix == null ? ".data" : suffix).toFile(); + f = Files.createTempFile(prefix, suffix).toFile(); // NOSONAR f.setReadable(true, true); // NOSONAR f.setWritable(true, true); // NOSONAR + f.setExecutable(false, false); // NOSONAR } if (deleteOnExit) { @@ -266,8 +279,8 @@ public static ExecutorService newThreadPool(Object owner, int coreThreads, int m long keepAliveTimeoutMs, boolean allowCoreThreadTimeout) { BlockingQueue queue = maxRequests > 0 ? 
new ArrayBlockingQueue<>(maxRequests) : new LinkedBlockingQueue<>(); - if (coreThreads < 3) { - coreThreads = 3; + if (coreThreads < MIN_CORE_THREADS) { + coreThreads = MIN_CORE_THREADS; } if (maxThreads <= coreThreads) { maxThreads = coreThreads + 1; diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java index 2b92bda55..5f1ff2503 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseWriter.java @@ -4,8 +4,10 @@ @FunctionalInterface public interface ClickHouseWriter { + static final String TYPE_NAME = "Writer"; + /** - * Writes value to output stream. + * Writes data to output stream. * * @param output non-null output stream * @throws IOException when failed to write data to output stream diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingInputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingInputStream.java index 38703b27b..7de869231 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingInputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingInputStream.java @@ -8,31 +8,27 @@ import com.clickhouse.data.ClickHouseChecker; import com.clickhouse.data.ClickHouseUtils; -import com.clickhouse.logging.Logger; -import com.clickhouse.logging.LoggerFactory; /** * {@link java.nio.ByteBuffer} backed input stream with * {@link java.util.concurrent.BlockingQueue}. */ public class BlockingInputStream extends AbstractByteBufferInputStream { - private static final Logger log = LoggerFactory.getLogger(BlockingInputStream.class); - private final BlockingQueue queue; - private final int timeout; + private final long timeout; - public BlockingInputStream(BlockingQueue queue, int timeout, Runnable postCloseAction) { + public BlockingInputStream(BlockingQueue queue, long timeout, Runnable postCloseAction) { super(null, null, postCloseAction); this.queue = ClickHouseChecker.nonNull(queue, "Queue"); - this.timeout = timeout > 0 ? timeout : 0; + this.timeout = timeout < 0L ? 
0L : timeout; } @Override protected int updateBuffer() throws IOException { ByteBuffer b; try { - if (timeout > 0) { + if (timeout > 0L) { b = queue.poll(timeout, TimeUnit.MILLISECONDS); if (b == null) { throw new IOException(ClickHouseUtils.format("Read timed out after %d ms", timeout)); diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingPipedOutputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingPipedOutputStream.java index bc005b852..1bcea6b72 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingPipedOutputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/BlockingPipedOutputStream.java @@ -5,16 +5,20 @@ import java.nio.ByteBuffer; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import com.clickhouse.data.ClickHouseByteBuffer; +import com.clickhouse.data.ClickHouseChecker; import com.clickhouse.data.ClickHouseDataConfig; +import com.clickhouse.data.ClickHouseDataStreamFactory; import com.clickhouse.data.ClickHouseDataUpdater; import com.clickhouse.data.ClickHouseInputStream; import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHousePipedOutputStream; import com.clickhouse.data.ClickHouseUtils; +import com.clickhouse.data.ClickHouseWriter; /** * A combination of {@link java.io.PipedOutputStream} and @@ -26,11 +30,16 @@ public class BlockingPipedOutputStream extends ClickHousePipedOutputStream { protected final BlockingQueue queue; private final int bufferSize; - private final int timeout; + private final CompletableFuture future; + private final long timeout; private ByteBuffer buffer; - public BlockingPipedOutputStream(int bufferSize, int queueLength, int timeout, Runnable postCloseAction) { + public BlockingPipedOutputStream(int bufferSize, int queueLength, long timeout) { + this(bufferSize, queueLength, timeout, (Runnable) null); + } + + public BlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, Runnable postCloseAction) { super(postCloseAction); // DisruptorBlockingQueue? Did not see much difference here... @@ -38,11 +47,27 @@ public BlockingPipedOutputStream(int bufferSize, int queueLength, int timeout, R // may need an initialBufferSize and a monitor to update bufferSize in runtime this.bufferSize = ClickHouseDataConfig.getBufferSize(bufferSize); + this.future = ClickHouseUtils.NULL_FUTURE; this.timeout = timeout; this.buffer = ByteBuffer.allocate(this.bufferSize); } + public BlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, ClickHouseWriter writer) { + super(null); + + // DisruptorBlockingQueue? Did not see much difference here... + this.queue = queueLength <= 0 ? 
new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(queueLength); + + // may need an initialBufferSize and a monitor to update bufferSize in runtime + this.bufferSize = ClickHouseDataConfig.getBufferSize(bufferSize); + this.timeout = timeout; + + this.buffer = ByteBuffer.allocate(this.bufferSize); + + this.future = writeAsync(ClickHouseChecker.nonNull(writer, ClickHouseWriter.TYPE_NAME), this); + } + private void updateBuffer(boolean allocateNewBuffer) throws IOException { ByteBuffer b = buffer; if (b.hasRemaining()) { @@ -59,7 +84,7 @@ private void updateBuffer(boolean allocateNewBuffer) throws IOException { private void updateBuffer(ByteBuffer b) throws IOException { try { - if (timeout > 0) { + if (timeout > 0L) { if (!queue.offer(b, timeout, TimeUnit.MILLISECONDS)) { throw new IOException(ClickHouseUtils.format("Write timed out after %d ms", timeout)); } @@ -74,7 +99,7 @@ private void updateBuffer(ByteBuffer b) throws IOException { @Override public ClickHouseInputStream getInputStream(Runnable postCloseAction) { - return new BlockingInputStream(queue, timeout, postCloseAction); + return new BlockingInputStream(queue, timeout, () -> handleWriteResult(future, timeout, postCloseAction)); } @Override @@ -89,7 +114,7 @@ public void close() throws IOException { buffer = ClickHouseByteBuffer.EMPTY_BUFFER; try { - if (timeout > 0) { + if (timeout > 0L) { if (!queue.offer(buffer, timeout, TimeUnit.MILLISECONDS)) { throw new IOException(ClickHouseUtils.format("Close stream timed out after %d ms", timeout)); } @@ -101,9 +126,7 @@ public void close() throws IOException { throw new IOException("Thread was interrupted when putting EMPTY buffer into queue", e); } finally { closed = true; - if (postCloseAction != null) { - postCloseAction.run(); - } + ClickHouseDataStreamFactory.handleCustomAction(postCloseAction); } } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java index e8221ee71..eeb8c29be 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/DelegatedInputStream.java @@ -2,14 +2,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.io.UncheckedIOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import javax.imageio.IIOException; import com.clickhouse.data.ClickHouseByteBuffer; import com.clickhouse.data.ClickHouseChecker; @@ -21,22 +13,16 @@ import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHousePassThruStream; import com.clickhouse.data.ClickHousePipedOutputStream; -import com.clickhouse.data.ClickHouseUtils; import com.clickhouse.data.ClickHouseWriter; public class DelegatedInputStream extends ClickHouseInputStream { private final ClickHouseInputStream input; - private final int timeout; - private final CompletableFuture future; - public DelegatedInputStream(ClickHousePassThruStream stream, ClickHouseInputStream input, OutputStream copyTo, Runnable postCloseAction) { super(stream, copyTo, postCloseAction); this.input = ClickHouseChecker.nonNull(input, TYPE_NAME); - this.timeout = ClickHouseDataConfig.DEFAULT_TIMEOUT; - this.future = CompletableFuture.completedFuture(true); } public 
DelegatedInputStream(ClickHouseDataConfig config, ClickHouseWriter writer) { @@ -46,20 +32,18 @@ public DelegatedInputStream(ClickHouseDataConfig config, ClickHouseWriter writer throw new IllegalArgumentException("Non-null writer is required"); } - this.timeout = config != null ? config.getReadTimeout() : ClickHouseDataConfig.DEFAULT_TIMEOUT; - ClickHousePipedOutputStream stream = config != null - ? ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null) // NOSONAR - : ClickHouseDataStreamFactory.getInstance().createPipedOutputStream( // NOSONAR - ClickHouseDataConfig.DEFAULT_WRITE_BUFFER_SIZE, 0, this.timeout, null); + final ClickHousePipedOutputStream stream; + if (config != null) { + stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, writer); // NOSONAR + } else { + stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream( // NOSONAR + ClickHouseDataConfig.DEFAULT_WRITE_BUFFER_SIZE, 0, ClickHouseDataConfig.DEFAULT_TIMEOUT, writer); + } this.input = stream.getInputStream(); - this.future = CompletableFuture.supplyAsync(() -> { - try (ClickHouseOutputStream out = stream) { - writer.write(out); - } catch (Exception e) { - throw new CompletionException(e); - } - return true; - }); + } + + public DelegatedInputStream(ClickHouseWriter writer) { + this(null, writer); } @Override @@ -89,31 +73,14 @@ public int read() throws IOException { @Override public void close() throws IOException { + if (closed) { + return; + } + try { - try { - future.get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Custom writer was interrupted", e); - } catch (TimeoutException e) { - throw new IIOException( - ClickHouseUtils.format("Custom writing timed out after %d milliseconds", timeout), e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - throw ((IOException) cause); - } else if (cause instanceof UncheckedIOException) { - throw ((UncheckedIOException) cause).getCause(); - } else { - throw new IOException("Custom writing failure", cause); - } - } + input.close(); } finally { - try { - super.close(); - } finally { - input.close(); - } + super.close(); } } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingInputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingInputStream.java index 86de8bd18..bfacebb6d 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingInputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingInputStream.java @@ -13,16 +13,16 @@ public class NonBlockingInputStream extends ClickHouseInputStream { private final AdaptiveQueue queue; - private final int timeout; + private final long timeout; private byte[] buffer; private int position; - public NonBlockingInputStream(AdaptiveQueue queue, int timeout, Runnable postCloseAction) { + public NonBlockingInputStream(AdaptiveQueue queue, long timeout, Runnable postCloseAction) { super(null, null, postCloseAction); this.queue = ClickHouseChecker.nonNull(queue, "Queue"); - this.timeout = timeout > 0 ? timeout : 0; + this.timeout = timeout < 0L ? 
0L : timeout; this.buffer = null; this.position = 0; diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java index 738730ce9..5d5a7739f 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java @@ -1,14 +1,18 @@ package com.clickhouse.data.stream; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import com.clickhouse.data.ClickHouseByteBuffer; +import com.clickhouse.data.ClickHouseChecker; import com.clickhouse.data.ClickHouseDataConfig; +import com.clickhouse.data.ClickHouseDataStreamFactory; import com.clickhouse.data.ClickHouseDataUpdater; import com.clickhouse.data.ClickHouseInputStream; import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHousePipedOutputStream; import com.clickhouse.data.ClickHouseUtils; +import com.clickhouse.data.ClickHouseWriter; /** * A combination of {@link java.io.PipedOutputStream} and @@ -20,8 +24,9 @@ public class NonBlockingPipedOutputStream extends ClickHousePipedOutputStream { protected final AdaptiveQueue queue; protected final int bufferSize; - protected final int timeout; protected final byte[][] buckets; + protected final CompletableFuture future; + protected final long timeout; protected int current; @@ -78,7 +83,11 @@ private void updateBuffer(byte[] bytes, int offset, int length) throws IOExcepti } } - public NonBlockingPipedOutputStream(int bufferSize, int queueLength, int timeout, CapacityPolicy policy, + public NonBlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, CapacityPolicy policy) { + this(bufferSize, queueLength, timeout, policy, (Runnable) null); + } + + public NonBlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, CapacityPolicy policy, Runnable postCloseAction) { super(postCloseAction); @@ -86,16 +95,34 @@ public NonBlockingPipedOutputStream(int bufferSize, int queueLength, int timeout // may need an initialBufferSize and a monitor to update bufferSize in runtime this.bufferSize = ClickHouseDataConfig.getBufferSize(bufferSize); + this.buckets = queueLength < 2 ? new byte[0][] : new byte[queueLength][]; + this.future = ClickHouseUtils.NULL_FUTURE; this.timeout = timeout; + + this.current = queueLength < 2 ? -1 : 0; + this.buffer = allocateBuffer(); + } + + public NonBlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, CapacityPolicy policy, + ClickHouseWriter writer) { + super(null); + + this.queue = AdaptiveQueue.create(policy); + + // may need an initialBufferSize and a monitor to update bufferSize in runtime + this.bufferSize = ClickHouseDataConfig.getBufferSize(bufferSize); this.buckets = queueLength < 2 ? new byte[0][] : new byte[queueLength][]; + this.timeout = timeout; this.current = queueLength < 2 ? 
-1 : 0; this.buffer = allocateBuffer(); + + this.future = writeAsync(ClickHouseChecker.nonNull(writer, ClickHouseWriter.TYPE_NAME), this); } @Override public ClickHouseInputStream getInputStream(Runnable postCloseAction) { - return new NonBlockingInputStream(queue, timeout, postCloseAction); + return new NonBlockingInputStream(queue, timeout, () -> handleWriteResult(future, timeout, postCloseAction)); } @Override @@ -115,9 +142,7 @@ public void close() throws IOException { } closed = true; - if (postCloseAction != null) { - postCloseAction.run(); - } + ClickHouseDataStreamFactory.handleCustomAction(postCloseAction); } } diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDataStreamFactoryTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDataStreamFactoryTest.java index f0ff86497..d8b65b329 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDataStreamFactoryTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDataStreamFactoryTest.java @@ -22,8 +22,8 @@ public void testCreatePipedOutputStream() throws ExecutionException, IOException for (int i = 0; i < 256; i++) { CompletableFuture future; try (ClickHousePipedOutputStream out = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(config, null)) { - future = CompletableFuture.supplyAsync(() -> { + .createPipedOutputStream(config)) { + future = ClickHouseDataStreamFactory.getInstance().runBlockingTask(() -> { try (ClickHouseInputStream in = out.getInputStream()) { return in.read(); } catch (IOException e) { @@ -37,11 +37,10 @@ public void testCreatePipedOutputStream() throws ExecutionException, IOException // write in worker thread for (int i = 0; i < 256; i++) { - ClickHousePipedOutputStream out = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, - null); + ClickHousePipedOutputStream out = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config); final int num = i; try (ClickHouseInputStream in = out.getInputStream()) { - CompletableFuture.supplyAsync(() -> { + ClickHouseDataStreamFactory.getInstance().runBlockingTask(() -> { try (ClickHouseOutputStream o = out) { o.write(num); } catch (IOException e) { @@ -53,4 +52,19 @@ public void testCreatePipedOutputStream() throws ExecutionException, IOException } } } + + @Test(groups = { "unit" }) + public void testHandleCustomAction() throws IOException { + // nothing will happen + ClickHouseDataStreamFactory.handleCustomAction(null); + ClickHouseDataStreamFactory.handleCustomAction(() -> { + }); + ClickHouseDataStreamFactory.handleCustomAction(() -> { + new Exception(); + }); + + Assert.assertThrows(IOException.class, () -> ClickHouseDataStreamFactory.handleCustomAction(() -> { + throw new UncheckedIOException(new IOException("fake exception")); + })); + } } diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDeferredValueTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDeferredValueTest.java new file mode 100644 index 000000000..45ae9a2c7 --- /dev/null +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseDeferredValueTest.java @@ -0,0 +1,58 @@ +package com.clickhouse.data; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class 
ClickHouseDeferredValueTest { + @Test(groups = { "unit" }) + public void testDeferredValues() throws Exception { + final List list = new ArrayList<>(2); + ClickHouseDeferredValue v = ClickHouseDeferredValue.of(list, List.class); + Assert.assertEquals(v.get(), list); + list.add(3); + Assert.assertEquals(v.get(), list); + + v = ClickHouseDeferredValue.of(() -> { + list.add(5); + return list; + }); + Assert.assertEquals(list, Arrays.asList(3)); + Assert.assertEquals(v.get(), list); + Assert.assertEquals(list, Arrays.asList(3, 5)); + + CountDownLatch latch = new CountDownLatch(1); + v = ClickHouseDeferredValue.of(CompletableFuture.supplyAsync(() -> { + try { + latch.await(3000L, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + list.remove(0); + return list; + }), 500L); + Assert.assertEquals(list, Arrays.asList(3, 5)); + latch.countDown(); + Thread.sleep(1000L); + Assert.assertEquals(v.get(), list); + Assert.assertEquals(list, Arrays.asList(5)); + } + + @Test(groups = { "unit" }) + public void testNullValues() { + Assert.assertNull(ClickHouseDeferredValue.of((CompletableFuture) null).get()); + Assert.assertNull(ClickHouseDeferredValue.of((CompletableFuture) null, 50L).get()); + + Assert.assertNull(ClickHouseDeferredValue.of((Supplier) null).get()); + + Assert.assertNull(ClickHouseDeferredValue.of(null, Object.class).get()); + Assert.assertNull(ClickHouseDeferredValue.of(null, null).get()); + } +} diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseFileTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseFileTest.java index 94f2b5848..0ec46a5d7 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseFileTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseFileTest.java @@ -18,7 +18,7 @@ public void testConstructor() { Assert.assertTrue(file.hasOutput()); Assert.assertTrue(file.hasCompression()); Assert.assertEquals(file.getCompressionAlgorithm(), ClickHouseCompression.LZ4); - Assert.assertEquals(file.getCompressionLevel(), -1); + Assert.assertEquals(file.getCompressionLevel(), ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL); Assert.assertTrue(file.hasFormat()); Assert.assertEquals(file.getFormat(), ClickHouseFormat.CSV); Assert.assertEquals(file.getFile().getAbsolutePath(), nonExistingFile); diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseInputStreamTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseInputStreamTest.java index 90ea62237..11a103cda 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseInputStreamTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseInputStreamTest.java @@ -5,7 +5,9 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; @@ -183,6 +185,16 @@ public void testBlockingInputAsync() throws IOException { Assert.assertTrue(in.isClosed(), "Should have been closed"); } + @Test(groups = { "unit" }) + public void testPostCloseAction() throws IOException { + try (ClickHouseInputStream in = ClickHouseInputStream + .of(generateInputStream("abc".getBytes(StandardCharsets.US_ASCII)), 0, () -> { + throw new UncheckedIOException(new IOException("fake exception")); + })) { + Assert.assertThrows(IOException.class, () -> in.close()); 
+ } + } + @Test(groups = { "unit" }) public void testReadVarInt() throws IOException { Assert.assertEquals(ClickHouseInputStream.of(new byte[] { 0x00 }).readVarInt(), 0); diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseOutputStreamTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseOutputStreamTest.java index 1c84553f7..ba697b317 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseOutputStreamTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseOutputStreamTest.java @@ -4,6 +4,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import org.testng.Assert; @@ -62,4 +63,15 @@ public void testNullOrClosedOutput() throws IOException { Assert.assertEquals(empty.isClosed(), true); Assert.assertThrows(IOException.class, () -> empty.write(1)); } + + @Test(groups = { "unit" }) + public void testPostCloseAction() throws IOException { + try (ClickHouseOutputStream out = ClickHouseOutputStream.of(new ByteArrayOutputStream(), 0, + ClickHouseCompression.NONE, + ClickHouseTestDataConfig.DEFAULT_COMPRESS_LEVEL, () -> { + throw new UncheckedIOException(new IOException("fake exception")); + })) { + Assert.assertThrows(IOException.class, () -> out.close()); + } + } } diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHousePassThruStreamTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHousePassThruStreamTest.java index 01e0c58fc..c9fcc8d5e 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHousePassThruStreamTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHousePassThruStreamTest.java @@ -14,8 +14,7 @@ public void testConstructor() { final InputStream in = new ByteArrayInputStream(new byte[0]); final OutputStream out = new ByteArrayOutputStream(); - ClickHousePassThruStream stream = ClickHousePassThruStream.of(in, null, -1, - null); + ClickHousePassThruStream stream = ClickHousePassThruStream.of(in, null, null); Assert.assertNotEquals(stream.getInputStream(), ClickHouseInputStream.empty()); Assert.assertEquals(stream.getOutputStream(), ClickHouseOutputStream.empty()); Assert.assertFalse(stream.hasCompression()); @@ -24,7 +23,7 @@ public void testConstructor() { Assert.assertFalse(stream.isCompressed()); Assert.assertFalse(stream.hasFormat()); Assert.assertNull(stream.getCompressionAlgorithm()); - Assert.assertEquals(stream.getCompressionLevel(), -1); + Assert.assertEquals(stream.getCompressionLevel(), ClickHouseDataConfig.DEFAULT_COMPRESS_LEVEL); Assert.assertNull(stream.getFormat()); stream = ClickHousePassThruStream.of(out, null, -1, null); diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseUtilsTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseUtilsTest.java index c7e17b457..701982c27 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseUtilsTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseUtilsTest.java @@ -20,8 +20,8 @@ public void testCreateTempFile() throws IOException { File f = ClickHouseUtils.createTempFile(null, null); Assert.assertNotNull(f); Assert.assertTrue(f.exists(), f.getAbsolutePath() + " should exist"); - Assert.assertTrue(f.getName().endsWith(".tmp"), - "By default temporary file should end with .tmp, but it's " + f.getName()); + Assert.assertTrue(f.getName().endsWith(".data"), + "By default temporary file should end with .data, 
but it's " + f.getName()); f = ClickHouseUtils.createTempFile("prefix__", "__suffix", true); Assert.assertNotNull(f); diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/stream/BlockingPipedOutputStreamTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/stream/BlockingPipedOutputStreamTest.java index ce52eea76..9c84c85e0 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/stream/BlockingPipedOutputStreamTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/stream/BlockingPipedOutputStreamTest.java @@ -21,7 +21,7 @@ public class BlockingPipedOutputStreamTest { @Test(groups = { "unit" }) public void testRead() throws InterruptedException, IOException { - BlockingPipedOutputStream stream = new BlockingPipedOutputStream(4, 3, 1, null); + BlockingPipedOutputStream stream = new BlockingPipedOutputStream(4, 3, 1); Assert.assertEquals(stream.queue.size(), 0); try (InputStream in = stream.getInputStream()) { in.read(); @@ -81,7 +81,7 @@ public void testRead() throws InterruptedException, IOException { @Test(groups = { "unit" }) public void testReadBytes() throws InterruptedException, IOException { - BlockingPipedOutputStream stream = new BlockingPipedOutputStream(4, 3, 1, null); + BlockingPipedOutputStream stream = new BlockingPipedOutputStream(4, 3, 1); Assert.assertEquals(stream.queue.size(), 0); byte[] bytes = new byte[3]; try (InputStream in = stream.getInputStream()) { @@ -145,7 +145,7 @@ public void testReadBytes() throws InterruptedException, IOException { @Test(groups = { "unit" }) public void testWrite() throws InterruptedException, IOException { - BlockingPipedOutputStream stream = new BlockingPipedOutputStream(2, 3, 2, null); + BlockingPipedOutputStream stream = new BlockingPipedOutputStream(2, 3, 2); Assert.assertEquals(stream.queue.size(), 0); try (OutputStream out = stream) { out.write(5); @@ -160,7 +160,7 @@ public void testWrite() throws InterruptedException, IOException { Assert.assertEquals(stream.queue.take().array(), new byte[] { (byte) 7, (byte) 0 }); } - stream = new BlockingPipedOutputStream(1, 1, 2, null); + stream = new BlockingPipedOutputStream(1, 1, 2); Assert.assertEquals(stream.queue.size(), 0); try (OutputStream out = stream) { out.write(5); @@ -185,7 +185,7 @@ public void testWrite() throws InterruptedException, IOException { @Test(groups = { "unit" }) public void testWriteBytes() throws InterruptedException, IOException { - BlockingPipedOutputStream stream = new BlockingPipedOutputStream(2, 3, 2, null); + BlockingPipedOutputStream stream = new BlockingPipedOutputStream(2, 3, 2); Assert.assertEquals(stream.queue.size(), 0); try (OutputStream out = stream) { out.write(new byte[] { (byte) 9, (byte) 10 }); @@ -215,8 +215,7 @@ public void testPipedStream() throws InterruptedException, IOException { ExecutorService executor = Executors.newFixedThreadPool(2); for (int bufferSize = -1; bufferSize < 10; bufferSize++) { for (int queueLength = -1; queueLength < 10; queueLength++) { - BlockingPipedOutputStream stream = new BlockingPipedOutputStream(bufferSize, queueLength, timeout, - null); + BlockingPipedOutputStream stream = new BlockingPipedOutputStream(bufferSize, queueLength, timeout); try (InputStream in = stream.getInputStream(); OutputStream out = stream) { final int count = 10000; final AtomicInteger p = new AtomicInteger(0); diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java index 
891c6340a..af71c6d5e 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/stream/DelegatedInputStreamTest.java @@ -30,7 +30,7 @@ public void testRead() throws IOException { @Test(groups = { "unit" }) public void testReadBytes() throws IOException { - try (DelegatedInputStream in = new DelegatedInputStream(null, w -> { + try (DelegatedInputStream in = new DelegatedInputStream(w -> { for (int i = 0; i < Byte.MAX_VALUE; i++) { w.writeBytes(new byte[] { 1, 2, 3, 4, 5, 6 }); } diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/stream/NonBlockingPipedOutputStreamTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/stream/NonBlockingPipedOutputStreamTest.java index 13ac909f9..d9c1513d1 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/stream/NonBlockingPipedOutputStreamTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/stream/NonBlockingPipedOutputStreamTest.java @@ -19,8 +19,8 @@ public class NonBlockingPipedOutputStreamTest { @Test(groups = { "unit" }) public void testRead() throws IOException { - NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, CapacityPolicy.fixedCapacity(3), - null); + NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, + CapacityPolicy.fixedCapacity(3)); Assert.assertEquals(stream.queue.size(), 0); try (InputStream in = stream.getInputStream()) { in.read(); @@ -78,8 +78,8 @@ public void testRead() throws IOException { @Test(groups = { "unit" }) public void testReadBytes() throws IOException { - NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, CapacityPolicy.fixedCapacity(3), - null); + NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, + CapacityPolicy.fixedCapacity(3)); Assert.assertEquals(stream.queue.size(), 0); byte[] bytes = new byte[3]; try (InputStream in = stream.getInputStream()) { @@ -141,8 +141,8 @@ public void testReadBytes() throws IOException { @Test(groups = { "unit" }) public void testWrite() throws IOException { - NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, CapacityPolicy.fixedCapacity(3), - null); + NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, + CapacityPolicy.fixedCapacity(3)); Assert.assertEquals(stream.queue.size(), 0); try (OutputStream out = stream) { out.write(5); @@ -157,7 +157,7 @@ public void testWrite() throws IOException { Assert.assertEquals(stream.queue.poll(), new byte[] { (byte) 7 }); } - stream = new NonBlockingPipedOutputStream(1, 1, 2, CapacityPolicy.fixedCapacity(1), null); + stream = new NonBlockingPipedOutputStream(1, 1, 2, CapacityPolicy.fixedCapacity(1)); Assert.assertEquals(stream.queue.size(), 0); try (OutputStream out = stream) { out.write(5); @@ -182,8 +182,8 @@ public void testWrite() throws IOException { @Test(groups = { "unit" }) public void testWriteBytes() throws IOException { - NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, CapacityPolicy.fixedCapacity(3), - null); + NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, + CapacityPolicy.fixedCapacity(3)); Assert.assertEquals(stream.queue.size(), 0); try (OutputStream out = stream) { out.write(new byte[] { (byte) 9, (byte) 10 }); @@ -214,7 +214,7 @@ public void testPipedStream() throws InterruptedException, IOException { for (int bufferSize = -1; bufferSize < 10; bufferSize++) { 
for (int queueLength = -1; queueLength < 10; queueLength++) { final NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(bufferSize, queueLength, - timeout, CapacityPolicy.fixedCapacity(queueLength), null); + timeout, CapacityPolicy.fixedCapacity(queueLength)); try (InputStream in = stream.getInputStream(); OutputStream out = stream) { final int count = 10000; final AtomicInteger p = new AtomicInteger(0); diff --git a/clickhouse-grpc-client/pom.xml b/clickhouse-grpc-client/pom.xml index 618e4c8c4..74d1ec66a 100644 --- a/clickhouse-grpc-client/pom.xml +++ b/clickhouse-grpc-client/pom.xml @@ -208,12 +208,6 @@ ** - - org.slf4j:slf4j-api - - ** - - org.tukaani:xz @@ -307,12 +301,6 @@ ** - - org.slf4j:slf4j-api - - ** - - org.tukaani:xz @@ -408,12 +396,6 @@ ** - - org.slf4j:slf4j-api - - ** - - org.tukaani:xz diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java index e8d2e310b..dcfec8d2d 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java @@ -120,7 +120,7 @@ protected static ClickHouseInputStream getCompressedInputStream(ClickHouseConfig final int bufferSize = config.getWriteBufferSize(); final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() // NOSONAR - .createPipedOutputStream(bufferSize, 0, config.getSocketTimeout(), null); + .createPipedOutputStream(bufferSize, 0, config.getSocketTimeout()); final ClickHouseInputStream compressedInput = stream.getInputStream(); ClickHouseClient.submit(() -> { diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java index 447edd1e7..3b3020834 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java @@ -63,7 +63,7 @@ protected ClickHouseStreamObserver(ClickHouseConfig config, ClickHouseNode serve ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL, postCloseAction); } else { ClickHousePipedOutputStream pipedStream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(config, null); + .createPipedOutputStream(config); this.stream = pipedStream; this.input = ClickHouseGrpcClient.getInput(config, pipedStream.getInputStream(), postCloseAction); } diff --git a/clickhouse-http-client/pom.xml b/clickhouse-http-client/pom.xml index e322f9817..9fb1f01de 100644 --- a/clickhouse-http-client/pom.xml +++ b/clickhouse-http-client/pom.xml @@ -41,6 +41,12 @@ org.apache.httpcomponents.client5 httpclient5 true + + + org.slf4j + slf4j-api + + org.brotli @@ -155,12 +161,6 @@ ** - - org.slf4j:slf4j-api - - ** - - org.tukaani:xz diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java index fe44eb7c7..8b6bc981e 100644 --- a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java +++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java @@ -209,9 +209,8 @@ private ClickHouseHttpResponse postStream(ClickHouseConfig 
config, HttpRequest.B String sql, ClickHouseInputStream data, List tables, ClickHouseOutputStream output, Runnable postAction) throws IOException { try { - ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream( - config, - null); + ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(config); reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInputStream)); // running in async is necessary to avoid deadlock of the piped stream CompletableFuture> f = postRequest(reqBuilder.build()); diff --git a/clickhouse-http-client/src/main/java11/module-info.java b/clickhouse-http-client/src/main/java11/module-info.java index 7d1cb83e4..7829f28bf 100644 --- a/clickhouse-http-client/src/main/java11/module-info.java +++ b/clickhouse-http-client/src/main/java11/module-info.java @@ -6,7 +6,5 @@ requires java.net.http; - requires static com.google.gson; - requires transitive com.clickhouse.client; } diff --git a/clickhouse-http-client/src/main/java9/module-info.java b/clickhouse-http-client/src/main/java9/module-info.java index d1f979b0d..1ee3fb54c 100644 --- a/clickhouse-http-client/src/main/java9/module-info.java +++ b/clickhouse-http-client/src/main/java9/module-info.java @@ -4,7 +4,5 @@ provides com.clickhouse.client.ClickHouseClient with com.clickhouse.client.http.ClickHouseHttpClient; - requires static com.google.gson; - requires transitive com.clickhouse.client; } diff --git a/clickhouse-jdbc/pom.xml b/clickhouse-jdbc/pom.xml index fd686bf45..27f553a01 100644 --- a/clickhouse-jdbc/pom.xml +++ b/clickhouse-jdbc/pom.xml @@ -54,6 +54,12 @@ org.apache.httpcomponents.client5 httpclient5 + + + org.slf4j + slf4j-api + + org.lz4 @@ -217,12 +223,6 @@ ** - - org.slf4j:slf4j-api - - ** - - *:* @@ -297,12 +297,6 @@ - - org.slf4j:slf4j-api - - ** - - *:* @@ -391,12 +385,6 @@ ** - - org.slf4j:slf4j-api - - ** - - *:* @@ -483,12 +471,6 @@ ** - - org.slf4j:slf4j-api - - ** - - *:* @@ -576,12 +558,6 @@ ** - - org.slf4j:slf4j-api - - ** - - *:* diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java index f11014ced..cfe7d91e9 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java @@ -80,7 +80,7 @@ protected InputBasedPreparedStatement(ClickHouseConnectionImpl connection, Click counter = 0; // it's important to make sure the queue has unlimited length stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0, - config.getSocketTimeout(), null); + config.getSocketTimeout()); } protected void ensureParams() throws SQLException { @@ -367,7 +367,7 @@ public void clearBatch() throws SQLException { ClickHouseConfig config = getConfig(); stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0, - config.getSocketTimeout(), null); + config.getSocketTimeout()); resetDataProcessor(); } @@ -447,7 +447,8 @@ public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws S } else { Calendar c = (Calendar) cal.clone(); c.setTime(x); - dt = c.toInstant().atZone(tz).withNano(x.getNanos()).withZoneSameInstant(timeZoneForTs).toLocalDateTime(); + dt = 
c.toInstant().atZone(tz).withNano(x.getNanos()).withZoneSameInstant(timeZoneForTs) + .toLocalDateTime(); } values[idx].update(dt); } else { diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java index 1641b2a23..dc9405358 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java @@ -239,7 +239,7 @@ public void setObject(int parameterIndex, Object x) throws SQLException { if (x instanceof ClickHouseWriter) { final ClickHouseWriter writer = (ClickHouseWriter) x; final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() // NOSONAR - .createPipedOutputStream(getConfig(), null); + .createPipedOutputStream(getConfig()); value = stream.getInputStream(); // always run in async mode or it will not work diff --git a/clickhouse-jdbc/src/main/java9/module-info.java b/clickhouse-jdbc/src/main/java9/module-info.java index ff9099a4f..b5893ee61 100644 --- a/clickhouse-jdbc/src/main/java9/module-info.java +++ b/clickhouse-jdbc/src/main/java9/module-info.java @@ -3,18 +3,12 @@ */ module com.clickhouse.jdbc { exports com.clickhouse.jdbc; - + requires java.sql; requires transitive com.clickhouse.client; - requires transitive com.google.gson; - requires transitive org.lz4.java; - - requires static java.logging; - // requires static com.github.benmanes.caffeine; - // requires static org.dnsjava; - // requires static org.slf4j; - requires static org.roaringbitmap; + // requires transitive com.google.gson; + // requires transitive org.lz4.java; uses com.clickhouse.client.ClickHouseClient; uses com.clickhouse.client.ClickHouseDnsResolver; diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 82941f1bc..f741c089d 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -1069,7 +1069,7 @@ public void testLoadRawData() throws IOException, SQLException { ClickHouseConfig config = stmt.getConfig(); CompletableFuture future; try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(config, null)) { + .createPipedOutputStream(config)) { ps.setObject(1, ClickHouseExternalTable.builder().name("raw_data") .columns("s String").format(ClickHouseFormat.RowBinary) .content(stream.getInputStream()) diff --git a/clickhouse-r2dbc/pom.xml b/clickhouse-r2dbc/pom.xml index 072d00af6..8be2cdd57 100644 --- a/clickhouse-r2dbc/pom.xml +++ b/clickhouse-r2dbc/pom.xml @@ -286,12 +286,6 @@ ** - - org.slf4j:slf4j-api - - ** - - *:* @@ -392,12 +386,6 @@ ** - - org.slf4j:slf4j-api - - ** - - *:* diff --git a/clickhouse-r2dbc/src/main/java9/module-info.java b/clickhouse-r2dbc/src/main/java9/module-info.java index 72ac18c88..3f97650c6 100644 --- a/clickhouse-r2dbc/src/main/java9/module-info.java +++ b/clickhouse-r2dbc/src/main/java9/module-info.java @@ -9,12 +9,6 @@ requires transitive reactor.core; requires transitive org.lz4.java; - requires static java.logging; - // requires static com.github.benmanes.caffeine; - // requires static org.dnsjava; - // requires static org.slf4j; - requires static 
org.roaringbitmap; - uses com.clickhouse.client.ClickHouseClient; uses com.clickhouse.client.ClickHouseDnsResolver; uses com.clickhouse.client.ClickHouseSslContextProvider; From 525188d52de6081ad4bb5b06c64d19109c6c4111 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 27 Jan 2023 10:05:49 +0800 Subject: [PATCH 3/8] Disable testDumpFile for gRPC for now --- .../test/java/com/clickhouse/client/ClientIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index 135ef4f35..fd32f1287 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -1528,7 +1528,7 @@ public void testDump() throws ExecutionException, InterruptedException, IOExcept public void testDumpFile(boolean gzipCompressed, boolean useOneLiner) throws ExecutionException, InterruptedException, IOException { ClickHouseNode server = getServer(); - if (server.getProtocol() != ClickHouseProtocol.GRPC && server.getProtocol() != ClickHouseProtocol.HTTP) { + if (server.getProtocol() != ClickHouseProtocol.HTTP) { throw new SkipException("Skip as only http implementation works well"); } From a7c5f3dcbfc69b84a6e6bd0bb0e13992cef6cdee Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 27 Jan 2023 10:50:07 +0800 Subject: [PATCH 4/8] add ClickHousePassThruStream to dump method and mark a few more methods as deprecated --- .../clickhouse/client/ClickHouseClient.java | 187 +++++++++++------- 1 file changed, 113 insertions(+), 74 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java index b5eec3612..0c6eff991 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java @@ -303,21 +303,22 @@ static CompletableFuture submit(Runnable task) { } /** - * Dumps a table or query result from server into a file. File will be - * created/overwrited as needed. + * Dumps a table or query result from server into the given pass-thru stream. + * Pass {@link com.clickhouse.data.ClickHouseFile} to dump data into a file, + * which may or may not be compressed. 
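+     * <p>A minimal usage sketch (assuming {@code server} is an existing
+     * {@link ClickHouseNode}; the query, file path, compression and format
+     * below are illustrative only and error handling is elided):
+     * <pre>{@code
+     * ClickHouseClient
+     *     .dump(server, "select * from system.settings",
+     *             ClickHouseFile.of("/tmp/settings.csv.lz4",
+     *                     ClickHouseCompression.LZ4, ClickHouseFormat.CSV))
+     *     .get();
+     * }</pre>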
* * @param server non-null server to connect to * @param tableOrQuery table name or a select query - * @param file output file + * @param stream non-null pass-thru stream * @return non-null future object to get result * @throws IllegalArgumentException if any of server, tableOrQuery, and output * is null * @throws CompletionException when error occurred during execution */ static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, - ClickHouseFile file) { - if (server == null || tableOrQuery == null || file == null) { - throw new IllegalArgumentException("Non-null server, tableOrQuery, and file are required"); + ClickHousePassThruStream stream) { + if (server == null || tableOrQuery == null || stream == null) { + throw new IllegalArgumentException("Non-null server, tableOrQuery, and pass-thru stream are required"); } // in case the protocol is ANY @@ -327,7 +328,7 @@ static CompletableFuture dump(ClickHouseNode server, return submit(() -> { try (ClickHouseClient client = newInstance(theServer.getProtocol())) { - ClickHouseRequest request = client.connect(theServer).output(file); + ClickHouseRequest request = client.connect(theServer).output(stream); // FIXME what if the table name is `try me`? if (theQuery.indexOf(' ') < 0) { request.table(theQuery); @@ -342,6 +343,25 @@ static CompletableFuture dump(ClickHouseNode server, }); } + /** + * Dumps a table or query result from server into a file. File will be + * created/overwrited as needed. + * + * @param server non-null server to connect to + * @param tableOrQuery table name or a select query + * @param file output file + * @param compression compression algorithm to use + * @param format output format to use + * @return non-null future object to get result + * @throws IllegalArgumentException if any of server, tableOrQuery, and output + * is null + * @throws CompletionException when error occurred during execution + */ + static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, String file, + ClickHouseCompression compression, ClickHouseFormat format) { + return dump(server, tableOrQuery, ClickHouseFile.of(file, compression, format)); + } + /** * Dumps a table or query result from server into a file. File will be * created/overwrited as needed. 
@@ -355,10 +375,14 @@ static CompletableFuture dump(ClickHouseNode server, * @throws IllegalArgumentException if any of server, tableOrQuery, and output * is null * @throws CompletionException when error occurred during execution + * @deprecated will be dropped in 0.5, please use + * {@link #dump(ClickHouseNode, String, String, ClickHouseCompression, ClickHouseFormat)} + * instead */ + @Deprecated static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, ClickHouseFormat format, ClickHouseCompression compression, String file) { - return dump(server, tableOrQuery, ClickHouseFile.of(file, compression, format)); + return dump(server, tableOrQuery, file, compression, format); } /** @@ -378,7 +402,7 @@ static CompletableFuture dump(ClickHouseNode server, * @throws CompletionException when error occurred during execution */ static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, - ClickHouseFormat format, ClickHouseCompression compression, OutputStream output) { + OutputStream output, ClickHouseCompression compression, ClickHouseFormat format) { if (server == null || tableOrQuery == null || output == null) { throw new IllegalArgumentException("Non-null server, tableOrQuery, and output are required"); } @@ -412,6 +436,31 @@ static CompletableFuture dump(ClickHouseNode server, }); } + /** + * Dumps a table or query result from server into output stream. + * + * @param server non-null server to connect to + * @param tableOrQuery table name or a select query + * @param format output format to use, null means + * {@link ClickHouseFormat#TabSeparated} + * @param compression compression algorithm to use, null means + * {@link ClickHouseCompression#NONE} + * @param output output stream, which will be closed automatically at the + * end of the call + * @return future object to get result + * @throws IllegalArgumentException if any of server, tableOrQuery, and output + * is null + * @throws CompletionException when error occurred during execution + * @deprecated will be dropped in 0.5, please use + * {@link #dump(ClickHouseNode, String, OutputStream, ClickHouseCompression, ClickHouseFormat)} + * instead + */ + @Deprecated + static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, + ClickHouseFormat format, ClickHouseCompression compression, OutputStream output) { + return dump(server, tableOrQuery, output, compression, format); + } + /** * Loads data from the given pass-thru stream into a table. Pass * {@link com.clickhouse.data.ClickHouseFile} to load data from a file, which @@ -442,6 +491,19 @@ static CompletableFuture load(ClickHouseNode server, }); } + /** + * Loads data from a customer writer into table using specified format and + * compression algorithm. 
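+     * <p>A minimal usage sketch (assuming {@code server} is an existing
+     * {@link ClickHouseNode}; the table name and data below are illustrative
+     * only and error handling is elided):
+     * <pre>{@code
+     * ClickHouseClient
+     *     .load(server, "my_table",
+     *             out -> out.writeBytes("1\t2\n3\t4\n".getBytes(StandardCharsets.US_ASCII)),
+     *             ClickHouseCompression.NONE, ClickHouseFormat.TabSeparated)
+     *     .get();
+     * }</pre>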
+ * + * @param server non-null server to connect to + * @param table non-null target table + * @param writer non-null custom writer to produce data + * @param compression compression algorithm to use + * @param format input format to use + * @return future object to get result + * @throws IllegalArgumentException if any of server, table, and input is null + * @throws CompletionException when error occurred during execution + */ static CompletableFuture load(ClickHouseNode server, String table, ClickHouseWriter writer, ClickHouseCompression compression, ClickHouseFormat format) { if (server == null || table == null || writer == null) { @@ -463,30 +525,26 @@ static CompletableFuture load(ClickHouseNode server, /** * Loads data from a file into table using specified format and compression - * algorithm. + * algorithm. Same as + * {@code load(server, table, ClickHouseFile.of(file, compression, format))} * * @param server non-null server to connect to * @param table non-null target table - * @param format input format to use - * @param compression compression algorithm to use * @param file file to load + * @param compression compression algorithm to use + * @param format input format to use * @return future object to get result * @throws IllegalArgumentException if any of server, table, and input is null * @throws CompletionException when error occurred during execution - * @deprecated will be dropped in 0.5, please use - * {@link #load(ClickHouseNode, String, String, ClickHouseCompression, ClickHouseFormat)} - * instead */ - @Deprecated static CompletableFuture load(ClickHouseNode server, String table, - ClickHouseFormat format, ClickHouseCompression compression, String file) { + String file, ClickHouseCompression compression, ClickHouseFormat format) { return load(server, table, ClickHouseFile.of(file, compression, format)); } /** * Loads data from a file into table using specified format and compression - * algorithm. Same as - * {@code load(server, table, ClickHouseFile.of(file, compression, format))} + * algorithm. 
* * @param server non-null server to connect to * @param table non-null target table @@ -496,10 +554,14 @@ static CompletableFuture load(ClickHouseNode server, * @return future object to get result * @throws IllegalArgumentException if any of server, table, and input is null * @throws CompletionException when error occurred during execution + * @deprecated will be dropped in 0.5, please use + * {@link #load(ClickHouseNode, String, String, ClickHouseCompression, ClickHouseFormat)} + * instead */ + @Deprecated static CompletableFuture load(ClickHouseNode server, String table, - String file, ClickHouseCompression compression, ClickHouseFormat format) { - return load(server, table, ClickHouseFile.of(file, compression, format)); + ClickHouseFormat format, ClickHouseCompression compression, String file) { + return load(server, table, file, compression, format); } /** @@ -508,9 +570,9 @@ static CompletableFuture load(ClickHouseNode server, * * @param server non-null server to connect to * @param table non-null target table - * @param format input format to use - * @param compression compression algorithm to use * @param writer non-null custom writer to generate data + * @param compression compression algorithm to use + * @param format input format to use * @return future object to get result * @throws IllegalArgumentException if any of server, table, and writer is null * @throws CompletionException when error occurred during execution @@ -521,53 +583,7 @@ static CompletableFuture load(ClickHouseNode server, @Deprecated static CompletableFuture load(ClickHouseNode server, String table, ClickHouseFormat format, ClickHouseCompression compression, ClickHouseWriter writer) { - if (server == null || table == null || writer == null) { - throw new IllegalArgumentException("Non-null server, table, and writer are required"); - } - - // in case the protocol is ANY - final ClickHouseNode theServer = server.probe(); - - return submit(() -> { - InputStream input = null; - // must run in async mode so that we won't hold everything in memory - try (ClickHouseClient client = ClickHouseClient.builder() - .nodeSelector(ClickHouseNodeSelector.of(theServer.getProtocol())) - .option(ClickHouseClientOption.ASYNC, true).build()) { - ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() - .createPipedOutputStream(client.getConfig()); - // execute query in a separate thread(because async is explicitly set to true) - CompletableFuture future = client.connect(theServer).write().table(table) - .decompressClientRequest(compression).format(format).data(input = stream.getInputStream()) - .execute(); - try { - // write data into stream in current thread - writer.write(stream); - } finally { - stream.close(); - } - // wait until write & read acomplished - try (ClickHouseResponse response = future.get(client.getConfig().getSocketTimeout(), - TimeUnit.MILLISECONDS)) { - return response.getSummary(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw ClickHouseException.forCancellation(e, theServer); - } catch (CancellationException e) { - throw ClickHouseException.forCancellation(e, theServer); - } catch (ExecutionException | TimeoutException e) { - throw ClickHouseException.of(e, theServer); - } finally { - if (input != null) { - try { - input.close(); - } catch (IOException e) { - // ignore - } - } - } - }); + return load(server, table, writer, compression, format); } /** @@ -576,16 +592,16 @@ static CompletableFuture load(ClickHouseNode server, * * @param server 
non-null server to connect to * @param table non-null target table - * @param format input format to use - * @param compression compression algorithm to use * @param input input stream, which will be closed automatically at the * end of the call + * @param compression compression algorithm to use + * @param format input format to use * @return future object to get result * @throws IllegalArgumentException if any of server, table, and input is null * @throws CompletionException when error occurred during execution */ - static CompletableFuture load(ClickHouseNode server, String table, - ClickHouseFormat format, ClickHouseCompression compression, InputStream input) { + static CompletableFuture load(ClickHouseNode server, String table, InputStream input, + ClickHouseCompression compression, ClickHouseFormat format) { if (server == null || table == null || input == null) { throw new IllegalArgumentException("Non-null server, table, and input are required"); } @@ -608,6 +624,29 @@ static CompletableFuture load(ClickHouseNode server, }); } + /** + * Loads data from input stream into a table using specified format and + * compression algorithm. + * + * @param server non-null server to connect to + * @param table non-null target table + * @param format input format to use + * @param compression compression algorithm to use + * @param input input stream, which will be closed automatically at the + * end of the call + * @return future object to get result + * @throws IllegalArgumentException if any of server, table, and input is null + * @throws CompletionException when error occurred during execution + * @deprecated will be dropped in 0.5, please use + * {@link #load(ClickHouseNode, String, InputStream, ClickHouseCompression, ClickHouseFormat)} + * instead + */ + @Deprecated + static CompletableFuture load(ClickHouseNode server, String table, + ClickHouseFormat format, ClickHouseCompression compression, InputStream input) { + return load(server, table, input, compression, format); + } + /** * Creates a new instance compatible with any of the given protocols. * From 7754abe21a694839c3f2dd8fa5a4754254789e2e Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 27 Jan 2023 11:49:28 +0800 Subject: [PATCH 5/8] thread-safe deferred value --- .../data/ClickHouseDeferredValue.java | 23 +++++++++++-------- .../com/clickhouse/data/ClickHouseUtils.java | 2 ++ 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java index 3a6689d9f..3b4db3e48 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDeferredValue.java @@ -8,6 +8,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; /** @@ -41,7 +42,7 @@ public static ClickHouseDeferredValue of(CompletableFuture future, lon final CompletableFuture f = future != null ? future : (CompletableFuture) ClickHouseUtils.NULL_FUTURE; final long t = timeout < 0L ? 0L : timeout; - Supplier supplier = () -> { + final Supplier supplier = () -> { try { return t > 0L ? 
f.get(t, TimeUnit.MILLISECONDS) : f.get(); } catch (InterruptedException e) { @@ -65,8 +66,10 @@ public static ClickHouseDeferredValue of(CompletableFuture future, lon * @param supplier supplier function, could be null * @return deferred value of return value from supplier function */ + @SuppressWarnings("unchecked") public static ClickHouseDeferredValue of(Supplier supplier) { - return new ClickHouseDeferredValue<>(supplier != null ? supplier : () -> null, null); + return new ClickHouseDeferredValue<>(supplier != null ? supplier : (Supplier) ClickHouseUtils.NULL_SUPPLIER, + null); } /** @@ -77,16 +80,17 @@ public static ClickHouseDeferredValue of(Supplier supplier) { * @param clazz class of the value * @return deferred value */ + @SuppressWarnings("unchecked") public static ClickHouseDeferredValue of(T value, Class clazz) { // NOSONAR - return new ClickHouseDeferredValue<>(null, Optional.ofNullable(value)); + return new ClickHouseDeferredValue<>((Supplier) ClickHouseUtils.NULL_SUPPLIER, Optional.ofNullable(value)); } - private Supplier supplier; - private Optional value; + private final Supplier supplier; + private final AtomicReference> value; private ClickHouseDeferredValue(Supplier supplier, Optional value) { this.supplier = supplier; - this.value = value; + this.value = new AtomicReference<>(value); } @Override @@ -100,10 +104,11 @@ public T get() { * @return optional value */ public Optional getOptional() { - if (value != null) { // NOSONAR - return value; + Optional v = value.get(); + if (v == null && !value.compareAndSet(null, v = Optional.ofNullable(supplier.get()))) { // NOSONAR + v = value.get(); } - return value = (supplier == null ? Optional.empty() : Optional.ofNullable(supplier.get())); // NOSONAR + return v != null ? v : Optional.empty(); // NOSONAR } } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java index 97115e878..c414d7b85 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java @@ -69,7 +69,9 @@ public final class ClickHouseUtils { public static final String DEFAULT_CHARSET = StandardCharsets.UTF_8.name(); public static final int MIN_CORE_THREADS = 3; + public static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); + public static final Supplier NULL_SUPPLIER = () -> null; public static final String VARIABLE_PREFIX = "{{"; public static final String VARIABLE_SUFFIX = "}}"; From ad385a06ed87a19841d575a1e1de1ae9d9cc87f6 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 27 Jan 2023 12:32:27 +0800 Subject: [PATCH 6/8] update input and writer together --- .../clickhouse/client/ClickHouseRequest.java | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java index 253e19793..f6e807e22 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java @@ -167,8 +167,10 @@ public Mutation format(ClickHouseFormat format) { public Mutation data(ClickHouseWriter writer) { checkSealed(); - this.input = changeProperty(PROP_DATA, this.input, null); - this.writer = changeProperty(PROP_WRITER, this.writer, writer); + this.writer = changeProperty(PROP_WRITER, 
this.writer, + ClickHouseChecker.nonNull(writer, ClickHouseWriter.TYPE_NAME)); + this.input = changeProperty(PROP_DATA, this.input, + ClickHouseDeferredValue.of(() -> ClickHouseInputStream.of(getConfig(), writer))); return this; } @@ -202,7 +204,7 @@ public Mutation data(ClickHousePassThruStream stream) { final int bufferSize = c.getReadBufferSize(); this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue .of(() -> ClickHouseInputStream.of(stream, bufferSize, null))); - this.writer = changeProperty(PROP_WRITER, this.writer, null); + this.writer = changeProperty(PROP_WRITER, this.writer, new PipedWriter(this.input)); return this; } @@ -272,17 +274,15 @@ public Mutation data(InputStream input) { * @return mutation request */ public Mutation data(ClickHouseInputStream input) { - ClickHousePassThruStream stream = ClickHouseChecker.nonNull(input, ClickHouseInputStream.TYPE_NAME) - .getUnderlyingStream(); - if (stream.hasInput()) { - return data(stream); + if (ClickHouseChecker.nonNull(input, ClickHouseInputStream.TYPE_NAME).hasUnderlyingStream()) { + return data(input.getUnderlyingStream()); } checkSealed(); this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue.of(input, ClickHouseInputStream.class)); - this.writer = changeProperty(PROP_WRITER, this.writer, null); + this.writer = changeProperty(PROP_WRITER, this.writer, new PipedWriter(this.input)); return this; } @@ -297,7 +297,7 @@ public Mutation data(ClickHouseDeferredValue input) { checkSealed(); this.input = changeProperty(PROP_DATA, this.input, input); - this.writer = changeProperty(PROP_WRITER, this.writer, null); + this.writer = changeProperty(PROP_WRITER, this.writer, input != null ? new PipedWriter(input) : null); return this; } @@ -588,12 +588,6 @@ public final BiConsumer getServerListener() { * @return input stream */ public Optional getInputStream() { - if (this.input == null && this.writer != null) { - final ClickHouseConfig c = getConfig(); - final ClickHouseWriter w = this.writer; - this.input = changeProperty(PROP_DATA, this.input, - ClickHouseDeferredValue.of(() -> ClickHouseInputStream.of(c, w))); - } return input != null ? 
input.getOptional() : Optional.empty(); } @@ -603,9 +597,6 @@ public Optional getInputStream() { * @return custom writer */ public Optional getWriter() { - if (this.writer == null && this.input != null) { - this.writer = changeProperty(PROP_WRITER, this.writer, new PipedWriter(input)); - } return Optional.ofNullable(this.writer); } From 4a0e9cca02684e8d372d7907b78c10f1a6660b1a Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 27 Jan 2023 15:44:57 +0800 Subject: [PATCH 7/8] correct integration test and remove duplicated code --- .../client/cli/ClickHouseCommandLine.java | 19 ++++++++------- .../clickhouse/client/ClickHouseClient.java | 2 +- .../client/ClientIntegrationTest.java | 23 +++++++------------ .../data/ClickHouseExternalTable.java | 5 ++-- .../client/grpc/ClickHouseGrpcClient.java | 8 +++---- 5 files changed, 23 insertions(+), 34 deletions(-) diff --git a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java index 2f325067f..13bed4303 100644 --- a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java +++ b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java @@ -32,6 +32,7 @@ import com.clickhouse.client.config.ClickHouseSslMode; import com.clickhouse.data.ClickHouseChecker; import com.clickhouse.data.ClickHouseCompression; +import com.clickhouse.data.ClickHouseDataConfig; import com.clickhouse.data.ClickHouseExternalTable; import com.clickhouse.data.ClickHouseFile; import com.clickhouse.data.ClickHouseInputStream; @@ -193,9 +194,6 @@ static Process startProcess(ClickHouseRequest request) { final ClickHouseNode server = request.getServer(); final int timeout = config.getSocketTimeout(); - // FIXME potential timing issue - final Optional in = request.getInputStream(); - String hostDir = config.getStrOption(ClickHouseCommandLineOption.CLI_WORK_DIRECTORY); hostDir = ClickHouseUtils.normalizeDirectory( ClickHouseChecker.isNullOrBlank(hostDir) ? System.getProperty("java.io.tmpdir") : hostDir); @@ -343,9 +341,9 @@ static Process startProcess(ClickHouseRequest request) { if (request.hasOutputStream()) { final ClickHouseOutputStream chOutput = request.getOutputStream().get(); // NOSONAR - final ClickHousePassThruStream customStream = chOutput.getUnderlyingStream(); - if (customStream.hasOutput()) { + if (chOutput.hasUnderlyingStream()) { + final ClickHousePassThruStream customStream = chOutput.getUnderlyingStream(); File f = customStream instanceof ClickHouseFile ? ((ClickHouseFile) customStream).getFile() : null; if (f == null) { throw new UncheckedIOException(new IOException("Output file not found in " + customStream)); @@ -385,14 +383,15 @@ static Process startProcess(ClickHouseRequest request) { builder.redirectOutput(f); } } - + + final Optional in = request.getInputStream(); try { final Process process; if (in.isPresent()) { final ClickHouseInputStream chInput = in.get(); - final ClickHousePassThruStream customStream = chInput.getUnderlyingStream(); final File inputFile; - if (customStream.hasInput()) { + if (chInput.hasUnderlyingStream()) { + final ClickHousePassThruStream customStream = chInput.getUnderlyingStream(); inputFile = customStream instanceof ClickHouseFile ? 
((ClickHouseFile) customStream).getFile() : ClickHouseFile.of(customStream.getInputStream(), config.getRequestCompressAlgorithm(), config.getRequestCompressLevel(), config.getFormat()).getFile(); @@ -434,12 +433,12 @@ public ClickHouseInputStream getInputStream() throws IOException { throw new UncheckedIOException(exp); } }; - if (out != null && !out.getUnderlyingStream().hasOutput()) { + if (out != null && !out.hasUnderlyingStream()) { try (OutputStream o = out) { ClickHouseInputStream.pipe(process.getInputStream(), o, c.getWriteBufferSize()); } return ClickHouseInputStream.wrap(null, ClickHouseInputStream.empty(), c.getReadBufferSize(), - ClickHouseCompression.NONE, -1, postCloseAction); + ClickHouseCompression.NONE, ClickHouseDataConfig.DEFAULT_READ_COMPRESS_LEVEL, postCloseAction); } else { return ClickHouseInputStream.of(process.getInputStream(), c.getReadBufferSize(), postCloseAction); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java index 0c6eff991..07ea6d8d8 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java @@ -90,7 +90,7 @@ static ClickHouseOutputStream getRequestOutputStream(ClickHouseConfig config, Ou Runnable postCloseAction) { if (config == null) { return ClickHouseOutputStream.of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), - ClickHouseCompression.NONE, -1, postCloseAction); + ClickHouseCompression.NONE, ClickHouseConfig.DEFAULT_READ_COMPRESS_LEVEL, postCloseAction); } return ClickHouseOutputStream.of(output, config.getWriteBufferSize(), config.getRequestCompressAlgorithm(), diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index fd32f1287..9ee3fbf24 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -1423,33 +1423,26 @@ public void testCustomWriter() throws ClickHouseException { try (ClickHouseClient client = getClient()) { AtomicInteger i = new AtomicInteger(1); + ClickHouseWriter w = o -> { + o.write(i.getAndIncrement()); + }; ClickHouseRequest.Mutation req = newRequest(client, server).write() .format(ClickHouseFormat.RowBinary) - .table("test_custom_writer").data(o -> { - o.write(i.getAndIncrement()); - }); + .table("test_custom_writer"); for (boolean b : new boolean[] { true, false }) { req.option(ClickHouseClientOption.ASYNC, b); - try (ClickHouseResponse resp = req.execute().get()) { - Assert.assertNotNull(resp); - } catch (Exception e) { - Assert.fail("Failed to call send() followed by get(): async=" + b, e); - } - - try (ClickHouseResponse resp = req.executeAndWait()) { - Assert.assertNotNull(resp); - } - - try (ClickHouseResponse resp = req.execute().get()) { + try (ClickHouseResponse resp = req.data(w).execute().get()) { Assert.assertNotNull(resp); } catch (Exception e) { Assert.fail("Failed to call execute() followed by get(): async=" + b, e); } + Assert.assertTrue(req.getInputStream().get().isClosed(), "Input stream should have been closed"); - try (ClickHouseResponse resp = req.executeAndWait()) { + try (ClickHouseResponse resp = req.data(w).executeAndWait()) { Assert.assertNotNull(resp); } + Assert.assertTrue(req.getInputStream().get().isClosed(), 
"Input stream should have been closed"); } try (ClickHouseResponse resp = newRequest(client, server) diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseExternalTable.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseExternalTable.java index 00402accd..d3714bfd0 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseExternalTable.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseExternalTable.java @@ -68,9 +68,8 @@ public Builder content(InputStream content) { } public Builder content(ClickHouseInputStream input) { - ClickHousePassThruStream stream = ClickHouseChecker.nonNull(input, ClickHouseInputStream.TYPE_NAME) - .getUnderlyingStream(); - if (stream.hasInput()) { + if (ClickHouseChecker.nonNull(input, ClickHouseInputStream.TYPE_NAME).hasUnderlyingStream()) { + ClickHousePassThruStream stream = input.getUnderlyingStream(); this.compression = ClickHouseCompression.NONE; this.compressionLevel = ClickHouseDataConfig.DEFAULT_WRITE_COMPRESS_LEVEL; this.content = ClickHouseDeferredValue.of(stream.getInputStream(), InputStream.class); diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java index dcfec8d2d..6061a8fcc 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java @@ -154,9 +154,6 @@ protected static QueryInfo convert(ClickHouseRequest request, boolean streami ClickHouseNode server = request.getServer(); ClickHouseCredentials credentials = server.getCredentials(config); - // FIXME potential timing issue - Optional input = request.getInputStream(); - Builder builder = QueryInfo.newBuilder(); String database = server.getDatabase(config); if (!ClickHouseChecker.isNullOrEmpty(database)) { @@ -245,6 +242,7 @@ protected static QueryInfo convert(ClickHouseRequest request, boolean streami // builder.setTransportCompressionType("none"); // builder.setTransportCompressionLevel(0); + Optional input = request.getInputStream(); if (input.isPresent()) { if (config.isRequestCompressed()) { builder.setInputCompressionType(config.getRequestCompressAlgorithm().encoding()); @@ -253,8 +251,8 @@ protected static QueryInfo convert(ClickHouseRequest request, boolean streami // builder.setInputData(ByteString.EMPTY); builder.setNextQueryInfo(true); } else { - try { - builder.setInputData(ByteString.readFrom(getCompressedInputStream(config, input.get()))); + try (ClickHouseInputStream in = input.get()) { + builder.setInputData(ByteString.readFrom(getCompressedInputStream(config, in))); } catch (IOException e) { throw new CompletionException(ClickHouseException.of(e, server)); } From 7b1b28658169b33d62ce7f14240c7b8ae31304a9 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 27 Jan 2023 17:31:17 +0800 Subject: [PATCH 8/8] increase minimize thread pool size --- .../clickhouse/client/ClickHouseClient.java | 9 +++-- .../client/ClientIntegrationTest.java | 35 ++++++++++--------- .../data/ClickHouseDataStreamFactory.java | 5 ++- .../com/clickhouse/data/ClickHouseUtils.java | 17 ++++++--- 4 files changed, 37 insertions(+), 29 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java index 07ea6d8d8..8e6623649 100644 --- 
a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java @@ -515,9 +515,8 @@ static CompletableFuture load(ClickHouseNode server, return submit(() -> { try (ClickHouseClient client = newInstance(theServer.getProtocol()); - ClickHouseResponse response = client.connect(theServer).write().table(table) - .decompressClientRequest(compression).format(format).data(writer) - .executeAndWait()) { + ClickHouseResponse response = client.connect(theServer).write().table(table).data(writer) + .decompressClientRequest(compression).format(format).executeAndWait()) { return response.getSummary(); } }); @@ -611,8 +610,8 @@ static CompletableFuture load(ClickHouseNode server, return submit(() -> { try (ClickHouseClient client = newInstance(theServer.getProtocol()); - ClickHouseResponse response = client.connect(theServer).write().table(table) - .decompressClientRequest(compression).format(format).data(input).executeAndWait()) { + ClickHouseResponse response = client.connect(theServer).write().table(table).data(input) + .decompressClientRequest(compression).format(format).executeAndWait()) { return response.getSummary(); } finally { try { diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index 9ee3fbf24..ec4677a00 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -1502,9 +1502,10 @@ public void testDump() throws ExecutionException, InterruptedException, IOExcept Assert.assertEquals(Files.size(temp), 0L); int lines = 10000; - ClickHouseResponseSummary summary = ClickHouseClient.dump(server, - ClickHouseUtils.format("select * from numbers(%d)", lines), - ClickHouseFormat.TabSeparated, ClickHouseCompression.NONE, temp.toString()).get(); + ClickHouseResponseSummary summary = ClickHouseClient + .dump(server, ClickHouseUtils.format("select * from numbers(%d)", lines), temp.toString(), + ClickHouseCompression.NONE, ClickHouseFormat.TabSeparated) + .get(); Assert.assertNotNull(summary); // Assert.assertEquals(summary.getReadRows(), lines); @@ -1521,23 +1522,25 @@ public void testDump() throws ExecutionException, InterruptedException, IOExcept public void testDumpFile(boolean gzipCompressed, boolean useOneLiner) throws ExecutionException, InterruptedException, IOException { ClickHouseNode server = getServer(); - if (server.getProtocol() != ClickHouseProtocol.HTTP) { - throw new SkipException("Skip as only http implementation works well"); + if (server.getProtocol() != ClickHouseProtocol.GRPC && server.getProtocol() != ClickHouseProtocol.HTTP) { + throw new SkipException("Skip as only http and grpc implementation work well"); } - File file = ClickHouseUtils.createTempFile("chc", ".data", false); - ClickHouseFile wrappedFile = ClickHouseFile.of(file, + final File file = ClickHouseUtils.createTempFile(); + final ClickHouseFile wrappedFile = ClickHouseFile.of(file, gzipCompressed ? 
ClickHouseCompression.GZIP : ClickHouseCompression.NONE, ClickHouseFormat.CSV); String query = "select number, if(number % 2 = 0, null, toString(number)) str from numbers(10)"; + final ClickHouseResponseSummary summary; if (useOneLiner) { - ClickHouseClient.dump(server, query, wrappedFile).get(); + summary = ClickHouseClient.dump(server, query, wrappedFile).get(); } else { try (ClickHouseClient client = getClient(); - ClickHouseResponse response = newRequest(client, server).query(query) - .output(wrappedFile).execute().get()) { - // ignore + ClickHouseResponse response = newRequest(client, server).output(wrappedFile).query(query).execute() + .get()) { + summary = response.getSummary(); } } + Assert.assertNotNull(summary); try (InputStream in = gzipCompressed ? new GZIPInputStream(new FileInputStream(file)) : new FileInputStream(file); ByteArrayOutputStream out = new ByteArrayOutputStream()) { ClickHouseInputStream.pipe(in, out, 512); @@ -1566,14 +1569,14 @@ public void testCustomLoad() throws ClickHouseException { "create table test_custom_load(n UInt32, s Nullable(String)) engine = Memory"); try { - ClickHouseClient.load(server, "test_custom_load", ClickHouseFormat.TabSeparated, - ClickHouseCompression.NONE, new ClickHouseWriter() { + ClickHouseClient.load(server, "test_custom_load", + new ClickHouseWriter() { @Override public void write(ClickHouseOutputStream output) throws IOException { output.write("1\t\\N\n".getBytes(StandardCharsets.US_ASCII)); output.write("2\t123".getBytes(StandardCharsets.US_ASCII)); } - }).get(); + }, ClickHouseCompression.NONE, ClickHouseFormat.TabSeparated).get(); } catch (Exception e) { Assert.fail("Faile to load data", e); } @@ -1619,8 +1622,8 @@ public void testLoadCsv() throws ExecutionException, InterruptedException, IOExc Files.write(temp, builder.toString().getBytes(StandardCharsets.US_ASCII)); Assert.assertTrue(Files.size(temp) > 0L); - ClickHouseResponseSummary summary = ClickHouseClient.load(server, "test_load_csv", - ClickHouseFormat.TabSeparated, ClickHouseCompression.NONE, temp.toString()).get(); + ClickHouseResponseSummary summary = ClickHouseClient.load(server, "test_load_csv", temp.toString(), + ClickHouseCompression.NONE, ClickHouseFormat.TabSeparated).get(); Assert.assertNotNull(summary); try (ClickHouseClient client = getClient(); ClickHouseResponse resp = newRequest(client, server) diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java index 7378325cd..e8e8e285d 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataStreamFactory.java @@ -29,13 +29,12 @@ protected static final class DefaultExecutors { protected static final ScheduledExecutorService scheduler; static { - int coreThreads = Runtime.getRuntime().availableProcessors(); + int coreThreads = 2 * Runtime.getRuntime().availableProcessors() + 1; if (coreThreads < ClickHouseUtils.MIN_CORE_THREADS) { coreThreads = ClickHouseUtils.MIN_CORE_THREADS; } - executor = ClickHouseUtils.newThreadPool("ClickHouseWorker-", coreThreads, - coreThreads * 2 + 1, 0, 0, false); + executor = ClickHouseUtils.newThreadPool("ClickHouseWorker-", coreThreads, coreThreads, 0, 0, false); scheduler = Executors.newSingleThreadScheduledExecutor(new ClickHouseThreadFactory("ClickHouseScheduler-")); } diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java 
b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java index c414d7b85..584d31372 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseUtils.java @@ -68,7 +68,7 @@ public final class ClickHouseUtils { */ public static final String DEFAULT_CHARSET = StandardCharsets.UTF_8.name(); - public static final int MIN_CORE_THREADS = 3; + public static final int MIN_CORE_THREADS = 4; public static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); public static final Supplier NULL_SUPPLIER = () -> null; @@ -279,13 +279,20 @@ public static ExecutorService newThreadPool(Object owner, int maxThreads, int ma public static ExecutorService newThreadPool(Object owner, int coreThreads, int maxThreads, int maxRequests, long keepAliveTimeoutMs, boolean allowCoreThreadTimeout) { - BlockingQueue queue = maxRequests > 0 ? new ArrayBlockingQueue<>(maxRequests) - : new LinkedBlockingQueue<>(); + final BlockingQueue queue; if (coreThreads < MIN_CORE_THREADS) { coreThreads = MIN_CORE_THREADS; } - if (maxThreads <= coreThreads) { - maxThreads = coreThreads + 1; + if (maxRequests > 0) { + queue = new ArrayBlockingQueue<>(maxRequests); + if (maxThreads <= coreThreads) { + maxThreads = coreThreads * 2; + } + } else { + queue = new LinkedBlockingQueue<>(); + if (maxThreads != coreThreads) { + maxThreads = coreThreads; + } } if (keepAliveTimeoutMs <= 0L) { keepAliveTimeoutMs = allowCoreThreadTimeout ? 1000L : 0L;
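
The reordered load overloads introduced above put the data source (writer, file, or input stream) ahead of the compression and format arguments; the old parameter orders remain only as deprecated delegates. Below is a minimal sketch of the writer-based overload, modeled on the integration tests in this series — the node wiring, table name, and sample rows are illustrative assumptions, and the import paths assume the clickhouse-client/clickhouse-data module split used by these patches:

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ExecutionException;

    import com.clickhouse.client.ClickHouseClient;
    import com.clickhouse.client.ClickHouseNode;
    import com.clickhouse.client.ClickHouseResponseSummary;
    import com.clickhouse.data.ClickHouseCompression;
    import com.clickhouse.data.ClickHouseFormat;
    import com.clickhouse.data.ClickHouseWriter;

    public class WriterLoadSketch {
        // Streams two TabSeparated rows into an existing table (assumed here to
        // have a numeric column and a nullable string column) using the new
        // load(server, table, writer, compression, format) parameter order.
        static ClickHouseResponseSummary loadTwoRows(ClickHouseNode server, String table)
                throws ExecutionException, InterruptedException {
            ClickHouseWriter writer = output -> {
                output.write("1\t\\N\n".getBytes(StandardCharsets.US_ASCII));
                output.write("2\t123\n".getBytes(StandardCharsets.US_ASCII));
            };
            // load() sends the writer's output to the server asynchronously;
            // get() blocks until the insert is acknowledged.
            return ClickHouseClient
                    .load(server, table, writer, ClickHouseCompression.NONE, ClickHouseFormat.TabSeparated)
                    .get();
        }
    }

PATCH 6/8 also updates data(ClickHouseWriter) so the request's input stream and writer are set together (the input becomes a deferred value backed by the writer) rather than being derived from each other lazily in getInputStream()/getWriter(); the updated testCustomWriter then asserts that the input stream is closed after each execution.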