diff --git a/CHANGES.md b/CHANGES.md index bd24be1989d4..1b72f92d278c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ * Add Datadog IO support (Java) ([#37318](https://github.com/apache/beam/issues/37318)). * Remove Pubsublite IO support, since service will be deprecated in March 2026. ([#37375](https://github.com/apache/beam/issues/37375)). +* (Java) ClickHouse - migrating from the legacy JDBC driver (v0.6.3) to ClickHouse Java Client v2 (v0.9.6). See the [class documentation](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.html) for migration guide ([#37610](https://github.com/apache/beam/issues/37610)). ## New Features / Improvements @@ -2357,4 +2358,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). \ No newline at end of file +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). diff --git a/sdks/java/io/clickhouse/build.gradle b/sdks/java/io/clickhouse/build.gradle index 70f3c1a63874..4923bf32a436 100644 --- a/sdks/java/io/clickhouse/build.gradle +++ b/sdks/java/io/clickhouse/build.gradle @@ -31,7 +31,7 @@ applyJavaNature( ) description = "Apache Beam :: SDKs :: Java :: IO :: ClickHouse" -ext.summary = "IO to write to ClickHouse (https://clickhouse.yandex)." +ext.summary = "IO to write to ClickHouse (https://clickhouse.com)." // Match the output directory for generated code with the package, to be more tool-friendly def generatedJavaccSourceDir = "${project.buildDir}/generated/javacc" @@ -50,7 +50,7 @@ idea { } } -def clickhouse_jdbc_version = "0.6.4" +def clickhouse_java_client_version = "0.9.6" dependencies { javacc "net.java.dev.javacc:javacc:7.0.9" @@ -59,11 +59,12 @@ dependencies { implementation library.java.joda_time implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre - implementation "com.clickhouse:clickhouse-jdbc:$clickhouse_jdbc_version:all" + implementation "com.clickhouse:client-v2:$clickhouse_java_client_version:all" testImplementation library.java.slf4j_api testImplementation library.java.junit testImplementation library.java.hamcrest testImplementation library.java.testcontainers_clickhouse + testImplementation "com.clickhouse:clickhouse-jdbc:$clickhouse_java_client_version:all" testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 52dca7cfa64a..fc00a1e420e0 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -17,18 +17,17 @@ */ package org.apache.beam.sdk.io.clickhouse; -import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.Records; import com.clickhouse.data.ClickHouseFormat; -import com.clickhouse.jdbc.ClickHouseConnection; -import com.clickhouse.jdbc.ClickHouseDataSource; -import com.clickhouse.jdbc.ClickHouseStatement; import com.google.auto.value.AutoValue; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType; @@ -63,15 +62,27 @@ * *

Writing to ClickHouse

* - *

To write to ClickHouse, use {@link ClickHouseIO#write(String, String)}, which writes elements - * from input {@link PCollection}. It's required that your ClickHouse cluster already has table you - * are going to insert into. + *

To write to ClickHouse, use {@link ClickHouseIO#write(String, String, String)}, which writes + * elements from input {@link PCollection}. It's required that your ClickHouse cluster already has + * table you are going to insert into. * *

{@code
+ * // New way (recommended):
+ * Properties props = new Properties();
+ * props.setProperty("user", "admin");
+ * props.setProperty("password", "secret");
+ *
+ * pipeline
+ *   .apply(...)
+ *   .apply(
+ *     ClickHouseIO.write("http://localhost:8123", "default", "my_table")
+ *       .withProperties(props));
+ *
+ * // Old way (deprecated):
  * pipeline
  *   .apply(...)
  *   .apply(
- *     ClickHouseIO.write("jdbc:clickhouse:localhost:8123/default", "my_table"));
+ *     ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default", "my_table"));
  * }
* *

Optionally, you can provide connection settings, for instance, specify insert block size with @@ -80,14 +91,21 @@ * *

Deduplication

* - * Deduplication is performed by ClickHouse if inserting to ReplicatedMergeTree or Distributed table on top of - * ReplicatedMergeTree. Without replication, inserting into regular MergeTree can produce - * duplicates, if insert fails, and then successfully retries. However, each block is inserted - * atomically, and you can configure block size with {@link Write#withMaxInsertBlockSize(long)}. + *

Deduplication is performed by ClickHouse if inserting to ReplicatedMergeTree + * or Distributed + * table on top of ReplicatedMergeTree. Without replication, inserting into regular MergeTree can + * produce duplicates, if insert fails, and then successfully retries. However, each block is + * inserted atomically, and you can configure block size with {@link + * Write#withMaxInsertBlockSize(long)}. * - *

Deduplication is performed using checksums of inserted blocks. + *

Deduplication is performed using checksums of inserted blocks. For SharedMergeTree + * tables in ClickHouse Cloud, deduplication behavior is similar to ReplicatedMergeTree. For more + * information about deduplication, please visit the Deduplication strategies + * documentation * *

Mapping between Beam and ClickHouse types

* @@ -114,8 +132,10 @@ * {@link TableSchema.TypeName#TUPLE} {@link Schema.TypeName#ROW} * * - * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is - * supported through LowCardinality DataType in ClickHouse. + *

Nullable row columns are supported through Nullable type in + * ClickHouse. Low + * cardinality hint is supported through LowCardinality DataType in ClickHouse. * *

Nested rows should be unnested using {@link Select#flattenedSchema()}. Type casting should be * done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before {@link ClickHouseIO}. @@ -130,9 +150,57 @@ public class ClickHouseIO { public static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000); public static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(5); + /** + * Creates a write transform using a JDBC URL format. + * + *

Deprecated: Use {@link #write(String, String, String)} instead with separate URL, + * database, and table parameters. + * + *

This method is provided for backward compatibility. It parses the JDBC URL to extract the + * connection URL, database name, and any connection properties specified in the query string. + * Properties can be overridden later using {@link Write#withProperties(Properties)}. + * + *

Example: + * + *

{@code
+   * // Old way (deprecated):
+   * ClickHouseIO.write("jdbc:clickhouse://localhost:8123/mydb?user=admin&password=secret", "table")
+   *
+   * // New way:
+   * ClickHouseIO.write("http://localhost:8123", "mydb", "table")
+   *   .withProperties(props)
+   * }
+ * + *

Property Precedence: Properties from the JDBC URL can be overridden by calling {@link + * Write#withProperties(Properties)}. Later calls to withProperties() override earlier settings. + * + * @param jdbcUrl JDBC connection URL (e.g., jdbc:clickhouse://host:port/database?param=value) + * @param table table name + * @return a {@link PTransform} writing data to ClickHouse + * @deprecated Use {@link #write(String, String, String)} with explicit URL, database, and table + */ + @Deprecated public static Write write(String jdbcUrl, String table) { + ClickHouseJdbcUrlParser.ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + return new AutoValue_ClickHouseIO_Write.Builder() - .jdbcUrl(jdbcUrl) + .clickHouseUrl(parsed.getClickHouseUrl()) + .database(parsed.getDatabase()) + .table(table) + .properties(parsed.getProperties()) // Start with JDBC URL properties + .maxInsertBlockSize(DEFAULT_MAX_INSERT_BLOCK_SIZE) + .initialBackoff(DEFAULT_INITIAL_BACKOFF) + .maxRetries(DEFAULT_MAX_RETRIES) + .maxCumulativeBackoff(DEFAULT_MAX_CUMULATIVE_BACKOFF) + .build() + .withInsertDeduplicate(true) + .withInsertDistributedSync(true); + } + + public static Write write(String clickHouseUrl, String database, String table) { + return new AutoValue_ClickHouseIO_Write.Builder() + .clickHouseUrl(clickHouseUrl) + .database(database) .table(table) .properties(new Properties()) .maxInsertBlockSize(DEFAULT_MAX_INSERT_BLOCK_SIZE) @@ -148,7 +216,9 @@ public static Write write(String jdbcUrl, String table) { @AutoValue public abstract static class Write extends PTransform, PDone> { - public abstract String jdbcUrl(); + public abstract String clickHouseUrl(); + + public abstract String database(); public abstract String table(); @@ -176,7 +246,7 @@ public abstract static class Write extends PTransform, PDone> public PDone expand(PCollection input) { TableSchema tableSchema = tableSchema(); if (tableSchema == null) { - tableSchema = getTableSchema(jdbcUrl(), table()); + tableSchema = getTableSchema(clickHouseUrl(), database(), table(), properties()); } String sdkVersion = ReleaseInfo.getReleaseInfo().getSdkVersion(); @@ -192,7 +262,8 @@ public PDone expand(PCollection input) { WriteFn fn = new AutoValue_ClickHouseIO_WriteFn.Builder() - .jdbcUrl(jdbcUrl()) + .clickHouseUrl(clickHouseUrl()) + .database(database()) .table(table()) .maxInsertBlockSize(maxInsertBlockSize()) .schema(tableSchema) @@ -212,7 +283,8 @@ public PDone expand(PCollection input) { * * @param value number of rows * @return a {@link PTransform} writing data to ClickHouse - * @see ClickHouse + * @see ClickHouse * documentation */ public Write withMaxInsertBlockSize(long value) { @@ -238,7 +310,8 @@ public Write withInsertDistributedSync(@Nullable Boolean value) { * * @param value number of replicas, 0 for disabling, null for server default * @return a {@link PTransform} writing data to ClickHouse - * @see ClickHouse + * @see ClickHouse * documentation */ public Write withInsertQuorum(@Nullable Long value) { @@ -305,11 +378,56 @@ public Write withTableSchema(@Nullable TableSchema tableSchema) { return toBuilder().tableSchema(tableSchema).build(); } + /** + * Set connection properties (user, password, etc.). + * + *

Important: If using the deprecated JDBC URL-based {@link #write(String, String)} + * method, this will fail if any properties specified here conflict with properties already + * extracted from the JDBC URL. This prevents accidental property conflicts. + * + *

For the new API {@link #write(String, String, String)}, properties can be set freely since + * there are no URL-embedded properties to conflict with. + * + * @param properties connection properties + * @return a {@link PTransform} writing data to ClickHouse + * @throws IllegalArgumentException if properties is null or if any property conflicts with + * existing properties (e.g., from JDBC URL) + */ + public Write withProperties(Properties properties) { + if (properties == null) { + throw new IllegalArgumentException("Properties cannot be null"); + } + + // Check for conflicts with existing properties + Properties existing = properties(); + for (String key : properties.stringPropertyNames()) { + if (existing.containsKey(key)) { + String existingValue = existing.getProperty(key); + String newValue = properties.getProperty(key); + if (!existingValue.equals(newValue)) { + throw new IllegalArgumentException( + String.format( + "Property conflict: '%s' is already set to '%s' (likely from JDBC URL), " + + "but attempting to set it to '%s'. " + + "Please use either JDBC URL properties OR withProperties(), not both for the same keys.", + key, existingValue, newValue)); + } + } + } + + // Merge properties: new properties are added to existing ones + Properties merged = new Properties(); + merged.putAll(existing); + merged.putAll(properties); + return toBuilder().properties(merged).build(); + } /** Builder for {@link Write}. */ @AutoValue.Builder abstract static class Builder { - public abstract Builder jdbcUrl(String jdbcUrl); + public abstract Builder clickHouseUrl(String clickHouseUrl); + + public abstract Builder database(String database); public abstract Builder table(String table); @@ -348,7 +466,7 @@ abstract static class WriteFn extends DoFn { private static final String RETRY_ATTEMPT_LOG = "Error writing to ClickHouse. Retry attempt[{}]"; - private ClickHouseConnection connection; + private Client client; private FluentBackoff retryBackoff; private final List buffer = new ArrayList<>(); private final Distribution batchSize = Metrics.distribution(Write.class, "batch_size"); @@ -360,7 +478,9 @@ abstract static class WriteFn extends DoFn { @FieldAccess("filterFields") final FieldAccessDescriptor fieldAccessDescriptor = FieldAccessDescriptor.withAllFields(); - public abstract String jdbcUrl(); + public abstract String clickHouseUrl(); + + public abstract String database(); public abstract String table(); @@ -387,9 +507,36 @@ static String insertSql(TableSchema schema, String table) { } @Setup - public void setup() throws SQLException { + public void setup() throws Exception { + + String user = properties().getProperty("user", "default"); + String password = properties().getProperty("password", ""); + + // add the options to the client builder + Map options = + properties().stringPropertyNames().stream() + .filter(key -> !key.equals("user") && !key.equals("password")) + .collect(Collectors.toMap(key -> key, properties()::getProperty)); + + // Create ClickHouse Java Client + Client.Builder clientBuilder = + new Client.Builder() + .addEndpoint(clickHouseUrl()) + .setUsername(user) + .setPassword(password) + .setDefaultDatabase(database()) + .setOptions(options) + .setClientName( + String.format("Apache Beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion())); + + // Add optional compression if specified in properties + String compress = properties().getProperty("compress", "false"); + if (Boolean.parseBoolean(compress)) { + clientBuilder.compressServerResponse(true); + clientBuilder.compressClientRequest(true); + } - connection = new ClickHouseDataSource(jdbcUrl(), properties()).getConnection(); + client = clientBuilder.build(); retryBackoff = FluentBackoff.DEFAULT @@ -400,7 +547,9 @@ public void setup() throws SQLException { @Teardown public void tearDown() throws Exception { - connection.close(); + if (client != null) { + client.close(); + } } @StartBundle @@ -431,25 +580,46 @@ private void flush() throws Exception { } batchSize.update(buffer.size()); + + // Serialize rows to RowBinary format + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + + // Wrap ByteArrayOutputStream with ClickHouseOutputStream + try (com.clickhouse.data.ClickHouseOutputStream outputStream = + com.clickhouse.data.ClickHouseOutputStream.of(byteStream)) { + for (Row row : buffer) { + ClickHouseWriter.writeRow(outputStream, schema(), row); + } + outputStream.flush(); + } + byte[] data = byteStream.toByteArray(); + while (true) { - try (ClickHouseStatement statement = connection.createStatement()) { - statement - .unwrap(ClickHouseRequest.class) - .write() - .table(table()) - .format(ClickHouseFormat.RowBinary) - .data( - out -> { - for (Row row : buffer) { - ClickHouseWriter.writeRow(out, schema(), row); - } - }) - .executeAndWait(); // query happens in a separate thread + try { + + // Perform the insert using ClickHouse Java Client + InsertResponse response = + client + .insert( + table(), new java.io.ByteArrayInputStream(data), ClickHouseFormat.RowBinary) + .get(); + + if (response != null) { + LOG.debug( + "Successfully inserted {} rows out of {} into table {}. total size written {} bytes", + response.getWrittenRows(), + buffer.size(), + table(), + response.getWrittenBytes()); + } else { + LOG.debug("Successfully inserted {} rows into table {}", buffer.size(), table()); + } + buffer.clear(); break; - } catch (SQLException e) { + } catch (Exception e) { if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) { - throw e; + throw new RuntimeException("Failed to write to ClickHouse after retries", e); } else { retries.inc(); LOG.warn(RETRY_ATTEMPT_LOG, attempt, e); @@ -462,7 +632,9 @@ private void flush() throws Exception { @AutoValue.Builder abstract static class Builder { - public abstract Builder jdbcUrl(String jdbcUrl); + public abstract Builder clickHouseUrl(String clickHouseUrl); + + public abstract Builder database(String database); public abstract Builder table(String table); @@ -491,57 +663,106 @@ private static String tuplePreprocessing(String payload) { String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'"); return content; } + /** - * Returns {@link TableSchema} for a given table. + * Returns {@link TableSchema} for a given table using JDBC URL format. + * + *

Deprecated: Use {@link #getTableSchema(String, String, String, Properties)} instead + * with separate URL, database, table, and properties parameters. + * + *

This method parses the JDBC URL to extract connection details and properties. For new code, + * use the explicit parameter version for better clarity and control. * - * @param jdbcUrl jdbc connection url + *

Example migration: + * + *

{@code
+   * // Old way (deprecated):
+   * TableSchema schema = ClickHouseIO.getTableSchema(
+   *     "jdbc:clickhouse://localhost:8123/mydb?user=admin", "my_table");
+   *
+   * // New way:
+   * Properties props = new Properties();
+   * props.setProperty("user", "admin");
+   * TableSchema schema = ClickHouseIO.getTableSchema(
+   *     "http://localhost:8123", "mydb", "my_table", props);
+   * }
+ * + * @param jdbcUrl JDBC connection URL (e.g., jdbc:clickhouse://host:port/database?param=value) * @param table table name * @return table schema + * @deprecated Use {@link #getTableSchema(String, String, String, Properties)} with explicit + * parameters */ + @Deprecated public static TableSchema getTableSchema(String jdbcUrl, String table) { - List columns = new ArrayList<>(); - - try (ClickHouseConnection connection = new ClickHouseDataSource(jdbcUrl).getConnection(); - Statement statement = connection.createStatement()) { - - ResultSet rs = null; // try-finally is used because findbugs doesn't like try-with-resource - try { - rs = statement.executeQuery("DESCRIBE TABLE " + quoteIdentifier(table)); - - while (rs.next()) { - String name = rs.getString("name"); - String type = rs.getString("type"); - String defaultTypeStr = rs.getString("default_type"); - String defaultExpression = rs.getString("default_expression"); + ClickHouseJdbcUrlParser.ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + return getTableSchema( + parsed.getClickHouseUrl(), parsed.getDatabase(), table, parsed.getProperties()); + } - ColumnType columnType = null; - if (type.toLowerCase().trim().startsWith("tuple(")) { - String content = tuplePreprocessing(type); - columnType = ColumnType.parse(content); - } else { - columnType = ColumnType.parse(type); - } - DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null); + /** + * Returns {@link TableSchema} for a given table using ClickHouse Java Client. + * + * @param clickHouseUrl ClickHouse connection url + * @param database ClickHouse database + * @param table table name + * @param properties connection properties + * @return table schema + * @since 2.72.0 + */ + public static TableSchema getTableSchema( + String clickHouseUrl, String database, String table, Properties properties) { + List columns = new ArrayList<>(); - Object defaultValue; - if (DefaultType.DEFAULT.equals(defaultType) - && !Strings.isNullOrEmpty(defaultExpression)) { - defaultValue = ColumnType.parseDefaultExpression(columnType, defaultExpression); - } else { - defaultValue = null; + try { + String user = properties.getProperty("user", "default"); + String password = properties.getProperty("password", ""); + + // Create ClickHouse Java Client + Client.Builder clientBuilder = + new Client.Builder() + .addEndpoint(clickHouseUrl) + .setUsername(user) + .setPassword(password) + .setDefaultDatabase(database) + .setClientName( + String.format("Apache Beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion())); + + try (Client client = clientBuilder.build()) { + String query = "DESCRIBE TABLE " + quoteIdentifier(table); + + try (Records records = client.queryRecords(query).get()) { + for (GenericRecord record : records) { + String name = record.getString("name"); + String type = record.getString("type"); + String defaultTypeStr = record.getString("default_type"); + String defaultExpression = record.getString("default_expression"); + + ColumnType columnType; + if (type.toLowerCase().trim().startsWith("tuple(")) { + String content = tuplePreprocessing(type); + columnType = ColumnType.parse(content); + } else { + columnType = ColumnType.parse(type); + } + DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null); + + Object defaultValue; + if (DefaultType.DEFAULT.equals(defaultType) + && !Strings.isNullOrEmpty(defaultExpression)) { + defaultValue = ColumnType.parseDefaultExpression(columnType, defaultExpression); + } else { + defaultValue = null; + } + + columns.add(TableSchema.Column.of(name, columnType, defaultType, defaultValue)); } - - columns.add(TableSchema.Column.of(name, columnType, defaultType, defaultValue)); - } - } finally { - if (rs != null) { - rs.close(); } } return TableSchema.of(columns.toArray(new TableSchema.Column[0])); - } catch (SQLException e) { - throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException("Failed to get table schema for table: " + table, e); } } diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java new file mode 100644 index 000000000000..92cd1eaeacdd --- /dev/null +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.clickhouse; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Properties; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; + +/** + * Utility class for parsing ClickHouse JDBC URLs and extracting connection parameters. + * + *

Used for supporting backward compatibility with the deprecated {@link + * ClickHouseIO#write(String, String)} method that accepts JDBC URLs. New code should use {@link + * ClickHouseIO#write(String, String, String)} with explicit parameters instead. + * + * @deprecated Use {@link ClickHouseIO#write(String, String, String)} with separate clickHouseUrl, + * database, and table parameters instead of JDBC URL format. + */ +@Deprecated +class ClickHouseJdbcUrlParser { + + /** + * Represents parsed components of a ClickHouse JDBC URL. + * + *

Contains the extracted HTTP/HTTPS URL, database name, and connection properties from a JDBC + * URL string. + * + * @deprecated This class supports the deprecated JDBC URL-based API. Use separate parameters for + * clickHouseUrl, database, and properties instead. + */ + @Deprecated + static class ParsedJdbcUrl { + private final String clickHouseUrl; + private final String database; + private final Properties properties; + + ParsedJdbcUrl(String clickHouseUrl, String database, Properties properties) { + this.clickHouseUrl = clickHouseUrl; + this.database = database; + this.properties = properties; + } + + public String getClickHouseUrl() { + return clickHouseUrl; + } + + public String getDatabase() { + return database; + } + + public Properties getProperties() { + return properties; + } + } + + /** + * Parses a ClickHouse JDBC URL into its components. + * + *

Supported formats: + * + *

+ * + * @param jdbcUrl the JDBC URL to parse + * @return ParsedJdbcUrl containing the HTTP/HTTPS URL, database, and properties + * @throws IllegalArgumentException if the URL format is invalid + */ + static ParsedJdbcUrl parse(String jdbcUrl) { + if (Strings.isNullOrEmpty(jdbcUrl)) { + throw new IllegalArgumentException("JDBC URL cannot be null or empty"); + } + + String actualUrl = extractHttpUrl(jdbcUrl); + + try { + URI uri = new URI(actualUrl); + + validateScheme(uri.getScheme()); + String host = validateAndGetHost(uri.getHost(), jdbcUrl); + int port = getPortOrDefault(uri.getPort(), uri.getScheme()); + + String clickHouseUrl = String.format("%s://%s:%d", uri.getScheme(), host, port); + String database = extractDatabase(uri.getPath()); + Properties properties = extractProperties(uri.getQuery()); + + return new ParsedJdbcUrl(clickHouseUrl, database, properties); + + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid JDBC URL format: " + jdbcUrl, e); + } catch (java.io.UnsupportedEncodingException e) { + throw new IllegalArgumentException("Failed to decode URL parameters: " + jdbcUrl, e); + } + } + + /** + * Extracts and normalizes the HTTP/HTTPS URL from a JDBC URL. + * + * @param jdbcUrl the JDBC URL to process + * @return normalized HTTP/HTTPS URL + * @throws IllegalArgumentException if the URL format is invalid + */ + private static String extractHttpUrl(String jdbcUrl) { + // Remove jdbc: prefix + String urlWithoutJdbc = jdbcUrl; + if (jdbcUrl.toLowerCase().startsWith("jdbc:")) { + urlWithoutJdbc = jdbcUrl.substring(5); + } + + // Handle jdbc:clickhouse: or jdbc:ch: prefix + String actualUrl; + if (urlWithoutJdbc.toLowerCase().startsWith("clickhouse:")) { + actualUrl = urlWithoutJdbc.substring(11); + } else if (urlWithoutJdbc.toLowerCase().startsWith("ch:")) { + actualUrl = urlWithoutJdbc.substring(3); + } else { + throw new IllegalArgumentException( + "Invalid JDBC URL format. Expected 'jdbc:clickhouse:' or 'jdbc:ch:' prefix. Got: " + + jdbcUrl); + } + + // Check if URL already has a scheme and validate it + if (actualUrl.toLowerCase().startsWith("http://") + || actualUrl.toLowerCase().startsWith("https://")) { + return actualUrl; + } + + // Check for invalid schemes before prepending http:// + if (actualUrl.contains("://")) { + // Extract the scheme part + int schemeEnd = actualUrl.indexOf("://"); + String scheme = actualUrl.substring(0, schemeEnd).toLowerCase(); + if (!scheme.equals("http") && !scheme.equals("https")) { + throw new IllegalArgumentException( + "Invalid scheme in JDBC URL. Expected 'http' or 'https'. Got: " + scheme); + } + } + + // If URL doesn't start with http:// or https://, assume http:// + if (actualUrl.startsWith("//")) { + actualUrl = "http:" + actualUrl; + } else { + actualUrl = "http://" + actualUrl; + } + + return actualUrl; + } + + /** + * Validates the URI scheme. + * + * @param scheme the scheme to validate + * @throws IllegalArgumentException if scheme is invalid + */ + private static void validateScheme(String scheme) { + if (scheme == null || (!scheme.equals("http") && !scheme.equals("https"))) { + throw new IllegalArgumentException( + "Invalid scheme. Expected 'http' or 'https'. Got: " + scheme); + } + } + + /** + * Validates and returns the host from the URI. + * + * @param host the host to validate + * @param jdbcUrl the original JDBC URL (for error reporting) + * @return the validated host + * @throws IllegalArgumentException if host is invalid + */ + private static String validateAndGetHost(String host, String jdbcUrl) { + if (Strings.isNullOrEmpty(host)) { + throw new IllegalArgumentException("Host cannot be empty in JDBC URL: " + jdbcUrl); + } + return host; + } + + /** + * Returns the port or default port based on scheme. + * + * @param port the port from URI (-1 if not specified) + * @param scheme the URI scheme (http or https) + * @return the port number + */ + private static int getPortOrDefault(int port, String scheme) { + if (port == -1) { + return scheme.equals("https") ? 8443 : 8123; // Default ClickHouse ports + } + return port; + } + + /** + * Extracts database name from URI path. + * + * @param path the URI path + * @return the database name, or "default" if not specified + */ + private static String extractDatabase(String path) { + if (Strings.isNullOrEmpty(path)) { + return "default"; + } + + // Remove leading slash + String pathWithoutSlash = path.startsWith("/") ? path.substring(1) : path; + return pathWithoutSlash.isEmpty() ? "default" : pathWithoutSlash; + } + + /** + * Extracts connection properties from URI query string. + * + * @param query the URI query string + * @return Properties object containing the parsed parameters + * @throws java.io.UnsupportedEncodingException if URL decoding fails + */ + private static Properties extractProperties(String query) + throws java.io.UnsupportedEncodingException { + Properties properties = new Properties(); + + if (Strings.isNullOrEmpty(query)) { + return properties; + } + + // Use Guava Splitter instead of String.split() + for (String param : Splitter.on('&').split(query)) { + // Split key-value pairs, handling parameters without values + List parts = Splitter.on('=').limit(2).splitToList(param); + + if (parts.size() == 2) { + String key = java.net.URLDecoder.decode(parts.get(0), "UTF-8"); + String value = java.net.URLDecoder.decode(parts.get(1), "UTF-8"); + properties.setProperty(key, value); + } else if (parts.size() == 1) { + // Parameter without value (e.g., ?compress) + String key = java.net.URLDecoder.decode(parts.get(0), "UTF-8"); + properties.setProperty(key, "true"); + } + } + + return properties; + } +} diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java index b89a88b3fae8..baee77c5f9af 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java @@ -29,7 +29,10 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.checkerframework.checker.nullness.qual.Nullable; -/** A descriptor for ClickHouse table schema. */ +/** + * A descriptor for ClickHouse table schema. To be updated with ClickHouse table schema API - + * https://github.com/apache/beam/issues/37613 + */ @AutoValue @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -185,7 +188,8 @@ public enum TypeName { /** * An enumeration of possible kinds of default values in ClickHouse. * - * @see ClickHouse + * @see ClickHouse * documentation */ public enum DefaultType { diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java index 3a881ff04595..73a822b8ec0a 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.sql.SQLException; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.beam.sdk.Pipeline; @@ -48,6 +47,7 @@ public class AtomicInsertTest extends BaseClickHouseTest { private static final int MIN_ATTEMPTS = 2; private static final int MAX_ATTEMPTS = 20; // should be enough to succeed at least once + static final int TEST_BATCH_SIZE = 100000; private static boolean shouldAttempt(int i, long count) { return i < MIN_ATTEMPTS || (count == 0 && i < MAX_ATTEMPTS); @@ -55,8 +55,7 @@ private static boolean shouldAttempt(int i, long count) { /** With sufficient block size, ClickHouse will atomically insert all or nothing. */ @Test - public void testAtomicInsert() throws SQLException { - int size = 100000; + public void testAtomicInsert() throws Exception { int done = 0; // inserts to such table fail with 60% chance for 1M batch size @@ -64,16 +63,16 @@ public void testAtomicInsert() throws SQLException { "CREATE TABLE test_atomic_insert (" + " f0 Int64, " + " f1 Int64 MATERIALIZED CAST(if((rand() % " - + size + + TEST_BATCH_SIZE + ") = 0, '', '1') AS Int64)" + ") ENGINE=MergeTree ORDER BY (f0)"); pipeline // make sure we get one big bundle - .apply(RangeBundle.of(size)) + .apply(RangeBundle.of(TEST_BATCH_SIZE)) .apply( - ClickHouseIO.write(clickHouse.getJdbcUrl(), "test_atomic_insert") - .withMaxInsertBlockSize(size) + ClickHouseIO.write(clickHouseUrl, database, "test_atomic_insert") + .withMaxInsertBlockSize(TEST_BATCH_SIZE) .withInitialBackoff(Duration.millis(1)) .withMaxRetries(2)); @@ -84,7 +83,7 @@ public void testAtomicInsert() throws SQLException { } // each insert is atomic, so we get exactly done * size elements - assertEquals(((long) done) * size, count); + assertEquals(((long) done) * TEST_BATCH_SIZE, count); assertTrue("insert didn't succeed after " + MAX_ATTEMPTS + " attempts", count > 0L); } @@ -93,25 +92,24 @@ public void testAtomicInsert() throws SQLException { * replicated tables, it will deduplicate blocks. */ @Test - public void testIdempotentInsert() throws SQLException { - int size = 100000; + public void testIdempotentInsert() throws Exception { // inserts to such table fail with 60% chance for 1M batch size executeSql( "CREATE TABLE test_idempotent_insert (" + " f0 Int64, " + " f1 Int64 MATERIALIZED CAST(if((rand() % " - + size + + TEST_BATCH_SIZE + ") = 0, '', '1') AS Int64)" + ") ENGINE=ReplicatedMergeTree('/clickHouse/tables/0/test_idempotent_insert', 'replica_0') " + "ORDER BY (f0)"); pipeline // make sure we get one big bundle - .apply(RangeBundle.of(size)) + .apply(RangeBundle.of(TEST_BATCH_SIZE)) .apply( - ClickHouseIO.write(clickHouse.getJdbcUrl(), "test_idempotent_insert") - .withMaxInsertBlockSize(size) + ClickHouseIO.write(clickHouseUrl, database, "test_idempotent_insert") + .withMaxInsertBlockSize(TEST_BATCH_SIZE) .withInitialBackoff(Duration.millis(1)) .withMaxRetries(2)); @@ -122,7 +120,7 @@ public void testIdempotentInsert() throws SQLException { } // inserts should be deduplicated, so we get exactly `size` elements - assertEquals(size, count); + assertEquals(TEST_BATCH_SIZE, count); assertTrue("insert didn't succeed after " + MAX_ATTEMPTS + " attempts", count > 0L); } diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java index 5f90f3f31844..d3f6c3982527 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.io.clickhouse; +import static org.testcontainers.containers.ClickHouseContainer.HTTP_PORT; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.Records; import java.io.IOException; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -41,14 +43,19 @@ public class BaseClickHouseTest { public static ClickHouseContainer clickHouse; + public static String clickHouseUrl; + public static String database; public static Network network; public static GenericContainer zookeeper; + static final int CLIENT_TIMEOUT = 30; + private static final Logger LOG = LoggerFactory.getLogger(BaseClickHouseTest.class); - private Connection connection; + private Client client; @BeforeClass public static void setup() throws IOException, InterruptedException { + System.setProperty("api.version", "1.44"); network = Network.newNetwork(); zookeeper = @@ -72,45 +79,130 @@ public static void setup() throws IOException, InterruptedException { ; clickHouse.start(); LOG.info("Start Clickhouse"); + clickHouseUrl = "http://" + clickHouse.getHost() + ":" + clickHouse.getMappedPort(HTTP_PORT); + database = "default"; } @AfterClass public static void tearDown() { - clickHouse.close(); - zookeeper.close(); + if (clickHouse != null) { + clickHouse.close(); + } + if (zookeeper != null) { + zookeeper.close(); + } } @Before - public void setUp() throws SQLException { - connection = clickHouse.createConnection(""); + public void setUp() throws Exception { + // Create ClickHouse Java Client + Client.Builder clientBuilder = + new Client.Builder() + .addEndpoint(clickHouseUrl) + .setUsername(clickHouse.getUsername()) + .setPassword(clickHouse.getPassword()) + .setDefaultDatabase(database); + + client = clientBuilder.build(); } @After public void after() { - if (connection != null) { + if (client != null) { try { - connection.close(); - } catch (SQLException e) { - // failed to close connection, ignore + client.close(); + } catch (Exception e) { + LOG.warn("Failed to close ClickHouse client", e); } finally { - connection = null; + client = null; } } } - boolean executeSql(String sql) throws SQLException { - Statement statement = connection.createStatement(); - return statement.execute(sql); + /** + * Executes a SQL statement (DDL, DML, etc.). + * + * @param sql SQL statement to execute + * @return true if execution was successful + * @throws Exception if execution fails + */ + boolean executeSql(String sql) throws Exception { + try { + client.query(sql).get(CLIENT_TIMEOUT, TimeUnit.SECONDS); + return true; + } catch (Exception e) { + LOG.error("Failed to execute SQL: {}", sql, e); + throw e; + } + } + + /** + * Executes a query and returns the results. + * + * @param sql SQL query to execute + * @return Records containing query results + * @throws Exception if query fails + */ + Records executeQuery(String sql) throws Exception { + try { + return client.queryRecords(sql).get(CLIENT_TIMEOUT, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Failed to execute query: {}", sql, e); + throw e; + } + } + + /** + * Executes a query and returns the first column of the first row as a long. Useful for COUNT + * queries or other single-value results. + * + * @param sql SQL query to execute + * @return long value from first column of first row + * @throws Exception if query fails or result is empty + */ + long executeQueryAsLong(String sql) throws Exception { + try (Records records = executeQuery(sql)) { + for (GenericRecord record : records) { + // Get the first column value - assuming it's numeric + return record.getLong(1); // Column index is 1-based + } + throw new IllegalStateException("Query returned no results: " + sql); + } catch (Exception e) { + LOG.error("Failed to execute query as long: {}", sql, e); + throw e; + } } - ResultSet executeQuery(String sql) throws SQLException { - Statement statement = connection.createStatement(); - return statement.executeQuery(sql); + /** + * Executes a query and returns the first column of the first row as a String. + * + * @param sql SQL query to execute + * @return String value from first column of first row + * @throws Exception if query fails or result is empty + */ + String executeQueryAsString(String sql) throws Exception { + try (Records records = executeQuery(sql)) { + for (GenericRecord record : records) { + return record.getString(1); // Column index is 1-based + } + throw new IllegalStateException("Query returned no results: " + sql); + } catch (Exception e) { + LOG.error("Failed to execute query as string: {}", sql, e); + throw e; + } } - long executeQueryAsLong(String sql) throws SQLException { - ResultSet rs = executeQuery(sql); - rs.next(); - return rs.getLong(1); + /** + * Checks if the ClickHouse server is alive and responsive. + * + * @return true if server responds to ping + */ + boolean isServerAlive() { + try { + return client != null && client.ping(); + } catch (Exception e) { + LOG.warn("Server ping failed", e); + return false; + } } } diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java new file mode 100644 index 000000000000..3a4a00421a4f --- /dev/null +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.clickhouse; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.io.clickhouse.ClickHouseIO.Write; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for JDBC URL backward compatibility. */ +@RunWith(JUnit4.class) +public class ClickHouseIOJdbcBackwardCompatibilityTest { + + @Test + public void testDeprecatedWriteMethodWithBasicJdbcUrl() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb"; + String table = "test_table"; + + @SuppressWarnings("deprecation") + Write write = ClickHouseIO.write(jdbcUrl, table); + + assertEquals("http://localhost:8123", write.clickHouseUrl()); + assertEquals("testdb", write.database()); + assertEquals(table, write.table()); + } + + @Test + public void testDeprecatedWriteMethodWithParameters() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret"; + String table = "test_table"; + + @SuppressWarnings("deprecation") + Write write = ClickHouseIO.write(jdbcUrl, table); + + assertEquals("http://localhost:8123", write.clickHouseUrl()); + assertEquals("testdb", write.database()); + assertEquals(table, write.table()); + assertEquals("admin", write.properties().getProperty("user")); + assertEquals("secret", write.properties().getProperty("password")); + } + + @Test + public void testDeprecatedWriteMethodPreservesDefaults() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb"; + String table = "test_table"; + + @SuppressWarnings("deprecation") + Write write = ClickHouseIO.write(jdbcUrl, table); + + // Verify defaults are set + assertEquals(ClickHouseIO.DEFAULT_MAX_INSERT_BLOCK_SIZE, write.maxInsertBlockSize()); + assertEquals(ClickHouseIO.DEFAULT_MAX_RETRIES, write.maxRetries()); + assertEquals(ClickHouseIO.DEFAULT_INITIAL_BACKOFF, write.initialBackoff()); + assertEquals(ClickHouseIO.DEFAULT_MAX_CUMULATIVE_BACKOFF, write.maxCumulativeBackoff()); + assertTrue(write.insertDeduplicate()); + assertTrue(write.insertDistributedSync()); + } + + @Test + public void testNewWriteMethodEquivalence() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin"; + String table = "test_table"; + + // Old way (deprecated) + @SuppressWarnings("deprecation") + Write oldWrite = ClickHouseIO.write(jdbcUrl, table); + + // New way + Write newWrite = + ClickHouseIO.write("http://localhost:8123", "testdb", table) + .withProperties(oldWrite.properties()); + + // Should produce equivalent configurations + assertEquals(oldWrite.clickHouseUrl(), newWrite.clickHouseUrl()); + assertEquals(oldWrite.database(), newWrite.database()); + assertEquals(oldWrite.table(), newWrite.table()); + assertEquals( + oldWrite.properties().getProperty("user"), newWrite.properties().getProperty("user")); + } +} diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java new file mode 100644 index 000000000000..5bd0687f5320 --- /dev/null +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.clickhouse; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Properties; +import org.apache.beam.sdk.io.clickhouse.ClickHouseIO.Write; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for property conflict detection in ClickHouseIO. */ +@RunWith(JUnit4.class) +public class ClickHouseIOPropertyMergingTest { + + @Test + @SuppressWarnings("deprecation") + public void testDeprecatedWriteExtractsPropertiesFromJdbcUrl() { + String jdbcUrl = + "jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret&compress=true"; + String table = "test_table"; + + Write write = ClickHouseIO.write(jdbcUrl, table); + + Properties props = write.properties(); + assertEquals("admin", props.getProperty("user")); + assertEquals("secret", props.getProperty("password")); + assertEquals("true", props.getProperty("compress")); + } + + @Test(expected = IllegalArgumentException.class) + @SuppressWarnings("deprecation") + public void testWithPropertiesConflictThrows() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin&password=old_secret"; + String table = "test_table"; + + Properties conflictingProps = new Properties(); + conflictingProps.setProperty("password", "new_secret"); // Conflicts! + + ClickHouseIO.write(jdbcUrl, table).withProperties(conflictingProps); // Should throw + } + + @Test + @SuppressWarnings("deprecation") + public void testWithPropertiesNoConflictWhenSameValue() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret"; + String table = "test_table"; + + Properties sameProps = new Properties(); + sameProps.setProperty("user", "admin"); // Same value - OK + sameProps.setProperty("password", "secret"); // Same value - OK + + Write write = ClickHouseIO.write(jdbcUrl, table).withProperties(sameProps); + + assertEquals("admin", write.properties().getProperty("user")); + assertEquals("secret", write.properties().getProperty("password")); + } + + @Test + @SuppressWarnings("deprecation") + public void testWithPropertiesAddsNewPropertiesWithoutConflict() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin"; + String table = "test_table"; + + Properties additionalProps = new Properties(); + additionalProps.setProperty("socket_timeout", "30000"); // New property - OK + additionalProps.setProperty("compress", "true"); // New property - OK + + Write write = ClickHouseIO.write(jdbcUrl, table).withProperties(additionalProps); + + Properties finalProps = write.properties(); + assertEquals("admin", finalProps.getProperty("user")); + assertEquals("30000", finalProps.getProperty("socket_timeout")); + assertEquals("true", finalProps.getProperty("compress")); + } + + @Test + public void testNewWriteMethodWithProperties() { + Properties props = new Properties(); + props.setProperty("user", "admin"); + props.setProperty("password", "secret"); + + Write write = + ClickHouseIO.write("http://localhost:8123", "testdb", "test_table").withProperties(props); + + Properties finalProps = write.properties(); + assertEquals("admin", finalProps.getProperty("user")); + assertEquals("secret", finalProps.getProperty("password")); + } + + @Test + @SuppressWarnings("deprecation") + public void testEmptyPropertiesDoesNotAffectExisting() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin&password=secret"; + String table = "test_table"; + + Properties emptyProps = new Properties(); + + Write write = ClickHouseIO.write(jdbcUrl, table).withProperties(emptyProps); + + Properties finalProps = write.properties(); + assertEquals("admin", finalProps.getProperty("user")); + assertEquals("secret", finalProps.getProperty("password")); + } + + @Test + @SuppressWarnings("deprecation") + public void testWithPropertiesConflictHasDetailedMessage() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?compress=false"; + String table = "test_table"; + + Properties conflictingProps = new Properties(); + conflictingProps.setProperty("compress", "true"); // Different value + + try { + ClickHouseIO.write(jdbcUrl, table).withProperties(conflictingProps); + fail("Expected IllegalArgumentException for property conflict"); + } catch (IllegalArgumentException e) { + // Verify error message is helpful + assertTrue(e.getMessage().contains("compress")); + assertTrue(e.getMessage().contains("false")); + assertTrue(e.getMessage().contains("true")); + assertTrue(e.getMessage().contains("conflict")); + } + } + + @Test(expected = IllegalArgumentException.class) + @SuppressWarnings("deprecation") + public void testMultipleWithPropertiesCallsWithConflict() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?password=original"; + String table = "test_table"; + + Properties props1 = new Properties(); + props1.setProperty("compress", "true"); // New property - OK + + Properties props2 = new Properties(); + props2.setProperty("password", "secret2"); // Conflicts with JDBC URL! + + ClickHouseIO.write(jdbcUrl, table).withProperties(props1).withProperties(props2); + } + + @Test + @SuppressWarnings("deprecation") + public void testMultipleWithPropertiesCallsWithoutConflict() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin"; + String table = "test_table"; + + Properties props1 = new Properties(); + props1.setProperty("compress", "true"); // New property - OK + + Properties props2 = new Properties(); + props2.setProperty("socket_timeout", "30000"); // New property - OK + + Write write = + ClickHouseIO.write(jdbcUrl, table).withProperties(props1).withProperties(props2); + + Properties finalProps = write.properties(); + assertEquals("admin", finalProps.getProperty("user")); // From JDBC URL + assertEquals("true", finalProps.getProperty("compress")); // From first withProperties + assertEquals("30000", finalProps.getProperty("socket_timeout")); // From second withProperties + } + + @Test(expected = IllegalArgumentException.class) + @SuppressWarnings("deprecation") + public void testCannotOverrideJdbcUrlProperties() { + // This test verifies the NEW behavior: conflicts are not allowed + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=url_user&password=url_pass"; + String table = "test_table"; + + Properties conflictingProps = new Properties(); + conflictingProps.setProperty("user", "explicit_user"); // Conflict! + + ClickHouseIO.write(jdbcUrl, table).withProperties(conflictingProps); // Should throw + } + + @Test + @SuppressWarnings("deprecation") + public void testCanAddPropertiesToJdbcUrlWithoutConflict() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/testdb?user=admin"; + String table = "test_table"; + + Properties additionalProps = new Properties(); + additionalProps.setProperty("password", "secret"); // New - no conflict + additionalProps.setProperty("compress", "true"); // New - no conflict + + Write write = ClickHouseIO.write(jdbcUrl, table).withProperties(additionalProps); + + Properties finalProps = write.properties(); + assertEquals("admin", finalProps.getProperty("user")); + assertEquals("secret", finalProps.getProperty("password")); + assertEquals("true", finalProps.getProperty("compress")); + } +} diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java index 8e5dc7ebe38b..64f7f86177b1 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java @@ -19,10 +19,14 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -import java.sql.ResultSet; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.Records; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Objects; +import java.util.Properties; import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.Schema; @@ -155,15 +159,17 @@ public void testTupleType() throws Exception { pipeline.run().waitUntilFinish(); - try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) { - rs.next(); - assertEquals("[tuple, true]", rs.getString("t0")); + try (Records records = executeQuery("SELECT * FROM test_named_tuples")) { + for (GenericRecord record : records) { + assertArrayEquals(new Object[] {"tuple", true}, record.getTuple("t0")); + } } - try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM test_named_tuples")) { - rs.next(); - assertEquals("tuple", rs.getString("f0")); - assertEquals("true", rs.getString("f1")); + try (Records records = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM test_named_tuples")) { + for (GenericRecord record : records) { + assertEquals("tuple", record.getString("f0")); + assertTrue(record.getBoolean("f1")); + } } } @@ -202,17 +208,24 @@ public void testComplexTupleType() throws Exception { pipeline.run().waitUntilFinish(); - try (ResultSet rs = executeQuery("SELECT * FROM test_named_complex_tuples")) { - rs.next(); - assertEquals("[[test, [10, 20], 1.0.0], mobile]", rs.getString("prop")); + try (Records records = executeQuery("SELECT * FROM test_named_complex_tuples")) { + for (GenericRecord record : records) { + // Object[] propValue = record.getTuple("prop"); + // Adjust assertion based on actual output + assertArrayEquals( + new Object[] {new Object[] {"test", new Object[] {10L, 20L}, "1.0.0"}, "mobile"}, + record.getTuple("prop")); + // assertEquals("(('test',[10,20],'1.0.0'),'mobile')", propValue); + } } - try (ResultSet rs = + try (Records records = executeQuery( "SELECT prop.browser.name as name, prop.browser.size as size FROM test_named_complex_tuples")) { - rs.next(); - assertEquals("test", rs.getString("name")); - assertEquals("[10, 20]", rs.getString("size")); + for (GenericRecord record : records) { + assertEquals("test", record.getString("name")); + assertArrayEquals(new Object[] {10L, 20L}, record.getTuple("size")); + } } } @@ -292,29 +305,32 @@ public void testPrimitiveTypes() throws Exception { pipeline.run().waitUntilFinish(); - try (ResultSet rs = executeQuery("SELECT * FROM test_primitive_types")) { - rs.next(); - - assertEquals("2030-10-01", rs.getString("f0")); - assertEquals("2030-10-09 08:07:06", rs.getString("f1")); - assertEquals("2.2", rs.getString("f2")); - assertEquals("3.3", rs.getString("f3")); - assertEquals("4", rs.getString("f4")); - assertEquals("5", rs.getString("f5")); - assertEquals("6", rs.getString("f6")); - assertEquals("7", rs.getString("f7")); - assertEquals("eight", rs.getString("f8")); - assertEquals("9", rs.getString("f9")); - assertEquals("10", rs.getString("f10")); - assertEquals("11", rs.getString("f11")); - assertEquals("12", rs.getString("f12")); - assertEquals("abc", rs.getString("f13")); - assertEquals("cde", rs.getString("f14")); - assertArrayEquals(new byte[] {'q', 'w', 'e'}, rs.getBytes("f15")); - assertArrayEquals(new byte[] {'a', 's', 'd'}, rs.getBytes("f16")); - assertArrayEquals(new byte[] {'z', 'x', 'c'}, rs.getBytes("f17")); - assertEquals("true", rs.getString("f18")); - assertEquals("lowcardenality", rs.getString("f19")); + try (Records records = executeQuery("SELECT * FROM test_primitive_types")) { + for (GenericRecord record : records) { + assertEquals("2030-10-01", record.getString("f0")); + assertEquals("2030-10-09 08:07:06", record.getString("f1")); + assertEquals("2.2", record.getString("f2")); + assertEquals("3.3", record.getString("f3")); + assertEquals("4", record.getString("f4")); + assertEquals("5", record.getString("f5")); + assertEquals("6", record.getString("f6")); + assertEquals("7", record.getString("f7")); + assertEquals("eight", record.getString("f8")); + assertEquals("9", record.getString("f9")); + assertEquals("10", record.getString("f10")); + assertEquals("11", record.getString("f11")); + assertEquals("12", record.getString("f12")); + assertEquals("abc", record.getString("f13")); + assertEquals("cde", record.getString("f14")); + assertArrayEquals( + new byte[] {'q', 'w', 'e'}, record.getString("f15").getBytes(StandardCharsets.UTF_8)); + assertArrayEquals( + new byte[] {'a', 's', 'd'}, record.getString("f16").getBytes(StandardCharsets.UTF_8)); + assertArrayEquals( + new byte[] {'z', 'x', 'c'}, record.getString("f17").getBytes(StandardCharsets.UTF_8)); + assertEquals("true", record.getString("f18")); + assertEquals("lowcardenality", record.getString("f19")); + } } } @@ -388,26 +404,28 @@ public void testArrayOfPrimitiveTypes() throws Exception { pipeline.run().waitUntilFinish(); - try (ResultSet rs = executeQuery("SELECT * FROM test_array_of_primitive_types")) { - rs.next(); - - assertEquals("[2030-10-01, 2031-10-01]", rs.getString("f0")); - assertEquals("[2030-10-09T08:07:06, 2031-10-09T08:07:06]", rs.getString("f1")); - // Since comparing float/double values is not precise, we compare the string representation - assertEquals("[2.2,3.3]", rs.getString("f2")); - assertEquals("[3.3,4.4]", rs.getString("f3")); - assertArrayEquals(new byte[] {4, 5}, (byte[]) rs.getArray("f4").getArray()); - assertArrayEquals(new short[] {5, 6}, (short[]) rs.getArray("f5").getArray()); - assertArrayEquals(new int[] {6, 7}, (int[]) rs.getArray("f6").getArray()); - assertArrayEquals(new long[] {7L, 8L}, (long[]) rs.getArray("f7").getArray()); - assertArrayEquals(new String[] {"eight", "nine"}, (String[]) rs.getArray("f8").getArray()); - assertArrayEquals(new byte[] {9, 10}, (byte[]) rs.getArray("f9").getArray()); - assertArrayEquals(new short[] {10, 11}, (short[]) rs.getArray("f10").getArray()); - assertArrayEquals(new int[] {11, 12}, (int[]) rs.getArray("f11").getArray()); - assertArrayEquals(new long[] {12L, 13L}, (long[]) rs.getArray("f12").getArray()); - assertArrayEquals(new String[] {"abc", "cde"}, (String[]) rs.getArray("f13").getArray()); - assertArrayEquals(new String[] {"cde", "abc"}, (String[]) rs.getArray("f14").getArray()); - assertArrayEquals(new boolean[] {true, false}, (boolean[]) rs.getArray("f15").getArray()); + try (Records records = executeQuery("SELECT * FROM test_array_of_primitive_types")) { + for (GenericRecord record : records) { + // Date/time arrays as strings + assertEquals("[2030-10-01, 2031-10-01]", record.getString("f0")); + assertEquals("[2030-10-09 08:07:06, 2031-10-09 08:07:06]", record.getString("f1")); + assertEquals("[2.2, 3.3]", record.getString("f2")); + assertEquals("[3.3, 4.4]", record.getString("f3")); + + // Use the proper typed array methods + assertArrayEquals(new byte[] {4, 5}, record.getByteArray("f4")); // Int8 + assertArrayEquals(new short[] {5, 6}, record.getShortArray("f5")); // Int16 + assertArrayEquals(new int[] {6, 7}, record.getIntArray("f6")); // Int32 + assertArrayEquals(new long[] {7L, 8L}, record.getLongArray("f7")); // Int64 + assertArrayEquals(new String[] {"eight", "nine"}, record.getStringArray("f8")); // String + assertArrayEquals(new short[] {9, 10}, record.getShortArray("f9")); // UInt8 -> short + assertArrayEquals(new int[] {10, 11}, record.getIntArray("f10")); // UInt16 -> int + assertArrayEquals(new long[] {11, 12}, record.getLongArray("f11")); // UInt32 -> long + assertEquals("[12, 13]", record.getString("f12")); // UInt64 + assertEquals("[abc, cde]", record.getString("f13")); // FixedString + assertEquals("[cde, abc]", record.getString("f14")); // FixedString + assertArrayEquals(new boolean[] {true, false}, record.getBooleanArray("f15")); + } } } @@ -475,6 +493,12 @@ public void testPojo() throws Exception { } private ClickHouseIO.Write write(String table) { - return ClickHouseIO.write(clickHouse.getJdbcUrl(), table).withMaxRetries(0); + Properties properties = new Properties(); + properties.setProperty("user", clickHouse.getUsername()); + properties.setProperty("password", clickHouse.getPassword()); + + return ClickHouseIO.write(clickHouseUrl, database, table) + .withProperties(properties) + .withMaxRetries(0); } } diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java new file mode 100644 index 000000000000..4b994522d9b3 --- /dev/null +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.clickhouse; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Properties; +import org.apache.beam.sdk.io.clickhouse.ClickHouseJdbcUrlParser.ParsedJdbcUrl; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ClickHouseJdbcUrlParser}. */ +@RunWith(JUnit4.class) +public class ClickHouseJdbcUrlParserTest { + + @Test + public void testBasicJdbcUrl() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/default"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("default", parsed.getDatabase()); + assertTrue(parsed.getProperties().isEmpty()); + } + + @Test + public void testJdbcUrlWithCustomDatabase() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + assertTrue(parsed.getProperties().isEmpty()); + } + + @Test + public void testJdbcUrlWithoutPort() { + String jdbcUrl = "jdbc:clickhouse://localhost/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithoutDatabase() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("default", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithTrailingSlash() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("default", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithHttpPrefix() { + String jdbcUrl = "jdbc:clickhouse:http://localhost:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithHttpsPrefix() { + String jdbcUrl = "jdbc:clickhouse:https://localhost:8443/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("https://localhost:8443", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithHttpsWithoutPort() { + String jdbcUrl = "jdbc:clickhouse:https://localhost/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("https://localhost:8443", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithSingleParameter() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?user=admin"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + assertEquals("admin", parsed.getProperties().getProperty("user")); + } + + @Test + public void testJdbcUrlWithMultipleParameters() { + String jdbcUrl = + "jdbc:clickhouse://localhost:8123/mydb?user=admin&password=secret&compress=true"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + + Properties props = parsed.getProperties(); + assertEquals("admin", props.getProperty("user")); + assertEquals("secret", props.getProperty("password")); + assertEquals("true", props.getProperty("compress")); + } + + @Test + public void testJdbcUrlWithUrlEncodedParameters() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?user=my%20user&password=p%40ssw0rd"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + Properties props = parsed.getProperties(); + assertEquals("my user", props.getProperty("user")); + assertEquals("p@ssw0rd", props.getProperty("password")); + } + + @Test + public void testJdbcUrlWithParameterWithoutValue() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?compress"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("true", parsed.getProperties().getProperty("compress")); + } + + @Test + public void testJdbcUrlShorthandCh() { + String jdbcUrl = "jdbc:ch://localhost:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithRemoteHost() { + String jdbcUrl = "jdbc:clickhouse://clickhouse.example.com:9000/production"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://clickhouse.example.com:9000", parsed.getClickHouseUrl()); + assertEquals("production", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithIpAddress() { + String jdbcUrl = "jdbc:clickhouse://192.168.1.100:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://192.168.1.100:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithComplexQueryString() { + String jdbcUrl = + "jdbc:clickhouse://localhost:8123/mydb?" + + "user=admin&password=secret&" + + "socket_timeout=30000&" + + "connection_timeout=10000&" + + "compress=true"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + Properties props = parsed.getProperties(); + assertEquals("admin", props.getProperty("user")); + assertEquals("secret", props.getProperty("password")); + assertEquals("30000", props.getProperty("socket_timeout")); + assertEquals("10000", props.getProperty("connection_timeout")); + assertEquals("true", props.getProperty("compress")); + } + + @Test + public void testJdbcUrlCaseInsensitivePrefix() { + String jdbcUrl = "JDBC:CLICKHOUSE://localhost:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testJdbcUrlWithDatabaseContainingUnderscore() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/my_database_name"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("my_database_name", parsed.getDatabase()); + } + + @Test(expected = IllegalArgumentException.class) + public void testNullJdbcUrl() { + ClickHouseJdbcUrlParser.parse(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyJdbcUrl() { + ClickHouseJdbcUrlParser.parse(""); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidJdbcPrefix() { + ClickHouseJdbcUrlParser.parse("jdbc:mysql://localhost:3306/mydb"); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidSchemeFtp() { + ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:ftp://localhost:8123/mydb"); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidSchemeGopher() { + ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:gopher://localhost:8123/mydb"); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidSchemeFile() { + ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:file://localhost:8123/mydb"); + } + + @Test + public void testValidHttpSchemeExplicit() { + String jdbcUrl = "jdbc:clickhouse:http://localhost:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testValidHttpsSchemeExplicit() { + String jdbcUrl = "jdbc:clickhouse:https://localhost:8443/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("https://localhost:8443", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testImplicitHttpScheme() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test(expected = IllegalArgumentException.class) + public void testMissingHost() { + ClickHouseJdbcUrlParser.parse("jdbc:clickhouse://:8123/mydb"); + } + + @Test(expected = IllegalArgumentException.class) + public void testMalformedUrl() { + ClickHouseJdbcUrlParser.parse("jdbc:clickhouse://localhost:invalid_port/mydb"); + } + + @Test + public void testJdbcUrlWithoutJdbcPrefix() { + // Should still work if user somehow passes URL without jdbc: prefix + String jdbcUrl = "clickhouse://localhost:8123/mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("mydb", parsed.getDatabase()); + } + + @Test + public void testBackwardCompatibilityScenario() { + // Simulating a real-world legacy JDBC URL + String legacyJdbcUrl = + "jdbc:clickhouse://prod-clickhouse.internal:8123/analytics?" + + "user=analytics_user&" + + "password=secure123&" + + "compress=true&" + + "socket_timeout=60000"; + + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(legacyJdbcUrl); + + assertEquals("http://prod-clickhouse.internal:8123", parsed.getClickHouseUrl()); + assertEquals("analytics", parsed.getDatabase()); + + Properties props = parsed.getProperties(); + assertEquals("analytics_user", props.getProperty("user")); + assertEquals("secure123", props.getProperty("password")); + assertEquals("true", props.getProperty("compress")); + assertEquals("60000", props.getProperty("socket_timeout")); + } + + @Test + public void testJdbcUrlWithMultipleSlashesInPath() { + // Edge case: malformed URL with multiple slashes + String jdbcUrl = "jdbc:clickhouse://localhost:8123//mydb"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + // URI parsing should normalize this + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("/mydb", parsed.getDatabase()); // Will have leading slash + } + + @Test + public void testJdbcUrlWithQueryButNoDatabase() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123?user=admin"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); + assertEquals("default", parsed.getDatabase()); + assertEquals("admin", parsed.getProperties().getProperty("user")); + } + + @Test + public void testJdbcUrlWithEmptyQueryParameter() { + String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?user=&password=secret"; + ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl); + + Properties props = parsed.getProperties(); + assertEquals("", props.getProperty("user")); + assertEquals("secret", props.getProperty("password")); + } +}