From b323e7c48704c07cb14763076c90302610e83d5a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Sun, 7 Jun 2020 20:35:25 -0700 Subject: [PATCH] Introducing dedicated shard syncer for each of the streamconfig --- .../amazon/kinesis/coordinator/Scheduler.java | 7 ++- .../leases/HierarchicalShardSyncer.java | 8 ++-- .../kinesis/leases/LeaseManagementConfig.java | 18 ++------ .../DynamoDBLeaseManagementFactory.java | 45 ++++++++++--------- .../kinesis/coordinator/SchedulerTest.java | 16 ++++++- .../leases/HierarchicalShardSyncerTest.java | 2 +- 6 files changed, 49 insertions(+), 47 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e196920de..38a8131ad 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -160,7 +160,7 @@ public class Scheduler implements Runnable { private final Function shardDetectorProvider; private final boolean ignoreUnexpetedChildShards; private final AggregatorUtil aggregatorUtil; - private final HierarchicalShardSyncer hierarchicalShardSyncer; + private final Function hierarchicalShardSyncerProvider; private final long schedulerInitializationBackoffTimeMillis; private final LeaderDecider leaderDecider; private final Map staleStreamDeletionMap = new HashMap<>(); @@ -284,8 +284,7 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards(); this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); - // TODO : LTR : Check if this needs to be per stream. - this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode); + this.hierarchicalShardSyncerProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).hierarchicalShardSyncer(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, @@ -922,7 +921,7 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, ignoreUnexpetedChildShards, shardDetectorProvider.apply(streamConfig), aggregatorUtil, - hierarchicalShardSyncer, + hierarchicalShardSyncerProvider.apply(streamConfig), metricsFactory); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index a2700097d..68a1701d5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -74,7 +74,7 @@ public class HierarchicalShardSyncer { private final boolean isMultiStreamMode; - private String streamIdentifier = ""; + private final String streamIdentifier; private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); @@ -84,10 +84,12 @@ public class HierarchicalShardSyncer { public HierarchicalShardSyncer() { isMultiStreamMode = false; + streamIdentifier = "SingleStreamMode"; } - public HierarchicalShardSyncer(final boolean isMultiStreamMode) { + public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) { this.isMultiStreamMode = isMultiStreamMode; + this.streamIdentifier = streamIdentifier; } private static final BiFunction shardIdFromLeaseDeducer = @@ -118,7 +120,6 @@ public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDet final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { - this.streamIdentifier = shardDetector.streamIdentifier().serialize(); final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases, @@ -132,7 +133,6 @@ public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDet final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - this.streamIdentifier = shardDetector.streamIdentifier().serialize(); //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 if (!CollectionUtils.isNullOrEmpty(latestShards)) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index acaa8de04..789a30081 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -276,19 +276,6 @@ private HierarchicalShardSyncer hierarchicalShardSyncer() { return hierarchicalShardSyncer; } - /** - * Vends HierarchicalShardSyncer based on MultiStreamingMode. With MultiStreamMode shard syncer creates - * leases to accommodate more than one stream. - * @param isMultiStreamingMode - * @return HierarchicalShardSyncer - */ - public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) { - if(hierarchicalShardSyncer == null) { - hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode); - } - return hierarchicalShardSyncer; - } - @Deprecated public LeaseManagementFactory leaseManagementFactory() { if (leaseManagementFactory == null) { @@ -351,12 +338,13 @@ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer lease cacheMissWarningModulus(), initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), - hierarchicalShardSyncer(isMultiStreamingMode), + hierarchicalShardSyncer(), tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), leaseSerializer, - customShardDetectorProvider()); + customShardDetectorProvider(), + isMultiStreamingMode); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 44879c1c9..c1b250a43 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -55,7 +55,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { @NonNull private final ExecutorService executorService; @NonNull - private final HierarchicalShardSyncer hierarchicalShardSyncer; + private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer; @NonNull private final LeaseSerializer leaseSerializer; @NonNull @@ -82,6 +82,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final TableCreatorCallback tableCreatorCallback; private final Duration dynamoDbRequestTimeout; private final BillingMode billingMode; + private final boolean isMultiStreamMode; /** * Constructor. @@ -207,7 +208,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param hierarchicalShardSyncer + * @param deprecatedHierarchicalShardSyncer * @param tableCreatorCallback */ @Deprecated @@ -221,14 +222,14 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { + final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + deprecatedHierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); } /** @@ -257,7 +258,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param hierarchicalShardSyncer + * @param deprecatedHierarchicalShardSyncer * @param tableCreatorCallback * @param dynamoDbRequestTimeout */ @@ -272,7 +273,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, @@ -280,7 +281,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); } /** @@ -309,7 +310,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param hierarchicalShardSyncer + * @param deprecatedHierarchicalShardSyncer * @param tableCreatorCallback * @param dynamoDbRequestTimeout * @param billingMode @@ -325,7 +326,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode) { this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName, @@ -334,7 +335,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); + deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); } /** @@ -362,7 +363,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param hierarchicalShardSyncer + * @param deprecatedHierarchicalShardSyncer * @param tableCreatorCallback * @param dynamoDbRequestTimeout * @param billingMode @@ -376,7 +377,7 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { this(kinesisClient, dynamoDBClient, tableName, workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, @@ -384,8 +385,8 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer, - null); + deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer, + null, false); this.streamConfig = streamConfig; } @@ -412,11 +413,13 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param hierarchicalShardSyncer + * @param deprecatedHierarchicalShardSyncer * @param tableCreatorCallback * @param dynamoDbRequestTimeout * @param billingMode * @param leaseSerializer + * @param customShardDetectorProvider + * @param isMultiStreamMode */ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, @@ -427,9 +430,9 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer, - Function customShardDetectorProvider) { + Function customShardDetectorProvider, boolean isMultiStreamMode) { this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; @@ -451,12 +454,13 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, this.cacheMissWarningModulus = cacheMissWarningModulus; this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; - this.hierarchicalShardSyncer = hierarchicalShardSyncer; + this.deprecatedHierarchicalShardSyncer = deprecatedHierarchicalShardSyncer; this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; this.leaseSerializer = leaseSerializer; this.customShardDetectorProvider = customShardDetectorProvider; + this.isMultiStreamMode = isMultiStreamMode; } @Override @@ -481,8 +485,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, - executorService, - hierarchicalShardSyncer, + executorService, deprecatedHierarchicalShardSyncer, metricsFactory); } @@ -501,7 +504,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac ignoreUnexpectedChildShards, shardSyncIntervalMillis, executorService, - hierarchicalShardSyncer, + new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()), metricsFactory); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 1d24ae689..a1601edaf 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -73,6 +73,7 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; +import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementFactory; @@ -153,11 +154,13 @@ public class SchedulerTest { @Mock private MultiStreamTracker multiStreamTracker; - private Map shardSyncTaskManagerMap = new HashMap<>(); - private Map shardDetectorMap = new HashMap<>(); + private Map shardSyncTaskManagerMap; + private Map shardDetectorMap; @Before public void setup() { + shardSyncTaskManagerMap = new HashMap<>(); + shardDetectorMap = new HashMap<>(); shardRecordProcessorFactory = new TestShardRecordProcessorFactory(); checkpointConfig = new CheckpointConfig().checkpointFactory(new TestKinesisCheckpointFactory()); @@ -190,6 +193,7 @@ public void setup() { }); when(leaseCoordinator.leaseRefresher()).thenReturn(dynamoDBLeaseRefresher); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(new HierarchicalShardSyncer()); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); @@ -334,6 +338,8 @@ public final void testMultiStreamInitialization() throws ProvisionedThroughputEx scheduler.initialize(); shardDetectorMap.values().stream() .forEach(shardDetector -> verify(shardDetector, times(1)).listShards()); + shardSyncTaskManagerMap.values().stream() + .forEach(shardSyncTM -> verify(shardSyncTM, times(1)).hierarchicalShardSyncer()); } @Test @@ -352,6 +358,10 @@ public final void testMultiStreamInitializationWithFailures() { .forEach(shardDetector -> verify(shardDetector, atLeast(2)).listShards()); shardDetectorMap.values().stream() .forEach(shardDetector -> verify(shardDetector, atMost(5)).listShards()); + shardSyncTaskManagerMap.values().stream() + .forEach(shardSyncTM -> verify(shardSyncTM, atLeast(2)).hierarchicalShardSyncer()); + shardSyncTaskManagerMap.values().stream() + .forEach(shardSyncTM -> verify(shardSyncTM, atMost(5)).hierarchicalShardSyncer()); } @@ -1035,6 +1045,8 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac shardSyncTaskManagerMap.put(streamConfig.streamIdentifier(), shardSyncTaskManager); shardDetectorMap.put(streamConfig.streamIdentifier(), shardDetector); when(shardSyncTaskManager.shardDetector()).thenReturn(shardDetector); + final HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer(); + when(shardSyncTaskManager.hierarchicalShardSyncer()).thenReturn(hierarchicalShardSyncer); when(shardDetector.streamIdentifier()).thenReturn(streamConfig.streamIdentifier()); when(shardSyncTaskManager.callShardSyncTask()).thenReturn(new TaskResult(null)); if(shardSyncFirstAttemptFailure) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 096bf33a5..1be28b1df 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -112,7 +112,7 @@ public void setup() { } private void setupMultiStream() { - hierarchicalShardSyncer = new HierarchicalShardSyncer(true); + hierarchicalShardSyncer = new HierarchicalShardSyncer(true, STREAM_IDENTIFIER); when(shardDetector.streamIdentifier()).thenReturn(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); }