diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9f760101-60ae-462f-9ee6-b7a9dafd454d.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9f760101-60ae-462f-9ee6-b7a9dafd454d.json index 06ca1971d456..657a83d19b6d 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9f760101-60ae-462f-9ee6-b7a9dafd454d.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/9f760101-60ae-462f-9ee6-b7a9dafd454d.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "9f760101-60ae-462f-9ee6-b7a9dafd454d", "name": "Kafka", "dockerRepository": "airbyte/destination-kafka", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/kafka" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 6f76ea01f376..fb9ae85282a0 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -78,7 +78,7 @@ - destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d name: Kafka dockerRepository: airbyte/destination-kafka - dockerImageTag: 0.1.1 + dockerImageTag: 0.1.2 documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka - destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89 name: DynamoDB diff --git a/airbyte-integrations/connectors/destination-kafka/Dockerfile b/airbyte-integrations/connectors/destination-kafka/Dockerfile index 4d8b2c0ee5a0..de08194ffbeb 100644 --- a/airbyte-integrations/connectors/destination-kafka/Dockerfile +++ b/airbyte-integrations/connectors/destination-kafka/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/destination-kafka diff --git a/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaDestinationConfig.java b/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaDestinationConfig.java index 2c86ad2fbc51..0f1be9c3c734 100644 --- a/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaDestinationConfig.java @@ -55,7 +55,7 @@ private KafkaDestinationConfig(String topicPattern, boolean sync, JsonNode confi public static KafkaDestinationConfig getKafkaDestinationConfig(JsonNode config) { return new KafkaDestinationConfig( config.get("topic_pattern").asText(), - config.has("sync_producer") && config.get("sync_producer").booleanValue(), + config.has("sync_producer") && config.get("sync_producer").asBoolean(), config); } @@ -64,27 +64,27 @@ private KafkaProducer buildKafkaProducer(JsonNode config) { .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText()) .putAll(propertiesByProtocol(config)) .put(ProducerConfig.CLIENT_ID_CONFIG, - config.has("client_id") ? config.get("client_id").asText() : null) + config.has("client_id") ? config.get("client_id").asText() : "") .put(ProducerConfig.ACKS_CONFIG, config.get("acks").asText()) - .put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").booleanValue()) + .put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").asBoolean()) .put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.get("compression_type").asText()) - .put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").intValue()) - .put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").longValue()) + .put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").asInt()) + .put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").asLong()) .put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, - config.get("max_in_flight_requests_per_connection").intValue()) + config.get("max_in_flight_requests_per_connection").asInt()) .put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText()) - .put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").longValue()) - .put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").intValue()) - .put(ProducerConfig.RETRIES_CONFIG, config.get("retries").intValue()) + .put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").asLong()) + .put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").asInt()) + .put(ProducerConfig.RETRIES_CONFIG, config.get("retries").asInt()) .put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, - config.get("socket_connection_setup_timeout_ms").longValue()) + config.get("socket_connection_setup_timeout_ms").asLong()) .put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, - config.get("socket_connection_setup_timeout_max_ms").longValue()) - .put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").longValue()) - .put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").intValue()) - .put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").intValue()) - .put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").intValue()) - .put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").intValue()) + config.get("socket_connection_setup_timeout_max_ms").asLong()) + .put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").asInt()) + .put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").asInt()) + .put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").asInt()) + .put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").asInt()) + .put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").asInt()) .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()) .build(); diff --git a/airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json index 0ffa9eefbaec..2eb35a5b22cd 100644 --- a/airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json @@ -162,19 +162,19 @@ "title": "Batch size", "description": "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.", "type": "integer", - "default": 16384 + "examples": [16384] }, "linger_ms": { "title": "Linger ms", "description": "The producer groups together any records that arrive in between request transmissions into a single batched request.", - "type": "number", - "default": 0 + "type": "string", + "examples": [0] }, "max_in_flight_requests_per_connection": { "title": "Max in flight requests per connection", "description": "The maximum number of unacknowledged requests the client will send on a single connection before blocking.", "type": "integer", - "default": 5 + "examples": [5] }, "client_dns_lookup": { "title": "Client DNS lookup", @@ -191,62 +191,62 @@ "buffer_memory": { "title": "Buffer memory", "description": "The total bytes of memory the producer can use to buffer records waiting to be sent to the server.", - "type": "number", - "default": 33554432 + "type": "string", + "examples": 33554432 }, "max_request_size": { "title": "Max request size", "description": "The maximum size of a request in bytes.", "type": "integer", - "default": 1048576 + "examples": [1048576] }, "retries": { "title": "Retries", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.", "type": "integer", - "default": 2147483647 + "examples": [2147483647] }, "socket_connection_setup_timeout_ms": { "title": "Socket connection setup timeout", "description": "The amount of time the client will wait for the socket connection to be established.", - "type": "number", - "default": 10000 + "type": "string", + "examples": [10000] }, "socket_connection_setup_timeout_max_ms": { "title": "Socket connection setup max timeout", "description": "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum.", - "type": "number", - "default": 30000 + "type": "string", + "examples": [30000] }, "max_block_ms": { "title": "Max block ms", "description": "The configuration controls how long the KafkaProducer's send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() methods will block.", - "type": "number", - "default": 60000 + "type": "string", + "examples": [60000] }, "request_timeout_ms": { "title": "Request timeout", "description": "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.", "type": "integer", - "default": 30000 + "examples": [30000] }, "delivery_timeout_ms": { "title": "Delivery timeout", "description": "An upper bound on the time to report success or failure after a call to 'send()' returns.", "type": "integer", - "default": 120000 + "examples": [120000] }, "send_buffer_bytes": { "title": "Send buffer bytes", "description": "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.", "type": "integer", - "default": 131072 + "examples": [131072] }, "receive_buffer_bytes": { "title": "Receive buffer bytes", "description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.", "type": "integer", - "default": 32768 + "examples": [32768] } } } diff --git a/airbyte-integrations/connectors/destination-kafka/src/test-integration/java/io/airbyte/integrations/destination/kafka/KafkaDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-kafka/src/test-integration/java/io/airbyte/integrations/destination/kafka/KafkaDestinationAcceptanceTest.java index 0de8df3703cc..12c46825a9f4 100644 --- a/airbyte-integrations/connectors/destination-kafka/src/test-integration/java/io/airbyte/integrations/destination/kafka/KafkaDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-kafka/src/test-integration/java/io/airbyte/integrations/destination/kafka/KafkaDestinationAcceptanceTest.java @@ -75,15 +75,15 @@ protected JsonNode getConfig() { .put("enable_idempotence", true) .put("compression_type", "none") .put("batch_size", 16384) - .put("linger_ms", 0) + .put("linger_ms", "0") .put("max_in_flight_requests_per_connection", 5) .put("client_dns_lookup", "use_all_dns_ips") - .put("buffer_memory", 33554432) + .put("buffer_memory", "33554432") .put("max_request_size", 1048576) .put("retries", 2147483647) - .put("socket_connection_setup_timeout_ms", 10000) - .put("socket_connection_setup_timeout_max_ms", 30000) - .put("max_block_ms", 60000) + .put("socket_connection_setup_timeout_ms", "10000") + .put("socket_connection_setup_timeout_max_ms", "30000") + .put("max_block_ms", "60000") .put("request_timeout_ms", 30000) .put("delivery_timeout_ms", 120000) .put("send_buffer_bytes", -1) diff --git a/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java b/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java index 1898d21ab9f7..22867e782ba2 100644 --- a/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java @@ -122,16 +122,16 @@ private JsonNode getConfig(String topicPattern) { .put("transactional_id", "txn-id") .put("enable_idempotence", true) .put("compression_type", "none") - .put("batch_size", 16384) - .put("linger_ms", 0) - .put("max_in_flight_requests_per_connection", 5) + .put("batch_size", "16384") + .put("linger_ms", "0") + .put("max_in_flight_requests_per_connection", "5") .put("client_dns_lookup", "use_all_dns_ips") .put("buffer_memory", 33554432) .put("max_request_size", 1048576) .put("retries", 1) - .put("socket_connection_setup_timeout_ms", 10) - .put("socket_connection_setup_timeout_max_ms", 30) - .put("max_block_ms", 100) + .put("socket_connection_setup_timeout_ms", "10") + .put("socket_connection_setup_timeout_max_ms", "30") + .put("max_block_ms", "100") .put("request_timeout_ms", 100) .put("delivery_timeout_ms", 120) .put("send_buffer_bytes", -1) diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java index 3b1146ec42da..3c33a93dc4cf 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java @@ -75,9 +75,9 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI /** * Tries to inject keen.timestamp field to the given message data. If the stream contains cursor * field, it's value is tried to be parsed to timestamp. If this procedure fails, stream is removed - * from timestamp-parsable stream map, so parsing is not tried for future messages in the same stream. - * If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and whole data - * is returned. Otherwise, keen.timestamp is set to emittedAt value + * from timestamp-parsable stream map, so parsing is not tried for future messages in the same + * stream. If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and + * whole data is returned. Otherwise, keen.timestamp is set to emittedAt value * * @param message AirbyteRecordMessage containing record data * @return Record data together with keen.timestamp field diff --git a/docs/integrations/destinations/kafka.md b/docs/integrations/destinations/kafka.md index d9fb1b902c1f..658e884be51c 100644 --- a/docs/integrations/destinations/kafka.md +++ b/docs/integrations/destinations/kafka.md @@ -114,5 +114,6 @@ More info about this can be found in the [Kafka producer configs documentation s | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.2 | 2021-09-14 | [#6040](https://github.com/airbytehq/airbyte/pull/6040) | Change spec.json and config parser | | 0.1.1 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json | -| 0.1.0 | 2021-07-21 | [3746](https://github.com/airbytehq/airbyte/pull/3746) | Initial Release | +| 0.1.0 | 2021-07-21 | [#3746](https://github.com/airbytehq/airbyte/pull/3746) | Initial Release |