From c697e0a2412621e3d921a5208d1f0eaa0708c9c9 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Sat, 29 Oct 2022 18:27:42 +0800 Subject: [PATCH 01/22] add cassandra connector --- .../connector-cassandra/pom.xml | 52 +++ .../cassandra/client/CassandraClient.java | 65 ++++ .../cassandra/config/CassandraConfig.java | 109 ++++++ .../cassandra/sink/CassandraSink.java | 107 ++++++ .../cassandra/sink/CassandraSinkWriter.java | 112 +++++++ .../cassandra/source/CassandraSource.java | 114 +++++++ .../source/CassandraSourceReader.java | 92 ++++++ .../source/CassandraSourceSplit.java | 27 ++ .../CassandraSourceSplitEnumerator.java | 97 ++++++ .../cassandra/state/CassandraSourceState.java | 23 ++ .../cassandra/util/TypeConvertUtil.java | 309 ++++++++++++++++++ seatunnel-connectors-v2/pom.xml | 1 + 12 files changed, 1108 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-cassandra/pom.xml create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java create mode 100644 
seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/state/CassandraSourceState.java create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java diff --git a/seatunnel-connectors-v2/connector-cassandra/pom.xml b/seatunnel-connectors-v2/connector-cassandra/pom.xml new file mode 100644 index 00000000000..4be1ba14c7e --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/pom.xml @@ -0,0 +1,52 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-cassandra + + + 4.14.0 + + + + + com.datastax.oss + java-driver-core + ${cassandra.driver.version} + + + org.apache.commons + commons-lang3 + + + org.apache.seatunnel + connector-common + ${project.version} + + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java new file mode 100644 index 00000000000..deff52a6b57 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cassandra.client; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import org.apache.commons.lang3.StringUtils; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +public class CassandraClient { + public static CqlSessionBuilder getCqlSessionBuilder(String nodeAddress, String keyspace, String username, String password, String dataCenter) { + List cqlSessionList = Arrays.stream(nodeAddress.split(",")).map(address -> { + String[] nodeAndPort = address.split(":", 2); + if (StringUtils.isEmpty(username) && StringUtils.isEmpty(password)) { + return CqlSession.builder() + .addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1]))) + .withKeyspace(keyspace) + .withLocalDatacenter(dataCenter); + } + return CqlSession.builder() + .addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1]))) + .withAuthCredentials(username, password) + .withKeyspace(keyspace) + .withLocalDatacenter(dataCenter); + 
}).collect(Collectors.toList()); + return cqlSessionList.get(ThreadLocalRandom.current().nextInt(cqlSessionList.size())); + } + + public static SimpleStatement createSimpleStatement(String cql, ConsistencyLevel consistencyLevel) { + return SimpleStatement.builder(cql).setConsistencyLevel(consistencyLevel).build(); + } + + public static ColumnDefinitions getTableSchema(CqlSession session, String table) { + try { + return session.execute(String.format("select * from %s limit 1", table)) + .getColumnDefinitions(); + } catch (Exception e) { + throw new RuntimeException("Cannot get table schema from cassandra", e); + } + + } +} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java new file mode 100644 index 00000000000..a23b3bb0eaa --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.cassandra.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.cql.DefaultBatchType; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.ToString; + +import java.io.Serializable; +import java.util.List; + +@Data +@ToString +@NoArgsConstructor +public class CassandraConfig implements Serializable { + + public static final String HOST = "host"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String DATACENTER = "datacenter"; + public static final String KEYSPACE = "keyspace"; + public static final String TABLE = "table"; + public static final String CQL = "cql"; + public static final String FIELDS = "fields"; + public static final String CONSISTENCY_LEVEL = "consistency_level"; + public static final String BATCH_SIZE = "batch_size"; + public static final String BATCH_TYPE = "batch_type"; + + private String host; + private String username; + private String password; + private String datacenter; + private String keyspace; + private String table; + private String cql; + private List fields; + private ConsistencyLevel consistencyLevel; + private Integer batchSize; + private DefaultBatchType batchType; + + public CassandraConfig(@NonNull String host, @NonNull String keyspace) { + this.host = host; + this.keyspace = keyspace; + } + + public static CassandraConfig getCassandraConfig(Config config) { + CassandraConfig cassandraConfig = new CassandraConfig( + config.getString(HOST), + config.getString(KEYSPACE) + ); + if (config.hasPath(USERNAME)) { + cassandraConfig.setUsername(config.getString(USERNAME)); + } + if (config.hasPath(PASSWORD)) { + cassandraConfig.setPassword(config.getString(PASSWORD)); + } + if 
(config.hasPath(DATACENTER)) { + cassandraConfig.setDatacenter(config.getString(DATACENTER)); + } else { + cassandraConfig.setDatacenter("datacenter1"); + } + if (config.hasPath(TABLE)) { + cassandraConfig.setTable(config.getString(TABLE)); + } + if (config.hasPath(CQL)) { + cassandraConfig.setCql(config.getString(CQL)); + } + if (config.hasPath(FIELDS)) { + cassandraConfig.setFields(config.getStringList(FIELDS)); + } + if (config.hasPath(CONSISTENCY_LEVEL)) { + cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.valueOf(config.getString(CONSISTENCY_LEVEL))); + } else { + cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_ONE); + } + if (config.hasPath(BATCH_SIZE)) { + cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE)); + } else { + cassandraConfig.setBatchSize(Integer.parseInt("100")); + } + if (config.hasPath(BATCH_TYPE)) { + cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE))); + } else { + cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED); + } + return cassandraConfig; + } +} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java new file mode 100644 index 00000000000..ac32376aa20 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cassandra.sink; + +import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST; +import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE; +import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.TABLE; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient; +import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.google.auto.service.AutoService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@AutoService(SeaTunnelSink.class) 
+public class CassandraSink extends AbstractSimpleSink { + + private CassandraConfig cassandraConfig; + + private SeaTunnelRowType seaTunnelRowType; + + private ColumnDefinitions tableSchema; + + @Override + public String getPluginName() { + return "Cassandra"; + } + + @Override + public void prepare(Config config) throws PrepareFailException { + CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, TABLE); + if (!checkResult.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, checkResult.getMsg()); + } + this.cassandraConfig = CassandraConfig.getCassandraConfig(config); + CqlSession session = CassandraClient.getCqlSessionBuilder( + cassandraConfig.getHost(), + cassandraConfig.getKeyspace(), + cassandraConfig.getUsername(), + cassandraConfig.getPassword(), + cassandraConfig.getDatacenter() + ).build(); + List fields = cassandraConfig.getFields(); + this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable()); + if (fields == null || fields.isEmpty()) { + List newFields = new ArrayList<>(); + for (int i = 0; i < tableSchema.size(); i++) { + newFields.add(tableSchema.get(i).getName().asInternal()); + } + cassandraConfig.setFields(newFields); + } else { + for (String field : fields) { + if (!tableSchema.contains(field)) { + throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE)); + } + } + } + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new CassandraSinkWriter(cassandraConfig, seaTunnelRowType, tableSchema); + } +} diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java new file mode 100644 index 00000000000..b164c806d5a --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.cassandra.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient; +import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig; +import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.type.DataType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +public class CassandraSinkWriter extends AbstractSinkWriter { + + private final CassandraConfig cassandraConfig; + private final SeaTunnelRowType seaTunnelRowType; + private final ColumnDefinitions tableSchema; + private final CqlSession session; + private BatchStatement batchStatement; + private final PreparedStatement preparedStatement; + private final AtomicInteger counter = new AtomicInteger(0); + + public CassandraSinkWriter(CassandraConfig cassandraConfig, SeaTunnelRowType seaTunnelRowType, ColumnDefinitions tableSchema) { + this.cassandraConfig = cassandraConfig; + this.seaTunnelRowType = seaTunnelRowType; + this.tableSchema = tableSchema; + this.session = CassandraClient.getCqlSessionBuilder( + cassandraConfig.getHost(), + cassandraConfig.getKeyspace(), + cassandraConfig.getUsername(), + cassandraConfig.getPassword(), + cassandraConfig.getDatacenter()).build(); + this.batchStatement = BatchStatement.builder(cassandraConfig.getBatchType()).build(); + this.preparedStatement = 
session.prepare(initPrepareSQL()); + } + + @Override + public void write(SeaTunnelRow row) throws IOException { + BoundStatement boundStatement = this.preparedStatement.bind(); + addIntoBatch(row, boundStatement); + if (counter.getAndIncrement() >= cassandraConfig.getBatchSize()) { + flush(); + counter.set(0); + } + } + + private void flush() { + this.session.execute(this.batchStatement); + this.batchStatement.clear(); + } + + private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) { + try { + for (int i = 0; i < cassandraConfig.getFields().size(); i++) { + String fieldName = cassandraConfig.getFields().get(i); + DataType dataType = tableSchema.get(i).getType(); + Object fieldValue = row.getField(seaTunnelRowType.indexOf(fieldName)); + boundStatement = TypeConvertUtil.reconvertAndInject(boundStatement, i, dataType, fieldValue); + } + this.batchStatement = this.batchStatement.add(boundStatement); + } catch (Exception e) { + throw new RuntimeException("Add row data into batch error", e); + } + } + + private String initPrepareSQL() { + String[] placeholder = new String[cassandraConfig.getFields().size()]; + Arrays.fill(placeholder, "?"); + return String.format("INSERT INTO %s (%s) VALUES (%s)", + cassandraConfig.getTable(), + String.join(",", cassandraConfig.getFields()), + String.join(",", placeholder)); + } + + @Override + public void close() throws IOException { + flush(); + try { + if (this.session != null) { + this.session.close(); + } + } catch (Exception e) { + throw new RuntimeException("Failed to close CqlSession.", e); + } + + } +} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java new file mode 100644 index 00000000000..bfce6cfe5c0 --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cassandra.source; + +import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.CQL; +import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST; +import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import 
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient; +import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig; +import org.apache.seatunnel.connectors.seatunnel.cassandra.state.CassandraSourceState; +import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.Row; +import com.google.auto.service.AutoService; + +@AutoService(SeaTunnelSource.class) +public class CassandraSource implements SeaTunnelSource { + + private SeaTunnelRowType rowTypeInfo; + private CassandraConfig cassandraConfig; + + @Override + public String getPluginName() { + return "Cassandra"; + } + + @Override + public void prepare(Config config) throws PrepareFailException { + this.cassandraConfig = CassandraConfig.getCassandraConfig(config); + CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, CQL); + if (!checkResult.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, checkResult.getMsg()); + } + // try { + CqlSession currentSession = CassandraClient.getCqlSessionBuilder( + cassandraConfig.getHost(), + cassandraConfig.getKeyspace(), + cassandraConfig.getUsername(), + cassandraConfig.getPassword(), + cassandraConfig.getDatacenter() + ).build(); + Row rs = currentSession.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel())).one(); + if (rs == null) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "No data in the table!"); + } + int columnSize = rs.getColumnDefinitions().size(); + String[] fieldNames = new String[columnSize]; + SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize]; + for (int i = 0; i < columnSize; i++) { + fieldNames[i] = rs.getColumnDefinitions().get(i).getName().asInternal(); + 
seaTunnelDataTypes[i] = TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType()); + } + this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); + // } catch (Exception e) { + // throw new PrepareFailException(getPluginName(), PluginType.SOURCE, e.getMessage()); + // } + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowTypeInfo; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) throws Exception { + return new CassandraSourceReader(cassandraConfig, readerContext); + } + + @Override + public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { + return new CassandraSourceSplitEnumerator(enumeratorContext); + } + + @Override + public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, CassandraSourceState checkpointState) throws Exception { + return new CassandraSourceSplitEnumerator(enumeratorContext); + } + +} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java new file mode 100644 index 00000000000..bcc78788d41 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cassandra.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient; +import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig; +import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class CassandraSourceReader implements SourceReader { + private final CassandraConfig cassandraConfig; + private final SourceReader.Context readerContext; + private final List splits; + private CqlSession session; + + CassandraSourceReader(CassandraConfig cassandraConfig, Context readerContext) { + this.cassandraConfig = cassandraConfig; + this.readerContext = readerContext; + this.splits = new ArrayList<>(); + } + + @Override + public void open() throws Exception { + session = CassandraClient.getCqlSessionBuilder( + cassandraConfig.getHost(), + cassandraConfig.getKeyspace(), + cassandraConfig.getUsername(), + cassandraConfig.getPassword(), + 
cassandraConfig.getDatacenter() + ).build(); + } + + @Override + public void close() throws IOException { + if (session != null) { + session.close(); + } + } + + @Override + public void pollNext(Collector output) throws Exception { + if (!splits.isEmpty()) { + ResultSet resultSet = session.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel())); + resultSet.forEach(row -> output.collect(TypeConvertUtil.buildSeaTunnelRow(row))); + this.readerContext.signalNoMoreElement(); + } + } + + @Override + public List snapshotState(long checkpointId) throws Exception { + return null; + } + + @Override + public void addSplits(List splits) { + this.splits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + } +} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java new file mode 100644 index 00000000000..5cac1cdcebb --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cassandra.source; + +import org.apache.seatunnel.api.source.SourceSplit; + +public class CassandraSourceSplit implements SourceSplit { + @Override + public String splitId() { + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java new file mode 100644 index 00000000000..9ccf365eec6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
package org.apache.seatunnel.connectors.seatunnel.cassandra.source;

import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.cassandra.state.CassandraSourceState;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Enumerator that hands exactly one {@link CassandraSourceSplit} to exactly one reader.
 *
 * <p>The first reader that registers receives the split. If that reader gives the split back,
 * it is passed on to any other registered reader, or kept for the next reader that registers.
 */
public class CassandraSourceSplitEnumerator implements SourceSplitEnumerator<CassandraSourceSplit, CassandraSourceState> {

    private final Context<CassandraSourceSplit> context;
    /** Subtask ids of every reader that has registered so far. */
    private final Set<Integer> readers;
    /** Subtask id that currently owns the split, or {@code -1} when nobody owns it. */
    private volatile int assigned = -1;

    CassandraSourceSplitEnumerator(Context<CassandraSourceSplit> context) {
        this.context = context;
        this.readers = new HashSet<>();
    }

    @Override
    public void open() {
    }

    @Override
    public void run() throws Exception {

    }

    @Override
    public void close() throws IOException {

    }

    /**
     * Takes back the split of a reader and reassigns it.
     *
     * <p>Only splits coming from the current owner are handled. They go to any other registered
     * reader; when there is none, ownership is cleared so that the next call to
     * {@link #registerReader(int)} assigns a fresh split.
     *
     * @param splits    splits returned by the reader
     * @param subtaskId subtask id of the reader that returned them
     */
    @Override
    public void addSplitsBack(List<CassandraSourceSplit> splits, int subtaskId) {
        if (splits.isEmpty() || subtaskId != assigned) {
            return;
        }
        Optional<Integer> otherReader = readers.stream().filter(r -> r != subtaskId).findAny();
        if (otherReader.isPresent()) {
            int newOwner = otherReader.get();
            context.assignSplit(newOwner, splits);
            // Record the new owner; otherwise a later addSplitsBack from that reader would fail
            // the ownership check above and the split would be lost.
            assigned = newOwner;
        } else {
            assigned = -1;
        }
    }

    /**
     * Reports 0 while no reader owns the split and 1 once it has been assigned.
     */
    @Override
    public int currentUnassignedSplitSize() {
        // NOTE(review): this looks inverted for an "unassigned" counter (the split is pending
        // exactly when assigned < 0). Behaviour kept as-is; confirm against the engine's usage.
        return assigned < 0 ? 0 : 1;
    }

    @Override
    public void handleSplitRequest(int subtaskId) {

    }

    /**
     * Remembers the reader and, if the split has no owner yet, assigns a new split to it.
     *
     * @param subtaskId subtask id of the registering reader
     */
    @Override
    public void registerReader(int subtaskId) {
        readers.add(subtaskId);
        if (assigned < 0) {
            assigned = subtaskId;
            context.assignSplit(subtaskId, new CassandraSourceSplit());
        }
    }

    /** No enumerator state is captured; always returns {@code null}. */
    @Override
    public CassandraSourceState snapshotState(long checkpointId) throws Exception {
        return null;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {

    }
}
package org.apache.seatunnel.connectors.seatunnel.cassandra.state;

import java.io.Serializable;

/**
 * Checkpoint state type of the Cassandra source enumerator.
 *
 * <p>The class declares no fields; it only exists to satisfy the state type parameter.
 */
public class CassandraSourceState implements Serializable {

    /** Pinned so that recompiling the class does not change its serialized identity. */
    private static final long serialVersionUID = 1L;
}
package org.apache.seatunnel.connectors.seatunnel.cassandra.util;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.internal.core.type.DefaultListType;
import com.datastax.oss.driver.internal.core.type.DefaultMapType;
import com.datastax.oss.driver.internal.core.type.DefaultSetType;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import org.apache.commons.lang3.ArrayUtils;

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

/**
 * Conversions between Cassandra driver types/values and SeaTunnel types/values.
 *
 * <ul>
 *   <li>{@link #convert(DataType)} maps a Cassandra column type to a SeaTunnel type;</li>
 *   <li>{@link #buildSeaTunnelRow(Row)} reads a driver row into a {@link SeaTunnelRow};</li>
 *   <li>{@link #reconvertAndInject(BoundStatement, int, DataType, Object)} binds a SeaTunnel
 *       value to a prepared statement.</li>
 * </ul>
 */
public class TypeConvertUtil {

    /** Element classes for which a list/set column can be turned into a typed array. */
    private static final List<Class<?>> ARRAY_ELEMENT_CLASSES = Arrays.asList(
        String.class, Byte.class, Short.class, Integer.class,
        Long.class, Float.class, Double.class, Boolean.class);

    /**
     * Maps a Cassandra data type to the SeaTunnel data type used to represent it.
     *
     * <p>Text-like types, {@code varint}, {@code uuid}, {@code timeuuid} and {@code inet} become
     * strings; {@code decimal} is represented as a double; lists and sets become arrays.
     *
     * @param type Cassandra column type
     * @return the matching SeaTunnel type
     * @throws RuntimeException if the type (or a collection element type) is not supported
     */
    public static SeaTunnelDataType<?> convert(DataType type) {
        switch (type.getProtocolCode()) {
            case ProtocolConstants.DataType.VARCHAR:
            case ProtocolConstants.DataType.VARINT:
            case ProtocolConstants.DataType.ASCII:
            case ProtocolConstants.DataType.UUID:
            case ProtocolConstants.DataType.INET:
            case ProtocolConstants.DataType.TIMEUUID:
                return BasicType.STRING_TYPE;
            case ProtocolConstants.DataType.TINYINT:
                return BasicType.BYTE_TYPE;
            case ProtocolConstants.DataType.SMALLINT:
                return BasicType.SHORT_TYPE;
            case ProtocolConstants.DataType.INT:
                return BasicType.INT_TYPE;
            case ProtocolConstants.DataType.BIGINT:
            case ProtocolConstants.DataType.COUNTER:
                return BasicType.LONG_TYPE;
            case ProtocolConstants.DataType.FLOAT:
                return BasicType.FLOAT_TYPE;
            case ProtocolConstants.DataType.DOUBLE:
            case ProtocolConstants.DataType.DECIMAL:
                return BasicType.DOUBLE_TYPE;
            case ProtocolConstants.DataType.BOOLEAN:
                return BasicType.BOOLEAN_TYPE;
            case ProtocolConstants.DataType.TIME:
                return LocalTimeType.LOCAL_TIME_TYPE;
            case ProtocolConstants.DataType.DATE:
                return LocalTimeType.LOCAL_DATE_TYPE;
            case ProtocolConstants.DataType.TIMESTAMP:
                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
            case ProtocolConstants.DataType.BLOB:
                return ArrayType.BYTE_ARRAY_TYPE;
            case ProtocolConstants.DataType.MAP:
                return new MapType<>(convert(((DefaultMapType) type).getKeyType()), convert(((DefaultMapType) type).getValueType()));
            case ProtocolConstants.DataType.LIST:
                return convertToArrayType(convert(((DefaultListType) type).getElementType()));
            case ProtocolConstants.DataType.SET:
                return convertToArrayType(convert(((DefaultSetType) type).getElementType()));
            default:
                throw new RuntimeException("not supported data type: " + type);
        }
    }

    /** Returns the array type whose element type is {@code dataType}, or throws if there is none. */
    private static ArrayType<?, ?> convertToArrayType(SeaTunnelDataType<?> dataType) {
        if (dataType.equals(BasicType.STRING_TYPE)) {
            return ArrayType.STRING_ARRAY_TYPE;
        } else if (dataType.equals(BasicType.BYTE_TYPE)) {
            return ArrayType.BYTE_ARRAY_TYPE;
        } else if (dataType.equals(BasicType.SHORT_TYPE)) {
            return ArrayType.SHORT_ARRAY_TYPE;
        } else if (dataType.equals(BasicType.INT_TYPE)) {
            return ArrayType.INT_ARRAY_TYPE;
        } else if (dataType.equals(BasicType.LONG_TYPE)) {
            return ArrayType.LONG_ARRAY_TYPE;
        } else if (dataType.equals(BasicType.FLOAT_TYPE)) {
            return ArrayType.FLOAT_ARRAY_TYPE;
        } else if (dataType.equals(BasicType.DOUBLE_TYPE)) {
            return ArrayType.DOUBLE_ARRAY_TYPE;
        } else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
            return ArrayType.BOOLEAN_ARRAY_TYPE;
        } else {
            throw new RuntimeException("not supported data type: " + dataType);
        }
    }

    /**
     * Reads every column of a driver row into a {@link SeaTunnelRow}.
     *
     * <p>A column whose value is CQL {@code NULL} yields a {@code null} field. Without this
     * check the object getters would fail with a {@link NullPointerException} and the primitive
     * getters would silently report {@code 0}/{@code false}.
     *
     * @param row row returned by the driver
     * @return a row holding one field per column, in column order
     * @throws RuntimeException if a list/set column has an unsupported element type
     */
    public static SeaTunnelRow buildSeaTunnelRow(Row row) {
        Objects.requireNonNull(row, "row");
        ColumnDefinitions metaData = row.getColumnDefinitions();
        Object[] fields = new Object[row.size()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = row.isNull(i) ? null : readField(row, i, metaData.get(i).getType());
        }
        return new SeaTunnelRow(fields);
    }

    /** Reads the non-null value of column {@code i} in the representation chosen by {@link #convert}. */
    private static Object readField(Row row, int i, DataType type) {
        switch (type.getProtocolCode()) {
            case ProtocolConstants.DataType.ASCII:
            case ProtocolConstants.DataType.VARCHAR:
                return row.getString(i);
            case ProtocolConstants.DataType.VARINT:
                return row.getBigInteger(i).toString();
            case ProtocolConstants.DataType.TIMEUUID:
            case ProtocolConstants.DataType.UUID:
                return row.getUuid(i).toString();
            case ProtocolConstants.DataType.INET:
                return row.getInetAddress(i).getHostAddress();
            case ProtocolConstants.DataType.TINYINT:
                return row.getByte(i);
            case ProtocolConstants.DataType.SMALLINT:
                return row.getShort(i);
            case ProtocolConstants.DataType.INT:
                return row.getInt(i);
            case ProtocolConstants.DataType.BIGINT:
            case ProtocolConstants.DataType.COUNTER:
                return row.getLong(i);
            case ProtocolConstants.DataType.FLOAT:
                return row.getFloat(i);
            case ProtocolConstants.DataType.DOUBLE:
                return row.getDouble(i);
            case ProtocolConstants.DataType.DECIMAL:
                // Matches convert(): decimals are exposed as doubles (precision may be lost).
                return row.getBigDecimal(i).doubleValue();
            case ProtocolConstants.DataType.BOOLEAN:
                return row.getBoolean(i);
            case ProtocolConstants.DataType.TIME:
                return row.getLocalTime(i);
            case ProtocolConstants.DataType.DATE:
                return row.getLocalDate(i);
            case ProtocolConstants.DataType.TIMESTAMP:
                // Interpreted in the JVM default zone, mirroring reconvertAndInject.
                return Timestamp.from(row.getInstant(i)).toLocalDateTime();
            case ProtocolConstants.DataType.BLOB:
                return ArrayUtils.toObject(toByteArray(row.getByteBuffer(i)));
            case ProtocolConstants.DataType.MAP:
                DefaultMapType mapType = (DefaultMapType) type;
                return row.getMap(i, convert(mapType.getKeyType()).getTypeClass(), convert(mapType.getValueType()).getTypeClass());
            case ProtocolConstants.DataType.LIST:
                Class<?> listElementClass = arrayElementClass(((DefaultListType) type).getElementType(), "List");
                return toTypedArray(row.getList(i, listElementClass), listElementClass);
            case ProtocolConstants.DataType.SET:
                Class<?> setElementClass = arrayElementClass(((DefaultSetType) type).getElementType(), "Set");
                return toTypedArray(row.getSet(i, setElementClass), setElementClass);
            default:
                return row.getObject(i);
        }
    }

    /** Resolves the Java element class of a list/set column, rejecting unsupported ones. */
    private static Class<?> arrayElementClass(DataType elementType, String collectionKind) {
        Class<?> elementClass = convert(elementType).getTypeClass();
        if (!ARRAY_ELEMENT_CLASSES.contains(elementClass)) {
            throw new RuntimeException(collectionKind + " not supported data type: " + elementClass);
        }
        return elementClass;
    }

    /** Copies {@code values} into a new array whose component type is {@code elementClass}. */
    private static Object[] toTypedArray(Collection<?> values, Class<?> elementClass) {
        return values.toArray((Object[]) Array.newInstance(elementClass, values.size()));
    }

    /**
     * Copies the readable bytes of {@code buffer} without touching its position.
     *
     * <p>{@code buffer.array()} is not used because it returns the whole backing array
     * (ignoring position, limit and array offset) and throws for read-only or direct buffers.
     */
    private static byte[] toByteArray(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /** Accepts {@code byte[]}, {@code Byte[]} or an {@code Object[]} of {@code Byte} values. */
    private static byte[] toPrimitiveBytes(Object value) {
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (value instanceof Byte[]) {
            return ArrayUtils.toPrimitive((Byte[]) value);
        }
        return ArrayUtils.toPrimitive(Arrays.stream((Object[]) value).toArray(Byte[]::new));
    }

    /**
     * Binds a SeaTunnel field value to position {@code index} of a bound statement, converting
     * it back to the Java type expected for the Cassandra column type.
     *
     * @param statement statement to bind into; bound statements are immutable, so the returned
     *                  instance must be used
     * @param index     zero-based bind position
     * @param type      Cassandra type of the target column
     * @param fileValue SeaTunnel field value; {@code null} binds CQL {@code NULL}
     * @return the statement with the value bound
     * @throws RuntimeException if an {@code inet} value cannot be resolved
     */
    public static BoundStatement reconvertAndInject(BoundStatement statement, int index, DataType type, Object fileValue) {
        if (fileValue == null) {
            return statement.setToNull(index);
        }
        switch (type.getProtocolCode()) {
            case ProtocolConstants.DataType.VARCHAR:
            case ProtocolConstants.DataType.ASCII:
                return statement.setString(index, (String) fileValue);
            case ProtocolConstants.DataType.VARINT:
                return statement.setBigInteger(index, new BigInteger((String) fileValue));
            case ProtocolConstants.DataType.UUID:
            case ProtocolConstants.DataType.TIMEUUID:
                return statement.setUuid(index, UUID.fromString((String) fileValue));
            case ProtocolConstants.DataType.INET:
                try {
                    return statement.setInetAddress(index, InetAddress.getByName((String) fileValue));
                } catch (UnknownHostException e) {
                    throw new RuntimeException(e);
                }
            case ProtocolConstants.DataType.TINYINT:
                return statement.setByte(index, (Byte) fileValue);
            case ProtocolConstants.DataType.SMALLINT:
                return statement.setShort(index, (Short) fileValue);
            case ProtocolConstants.DataType.INT:
                return statement.setInt(index, (Integer) fileValue);
            case ProtocolConstants.DataType.BIGINT:
            case ProtocolConstants.DataType.COUNTER:
                return statement.setLong(index, (Long) fileValue);
            case ProtocolConstants.DataType.FLOAT:
                return statement.setFloat(index, (Float) fileValue);
            case ProtocolConstants.DataType.DOUBLE:
                return statement.setDouble(index, (Double) fileValue);
            case ProtocolConstants.DataType.DECIMAL:
                return statement.setBigDecimal(index, BigDecimal.valueOf((Double) fileValue));
            case ProtocolConstants.DataType.BOOLEAN:
                return statement.setBoolean(index, (Boolean) fileValue);
            case ProtocolConstants.DataType.TIME:
                return statement.setLocalTime(index, (LocalTime) fileValue);
            case ProtocolConstants.DataType.DATE:
                return statement.setLocalDate(index, (LocalDate) fileValue);
            case ProtocolConstants.DataType.TIMESTAMP:
                return statement.setInstant(index, ((LocalDateTime) fileValue).atZone(ZoneId.systemDefault()).toInstant());
            case ProtocolConstants.DataType.BLOB:
                return statement.setByteBuffer(index, ByteBuffer.wrap(toPrimitiveBytes(fileValue)));
            case ProtocolConstants.DataType.MAP:
                return statement.set(index, (Map) fileValue, Map.class);
            case ProtocolConstants.DataType.LIST:
                return statement.set(index, Arrays.stream((Object[]) fileValue).collect(Collectors.toList()), List.class);
            case ProtocolConstants.DataType.SET:
                return statement.set(index, Arrays.stream((Object[]) fileValue).collect(Collectors.toSet()), Set.class);
            default:
                return statement.set(index, fileValue, Object.class);
        }
    }

}
/dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml @@ -0,0 +1,54 @@ + + + + + seatunnel-connector-v2-e2e + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-cassandra-e2e + + + 4.14.0 + + + + + org.apache.seatunnel + connector-cassandra + ${project.version} + + + org.testcontainers + cassandra + ${testcontainer.version} + test + + + com.datastax.oss + java-driver-core + ${cassandra.driver.version} + test + + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java new file mode 100644 index 00000000000..fe13edcf298 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.seatunnel.connectors.seatunnel.cassandra;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import java.io.File;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.Tuple2;

/**
 * End-to-end test that copies {@code source_table} to {@code sink_table} through the Cassandra
 * source and sink connectors and compares both tables column by column.
 */
@Slf4j
public class CassandraIT extends TestSuiteBase implements TestResource {
    private static final String CASSANDRA_DOCKER_IMAGE = "cassandra";
    private static final String HOST = "cassandra";
    private static final Integer PORT = 9042;
    private static final String INIT_CASSANDRA_PATH = "/init/cassandra_init.conf";
    private static final String CASSANDRA_JOB_CONFIG = "/cassandra_to_cassandra.conf";
    private static final String DATACENTER = "datacenter1";
    private static final String KEYSPACE = "test";
    private static final String SOURCE_TABLE = "source_table";
    private static final String SINK_TABLE = "sink_table";
    private static final String INSERT_CQL = "insert_cql";
    private static final Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> TEST_DATASET = generateTestDataSet();
    private Config config;
    private CassandraContainer<?> container;
    private CqlSession session;

    /** Runs the job, checks the sink against the source, then empties the sink again. */
    @TestTemplate
    public void testCassandra(TestContainer container) throws Exception {
        Container.ExecResult execResult = container.executeJob(CASSANDRA_JOB_CONFIG);
        Assertions.assertEquals(0, execResult.getExitCode());
        Assertions.assertNotNull(getRow());
        compareResult();
        clearSinkTable();
        Assertions.assertNull(getRow());
    }

    /** Starts Cassandra, waits for a working session, then creates and fills the tables. */
    @BeforeAll
    @Override
    public void startUp() throws Exception {
        this.container = new CassandraContainer<>(CASSANDRA_DOCKER_IMAGE)
            .withNetwork(NETWORK)
            .withNetworkAliases(HOST)
            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CASSANDRA_DOCKER_IMAGE)));
        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", PORT, PORT)));
        Startables.deepStart(Stream.of(this.container)).join();
        log.info("Cassandra container started");
        Awaitility.given()
            .ignoreExceptions()
            .await()
            .atMost(180L, TimeUnit.SECONDS)
            .untilAsserted(this::initConnection);
        this.initializeCassandraTable();
        this.batchInsertData();
    }

    private void initializeCassandraTable() {
        initCassandraConfig();
        createKeyspace();
        try {
            session.execute(SimpleStatement.builder(config.getString(SOURCE_TABLE)).setKeyspace(KEYSPACE).build());
            session.execute(SimpleStatement.builder(config.getString(SINK_TABLE)).setKeyspace(KEYSPACE).build());
        } catch (Exception e) {
            throw new RuntimeException("Initializing Cassandra table failed!", e);
        }
    }

    private void initConnection() {
        this.session = CqlSession.builder()
            .addContactPoint(new InetSocketAddress(container.getHost(), container.getExposedPorts().get(0)))
            .withLocalDatacenter(DATACENTER)
            .build();
    }

    /** Inserts every row of {@link #TEST_DATASET} into the source table in one unlogged batch. */
    @SuppressWarnings("unchecked") // field values are created with these exact types in generateTestDataSet()
    private void batchInsertData() {
        try {
            BatchStatement batchStatement = BatchStatement.builder(BatchType.UNLOGGED).build();
            BoundStatement boundStatement = session.prepare(
                    SimpleStatement.builder(config.getString(INSERT_CQL)).setKeyspace(KEYSPACE).build())
                .bind();
            for (SeaTunnelRow row : TEST_DATASET._2()) {
                boundStatement = boundStatement
                    .setLong(0, (Long) row.getField(0))
                    .setString(1, (String) row.getField(1))
                    .setLong(2, (Long) row.getField(2))
                    .setByteBuffer(3, (ByteBuffer) row.getField(3))
                    .setBoolean(4, (Boolean) row.getField(4))
                    .setBigDecimal(5, (BigDecimal) row.getField(5))
                    .setDouble(6, (Double) row.getField(6))
                    .setFloat(7, (Float) row.getField(7))
                    .setInt(8, (Integer) row.getField(8))
                    .setInstant(9, (Instant) row.getField(9))
                    .setUuid(10, (UUID) row.getField(10))
                    .setString(11, (String) row.getField(11))
                    .setBigInteger(12, (BigInteger) row.getField(12))
                    .setUuid(13, (UUID) row.getField(13))
                    .setInetAddress(14, (InetAddress) row.getField(14))
                    .setLocalDate(15, (LocalDate) row.getField(15))
                    .setShort(16, (Short) row.getField(16))
                    .setByte(17, (Byte) row.getField(17))
                    .setList(18, (List<Float>) row.getField(18), Float.class)
                    .setList(19, (List<Integer>) row.getField(19), Integer.class)
                    .setSet(20, (Set<Double>) row.getField(20), Double.class)
                    .setSet(21, (Set<Long>) row.getField(21), Long.class)
                    .setMap(22, (Map<String, Integer>) row.getField(22), String.class, Integer.class);
                // Statements are immutable: add() returns the extended batch.
                batchStatement = batchStatement.add(boundStatement);
            }
            session.execute(batchStatement);
        } catch (Exception e) {
            throw new RuntimeException("Batch insert data failed!", e);
        }
    }

    /**
     * Asserts that source and sink table hold the same rows.
     *
     * <p>Rows are paired in the order the two {@code select *} queries return them; every column
     * of the data set must be equal, and neither table may hold more rows than the other.
     */
    private void compareResult() {
        String sourceCql = "select * from " + SOURCE_TABLE;
        String sinkCql = "select * from " + SINK_TABLE;
        List<String> columnList = Arrays.stream(TEST_DATASET._1().getFieldNames()).collect(Collectors.toList());
        ResultSet sourceResultSet = session.execute(SimpleStatement.builder(sourceCql).setKeyspace(KEYSPACE).build());
        ResultSet sinkResultSet = session.execute(SimpleStatement.builder(sinkCql).setKeyspace(KEYSPACE).build());
        Assertions.assertEquals(sourceResultSet.getColumnDefinitions().size(), sinkResultSet.getColumnDefinitions().size());
        Iterator<Row> sourceIterator = sourceResultSet.iterator();
        Iterator<Row> sinkIterator = sinkResultSet.iterator();
        while (sourceIterator.hasNext()) {
            // A short sink must fail the test instead of ending the comparison early.
            Assertions.assertTrue(sinkIterator.hasNext(), "Sink table has fewer rows than source table");
            Row sourceNext = sourceIterator.next();
            Row sinkNext = sinkIterator.next();
            for (String column : columnList) {
                Object source = sourceNext.getObject(column);
                Object sink = sinkNext.getObject(column);
                Assertions.assertTrue(Objects.deepEquals(source, sink),
                    String.format("Column %s differs: source=%s, sink=%s", column, source, sink));
            }
        }
        Assertions.assertFalse(sinkIterator.hasNext(), "Sink table has more rows than source table");
    }

    private void createKeyspace() {
        try {
            this.session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +
                " WITH replication = \n" +
                "{'class':'SimpleStrategy','replication_factor':'1'};");
        } catch (Exception e) {
            throw new RuntimeException("Create keyspace failed!", e);
        }
    }

    private void clearSinkTable() {
        try {
            session.execute(SimpleStatement.builder(String.format("truncate table %s", SINK_TABLE)).setKeyspace(KEYSPACE).build());
        } catch (Exception e) {
            throw new RuntimeException("Truncate Cassandra sink table failed!", e);
        }
    }

    /**
     * Builds the row type and two rows written to the source table.
     *
     * <p>Field values use the Java types bound in {@link #batchInsertData()}.
     */
    private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet() {
        SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{
                "id",
                "c_ascii",
                "c_bigint",
                "c_blob",
                "c_boolean",
                "c_decimal",
                "c_double",
                "c_float",
                "c_int",
                "c_timestamp",
                "c_uuid",
                "c_text",
                "c_varint",
                "c_timeuuid",
                "c_inet",
                "c_date",
                "c_smallint",
                "c_tinyint",
                "c_list_float",
                "c_list_int",
                "c_set_double",
                "c_set_bigint",
                "c_map"
            },
            new SeaTunnelDataType<?>[]{
                BasicType.LONG_TYPE,
                BasicType.STRING_TYPE,
                BasicType.LONG_TYPE,
                ArrayType.BYTE_ARRAY_TYPE,
                BasicType.BOOLEAN_TYPE,
                new DecimalType(9, 4),
                BasicType.DOUBLE_TYPE,
                BasicType.FLOAT_TYPE,
                BasicType.INT_TYPE,
                LocalTimeType.LOCAL_DATE_TIME_TYPE,
                BasicType.STRING_TYPE,
                BasicType.STRING_TYPE,
                BasicType.STRING_TYPE,
                BasicType.STRING_TYPE,
                BasicType.STRING_TYPE,
                LocalTimeType.LOCAL_DATE_TYPE,
                BasicType.SHORT_TYPE,
                BasicType.BYTE_TYPE,
                ArrayType.FLOAT_ARRAY_TYPE,
                ArrayType.INT_ARRAY_TYPE,
                ArrayType.DOUBLE_ARRAY_TYPE,
                ArrayType.LONG_ARRAY_TYPE,
                new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE)
            });
        List<SeaTunnelRow> rows = new ArrayList<>();
        for (int i = 0; i < 2; ++i) {
            SeaTunnelRow row;
            try {
                row = new SeaTunnelRow(
                    new Object[]{
                        (long) i,
                        String.valueOf(i),
                        (long) i,
                        ByteBuffer.wrap(new byte[]{Byte.parseByte("1")}),
                        Boolean.FALSE,
                        BigDecimal.valueOf(11L, 1),
                        Double.parseDouble("1.1"),
                        Float.parseFloat("2.1"),
                        i,
                        Instant.now(),
                        UUID.randomUUID(),
                        "text",
                        new BigInteger("12345678909876543210"),
                        Uuids.timeBased(),
                        InetAddress.getByName("1.2.3.4"),
                        LocalDate.now(),
                        Short.parseShort("1"),
                        Byte.parseByte("1"),
                        Collections.singletonList((float) i),
                        Collections.singletonList(i),
                        Collections.singleton(Double.valueOf("1.1")),
                        Collections.singleton((long) i),
                        Collections.singletonMap("key_" + i, i)
                    });
            } catch (UnknownHostException e) {
                throw new RuntimeException("Generate Test DataSet", e);
            }
            rows.add(row);
        }
        return Tuple2.apply(rowType, rows);
    }

    /** Returns one row of the sink table, or {@code null} when the table is empty. */
    private Row getRow() {
        try {
            String sql = String.format("select * from %s limit 1", SINK_TABLE);
            ResultSet resultSet = session.execute(SimpleStatement.builder(sql).setKeyspace(KEYSPACE).build());
            return resultSet.one();
        } catch (Exception e) {
            throw new RuntimeException("test cassandra server image error", e);
        }
    }

    /** Loads the init resource and verifies that it defines both DDL statements and the insert CQL. */
    private void initCassandraConfig() {
        File file = ContainerUtil.getResourcesFile(INIT_CASSANDRA_PATH);
        Config initConfig = ConfigFactory.parseFile(file);
        // A JUnit assertion is used because a plain `assert` is skipped unless the JVM runs with -ea.
        Assertions.assertTrue(
            initConfig.hasPath(SOURCE_TABLE) && initConfig.hasPath(SINK_TABLE) && initConfig.hasPath(INSERT_CQL),
            "Init config must define " + SOURCE_TABLE + ", " + SINK_TABLE + " and " + INSERT_CQL);
        this.config = initConfig;
    }

    @AfterAll
    @Override
    public void tearDown() throws Exception {
        if (this.session != null) {
            this.session.close();
        }
        if (this.container != null) {
            this.container.close();
        }
    }
}
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf new file mode 100644 index 00000000000..5955a851605 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + # You can set spark configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for test and demonstrate the feature source plugin** + Cassandra { + host = "cassandra:9042" + username = "" + password = "" + datacenter = "datacenter1" + keyspace = "test" + cql = "select * from source_table" + result_table_name = "source_table" + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Cassandra +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Cassandra { + host = "cassandra:9042" + username = "" + password = "" + datacenter = "datacenter1" + keyspace = "test" + table = "sink_table" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf new file mode 100644 index 00000000000..62b49524403 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source_table = """ +create table if not exists source_table( + id bigint, + c_ascii ascii, + c_bigint bigint, + c_blob blob, + c_boolean boolean, + c_decimal decimal, + c_double double, + c_float float, + c_int int, + c_timestamp timestamp, + c_uuid uuid, + c_text text, + c_varint varint, + c_timeuuid timeuuid, + c_inet inet, + c_date date, + c_smallint smallint, + c_tinyint tinyint, + c_list_float list, + c_list_int list, + c_set_double set, + c_set_bigint set, + c_map map, + PRIMARY KEY (id) +); +""" + +sink_table = """ +create table if not exists sink_table( + id bigint, + c_ascii ascii, + c_bigint bigint, + c_blob blob, + c_boolean boolean, + c_decimal decimal, + c_double double, + c_float float, + c_int int, + c_timestamp timestamp, + c_uuid uuid, + c_text text, + c_varint varint, + c_timeuuid timeuuid, + c_inet inet, + c_date date, + c_smallint smallint, + c_tinyint tinyint, + c_list_float list, + c_list_int list, + c_set_double set, + c_set_bigint set, + c_map map, + PRIMARY KEY (id) +); +""" + +insert_cql = """ +insert into source_table +( + id, + c_ascii, + c_bigint, + c_blob, + c_boolean, + c_decimal, + c_double, + c_float, + c_int, + c_timestamp, + c_uuid, + c_text, + c_varint, + c_timeuuid, + c_inet, + c_date, + c_smallint, + c_tinyint, + c_list_float, + c_list_int, + c_set_double, + c_set_bigint, + c_map +) +values +(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) 
+""" \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index d5404c565d7..f4227ad71a2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -29,6 +29,7 @@ connector-redis-e2e connector-clickhouse-e2e connector-influxdb-e2e + connector-cassandra-e2e seatunnel-connector-v2-e2e From 1cb1906d80f4a6eced7b63df161337e2434118da Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Sat, 29 Oct 2022 21:53:22 +0800 Subject: [PATCH 03/22] add cassandra connector --- docs/en/connector-v2/sink/Cassandra.md | 93 ++++++++++++++++++ docs/en/connector-v2/source/Cassandra.md | 76 +++++++++++++++ plugin-mapping.properties | 4 +- .../cassandra/client/CassandraClient.java | 4 +- .../cassandra/config/CassandraConfig.java | 9 +- .../cassandra/sink/CassandraSink.java | 33 ++++--- .../cassandra/sink/CassandraSinkWriter.java | 38 +++++++- .../cassandra/source/CassandraSource.java | 56 +++++------ .../source/CassandraSourceReader.java | 35 +++---- .../source/CassandraSourceSplit.java | 27 ------ .../CassandraSourceSplitEnumerator.java | 97 ------------------- .../cassandra/state/CassandraSourceState.java | 23 ----- .../seatunnel/cassandra/CassandraIT.java | 2 +- .../resources/cassandra_to_cassandra.conf | 1 + 14 files changed, 270 insertions(+), 228 deletions(-) create mode 100644 docs/en/connector-v2/sink/Cassandra.md create mode 100644 docs/en/connector-v2/source/Cassandra.md delete mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java delete mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java delete mode 100644 
seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/state/CassandraSourceState.java diff --git a/docs/en/connector-v2/sink/Cassandra.md b/docs/en/connector-v2/sink/Cassandra.md new file mode 100644 index 00000000000..403870859f0 --- /dev/null +++ b/docs/en/connector-v2/sink/Cassandra.md @@ -0,0 +1,93 @@ +# Cassandra + +> Cassandra sink connector + +## Description + +Write data to Apache Cassandra. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-------------------|--------|----------|---------------| +| host | String | Yes | - | +| keyspace | String | Yes | - | +| table | String | Yes | - | +| username | String | No | - | +| password | String | No | - | +| datacenter | String | No | datacenter1 | +| consistency_level | String | No | LOCAL_ONE | +| fields | Array | No | - | +| batch_size | Integer | No | 5000 | +| batch_type | String | No | UNLOGGED | +| async_write | Boolean | No | true | + +### host [string] + +`Cassandra` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as +`"cassandra1:9042,cassandra2:9042"`. + +### keyspace [string] + +The `Cassandra` keyspace. + +### table [String] + +The `Cassandra` table name. + +### username [string] + +`Cassandra` user username. + +### password [string] + +`Cassandra` user password. + +### datacenter [String] + +The `Cassandra` datacenter, default is `datacenter1`. + +### consistency_level [String] + +The `Cassandra` write consistency level, default is `LOCAL_ONE`. + +### fields [array] + +The data field that needs to be output to `Cassandra` , if not configured, it will be automatically adapted +according to the sink table `schema`.
+ +### batch_size [number] + +The number of rows written through [Cassandra-Java-Driver](https://github.com/datastax/java-driver) each time, +default is `5000`. + +### batch_type [String] + +The `Cassandra` batch processing mode, default is `UNLOGGED`. + +### async_write [boolean] + +Whether `cassandra` writes in asynchronous mode, default is `true`. + +## Examples + +```hocon +sink { + Cassandra { + host = "localhost:9042" + username = "cassandra" + password = "cassandra" + datacenter = "datacenter1" + keyspace = "test" + } +} +``` + + + diff --git a/docs/en/connector-v2/source/Cassandra.md b/docs/en/connector-v2/source/Cassandra.md new file mode 100644 index 00000000000..66642f4163d --- /dev/null +++ b/docs/en/connector-v2/source/Cassandra.md @@ -0,0 +1,76 @@ +# Cassandra + +> Cassandra source connector + +## Description + +Read data from Apache Cassandra. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-------------------------|--------|----------|---------------| +| host | String | Yes | - | +| keyspace | String | Yes | - | +| cql | String | Yes | - | +| username | String | No | - | +| password | String | No | - | +| datacenter | String | No | datacenter1 | +| consistency_level | String | No | LOCAL_ONE | + +### host [string] + +`Cassandra` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as +`"cassandra1:9042,cassandra2:9042"`. + +### keyspace [string] + +The `Cassandra` keyspace. + +### cql [String] + +The query cql used to search data through Cassandra session.
+ +### username [string] + +`Cassandra` user username. + +### password [string] + +`Cassandra` user password. + +### datacenter [String] + +The `Cassandra` datacenter, default is `datacenter1`. + +### consistency_level [String] + +The `Cassandra` read consistency level, default is `LOCAL_ONE`. + +## Examples + +```hocon +source { + Cassandra { + host = "localhost:9042" + username = "cassandra" + password = "cassandra" + datacenter = "datacenter1" + keyspace = "test" + cql = "select * from source_table" + result_table_name = "source_table" + } +} +``` + + + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e022fa59b1b..dff93bdf2aa 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -135,4 +135,6 @@ seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg seatunnel.source.InfluxDB = connector-influxdb seatunnel.source.S3File = connector-file-s3 -seatunnel.sink.S3File = connector-file-s3 \ No newline at end of file +seatunnel.sink.S3File = connector-file-s3 +seatunnel.source.Cassandra = connector-cassandra +seatunnel.sink.Cassandra = connector-cassandra \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java index deff52a6b57..a806b48fe62 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java @@ -32,7 +32,7 @@ public class CassandraClient { public static CqlSessionBuilder getCqlSessionBuilder(String nodeAddress, String keyspace, String username, String password, String dataCenter) { - List
cqlSessionList = Arrays.stream(nodeAddress.split(",")).map(address -> { + List cqlSessionBuilderList = Arrays.stream(nodeAddress.split(",")).map(address -> { String[] nodeAndPort = address.split(":", 2); if (StringUtils.isEmpty(username) && StringUtils.isEmpty(password)) { return CqlSession.builder() @@ -46,7 +46,7 @@ public static CqlSessionBuilder getCqlSessionBuilder(String nodeAddress, String .withKeyspace(keyspace) .withLocalDatacenter(dataCenter); }).collect(Collectors.toList()); - return cqlSessionList.get(ThreadLocalRandom.current().nextInt(cqlSessionList.size())); + return cqlSessionBuilderList.get(ThreadLocalRandom.current().nextInt(cqlSessionBuilderList.size())); } public static SimpleStatement createSimpleStatement(String cql, ConsistencyLevel consistencyLevel) { diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java index a23b3bb0eaa..c2d6c129f06 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java @@ -46,6 +46,7 @@ public class CassandraConfig implements Serializable { public static final String CONSISTENCY_LEVEL = "consistency_level"; public static final String BATCH_SIZE = "batch_size"; public static final String BATCH_TYPE = "batch_type"; + public static final String ASYNC_WRITE = "async_write"; private String host; private String username; @@ -58,6 +59,7 @@ public class CassandraConfig implements Serializable { private ConsistencyLevel consistencyLevel; private Integer batchSize; private DefaultBatchType batchType; + private Boolean asyncWrite; public CassandraConfig(@NonNull 
String host, @NonNull String keyspace) { this.host = host; @@ -97,13 +99,18 @@ public static CassandraConfig getCassandraConfig(Config config) { if (config.hasPath(BATCH_SIZE)) { cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE)); } else { - cassandraConfig.setBatchSize(Integer.parseInt("100")); + cassandraConfig.setBatchSize(Integer.parseInt("5000")); } if (config.hasPath(BATCH_TYPE)) { cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE))); } else { cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED); } + if (config.hasPath(ASYNC_WRITE)) { + cassandraConfig.setAsyncWrite(config.getBoolean(ASYNC_WRITE)); + } else { + cassandraConfig.setAsyncWrite(true); + } return cassandraConfig; } } diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java index ac32376aa20..b9c1154b50c 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java @@ -63,30 +63,33 @@ public String getPluginName() { public void prepare(Config config) throws PrepareFailException { CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, TABLE); if (!checkResult.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SOURCE, checkResult.getMsg()); + throw new PrepareFailException(getPluginName(), PluginType.SINK, checkResult.getMsg()); } this.cassandraConfig = CassandraConfig.getCassandraConfig(config); - CqlSession session = CassandraClient.getCqlSessionBuilder( + try (CqlSession session = CassandraClient.getCqlSessionBuilder( cassandraConfig.getHost(), 
cassandraConfig.getKeyspace(), cassandraConfig.getUsername(), cassandraConfig.getPassword(), cassandraConfig.getDatacenter() - ).build(); - List fields = cassandraConfig.getFields(); - this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable()); - if (fields == null || fields.isEmpty()) { - List newFields = new ArrayList<>(); - for (int i = 0; i < tableSchema.size(); i++) { - newFields.add(tableSchema.get(i).getName().asInternal()); - } - cassandraConfig.setFields(newFields); - } else { - for (String field : fields) { - if (!tableSchema.contains(field)) { - throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE)); + ).build()) { + List fields = cassandraConfig.getFields(); + this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable()); + if (fields == null || fields.isEmpty()) { + List newFields = new ArrayList<>(); + for (int i = 0; i < tableSchema.size(); i++) { + newFields.add(tableSchema.get(i).getName().asInternal()); + } + cassandraConfig.setFields(newFields); + } else { + for (String field : fields) { + if (!tableSchema.contains(field)) { + throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE)); + } } } + } catch (Exception e) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, e.getMessage()); } } diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java index b164c806d5a..9b941cd5bcd 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java +++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; @@ -32,7 +33,10 @@ import com.datastax.oss.driver.api.core.type.DataType; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; public class CassandraSinkWriter extends AbstractSinkWriter { @@ -42,6 +46,8 @@ public class CassandraSinkWriter extends AbstractSinkWriter private final ColumnDefinitions tableSchema; private final CqlSession session; private BatchStatement batchStatement; + private List boundStatementList; + private List> completionStages; private final PreparedStatement preparedStatement; private final AtomicInteger counter = new AtomicInteger(0); @@ -56,6 +62,8 @@ public CassandraSinkWriter(CassandraConfig cassandraConfig, SeaTunnelRowType sea cassandraConfig.getPassword(), cassandraConfig.getDatacenter()).build(); this.batchStatement = BatchStatement.builder(cassandraConfig.getBatchType()).build(); + this.boundStatementList = new ArrayList<>(); + this.completionStages = new ArrayList<>(); this.preparedStatement = session.prepare(initPrepareSQL()); } @@ -70,8 +78,28 @@ public void write(SeaTunnelRow row) throws IOException { } private void flush() { - this.session.execute(this.batchStatement); - this.batchStatement.clear(); + if (cassandraConfig.getAsyncWrite()) { + completionStages.forEach(resultStage -> resultStage.whenComplete( + (resultSet, error) -> { + if (error != null) { + 
error.printStackTrace(); + } + } + )); + completionStages.clear(); + } else { + try { + this.session.execute(this.batchStatement.addAll(boundStatementList)); + } catch (Exception e) { + for (BoundStatement statement : boundStatementList) { + this.session.execute(statement); + } + } finally { + this.batchStatement.clear(); + this.boundStatementList.clear(); + } + } + } private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) { @@ -82,7 +110,11 @@ private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) { Object fieldValue = row.getField(seaTunnelRowType.indexOf(fieldName)); boundStatement = TypeConvertUtil.reconvertAndInject(boundStatement, i, dataType, fieldValue); } - this.batchStatement = this.batchStatement.add(boundStatement); + if (cassandraConfig.getAsyncWrite()) { + completionStages.add(session.executeAsync(boundStatement)); + } else { + boundStatementList.add(boundStatement); + } } catch (Exception e) { throw new RuntimeException("Add row data into batch error", e); } diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java index bfce6cfe5c0..163433c4744 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java @@ -24,8 +24,6 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.source.SourceReader; -import org.apache.seatunnel.api.source.SourceSplitEnumerator; import 
org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -34,8 +32,10 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient; import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig; -import org.apache.seatunnel.connectors.seatunnel.cassandra.state.CassandraSourceState; import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -44,7 +44,7 @@ import com.google.auto.service.AutoService; @AutoService(SeaTunnelSource.class) -public class CassandraSource implements SeaTunnelSource { +public class CassandraSource extends AbstractSingleSplitSource { private SeaTunnelRowType rowTypeInfo; private CassandraConfig cassandraConfig; @@ -56,34 +56,32 @@ public String getPluginName() { @Override public void prepare(Config config) throws PrepareFailException { - this.cassandraConfig = CassandraConfig.getCassandraConfig(config); CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, CQL); if (!checkResult.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, checkResult.getMsg()); } - // try { - CqlSession currentSession = CassandraClient.getCqlSessionBuilder( + this.cassandraConfig = CassandraConfig.getCassandraConfig(config); + try (CqlSession currentSession = CassandraClient.getCqlSessionBuilder( cassandraConfig.getHost(), cassandraConfig.getKeyspace(), cassandraConfig.getUsername(), cassandraConfig.getPassword(), - 
cassandraConfig.getDatacenter() - ).build(); - Row rs = currentSession.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel())).one(); - if (rs == null) { - throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "No data in the table!"); - } - int columnSize = rs.getColumnDefinitions().size(); - String[] fieldNames = new String[columnSize]; - SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize]; - for (int i = 0; i < columnSize; i++) { - fieldNames[i] = rs.getColumnDefinitions().get(i).getName().asInternal(); - seaTunnelDataTypes[i] = TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType()); + cassandraConfig.getDatacenter()).build()) { + Row rs = currentSession.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel())).one(); + if (rs == null) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "No data in the table!"); + } + int columnSize = rs.getColumnDefinitions().size(); + String[] fieldNames = new String[columnSize]; + SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize]; + for (int i = 0; i < columnSize; i++) { + fieldNames[i] = rs.getColumnDefinitions().get(i).getName().asInternal(); + seaTunnelDataTypes[i] = TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType()); + } + this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); + } catch (Exception e) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, e.getMessage()); } - this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); - // } catch (Exception e) { - // throw new PrepareFailException(getPluginName(), PluginType.SOURCE, e.getMessage()); - // } } @Override @@ -97,18 +95,8 @@ public SeaTunnelDataType getProducedType() { } @Override - public SourceReader createReader(SourceReader.Context readerContext) throws Exception { + public 
AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { return new CassandraSourceReader(cassandraConfig, readerContext); } - @Override - public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { - return new CassandraSourceSplitEnumerator(enumeratorContext); - } - - @Override - public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, CassandraSourceState checkpointState) throws Exception { - return new CassandraSourceSplitEnumerator(enumeratorContext); - } - } diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java index bcc78788d41..fb4abf8c53c 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java @@ -17,32 +17,30 @@ package org.apache.seatunnel.connectors.seatunnel.cassandra.source; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; -import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient; import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig; import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; import 
com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.ResultSet; import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; @Slf4j -public class CassandraSourceReader implements SourceReader { +public class CassandraSourceReader extends AbstractSingleSplitReader { private final CassandraConfig cassandraConfig; - private final SourceReader.Context readerContext; - private final List splits; + private final SingleSplitReaderContext readerContext; private CqlSession session; - CassandraSourceReader(CassandraConfig cassandraConfig, Context readerContext) { + CassandraSourceReader(CassandraConfig cassandraConfig, SingleSplitReaderContext readerContext) { this.cassandraConfig = cassandraConfig; this.readerContext = readerContext; - this.splits = new ArrayList<>(); } @Override @@ -65,27 +63,16 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { - if (!splits.isEmpty()) { + try { ResultSet resultSet = session.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel())); resultSet.forEach(row -> output.collect(TypeConvertUtil.buildSeaTunnelRow(row))); - this.readerContext.signalNoMoreElement(); + } finally { + if (Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { + this.readerContext.signalNoMoreElement(); + } } } - @Override - public List snapshotState(long checkpointId) throws Exception { - return null; - } - - @Override - public void addSplits(List splits) { - this.splits.addAll(splits); - } - - @Override - public void handleNoMoreSplits() { - } - @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { } diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java deleted file mode 100644 index 5cac1cdcebb..00000000000 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplit.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.cassandra.source; - -import org.apache.seatunnel.api.source.SourceSplit; - -public class CassandraSourceSplit implements SourceSplit { - @Override - public String splitId() { - return null; - } -} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java deleted file mode 100644 index 9ccf365eec6..00000000000 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceSplitEnumerator.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.cassandra.source; - -import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.connectors.seatunnel.cassandra.state.CassandraSourceState; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; - -public class CassandraSourceSplitEnumerator implements SourceSplitEnumerator { - - private final Context context; - private final Set readers; - private volatile int assigned = -1; - - CassandraSourceSplitEnumerator(Context context) { - this.context = context; - this.readers = new HashSet<>(); - } - - @Override - public void open() { - } - - @Override - public void run() throws Exception { - - } - - @Override - public void close() throws IOException { - - } - - @Override - public void addSplitsBack(List splits, int subtaskId) { - if (splits.isEmpty()) { - return; - } - if (subtaskId == assigned) { - Optional otherReader = readers.stream().filter(r -> r != subtaskId).findAny(); - if (otherReader.isPresent()) { - context.assignSplit(otherReader.get(), splits); - } else { - assigned = -1; - } - } - } - - @Override - public int currentUnassignedSplitSize() { - return assigned < 0 ? 
0 : 1; - } - - @Override - public void handleSplitRequest(int subtaskId) { - - } - - @Override - public void registerReader(int subtaskId) { - readers.add(subtaskId); - if (assigned < 0) { - assigned = subtaskId; - context.assignSplit(subtaskId, new CassandraSourceSplit()); - } - } - - @Override - public CassandraSourceState snapshotState(long checkpointId) throws Exception { - return null; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - - } -} diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/state/CassandraSourceState.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/state/CassandraSourceState.java deleted file mode 100644 index c8a0c8045cd..00000000000 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/state/CassandraSourceState.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.cassandra.state; - -import java.io.Serializable; - -public class CassandraSourceState implements Serializable { -} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index fe13edcf298..ff5f5d62eac 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -289,7 +289,7 @@ private static Tuple2> generateTestDataSet( new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) }); List rows = new ArrayList<>(); - for (int i = 0; i < 2; ++i) { + for (int i = 0; i < 50; ++i) { SeaTunnelRow row; try { row = new SeaTunnelRow( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf index 5955a851605..889606f2c70 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf @@ -51,6 +51,7 @@ sink { password = "" datacenter = "datacenter1" keyspace = "test" + async_write = "true" table = "sink_table" } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, From 1962d948edf863bf3a4af37ce7b2afd7603161ee Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Sat, 29 Oct 2022 23:17:44 +0800 Subject: [PATCH 
04/22] update --- docs/en/connector-v2/sink/Cassandra.md | 6 ++++++ docs/en/connector-v2/source/Cassandra.md | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/docs/en/connector-v2/sink/Cassandra.md b/docs/en/connector-v2/sink/Cassandra.md index 403870859f0..1040c61df9f 100644 --- a/docs/en/connector-v2/sink/Cassandra.md +++ b/docs/en/connector-v2/sink/Cassandra.md @@ -89,5 +89,11 @@ sink { } ``` +## Changelog + +### next version + +- Add Cassandra Sink Connector + diff --git a/docs/en/connector-v2/source/Cassandra.md b/docs/en/connector-v2/source/Cassandra.md index 66642f4163d..e2a6e4e8c77 100644 --- a/docs/en/connector-v2/source/Cassandra.md +++ b/docs/en/connector-v2/source/Cassandra.md @@ -72,5 +72,11 @@ source { } ``` +## Changelog + +### next version + +- Add Cassandra Source Connector + From b305857d58d75e858efc136a07541bced4aa7154 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Sun, 30 Oct 2022 16:08:00 +0800 Subject: [PATCH 05/22] update --- .../seatunnel/connectors/seatunnel/cassandra/CassandraIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index ff5f5d62eac..a916208ac76 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -314,7 +314,7 @@ private static Tuple2> generateTestDataSet( Byte.parseByte("1"), Collections.singletonList((float) i), Collections.singletonList(i), - Collections.singleton(Double.valueOf("1.1")), + Collections.singleton(Double.valueOf("2.1")), 
Collections.singleton((long) i), Collections.singletonMap("key_" + i, i) }); From fb4b8528368fe61c0a6722e11c1f0f319b46d35c Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Sun, 30 Oct 2022 23:59:23 +0800 Subject: [PATCH 06/22] update --- seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index f4227ad71a2..d1d8a891f97 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -29,6 +29,7 @@ connector-redis-e2e connector-clickhouse-e2e connector-influxdb-e2e + connector-cassandra-e2e From 4b33676458c670a6ab6efd4a2e9e1ec136235d6e Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Mon, 31 Oct 2022 00:12:12 +0800 Subject: [PATCH 07/22] update --- seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index d1d8a891f97..d5404c565d7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -29,8 +29,6 @@ connector-redis-e2e connector-clickhouse-e2e connector-influxdb-e2e - - connector-cassandra-e2e seatunnel-connector-v2-e2e From 5d318a01d2ebe6eff68e23c075ff77cd946a240b Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Mon, 31 Oct 2022 00:12:53 +0800 Subject: [PATCH 08/22] update --- seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 46bb9df8fff..d000bd68c47 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -30,6 +30,7 @@ connector-clickhouse-e2e connector-influxdb-e2e connector-file-local-e2e + connector-cassandra-e2e seatunnel-connector-v2-e2e 
From fced49116922cedff23d72ee6ac584abb143a8dc Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Mon, 31 Oct 2022 22:02:11 +0800 Subject: [PATCH 09/22] update --- .../seatunnel/connectors/seatunnel/cassandra/CassandraIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index a916208ac76..77960cdfa8d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -122,7 +122,7 @@ public void startUp() throws Exception { Awaitility.given() .ignoreExceptions() .await() - .atMost(180L, TimeUnit.SECONDS) + .atMost(360L, TimeUnit.SECONDS) .untilAsserted(this::initConnection); this.initializeCassandraTable(); this.batchInsertData(); From 7166ad6a435b61ff0dd577444e6d3803f95a7070 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Tue, 1 Nov 2022 00:04:18 +0800 Subject: [PATCH 10/22] update --- .../seatunnel/cassandra/CassandraIT.java | 20 +++++++++++---- .../src/test/resources/application.conf | 25 +++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index 
77960cdfa8d..2b61fcb4d38 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -31,6 +31,8 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DriverConfig; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.BoundStatement; @@ -89,6 +91,7 @@ public class CassandraIT extends TestSuiteBase implements TestResource { private static final Integer PORT = 9042; private static final String INIT_CASSANDRA_PATH = "/init/cassandra_init.conf"; private static final String CASSANDRA_JOB_CONFIG = "/cassandra_to_cassandra.conf"; + private static final String CASSANDRA_DRIVER_CONFIG = "/application.conf"; private static final String DATACENTER = "datacenter1"; private static final String KEYSPACE = "test"; private static final String SOURCE_TABLE = "source_table"; @@ -122,7 +125,7 @@ public void startUp() throws Exception { Awaitility.given() .ignoreExceptions() .await() - .atMost(360L, TimeUnit.SECONDS) + .atMost(180L, TimeUnit.SECONDS) .untilAsserted(this::initConnection); this.initializeCassandraTable(); this.batchInsertData(); @@ -140,10 +143,17 @@ private void initializeCassandraTable() { } private void initConnection() { - this.session = CqlSession.builder() - .addContactPoint(new InetSocketAddress(container.getHost(), container.getExposedPorts().get(0))) - .withLocalDatacenter(DATACENTER) - .build(); + try { + File file = new File(CASSANDRA_DRIVER_CONFIG); + this.session = CqlSession.builder() + .addContactPoint(new 
InetSocketAddress(container.getHost(), container.getExposedPorts().get(0))) + .withLocalDatacenter(DATACENTER) + .withConfigLoader(DriverConfigLoader.fromFile(file)) + .build(); + } catch (Exception e) { + throw new RuntimeException("Init connection failed!", e); + } + } private void batchInsertData() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf new file mode 100644 index 00000000000..88be3f73c57 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +datastax-java-driver { + advanced.protocol.version = V5 + profiles { + slow { + basic.request.timeout = 10 seconds + } + } +} \ No newline at end of file From 413b34c10c4683130b0cf1ba9a31fb088a34ff74 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Tue, 1 Nov 2022 08:23:09 +0800 Subject: [PATCH 11/22] update --- .../seatunnel/connectors/seatunnel/cassandra/CassandraIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index 2b61fcb4d38..29bb2ba37df 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -31,7 +31,6 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil; import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.config.DriverConfig; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BatchType; From e0a2a185f07be85de4812167f4bed28f93259dd0 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Tue, 1 Nov 2022 13:06:33 +0800 Subject: [PATCH 12/22] update --- docs/en/connector-v2/sink/Cassandra.md | 1 - .../cassandra/sink/CassandraSinkWriter.java | 14 +++++++++----- .../seatunnel/cassandra/CassandraIT.java | 6 +++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/docs/en/connector-v2/sink/Cassandra.md b/docs/en/connector-v2/sink/Cassandra.md index 1040c61df9f..0a4ece086ec 100644 --- a/docs/en/connector-v2/sink/Cassandra.md +++ 
b/docs/en/connector-v2/sink/Cassandra.md @@ -8,7 +8,6 @@ Write data to Apache Cassandra. ## Key features -- [x] [batch](../../concept/connector-v2-features.md) - [ ] [exactly-once](../../concept/connector-v2-features.md) - [ ] [schema projection](../../concept/connector-v2-features.md) diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java index 9b941cd5bcd..10639f7f054 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient; import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig; import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil; @@ -31,6 +32,7 @@ import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.type.DataType; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.ArrayList; @@ -39,6 +41,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; +@Slf4j public class CassandraSinkWriter extends AbstractSinkWriter { private final CassandraConfig cassandraConfig; @@ -64,7 +67,7 @@ public CassandraSinkWriter(CassandraConfig cassandraConfig, SeaTunnelRowType sea this.batchStatement = 
BatchStatement.builder(cassandraConfig.getBatchType()).build(); this.boundStatementList = new ArrayList<>(); this.completionStages = new ArrayList<>(); - this.preparedStatement = session.prepare(initPrepareSQL()); + this.preparedStatement = session.prepare(initPrepareCQL()); } @Override @@ -82,7 +85,7 @@ private void flush() { completionStages.forEach(resultStage -> resultStage.whenComplete( (resultSet, error) -> { if (error != null) { - error.printStackTrace(); + log.error(ExceptionUtils.getMessage(error)); } } )); @@ -91,6 +94,7 @@ private void flush() { try { this.session.execute(this.batchStatement.addAll(boundStatementList)); } catch (Exception e) { + log.error("Batch insert error,Try inserting one by one!"); for (BoundStatement statement : boundStatementList) { this.session.execute(statement); } @@ -116,11 +120,11 @@ private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) { boundStatementList.add(boundStatement); } } catch (Exception e) { - throw new RuntimeException("Add row data into batch error", e); + throw new RuntimeException("Add row data into batch error!", e); } } - private String initPrepareSQL() { + private String initPrepareCQL() { String[] placeholder = new String[cassandraConfig.getFields().size()]; Arrays.fill(placeholder, "?"); return String.format("INSERT INTO %s (%s) VALUES (%s)", @@ -137,7 +141,7 @@ public void close() throws IOException { this.session.close(); } } catch (Exception e) { - throw new RuntimeException("Failed to close CqlSession.", e); + throw new RuntimeException("Failed to close CqlSession!", e); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index 29bb2ba37df..e6170b842d5 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -241,7 +241,7 @@ private void clearSinkTable() { try { session.execute(SimpleStatement.builder(String.format("truncate table %s", SINK_TABLE)).setKeyspace(KEYSPACE).build()); } catch (Exception e) { - throw new RuntimeException("Test clickhouse server image error", e); + throw new RuntimeException("Test clickhouse server image failed!", e); } } @@ -328,7 +328,7 @@ private static Tuple2> generateTestDataSet( Collections.singletonMap("key_" + i, i) }); } catch (UnknownHostException e) { - throw new RuntimeException("Generate Test DataSet", e); + throw new RuntimeException("Generate Test DataSet Failed!", e); } rows.add(row); } @@ -341,7 +341,7 @@ private Row getRow() { ResultSet resultSet = session.execute(SimpleStatement.builder(sql).setKeyspace(KEYSPACE).build()); return resultSet.one(); } catch (Exception e) { - throw new RuntimeException("test cassandra server image error", e); + throw new RuntimeException("test cassandra server image failed!", e); } } From 803efc5dd54309ca33ec45cdc07a31b0a5bdc46e Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Tue, 1 Nov 2022 14:23:47 +0800 Subject: [PATCH 13/22] update --- .../seatunnel/connectors/seatunnel/cassandra/CassandraIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index e6170b842d5..ffec92bbbf1 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -308,7 +308,7 @@ private static Tuple2> generateTestDataSet( (long) i, ByteBuffer.wrap(new byte[]{Byte.parseByte("1")}), Boolean.FALSE, - BigDecimal.valueOf(11L, 1), + BigDecimal.valueOf(11L, 2), Double.parseDouble("1.1"), Float.parseFloat("2.1"), i, From 5fcbfa70a8a4774c0479d828cf549304b7991582 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Thu, 3 Nov 2022 16:11:19 +0800 Subject: [PATCH 14/22] update --- docs/en/Connector-v2-release-state.md | 102 +++++++++++++------------- 1 file changed, 52 insertions(+), 50 deletions(-) diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md index 4e29be67700..c8ddeedd561 100644 --- a/docs/en/Connector-v2-release-state.md +++ b/docs/en/Connector-v2-release-state.md @@ -9,53 +9,55 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex ## Connector V2 Health -| Connector Name | Type | Status | Support Version | -|-------------------------------------------------------------|--------|--------|-----------------| -| [Asset](connector-v2/sink/Assert.md) | Sink | Beta | 2.2.0-beta | -| [ClickHouse](connector-v2/source/Clickhouse.md) | Source | Beta | 2.2.0-beta | -| [ClickHouse](connector-v2/sink/Clickhouse.md) | Sink | Beta | 2.2.0-beta | -| [ClickHouseFile](connector-v2/sink/ClickhouseFile.md) | Sink | Beta | 2.2.0-beta | -| [Console](connector-v2/sink/Console.md) | Sink | Beta | 2.2.0-beta | -| [DataHub](connector-v2/sink/Datahub.md) | Sink | Alpha | 2.2.0-beta | -| [DingTalk](connector-v2/sink/dingtalk.md) | Sink | Alpha | 2.2.0-beta | -| [Elasticsearch](connector-v2/sink/Elasticsearch.md) | Sink | Beta | 2.2.0-beta | -| [Email](connector-v2/sink/Email.md) | Sink | 
Alpha | 2.2.0-beta | -| [Enterprise WeChat](connector-v2/sink/Enterprise-WeChat.md) | Sink | Alpha | 2.2.0-beta | -| [FeiShu](connector-v2/sink/Feishu.md) | Sink | Alpha | 2.2.0-beta | -| [Fake](connector-v2/source/FakeSource.md) | Source | Beta | 2.2.0-beta | -| [FtpFile](connector-v2/sink/FtpFile.md) | Sink | Alpha | 2.2.0-beta | -| [Greenplum](connector-v2/sink/Greenplum.md) | Sink | Alpha | 2.2.0-beta | -| [Greenplum](connector-v2/source/Greenplum.md) | Source | Alpha | 2.2.0-beta | -| [HdfsFile](connector-v2/sink/HdfsFile.md) | Sink | Beta | 2.2.0-beta | -| [HdfsFile](connector-v2/source/HdfsFile.md) | Source | Beta | 2.2.0-beta | -| [Hive](connector-v2/sink/Hive.md) | Sink | Beta | 2.2.0-beta | -| [Hive](connector-v2/source/Hive.md) | Source | Beta | 2.2.0-beta | -| [Http](connector-v2/sink/Http.md) | Sink | Beta | 2.2.0-beta | -| [Http](connector-v2/source/Http.md) | Source | Beta | 2.2.0-beta | -| [Hudi](connector-v2/source/Hudi.md) | Source | Alpha | 2.2.0-beta | -| [Iceberg](connector-v2/source/Iceberg.md) | Source | Alpha | 2.2.0-beta | -| [IoTDB](connector-v2/source/IoTDB.md) | Source | Beta | 2.2.0-beta | -| [IoTDB](connector-v2/sink/IoTDB.md) | Sink | Beta | 2.2.0-beta | -| [Jdbc](connector-v2/source/Jdbc.md) | Source | Beta | 2.2.0-beta | -| [Jdbc](connector-v2/sink/Jdbc.md) | Sink | Beta | 2.2.0-beta | -| [Kudu](connector-v2/source/Kudu.md) | Source | Alpha | 2.2.0-beta | -| [Kudu](connector-v2/sink/Kudu.md) | Sink | Alpha | 2.2.0-beta | -| [LocalFile](connector-v2/sink/LocalFile.md) | Sink | Beta | 2.2.0-beta | -| [LocalFile](connector-v2/source/LocalFile.md) | Source | Beta | 2.2.0-beta | -| [MongoDB](connector-v2/source/MongoDB.md) | Source | Beta | 2.2.0-beta | -| [MongoDB](connector-v2/sink/MongoDB.md) | Sink | Beta | 2.2.0-beta | -| [Neo4j](connector-v2/sink/Neo4j.md) | Sink | Alpha | 2.2.0-beta | -| [OssFile](connector-v2/sink/OssFile.md) | Sink | Alpha | 2.2.0-beta | -| [OssFile](connector-v2/source/OssFile.md) | Source | Beta | 2.2.0-beta | 
-| [Phoenix](connector-v2/sink/Phoenix.md) | Sink | Alpha | 2.2.0-beta | -| [Phoenix](connector-v2/source/Phoenix.md) | Source | Alpha | 2.2.0-beta | -| [Pulsar](connector-v2/source/pulsar.md) | Source | Beta | 2.2.0-beta | -| [Redis](connector-v2/sink/Redis.md) | Sink | Beta | 2.2.0-beta | -| [Redis](connector-v2/source/Redis.md) | Source | Alpha | 2.2.0-beta | -| [Sentry](connector-v2/sink/Sentry.md) | Sink | Alpha | 2.2.0-beta | -| [Socket](connector-v2/sink/Socket.md) | Sink | Alpha | 2.2.0-beta | -| [Socket](connector-v2/source/Socket.md) | Source | Alpha | 2.2.0-beta | -| [Kafka](connector-v2/source/kafka.md) | Source | Alpha | 2.3.0-beta | -| [Kafka](connector-v2/sink/Kafka.md) | Sink | Alpha | 2.3.0-beta | -| [S3File](connector-v2/source/S3File.md) | Source | Alpha | 2.3.0-beta | -| [S3File](connector-v2/sink/S3File.md) | Sink | Alpha | 2.3.0-beta | \ No newline at end of file +| Connector Name | Type | Status | Support Version | +|-------------------------------------------------------------------|--------|--------|-----------------| +| [Asset](connector-v2/sink/Assert.md) | Sink | Beta | 2.2.0-beta | +| [ClickHouse](connector-v2/source/Clickhouse.md) | Source | Beta | 2.2.0-beta | +| [ClickHouse](connector-v2/sink/Clickhouse.md) | Sink | Beta | 2.2.0-beta | +| [ClickHouseFile](connector-v2/sink/ClickhouseFile.md) | Sink | Beta | 2.2.0-beta | +| [Console](connector-v2/sink/Console.md) | Sink | Beta | 2.2.0-beta | +| [DataHub](connector-v2/sink/Datahub.md) | Sink | Alpha | 2.2.0-beta | +| [DingTalk](connector-v2/sink/dingtalk.md) | Sink | Alpha | 2.2.0-beta | +| [Elasticsearch](connector-v2/sink/Elasticsearch.md) | Sink | Beta | 2.2.0-beta | +| [Email](connector-v2/sink/Email.md) | Sink | Alpha | 2.2.0-beta | +| [Enterprise WeChat](connector-v2/sink/Enterprise-WeChat.md) | Sink | Alpha | 2.2.0-beta | +| [FeiShu](connector-v2/sink/Feishu.md) | Sink | Alpha | 2.2.0-beta | +| [Fake](connector-v2/source/FakeSource.md) | Source | Beta | 2.2.0-beta | +| 
[FtpFile](connector-v2/sink/FtpFile.md) | Sink | Alpha | 2.2.0-beta | +| [Greenplum](connector-v2/sink/Greenplum.md) | Sink | Alpha | 2.2.0-beta | +| [Greenplum](connector-v2/source/Greenplum.md) | Source | Alpha | 2.2.0-beta | +| [HdfsFile](connector-v2/sink/HdfsFile.md) | Sink | Beta | 2.2.0-beta | +| [HdfsFile](connector-v2/source/HdfsFile.md) | Source | Beta | 2.2.0-beta | +| [Hive](connector-v2/sink/Hive.md) | Sink | Beta | 2.2.0-beta | +| [Hive](connector-v2/source/Hive.md) | Source | Beta | 2.2.0-beta | +| [Http](connector-v2/sink/Http.md) | Sink | Beta | 2.2.0-beta | +| [Http](connector-v2/source/Http.md) | Source | Beta | 2.2.0-beta | +| [Hudi](connector-v2/source/Hudi.md) | Source | Alpha | 2.2.0-beta | +| [Iceberg](connector-v2/source/Iceberg.md) | Source | Alpha | 2.2.0-beta | +| [IoTDB](connector-v2/source/IoTDB.md) | Source | Beta | 2.2.0-beta | +| [IoTDB](connector-v2/sink/IoTDB.md) | Sink | Beta | 2.2.0-beta | +| [Jdbc](connector-v2/source/Jdbc.md) | Source | Beta | 2.2.0-beta | +| [Jdbc](connector-v2/sink/Jdbc.md) | Sink | Beta | 2.2.0-beta | +| [Kudu](connector-v2/source/Kudu.md) | Source | Alpha | 2.2.0-beta | +| [Kudu](connector-v2/sink/Kudu.md) | Sink | Alpha | 2.2.0-beta | +| [LocalFile](connector-v2/sink/LocalFile.md) | Sink | Beta | 2.2.0-beta | +| [LocalFile](connector-v2/source/LocalFile.md) | Source | Beta | 2.2.0-beta | +| [MongoDB](connector-v2/source/MongoDB.md) | Source | Beta | 2.2.0-beta | +| [MongoDB](connector-v2/sink/MongoDB.md) | Sink | Beta | 2.2.0-beta | +| [Neo4j](connector-v2/sink/Neo4j.md) | Sink | Alpha | 2.2.0-beta | +| [OssFile](connector-v2/sink/OssFile.md) | Sink | Alpha | 2.2.0-beta | +| [OssFile](connector-v2/source/OssFile.md) | Source | Beta | 2.2.0-beta | +| [Phoenix](connector-v2/sink/Phoenix.md) | Sink | Alpha | 2.2.0-beta | +| [Phoenix](connector-v2/source/Phoenix.md) | Source | Alpha | 2.2.0-beta | +| [Pulsar](connector-v2/source/pulsar.md) | Source | Beta | 2.2.0-beta | +| [Redis](connector-v2/sink/Redis.md) 
| Sink | Beta | 2.2.0-beta | +| [Redis](connector-v2/source/Redis.md) | Source | Alpha | 2.2.0-beta | +| [Sentry](connector-v2/sink/Sentry.md) | Sink | Alpha | 2.2.0-beta | +| [Socket](connector-v2/sink/Socket.md) | Sink | Alpha | 2.2.0-beta | +| [Socket](connector-v2/source/Socket.md) | Source | Alpha | 2.2.0-beta | +| [Kafka](connector-v2/source/kafka.md) | Source | Alpha | 2.3.0-beta | +| [Kafka](connector-v2/sink/Kafka.md) | Sink | Alpha | 2.3.0-beta | +| [S3File](connector-v2/source/S3File.md) | Source | Alpha | 2.3.0-beta | +| [S3File](connector-v2/sink/S3File.md) | Sink | Alpha | 2.3.0-beta | +| [Cassandra](connector-v2/source/Cassandra.md) | Source | Alpha | 2.3.0-beta | +| [Cassandra](connector-v2/sink/Cassandra.md) | Sink | Alpha | 2.3.0-beta | \ No newline at end of file From 9e9bfc18171a585c88bdf7256de73f77a43ad54e Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Thu, 3 Nov 2022 18:37:03 +0800 Subject: [PATCH 15/22] update --- .../src/test/resources/cassandra_to_cassandra.conf | 7 ------- 1 file changed, 7 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf index 889606f2c70..4f42eb8625f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf @@ -14,9 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### env { # You can set spark configuration here @@ -35,8 +32,6 @@ source { cql = "select * from source_table" result_table_name = "source_table" } - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource } transform { @@ -54,6 +49,4 @@ sink { async_write = "true" table = "sink_table" } - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink } \ No newline at end of file From b3eb8e61f0c8ada4c853897f83d62ea7605e526f Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Thu, 3 Nov 2022 21:12:48 +0800 Subject: [PATCH 16/22] update --- .../seatunnel/connectors/seatunnel/cassandra/CassandraIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java index ffec92bbbf1..9e54b914d5b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java @@ -323,7 +323,7 @@ private static Tuple2> generateTestDataSet( Byte.parseByte("1"), Collections.singletonList((float) i), Collections.singletonList(i), - Collections.singleton(Double.valueOf("2.1")), + Collections.singleton(Double.valueOf("1.1")), Collections.singleton((long) i), Collections.singletonMap("key_" + i, 
i) }); From 752be5145da449c3f95269b734e4fabeb481d44f Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Fri, 4 Nov 2022 12:03:55 +0800 Subject: [PATCH 17/22] update --- plugin-mapping.properties | 2 ++ seatunnel-connectors-v2/pom.xml | 1 + .../connector-cassandra-e2e/pom.xml | 11 +---------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index dff93bdf2aa..58066cdf3c6 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -136,5 +136,7 @@ seatunnel.source.Iceberg = connector-iceberg seatunnel.source.InfluxDB = connector-influxdb seatunnel.source.S3File = connector-file-s3 seatunnel.sink.S3File = connector-file-s3 + + seatunnel.source.Cassandra = connector-cassandra seatunnel.sink.Cassandra = connector-cassandra \ No newline at end of file diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 07338e590c3..d28c050b85c 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -56,6 +56,7 @@ connector-mongodb connector-iceberg connector-influxdb + connector-cassandra diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml index 655e03691ae..d9775f3b2eb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml @@ -27,15 +27,12 @@ connector-cassandra-e2e - - 4.14.0 - - org.apache.seatunnel connector-cassandra ${project.version} + test org.testcontainers @@ -43,12 +40,6 @@ ${testcontainer.version} test - - com.datastax.oss - java-driver-core - ${cassandra.driver.version} - test - \ No newline at end of file From a9cd693c2468d0441d533ba87da637191ea3c89d Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Fri, 4 Nov 2022 12:06:24 +0800 Subject: [PATCH 18/22] update --- plugin-mapping.properties | 2 -- 
seatunnel-connectors-v2/pom.xml | 2 -- 2 files changed, 4 deletions(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 58066cdf3c6..9cef71b6b8a 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -138,5 +138,3 @@ seatunnel.source.S3File = connector-file-s3 seatunnel.sink.S3File = connector-file-s3 -seatunnel.source.Cassandra = connector-cassandra -seatunnel.sink.Cassandra = connector-cassandra \ No newline at end of file diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index d28c050b85c..ec05e06e584 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -56,8 +56,6 @@ connector-mongodb connector-iceberg connector-influxdb - - connector-cassandra From 8fc442092af7c7ad679687874221cd62519f5df8 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Fri, 4 Nov 2022 12:10:23 +0800 Subject: [PATCH 19/22] update --- plugin-mapping.properties | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 9cef71b6b8a..e022fa59b1b 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -135,6 +135,4 @@ seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg seatunnel.source.InfluxDB = connector-influxdb seatunnel.source.S3File = connector-file-s3 -seatunnel.sink.S3File = connector-file-s3 - - +seatunnel.sink.S3File = connector-file-s3 \ No newline at end of file From 4dd7fee778df641e675e40ad1d0c1e0d52277142 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Fri, 4 Nov 2022 12:11:43 +0800 Subject: [PATCH 20/22] update --- plugin-mapping.properties | 2 ++ seatunnel-connectors-v2/pom.xml | 1 + 2 files changed, 3 insertions(+) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 1555c1c1427..10234a5f04e 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -139,3 +139,5 @@ seatunnel.source.S3File = connector-file-s3 
seatunnel.sink.S3File = connector-file-s3 seatunnel.source.Amazondynamodb = connector-amazondynamodb seatunnel.sink.Amazondynamodb = connector-amazondynamodb +seatunnel.source.Cassandra = connector-cassandra +seatunnel.sink.Cassandra = connector-cassandra \ No newline at end of file diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 8ca99e023cf..7d46cc2cc04 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -57,6 +57,7 @@ connector-iceberg connector-influxdb connector-amazondynamodb + connector-cassandra From 816de344dae5a4ebb899c1d3c70a933d06dd0642 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Fri, 4 Nov 2022 12:13:35 +0800 Subject: [PATCH 21/22] update --- .../seatunnel/cassandra/source/CassandraSourceReader.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java index fb4abf8c53c..6ae6593616b 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java @@ -67,9 +67,7 @@ public void pollNext(Collector output) throws Exception { ResultSet resultSet = session.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel())); resultSet.forEach(row -> output.collect(TypeConvertUtil.buildSeaTunnelRow(row))); } finally { - if (Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { - this.readerContext.signalNoMoreElement(); - } + this.readerContext.signalNoMoreElement(); } } From 
6f464c15bc1335840e120ac35d28e6bd46826787 Mon Sep 17 00:00:00 2001 From: yangbinbin Date: Fri, 4 Nov 2022 12:33:30 +0800 Subject: [PATCH 22/22] update --- .../seatunnel/cassandra/source/CassandraSourceReader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java index 6ae6593616b..e3f95629ad4 100644 --- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java +++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.cassandra.source; -import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;