Skip to content

Commit 80b9725

Browse files
authored
[DE-627] batch writes (#23)
* batch size config * batch writes * test fixes * test fixes * enabled snapshots-repo * tests * disabled snapshot repo * fixed endpoints rotation
1 parent 48ff98d commit 80b9725

18 files changed

+393
-92
lines changed

ChangeLog.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a
55

66
## [Unreleased]
77

8+
- batch writes (DE-627, #23)
9+
810
## [1.0.0] - 2023-05-26
911

1012
- initial release

demo/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
Set package version:
66

77
```shell
8-
export PACKAGE_VERSION=1.0.0
8+
export PACKAGE_VERSION=1.1.0
99
```
1010

1111
Create the Docker network:
@@ -69,7 +69,7 @@ The messages produced can be checked at [http://172.28.0.1:8080/topics/orders](h
6969
Create db collection:
7070

7171
```shell
72-
curl -u root:test http://172.28.0.1:8529/_api/collection -d '{"name": "orders"}'
72+
curl -u root:test http://172.28.0.1:8529/_api/collection -d '{"name": "orders", "numberOfShards": 3}'
7373
```
7474

7575
Explore configuration options in the console at [http://172.28.0.1:8080/connect-clusters/kafka-connect/create-connector](http://172.28.0.1:8080/connect-clusters/kafka-connect/create-connector)

pom.xml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.arangodb</groupId>
88
<artifactId>kafka-connect-arangodb</artifactId>
9-
<version>1.0.0</version>
9+
<version>1.1.0</version>
1010
<packaging>jar</packaging>
1111
<inceptionYear>2023</inceptionYear>
1212

@@ -62,7 +62,7 @@
6262
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
6363
<kafka.version>3.4.1</kafka.version>
6464
<confluent.version>7.4.0</confluent.version>
65-
<arangodb.version>7.1.0</arangodb.version>
65+
<arangodb.version>7.3.0</arangodb.version>
6666
<jackson.version>2.13.5</jackson.version>
6767
<slf4j.version>1.7.36</slf4j.version>
6868
</properties>
@@ -78,6 +78,16 @@
7878
<name>Confluent</name>
7979
<url>https://packages.confluent.io/maven/</url>
8080
</repository>
81+
<repository>
82+
<id>snapshots-repo</id>
83+
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
84+
<releases>
85+
<enabled>false</enabled>
86+
</releases>
87+
<snapshots>
88+
<enabled>false</enabled>
89+
</snapshots>
90+
</repository>
8191
</repositories>
8292

8393
<dependencies>

src/main/java/com/arangodb/kafka/ArangoSinkConnector.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
public class ArangoSinkConnector extends SinkConnector {
3535
private static final Logger LOG = LoggerFactory.getLogger(SinkConnector.class);
3636
private Map<String, String> config;
37+
private boolean acquireHostList;
38+
private List<HostDescription> initialEndpoints;
3739
private HostListMonitor hostListMonitor;
3840

3941
@Override
@@ -54,8 +56,11 @@ public void start(Map<String, String> props) {
5456
throw new ConnectException(e);
5557
}
5658

57-
hostListMonitor = new HostListMonitor(sinkConfig, context);
58-
if (sinkConfig.isAcquireHostListEnabled()) {
59+
acquireHostList = sinkConfig.isAcquireHostListEnabled();
60+
initialEndpoints = sinkConfig.getEndpoints();
61+
62+
if (acquireHostList) {
63+
hostListMonitor = new HostListMonitor(sinkConfig, context);
5964
hostListMonitor.start();
6065
}
6166
}
@@ -67,7 +72,7 @@ public Class<? extends Task> taskClass() {
6772

6873
@Override
6974
public List<Map<String, String>> taskConfigs(int maxTasks) {
70-
List<HostDescription> endpoints = new ArrayList<>(hostListMonitor.getEndpoints());
75+
List<HostDescription> endpoints = new ArrayList<>(acquireHostList ? hostListMonitor.getEndpoints() : initialEndpoints);
7176
int rotationDistance = endpoints.size() / maxTasks;
7277
if (rotationDistance == 0) {
7378
rotationDistance = 1;
@@ -89,8 +94,10 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
8994

9095
@Override
9196
public void stop() {
92-
LOG.info("stopping ArangoSinkConnector");
93-
hostListMonitor.stop();
97+
if (acquireHostList) {
98+
LOG.info("stopping ArangoSinkConnector");
99+
hostListMonitor.stop();
100+
}
94101
}
95102

96103
@Override

0 commit comments

Comments
 (0)