Skip to content

Commit

Permalink
feat(sink): enable user-defined primary key for upsert sink (#8610)
Browse files (browse the repository at this point in the history)
  • Loading branch information
xx01cyx authored Mar 17, 2023
1 parent 88aa6a4 commit 961e342
Show file tree
Hide file tree
Showing 24 changed files with 219 additions and 162 deletions.
34 changes: 17 additions & 17 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,29 @@ cargo make ci-start ci-1cn-1fe

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
# sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
sleep 1

# check sink destination postgres
# sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
# sleep 1
# sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
# sleep 1
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/jdbc.load.slt'
sleep 1
sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
sleep 1

# check sink destination mysql using shell
# if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
# if ($1 == 1 && $2 == "Alex") c1++;
# if ($1 == 3 && $2 == "Carl") c2++;
# if ($1 == 4 && $2 == "Doris") c3++;
# if ($1 == 5 && $2 == "Eve") c4++;
# if ($1 == 6 && $2 == "Frank") c5++; }
# END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
# echo "mysql sink check passed"
# else
# echo "The output is not as expected."
# exit 1
# fi
if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
if ($1 == 1 && $2 == "Alex") c1++;
if ($1 == 3 && $2 == "Carl") c2++;
if ($1 == 4 && $2 == "Doris") c3++;
if ($1 == 5 && $2 == "Eve") c4++;
if ($1 == 6 && $2 == "Frank") c5++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
echo "mysql sink check passed"
else
echo "The output is not as expected."
exit 1
fi

echo "--- Kill cluster"
pkill -f connector-node
Expand Down
31 changes: 16 additions & 15 deletions dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e_test/batch/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
explain create index i on t(v);

statement ok
explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' )
explain create sink sink_t from t with ( connector = 'kafka', type = 'append-only' )

statement ok
drop table t;
4 changes: 2 additions & 2 deletions e2e_test/ddl/table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ statement ok
explain select v2 from ddl_t;

statement ok
explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
explain create sink sink_t from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );

statement ok
explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );
explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', type = 'append-only', force_append_only = 'true' );

# Create a mview with duplicated name.
statement error
Expand Down
38 changes: 7 additions & 31 deletions e2e_test/sink/append_only_sink.slt
Original file line number Diff line number Diff line change
@@ -1,35 +1,20 @@
statement ok
create table t1 (v1 int, v2 int);

statement error No primary key for the upsert sink
create sink s1 from t1 with (connector = 'console');

statement ok
create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console');

statement ok
create table t2 (v1 int, v2 int primary key);

statement ok
create sink s2 from t2 with (connector = 'console');

statement error No primary key for the upsert sink
create sink s3 as select avg(v1) from t2 with (connector = 'console');
create table t (v1 int, v2 int);

statement ok
create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
create sink s1 from t with (connector = 'console');

statement ok
create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console');
create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console');

statement error The sink cannot be append-only
create sink s5 from t2 with (connector = 'console', format = 'append_only');
create sink s3 from t with (connector = 'console', type = 'append-only');

statement ok
create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true');

statement error Cannot force the sink to be append-only
create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true');
create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true');

statement ok
drop sink s1
Expand All @@ -41,13 +26,4 @@ statement ok
drop sink s3

statement ok
drop sink s4

statement ok
drop sink s5

statement ok
drop table t1

statement ok
drop table t2
drop table t
3 changes: 2 additions & 1 deletion e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
sink.mode='append-only',
type = 'upsert',
primary_key = 'v1',
warehouse.path = 's3://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ from
s5 with (
properties.bootstrap.server = '127.0.0.1:29092',
topic = 'sink_target',
format = 'append_only',
type = 'append-only',
connector = 'kafka'
)

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/iceberg-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE SINK bhv_iceberg_sink
FROM
bhv_mv WITH (
connector = 'iceberg',
sink.mode='upsert',
type = 'upsert',
warehouse.path = 's3://hummock001/iceberg-data',
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'hummockadmin',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchem
.collect(Collectors.toList()));
}

/** @deprecated pk here is from Risingwave, it may not match the pk in the database */
@Deprecated
public List<String> getPrimaryKeys() {
return primaryKeys;
}
Expand Down
4 changes: 2 additions & 2 deletions java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_print_sink(input_file):

def test_iceberg_sink(input_file):
test_sink("iceberg",
{"sink.mode":"append-only",
{"type":"append-only",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
Expand All @@ -173,7 +173,7 @@ def test_iceberg_sink(input_file):

def test_upsert_iceberg_sink(input_file):
test_upsert_sink("iceberg",
{"sink.mode":"upsert",
{"type":"upsert",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class IcebergSinkFactory implements SinkFactory {

private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);

public static final String SINK_MODE_PROP = "sink.mode";
public static final String SINK_TYPE_PROP = "type";
public static final String WAREHOUSE_PATH_PROP = "warehouse.path";
public static final String DATABASE_NAME_PROP = "database.name";
public static final String TABLE_NAME_PROP = "table.name";
Expand All @@ -58,7 +58,7 @@ public class IcebergSinkFactory implements SinkFactory {

@Override
public SinkBase create(TableSchema tableSchema, Map<String, String> tableProperties) {
String mode = tableProperties.get(SINK_MODE_PROP);
String mode = tableProperties.get(SINK_TYPE_PROP);
String warehousePath = getWarehousePath(tableProperties);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);
Expand Down Expand Up @@ -93,22 +93,22 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tablePropert
@Override
public void validate(
TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert
if (!tableProperties.containsKey(SINK_TYPE_PROP) // only append-only, upsert
|| !tableProperties.containsKey(WAREHOUSE_PATH_PROP)
|| !tableProperties.containsKey(DATABASE_NAME_PROP)
|| !tableProperties.containsKey(TABLE_NAME_PROP)) {
throw INVALID_ARGUMENT
.withDescription(
String.format(
"%s, %s, %s or %s is not specified",
SINK_MODE_PROP,
SINK_TYPE_PROP,
WAREHOUSE_PATH_PROP,
DATABASE_NAME_PROP,
TABLE_NAME_PROP))
.asRuntimeException();
}

String mode = tableProperties.get(SINK_MODE_PROP);
String mode = tableProperties.get(SINK_TYPE_PROP);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);
String warehousePath = getWarehousePath(tableProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testCreate() throws IOException {
sinkFactory.create(
TableSchema.getMockTableSchema(),
Map.of(
IcebergSinkFactory.SINK_MODE_PROP,
IcebergSinkFactory.SINK_TYPE_PROP,
sinkMode,
IcebergSinkFactory.WAREHOUSE_PATH_PROP,
warehousePath,
Expand Down
Loading

0 comments on commit 961e342

Please sign in to comment.