diff --git a/.gitignore b/.gitignore
index 3ebf11628f38c..43da088879425 100644
--- a/.gitignore
+++ b/.gitignore
@@ -49,6 +49,8 @@
 src/log/
 log/
 
+*.log
+
 .risingwave/
 .bin/
diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt
index 26338f089dcca..ecb91b96fd0f7 100644
--- a/e2e_test/sink/iceberg_sink.slt
+++ b/e2e_test/sink/iceberg_sink.slt
@@ -8,8 +8,10 @@ statement ok
 CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
     connector = 'iceberg',
     sink.mode='append-only',
-    location.type='minio',
-    warehouse.path='minio://hummockadmin:hummockadmin@127.0.0.1:9301/iceberg',
+    warehouse.path = 's3://iceberg',
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin',
     database.name='demo_db',
     table.name='demo_table'
 );
diff --git a/integration_tests/iceberg-sink/create_sink.sql b/integration_tests/iceberg-sink/create_sink.sql
index 6c984c5cc83b2..8913dbb05525b 100644
--- a/integration_tests/iceberg-sink/create_sink.sql
+++ b/integration_tests/iceberg-sink/create_sink.sql
@@ -3,8 +3,10 @@ FROM
     bhv_mv WITH (
     connector = 'iceberg',
     sink.mode='upsert',
-    location.type='minio',
-    warehouse.path='minio://hummockadmin:hummockadmin@minio-0:9301/hummock001/iceberg-data',
+    warehouse.path = 's3://hummock001/iceberg-data',
+    s3.endpoint = 'http://minio-0:9301',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin',
     database.name='demo_db',
     table.name='demo_table'
 );
\ No newline at end of file
diff --git a/java/connector-node/python-client/integration_tests.py b/java/connector-node/python-client/integration_tests.py
index 274df9cdeb2ed..a64e3a79a720f 100644
--- a/java/connector-node/python-client/integration_tests.py
+++ b/java/connector-node/python-client/integration_tests.py
@@ -161,8 +161,10 @@ def test_print_sink(input_file):
 
 def test_iceberg_sink(input_file):
     test_sink("iceberg", {"sink.mode":"append-only",
-                          "location.type":"minio",
-                          "warehouse.path":"minio://minioadmin:minioadmin@127.0.0.1:9000/bucket",
+                          "warehouse.path":"s3a://bucket",
+                          "s3.endpoint": "http://127.0.0.1:9000",
+                          "s3.access.key": "minioadmin",
+                          "s3.secret.key": "minioadmin",
                           "database.name":"demo_db",
                           "table.name":"demo_table"},
               input_file)
@@ -170,8 +172,10 @@ def test_iceberg_sink(input_file):
 
 def test_upsert_iceberg_sink(input_file):
     test_upsert_sink("iceberg", {"sink.mode":"upsert",
-                                 "location.type":"minio",
-                                 "warehouse.path":"minio://minioadmin:minioadmin@127.0.0.1:9000/bucket",
+                                 "warehouse.path":"s3a://bucket",
+                                 "s3.endpoint": "http://127.0.0.1:9000",
+                                 "s3.access.key": "minioadmin",
+                                 "s3.secret.key": "minioadmin",
                                  "database.name":"demo_db",
                                  "table.name":"demo_table"},
                      input_file)
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java
index f51a024783bf4..cb115f0b85011 100644
--- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java
+++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java
@@ -20,8 +20,9 @@
 import com.risingwave.connector.api.TableSchema;
 import com.risingwave.connector.api.sink.SinkBase;
 import com.risingwave.connector.api.sink.SinkFactory;
-import com.risingwave.java.utils.MinioUrlParser;
 import io.grpc.Status;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -38,11 +39,15 @@ public class IcebergSinkFactory implements SinkFactory {
     private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);
 
     public static final String SINK_MODE_PROP = "sink.mode";
-    public static final String LOCATION_TYPE_PROP = "location.type";
     public static final String WAREHOUSE_PATH_PROP = "warehouse.path";
     public static final String DATABASE_NAME_PROP = "database.name";
     public static final String TABLE_NAME_PROP = "table.name";
+    public static final String S3_ACCESS_KEY_PROP = "s3.access.key";
+    public static final String S3_SECRET_KEY_PROP = "s3.secret.key";
+    public static final String S3_ENDPOINT_PROP = "s3.endpoint";
     public static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
+
+    // hadoop catalog config
     private static final String confEndpoint = "fs.s3a.endpoint";
     private static final String confKey = "fs.s3a.access.key";
     private static final String confSecret = "fs.s3a.secret.key";
@@ -56,20 +61,22 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tablePropert
         validate(tableSchema, tableProperties);
 
         String mode = tableProperties.get(SINK_MODE_PROP);
-        String location = tableProperties.get(LOCATION_TYPE_PROP);
-        String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
+        String warehousePath = getWarehousePath(tableProperties);
         String databaseName = tableProperties.get(DATABASE_NAME_PROP);
         String tableName = tableProperties.get(TABLE_NAME_PROP);
+        String scheme = parseWarehousePathScheme(warehousePath);
+
         TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
-        HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
+        Configuration hadoopConf = createHadoopConf(scheme, tableProperties);
+        HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
         Table icebergTable;
         try {
             icebergTable = hadoopCatalog.loadTable(tableIdentifier);
         } catch (Exception e) {
-            LOG.error("load table error: {}", e);
             throw Status.FAILED_PRECONDITION
-                    .withDescription("failed to load iceberg table")
+                    .withDescription(
+                            String.format("failed to load iceberg table: %s", e.getMessage()))
                     .withCause(e)
                     .asRuntimeException();
         }
@@ -77,7 +84,9 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tablePropert
         if (mode.equals("append-only")) {
             return new IcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
         } else if (mode.equals("upsert")) {
-            return new UpsertIcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
+            return new UpsertIcebergSink(
+                    tableSchema, hadoopCatalog,
+                    icebergTable, FILE_FORMAT);
         }
         throw UNIMPLEMENTED.withDescription("unsupported mode: " + mode).asRuntimeException();
     }
@@ -85,16 +94,14 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tablePropert
     @Override
     public void validate(TableSchema tableSchema, Map<String, String> tableProperties) {
         if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert
-                || !tableProperties.containsKey(LOCATION_TYPE_PROP) // only local, s3, minio
                 || !tableProperties.containsKey(WAREHOUSE_PATH_PROP)
                 || !tableProperties.containsKey(DATABASE_NAME_PROP)
                 || !tableProperties.containsKey(TABLE_NAME_PROP)) {
             throw INVALID_ARGUMENT
                     .withDescription(
                             String.format(
-                                    "%s, %s, %s, %s or %s is not specified",
+                                    "%s, %s, %s or %s is not specified",
                                     SINK_MODE_PROP,
-                                    LOCATION_TYPE_PROP,
                                     WAREHOUSE_PATH_PROP,
                                     DATABASE_NAME_PROP,
                                     TABLE_NAME_PROP))
@@ -102,29 +109,33 @@ public void validate(TableSchema tableSchema, Map<String, String> tablePropertie
         }
 
         String mode = tableProperties.get(SINK_MODE_PROP);
-        String location = tableProperties.get(LOCATION_TYPE_PROP);
-        String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
         String databaseName = tableProperties.get(DATABASE_NAME_PROP);
         String tableName = tableProperties.get(TABLE_NAME_PROP);
+        String warehousePath = getWarehousePath(tableProperties);
+
+        String schema = parseWarehousePathScheme(warehousePath);
 
         TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
-        HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
+        Configuration hadoopConf = createHadoopConf(schema, tableProperties);
+        HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
         Table icebergTable;
         try {
             icebergTable = hadoopCatalog.loadTable(tableIdentifier);
         } catch (Exception e) {
-            LOG.error("load table error: {}", e);
             throw Status.FAILED_PRECONDITION
-                    .withDescription("failed to load iceberg table")
+                    .withDescription(
+                            String.format("failed to load iceberg table: %s", e.getMessage()))
                     .withCause(e)
                     .asRuntimeException();
         }
         // check that all columns in tableSchema exist in the iceberg table
         for (String columnName : tableSchema.getColumnNames()) {
             if (icebergTable.schema().findField(columnName) == null) {
-                LOG.error("column not found: {}", columnName);
                 throw Status.FAILED_PRECONDITION
-                        .withDescription("table schema does not match")
+                        .withDescription(
+                                String.format(
+                                        "table schema does not match. Column %s not found in iceberg table",
+                                        columnName))
                         .asRuntimeException();
             }
         }
@@ -132,7 +143,6 @@ public void validate(TableSchema tableSchema, Map<String, String> tablePropertie
         Set<String> columnNames = Set.of(tableSchema.getColumnNames());
         for (Types.NestedField column : icebergTable.schema().columns()) {
             if (column.isRequired() && !columnNames.contains(column.name())) {
-                LOG.error("required column not found: {}", column.name());
                 throw Status.FAILED_PRECONDITION
                         .withDescription(
                                 String.format("missing a required field %s", column.name()))
@@ -153,26 +163,62 @@ public void validate(TableSchema tableSchema, Map<String, String> tablePropertie
         }
     }
 
-    private HadoopCatalog createHadoopCatalog(String location, String warehousePath) {
-        Configuration hadoopConf = new Configuration();
-        switch (location) {
-            case "local":
-                return new HadoopCatalog(hadoopConf, warehousePath);
-            case "s3":
-                hadoopConf.set(confIoImpl, s3FileIOImpl);
-                String s3aPath = "s3a:" + warehousePath.substring(warehousePath.indexOf('/'));
-                return new HadoopCatalog(hadoopConf, s3aPath);
-            case "minio":
+    private static String getWarehousePath(Map<String, String> tableProperties) {
+        String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
+        // unify s3 and s3a
+        if (warehousePath.startsWith("s3://")) {
+            return warehousePath.replace("s3://", "s3a://");
+        }
+        return warehousePath;
+    }
+
+    private static String parseWarehousePathScheme(String warehousePath) {
+        try {
+            URI uri = new URI(warehousePath);
+            String scheme = uri.getScheme();
+            if (scheme == null) {
+                throw INVALID_ARGUMENT
+                        .withDescription("warehouse path should set scheme (e.g. s3a://)")
s3a://)") + .asRuntimeException(); + } + return scheme; + } catch (URISyntaxException e) { + throw INVALID_ARGUMENT + .withDescription( + String.format("invalid warehouse path uri: %s", e.getMessage())) + .withCause(e) + .asRuntimeException(); + } + } + + private Configuration createHadoopConf(String scheme, Map tableProperties) { + switch (scheme) { + case "file": + return new Configuration(); + case "s3a": + Configuration hadoopConf = new Configuration(); hadoopConf.set(confIoImpl, s3FileIOImpl); - MinioUrlParser minioUrlParser = new MinioUrlParser(warehousePath); - hadoopConf.set(confEndpoint, minioUrlParser.getEndpoint()); - hadoopConf.set(confKey, minioUrlParser.getKey()); - hadoopConf.set(confSecret, minioUrlParser.getSecret()); hadoopConf.setBoolean(confPathStyleAccess, true); - return new HadoopCatalog(hadoopConf, "s3a://" + minioUrlParser.getBucket()); + if (!tableProperties.containsKey(S3_ENDPOINT_PROP)) { + throw INVALID_ARGUMENT + .withDescription( + String.format( + "Should set %s for warehouse with scheme %s", + S3_ENDPOINT_PROP, scheme)) + .asRuntimeException(); + } + hadoopConf.set(confEndpoint, tableProperties.get(S3_ENDPOINT_PROP)); + if (tableProperties.containsKey(S3_ACCESS_KEY_PROP)) { + hadoopConf.set(confKey, tableProperties.get(S3_ACCESS_KEY_PROP)); + } + if (tableProperties.containsKey(S3_SECRET_KEY_PROP)) { + hadoopConf.set(confSecret, tableProperties.get(S3_SECRET_KEY_PROP)); + } + return hadoopConf; default: throw UNIMPLEMENTED - .withDescription("unsupported iceberg sink type: " + location) + .withDescription( + String.format("scheme %s not supported for warehouse path", scheme)) .asRuntimeException(); } } diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java index 8923cb5a6db74..6ff0ce99e38a8 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java @@ -30,10 +30,9 @@ import org.junit.Test; public class IcebergSinkFactoryTest { - static String warehousePath = "/tmp/rw-sinknode/iceberg-sink/warehouse"; + static String warehousePath = "file:///tmp/rw-sinknode/iceberg-sink/warehouse"; static String databaseName = "demo_db"; static String tableName = "demo_table"; - static String locationType = "local"; static String sinkMode = "append-only"; static Schema icebergTableSchema = new Schema( @@ -67,8 +66,6 @@ public void testCreate() throws IOException { Map.of( IcebergSinkFactory.SINK_MODE_PROP, sinkMode, - IcebergSinkFactory.LOCATION_TYPE_PROP, - locationType, IcebergSinkFactory.WAREHOUSE_PATH_PROP, warehousePath, IcebergSinkFactory.DATABASE_NAME_PROP,