diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index 99de1ebda..f8596419e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -47,6 +47,7 @@
 import software.amazon.kinesis.leases.ShardPrioritization;
 import software.amazon.kinesis.leases.ShardSyncTask;
 import software.amazon.kinesis.leases.ShardSyncTaskManager;
+import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
 import software.amazon.kinesis.leases.exceptions.LeasingException;
 import software.amazon.kinesis.lifecycle.LifecycleConfig;
@@ -113,6 +114,7 @@ public class Scheduler implements Runnable {
     private final ShardDetector shardDetector;
     private final boolean ignoreUnexpetedChildShards;
     private final AggregatorUtil aggregatorUtil;
+    private final HierarchicalShardSyncer hierarchicalShardSyncer;
 
     // Holds consumers for shards the worker is currently tracking. Key is shard
     // info, value is ShardConsumer.
@@ -195,6 +197,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
         this.shardDetector = this.shardSyncTaskManager.shardDetector();
         this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
         this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
+        this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
     }
 
     /**
@@ -239,7 +242,8 @@ private void initialize() {
                     if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
                         log.info("Syncing Kinesis shard info");
                         ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
-                                cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory);
+                                cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
+                                metricsFactory);
                         result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
                     } else {
                         log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
@@ -575,8 +579,9 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
                 cleanupLeasesUponShardCompletion,
                 ignoreUnexpetedChildShards,
                 shardDetector,
-                metricsFactory,
-                aggregatorUtil);
+                aggregatorUtil,
+                hierarchicalShardSyncer,
+                metricsFactory);
         return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument);
     }
 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
similarity index 98%
rename from amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java
rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
index 1e021b6d0..c61bf935c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
@@ -30,13 +30,12 @@
 
 import org.apache.commons.lang3.StringUtils;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.utils.CollectionUtils;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
 import software.amazon.kinesis.common.InitialPositionInStream;
 import software.amazon.kinesis.common.InitialPositionInStreamExtended;
 import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
@@ -54,24 +53,27 @@
  * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
  * and begun processing it's child shards.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
 @Slf4j
-public class ShardSyncer {
+@KinesisClientInternalApi
+public class HierarchicalShardSyncer {
+
     /**
      * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards
      * (e.g. at startup, or when we reach end of a shard).
-     * 
+     *
+     * @param shardDetector
      * @param leaseRefresher
      * @param initialPosition
      * @param cleanupLeasesOfCompletedShards
      * @param ignoreUnexpectedChildShards
+     * @param scope
      * @throws DependencyException
      * @throws InvalidStateException
      * @throws ProvisionedThroughputException
      * @throws KinesisClientLibIOException
      */
     // CHECKSTYLE:OFF CyclomaticComplexity
-    public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
+    public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
             final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
             final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
             final MetricsScope scope) throws DependencyException, InvalidStateException,
@@ -152,7 +154,7 @@ private static Set<String> findInconsistentShardIds(final Map<String, Set<String
      * @return ShardIds of child shards (children of the expectedClosedShard)
      * @throws KinesisClientLibIOException
      */
-    static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
+    synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
             final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards)
             throws KinesisClientLibIOException {
         final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
@@ -181,7 +183,7 @@ static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String,
         }
     }
 
-    private static synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
+    private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
             final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
             throws KinesisClientLibIOException {
         BigInteger minStartingHashKeyOfChildren = null;
@@ -583,7 +585,7 @@ static boolean isCandidateForCleanup(final Lease lease, final Set<String> curren
      * @throws ProvisionedThroughputException
      * @throws KinesisClientLibIOException
      */
-    private static synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
+    private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
             final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
             final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException,
             InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
@@ -625,7 +627,7 @@ private static synchronized void cleanupLeasesOfFinishedShards(final Collection<
      * @throws InvalidStateException 
      * @throws DependencyException 
      */
-    static synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
+    synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
             final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher)
             throws DependencyException, InvalidStateException, ProvisionedThroughputException {
         final Lease leaseForClosedShard = trackedLeases.get(closedShardId);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index d19c583e3..5c98bae68 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -231,6 +231,8 @@ static class LeaseManagementThreadPool extends ThreadPoolExecutor {
      */
     private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
 
+    private HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer();
+
     private LeaseManagementFactory leaseManagementFactory;
 
     public LeaseManagementFactory leaseManagementFactory() {
@@ -258,6 +260,7 @@ public LeaseManagementFactory leaseManagementFactory() {
                     cacheMissWarningModulus(),
                     initialLeaseTableReadCapacity(),
                     initialLeaseTableWriteCapacity(),
+                    hierarchicalShardSyncer(),
                     tableCreatorCallback());
         }
         return leaseManagementFactory;
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
index dd98206a8..046efdea4 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
@@ -48,6 +48,8 @@ public class ShardSyncTask implements ConsumerTask {
     private final boolean ignoreUnexpectedChildShards;
     private final long shardSyncTaskIdleTimeMillis;
     @NonNull
+    private final HierarchicalShardSyncer hierarchicalShardSyncer;
+    @NonNull
     private final MetricsFactory metricsFactory;
 
     private final TaskType taskType = TaskType.SHARDSYNC;
@@ -62,7 +64,7 @@ public TaskResult call() {
         final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);
 
         try {
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPosition,
+            hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
                     cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
             if (shardSyncTaskIdleTimeMillis > 0) {
                 Thread.sleep(shardSyncTaskIdleTimeMillis);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
index dcc13dbd6..d97c9b904 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
@@ -50,8 +50,68 @@ public class ShardSyncTaskManager {
     @NonNull
     private final ExecutorService executorService;
     @NonNull
+    private final HierarchicalShardSyncer hierarchicalShardSyncer;
+    @NonNull
     private final MetricsFactory metricsFactory;
 
+    /**
+     * Constructor.
+     *
+     * <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
+     *
+     * @param shardDetector
+     * @param leaseRefresher
+     * @param initialPositionInStream
+     * @param cleanupLeasesUponShardCompletion
+     * @param ignoreUnexpectedChildShards
+     * @param shardSyncIdleTimeMillis
+     * @param executorService
+     * @param metricsFactory
+     */
+    @Deprecated
+    public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
+            InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
+            boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService,
+            MetricsFactory metricsFactory) {
+        this.shardDetector = shardDetector;
+        this.leaseRefresher = leaseRefresher;
+        this.initialPositionInStream = initialPositionInStream;
+        this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
+        this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
+        this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
+        this.executorService = executorService;
+        this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
+        this.metricsFactory = metricsFactory;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param shardDetector
+     * @param leaseRefresher
+     * @param initialPositionInStream
+     * @param cleanupLeasesUponShardCompletion
+     * @param ignoreUnexpectedChildShards
+     * @param shardSyncIdleTimeMillis
+     * @param executorService
+     * @param hierarchicalShardSyncer
+     * @param metricsFactory
+     */
+    public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
+            InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
+            boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService,
+            HierarchicalShardSyncer hierarchicalShardSyncer, MetricsFactory metricsFactory) {
+        this.shardDetector = shardDetector;
+        this.leaseRefresher = leaseRefresher;
+        this.initialPositionInStream = initialPositionInStream;
+        this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
+        this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
+        this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
+        this.executorService = executorService;
+        this.hierarchicalShardSyncer = hierarchicalShardSyncer;
+        this.metricsFactory = metricsFactory;
+    }
+
     private ConsumerTask currentTask;
     private Future<TaskResult> future;
 
@@ -82,6 +142,7 @@ private synchronized boolean checkAndSubmitNextTask() {
                                     cleanupLeasesUponShardCompletion,
                                     ignoreUnexpectedChildShards,
                                     shardSyncIdleTimeMillis,
+                                    hierarchicalShardSyncer,
                                     metricsFactory),
                             metricsFactory);
             future = executorService.submit(currentTask);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index 3fe692b2c..1ec3e0b39 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -23,6 +23,7 @@
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.kinesis.annotations.KinesisClientInternalApi;
 import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.leases.KinesisShardDetector;
 import software.amazon.kinesis.leases.LeaseCoordinator;
 import software.amazon.kinesis.leases.LeaseManagementFactory;
@@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
     private final ExecutorService executorService;
     @NonNull
     private final InitialPositionInStreamExtended initialPositionInStream;
+    @NonNull
+    private final HierarchicalShardSyncer hierarchicalShardSyncer;
+
     private final long failoverTimeMillis;
     private final long epsilonMillis;
     private final int maxLeasesForWorker;
@@ -162,7 +166,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
                 ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
                 maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
                 cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
-                TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
+                new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
     }
 
     /**
@@ -191,6 +195,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
      * @param cacheMissWarningModulus
      * @param initialLeaseTableReadCapacity
      * @param initialLeaseTableWriteCapacity
+     * @param hierarchicalShardSyncer
      * @param tableCreatorCallback
      */
     public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
@@ -203,6 +208,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
             final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
             final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
             final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
+            final HierarchicalShardSyncer hierarchicalShardSyncer,
             final TableCreatorCallback tableCreatorCallback) {
         this.kinesisClient = kinesisClient;
         this.streamName = streamName;
@@ -227,6 +233,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
         this.cacheMissWarningModulus = cacheMissWarningModulus;
         this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
         this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
+        this.hierarchicalShardSyncer = hierarchicalShardSyncer;
         this.tableCreatorCallback = tableCreatorCallback;
     }
 
@@ -253,6 +260,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac
                 ignoreUnexpectedChildShards,
                 shardSyncIntervalMillis,
                 executorService,
+                hierarchicalShardSyncer,
                 metricsFactory);
     }
 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java
new file mode 100644
index 000000000..4e9245f62
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java
@@ -0,0 +1,46 @@
+package software.amazon.kinesis.leases.exceptions;
+
+import lombok.NonNull;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
+import software.amazon.kinesis.leases.HierarchicalShardSyncer;
+import software.amazon.kinesis.leases.LeaseRefresher;
+import software.amazon.kinesis.leases.ShardDetector;
+import software.amazon.kinesis.metrics.MetricsScope;
+
+/**
+ * Helper class to sync leases with shards of the Kinesis stream.
+ * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
+ * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
+ * and begun processing it's child shards.
+ *
+ * <p>NOTE: This class is deprecated and will be removed in a future release.</p>
+ */
+@Deprecated
+public class ShardSyncer {
+    private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
+
+    /**
+     * <p>NOTE: This method is deprecated and will be removed in a future release.</p>
+     *
+     * @param shardDetector
+     * @param leaseRefresher
+     * @param initialPosition
+     * @param cleanupLeasesOfCompletedShards
+     * @param ignoreUnexpectedChildShards
+     * @param scope
+     * @throws DependencyException
+     * @throws InvalidStateException
+     * @throws ProvisionedThroughputException
+     * @throws KinesisClientLibIOException
+     */
+    @Deprecated
+    public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
+            final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
+            final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
+            final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
+            KinesisClientLibIOException {
+        HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
+                cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
+    }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
index d3ce82c2e..ef0a8d75e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
@@ -495,6 +495,7 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
                     argument.leaseRefresher(),
                     argument.taskBackoffTimeMillis(),
                     argument.recordsPublisher(),
+                    argument.hierarchicalShardSyncer(),
                     argument.metricsFactory());
         }
 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
index 4fcda0762..d5ec57feb 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
@@ -24,6 +24,7 @@
 import software.amazon.kinesis.leases.LeaseRefresher;
 import software.amazon.kinesis.leases.ShardDetector;
 import software.amazon.kinesis.leases.ShardInfo;
+import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.metrics.MetricsFactory;
 import software.amazon.kinesis.processor.Checkpointer;
 import software.amazon.kinesis.processor.ShardRecordProcessor;
@@ -65,7 +66,8 @@ public class ShardConsumerArgument {
     private final boolean ignoreUnexpectedChildShards;
     @NonNull
     private final ShardDetector shardDetector;
+    private final AggregatorUtil aggregatorUtil;
+    private final HierarchicalShardSyncer hierarchicalShardSyncer;
     @NonNull
     private final MetricsFactory metricsFactory;
-    private final AggregatorUtil aggregatorUtil;
 }
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
index a07dc7838..1466dd027 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
@@ -25,7 +25,7 @@
 import software.amazon.kinesis.leases.LeaseRefresher;
 import software.amazon.kinesis.leases.ShardDetector;
 import software.amazon.kinesis.leases.ShardInfo;
-import software.amazon.kinesis.leases.ShardSyncer;
+import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
 import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
 import software.amazon.kinesis.metrics.MetricsFactory;
@@ -66,6 +66,8 @@ public class ShutdownTask implements ConsumerTask {
     @NonNull
     private final RecordsPublisher recordsPublisher;
     @NonNull
+    private final HierarchicalShardSyncer hierarchicalShardSyncer;
+    @NonNull
     private final MetricsFactory metricsFactory;
 
     private final TaskType taskType = TaskType.SHUTDOWN;
@@ -123,8 +125,8 @@ public TaskResult call() {
                 if (reason == ShutdownReason.SHARD_END) {
                     log.debug("Looking for child shards of shard {}", shardInfo.shardId());
                     // create leases for the child shards
-                    ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPositionInStream,
-                            cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
+                    hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
+                            initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
                     log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
                 }
 
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
similarity index 94%
rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java
rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
index 0a567ab91..78d68b79f 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
@@ -43,6 +43,7 @@
 import java.util.stream.IntStream;
 
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
@@ -63,7 +64,7 @@
 
 @RunWith(MockitoJUnitRunner.class)
 // CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES
-public class ShardSyncerTest {
+public class HierarchicalShardSyncerTest {
     private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended
             .newInitialPosition(InitialPositionInStream.LATEST);
     private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended
@@ -76,6 +77,9 @@ public class ShardSyncerTest {
 
     private final boolean cleanupLeasesOfCompletedShards = true;
     private final boolean ignoreUnexpectedChildShards = false;
+
+    private HierarchicalShardSyncer hierarchicalShardSyncer;
+
     /**
      * Old/Obsolete max value of a sequence number (2^128 -1).
      */
@@ -86,6 +90,11 @@ public class ShardSyncerTest {
     @Mock
     private DynamoDBLeaseRefresher dynamoDBLeaseRefresher;
 
+    @Before
+    public void setup() {
+        hierarchicalShardSyncer = new HierarchicalShardSyncer();
+    }
+
     /**
      * Test determineNewLeasesToCreate() where there are no shards
      */
@@ -94,7 +103,7 @@ public void testDetermineNewLeasesToCreateNoShards() {
         final List<Shard> shards = Collections.emptyList();
         final List<Lease> leases = Collections.emptyList();
 
-        assertThat(ShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(),
+        assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(shards, leases, INITIAL_POSITION_LATEST).isEmpty(),
                 equalTo(true));
     }
 
@@ -111,7 +120,7 @@ public void testDetermineNewLeasesToCreate0Leases0Reshards() {
                 ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
         final List<Lease> currentLeases = Collections.emptyList();
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_LATEST);
         final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
         final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@@ -138,7 +147,7 @@ public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
 
         final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_LATEST, inconsistentShardIds);
         final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
         final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@@ -151,7 +160,7 @@ public void testDetermineNewLeasesToCreate0Leases0Reshards1Inconsistent() {
      */
     @Test
     public void testBootstrapShardLeasesAtTrimHorizon() throws Exception {
-        testCheckAndCreateLeasesForNewShards(INITIAL_POSITION_TRIM_HORIZON);
+        testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_TRIM_HORIZON);
     }
 
     /**
@@ -159,11 +168,11 @@ public void testBootstrapShardLeasesAtTrimHorizon() throws Exception {
      */
     @Test
     public void testBootstrapShardLeasesAtLatest() throws Exception {
-        testCheckAndCreateLeasesForNewShards(INITIAL_POSITION_LATEST);
+        testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST);
     }
 
     @Test
-    public void testCheckAndCreateLeasesForNewShardsAtLatest() throws Exception {
+    public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception {
         final List<Shard> shards = constructShardListForGraphA();
 
         final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
@@ -172,7 +181,8 @@ public void testCheckAndCreateLeasesForNewShardsAtLatest() throws Exception {
         when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
         when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
 
-        ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
+        hierarchicalShardSyncer
+                .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
                 cleanupLeasesOfCompletedShards, false, SCOPE);
 
         final Set<String> expectedShardIds = new HashSet<>(
@@ -197,12 +207,12 @@ public void testCheckAndCreateLeasesForNewShardsAtLatest() throws Exception {
 
     @Test
     public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception {
-        testCheckAndCreateLeaseForNewShards(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON);
+        testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON);
     }
 
     @Test
     public void testCheckAndCreateLeasesForNewShardsAtTimestamp() throws Exception {
-        testCheckAndCreateLeaseForNewShards(constructShardListForGraphA(), INITIAL_POSITION_AT_TIMESTAMP);
+        testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_AT_TIMESTAMP);
     }
 
     @Test(expected = KinesisClientLibIOException.class)
@@ -217,7 +227,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen() throws Except
         when(shardDetector.listShards()).thenReturn(shards);
 
         try {
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher,
+            hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
                     INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE);
         } finally {
             verify(shardDetector).listShards();
@@ -251,7 +261,8 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon
         when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
         when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
 
-        ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
+        hierarchicalShardSyncer
+                .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
                 cleanupLeasesOfCompletedShards, true, SCOPE);
 
         final List<Lease> leases = leaseCaptor.getAllValues();
@@ -307,7 +318,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe
         doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
 
         // Initial call: No leases present, create leases.
-        ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+        hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                 cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
         final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@@ -322,7 +333,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe
         verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
 
         // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
-        ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+        hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                 cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
         final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
         final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@@ -382,7 +393,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseEx
                 .when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
 
         // Initial call: Call to create leases.
-        ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+        hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                 cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
         final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@@ -398,7 +409,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseEx
 
         try {
             // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
         } finally {
             List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
@@ -421,7 +433,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithDeleteLeaseEx
             verify(dynamoDBLeaseRefresher, times(1)).deleteLease(any(Lease.class));
 
             // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
             deleteLeases = leaseDeleteCaptor.getAllValues();
@@ -481,7 +494,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc
 
         try {
             // Initial call: Call to create leases. Fails on ListLeases
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
         } finally {
             verify(shardDetector, times(1)).listShards();
@@ -490,7 +504,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc
             verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
 
             // Second call: Leases not present, leases will be created.
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
             final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@@ -504,7 +519,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc
             verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
 
             // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
             final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
@@ -569,7 +585,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx
 
         try {
             // Initial call: No leases present, create leases. Create lease Fails
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
         } finally {
             verify(shardDetector, times(1)).listShards();
@@ -577,7 +594,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx
             verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class));
             verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
 
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
             final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@@ -591,7 +609,8 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx
             verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
 
             // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
-            ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
+            hierarchicalShardSyncer
+                    .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
                     cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
             final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
@@ -653,7 +672,7 @@ public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception {
         when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases);
         doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
 
-        ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher,
+        hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
                 INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
 
         assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
@@ -665,7 +684,7 @@ public void testCleanUpGarbageLeaseForNonExistentShard() throws Exception {
         verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
     }
 
-    private void testCheckAndCreateLeasesForNewShards(InitialPositionInStreamExtended initialPosition)
+    private void testCheckAndCreateLeasesForShardsIfMissing(InitialPositionInStreamExtended initialPosition)
             throws Exception {
         final String shardId0 = "shardId-0";
         final String shardId1 = "shardId-1";
@@ -673,10 +692,10 @@ private void testCheckAndCreateLeasesForNewShards(InitialPositionInStreamExtende
         final List<Shard> shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, sequenceRange),
                 ShardObjectHelper.newShard(shardId1, null, null, sequenceRange));
 
-        testCheckAndCreateLeaseForNewShards(shards, initialPosition);
+        testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition);
     }
 
-    private void testCheckAndCreateLeaseForNewShards(final List<Shard> shards,
+    private void testCheckAndCreateLeaseForShardsIfMissing(final List<Shard> shards,
             final InitialPositionInStreamExtended initialPosition) throws Exception {
         final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
 
@@ -684,7 +703,8 @@ private void testCheckAndCreateLeaseForNewShards(final List<Shard> shards,
         when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
         when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
 
-        ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
+        hierarchicalShardSyncer
+                .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
                 cleanupLeasesOfCompletedShards, false, SCOPE);
 
         final List<Lease> leases = leaseCaptor.getAllValues();
@@ -720,7 +740,7 @@ public void testDetermineNewLeasesToCreateStartingPosition() {
         final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
 
         for (InitialPositionInStreamExtended initialPosition : initialPositions) {
-            final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+            final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                     initialPosition);
             assertThat(newLeases.size(), equalTo(2));
 
@@ -743,7 +763,7 @@ public void testDetermineNewLeasesToCreateIgnoreClosedShard() {
                 ShardObjectHelper.newShard(lastShardId, null, null,
                         ShardObjectHelper.newSequenceNumberRange("405", null)));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_LATEST);
 
         assertThat(newLeases.size(), equalTo(1));
@@ -766,7 +786,7 @@ public void testDetermineNewLeasesToCreateSplitMergeLatest1() {
         final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
                 newLease("shardId-5"));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_LATEST);
 
         final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
@@ -801,7 +821,7 @@ public void testDetermineNewLeasesToCreateSplitMergeLatest2() {
         final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
                 newLease("shardId-7"));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_LATEST);
 
         final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
@@ -834,7 +854,7 @@ public void testDetermineNewLeasesToCreateSplitMergeHorizon1() {
         final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
                 newLease("shardId-5"));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_TRIM_HORIZON);
 
         final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@@ -869,7 +889,7 @@ public void testDetermineNewLeasesToCreateSplitMergeHorizon2() {
         final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
                 newLease("shardId-7"));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_TRIM_HORIZON);
 
         final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@@ -899,7 +919,7 @@ public void testDetermineNewLeasesToCreateGraphBNoInitialLeasesTrim() {
         final List<Shard> shards = constructShardListForGraphB();
         final List<Lease> currentLeases = new ArrayList<>();
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_TRIM_HORIZON);
 
         final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@@ -934,7 +954,7 @@ public void testDetermineNewLeasesToCreateSplitMergeAtTimestamp1() {
         final List<Lease> currentLeases = Arrays.asList(newLease("shardId-3"), newLease("shardId-4"),
                 newLease("shardId-5"));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_AT_TIMESTAMP);
         final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
         final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@@ -968,7 +988,7 @@ public void testDetermineNewLeasesToCreateSplitMergeAtTimestamp2() {
         final List<Lease> currentLeases = Arrays.asList(newLease("shardId-4"), newLease("shardId-5"),
                 newLease("shardId-7"));
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_AT_TIMESTAMP);
         final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
         final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@@ -995,7 +1015,7 @@ public void testDetermineNewLeasesToCreateGraphBNoInitialLeasesAtTimestamp() {
         final List<Shard> shards = constructShardListForGraphB();
         final List<Lease> currentLeases = new ArrayList<>();
 
-        final List<Lease> newLeases = ShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
+        final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases,
                 INITIAL_POSITION_AT_TIMESTAMP);
         final Set<String> leaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
         final List<ExtendedSequenceNumber> checkpoints = newLeases.stream().map(Lease::checkpoint)
@@ -1092,7 +1112,8 @@ private List<Shard> constructShardListForGraphB() {
     public void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() {
         final Map<String, Boolean> memoizationContext = new HashMap<>();
 
-        assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null,
+        assertThat(HierarchicalShardSyncer
+                .checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null,
                 null, memoizationContext), equalTo(false));
     }
 
@@ -1104,7 +1125,8 @@ public void testCheckIfDescendantAndAddNewLeasesForAncestorsTrimmedShard() {
         final String shardId = "shardId-trimmed";
         final Map<String, Boolean> memoizationContext = new HashMap<>();
 
-        assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null,
+        assertThat(HierarchicalShardSyncer
+                .checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null,
                 new HashMap<>(), null, memoizationContext), equalTo(false));
     }
 
@@ -1120,7 +1142,8 @@ public void testCheckIfDescendantAndAddNewLeasesForAncestorsForShardWithCurrentL
         final Map<String, Shard> kinesisShards = new HashMap<>();
         kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, null, null, null));
 
-        assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
+        assertThat(
+                HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
                 shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(true));
         assertThat(newLeaseMap.isEmpty(), equalTo(true));
     }
@@ -1142,7 +1165,8 @@ public void testCheckIfDescendantAndAddNewLeasesForAncestors2P2ANotDescendant()
         kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null));
         kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null));
 
-        assertThat(ShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
+        assertThat(
+                HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST,
                 shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(false));
         assertThat(newLeaseMap.isEmpty(), equalTo(true));
     }
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java
index cffb7154d..a89e8e560 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java
@@ -63,6 +63,7 @@ public class ShardSyncTaskIntegrationTest {
 
     private LeaseRefresher leaseRefresher;
     private ShardDetector shardDetector;
+    private HierarchicalShardSyncer hierarchicalShardSyncer;
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -97,6 +98,7 @@ public void setup() {
 
         shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50,
                 LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS);
+        hierarchicalShardSyncer = new HierarchicalShardSyncer();
     }
 
     /**
@@ -117,7 +119,7 @@ public final void testCall() throws DependencyException, InvalidStateException,
         Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet());
         ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher,
                 InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L,
-                NULL_METRICS_FACTORY);
+                hierarchicalShardSyncer, NULL_METRICS_FACTORY);
         syncTask.call();
         List<Lease> leases = leaseRefresher.listLeases();
         Set<String> leaseKeys = new HashSet<>();
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
index 6444d4200..f41d773b8 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
@@ -46,6 +46,7 @@
 import software.amazon.kinesis.leases.LeaseRefresher;
 import software.amazon.kinesis.leases.ShardDetector;
 import software.amazon.kinesis.leases.ShardInfo;
+import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
 import software.amazon.kinesis.metrics.MetricsFactory;
 import software.amazon.kinesis.processor.Checkpointer;
@@ -86,6 +87,8 @@ public class ConsumerStatesTest {
     @Mock
     private ShardDetector shardDetector;
     @Mock
+    private HierarchicalShardSyncer hierarchicalShardSyncer;
+    @Mock
     private MetricsFactory metricsFactory;
     @Mock
     private ProcessRecordsInput processRecordsInput;
@@ -109,7 +112,8 @@ public void setup() {
                 taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
                 listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
                 shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM,
-                cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, metricsFactory, new AggregatorUtil());
+                cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(),
+                hierarchicalShardSyncer, metricsFactory);
         consumer = spy(
                 new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument));
 
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
index b17b4ca35..07fb92aef 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
@@ -17,6 +17,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -33,6 +34,7 @@
 import software.amazon.kinesis.common.InitialPositionInStream;
 import software.amazon.kinesis.common.InitialPositionInStreamExtended;
 import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
+import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.leases.LeaseRefresher;
 import software.amazon.kinesis.leases.ShardDetector;
 import software.amazon.kinesis.leases.ShardInfo;
@@ -73,6 +75,8 @@ public class ShutdownTaskTest {
     private LeaseRefresher leaseRefresher;
     @Mock
     private ShardDetector shardDetector;
+    @Mock
+    private HierarchicalShardSyncer hierarchicalShardSyncer;
 
     @Before
     public void setUp() throws Exception {
@@ -86,7 +90,7 @@ public void setUp() throws Exception {
         task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
                 TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
                 ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
-                NULL_METRICS_FACTORY);
+                hierarchicalShardSyncer, NULL_METRICS_FACTORY);
     }
 
     /**
@@ -104,9 +108,15 @@ public final void testCallWhenApplicationDoesNotCheckpoint() {
      * Test method for {@link ShutdownTask#call()}.
      */
     @Test
-    public final void testCallWhenSyncingShardsThrows() {
+    public final void testCallWhenSyncingShardsThrows() throws Exception {
         when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
         when(shardDetector.listShards()).thenReturn(null);
+        doAnswer((invocation) -> {
+            throw new KinesisClientLibIOException("KinesisClientLibIOException");
+        }).when(hierarchicalShardSyncer)
+                .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
+                        cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
+                        NULL_METRICS_FACTORY.createMetrics());
 
         TaskResult result = task.call();
         assertNotNull(result.getException());