Skip to content

Commit

Permalink
Conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopheDuong committed Feb 3, 2021
1 parent ef35a00 commit 5d10a77
Showing 1 changed file with 60 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@

package io.airbyte.server.converters;

import io.airbyte.api.model.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.AirbyteStreamConfiguration;
import io.airbyte.api.model.AirbyteStreamFieldConfiguration;
import io.airbyte.api.model.SourceSchema;
import io.airbyte.api.model.SourceSchemaField;
import io.airbyte.api.model.SourceSchemaStream;
Expand All @@ -35,51 +34,82 @@
import io.airbyte.config.Field;
import io.airbyte.config.Schema;
import io.airbyte.config.Stream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// todo (cgardens) - update this before merging.
/**
* Convert classes between io.airbyte.protocol.models and io.airbyte.api.model
*/
public class SchemaConverter {

public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog convertTo(final io.airbyte.api.model.ConfiguredAirbyteCatalog catalog) {
return new io.airbyte.protocol.models.ConfiguredAirbyteCatalog().withStreams(catalog.getStreams()
.stream()
.map(pair -> {
final io.airbyte.protocol.models.AirbyteStream airbyteStream = new io.airbyte.protocol.models.AirbyteStream()
.withName(pair.getStream().getName())
.withJsonSchema(pair.getStream().getJsonSchema())
.withSupportedSyncModes(Enums.convertListTo(pair.getStream().getSupportedSyncModes(), io.airbyte.protocol.models.SyncMode.class))
.withSourceDefinedCursor(pair.getStream().getSourceDefinedCursor())
.withDefaultCursorField(pair.getStream().getDefaultCursorField());
return new io.airbyte.protocol.models.ConfiguredAirbyteStream()
.withStream(airbyteStream)
.withSyncMode(Enums.convertTo(pair.getConfiguration().getSyncMode(), io.airbyte.protocol.models.SyncMode.class))
.withCursorField(pair.getConfiguration().getCursorField());
})
.collect(Collectors.toList()));
public static io.airbyte.api.model.AirbyteStream convertTo(final io.airbyte.protocol.models.AirbyteStream stream) {
return new io.airbyte.api.model.AirbyteStream()
.name(stream.getName())
.jsonSchema(stream.getJsonSchema())
.supportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.api.model.SyncMode.class))
.sourceDefinedCursor(stream.getSourceDefinedCursor())
.defaultCursorField(stream.getDefaultCursorField());
}

public static io.airbyte.protocol.models.AirbyteStream convertTo(final io.airbyte.api.model.AirbyteStream stream) {
return new io.airbyte.protocol.models.AirbyteStream()
.withName(stream.getName())
.withJsonSchema(stream.getJsonSchema())
.withSupportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.protocol.models.SyncMode.class))
.withSourceDefinedCursor(stream.getSourceDefinedCursor())
.withDefaultCursorField(stream.getDefaultCursorField());
}

public static io.airbyte.api.model.AirbyteCatalog convertTo(final io.airbyte.protocol.models.AirbyteCatalog catalog) {
return new io.airbyte.api.model.AirbyteCatalog()
.streams(catalog.getStreams()
.stream()
.map(SchemaConverter::convertTo)
.collect(Collectors.toList()));
}

public static io.airbyte.protocol.models.AirbyteCatalog convertTo(final io.airbyte.api.model.AirbyteCatalog catalog) {
return new io.airbyte.protocol.models.AirbyteCatalog()
.withStreams(catalog.getStreams()
.stream()
.map(SchemaConverter::convertTo)
.collect(Collectors.toList()));
}

public static io.airbyte.api.model.ConfiguredAirbyteCatalog convertTo(final io.airbyte.protocol.models.ConfiguredAirbyteCatalog catalog) {
final List<AirbyteStreamAndConfiguration> persistenceStreams = catalog.getStreams()
final List<io.airbyte.api.model.AirbyteStreamAndConfiguration> persistenceStreams = catalog.getStreams()
.stream()
.map(configuredStream -> {
final io.airbyte.api.model.AirbyteStream stream = new io.airbyte.api.model.AirbyteStream()
.name(configuredStream.getStream().getName())
.jsonSchema(configuredStream.getStream().getJsonSchema())
.supportedSyncModes(Enums.convertListTo(configuredStream.getStream().getSupportedSyncModes(), io.airbyte.api.model.SyncMode.class))
.sourceDefinedCursor(configuredStream.getStream().getSourceDefinedCursor())
.defaultCursorField(configuredStream.getStream().getDefaultCursorField());
final AirbyteStreamConfiguration configuration = new AirbyteStreamConfiguration()
final List<AirbyteStreamFieldConfiguration> fields = new ArrayList<>();
// TODO extract fields
final io.airbyte.api.model.AirbyteStreamConfiguration configuration = new io.airbyte.api.model.AirbyteStreamConfiguration()
.syncMode(Enums.convertTo(configuredStream.getSyncMode(), io.airbyte.api.model.SyncMode.class))
.cursorField(configuredStream.getCursorField());
return new AirbyteStreamAndConfiguration()
.stream(stream)
.cursorField(configuredStream.getCursorField())
// TODO cleanedName?
.selected(true)
.fields(fields);
return new io.airbyte.api.model.AirbyteStreamAndConfiguration()
.stream(convertTo(configuredStream.getStream()))
._configuration(configuration);
})
.collect(Collectors.toList());
return new io.airbyte.api.model.ConfiguredAirbyteCatalog().streams(persistenceStreams);
}

public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog convertTo(final io.airbyte.api.model.ConfiguredAirbyteCatalog catalog) {
return new io.airbyte.protocol.models.ConfiguredAirbyteCatalog().withStreams(catalog.getStreams()
.stream()
.map(pair -> new io.airbyte.protocol.models.ConfiguredAirbyteStream()
.withStream(convertTo(pair.getStream()))
.withSyncMode(Enums.convertTo(pair.getConfiguration().getSyncMode(), io.airbyte.protocol.models.SyncMode.class))
.withCursorField(pair.getConfiguration().getCursorField()))
// TODO filter selected?
.collect(Collectors.toList()));
}

// TODO DEPRECATE BELOW

public static Schema toPersistenceSchema(SourceSchema sourceSchema) {
final List<Stream> persistenceStreams =
sourceSchema.getStreams()
Expand Down

0 comments on commit 5d10a77

Please sign in to comment.