feat: update default value for kafka.producerBatchBytes (#93)
nurerkartal authored Nov 14, 2024
1 parent 65065c4 commit 2432397
Showing 2 changed files with 23 additions and 23 deletions.
44 changes: 22 additions & 22 deletions README.md
@@ -98,28 +98,28 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

### Kafka Specific Configuration

| Variable                            | Type              | Required | Default     | Description |
|-------------------------------------|-------------------|----------|-------------|-------------|
| `kafka.collectionTopicMapping`      | map[string]string | yes      |             | Defines which Couchbase collection's events are sent to which topic. :warning: **If topic information is entered in the mapper, it will OVERWRITE this config.** |
| `kafka.brokers`                     | []string          | yes      |             | Broker IP and port information. |
| `kafka.producerBatchSize`           | integer           | no       | 2000        | Maximum message count per batch; if exceeded, a flush is triggered. |
| `kafka.producerBatchBytes`          | 64 bit integer    | no       | 10mb        | Maximum batch size in bytes; if exceeded, a flush is triggered. `10mb` is the default. |
| `kafka.producerMaxAttempts`         | integer           | no       | math.MaxInt | Limit on how many attempts will be made to deliver a message. |
| `kafka.producerBatchTickerDuration` | time.Duration     | no       | 10s         | The batch is flushed automatically at this interval so that long-waiting messages do not linger in the batch. |
| `kafka.readTimeout`                 | time.Duration     | no       | 30s         | segmentio/kafka-go - timeout for read operations. |
| `kafka.writeTimeout`                | time.Duration     | no       | 30s         | segmentio/kafka-go - timeout for write operations. |
| `kafka.compression`                 | integer           | no       | 0           | Compression can be used if the message size is large; CPU usage may be affected. 0=None, 1=Gzip, 2=Snappy, 3=Lz4, 4=Zstd. |
| `kafka.balancer`                    | string            | no       | Hash        | Defines the balancer strategy. Available values: Hash, LeastBytes, RoundRobin, ReferenceHash, CRC32Balancer, Murmur2Balancer. |
| `kafka.requiredAcks`                | integer           | no       | 1           | segmentio/kafka-go - number of acknowledgements from partition replicas required before receiving a response to a produce request. 0=fire-and-forget (do not wait for any acknowledgement), 1=wait for the leader to acknowledge the writes, -1=wait for the full ISR to acknowledge the writes. |
| `kafka.secureConnection`            | bool              | no       | false       | Enables secure Kafka connections. |
| `kafka.rootCAPath`                  | string            | no       | *not set    | Defines the root CA path. |
| `kafka.interCAPath`                 | string            | no       | *not set    | Defines the intermediate CA path. |
| `kafka.scramUsername`               | string            | no       | *not set    | Defines the SCRAM username. |
| `kafka.scramPassword`               | string            | no       | *not set    | Defines the SCRAM password. |
| `kafka.metadataTTL`                 | time.Duration     | no       | 60s         | TTL for the metadata cached by segmentio/kafka-go; increase it to reduce network requests. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). |
| `kafka.metadataTopics`              | []string          | no       |             | Topic names for the metadata cached by segmentio/kafka-go; list here the topics the connector may produce to. In large Kafka clusters, this reduces memory usage. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
| `kafka.clientID`                    | string            | no       |             | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |
| `kafka.allowAutoTopicCreation`      | bool              | no       | false       | Creates the topic if missing. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). |
| Variable                            | Type              | Required | Default     | Description |
|-------------------------------------|-------------------|----------|-------------|-------------|
| `kafka.collectionTopicMapping`      | map[string]string | yes      |             | Defines which Couchbase collection's events are sent to which topic. :warning: **If topic information is entered in the mapper, it will OVERWRITE this config.** |
| `kafka.brokers`                     | []string          | yes      |             | Broker IP and port information. |
| `kafka.producerBatchSize`           | integer           | no       | 2000        | Maximum message count per batch; if exceeded, a flush is triggered. |
| `kafka.producerBatchBytes`          | 64 bit integer    | no       | 1mb         | Maximum batch size in bytes; if exceeded, a flush is triggered. `1mb` is the default. |
| `kafka.producerMaxAttempts`         | integer           | no       | math.MaxInt | Limit on how many attempts will be made to deliver a message. |
| `kafka.producerBatchTickerDuration` | time.Duration     | no       | 10s         | The batch is flushed automatically at this interval so that long-waiting messages do not linger in the batch. |
| `kafka.readTimeout`                 | time.Duration     | no       | 30s         | segmentio/kafka-go - timeout for read operations. |
| `kafka.writeTimeout`                | time.Duration     | no       | 30s         | segmentio/kafka-go - timeout for write operations. |
| `kafka.compression`                 | integer           | no       | 0           | Compression can be used if the message size is large; CPU usage may be affected. 0=None, 1=Gzip, 2=Snappy, 3=Lz4, 4=Zstd. |
| `kafka.balancer`                    | string            | no       | Hash        | Defines the balancer strategy. Available values: Hash, LeastBytes, RoundRobin, ReferenceHash, CRC32Balancer, Murmur2Balancer. |
| `kafka.requiredAcks`                | integer           | no       | 1           | segmentio/kafka-go - number of acknowledgements from partition replicas required before receiving a response to a produce request. 0=fire-and-forget (do not wait for any acknowledgement), 1=wait for the leader to acknowledge the writes, -1=wait for the full ISR to acknowledge the writes. |
| `kafka.secureConnection`            | bool              | no       | false       | Enables secure Kafka connections. |
| `kafka.rootCAPath`                  | string            | no       | *not set    | Defines the root CA path. |
| `kafka.interCAPath`                 | string            | no       | *not set    | Defines the intermediate CA path. |
| `kafka.scramUsername`               | string            | no       | *not set    | Defines the SCRAM username. |
| `kafka.scramPassword`               | string            | no       | *not set    | Defines the SCRAM password. |
| `kafka.metadataTTL`                 | time.Duration     | no       | 60s         | TTL for the metadata cached by segmentio/kafka-go; increase it to reduce network requests. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). |
| `kafka.metadataTopics`              | []string          | no       |             | Topic names for the metadata cached by segmentio/kafka-go; list here the topics the connector may produce to. In large Kafka clusters, this reduces memory usage. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
| `kafka.clientID`                    | string            | no       |             | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |
| `kafka.allowAutoTopicCreation`      | bool              | no       | false       | Creates the topic if missing. For more detail, check the [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). |
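
For orientation, most of these settings map closely onto fields of segmentio/kafka-go's `Writer`. Below is a minimal sketch of a writer configured with the table's defaults, including the new `1mb` batch-bytes default; the broker address and topic are placeholders, and the connector's actual wiring may differ:

```go
package main

import (
	"math"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Sketch only: values mirror the defaults documented in the table above.
	w := &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"), // kafka.brokers (placeholder address)
		Topic:        "example-topic",             // would come from kafka.collectionTopicMapping (placeholder)
		BatchSize:    2000,                        // kafka.producerBatchSize
		BatchBytes:   1 * 1024 * 1024,             // kafka.producerBatchBytes (1mb)
		BatchTimeout: 10 * time.Second,            // kafka.producerBatchTickerDuration
		MaxAttempts:  math.MaxInt,                 // kafka.producerMaxAttempts
		ReadTimeout:  30 * time.Second,            // kafka.readTimeout
		WriteTimeout: 30 * time.Second,            // kafka.writeTimeout
		RequiredAcks: kafka.RequireOne,            // kafka.requiredAcks = 1
		Balancer:     &kafka.Hash{},               // kafka.balancer = Hash
	}
	defer w.Close()
}
```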

### Kafka Metadata Configuration (Use it if you want to store the checkpoint data in Kafka)

2 changes: 1 addition & 1 deletion config/config.go
@@ -91,7 +91,7 @@ func (c *Connector) ApplyDefaults() {
 	}
 
 	if c.Kafka.ProducerBatchBytes == nil {
-		c.Kafka.ProducerBatchBytes = helpers.ResolveUnionIntOrStringValue("10mb")
+		c.Kafka.ProducerBatchBytes = helpers.ResolveUnionIntOrStringValue("1mb")
 	}
 
 	if c.Kafka.RequiredAcks == 0 {
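The changed call turns a config value that may be either a plain integer or a human-readable size string such as `"1mb"` into a byte count. The actual `helpers.ResolveUnionIntOrStringValue` in go-dcp is not shown here, so the following is only an illustrative sketch of that idea, with hypothetical names:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// resolveSize is a hypothetical stand-in for a "union int or string" resolver:
// it accepts either a raw byte count ("1048576") or a human-readable size
// ("1mb", "10kb") and returns the number of bytes.
func resolveSize(v string) (int64, error) {
	units := map[string]int64{"kb": 1 << 10, "mb": 1 << 20, "gb": 1 << 30}
	s := strings.ToLower(strings.TrimSpace(v))
	for suffix, mult := range units {
		if strings.HasSuffix(s, suffix) {
			n, err := strconv.ParseInt(strings.TrimSuffix(s, suffix), 10, 64)
			if err != nil {
				return 0, err
			}
			return n * mult, nil
		}
	}
	// No unit suffix: treat the value as a plain byte count.
	return strconv.ParseInt(s, 10, 64)
}

func main() {
	n, _ := resolveSize("1mb")
	fmt.Println(n) // 1048576
}
```

Under that reading, the commit lowers the default from 10,485,760 bytes (`"10mb"`) to 1,048,576 bytes (`"1mb"`).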
