Skip to content

Commit

Permalink
[BitSail][connector]rename connector register file. (#97)
Browse files Browse the repository at this point in the history
Co-authored-by: haoke <haoke@bytedance.com>
  • Loading branch information
hk-lrzy and haoke authored Nov 7, 2022
1 parent 3497600 commit 9275c48
Show file tree
Hide file tree
Showing 39 changed files with 146 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,39 @@ public interface Source<T, SplitT extends SourceSplit, StateT extends Serializab
*/
Boundedness getSourceBoundedness();

/**
* Create Source Reader.
*/
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext);

/**
* Create split coordinator.
*/
SourceSplitCoordinator<SplitT, StateT> createSplitCoordinator(SourceSplitCoordinator.Context<SplitT, StateT> coordinatorContext);

/**
* Get Split serializer for the framework,{@link SplitT}should implement from {@link Serializable}
*/
default BinarySerializer<SplitT> getSplitSerializer() {
return new SimpleBinarySerializer<>();
}

default BinarySerializer<StateT> getEnumeratorCheckpointSerializer() {
/**
* Get State serializer for the framework, {@link StateT}should implement from {@link Serializable}
*/
default BinarySerializer<StateT> getSplitCoordinatorCheckpointSerializer() {
return new SimpleBinarySerializer<>();
}

/**
* Create type info converter for the source, default value {@link BitSailTypeInfoConverter}
*/
default TypeInfoConverter createTypeInfoConverter() {
return new BitSailTypeInfoConverter();
}

/**
* Get Source' name.
*/
String getReaderName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
package com.bytedance.bitsail.base.connector.writer.v1;

import com.bytedance.bitsail.base.serializer.BinarySerializer;
import com.bytedance.bitsail.base.serializer.SimpleBinarySerializer;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter;
import com.bytedance.bitsail.common.type.TypeInfoConverter;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;

/**
* Created 2022/6/10
*/
public interface WriterGenerator<InputT, CommitT, WriterStateT> extends Serializable {
public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable> extends Serializable {

/**
* @return The name of writer operation.
Expand All @@ -49,23 +49,9 @@ public interface WriterGenerator<InputT, CommitT, WriterStateT> extends Serializ
/**
* Create a writer for processing elements.
*
* @param writerConfiguration Options for writer.
* @return An initialized writer.
*/
Writer<InputT, CommitT, WriterStateT> createWriter(BitSailConfiguration writerConfiguration,
Writer.Context context) throws IOException;

/**
* Restore a writer for processing elements from configurations and history states.
*
* @param writerConfiguration Options for writer.
* @param writerStates History writer states.
* @param context Writer context
* @return An initialized writer.
*/
Writer<InputT, CommitT, WriterStateT> restoreWriter(BitSailConfiguration writerConfiguration,
List<WriterStateT> writerStates,
Writer.Context context) throws IOException;
Writer<InputT, CommitT, WriterStateT> createWriter(Writer.Context<WriterStateT> context) throws IOException;

/**
* @return A converter which supports conversion from BitSail {@link TypeInfo}
Expand All @@ -85,14 +71,14 @@ default Optional<WriterCommitter<CommitT>> createCommitter() {
/**
* @return A serializer which convert committable object to byte array.
*/
default Optional<BinarySerializer<CommitT>> getCommittableSerializer() {
return Optional.empty();
default BinarySerializer<CommitT> getCommittableSerializer() {
return new SimpleBinarySerializer<CommitT>();
}

/**
* @return A serializer which convert state object to byte array.
*/
default Optional<BinarySerializer<WriterStateT>> getWriteStateSerializer() {
return Optional.empty();
default BinarySerializer<WriterStateT> getWriteStateSerializer() {
return new SimpleBinarySerializer<WriterStateT>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ default void close() throws IOException {

}

interface Context extends Serializable {
interface Context<WriterStateT> extends Serializable {

TypeInfo<?>[] getTypeInfos();

int getIndexOfSubTaskId();

boolean isRestored();

List<WriterStateT> getRestoreStates();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-fake",
"name": "bitsail-connector-legacy-fake",
"classes": [
"com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
"com.bytedance.bitsail.connector.legacy.fake.source.FakeFlinkSource"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-ftp",
"name": "bitsail-connector-legacy-ftp",
"classes": [
"com.bytedance.bitsail.connector.legacy.ftp.source.FtpInputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hadoop",
"name": "bitsail-connector-legacy-hadoop",
"classes": [
"com.bytedance.bitsail.connector.hadoop.source.HadoopInputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hbase",
"name": "bitsail-connector-legacy-hbase",
"classes": [
"com.bytedance.bitsail.connector.hbase.source.HBaseInputFormat",
"com.bytedance.bitsail.connector.hbase.sink.HBaseOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hive",
"name": "bitsail-connector-legacy-hive",
"classes": [
"com.bytedance.bitsail.connector.legacy.hive.source.HiveInputFormat",
"com.bytedance.bitsail.connector.legacy.hive.sink.HiveOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hudi",
"name": "bitsail-connector-legacy-hudi",
"classes": [
"com.bytedance.bitsail.connector.legacy.hudi.dag.HudiSinkFunctionDAGBuilder",
"com.bytedance.bitsail.connector.legacy.hudi.dag.HudiSourceFunctionDAGBuilder",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-jdbc",
"name": "bitsail-connector-legacy-jdbc",
"classes": [
"com.bytedance.bitsail.connector.legacy.jdbc.sink.JDBCOutputFormat",
"com.bytedance.bitsail.connector.legacy.jdbc.source.JDBCInputFormat",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-kafka",
"name": "bitsail-connector-legacy-kafka",
"classes": [
"com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder",
"com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-larksheet",
"name": "bitsail-connector-legacy-larksheet",
"classes": [
"com.bytedance.bitsail.connector.legacy.larksheet.source.LarkSheetInputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-mongodb",
"name": "bitsail-connector-legacy-mongodb",
"classes": [
"com.bytedance.bitsail.connector.legacy.mongodb.source.MongoDBInputFormat",
"com.bytedance.bitsail.connector.legacy.mongodb.sink.MongoDBOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-print",
"name": "bitsail-connector-legacy-print",
"classes": [
"com.bytedance.bitsail.connector.legacy.print.sink.PrintSink"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-redis",
"name": "bitsail-connector-legacy-redis",
"classes": [
"com.bytedance.bitsail.connector.legacy.redis.source.RedisOutputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-streamingfile",
"name": "bitsail-connector-legacy-streamingfile",
"classes": [
"com.bytedance.bitsail.connector.legacy.streamingfile.sink.FileSystemSinkFunctionDAGBuilder"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package com.bytedance.bitsail.connector.doris.sink;

import com.bytedance.bitsail.base.connector.writer.v1.Sink;
import com.bytedance.bitsail.base.connector.writer.v1.Writer;
import com.bytedance.bitsail.base.connector.writer.v1.WriterCommitter;
import com.bytedance.bitsail.base.connector.writer.v1.WriterGenerator;
import com.bytedance.bitsail.base.serializer.BinarySerializer;
import com.bytedance.bitsail.base.serializer.SimpleBinarySerializer;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
Expand Down Expand Up @@ -52,11 +52,12 @@
import java.util.Properties;
import java.util.stream.Collectors;

public class DorisWriterGenerator<InputT> implements WriterGenerator<InputT, DorisCommittable, DorisWriterState> {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterGenerator.class);
public class DorisSink<InputT> implements Sink<InputT, DorisCommittable, DorisWriterState> {
private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
private DorisExecutionOptions.WRITE_MODE writeMode;
private DorisOptions dorisOptions;
private DorisExecutionOptions dorisExecutionOptions;
private BitSailConfiguration writerConfiguration;

@Override
public String getWriterName() {
Expand All @@ -65,6 +66,7 @@ public String getWriterName() {

@Override
public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws IOException {
this.writerConfiguration = writerConfiguration;
this.writeMode =
DorisExecutionOptions.WRITE_MODE.valueOf(writerConfiguration.get(DorisWriterOptions.SINK_WRITE_MODE).toUpperCase());
initDorisExecutionOptions(writerConfiguration);
Expand All @@ -86,15 +88,10 @@ public void configure(BitSailConfiguration commonConfiguration, BitSailConfigura
}

@Override
public Writer<InputT, DorisCommittable, DorisWriterState> createWriter(BitSailConfiguration writerConfiguration, Writer.Context context) {
public Writer<InputT, DorisCommittable, DorisWriterState> createWriter(Writer.Context<DorisWriterState> context) {
return new DorisWriter(writerConfiguration, dorisOptions, dorisExecutionOptions);
}

@Override
public Writer<InputT, DorisCommittable, DorisWriterState> restoreWriter(BitSailConfiguration writerConfiguration, List writerStates, Writer.Context context) {
return createWriter(writerConfiguration, null);
}

@Override
public TypeInfoConverter createTypeInfoConverter() {
return new FileMappingTypeInfoConverter(getWriterName());
Expand All @@ -106,13 +103,13 @@ public Optional<WriterCommitter<DorisCommittable>> createCommitter() {
}

@Override
public Optional<BinarySerializer<DorisCommittable>> getCommittableSerializer() {
return Optional.of(new DorisCommittableSerializer());
public BinarySerializer<DorisCommittable> getCommittableSerializer() {
return new DorisCommittableSerializer();
}

@Override
public Optional<BinarySerializer<DorisWriterState>> getWriteStateSerializer() {
return Optional.of(new SimpleBinarySerializer<DorisWriterState>());
public BinarySerializer<DorisWriterState> getWriteStateSerializer() {
return new SimpleBinarySerializer<DorisWriterState>();
}

private void initDorisOptions(BitSailConfiguration writerConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "bitsail-connector-unified-doris",
"classes": [
"com.bytedance.bitsail.connector.doris.sink.DorisSink"
],
"libs": [
"bitsail-connector-doris-${version}.jar"
]
}

This file was deleted.

Loading

0 comments on commit 9275c48

Please sign in to comment.