Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental Docs and Data Model Update #1021

Merged
merged 4 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "2470e835-feaf-4db6-96f3-70fd645acc77",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce-singer",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-salesforce-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1",
"name": "Google Analytics",
"dockerRepository": "airbyte/source-googleanalytics-singer",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-googleanalytics-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "57eb1576-8f52-463d-beb6-2e107cdf571d",
"name": "Hubspot",
"dockerRepository": "airbyte/source-hubspot-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://https://docs.airbyte.io/integrations/sources/hubspot"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7",
"name": "Google Sheets",
"dockerRepository": "airbyte/source-google-sheets",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "74d47f79-8d01-44ac-9755-f5eb0d7caacb",
"name": "Facebook Marketing APIs",
"dockerRepository": "airbyte/source-facebook-marketing-api-singer",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing-api-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "778daa7c-feaf-4db6-96f3-70fd645acc77",
"name": "File",
"dockerRepository": "airbyte/source-file",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-file"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "9e0556f4-69df-4522-a3fb-03264d36b348",
"name": "Marketo",
"dockerRepository": "airbyte/source-marketo-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-marketo-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "9fed261d-d107-47fd-8c8b-323023db6e20",
"name": "exchangeratesapi.io",
"dockerRepository": "airbyte/source-exchangeratesapi-singer",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-exchangeratesapi_io-source"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b03a9f3e-22a5-11eb-adc1-0242ac120002",
"name": "Mailchimp",
"dockerRepository": "airbyte/source-mailchimp",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mailchimp"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b1892b11-788d-44bd-b9ec-3a436f7b54ce",
"name": "Shopify",
"dockerRepository": "airbyte/source-shopify-singer",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-shopify-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e094cb9a-26de-4645-8761-65c0c425d1de",
"name": "Stripe",
"dockerRepository": "airbyte/source-stripe-singer",
"dockerImageTag": "0.1.5",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-stripe-source"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "ef69ef6e-aa7f-4af1-a01d-ef775033524e",
"name": "Github",
"dockerRepository": "airbyte/source-github-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-github-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "fdc8b827-3257-4b33-83cc-106d234c34d4",
"name": "Google Adwords",
"dockerRepository": "airbyte/source-google-adwords-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-google-adwords"
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.util.ArrayList;
Expand All @@ -44,10 +45,11 @@ public class AirbyteProtocolConverters {
public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) {
List<ConfiguredAirbyteStream> airbyteStreams = schema.getStreams().stream()
.map(s -> new ConfiguredAirbyteStream()
.withName(s.getName())
.withJsonSchema(toJson(s.getFields())))
.withStream(new AirbyteStream()
.withName(s.getName())
.withJsonSchema(toJson(s.getFields()))))
// perform selection based on the output of toJson, which keeps properties if selected=true
.filter(s -> !s.getJsonSchema().get("properties").isEmpty())
.filter(s -> !s.getStream().getJsonSchema().get("properties").isEmpty())
.collect(Collectors.toList());
return new ConfiguredAirbyteCatalog().withStreams(airbyteStreams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ class AirbyteProtocolConvertersTest {
Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER)))));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = new ConfiguredAirbyteCatalog()
.withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
.withName(STREAM)
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(
Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING),
Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER)))));
.withStream(
new AirbyteStream()
.withName(STREAM)
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(
Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING),
Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER))))));

private static final Schema SCHEMA = new Schema()
.withStreams(Lists.newArrayList(new Stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,18 @@ class AirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
supported_sync_modes: Optional[List[SyncMode]] = None
source_defined_cursor: Optional[bool] = Field(
None,
description="If the source defines the cursor field, then it does any other cursor field inputs will be ignored. If it does not either the user_provided one is used or as a backup the default one is used.",
)
default_cursor_field: Optional[List[str]] = Field(
None,
description="Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves.",
)


class ConfiguredAirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
stream: AirbyteStream
sync_mode: Optional[SyncMode] = "full_refresh"
cursor_field: Optional[List[str]] = Field(
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,18 @@ def process_node(
def generate_dbt_model(catalog: dict, json_col: str, schema: str) -> Tuple[dict, Set[Union[str]]]:
result = {}
source_tables = set()
for obj in catalog["streams"]:
if "name" in obj:
name = obj["name"]
for configuredStream in catalog["streams"]:
if "stream" in configuredStream:
stream = configuredStream["stream"]
else:
stream = {}

if "name" in stream:
name = stream["name"]
else:
name = "undefined" # todo: should this raise an exception?
if "json_schema" in obj and "properties" in obj["json_schema"]:
properties = obj["json_schema"]["properties"]
if "json_schema" in stream and "properties" in stream["json_schema"]:
properties = stream["json_schema"]["properties"]
else:
properties = {}
# TODO Replace {name}_raw by an argument like we do for the json_blob column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog: ConfiguredAirby
masked_singer_streams = []

stream_to_airbyte_schema = {}
for stream in masked_airbyte_catalog["streams"]:
for configured_stream in masked_airbyte_catalog["streams"]:
stream = configured_stream["stream"]
stream_to_airbyte_schema[stream.get("name")] = stream

for singer_stream in discovered_singer_catalog.get("streams"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ private void runSync(JsonNode config, List<AirbyteMessage> messages, ConfiguredA
pbf, targetConfig.getDestinationConnectionConfiguration());
runner.start();
final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize"));
if (!runner.normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(),
targetConfig.getCatalog())) {
if (!runner.normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog())) {
throw new WorkerException("Normalization Failed.");
}
runner.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb

// create tmp tables if not exist
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String tableName = NamingHelper.getRawTableName(stream.getName());
final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli();
final String tableName = NamingHelper.getRawTableName(stream.getStream().getName());
final String tmpTableName = stream.getStream().getName() + "_" + Instant.now().toEpochMilli();

createTable(bigquery, datasetId, tmpTableName);
// https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source
Expand All @@ -206,7 +206,7 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb

final TableDataWriteChannel writer = bigquery.writer(JobId.of(UUID.randomUUID().toString()), writeChannelConfiguration);

writeConfigs.put(stream.getName(), new WriteConfig(TableId.of(datasetId, tableName), TableId.of(datasetId, tmpTableName), writer));
writeConfigs.put(stream.getStream().getName(), new WriteConfig(TableId.of(datasetId, tableName), TableId.of(datasetId, tmpTableName), writer));
}

// write to tmp tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -228,7 +229,11 @@ void testWriteSuccess() throws Exception {
assertEquals(expectedTasksJson.size(), tasksActual.size());
assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson));

assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList()));
assertTmpTablesNotPresent(CATALOG.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
.collect(Collectors.toList()));
}

@SuppressWarnings("ResultOfMethodCallIgnored")
Expand All @@ -244,8 +249,16 @@ void testWriteFailure() throws Exception {
consumer.accept(MESSAGE_USERS2);
consumer.close();

final List<String> tableNames = CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(toList());
assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList()));
final List<String> tableNames = CATALOG.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
.collect(toList());
assertTmpTablesNotPresent(CATALOG.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
.collect(Collectors.toList()));
// assert that no tables were created.
assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith)));
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
final long now = Instant.now().toEpochMilli();
final Map<String, WriteConfig> writeConfigs = new HashMap<>();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final Path tmpPath = destinationDir.resolve(stream.getName() + "_" + now + ".csv");
final Path finalPath = destinationDir.resolve(stream.getName() + ".csv");
final Path tmpPath = destinationDir.resolve(stream.getStream().getName() + "_" + now + ".csv");
final Path finalPath = destinationDir.resolve(stream.getStream().getName() + ".csv");
final FileWriter fileWriter = new FileWriter(tmpPath.toFile());
final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_AB_ID, COLUMN_EMITTED_AT, COLUMN_DATA));
writeConfigs.put(stream.getName(), new WriteConfig(printer, tmpPath, finalPath));
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));
}

return new CsvConsumer(writeConfigs, catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/destination-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb

// create tmp tables if not exist
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String tableName = NamingHelper.getRawTableName(stream.getName());
final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli();
final String tableName = NamingHelper.getRawTableName(stream.getStream().getName());
final String tmpTableName = stream.getStream().getName() + "_" + Instant.now().toEpochMilli();
database.query(ctx -> ctx.execute(String.format(
"CREATE TABLE \"%s\" ( \n"
+ "\"ab_id\" VARCHAR PRIMARY KEY,\n"
Expand All @@ -144,8 +144,8 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
tmpTableName, COLUMN_NAME)));

final Path queueRoot = Files.createTempDirectory("queues");
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getName()), stream.getName());
writeBuffers.put(stream.getName(), new WriteConfig(tableName, tmpTableName, writeBuffer));
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getStream().getName()), stream.getStream().getName());
writeBuffers.put(stream.getStream().getName(), new WriteConfig(tableName, tmpTableName, writeBuffer));
}

// write to tmp tables
Expand Down
Loading