From e2d37e901c0d1870b422cda4c72a7168881ba960 Mon Sep 17 00:00:00 2001 From: lizhenglei <127465317+jackyyyyyssss@users.noreply.github.com> Date: Fri, 23 Feb 2024 11:16:23 +0800 Subject: [PATCH] [Improve][Connector-V2]Support multi-table sink feature for redis (#6314) --- .../seatunnel/redis/sink/RedisSink.java | 4 +- .../seatunnel/redis/sink/RedisSinkWriter.java | 4 +- .../e2e/connector/redis/RedisIT.java | 20 +++- .../fake-to-multipletableredissink.conf | 97 +++++++++++++++++++ 4 files changed, 122 insertions(+), 3 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-multipletableredissink.conf diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java index ac8c544703bb..7e6d23dbec89 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -29,7 +30,8 @@ import java.io.IOException; -public class RedisSink extends AbstractSimpleSink { +public class RedisSink extends AbstractSimpleSink + implements SupportMultiTableSink { private final RedisParameters redisParameters = new RedisParameters(); private SeaTunnelRowType seaTunnelRowType; private ReadonlyConfig readonlyConfig; diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 80b1449b9d6d..23eda5720297 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.sink; import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; @@ -32,7 +33,8 @@ import java.util.List; import java.util.Objects; -public class RedisSinkWriter extends AbstractSinkWriter { +public class RedisSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { private final SeaTunnelRowType seaTunnelRowType; private final RedisParameters redisParameters; private final SerializationSchema serializationSchema; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 2a2feb7744fe..7b03818c0bfa 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -28,7 +28,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.format.json.JsonSerializationSchema; import org.junit.jupiter.api.AfterAll; @@ -212,7 +214,7 @@ public void testRedisWithExpire(TestContainer container) } @TestTemplate - public void restRedisDbNum(TestContainer container) throws IOException, InterruptedException { + public void testRedisDbNum(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/redis-to-redis-by-db-num.conf"); Assertions.assertEquals(0, execResult.getExitCode()); jedis.select(2); @@ -220,4 +222,20 @@ public void restRedisDbNum(TestContainer container) throws IOException, Interrup jedis.del("db_test"); jedis.select(0); } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table read") + public void testMultipletableRedisSink(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake-to-multipletableredissink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + jedis.select(3); + Assertions.assertEquals(2, jedis.llen("key_multi_list")); + jedis.del("key_multi_list"); + jedis.select(0); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-multipletableredissink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-multipletableredissink.conf new file mode 100644 index 000000000000..051c51eea668 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-multipletableredissink.conf @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "redis_sink_1" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "lzl", "2020-02-02T02:02:02"] + } + ] + }, + { + schema = { + table = "redis_sink_2" + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "lzl", "2020-02-02T02:02:02"] + } + ] + } + ] + } +} + + +sink { + Redis { + host = "redis-e2e" + port = 6379 + db_num=3 + auth = "U2VhVHVubmVs" + key = "key_multi_list" + data_type = list + } +}