Skip to content

Commit

Permalink
bump to r2dbc 1.0.0-RELEASE
Browse files Browse the repository at this point in the history
  • Loading branch information
rernas35 committed May 7, 2022
1 parent 620195c commit 3fead38
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 70 deletions.
2 changes: 1 addition & 1 deletion clickhouse-r2dbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<hikari-cp.version>4.0.3</hikari-cp.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<r2dbc-spi.version>0.9.1.RELEASE</r2dbc-spi.version>
<r2dbc-spi.version>1.0.0.RELEASE</r2dbc-spi.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<testcontainers.elasticsearch.version>1.16.3</testcontainers.elasticsearch.version>
<testcontainers.clickhouse.version>1.16.3</testcontainers.clickhouse.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ClickHouseResult implements Result {
.map(rec -> Pair.of(resp.getColumns(), rec))))
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
.map(RowSegment::new);
this.updatedCount = response.map(clickHouseResponse -> clickHouseResponse.getSummary())
this.updatedCount = response.map(ClickHouseResponse::getSummary)
.map(ClickHouseResponseSummary::getProgress)
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
.map(UpdateCount::new);
Expand All @@ -50,14 +50,8 @@ public class ClickHouseResult implements Result {
* @return
*/
@Override
public Mono<Integer> getRowsUpdated() {

return updatedCount.map(val -> {
UpdateCount updateCount = (UpdateCount) val;
if (updateCount.value() > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else return Long.valueOf(updateCount.value()).intValue();
});
public Mono<Long> getRowsUpdated() {
return updatedCount.map(val -> ((UpdateCount) val).value());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.RowMetadata;

import java.util.Collection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -32,12 +32,6 @@ public ColumnMetadata getColumnMetadata(String columnName) {

@Override
public List<? extends ColumnMetadata> getColumnMetadatas() {
return Collections.unmodifiableList(columnNameMetadataMap.entrySet().stream()
.map(Map.Entry::getValue).collect(Collectors.toList()));
}

@Override
public Collection<String> getColumnNames() {
return Collections.unmodifiableSet(columnNameMetadataMap.keySet());
return Collections.unmodifiableList(new ArrayList<>(columnNameMetadataMap.values()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.clickhouse.client.logging.LoggerFactory;
import io.r2dbc.spi.Blob;
import io.r2dbc.spi.Clob;
import io.r2dbc.spi.Parameter;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Flux;
Expand All @@ -24,13 +25,18 @@ public class ClickHouseStatement implements Statement {
private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.RowBinaryWithNamesAndTypes;
private static final String NULL_VALUES_ARE_NOT_ALLOWED_AS_VALUE = "null values are not allowed as value.";
private static final String CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE = "class types are not allowed as value.";
private static final String INVALID_IDENTIFIER_INDEX_IDENTIFIER_INDEX_MUST_BE_GREATER_THAN_0 = "Invalid identifier index! identifier index must be greater than 0.";
private static final String INVALID_PARAMETER_INDEX = "Invalid parameter index! Parameter index must be greater than 0.";

private static final Object EXPLICITLY_SET_NULL_VALUE = new Object();
public static final String NULL_VALUES_ARE_NOT_ALLOWED_AS_PARAMETER_NAME = "null values are not allowed as parameter name.";
public static final String GENERATED_VALUES_CAN_NOT_BE_RETURNED_FROM_CLICKHOUSE_DATABASE = "Generated values can not be returned from Clickhouse database.";
public static final String NON_EXISTING_IDENTIFIER_TEMPLATE = "non-existing identifier : %s";
public static final String UNSUPPORTED_DATATYPE_BLOB = "Unsupported datatype: Blob";
public static final String UNSUPPORTED_DATATYPE_CLOB = "Unsupported datatype: Clob";

private final ClickHouseRequest<?> request;
private final List<String> namedParameters;
private ClickHouseStatementBinding bindings;
private final ClickHouseStatementBinding bindings;
private int fetchSize;

public ClickHouseStatement(String sql, ClickHouseRequest<?> request) {
Expand All @@ -52,8 +58,9 @@ public Statement bind(int identifierIndex, Object o) {
} else if (o instanceof Class) {
throw new IllegalArgumentException(CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE);
}

if (identifierIndex < 0) {
throw new IllegalArgumentException(INVALID_IDENTIFIER_INDEX_IDENTIFIER_INDEX_MUST_BE_GREATER_THAN_0);
throw new IllegalArgumentException(INVALID_PARAMETER_INDEX);
}

bindings.addBinding(identifierIndex, safeValue(o));
Expand All @@ -62,9 +69,14 @@ public Statement bind(int identifierIndex, Object o) {

private Object safeValue(Object o) {
if (o instanceof Blob) {
throw new IllegalArgumentException("Unsupported datatype: Blob");
throw new IllegalArgumentException(UNSUPPORTED_DATATYPE_BLOB);
} else if (o instanceof Clob) {
throw new IllegalArgumentException("Unsupported datatype: Clob");
throw new IllegalArgumentException(UNSUPPORTED_DATATYPE_CLOB);
} else if (o instanceof Parameter) {
Object value = ((Parameter) o).getValue();
if (value == null)
return EXPLICITLY_SET_NULL_VALUE;
return value;
}
return o;
}
Expand All @@ -76,9 +88,10 @@ public Statement bind(String identifierName, Object o) {
} else if (o instanceof Class) {
throw new IllegalArgumentException(CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE);
}

int index = namedParameters.indexOf(identifierName);
if (index < 0) {
throw new NoSuchElementException(String.format("non-existing identifier : %s", identifierName));
throw new NoSuchElementException(String.format(NON_EXISTING_IDENTIFIER_TEMPLATE, identifierName));
}
bindings.addBinding(index, safeValue(o));
return this;
Expand All @@ -87,7 +100,7 @@ public Statement bind(String identifierName, Object o) {
@Override
public Statement bindNull(int identifierIndex, Class<?> aClass) {
if (identifierIndex < 0) {
throw new IllegalArgumentException(INVALID_IDENTIFIER_INDEX_IDENTIFIER_INDEX_MUST_BE_GREATER_THAN_0);
throw new IllegalArgumentException(INVALID_PARAMETER_INDEX);
}
bindings.addBinding(identifierIndex, EXPLICITLY_SET_NULL_VALUE);
return this;
Expand All @@ -96,7 +109,7 @@ public Statement bindNull(int identifierIndex, Class<?> aClass) {
@Override
public Statement bindNull(String identifierName, Class<?> aClass) {
if (identifierName == null) {
throw new IllegalArgumentException("null values are not allowed as identifier name.");
throw new IllegalArgumentException(NULL_VALUES_ARE_NOT_ALLOWED_AS_PARAMETER_NAME);
}
bindings.addBinding(namedParameters.indexOf(identifierName), EXPLICITLY_SET_NULL_VALUE);
return this;
Expand Down Expand Up @@ -135,6 +148,6 @@ public Flux<? extends Result> execute() {

@Override
public Statement returnGeneratedValues(String... columns) {
throw new UnsupportedOperationException("Generated values can not be returned from Clickhouse database.");
throw new UnsupportedOperationException(GENERATED_VALUES_CAN_NOT_BE_RETURNED_FROM_CLICKHOUSE_DATABASE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;

class ClickHouseStatementBinding {
public static final String NOT_ALL_PARAMETERS_ARE_SET = "Not all parameters are set.";
List<Binding> bindingList;
Binding current;
int size;
Expand Down Expand Up @@ -33,7 +34,7 @@ List<Binding> getBoundList() {
return bindingList;
}

public class Binding {
public static class Binding {

Object[] values;

Expand All @@ -47,14 +48,10 @@ private void setParam(int index, Object value) {

private boolean isCompleted(){
for (Object value: values) {
if (value == null) throw new IllegalStateException("Not all identifiers are set.");
if (value == null) throw new IllegalStateException(NOT_ALL_PARAMETERS_ARE_SET);
}
return true;
}

public Object[] getValues() {
return values;
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.logging.Logger;
Expand Down Expand Up @@ -29,15 +30,14 @@ public class ClickHouseConnection implements Connection {
final ClickHouseNode server;
private boolean closed = false;

ClickHouseConnection(ClickHouseClient client, ClickHouseNode server) {
this.client = client;
ClickHouseConnection(ClickHouseNode server, ClickHouseProtocol preferredProtocol) {
this.client = ClickHouseClient.newInstance(preferredProtocol);
this.server = server;
}


/**
* Transactions are not supported so this is a no-op implementation,
* @return
*/
@Override
public Mono<Void> beginTransaction() {
Expand All @@ -47,7 +47,6 @@ public Mono<Void> beginTransaction() {

/**
* Transactions are not supported so this is a no-op implementation,
* @return
*/
@Override
public Mono<Void> beginTransaction(TransactionDefinition transactionDefinition) {
Expand All @@ -68,7 +67,6 @@ public Publisher<Void> close() {

/**
* Transactions are not supported so this is a no-op implementation,
* @return
*/
@Override
public Publisher<Void> commitTransaction() {
Expand All @@ -78,7 +76,7 @@ public Publisher<Void> commitTransaction() {

/**
* Returns {@link ClickHouseBatch} for batching statements.
* @return
* @return Batch object
*/
@Override
public Batch createBatch() {
Expand Down Expand Up @@ -120,7 +118,7 @@ public ConnectionMetadata getMetadata() {

/**
*
* @return
* @return Always returns read committed.
*/
@Override
public IsolationLevel getTransactionIsolationLevel() {
Expand All @@ -134,7 +132,6 @@ public Publisher<Void> releaseSavepoint(String s) {

/**
* Transactions are not supported so this is a no-op implementation,
* @return
*/
@Override
public Publisher<Void> rollbackTransaction() {
Expand All @@ -149,7 +146,6 @@ public Publisher<Void> rollbackTransactionToSavepoint(String s) {

/**
* Transactions are not supported so this is a no-op implementation,
* @return
*/
@Override
public Publisher<Void> setAutoCommit(boolean b) {
Expand All @@ -169,8 +165,7 @@ public Publisher<Void> setStatementTimeout(Duration duration) {

/**
* Since transactions are not supported, this method will throw exception.
* @param isolationLevel
* @return
* @param isolationLevel @{{@link IsolationLevel}}
*/
@Override
public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.clickhouse.r2dbc.connection;

import com.clickhouse.client.ClickHouseClient;

import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import io.r2dbc.spi.Connection;
Expand All @@ -10,29 +10,18 @@

public class ClickHouseConnectionFactory implements ConnectionFactory {

Mono<ClickHouseConnection> client;

private ClickHouseConnectionFactory(Mono<ClickHouseConnection> client) {
this.client = client;
}
private final ClickHouseNode server;
private final ClickHouseProtocol preferredProtocol;

public static ClickHouseConnectionFactory from(ClickHouseNode.Builder builder, String protocol) {
ClickHouseNode server = builder.build();
ClickHouseProtocol preferredProtocol = null;
if (protocol.equalsIgnoreCase("grpc")) {
preferredProtocol = ClickHouseProtocol.GRPC;
} else if (protocol.equalsIgnoreCase("http")) {
preferredProtocol = ClickHouseProtocol.HTTP;
} else {
throw new IllegalArgumentException("Undefined protocol");
}
ClickHouseClient client = ClickHouseClient.newInstance(preferredProtocol);
return new ClickHouseConnectionFactory(Mono.just(new ClickHouseConnection(client, server)));
ClickHouseConnectionFactory(ClickHouseNode server, ClickHouseProtocol preferredProtocol) {
this.server = server;
this.preferredProtocol = preferredProtocol;
}

@Override
public Mono<? extends Connection> create() {
return client;
return Mono.just(new ClickHouseConnection(server, preferredProtocol));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;

Expand All @@ -23,15 +24,26 @@ public ConnectionFactory create(ConnectionFactoryOptions cfOpt) {
nodeBuilder.credentials(ClickHouseCredentials.fromUserAndPassword(cfOpt.getValue(USER).toString(),
cfOpt.getValue(PASSWORD).toString()));
}
return ClickHouseConnectionFactory.from(nodeBuilder, cfOpt.getValue(PROTOCOL).toString());
return from(nodeBuilder, cfOpt.getValue(PROTOCOL).toString());
}


private static ClickHouseConnectionFactory from(ClickHouseNode.Builder builder, String protocol) {
ClickHouseNode server = builder.build();
ClickHouseProtocol preferredProtocol;
if (protocol.equalsIgnoreCase("grpc")) {
preferredProtocol = ClickHouseProtocol.GRPC;
} else if (protocol.equalsIgnoreCase("http")) {
preferredProtocol = ClickHouseProtocol.HTTP;
} else {
throw new IllegalArgumentException("Undefined protocol");
}
return new ClickHouseConnectionFactory(server, preferredProtocol);
}

@Override
public boolean supports(ConnectionFactoryOptions connectionFactoryOptions) {
if (connectionFactoryOptions.getValue(DRIVER).equals(CLICKHOUSE_DRIVER)) {
return true;
}
return false;
return connectionFactoryOptions.getValue(DRIVER).equals(CLICKHOUSE_DRIVER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class ClickHouseConnectionMetadata implements ConnectionMetadata {
final ClickHouseClient client;
final ClickHouseNode server;

private String serverVersion = null;
private final String serverVersion = null;

ClickHouseConnectionMetadata(ClickHouseClient client, ClickHouseNode server) {
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public static void setup() throws Exception {
clickHouseContainer.setPortBindings(Arrays.asList("0.0.0.0:" + GRPC_PORT + ":" + GRPC_PORT));
clickHouseContainer.start();
log.info("Exposed ports are : {}", clickHouseContainer.getExposedPorts());
// jdbcTemplate(clickHouseContainer.getMappedPort(HTTP_PORT), null).execute(format("CREATE DATABASE %s [ENGINE = Atomic]", DATABASE));
// log.info("Database with a name of {} is created", DATABASE);
jdbcTemplate = jdbcTemplate(clickHouseContainer.getMappedPort(HTTP_PORT), DATABASE);
}

Expand Down

0 comments on commit 3fead38

Please sign in to comment.