Skip to content

Commit

Permalink
feat(connector-node): specify sink payload format in start sink and call close for iterator and sink row (risingwavelabs#8585)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Mar 17, 2023
1 parent f4a2f8d commit 78ddbce
Show file tree
Hide file tree
Showing 29 changed files with 441 additions and 297 deletions.
45 changes: 43 additions & 2 deletions dashboard/proto/gen/connector_service.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion java/connector-node/assembly/scripts/start-service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ while getopts ":h:p:" o; do
done
shift $((OPTIND-1))

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
DIR="$( cd "$( dirname "$0" )" && pwd )"
MAIN='com.risingwave.connector.ConnectorService'
PORT=50051

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

import com.risingwave.proto.Data;

public class ArraySinkrow implements SinkRow {
public class ArraySinkRow implements SinkRow {
public final Object[] values;
public final Data.Op op;

public ArraySinkrow(Data.Op op, Object... value) {
public ArraySinkRow(Data.Op op, Object... value) {
this.op = op;
this.values = value;
}
Expand All @@ -39,4 +39,7 @@ public Data.Op getOp() {
public int size() {
return values.length;
}

@Override
public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector.api.sink;

import java.util.Iterator;

/**
 * An {@link Iterator} that is also {@link AutoCloseable}.
 *
 * <p>The interface declares no members of its own; it only combines the two parent contracts so
 * that an iterator can be managed with try-with-resources. Because {@code close()} is inherited
 * unchanged from {@link AutoCloseable}, it is declared to throw {@link Exception}.
 *
 * @param <E> the type of elements returned by this iterator
 */
public interface CloseableIterator<E> extends AutoCloseable, Iterator<E> {}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector;
package com.risingwave.connector.api.sink;

import com.risingwave.connector.api.sink.SinkRow;
import java.util.Iterator;
import com.risingwave.proto.ConnectorServiceProto;

public interface Deserializer {
Iterator<SinkRow> deserialize(Object payload);
CloseableIterator<SinkRow> deserialize(
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

import com.risingwave.proto.Data;

public interface SinkRow {
public Object get(int index);
public interface SinkRow extends AutoCloseable {
Object get(int index);

public Data.Op getOp();
Data.Op getOp();

public int size();
int size();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector.api.sink;

import java.util.Iterator;

/**
 * Adapts a plain {@link Iterator} to the {@link CloseableIterator} interface.
 *
 * <p>{@link #hasNext()} and {@link #next()} are forwarded to the wrapped iterator, while
 * {@link #close()} does nothing.
 *
 * @param <E> the type of elements returned by the wrapped iterator
 */
public class TrivialCloseIterator<E> implements CloseableIterator<E> {

    /** The iterator that every iteration call is forwarded to. */
    private final Iterator<E> delegate;

    /**
     * Wraps the given iterator.
     *
     * @param inner the iterator to forward {@code hasNext()} and {@code next()} calls to
     */
    public TrivialCloseIterator(Iterator<E> inner) {
        delegate = inner;
    }

    /** Returns whatever the wrapped iterator reports for {@code hasNext()}. */
    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    /** Returns the next element of the wrapped iterator. */
    @Override
    public E next() {
        return delegate.next();
    }

    /** Does nothing; the wrapped iterator is not touched on close. */
    @Override
    public void close() throws Exception {}
}
2 changes: 2 additions & 0 deletions java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def test_upsert_sink(type, prop, input_file):
stub = connector_service_pb2_grpc.ConnectorServiceStub(channel)
request_list = [
connector_service_pb2.SinkStreamRequest(start=connector_service_pb2.SinkStreamRequest.StartSink(
format=connector_service_pb2.SinkPayloadFormat.JSON,
sink_config=connector_service_pb2.SinkConfig(
connector_type=type,
properties=prop,
Expand Down Expand Up @@ -85,6 +86,7 @@ def test_sink(type, prop, input_file):
stub = connector_service_pb2_grpc.ConnectorServiceStub(channel)
request_list = [
connector_service_pb2.SinkStreamRequest(start=connector_service_pb2.SinkStreamRequest.StartSink(
format=connector_service_pb2.SinkPayloadFormat.JSON,
sink_config=connector_service_pb2.SinkConfig(
connector_type=type,
properties=prop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,28 @@ public FileSink(String sinkPath, TableSchema tableSchema) {
@Override
public void write(Iterator<SinkRow> rows) {
while (rows.hasNext()) {
SinkRow row = rows.next();
switch (row.getOp()) {
case INSERT:
String buf =
new Gson()
.toJson(
IntStream.range(0, row.size())
.mapToObj(row::get)
.toArray());
try {
sinkWriter.write(buf + System.lineSeparator());
} catch (IOException e) {
throw INTERNAL.withCause(e).asRuntimeException();
}
break;
default:
throw UNIMPLEMENTED
.withDescription("unsupported operation: " + row.getOp())
.asRuntimeException();
try (SinkRow row = rows.next()) {
switch (row.getOp()) {
case INSERT:
String buf =
new Gson()
.toJson(
IntStream.range(0, row.size())
.mapToObj(row::get)
.toArray());
try {
sinkWriter.write(buf + System.lineSeparator());
} catch (IOException e) {
throw INTERNAL.withCause(e).asRuntimeException();
}
break;
default:
throw UNIMPLEMENTED
.withDescription("unsupported operation: " + row.getOp())
.asRuntimeException();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@

import com.google.gson.Gson;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.ArraySinkrow;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.*;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
import com.risingwave.proto.Data;
import java.util.Iterator;
import java.util.Map;

public class JsonDeserializer implements Deserializer {
Expand All @@ -33,34 +32,39 @@ public JsonDeserializer(TableSchema tableSchema) {
}

@Override
public Iterator<SinkRow> deserialize(Object payload) {
if (!(payload instanceof JsonPayload)) {
public CloseableIterator<SinkRow> deserialize(
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) {
if (!writeBatch.hasJsonPayload()) {
throw INVALID_ARGUMENT
.withDescription("expected JsonPayload, got " + payload.getClass().getName())
.withDescription("expected JsonPayload, got " + writeBatch.getPayloadCase())
.asRuntimeException();
}
JsonPayload jsonPayload = (JsonPayload) payload;
return jsonPayload.getRowOpsList().stream()
.map(
rowOp -> {
Map columnValues = new Gson().fromJson(rowOp.getLine(), Map.class);
Object[] values = new Object[columnValues.size()];
for (String columnName : tableSchema.getColumnNames()) {
if (!columnValues.containsKey(columnName)) {
throw INVALID_ARGUMENT
.withDescription(
"column " + columnName + " not found in json")
.asRuntimeException();
}
Data.DataType.TypeName typeName =
tableSchema.getColumnType(columnName);
values[tableSchema.getColumnIndex(columnName)] =
validateJsonDataTypes(
typeName, columnValues.get(columnName));
}
return (SinkRow) new ArraySinkrow(rowOp.getOpType(), values);
})
.iterator();
JsonPayload jsonPayload = writeBatch.getJsonPayload();
return new TrivialCloseIterator<>(
jsonPayload.getRowOpsList().stream()
.map(
rowOp -> {
Map columnValues =
new Gson().fromJson(rowOp.getLine(), Map.class);
Object[] values = new Object[columnValues.size()];
for (String columnName : tableSchema.getColumnNames()) {
if (!columnValues.containsKey(columnName)) {
throw INVALID_ARGUMENT
.withDescription(
"column "
+ columnName
+ " not found in json")
.asRuntimeException();
}
Data.DataType.TypeName typeName =
tableSchema.getColumnType(columnName);
values[tableSchema.getColumnIndex(columnName)] =
validateJsonDataTypes(
typeName, columnValues.get(columnName));
}
return (SinkRow) new ArraySinkRow(rowOp.getOpType(), values);
})
.iterator());
}

private static Long castLong(Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@ public PrintSink(Map<String, String> properties, TableSchema tableSchema, PrintS
@Override
public void write(Iterator<SinkRow> rows) {
while (rows.hasNext()) {
SinkRow row = rows.next();
out.println(
"PrintSink: "
+ row.getOp().name()
+ " values "
+ Arrays.toString(
IntStream.range(0, row.size()).mapToObj(row::get).toArray()));
try (SinkRow row = rows.next()) {
out.println(
"PrintSink: "
+ row.getOp().name()
+ " values "
+ Arrays.toString(
IntStream.range(0, row.size())
.mapToObj(row::get)
.toArray()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

Expand Down
Loading

0 comments on commit 78ddbce

Please sign in to comment.