feat(connector): unify and simplify path config of minio and s3 (risi…
wenym1 authored Mar 14, 2023
1 parent 0e93998 commit 84a9831
Showing 6 changed files with 100 additions and 47 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -49,6 +49,8 @@ src/log/

log/

*.log

.risingwave/
.bin/

6 changes: 4 additions & 2 deletions e2e_test/sink/iceberg_sink.slt
@@ -8,8 +8,10 @@ statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
sink.mode='append-only',
- location.type='minio',
- warehouse.path='minio://hummockadmin:hummockadmin@127.0.0.1:9301/iceberg',
+ warehouse.path = 's3://iceberg',
+ s3.endpoint = 'http://127.0.0.1:9301',
+ s3.access.key = 'hummockadmin',
+ s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
);
6 changes: 4 additions & 2 deletions integration_tests/iceberg-sink/create_sink.sql
@@ -3,8 +3,10 @@ FROM
bhv_mv WITH (
connector = 'iceberg',
sink.mode='upsert',
- location.type='minio',
- warehouse.path='minio://hummockadmin:hummockadmin@minio-0:9301/hummock001/iceberg-data',
+ warehouse.path = 's3://hummock001/iceberg-data',
+ s3.endpoint = 'http://minio-0:9301',
+ s3.access.key = 'hummockadmin',
+ s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
);
12 changes: 8 additions & 4 deletions java/connector-node/python-client/integration_tests.py
@@ -161,17 +161,21 @@ def test_print_sink(input_file):
def test_iceberg_sink(input_file):
test_sink("iceberg",
{"sink.mode":"append-only",
"location.type":"minio",
"warehouse.path":"minio://minioadmin:minioadmin@127.0.0.1:9000/bucket",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
"s3.secret.key": "minioadmin",
"database.name":"demo_db",
"table.name":"demo_table"},
input_file)

def test_upsert_iceberg_sink(input_file):
test_upsert_sink("iceberg",
{"sink.mode":"upsert",
"location.type":"minio",
"warehouse.path":"minio://minioadmin:minioadmin@127.0.0.1:9000/bucket",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
"s3.secret.key": "minioadmin",
"database.name":"demo_db",
"table.name":"demo_table"},
input_file)
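The three config changes above (the SQL sink definitions and the Python client tests) converge on the same shape: location.type is dropped, and the S3/MinIO endpoint and credentials become explicit properties next to a plain warehouse.path. As a quick reference, here is a hypothetical stand-alone Java sketch of that unified property set, with values copied from the Python test; the wrapper class and printing are illustrative only, not part of the commit.

import java.util.Map;

public class IcebergSinkPropsSketch {
    public static void main(String[] args) {
        // Unified property set after this commit: no location.type, and the
        // endpoint/credentials are no longer packed into a minio:// URL.
        Map<String, String> props = Map.of(
                "sink.mode", "append-only",
                "warehouse.path", "s3a://bucket",      // "s3://bucket" is also accepted and rewritten to s3a://
                "s3.endpoint", "http://127.0.0.1:9000",
                "s3.access.key", "minioadmin",
                "s3.secret.key", "minioadmin",
                "database.name", "demo_db",
                "table.name", "demo_table");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}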
IcebergSinkFactory.java
@@ -20,8 +20,9 @@
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkBase;
import com.risingwave.connector.api.sink.SinkFactory;
- import com.risingwave.java.utils.MinioUrlParser;
import io.grpc.Status;
+ import java.net.URI;
+ import java.net.URISyntaxException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -38,11 +39,15 @@ public class IcebergSinkFactory implements SinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);

public static final String SINK_MODE_PROP = "sink.mode";
- public static final String LOCATION_TYPE_PROP = "location.type";
public static final String WAREHOUSE_PATH_PROP = "warehouse.path";
public static final String DATABASE_NAME_PROP = "database.name";
public static final String TABLE_NAME_PROP = "table.name";
+ public static final String S3_ACCESS_KEY_PROP = "s3.access.key";
+ public static final String S3_SECRET_KEY_PROP = "s3.secret.key";
+ public static final String S3_ENDPOINT_PROP = "s3.endpoint";
public static final FileFormat FILE_FORMAT = FileFormat.PARQUET;

+ // hadoop catalog config
private static final String confEndpoint = "fs.s3a.endpoint";
private static final String confKey = "fs.s3a.access.key";
private static final String confSecret = "fs.s3a.secret.key";
@@ -56,83 +61,88 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tablePropert
validate(tableSchema, tableProperties);

String mode = tableProperties.get(SINK_MODE_PROP);
- String location = tableProperties.get(LOCATION_TYPE_PROP);
- String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
+ String warehousePath = getWarehousePath(tableProperties);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);

+ String scheme = parseWarehousePathScheme(warehousePath);

TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
- HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
+ Configuration hadoopConf = createHadoopConf(scheme, tableProperties);
+ HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
Table icebergTable;
try {
icebergTable = hadoopCatalog.loadTable(tableIdentifier);
} catch (Exception e) {
LOG.error("load table error: {}", e);
throw Status.FAILED_PRECONDITION
.withDescription("failed to load iceberg table")
.withDescription(
String.format("failed to load iceberg table: %s", e.getMessage()))
.withCause(e)
.asRuntimeException();
}

if (mode.equals("append-only")) {
return new IcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
} else if (mode.equals("upsert")) {
- return new UpsertIcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
+ return new UpsertIcebergSink(
+ tableSchema, hadoopCatalog,
+ icebergTable, FILE_FORMAT);
}
throw UNIMPLEMENTED.withDescription("unsupported mode: " + mode).asRuntimeException();
}

@Override
public void validate(TableSchema tableSchema, Map<String, String> tableProperties) {
if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert
- || !tableProperties.containsKey(LOCATION_TYPE_PROP) // only local, s3, minio
|| !tableProperties.containsKey(WAREHOUSE_PATH_PROP)
|| !tableProperties.containsKey(DATABASE_NAME_PROP)
|| !tableProperties.containsKey(TABLE_NAME_PROP)) {
throw INVALID_ARGUMENT
.withDescription(
String.format(
"%s, %s, %s, %s or %s is not specified",
"%s, %s, %s or %s is not specified",
SINK_MODE_PROP,
- LOCATION_TYPE_PROP,
WAREHOUSE_PATH_PROP,
DATABASE_NAME_PROP,
TABLE_NAME_PROP))
.asRuntimeException();
}

String mode = tableProperties.get(SINK_MODE_PROP);
- String location = tableProperties.get(LOCATION_TYPE_PROP);
- String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);
+ String warehousePath = getWarehousePath(tableProperties);

+ String schema = parseWarehousePathScheme(warehousePath);

TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
- HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
+ Configuration hadoopConf = createHadoopConf(schema, tableProperties);
+ HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
Table icebergTable;
try {
icebergTable = hadoopCatalog.loadTable(tableIdentifier);
} catch (Exception e) {
LOG.error("load table error: {}", e);
throw Status.FAILED_PRECONDITION
.withDescription("failed to load iceberg table")
.withDescription(
String.format("failed to load iceberg table: %s", e.getMessage()))
.withCause(e)
.asRuntimeException();
}
// check that all columns in tableSchema exist in the iceberg table
for (String columnName : tableSchema.getColumnNames()) {
if (icebergTable.schema().findField(columnName) == null) {
LOG.error("column not found: {}", columnName);
throw Status.FAILED_PRECONDITION
.withDescription("table schema does not match")
.withDescription(
String.format(
"table schema does not match. Column %s not found in iceberg table",
columnName))
.asRuntimeException();
}
}
// check that all required columns in the iceberg table exist in tableSchema
Set<String> columnNames = Set.of(tableSchema.getColumnNames());
for (Types.NestedField column : icebergTable.schema().columns()) {
if (column.isRequired() && !columnNames.contains(column.name())) {
LOG.error("required column not found: {}", column.name());
throw Status.FAILED_PRECONDITION
.withDescription(
String.format("missing a required field %s", column.name()))
@@ -153,26 +163,62 @@ public void validate(TableSchema tableSchema, Map<String, String> tablePropertie
}
}

- private HadoopCatalog createHadoopCatalog(String location, String warehousePath) {
- Configuration hadoopConf = new Configuration();
- switch (location) {
- case "local":
- return new HadoopCatalog(hadoopConf, warehousePath);
- case "s3":
- hadoopConf.set(confIoImpl, s3FileIOImpl);
- String s3aPath = "s3a:" + warehousePath.substring(warehousePath.indexOf('/'));
- return new HadoopCatalog(hadoopConf, s3aPath);
- case "minio":
+ private static String getWarehousePath(Map<String, String> tableProperties) {
+ String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
+ // unify s3 and s3a
+ if (warehousePath.startsWith("s3://")) {
+ return warehousePath.replace("s3://", "s3a://");
+ }
+ return warehousePath;
+ }
+
+ private static String parseWarehousePathScheme(String warehousePath) {
+ try {
+ URI uri = new URI(warehousePath);
+ String scheme = uri.getScheme();
+ if (scheme == null) {
+ throw INVALID_ARGUMENT
+ .withDescription("warehouse path should set scheme (e.g. s3a://)")
+ .asRuntimeException();
+ }
+ return scheme;
+ } catch (URISyntaxException e) {
+ throw INVALID_ARGUMENT
+ .withDescription(
+ String.format("invalid warehouse path uri: %s", e.getMessage()))
+ .withCause(e)
+ .asRuntimeException();
+ }
+ }
+
+ private Configuration createHadoopConf(String scheme, Map<String, String> tableProperties) {
+ switch (scheme) {
+ case "file":
+ return new Configuration();
+ case "s3a":
+ Configuration hadoopConf = new Configuration();
hadoopConf.set(confIoImpl, s3FileIOImpl);
- MinioUrlParser minioUrlParser = new MinioUrlParser(warehousePath);
- hadoopConf.set(confEndpoint, minioUrlParser.getEndpoint());
- hadoopConf.set(confKey, minioUrlParser.getKey());
- hadoopConf.set(confSecret, minioUrlParser.getSecret());
hadoopConf.setBoolean(confPathStyleAccess, true);
- return new HadoopCatalog(hadoopConf, "s3a://" + minioUrlParser.getBucket());
+ if (!tableProperties.containsKey(S3_ENDPOINT_PROP)) {
+ throw INVALID_ARGUMENT
+ .withDescription(
+ String.format(
+ "Should set %s for warehouse with scheme %s",
+ S3_ENDPOINT_PROP, scheme))
+ .asRuntimeException();
+ }
+ hadoopConf.set(confEndpoint, tableProperties.get(S3_ENDPOINT_PROP));
+ if (tableProperties.containsKey(S3_ACCESS_KEY_PROP)) {
+ hadoopConf.set(confKey, tableProperties.get(S3_ACCESS_KEY_PROP));
+ }
+ if (tableProperties.containsKey(S3_SECRET_KEY_PROP)) {
+ hadoopConf.set(confSecret, tableProperties.get(S3_SECRET_KEY_PROP));
+ }
+ return hadoopConf;
default:
throw UNIMPLEMENTED
.withDescription("unsupported iceberg sink type: " + location)
.withDescription(
String.format("scheme %s not supported for warehouse path", scheme))
.asRuntimeException();
}
}
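Taken together, the three new helpers replace the old location.type switch: getWarehousePath normalizes s3:// paths to s3a://, parseWarehousePathScheme extracts the URI scheme, and createHadoopConf builds the Hadoop configuration for that scheme. The stand-alone sketch below mirrors only the first two steps so it runs without Hadoop or gRPC on the classpath; the class and method names are local to this example, not part of the commit.

import java.net.URI;
import java.net.URISyntaxException;

public class WarehousePathSketch {
    // Mirrors getWarehousePath: plain "s3://" paths are rewritten to "s3a://"
    // so the Hadoop S3A filesystem handles both spellings.
    static String unify(String warehousePath) {
        return warehousePath.startsWith("s3://")
                ? warehousePath.replace("s3://", "s3a://")
                : warehousePath;
    }

    // Mirrors parseWarehousePathScheme: the scheme decides which branch of
    // createHadoopConf applies ("file" or "s3a").
    static String scheme(String warehousePath) throws URISyntaxException {
        String scheme = new URI(warehousePath).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("warehouse path should set scheme (e.g. s3a://)");
        }
        return scheme;
    }

    public static void main(String[] args) throws URISyntaxException {
        String path = unify("s3://hummock001/iceberg-data");   // value from the integration test above
        System.out.println(path);           // s3a://hummock001/iceberg-data
        System.out.println(scheme(path));   // s3a
        System.out.println(scheme("file:///tmp/rw-sinknode/iceberg-sink/warehouse"));  // file
    }
}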
IcebergSinkFactoryTest.java
@@ -30,10 +30,9 @@
import org.junit.Test;

public class IcebergSinkFactoryTest {
static String warehousePath = "/tmp/rw-sinknode/iceberg-sink/warehouse";
static String warehousePath = "file:///tmp/rw-sinknode/iceberg-sink/warehouse";
static String databaseName = "demo_db";
static String tableName = "demo_table";
static String locationType = "local";
static String sinkMode = "append-only";
static Schema icebergTableSchema =
new Schema(
@@ -67,8 +66,6 @@ public void testCreate() throws IOException {
Map.of(
IcebergSinkFactory.SINK_MODE_PROP,
sinkMode,
- IcebergSinkFactory.LOCATION_TYPE_PROP,
- locationType,
IcebergSinkFactory.WAREHOUSE_PATH_PROP,
warehousePath,
IcebergSinkFactory.DATABASE_NAME_PROP,
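The test change above follows the same rule: with location.type gone, the factory dispatches on the parsed scheme, so a local warehouse path has to spell out file:// instead of being a bare filesystem path. A small hypothetical check of the two test values:

import java.net.URI;
import java.net.URISyntaxException;

public class LocalWarehousePathCheck {
    public static void main(String[] args) throws URISyntaxException {
        // Old test value: bare path, no scheme -> parseWarehousePathScheme would reject it
        System.out.println(new URI("/tmp/rw-sinknode/iceberg-sink/warehouse").getScheme());        // null
        // New test value: explicit file:// scheme -> handled by the "file" branch of createHadoopConf
        System.out.println(new URI("file:///tmp/rw-sinknode/iceberg-sink/warehouse").getScheme()); // file
    }
}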
