From 66c77520b9903b24a0839e729d03c26aefd144a4 Mon Sep 17 00:00:00 2001 From: suyashtava <47897931+suyashtava@users.noreply.github.com> Date: Wed, 14 Sep 2022 14:59:03 +0530 Subject: [PATCH] UpdatePartitionState to avoid restarting Producer UpdatePartitionState and call it from NewPartitionHandler to avoid restarting Producer service again in case of a new partition --- .../monitor/services/ProduceService.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java index 28e49242..d3fc3452 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java @@ -178,6 +178,20 @@ private void initializeStateForPartitions(int partitionNum) { } _partitionNum.set(partitionNum); } + + private void updateStateForPartitions(int partitionNum) { + Map keyMapping = generateKeyMappings(partitionNum); + for (int partition = 0; partition < partitionNum; partition++) { + String key = keyMapping.get(partition); + /* This is what preserves sequence numbers across restarts */ + if (!_nextIndexPerPartition.containsKey(partition)) { + _nextIndexPerPartition.put(partition, new AtomicLong(0)); + _sensors.addPartitionSensors(partition); + _produceExecutor.scheduleWithFixedDelay(new ProduceRunnable(partition, key), _produceDelayMs, _produceDelayMs, TimeUnit.MILLISECONDS); + } + } + _partitionNum.set(partitionNum); + } private Map generateKeyMappings(int partitionNum) { HashMap keyMapping = new HashMap<>(); @@ -280,7 +294,7 @@ public void run() { } LOG.info("{}/ProduceService detected new partitions of topic {}", _name, _topic); //TODO: Should the ProduceService exit if we can't restart the producer runnables? - _produceExecutor.shutdown(); + /* _produceExecutor.shutdown(); try { _produceExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -295,6 +309,8 @@ public void run() { } _produceExecutor = Executors.newScheduledThreadPool(_threadsNum); initializeStateForPartitions(currentPartitionNum); + */ + updateStateForPartitions(currentPartitionNum); LOG.info("New partitions added to monitoring."); } catch (InterruptedException e) { LOG.error("InterruptedException occurred.", e);