diff --git a/README.md b/README.md
index d378e0a..c44013b 100644
--- a/README.md
+++ b/README.md
@@ -98,28 +98,28 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)
 
 ### Kafka Specific Configuration
 
-| Variable | Type | Required | Default | Description |
-|----------|------|----------|---------|-------------|
-| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic,:warning: **If topic information is entered in the mapper, it will OVERWRITE this config**. |
-| `kafka.brokers` | []string | yes | | Broker ip and port information |
-| `kafka.producerBatchSize` | integer | no | 2000 | Maximum message count for batch, if exceed flush will be triggered. |
-| `kafka.producerBatchBytes` | 64 bit integer | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. |
-| `kafka.producerMaxAttempts` | int | no | math.MaxInt | Limit on how many attempts will be made to deliver a message. |
-| `kafka.producerBatchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
-| `kafka.readTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for read operations |
-| `kafka.writeTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for write operations |
-| `kafka.compression` | integer | no | 0 | Compression can be used if message size is large, CPU usage may be affected. 0=None, 1=Gzip, 2=Snappy, 3=Lz4, 4=Zstd |
-| `kafka.balancer` | string | no | Hash | Define balancer strategy. Available fields: Hash, LeastBytes, RoundRobin, ReferenceHash, CRC32Balancer, Murmur2Balancer. |
-| `kafka.requiredAcks` | integer | no | 1 | segmentio/kafka-go - Number of acknowledges from partition replicas required before receiving a response to a produce request. 0=fire-and-forget, do not wait for acknowledgements from the, 1=wait for the leader to acknowledge the writes, -1=wait for the full ISR to acknowledge the writes |
-| `kafka.secureConnection` | bool | no | false | Enable secure Kafka. |
-| `kafka.rootCAPath` | string | no | *not set | Define root CA path. |
-| `kafka.interCAPath` | string | no | *not set | Define inter CA path. |
-| `kafka.scramUsername` | string | no | *not set | Define scram username. |
-| `kafka.scramPassword` | string | no | *not set | Define scram password. |
-| `kafka.metadataTTL` | time.Duration | no | 60s | TTL for the metadata cached by segmentio, increase it to reduce network requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). |
-| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
-| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |
-| `kafka.allowAutoTopicCreation` | bool | no | false | Create topic if missing. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). |
+| Variable | Type | Required | Default | Description |
+|----------|------|----------|---------|-------------|
+| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic. :warning: **If topic information is entered in the mapper, it will OVERWRITE this config**. |
+| `kafka.brokers` | []string | yes | | Broker IP and port information. |
+| `kafka.producerBatchSize` | integer | no | 2000 | Maximum message count per batch; if exceeded, a flush is triggered. |
+| `kafka.producerBatchBytes` | 64 bit integer | no | 1mb | Maximum batch size in bytes; if exceeded, a flush is triggered. `1mb` is the default. |
+| `kafka.producerMaxAttempts` | int | no | math.MaxInt | Limit on how many attempts will be made to deliver a message. |
+| `kafka.producerBatchTickerDuration` | time.Duration | no | 10s | The batch is flushed automatically at this interval so that long-waiting messages do not get stuck in the batch. |
+| `kafka.readTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for read operations. |
+| `kafka.writeTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for write operations. |
+| `kafka.compression` | integer | no | 0 | Compression can be used if message size is large; CPU usage may be affected. 0=None, 1=Gzip, 2=Snappy, 3=Lz4, 4=Zstd. |
+| `kafka.balancer` | string | no | Hash | Defines the balancer strategy. Available values: Hash, LeastBytes, RoundRobin, ReferenceHash, CRC32Balancer, Murmur2Balancer. |
+| `kafka.requiredAcks` | integer | no | 1 | segmentio/kafka-go - Number of acknowledgements from partition replicas required before receiving a response to a produce request. 0=fire-and-forget, do not wait for acknowledgements from the cluster, 1=wait for the leader to acknowledge the writes, -1=wait for the full ISR to acknowledge the writes. |
+| `kafka.secureConnection` | bool | no | false | Enable secure Kafka. |
+| `kafka.rootCAPath` | string | no | *not set | Define root CA path. |
+| `kafka.interCAPath` | string | no | *not set | Define inter CA path. |
+| `kafka.scramUsername` | string | no | *not set | Define SCRAM username. |
+| `kafka.scramPassword` | string | no | *not set | Define SCRAM password. |
+| `kafka.metadataTTL` | time.Duration | no | 60s | TTL for the metadata cached by segmentio; increase it to reduce network requests. For more detail please check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). |
+| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio; define the topics the connector may produce to here. In large Kafka clusters, this reduces memory usage. For more detail please check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
+| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |
+| `kafka.allowAutoTopicCreation` | bool | no | false | Create the topic if missing. For more detail please check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). |
 
 ### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka)
diff --git a/config/config.go b/config/config.go
index bcf8117..7511f00 100644
--- a/config/config.go
+++ b/config/config.go
@@ -91,7 +91,7 @@ func (c *Connector) ApplyDefaults() {
 	}
 
 	if c.Kafka.ProducerBatchBytes == nil {
-		c.Kafka.ProducerBatchBytes = helpers.ResolveUnionIntOrStringValue("10mb")
+		c.Kafka.ProducerBatchBytes = helpers.ResolveUnionIntOrStringValue("1mb")
 	}
 
 	if c.Kafka.RequiredAcks == 0 {