Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BitSail][connector]rename connector register file. #97

Merged
merged 1 commit into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,39 @@ public interface Source<T, SplitT extends SourceSplit, StateT extends Serializab
*/
Boundedness getSourceBoundedness();

/**
* Create Source Reader.
*/
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext);

/**
* Create split coordinator.
*/
SourceSplitCoordinator<SplitT, StateT> createSplitCoordinator(SourceSplitCoordinator.Context<SplitT, StateT> coordinatorContext);

/**
* Get Split serializer for the framework,{@link SplitT}should implement from {@link Serializable}
*/
default BinarySerializer<SplitT> getSplitSerializer() {
return new SimpleBinarySerializer<>();
}

default BinarySerializer<StateT> getEnumeratorCheckpointSerializer() {
/**
* Get State serializer for the framework, {@link StateT}should implement from {@link Serializable}
*/
default BinarySerializer<StateT> getSplitCoordinatorCheckpointSerializer() {
return new SimpleBinarySerializer<>();
}

/**
* Create type info converter for the source, default value {@link BitSailTypeInfoConverter}
*/
default TypeInfoConverter createTypeInfoConverter() {
return new BitSailTypeInfoConverter();
}

/**
* Get Source' name.
*/
String getReaderName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
package com.bytedance.bitsail.base.connector.writer.v1;

import com.bytedance.bitsail.base.serializer.BinarySerializer;
import com.bytedance.bitsail.base.serializer.SimpleBinarySerializer;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter;
import com.bytedance.bitsail.common.type.TypeInfoConverter;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;

/**
* Created 2022/6/10
*/
public interface WriterGenerator<InputT, CommitT, WriterStateT> extends Serializable {
public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable> extends Serializable {

/**
* @return The name of writer operation.
Expand All @@ -49,23 +49,9 @@ public interface WriterGenerator<InputT, CommitT, WriterStateT> extends Serializ
/**
* Create a writer for processing elements.
*
* @param writerConfiguration Options for writer.
* @return An initialized writer.
*/
Writer<InputT, CommitT, WriterStateT> createWriter(BitSailConfiguration writerConfiguration,
Writer.Context context) throws IOException;

/**
* Restore a writer for processing elements from configurations and history states.
*
* @param writerConfiguration Options for writer.
* @param writerStates History writer states.
* @param context Writer context
* @return An initialized writer.
*/
Writer<InputT, CommitT, WriterStateT> restoreWriter(BitSailConfiguration writerConfiguration,
List<WriterStateT> writerStates,
Writer.Context context) throws IOException;
Writer<InputT, CommitT, WriterStateT> createWriter(Writer.Context<WriterStateT> context) throws IOException;

/**
* @return A converter which supports conversion from BitSail {@link TypeInfo}
Expand All @@ -85,14 +71,14 @@ default Optional<WriterCommitter<CommitT>> createCommitter() {
/**
* @return A serializer which convert committable object to byte array.
*/
default Optional<BinarySerializer<CommitT>> getCommittableSerializer() {
return Optional.empty();
default BinarySerializer<CommitT> getCommittableSerializer() {
return new SimpleBinarySerializer<CommitT>();
}

/**
* @return A serializer which convert state object to byte array.
*/
default Optional<BinarySerializer<WriterStateT>> getWriteStateSerializer() {
return Optional.empty();
default BinarySerializer<WriterStateT> getWriteStateSerializer() {
return new SimpleBinarySerializer<WriterStateT>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ default void close() throws IOException {

}

interface Context extends Serializable {
interface Context<WriterStateT> extends Serializable {

TypeInfo<?>[] getTypeInfos();

int getIndexOfSubTaskId();

boolean isRestored();

List<WriterStateT> getRestoreStates();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-fake",
"name": "bitsail-connector-legacy-fake",
"classes": [
"com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
"com.bytedance.bitsail.connector.legacy.fake.source.FakeFlinkSource"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-ftp",
"name": "bitsail-connector-legacy-ftp",
"classes": [
"com.bytedance.bitsail.connector.legacy.ftp.source.FtpInputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hadoop",
"name": "bitsail-connector-legacy-hadoop",
"classes": [
"com.bytedance.bitsail.connector.hadoop.source.HadoopInputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hbase",
"name": "bitsail-connector-legacy-hbase",
"classes": [
"com.bytedance.bitsail.connector.hbase.source.HBaseInputFormat",
"com.bytedance.bitsail.connector.hbase.sink.HBaseOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hive",
"name": "bitsail-connector-legacy-hive",
"classes": [
"com.bytedance.bitsail.connector.legacy.hive.source.HiveInputFormat",
"com.bytedance.bitsail.connector.legacy.hive.sink.HiveOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-hudi",
"name": "bitsail-connector-legacy-hudi",
"classes": [
"com.bytedance.bitsail.connector.legacy.hudi.dag.HudiSinkFunctionDAGBuilder",
"com.bytedance.bitsail.connector.legacy.hudi.dag.HudiSourceFunctionDAGBuilder",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-jdbc",
"name": "bitsail-connector-legacy-jdbc",
"classes": [
"com.bytedance.bitsail.connector.legacy.jdbc.sink.JDBCOutputFormat",
"com.bytedance.bitsail.connector.legacy.jdbc.source.JDBCInputFormat",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-kafka",
"name": "bitsail-connector-legacy-kafka",
"classes": [
"com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder",
"com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-larksheet",
"name": "bitsail-connector-legacy-larksheet",
"classes": [
"com.bytedance.bitsail.connector.legacy.larksheet.source.LarkSheetInputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-mongodb",
"name": "bitsail-connector-legacy-mongodb",
"classes": [
"com.bytedance.bitsail.connector.legacy.mongodb.source.MongoDBInputFormat",
"com.bytedance.bitsail.connector.legacy.mongodb.sink.MongoDBOutputFormat"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-print",
"name": "bitsail-connector-legacy-print",
"classes": [
"com.bytedance.bitsail.connector.legacy.print.sink.PrintSink"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-redis",
"name": "bitsail-connector-legacy-redis",
"classes": [
"com.bytedance.bitsail.connector.legacy.redis.source.RedisOutputFormat"
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "bitsail-connector-streamingfile",
"name": "bitsail-connector-legacy-streamingfile",
"classes": [
"com.bytedance.bitsail.connector.legacy.streamingfile.sink.FileSystemSinkFunctionDAGBuilder"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package com.bytedance.bitsail.connector.doris.sink;

import com.bytedance.bitsail.base.connector.writer.v1.Sink;
import com.bytedance.bitsail.base.connector.writer.v1.Writer;
import com.bytedance.bitsail.base.connector.writer.v1.WriterCommitter;
import com.bytedance.bitsail.base.connector.writer.v1.WriterGenerator;
import com.bytedance.bitsail.base.serializer.BinarySerializer;
import com.bytedance.bitsail.base.serializer.SimpleBinarySerializer;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
Expand Down Expand Up @@ -52,11 +52,12 @@
import java.util.Properties;
import java.util.stream.Collectors;

public class DorisWriterGenerator<InputT> implements WriterGenerator<InputT, DorisCommittable, DorisWriterState> {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterGenerator.class);
public class DorisSink<InputT> implements Sink<InputT, DorisCommittable, DorisWriterState> {
private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
private DorisExecutionOptions.WRITE_MODE writeMode;
private DorisOptions dorisOptions;
private DorisExecutionOptions dorisExecutionOptions;
private BitSailConfiguration writerConfiguration;

@Override
public String getWriterName() {
Expand All @@ -65,6 +66,7 @@ public String getWriterName() {

@Override
public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws IOException {
this.writerConfiguration = writerConfiguration;
this.writeMode =
DorisExecutionOptions.WRITE_MODE.valueOf(writerConfiguration.get(DorisWriterOptions.SINK_WRITE_MODE).toUpperCase());
initDorisExecutionOptions(writerConfiguration);
Expand All @@ -86,15 +88,10 @@ public void configure(BitSailConfiguration commonConfiguration, BitSailConfigura
}

@Override
public Writer<InputT, DorisCommittable, DorisWriterState> createWriter(BitSailConfiguration writerConfiguration, Writer.Context context) {
public Writer<InputT, DorisCommittable, DorisWriterState> createWriter(Writer.Context<DorisWriterState> context) {
return new DorisWriter(writerConfiguration, dorisOptions, dorisExecutionOptions);
}

@Override
public Writer<InputT, DorisCommittable, DorisWriterState> restoreWriter(BitSailConfiguration writerConfiguration, List writerStates, Writer.Context context) {
return createWriter(writerConfiguration, null);
}

@Override
public TypeInfoConverter createTypeInfoConverter() {
return new FileMappingTypeInfoConverter(getWriterName());
Expand All @@ -106,13 +103,13 @@ public Optional<WriterCommitter<DorisCommittable>> createCommitter() {
}

@Override
public Optional<BinarySerializer<DorisCommittable>> getCommittableSerializer() {
return Optional.of(new DorisCommittableSerializer());
public BinarySerializer<DorisCommittable> getCommittableSerializer() {
return new DorisCommittableSerializer();
}

@Override
public Optional<BinarySerializer<DorisWriterState>> getWriteStateSerializer() {
return Optional.of(new SimpleBinarySerializer<DorisWriterState>());
public BinarySerializer<DorisWriterState> getWriteStateSerializer() {
return new SimpleBinarySerializer<DorisWriterState>();
}

private void initDorisOptions(BitSailConfiguration writerConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "bitsail-connector-unified-doris",
"classes": [
"com.bytedance.bitsail.connector.doris.sink.DorisSink"
],
"libs": [
"bitsail-connector-doris-${version}.jar"
]
}

This file was deleted.

Loading