diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/RecordSchemaValidator.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/RecordSchemaValidator.java index a4364b0c08488..0232234de3ddc 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/RecordSchemaValidator.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/RecordSchemaValidator.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.exception.RecordSchemaValidationException; @@ -22,9 +23,9 @@ public class RecordSchemaValidator { - private final Map streams; + private final Map streams; - public RecordSchemaValidator(final Map streamNamesToSchemas) { + public RecordSchemaValidator(final Map streamNamesToSchemas) { // streams is Map of a stream source namespace + name mapped to the stream schema // for easy access when we check each record's schema this.streams = streamNamesToSchemas; @@ -37,7 +38,8 @@ public RecordSchemaValidator(final Map streamNamesToSchemas) { * @param message * @throws RecordSchemaValidationException */ - public void validateSchema(final AirbyteRecordMessage message, final String messageStream) throws RecordSchemaValidationException { + public void validateSchema(final AirbyteRecordMessage message, final AirbyteStreamNameNamespacePair messageStream) + throws RecordSchemaValidationException { final JsonNode messageData = message.getData(); final JsonNode matchingSchema = streams.get(messageStream); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerMetricReporter.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerMetricReporter.java index 44a1ae1073247..072bf6a08397b 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerMetricReporter.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerMetricReporter.java @@ -7,6 +7,7 @@ import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.OssMetricsRegistry; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; public class WorkerMetricReporter { @@ -21,9 +22,9 @@ public WorkerMetricReporter(final MetricClient metricClient, final String docker this.metricClient = metricClient; } - public void trackSchemaValidationError(final String stream) { + public void trackSchemaValidationError(final AirbyteStreamNameNamespacePair stream) { metricClient.count(OssMetricsRegistry.NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS, 1, new MetricAttribute("docker_repo", dockerRepo), - new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream)); + new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream.toString())); } public void trackStateMetricTrackerError() { diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 81da1aab53ec6..b704ade5f6a43 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -13,6 +13,7 @@ import io.airbyte.config.WorkerSourceConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; @@ -22,11 +23,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,18 +131,12 @@ public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType out throw new WorkerException(defaultErrorMessage); } - public static Map mapStreamNamesToSchemas(final StandardSyncInput syncInput) { + public static Map mapStreamNamesToSchemas(final StandardSyncInput syncInput) { return syncInput.getCatalog().getStreams().stream().collect( Collectors.toMap( - k -> { - return streamNameWithNamespace(k.getStream().getNamespace(), k.getStream().getName()); - }, + k -> AirbyteStreamNameNamespacePair.fromAirbyteStream(k.getStream()), v -> v.getStream().getJsonSchema())); } - public static String streamNameWithNamespace(final @Nullable String namespace, final String streamName) { - return Objects.toString(namespace, "").trim() + streamName.trim(); - } - } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 35aae3d1beeec..f27a8de58c113 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -27,6 +27,7 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.workers.RecordSchemaValidator; import io.airbyte.workers.WorkerMetricReporter; import io.airbyte.workers.WorkerUtils; @@ -287,7 +288,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); Long recordsRead = 0L; - final Map, Integer>> validationErrors = new HashMap<>(); + final Map, Integer>> validationErrors = new HashMap<>(); try { while (!cancelled.get() && !source.isFinished()) { final Optional messageOptional; @@ -516,14 +517,14 @@ private List getFailureReasons(AtomicReference rep } private static void validateSchema(final RecordSchemaValidator recordSchemaValidator, - final Map, Integer>> validationErrors, + final Map, Integer>> validationErrors, final AirbyteMessage message) { if (message.getRecord() == null) { return; } final AirbyteRecordMessage record = message.getRecord(); - final String messageStream = WorkerUtils.streamNameWithNamespace(record.getNamespace(), record.getStream()); + final AirbyteStreamNameNamespacePair messageStream = AirbyteStreamNameNamespacePair.fromRecordMessage(record); // avoid noise by validating only if the stream has less than 10 records with validation errors final boolean streamHasLessThenTenErrs = validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10; diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/RecordSchemaValidatorTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/RecordSchemaValidatorTest.java index 3412b5573e0f4..7bc4920d407ab 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/RecordSchemaValidatorTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/RecordSchemaValidatorTest.java @@ -9,6 +9,7 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.workers.exception.RecordSchemaValidationException; import io.airbyte.workers.test_utils.AirbyteMessageUtils; import io.airbyte.workers.test_utils.TestConfigHelpers; @@ -34,13 +35,14 @@ void setup() throws Exception { @Test void testValidateValidSchema() throws Exception { final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)); - recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), STREAM_NAME); + recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), AirbyteStreamNameNamespacePair.fromRecordMessage(VALID_RECORD.getRecord())); } @Test void testValidateInvalidSchema() throws Exception { final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)); - assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(), STREAM_NAME)); + assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(), + AirbyteStreamNameNamespacePair.fromRecordMessage(INVALID_RECORD.getRecord()))); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/WorkerUtilsTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/WorkerUtilsTest.java index b0f51c46f9620..6f820d921eb71 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/WorkerUtilsTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/WorkerUtilsTest.java @@ -16,6 +16,7 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.workers.internal.HeartbeatMonitor; import io.airbyte.workers.test_utils.TestConfigHelpers; import java.time.Duration; @@ -128,17 +129,17 @@ void testProcessDies() { void testMapStreamNamesToSchemasWithNullNamespace() { final ImmutablePair syncPair = TestConfigHelpers.createSyncConfig(); final StandardSyncInput syncInput = syncPair.getValue(); - final Map mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput); - assertNotNull(mapOutput.get("user_preferences")); + final Map mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput); + assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", null))); } @Test void testMapStreamNamesToSchemasWithMultipleNamespaces() { final ImmutablePair syncPair = TestConfigHelpers.createSyncConfig(true); final StandardSyncInput syncInput = syncPair.getValue(); - final Map mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput); - assertNotNull(mapOutput.get("namespaceuser_preferences")); - assertNotNull(mapOutput.get("namespace2user_preferences")); + final Map mapOutput = WorkerUtils.mapStreamNamesToSchemas(syncInput); + assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", "namespace"))); + assertNotNull(mapOutput.get(new AirbyteStreamNameNamespacePair("user_preferences", "namespace2"))); } /** diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index f6a570b46fbc7..c40ce959f2822 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -43,6 +43,7 @@ import io.airbyte.metrics.lib.MetricClientFactory; import io.airbyte.protocol.models.AirbyteLogMessage.Level; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.workers.*; @@ -160,8 +161,10 @@ void test() throws Exception { verify(destination).accept(RECORD_MESSAGE2); verify(source, atLeastOnce()).close(); verify(destination).close(); - verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME); - verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME); + verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), + AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE1.getRecord())); + verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), + AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE2.getRecord())); } @Test @@ -185,9 +188,12 @@ void testInvalidSchema() throws Exception { verify(destination).accept(RECORD_MESSAGE1); verify(destination).accept(RECORD_MESSAGE2); verify(destination).accept(RECORD_MESSAGE3); - verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), STREAM_NAME); - verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), STREAM_NAME); - verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(), STREAM_NAME); + verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE1.getRecord(), + AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE1.getRecord())); + verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE2.getRecord(), + AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE2.getRecord())); + verify(recordSchemaValidator).validateSchema(RECORD_MESSAGE3.getRecord(), + AirbyteStreamNameNamespacePair.fromRecordMessage(RECORD_MESSAGE3.getRecord())); verify(source).close(); verify(destination).close(); } diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java index 4586ce7389ef0..3ec3e81e09731 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java @@ -9,7 +9,6 @@ import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; @@ -18,6 +17,7 @@ import io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java index 3d1552675026f..07221f113b1b0 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/SerializedBufferFactory.java @@ -6,7 +6,6 @@ import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.BufferStorage; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer; @@ -16,6 +15,7 @@ import io.airbyte.integrations.destination.s3.jsonl.JsonLSerializedBuffer; import io.airbyte.integrations.destination.s3.jsonl.S3JsonlFormatConfig; import io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.concurrent.Callable; import java.util.function.Function; diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java index b925df4b956a8..b410716f9688b 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBuffer.java @@ -5,11 +5,11 @@ package io.airbyte.integrations.destination.s3.avro; import io.airbyte.commons.functional.CheckedBiFunction; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer; import io.airbyte.integrations.destination.record_buffer.BufferStorage; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.IOException; import java.io.OutputStream; diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java index 2a63471e2371c..3fe0696945167 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBuffer.java @@ -5,12 +5,12 @@ package io.airbyte.integrations.destination.s3.csv; import io.airbyte.commons.functional.CheckedBiFunction; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer; import io.airbyte.integrations.destination.record_buffer.BufferStorage; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.integrations.destination.s3.util.CompressionType; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.IOException; import java.io.OutputStream; diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java index 1f0a177d1982e..ed2ce65893c26 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBuffer.java @@ -9,7 +9,6 @@ import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer; import io.airbyte.integrations.destination.record_buffer.BufferStorage; @@ -17,6 +16,7 @@ import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.util.CompressionType; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.OutputStream; import java.io.PrintWriter; diff --git a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBuffer.java b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBuffer.java index 4f56ed61ce13f..be466088cdd66 100644 --- a/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBuffer.java +++ b/airbyte-integrations/bases/base-java-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBuffer.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.destination.s3.parquet; import io.airbyte.commons.functional.CheckedBiFunction; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.FileBuffer; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.integrations.destination.s3.S3DestinationConfig; @@ -13,6 +12,7 @@ import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory; import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.io.File; import java.io.FileInputStream; diff --git a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java index 77ea3af80dbe6..84a1408caae45 100644 --- a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java +++ b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java @@ -10,11 +10,11 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.BufferStorage; import io.airbyte.integrations.destination.record_buffer.FileBuffer; import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.Field; diff --git a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBufferTest.java b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBufferTest.java index 34e096710976c..697bc245eb106 100644 --- a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBufferTest.java +++ b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/csv/CsvSerializedBufferTest.java @@ -10,13 +10,13 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.BufferStorage; import io.airbyte.integrations.destination.record_buffer.FileBuffer; import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.Field; diff --git a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBufferTest.java b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBufferTest.java index 3d7b1551aed75..b66b53a4dd5f3 100644 --- a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBufferTest.java +++ b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/jsonl/JsonLSerializedBufferTest.java @@ -10,11 +10,11 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.BufferStorage; import io.airbyte.integrations.destination.record_buffer.FileBuffer; import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.Field; diff --git a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java index fdc1cb4eb545e..e43bef35f5cc4 100644 --- a/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java +++ b/airbyte-integrations/bases/base-java-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java @@ -11,10 +11,10 @@ import com.amazonaws.util.IOUtils; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.Field; diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index e5ce77e303e3d..dc49330633036 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -11,7 +11,6 @@ import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.dest_state_lifecycle_manager.DefaultDestStateLifecycleManager; import io.airbyte.integrations.destination.dest_state_lifecycle_manager.DestStateLifecycleManager; @@ -19,6 +18,7 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.HashMap; import java.util.Map; diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/CheckAndRemoveRecordWriter.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/CheckAndRemoveRecordWriter.java index 1c988ed33086c..e399d2c01cd8f 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/CheckAndRemoveRecordWriter.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/CheckAndRemoveRecordWriter.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.buffered_stream_consumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; @FunctionalInterface public interface CheckAndRemoveRecordWriter { diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordWriter.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordWriter.java index 247733f96cef9..f8296dc65ff4e 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordWriter.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/RecordWriter.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.destination.buffered_stream_consumer; import io.airbyte.commons.functional.CheckedBiConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import java.util.List; public interface RecordWriter extends CheckedBiConsumer, Exception> { diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java index b638906666282..6483c0f9867b6 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/BufferingStrategy.java @@ -4,8 +4,8 @@ package io.airbyte.integrations.destination.record_buffer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; /** * High-level interface used by diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java index 70ca30da88b18..ebb4151ebf56b 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.java @@ -4,12 +4,12 @@ package io.airbyte.integrations.destination.record_buffer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.buffered_stream_consumer.CheckAndRemoveRecordWriter; import io.airbyte.integrations.destination.buffered_stream_consumer.RecordSizeEstimator; import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import java.util.ArrayList; import java.util.HashMap; import java.util.List; diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java index 71f9ddde36e92..9ab420a23126f 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java @@ -7,8 +7,8 @@ import io.airbyte.commons.functional.CheckedBiConsumer; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.ArrayList; import java.util.HashMap; diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 58ab5fa56dc1d..0a2561e94086e 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -20,12 +20,12 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.Field; diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java index 330b3c998e11c..3a611c3c35f6e 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java @@ -12,10 +12,10 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import java.util.List; import org.junit.jupiter.api.Test; diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java index 2de320114ebed..37497324c0023 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategyTest.java @@ -17,9 +17,9 @@ import io.airbyte.commons.functional.CheckedBiConsumer; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java index 3305d960cc8cd..6421810cabbd8 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -9,7 +9,7 @@ import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder; import com.azure.storage.common.StorageSharedKeyCredential; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriter; import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory; @@ -85,7 +85,7 @@ protected void startTracked() throws Exception { final AirbyteStream stream = configuredStream.getStream(); final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair - .fromAirbyteSteam(stream); + .fromAirbyteStream(stream); streamNameAndNamespaceToWriters.put(streamNamePair, writer); } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index 7f0b45eb9e0c3..7fd2234299595 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -9,7 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Table; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; @@ -106,7 +106,7 @@ protected void putStreamIntoUploaderMap(AirbyteStream stream, AbstractBigQueryUploader uploader = BigQueryUploaderFactory.getUploader(uploaderConfig); uploaderMap.put( - AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), + AirbyteStreamNameNamespacePair.fromAirbyteStream(stream), uploader); } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 5b36c7bc1b319..b87f9708bbe51 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -28,7 +28,7 @@ import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableDefinition; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter; import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java index 817b40c2768df..9c6887f13db7b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAvroSerializedBuffer.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.functional.CheckedBiFunction; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.record_buffer.BufferStorage; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index e47b39e7dceec..cbac5ab2cdda7 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -20,7 +20,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.StandardNameTransformer; @@ -237,7 +237,7 @@ protected void putStreamIntoUploaderMap(final AirbyteStream stream, final Map> uploaderMap) throws IOException { uploaderMap.put( - AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), + AirbyteStreamNameNamespacePair.fromAirbyteStream(stream), BigQueryUploaderFactory.getUploader(uploaderConfig)); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index 92fd666eb6452..2a2508b5d3fe8 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -6,7 +6,7 @@ import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader; import io.airbyte.protocol.models.AirbyteMessage; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index 3c61de7e35e10..0dee0e9bdb666 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -13,7 +13,7 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.record_buffer.SerializableBuffer; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java index 50fb51e4cf755..8c10b2ec39c29 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.bigquery; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader; import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; diff --git a/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java b/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java index 023110d00cab1..30c5eb3b9d4ef 100644 --- a/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.destination.cassandra; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java index babd12b13af30..bf9d301cfdda9 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java @@ -12,7 +12,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.*; import java.util.*; @@ -70,7 +70,7 @@ protected void startTracked() throws Exception { final AirbyteStream stream = configuredStream.getStream(); final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair - .fromAirbyteSteam(stream); + .fromAirbyteStream(stream); streamNameAndNamespaceToWriters.put(streamNamePair, writer); } } diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java index abc2cd57fbe28..9342ca91a942d 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.e2e_test.logging; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.time.Instant; import java.time.OffsetDateTime; diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java index bd021cc61ea87..7347955db2356 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.e2e_test.logging; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteRecordMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java index ef8fb8806a77c..b482b1107003b 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.e2e_test.logging; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteRecordMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java index 0b62220acdcd7..deb15cc97df13 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java @@ -6,7 +6,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -41,7 +41,7 @@ public LoggingConsumer(final TestingLoggerFactory loggerFactory, public void start() { for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { final AirbyteStream stream = configuredStream.getStream(); - final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); + final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair.fromAirbyteStream(stream); final TestingLogger logger = loggerFactory.create(streamNamePair); loggers.put(streamNamePair, logger); } diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java index 4fa159d6f32a7..aab4df89c090b 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.e2e_test.logging; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.util.Random; import org.slf4j.Logger; diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java index f0a6969aa88f2..e7beb522f732e 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.destination.e2e_test.logging; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.e2e_test.logging.TestingLogger.LoggingType; public class TestingLoggerFactory { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 54f9348b9d148..18507caf79db1 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -12,7 +12,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index 6d8783ec12bfd..d444d28e2b899 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -9,7 +9,7 @@ import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.CheckAndRemoveRecordWriter; @@ -77,7 +77,7 @@ private static Map createWrite final String stagingFolder = UUID.randomUUID().toString(); for (final var configuredStream : catalog.getStreams()) { final var stream = configuredStream.getStream(); - final var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); + final var pair = AirbyteStreamNameNamespacePair.fromAirbyteStream(stream); final var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, namingResolver, database, sqlOperations); pairToCopier.put(pair, copier); diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index 39f1c4dc42c0d..5c4291b953059 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -11,7 +11,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; diff --git a/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java b/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java index 005b558ace6d1..e2f4ab914724d 100644 --- a/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java @@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.protocol.models.AirbyteMessage; @@ -80,7 +80,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) { Map buildTopicMap() { return catalog.getStreams().stream() - .map(stream -> AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream())) + .map(stream -> AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())) .collect(Collectors.toMap(Function.identity(), pair -> nameTransformer.getIdentifier(topicPattern .replaceAll("\\{namespace}", Optional.ofNullable(pair.getNamespace()).orElse("")) diff --git a/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java b/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java index f06345317bff8..b18acf638ea7b 100644 --- a/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java @@ -13,7 +13,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; diff --git a/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java b/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java index 2bd0360f5a43b..f7d1336d783a1 100644 --- a/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.destination.kinesis; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; diff --git a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java index 9f2442ae86704..a00c2bb9f26e4 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java @@ -22,7 +22,7 @@ import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; @@ -124,7 +124,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, } } - writeConfigs.put(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), + writeConfigs.put(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream), new MongodbWriteConfig(collectionName, tmpCollectionName, configStream.getDestinationSyncMode(), collection, documentsHash)); } return new MongodbRecordConsumer(writeConfigs, database, catalog, outputRecordCollector); diff --git a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumer.java b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumer.java index 77749890fa4a9..f1bcfa43e4e7b 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumer.java @@ -13,7 +13,7 @@ import io.airbyte.commons.lang.Exceptions; import io.airbyte.db.mongodb.MongoDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumerTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumerTest.java index 5a334c6476e90..c3f0f513242c1 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/test/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/test/java/io/airbyte/integrations/destination/mongodb/MongodbRecordConsumerTest.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.destination.mongodb; import io.airbyte.db.mongodb.MongoDatabase; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; import io.airbyte.protocol.models.AirbyteMessage; diff --git a/airbyte-integrations/connectors/destination-mqtt/src/main/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumer.java b/airbyte-integrations/connectors/destination-mqtt/src/main/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumer.java index abbabf34fd7a3..2431556baa2ee 100644 --- a/airbyte-integrations/connectors/destination-mqtt/src/main/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-mqtt/src/main/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumer.java @@ -9,7 +9,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -99,7 +99,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) { Map buildTopicMap() { return catalog.getStreams().stream() - .map(stream -> AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream())) + .map(stream -> AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())) .collect(Collectors.toMap(Function.identity(), pair -> config.getTopicPattern() .replaceAll("\\{namespace}", Optional.ofNullable(pair.getNamespace()).orElse("")) .replaceAll("\\{stream}", Optional.ofNullable(pair.getName()).orElse("")), diff --git a/airbyte-integrations/connectors/destination-mqtt/src/test/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumerTest.java b/airbyte-integrations/connectors/destination-mqtt/src/test/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumerTest.java index 9f3c2d93bd1b6..b98cb44b70585 100644 --- a/airbyte-integrations/connectors/destination-mqtt/src/test/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-mqtt/src/test/java/io/airbyte/integrations/destination/mqtt/MqttRecordConsumerTest.java @@ -14,7 +14,7 @@ import com.google.common.collect.Sets; import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; diff --git a/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java b/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java index 38f78ce0a7e39..d026805558969 100644 --- a/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java +++ b/airbyte-integrations/connectors/destination-pubsub/src/main/java/io/airbyte/integrations/destination/pubsub/PubsubConsumer.java @@ -16,7 +16,7 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.protocol.models.AirbyteMessage; @@ -68,7 +68,7 @@ protected void startTracked() throws Exception { .setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { final Map attrs = Maps.newHashMap(); - final var key = AirbyteStreamNameNamespacePair.fromAirbyteSteam(configStream.getStream()); + final var key = AirbyteStreamNameNamespacePair.fromAirbyteStream(configStream.getStream()); attrs.put(PubsubDestination.STREAM, key.getName()); if (!Strings.isNullOrEmpty(key.getNamespace())) { attrs.put(PubsubDestination.NAMESPACE, key.getNamespace()); diff --git a/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java index 143eeb3c74d8d..0e7852b4a3df7 100644 --- a/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-pubsub/src/test-integration/java/io/airbyte/integrations/destination/pubsub/PubsubDestinationAcceptanceTest.java @@ -35,7 +35,7 @@ import com.google.pubsub.v1.TopicName; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; diff --git a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java index fde2db986903c..6ba1c1826ee9f 100644 --- a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.destination.pulsar; import io.airbyte.commons.lang.Exceptions; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.protocol.models.AirbyteMessage; @@ -80,7 +80,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) { Map> buildProducerMap() { return catalog.getStreams().stream() - .map(stream -> AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream())) + .map(stream -> AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())) .collect(Collectors.toMap(Function.identity(), pair -> { String topic = nameTransformer.getIdentifier(config.getTopicPattern() .replaceAll("\\{namespace}", Optional.ofNullable(pair.getNamespace()).orElse("")) diff --git a/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java b/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java index 7542922b8bb79..348b182bc80e2 100644 --- a/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java @@ -14,7 +14,7 @@ import com.google.common.collect.Streams; import com.google.common.net.InetAddresses; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; diff --git a/airbyte-integrations/connectors/destination-redis/src/main/java/io/airbyte/integrations/destination/redis/RedisMessageConsumer.java b/airbyte-integrations/connectors/destination-redis/src/main/java/io/airbyte/integrations/destination/redis/RedisMessageConsumer.java index 6fa8403af831f..86d04b224f5dd 100644 --- a/airbyte-integrations/connectors/destination-redis/src/main/java/io/airbyte/integrations/destination/redis/RedisMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-redis/src/main/java/io/airbyte/integrations/destination/redis/RedisMessageConsumer.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java index a03053e8a3dd0..e0380e9e7b0f4 100644 --- a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.destination.scylla; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 1fb65944d474e..c8bbb3d090c32 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -40,7 +40,7 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.StreamingJdbcDatabase; import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; import io.airbyte.integrations.source.relationaldb.AbstractDbSource; @@ -617,7 +617,7 @@ protected List identifyStreamsToSnapshot(final Configur final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams)); return catalog.getStreams().stream() - .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream.getStream()))) + .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) .map(Jsons::clone) .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcCatalogHelper.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcCatalogHelper.java index c7b67e28c9d48..4bd85230a7d44 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcCatalogHelper.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcCatalogHelper.java @@ -10,7 +10,7 @@ import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.SyncMode; diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index f20bab26ff498..913f5c1530a91 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -28,7 +28,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshWrappedSource; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java index e3c44bca9e63f..90c64b9cc9423 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresCdcGetPublicizedTablesTest.java @@ -18,7 +18,7 @@ import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.SQLException; import java.util.List; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 62c79dc47bedc..17f702f33fe58 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -24,7 +24,7 @@ import io.airbyte.db.IncrementalUtils; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.BaseConnector; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.relationaldb.InvalidCursorInfoUtil.InvalidCursorInfo; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CdcStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CdcStateManager.java index 1e898a7061043..ea0058247c585 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CdcStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/CdcStateManager.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.source.relationaldb; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.models.CdcState; import java.util.Collections; import java.util.Set; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index 738f7eba31930..c43c1b20f09c9 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -6,7 +6,7 @@ import com.google.common.collect.AbstractIterator; import io.airbyte.db.IncrementalUtils; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java index df8d1200b5b5e..1197c866d0a80 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/AbstractStateManager.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.source.relationaldb.state; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java index ca5c0504d9e9b..19a5989d78eb5 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.source.relationaldb.state; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -93,7 +93,7 @@ protected Map createCursorInfoMap( final Set allStreamNames = catalog.getStreams() .stream() .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStreamNameNamespacePair::fromAirbyteSteam) + .map(AirbyteStreamNameNamespacePair::fromAirbyteStream) .collect(Collectors.toSet()); allStreamNames.addAll(streamSupplier.get().stream().map(namespacePairFunction).filter(Objects::nonNull).collect(Collectors.toSet())); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java index ee170e5d518c8..2efdc18ba475d 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java @@ -10,7 +10,7 @@ import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CdcStateManager; import io.airbyte.integrations.source.relationaldb.models.CdcState; import io.airbyte.integrations.source.relationaldb.models.DbState; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java index a1e147e76d055..7e73f9855cf10 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.source.relationaldb.state; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CdcStateManager; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java index 130a520e98b23..cca021fd66c5a 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java @@ -6,7 +6,7 @@ import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java index 3039758f97465..09e8305fe8cc6 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManager.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.source.relationaldb.state; import com.google.common.base.Preconditions; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CdcStateManager; import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.protocol.models.AirbyteStateMessage; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java index 2d1cd66673d18..593c07ed335dd 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManager.java @@ -10,7 +10,7 @@ import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CdcStateManager; import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.protocol.models.AirbyteStateMessage; diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index 088f46fdfd5be..c94b83928cd05 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -13,7 +13,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.MoreIterators; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java index 5f85d99be4d8e..eec2d4b7e1efd 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/CursorManagerTest.java @@ -15,7 +15,7 @@ import static io.airbyte.integrations.source.relationaldb.state.StateTestConstants.getStream; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import java.util.Collections; diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java index d342347fbc0ef..48442e356bb9b 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManagerTest.java @@ -17,7 +17,7 @@ import static org.mockito.Mockito.mock; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.models.CdcState; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java index 1e6ac72d25b3f..2df9760c8ea4d 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateTestConstants.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.source.relationaldb.state; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java index e2733bfbbb924..f37b415675247 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StreamStateManagerTest.java @@ -20,7 +20,7 @@ import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.AirbyteStateMessage; diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteStreamNameNamespacePair.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/AirbyteStreamNameNamespacePair.java similarity index 78% rename from airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteStreamNameNamespacePair.java rename to airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/AirbyteStreamNameNamespacePair.java index d733efbed0e37..123397533bc99 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteStreamNameNamespacePair.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/AirbyteStreamNameNamespacePair.java @@ -2,12 +2,8 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.base; +package io.airbyte.protocol.models; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.util.HashSet; import java.util.Objects; import java.util.Set; @@ -35,12 +31,12 @@ public String getNamespace() { return namespace; } + /** + * As this is used as a metrics tag, enforce snake case. + */ @Override public String toString() { - return "AirbyteStreamNameNamespacePair{" + - "name='" + name + '\'' + - ", namespace='" + namespace + '\'' + - '}'; + return (namespace != null ? namespace : "") + "_" + name; } @Override @@ -85,27 +81,23 @@ public int compareTo(final AirbyteStreamNameNamespacePair o) { return namespace.compareTo(o.getNamespace()); } - public static void main(final String[] args) { - System.out.println("test".compareTo(null)); - } - public static AirbyteStreamNameNamespacePair fromRecordMessage(final AirbyteRecordMessage msg) { return new AirbyteStreamNameNamespacePair(msg.getStream(), msg.getNamespace()); } - public static AirbyteStreamNameNamespacePair fromAirbyteSteam(final AirbyteStream stream) { + public static AirbyteStreamNameNamespacePair fromAirbyteStream(final AirbyteStream stream) { return new AirbyteStreamNameNamespacePair(stream.getName(), stream.getNamespace()); } public static AirbyteStreamNameNamespacePair fromConfiguredAirbyteSteam(final ConfiguredAirbyteStream stream) { - return fromAirbyteSteam(stream.getStream()); + return fromAirbyteStream(stream.getStream()); } public static Set fromConfiguredCatalog(final ConfiguredAirbyteCatalog catalog) { final var pairs = new HashSet(); for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { - final var pair = fromAirbyteSteam(stream.getStream()); + final var pair = fromAirbyteStream(stream.getStream()); pairs.add(pair); }