diff --git a/pom.xml b/pom.xml index 3815ca9..eac08a4 100644 --- a/pom.xml +++ b/pom.xml @@ -156,6 +156,12 @@ slf4j-api provided + + net.jcip + jcip-annotations + 1.0 + provided + org.apache.kafka connect-runtime diff --git a/src/main/java/com/arangodb/kafka/ArangoSinkConnector.java b/src/main/java/com/arangodb/kafka/ArangoSinkConnector.java index c87e4b4..a468fc1 100644 --- a/src/main/java/com/arangodb/kafka/ArangoSinkConnector.java +++ b/src/main/java/com/arangodb/kafka/ArangoSinkConnector.java @@ -34,8 +34,6 @@ public class ArangoSinkConnector extends SinkConnector { private static final Logger LOG = LoggerFactory.getLogger(SinkConnector.class); private Map config; - private boolean acquireHostList; - private List initialEndpoints; private HostListMonitor hostListMonitor; @Override @@ -56,13 +54,8 @@ public void start(Map props) { throw new ConnectException(e); } - acquireHostList = sinkConfig.isAcquireHostListEnabled(); - initialEndpoints = sinkConfig.getEndpoints(); - - if (acquireHostList) { - hostListMonitor = new HostListMonitor(sinkConfig, context); - hostListMonitor.start(); - } + hostListMonitor = new HostListMonitor(sinkConfig, context); + hostListMonitor.start(); } @Override @@ -72,7 +65,7 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { - List endpoints = new ArrayList<>(acquireHostList ? hostListMonitor.getEndpoints() : initialEndpoints); + List endpoints = hostListMonitor.getEndpoints(); int rotationDistance = endpoints.size() / maxTasks; if (rotationDistance == 0) { rotationDistance = 1; @@ -94,10 +87,8 @@ public List> taskConfigs(int maxTasks) { @Override public void stop() { - if (acquireHostList) { - LOG.info("stopping ArangoSinkConnector"); - hostListMonitor.stop(); - } + LOG.info("stopping ArangoSinkConnector"); + hostListMonitor.stop(); } @Override diff --git a/src/main/java/com/arangodb/kafka/HostListMonitor.java b/src/main/java/com/arangodb/kafka/HostListMonitor.java index b042f1e..dde9a18 100644 --- a/src/main/java/com/arangodb/kafka/HostListMonitor.java +++ b/src/main/java/com/arangodb/kafka/HostListMonitor.java @@ -30,43 +30,53 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import net.jcip.annotations.GuardedBy; + public class HostListMonitor { private static final Logger LOG = LoggerFactory.getLogger(HostListMonitor.class); - private final ArangoDB adb; private final ScheduledExecutorService es; private final ConnectorContext context; private final int acquireHostIntervalMs; - private volatile Set endpoints; + private final int rebalanceIntervalMs; + private final ArangoDB adb; + @GuardedBy("this") + private List endpoints; public HostListMonitor(ArangoSinkConfig sinkConfig, ConnectorContext context) { acquireHostIntervalMs = sinkConfig.getAcquireHostIntervalMs(); - endpoints = new HashSet<>(sinkConfig.getEndpoints()); - adb = sinkConfig.createMonitorClient(); + rebalanceIntervalMs = sinkConfig.getRebalanceIntervalMs(); this.context = context; es = Executors.newSingleThreadScheduledExecutor(); + adb = sinkConfig.isAcquireHostListEnabled() ? sinkConfig.createMonitorClient() : null; + endpoints = sinkConfig.getEndpoints(); } void start() { LOG.info("starting host list monitor background task"); - updateHostList(); - es.scheduleAtFixedRate(this::monitorHosts, acquireHostIntervalMs, acquireHostIntervalMs, TimeUnit.MILLISECONDS); + if (adb != null) { + updateHostList(); + es.scheduleAtFixedRate(this::monitorHosts, acquireHostIntervalMs, acquireHostIntervalMs, TimeUnit.MILLISECONDS); + } + es.scheduleAtFixedRate(this::rebalance, rebalanceIntervalMs, rebalanceIntervalMs, TimeUnit.MILLISECONDS); } - public Set getEndpoints() { - return endpoints; + public List getEndpoints() { + synchronized (this) { + return endpoints; + } } public void stop() { LOG.info("stopping host list monitor background task"); - adb.shutdown(); + if (adb != null) { + adb.shutdown(); + } es.shutdown(); try { if (!es.awaitTermination(ArangoSinkConfig.MONITOR_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { @@ -105,19 +115,32 @@ private Set parseAcquireHostListResponse(ObjectNode node) { private boolean updateHostList() { LOG.debug("Fetching host list."); Set hosts = acquireHostList(); - if (!hosts.isEmpty() && !endpoints.equals(hosts)) { - LOG.info("Detected change in the acquired host list: \n\t old: {} \n\t new: {}", endpoints, hosts); - endpoints = hosts; - return true; - } else { - return false; + synchronized (this) { + if (!hosts.isEmpty() && !hosts.equals(new HashSet<>(endpoints))) { + LOG.info("Detected change in the acquired host list: \n\t old: {} \n\t new: {}", endpoints, hosts); + endpoints = new ArrayList<>(hosts); + return true; + } else { + return false; + } } } private void monitorHosts() { if (updateHostList()) { - LOG.info("Requesting tasks reconfiguration."); - context.requestTaskReconfiguration(); + reconfigureTasks(); + } + } + + private void rebalance() { + synchronized (this) { + Collections.shuffle(endpoints); } + reconfigureTasks(); + } + + private void reconfigureTasks() { + LOG.info("Requesting tasks reconfiguration."); + context.requestTaskReconfiguration(); } } diff --git a/src/main/java/com/arangodb/kafka/config/ArangoSinkConfig.java b/src/main/java/com/arangodb/kafka/config/ArangoSinkConfig.java index ca07e58..1a9d007 100644 --- a/src/main/java/com/arangodb/kafka/config/ArangoSinkConfig.java +++ b/src/main/java/com/arangodb/kafka/config/ArangoSinkConfig.java @@ -105,6 +105,12 @@ public enum DataErrorsTolerance { private static final String CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DOC = "Interval for acquiring the host list."; private static final String CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DISPLAY = "Acquire Host List Interval"; + public static final String CONNECTION_REBALANCE_INTERVAL_MS = CONNECTION_PREFIX + "rebalance.interval.ms"; + private static final int CONNECTION_REBALANCE_INTERVAL_MS_DEFAULT = 30 * 60 * 1_000; // 30 min + private static final String CONNECTION_REBALANCE_INTERVAL_MS_DOC = "Interval for re-balancing the connections " + + "across the endpoints."; + private static final String CONNECTION_REBALANCE_INTERVAL_MS_DISPLAY = "Connections re-balancing interval"; + public static final String CONNECTION_PROTOCOL = CONNECTION_PREFIX + "protocol"; private static final String CONNECTION_PROTOCOL_DEFAULT = Protocol.HTTP2.toString(); private static final String CONNECTION_PROTOCOL_DOC = "Communication protocol."; @@ -334,6 +340,17 @@ public enum DataErrorsTolerance { ConfigDef.Width.SHORT, CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS_DISPLAY ) + .define( + CONNECTION_REBALANCE_INTERVAL_MS, + ConfigDef.Type.INT, + CONNECTION_REBALANCE_INTERVAL_MS_DEFAULT, + ConfigDef.Importance.LOW, + CONNECTION_REBALANCE_INTERVAL_MS_DOC, + CONNECTION_GROUP, + 8, + ConfigDef.Width.SHORT, + CONNECTION_REBALANCE_INTERVAL_MS_DISPLAY + ) .define( CONNECTION_PROTOCOL, ConfigDef.Type.STRING, @@ -342,7 +359,7 @@ public enum DataErrorsTolerance { ConfigDef.Importance.MEDIUM, CONNECTION_PROTOCOL_DOC, CONNECTION_GROUP, - 8, + 9, ConfigDef.Width.SHORT, CONNECTION_PROTOCOL_DISPLAY, new EnumRecommender(Protocol.class) @@ -355,7 +372,7 @@ public enum DataErrorsTolerance { ConfigDef.Importance.LOW, CONNECTION_CONTENT_TYPE_DOC, CONNECTION_GROUP, - 9, + 10, ConfigDef.Width.SHORT, CONNECTION_CONTENT_TYPE_DISPLAY, new EnumRecommender(ContentType.class) @@ -767,6 +784,10 @@ public int getAcquireHostIntervalMs() { return getInt(CONNECTION_ACQUIRE_HOST_LIST_INTERVAL_MS); } + public int getRebalanceIntervalMs() { + return getInt(CONNECTION_REBALANCE_INTERVAL_MS); + } + public List getEndpoints() { return getList(CONNECTION_ENDPOINTS).stream() .map(HostDescription::parse) diff --git a/src/test/java/com/arangodb/kafka/HostListMonitorTest.java b/src/test/java/com/arangodb/kafka/HostListMonitorTest.java index 800084f..9da720b 100644 --- a/src/test/java/com/arangodb/kafka/HostListMonitorTest.java +++ b/src/test/java/com/arangodb/kafka/HostListMonitorTest.java @@ -14,6 +14,9 @@ import org.mockito.Spy; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static com.arangodb.kafka.utils.Utils.map; import static org.assertj.core.api.Assertions.assertThat; @@ -90,5 +93,40 @@ void acquireHostList() throws InterruptedException { verify(adb, times(1)).shutdown(); } + @Test + void acquireHostListFalse() { + ArangoSinkConfig cfg = new ArangoSinkConfig(config() + .add(ArangoSinkConfig.CONNECTION_ACQUIRE_HOST_LIST_ENABLED, "false")); + + HostListMonitor monitor = new HostListMonitor(cfg, context); + monitor.start(); + assertThat(monitor.getEndpoints()).containsExactly(new HostDescription("a", 1)); + monitor.stop(); + } + + @Test + void rebalance() throws InterruptedException { + String epList = IntStream.range(0, 10) + .mapToObj(i -> "host:" + i) + .collect(Collectors.joining(",")); + ArangoSinkConfig cfg = new ArangoSinkConfig(config() + .add(ArangoSinkConfig.CONNECTION_ACQUIRE_HOST_LIST_ENABLED, "false") + .add(ArangoSinkConfig.CONNECTION_ENDPOINTS, epList) + .add(ArangoSinkConfig.CONNECTION_REBALANCE_INTERVAL_MS, "200") + ); + + HostListMonitor monitor = new HostListMonitor(cfg, context); + monitor.start(); + Thread.sleep(250); + monitor.stop(); + verify(context, times(1)).requestTaskReconfiguration(); + + List endpoints = IntStream.range(0, 10) + .mapToObj(i -> new HostDescription("host", i)) + .collect(Collectors.toList()); + assertThat(monitor.getEndpoints()) + .containsExactlyInAnyOrderElementsOf(endpoints) + .isNotEqualTo(endpoints); + } } diff --git a/src/test/java/com/arangodb/kafka/config/ArangoSinkConfigTest.java b/src/test/java/com/arangodb/kafka/config/ArangoSinkConfigTest.java index 55b404c..307ea42 100644 --- a/src/test/java/com/arangodb/kafka/config/ArangoSinkConfigTest.java +++ b/src/test/java/com/arangodb/kafka/config/ArangoSinkConfigTest.java @@ -38,6 +38,7 @@ void defaults() { assertThat(config.getRetryBackoffMs()).isEqualTo(3000); assertThat(config.isAcquireHostListEnabled()).isFalse(); assertThat(config.getAcquireHostIntervalMs()).isEqualTo(60_000); + assertThat(config.getRebalanceIntervalMs()).isEqualTo(30 * 60 * 1_000); assertThat(config.getTolerateDataErrors()).isFalse(); assertThat(config.getLogDataErrors()).isFalse(); }