Skip to content

[DE-662] rebalance connections #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<version>1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
Expand Down
19 changes: 5 additions & 14 deletions src/main/java/com/arangodb/kafka/ArangoSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
public class ArangoSinkConnector extends SinkConnector {
private static final Logger LOG = LoggerFactory.getLogger(SinkConnector.class);
private Map<String, String> config;
private boolean acquireHostList;
private List<HostDescription> initialEndpoints;
private HostListMonitor hostListMonitor;

@Override
Expand All @@ -56,13 +54,8 @@ public void start(Map<String, String> props) {
throw new ConnectException(e);
}

acquireHostList = sinkConfig.isAcquireHostListEnabled();
initialEndpoints = sinkConfig.getEndpoints();

if (acquireHostList) {
hostListMonitor = new HostListMonitor(sinkConfig, context);
hostListMonitor.start();
}
hostListMonitor = new HostListMonitor(sinkConfig, context);
hostListMonitor.start();
}

@Override
Expand All @@ -72,7 +65,7 @@ public Class<? extends Task> taskClass() {

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<HostDescription> endpoints = new ArrayList<>(acquireHostList ? hostListMonitor.getEndpoints() : initialEndpoints);
List<HostDescription> endpoints = hostListMonitor.getEndpoints();
int rotationDistance = endpoints.size() / maxTasks;
if (rotationDistance == 0) {
rotationDistance = 1;
Expand All @@ -94,10 +87,8 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {

@Override
public void stop() {
if (acquireHostList) {
LOG.info("stopping ArangoSinkConnector");
hostListMonitor.stop();
}
LOG.info("stopping ArangoSinkConnector");
hostListMonitor.stop();
}

@Override
Expand Down
63 changes: 43 additions & 20 deletions src/main/java/com/arangodb/kafka/HostListMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,53 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import net.jcip.annotations.GuardedBy;

public class HostListMonitor {
private static final Logger LOG = LoggerFactory.getLogger(HostListMonitor.class);

private final ArangoDB adb;
private final ScheduledExecutorService es;
private final ConnectorContext context;
private final int acquireHostIntervalMs;
private volatile Set<HostDescription> endpoints;
private final int rebalanceIntervalMs;
private final ArangoDB adb;
@GuardedBy("this")
private List<HostDescription> endpoints;

public HostListMonitor(ArangoSinkConfig sinkConfig, ConnectorContext context) {
acquireHostIntervalMs = sinkConfig.getAcquireHostIntervalMs();
endpoints = new HashSet<>(sinkConfig.getEndpoints());
adb = sinkConfig.createMonitorClient();
rebalanceIntervalMs = sinkConfig.getRebalanceIntervalMs();
this.context = context;
es = Executors.newSingleThreadScheduledExecutor();
adb = sinkConfig.isAcquireHostListEnabled() ? sinkConfig.createMonitorClient() : null;
endpoints = sinkConfig.getEndpoints();
}

void start() {
LOG.info("starting host list monitor background task");
updateHostList();
es.scheduleAtFixedRate(this::monitorHosts, acquireHostIntervalMs, acquireHostIntervalMs, TimeUnit.MILLISECONDS);
if (adb != null) {
updateHostList();
es.scheduleAtFixedRate(this::monitorHosts, acquireHostIntervalMs, acquireHostIntervalMs, TimeUnit.MILLISECONDS);
}
es.scheduleAtFixedRate(this::rebalance, rebalanceIntervalMs, rebalanceIntervalMs, TimeUnit.MILLISECONDS);
}

public Set<HostDescription> getEndpoints() {
return endpoints;
public List<HostDescription> getEndpoints() {
synchronized (this) {
return endpoints;
}
}

public void stop() {
LOG.info("stopping host list monitor background task");
adb.shutdown();
if (adb != null) {
adb.shutdown();
}
es.shutdown();
try {
if (!es.awaitTermination(ArangoSinkConfig.MONITOR_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
Expand Down Expand Up @@ -105,19 +115,32 @@ private Set<HostDescription> parseAcquireHostListResponse(ObjectNode node) {
private boolean updateHostList() {
LOG.debug("Fetching host list.");
Set<HostDescription> hosts = acquireHostList();
if (!hosts.isEmpty() && !endpoints.equals(hosts)) {
LOG.info("Detected change in the acquired host list: \n\t old: {} \n\t new: {}", endpoints, hosts);
endpoints = hosts;
return true;
} else {
return false;
synchronized (this) {
if (!hosts.isEmpty() && !hosts.equals(new HashSet<>(endpoints))) {
LOG.info("Detected change in the acquired host list: \n\t old: {} \n\t new: {}", endpoints, hosts);
endpoints = new ArrayList<>(hosts);
return true;
} else {
return false;
}
}
}

private void monitorHosts() {
if (updateHostList()) {
LOG.info("Requesting tasks reconfiguration.");
context.requestTaskReconfiguration();
reconfigureTasks();
}
}

private void rebalance() {
synchronized (this) {
Collections.shuffle(endpoints);
}
reconfigureTasks();
}

private void reconfigureTasks() {
LOG.info("Requesting tasks reconfiguration.");
context.requestTaskReconfiguration();
}
}
25 changes: 23 additions & 2 deletions src/main/java/com/arangodb/kafka/config/ArangoSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ public enum DataErrorsTolerance {
private static final String CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DOC = "Interval for acquiring the host list.";
private static final String CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DISPLAY = "Acquire Host List Interval";

public static final String CONNECTION_REBALANCE_INTERVAL_MS = CONNECTION_PREFIX + "rebalance.interval.ms";
private static final int CONNECTION_REBALANCE_INTERVAL_MS_DEFAULT = 30 * 60 * 1_000; // 30 min
private static final String CONNECTION_REBALANCE_INTERVAL_MS_DOC = "Interval for re-balancing the connections " +
"across the endpoints.";
private static final String CONNECTION_REBALANCE_INTERVAL_MS_DISPLAY = "Connections re-balancing interval";

public static final String CONNECTION_PROTOCOL = CONNECTION_PREFIX + "protocol";
private static final String CONNECTION_PROTOCOL_DEFAULT = Protocol.HTTP2.toString();
private static final String CONNECTION_PROTOCOL_DOC = "Communication protocol.";
Expand Down Expand Up @@ -334,6 +340,17 @@ public enum DataErrorsTolerance {
ConfigDef.Width.SHORT,
CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DISPLAY
)
.define(
CONNECTION_REBALANCE_INTERVAL_MS,
ConfigDef.Type.INT,
CONNECTION_REBALANCE_INTERVAL_MS_DEFAULT,
ConfigDef.Importance.LOW,
CONNECTION_REBALANCE_INTERVAL_MS_DOC,
CONNECTION_GROUP,
8,
ConfigDef.Width.SHORT,
CONNECTION_REBALANCE_INTERVAL_MS_DISPLAY
)
.define(
CONNECTION_PROTOCOL,
ConfigDef.Type.STRING,
Expand All @@ -342,7 +359,7 @@ public enum DataErrorsTolerance {
ConfigDef.Importance.MEDIUM,
CONNECTION_PROTOCOL_DOC,
CONNECTION_GROUP,
8,
9,
ConfigDef.Width.SHORT,
CONNECTION_PROTOCOL_DISPLAY,
new EnumRecommender(Protocol.class)
Expand All @@ -355,7 +372,7 @@ public enum DataErrorsTolerance {
ConfigDef.Importance.LOW,
CONNECTION_CONTENT_TYPE_DOC,
CONNECTION_GROUP,
9,
10,
ConfigDef.Width.SHORT,
CONNECTION_CONTENT_TYPE_DISPLAY,
new EnumRecommender(ContentType.class)
Expand Down Expand Up @@ -767,6 +784,10 @@ public int getAcquireHostIntervalMs() {
return getInt(CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS);
}

public int getRebalanceIntervalMs() {
return getInt(CONNECTION_REBALANCE_INTERVAL_MS);
}

public List<HostDescription> getEndpoints() {
return getList(CONNECTION_ENDPOINTS).stream()
.map(HostDescription::parse)
Expand Down
38 changes: 38 additions & 0 deletions src/test/java/com/arangodb/kafka/HostListMonitorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import org.mockito.Spy;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.arangodb.kafka.utils.Utils.map;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -90,5 +93,40 @@ void acquireHostList() throws InterruptedException {
verify(adb, times(1)).shutdown();
}

@Test
void acquireHostListFalse() {
ArangoSinkConfig cfg = new ArangoSinkConfig(config()
.add(ArangoSinkConfig.CONNECTION_ACQUIRE_HOST_LIST_ENABLED, "false"));

HostListMonitor monitor = new HostListMonitor(cfg, context);
monitor.start();
assertThat(monitor.getEndpoints()).containsExactly(new HostDescription("a", 1));
monitor.stop();
}

@Test
void rebalance() throws InterruptedException {
String epList = IntStream.range(0, 10)
.mapToObj(i -> "host:" + i)
.collect(Collectors.joining(","));
ArangoSinkConfig cfg = new ArangoSinkConfig(config()
.add(ArangoSinkConfig.CONNECTION_ACQUIRE_HOST_LIST_ENABLED, "false")
.add(ArangoSinkConfig.CONNECTION_ENDPOINTS, epList)
.add(ArangoSinkConfig.CONNECTION_REBALANCE_INTERVAL_MS, "200")
);

HostListMonitor monitor = new HostListMonitor(cfg, context);
monitor.start();
Thread.sleep(250);
monitor.stop();
verify(context, times(1)).requestTaskReconfiguration();

List<HostDescription> endpoints = IntStream.range(0, 10)
.mapToObj(i -> new HostDescription("host", i))
.collect(Collectors.toList());
assertThat(monitor.getEndpoints())
.containsExactlyInAnyOrderElementsOf(endpoints)
.isNotEqualTo(endpoints);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void defaults() {
assertThat(config.getRetryBackoffMs()).isEqualTo(3000);
assertThat(config.isAcquireHostListEnabled()).isFalse();
assertThat(config.getAcquireHostIntervalMs()).isEqualTo(60_000);
assertThat(config.getRebalanceIntervalMs()).isEqualTo(30 * 60 * 1_000);
assertThat(config.getTolerateDataErrors()).isFalse();
assertThat(config.getLogDataErrors()).isFalse();
}
Expand Down