Destination Kafka: correct spec json and data types in config (#6040)
* correct spec json and data types in config

* bump version

* correct tests

* correct config parser NPE

* format files

Co-authored-by: Marcos Marx <marcosmarx@MacBook-Pro-de-Marcos.local>
marcosmarxm and Marcos Marx authored Sep 15, 2021
1 parent b596194 commit 9dafec6
Showing 9 changed files with 53 additions and 52 deletions.
@@ -2,6 +2,6 @@
   "destinationDefinitionId": "9f760101-60ae-462f-9ee6-b7a9dafd454d",
   "name": "Kafka",
   "dockerRepository": "airbyte/destination-kafka",
-  "dockerImageTag": "0.1.1",
+  "dockerImageTag": "0.1.2",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/kafka"
 }
@@ -78,7 +78,7 @@
 - destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
   name: Kafka
   dockerRepository: airbyte/destination-kafka
-  dockerImageTag: 0.1.1
+  dockerImageTag: 0.1.2
   documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka
 - destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89
   name: DynamoDB
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.1.1
+LABEL io.airbyte.version=0.1.2
 LABEL io.airbyte.name=airbyte/destination-kafka
@@ -55,7 +55,7 @@ private KafkaDestinationConfig(String topicPattern, boolean sync, JsonNode config
   public static KafkaDestinationConfig getKafkaDestinationConfig(JsonNode config) {
     return new KafkaDestinationConfig(
         config.get("topic_pattern").asText(),
-        config.has("sync_producer") && config.get("sync_producer").booleanValue(),
+        config.has("sync_producer") && config.get("sync_producer").asBoolean(),
         config);
   }
 
@@ -64,27 +64,27 @@ private KafkaProducer<String, JsonNode> buildKafkaProducer(JsonNode config) {
         .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText())
         .putAll(propertiesByProtocol(config))
         .put(ProducerConfig.CLIENT_ID_CONFIG,
-            config.has("client_id") ? config.get("client_id").asText() : null)
+            config.has("client_id") ? config.get("client_id").asText() : "")
         .put(ProducerConfig.ACKS_CONFIG, config.get("acks").asText())
-        .put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").booleanValue())
+        .put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").asBoolean())
         .put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.get("compression_type").asText())
-        .put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").intValue())
-        .put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").longValue())
+        .put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").asInt())
+        .put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").asLong())
         .put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
-            config.get("max_in_flight_requests_per_connection").intValue())
+            config.get("max_in_flight_requests_per_connection").asInt())
         .put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText())
-        .put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").longValue())
-        .put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").intValue())
-        .put(ProducerConfig.RETRIES_CONFIG, config.get("retries").intValue())
+        .put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").asLong())
+        .put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").asInt())
+        .put(ProducerConfig.RETRIES_CONFIG, config.get("retries").asInt())
         .put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
-            config.get("socket_connection_setup_timeout_ms").longValue())
+            config.get("socket_connection_setup_timeout_ms").asLong())
         .put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
-            config.get("socket_connection_setup_timeout_max_ms").longValue())
-        .put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").longValue())
-        .put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").intValue())
-        .put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").intValue())
-        .put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").intValue())
-        .put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").intValue())
+            config.get("socket_connection_setup_timeout_max_ms").asLong())
+        .put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").asInt())
+        .put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").asInt())
+        .put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").asInt())
+        .put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").asInt())
+        .put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").asInt())
         .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
         .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
         .build();
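Two of the fixes in this file are easy to miss. Jackson's exact-type accessors (booleanValue(), intValue(), longValue()) return a default of false or 0 when the node holds another type, such as a string value submitted through the UI form, while asBoolean()/asInt()/asLong() coerce compatible text. Separately, replacing the null fallback for client_id with an empty string avoids the NullPointerException mentioned in the commit message, presumably because the builder chain above (it looks like Guava's ImmutableMap.Builder) rejects null values. A minimal standalone sketch of both behaviors, assuming Jackson and Guava on the classpath; this is illustrative code, not code from the connector:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

public class ConfigParsingSketch {

  public static void main(String[] args) throws Exception {
    // Form values often arrive as JSON strings, e.g. "true" rather than true.
    JsonNode config = new ObjectMapper().readTree("{\"sync_producer\": \"true\"}");

    // The exact-type accessor falls back to false on a text node...
    System.out.println(config.get("sync_producer").booleanValue()); // false
    // ...while the coercing accessor parses the string, as the fixed parser expects.
    System.out.println(config.get("sync_producer").asBoolean());    // true

    // Guava's ImmutableMap.Builder rejects null values at put() time, so a
    // missing client_id defaulting to null would throw; "" is accepted.
    try {
      ImmutableMap.builder().put("client.id", null);
    } catch (NullPointerException e) {
      System.out.println("null value rejected: NullPointerException");
    }
    System.out.println(ImmutableMap.builder().put("client.id", "").build());
  }
}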
@@ -162,19 +162,19 @@
       "title": "Batch size",
       "description": "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.",
       "type": "integer",
-      "default": 16384
+      "examples": [16384]
     },
     "linger_ms": {
       "title": "Linger ms",
       "description": "The producer groups together any records that arrive in between request transmissions into a single batched request.",
-      "type": "number",
-      "default": 0
+      "type": "string",
+      "examples": [0]
     },
     "max_in_flight_requests_per_connection": {
       "title": "Max in flight requests per connection",
       "description": "The maximum number of unacknowledged requests the client will send on a single connection before blocking.",
       "type": "integer",
-      "default": 5
+      "examples": [5]
     },
     "client_dns_lookup": {
       "title": "Client DNS lookup",
@@ -191,62 +191,62 @@
     "buffer_memory": {
       "title": "Buffer memory",
       "description": "The total bytes of memory the producer can use to buffer records waiting to be sent to the server.",
-      "type": "number",
-      "default": 33554432
+      "type": "string",
+      "examples": 33554432
     },
     "max_request_size": {
       "title": "Max request size",
       "description": "The maximum size of a request in bytes.",
       "type": "integer",
-      "default": 1048576
+      "examples": [1048576]
     },
     "retries": {
       "title": "Retries",
       "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.",
       "type": "integer",
-      "default": 2147483647
+      "examples": [2147483647]
     },
     "socket_connection_setup_timeout_ms": {
       "title": "Socket connection setup timeout",
       "description": "The amount of time the client will wait for the socket connection to be established.",
-      "type": "number",
-      "default": 10000
+      "type": "string",
+      "examples": [10000]
    },
     "socket_connection_setup_timeout_max_ms": {
       "title": "Socket connection setup max timeout",
       "description": "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum.",
-      "type": "number",
-      "default": 30000
+      "type": "string",
+      "examples": [30000]
     },
     "max_block_ms": {
       "title": "Max block ms",
       "description": "The configuration controls how long the KafkaProducer's send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() methods will block.",
-      "type": "number",
-      "default": 60000
+      "type": "string",
+      "examples": [60000]
     },
     "request_timeout_ms": {
       "title": "Request timeout",
       "description": "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.",
       "type": "integer",
-      "default": 30000
+      "examples": [30000]
     },
     "delivery_timeout_ms": {
       "title": "Delivery timeout",
       "description": "An upper bound on the time to report success or failure after a call to 'send()' returns.",
       "type": "integer",
-      "default": 120000
+      "examples": [120000]
     },
     "send_buffer_bytes": {
       "title": "Send buffer bytes",
       "description": "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.",
       "type": "integer",
-      "default": 131072
+      "examples": [131072]
     },
     "receive_buffer_bytes": {
       "title": "Receive buffer bytes",
       "description": "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.",
       "type": "integer",
-      "default": 32768
+      "examples": [32768]
     }
   }
 }
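Note how the spec changes pair with the parser changes above: several millisecond and memory fields are now declared "type": "string", so their values reach the connector as JSON text nodes, and the fixed defaults became non-binding examples. A short standalone sketch (illustrating buffer_memory, under the same Jackson assumption as before) of why the coercing accessor is required once a field is string-typed:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StringTypedFieldSketch {

  public static void main(String[] args) throws Exception {
    // With "type": "string" in spec.json, buffer_memory arrives as "33554432".
    JsonNode config = new ObjectMapper().readTree("{\"buffer_memory\": \"33554432\"}");

    // longValue() requires a numeric node and returns 0 for a text node,
    // which would silently misconfigure the producer.
    System.out.println(config.get("buffer_memory").longValue()); // 0

    // asLong() coerces the text, matching the asLong() call in the config parser.
    System.out.println(config.get("buffer_memory").asLong());    // 33554432
  }
}

The same reasoning applies to the tests below, which now pass these values as strings.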
@@ -75,15 +75,15 @@ protected JsonNode getConfig() {
         .put("enable_idempotence", true)
         .put("compression_type", "none")
         .put("batch_size", 16384)
-        .put("linger_ms", 0)
+        .put("linger_ms", "0")
         .put("max_in_flight_requests_per_connection", 5)
         .put("client_dns_lookup", "use_all_dns_ips")
-        .put("buffer_memory", 33554432)
+        .put("buffer_memory", "33554432")
         .put("max_request_size", 1048576)
         .put("retries", 2147483647)
-        .put("socket_connection_setup_timeout_ms", 10000)
-        .put("socket_connection_setup_timeout_max_ms", 30000)
-        .put("max_block_ms", 60000)
+        .put("socket_connection_setup_timeout_ms", "10000")
+        .put("socket_connection_setup_timeout_max_ms", "30000")
+        .put("max_block_ms", "60000")
         .put("request_timeout_ms", 30000)
         .put("delivery_timeout_ms", 120000)
         .put("send_buffer_bytes", -1)
@@ -122,16 +122,16 @@ private JsonNode getConfig(String topicPattern) {
         .put("transactional_id", "txn-id")
         .put("enable_idempotence", true)
         .put("compression_type", "none")
-        .put("batch_size", 16384)
-        .put("linger_ms", 0)
-        .put("max_in_flight_requests_per_connection", 5)
+        .put("batch_size", "16384")
+        .put("linger_ms", "0")
+        .put("max_in_flight_requests_per_connection", "5")
         .put("client_dns_lookup", "use_all_dns_ips")
         .put("buffer_memory", 33554432)
         .put("max_request_size", 1048576)
         .put("retries", 1)
-        .put("socket_connection_setup_timeout_ms", 10)
-        .put("socket_connection_setup_timeout_max_ms", 30)
-        .put("max_block_ms", 100)
+        .put("socket_connection_setup_timeout_ms", "10")
+        .put("socket_connection_setup_timeout_max_ms", "30")
+        .put("max_block_ms", "100")
         .put("request_timeout_ms", 100)
         .put("delivery_timeout_ms", 120)
         .put("send_buffer_bytes", -1)
@@ -75,9 +75,9 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI
   /**
    * Tries to inject keen.timestamp field to the given message data. If the stream contains cursor
    * field, it's value is tried to be parsed to timestamp. If this procedure fails, stream is removed
-   * from timestamp-parsable stream map, so parsing is not tried for future messages in the same stream.
-   * If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and whole data
-   * is returned. Otherwise, keen.timestamp is set to emittedAt value
+   * from timestamp-parsable stream map, so parsing is not tried for future messages in the same
+   * stream. If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and
+   * whole data is returned. Otherwise, keen.timestamp is set to emittedAt value
    *
    * @param message AirbyteRecordMessage containing record data
    * @return Record data together with keen.timestamp field
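The hunk above only rewraps the javadoc, but the flow it describes is worth spelling out. A hedged sketch of that injection logic; the class below is illustrative, and its field and method names are not the real KeenTimestampService API:

import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;

public class KeenTimestampSketch {

  // stream name -> cursor field name; entries are dropped once parsing fails (hypothetical)
  private final Map<String, String> parsableCursorFields;

  public KeenTimestampSketch(Map<String, String> parsableCursorFields) {
    this.parsableCursorFields = parsableCursorFields;
  }

  public ObjectNode injectTimestamp(String stream, ObjectNode data, long emittedAt) {
    String cursorField = parsableCursorFields.get(stream);
    if (cursorField != null && data.has(cursorField)) {
      try {
        // Placeholder for the real timestamp parsing of the cursor value.
        long timestamp = Long.parseLong(data.get(cursorField).asText());
        data.put("keen.timestamp", timestamp); // simplified; the real field is put as a JSON node
        return data;
      } catch (RuntimeException e) {
        // Parsing failed: stop trying for future messages in this stream.
        parsableCursorFields.remove(stream);
      }
    }
    data.put("keen.timestamp", emittedAt); // fall back to the record's emittedAt value
    return data;
  }
}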
docs/integrations/destinations/kafka.md (3 changes: 2 additions & 1 deletion)
@@ -114,5 +114,6 @@ More info about this can be found in the [Kafka producer configs documentation s
 
 | Version | Date | Pull Request | Subject |
 | :------ | :-------- | :----- | :------ |
+| 0.1.2 | 2021-09-14 | [#6040](https://github.com/airbytehq/airbyte/pull/6040) | Change spec.json and config parser |
 | 0.1.1 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
-| 0.1.0 | 2021-07-21 | [3746](https://github.com/airbytehq/airbyte/pull/3746) | Initial Release |
+| 0.1.0 | 2021-07-21 | [#3746](https://github.com/airbytehq/airbyte/pull/3746) | Initial Release |
