Skip to content

Commit

Permalink
Introducing HierarchicalShardSyncer in order to run multiple Scheduler…
Browse files Browse the repository at this point in the history
…s in a JVM (#395)

* Run multiple instance of scheduler on one JVM

* handling creation of shardSyncer in DynamoDBLeaseManagementFactory and LeaseManagementConfig

* remove multi-threading unit test and do some small refactorings

* refactoring

* deprecate ShardSyncer and use HierarchicalShardSyncer instead; change the order for metricsFactory and HierarchicalShardSyncer in ShardConsumerArgument

* fix typos and use mock object of shardSyncer

* delete improper comments

* fix comments

* remove duplicated comments
  • Loading branch information
xiaoyu-meng-mxy authored and sahilpalvia committed Oct 10, 2018
1 parent 854e316 commit 14c6829
Show file tree
Hide file tree
Showing 14 changed files with 237 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class Scheduler implements Runnable {
private final ShardDetector shardDetector;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;

// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
Expand Down Expand Up @@ -195,6 +197,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.shardDetector = this.shardSyncTaskManager.shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
}

/**
Expand Down Expand Up @@ -239,7 +242,8 @@ private void initialize() {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory);
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
Expand Down Expand Up @@ -575,8 +579,9 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards,
shardDetector,
metricsFactory,
aggregatorUtil);
aggregatorUtil,
hierarchicalShardSyncer,
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@

import org.apache.commons.lang3.StringUtils;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
Expand All @@ -54,24 +53,27 @@
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing its child shards.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class ShardSyncer {
@KinesisClientInternalApi
public class HierarchicalShardSyncer {

/**
* Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards
* (e.g. at startup, or when we reach end of a shard).
*
*
* @param shardDetector
* @param leaseRefresher
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param scope
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
Expand Down Expand Up @@ -152,7 +154,7 @@ private static Set<String> findInconsistentShardIds(final Map<String, Set<String
* @return ShardIds of child shards (children of the expectedClosedShard)
* @throws KinesisClientLibIOException
*/
static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards)
throws KinesisClientLibIOException {
final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
Expand Down Expand Up @@ -181,7 +183,7 @@ static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String,
}
}

private static synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
throws KinesisClientLibIOException {
BigInteger minStartingHashKeyOfChildren = null;
Expand Down Expand Up @@ -583,7 +585,7 @@ static boolean isCandidateForCleanup(final Lease lease, final Set<String> curren
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private static synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException,
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
Expand Down Expand Up @@ -625,7 +627,7 @@ private static synchronized void cleanupLeasesOfFinishedShards(final Collection<
* @throws InvalidStateException
* @throws DependencyException
*/
static synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease leaseForClosedShard = trackedLeases.get(closedShardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ static class LeaseManagementThreadPool extends ThreadPoolExecutor {
*/
private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;

private HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer();

private LeaseManagementFactory leaseManagementFactory;

public LeaseManagementFactory leaseManagementFactory() {
Expand Down Expand Up @@ -258,6 +260,7 @@ public LeaseManagementFactory leaseManagementFactory() {
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback());
}
return leaseManagementFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class ShardSyncTask implements ConsumerTask {
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;

private final TaskType taskType = TaskType.SHARDSYNC;
Expand All @@ -62,7 +64,7 @@ public TaskResult call() {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);

try {
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPosition,
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,68 @@ public class ShardSyncTaskManager {
@NonNull
private final ExecutorService executorService;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;

/**
 * Creates a manager that performs shard syncs with a newly created {@link HierarchicalShardSyncer}.
 *
 * <p>This is a convenience overload kept for existing callers: it forwards every argument unchanged to the
 * constructor that accepts an explicit {@link HierarchicalShardSyncer}, supplying
 * {@code new HierarchicalShardSyncer()} for that argument.</p>
 *
 * @param shardDetector
 *            shard detector stored on this manager
 * @param leaseRefresher
 *            lease refresher stored on this manager
 * @param initialPositionInStream
 *            initial stream position stored on this manager
 * @param cleanupLeasesUponShardCompletion
 *            flag handed to each shard sync task this manager creates
 * @param ignoreUnexpectedChildShards
 *            flag handed to each shard sync task this manager creates
 * @param shardSyncIdleTimeMillis
 *            idle time, in milliseconds, handed to each shard sync task this manager creates
 * @param executorService
 *            executor that shard sync tasks are submitted to
 * @param metricsFactory
 *            metrics factory handed to each shard sync task this manager creates
 * @deprecated This constructor will be removed in a future release. Use
 *             {@link #ShardSyncTaskManager(ShardDetector, LeaseRefresher, InitialPositionInStreamExtended, boolean, boolean, long, ExecutorService, HierarchicalShardSyncer, MetricsFactory)}
 *             and pass the {@link HierarchicalShardSyncer} explicitly.
 */
@Deprecated
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
        InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
        boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService,
        MetricsFactory metricsFactory) {
    // Chain to the primary constructor so the field assignments live in exactly one place.
    this(shardDetector, leaseRefresher, initialPositionInStream, cleanupLeasesUponShardCompletion,
            ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, executorService, new HierarchicalShardSyncer(),
            metricsFactory);
}

/**
 * Creates a manager that performs shard syncs with the supplied {@link HierarchicalShardSyncer}.
 *
 * <p>The {@code executorService}, {@code hierarchicalShardSyncer} and {@code metricsFactory} parameters are
 * annotated {@code @NonNull} to match the {@code @NonNull} declared on the fields they are stored in; a
 * hand-written constructor does not get null checks from the field annotations alone.</p>
 *
 * @param shardDetector
 *            shard detector stored on this manager
 * @param leaseRefresher
 *            lease refresher stored on this manager
 * @param initialPositionInStream
 *            initial stream position stored on this manager
 * @param cleanupLeasesUponShardCompletion
 *            flag handed to each shard sync task this manager creates
 * @param ignoreUnexpectedChildShards
 *            flag handed to each shard sync task this manager creates
 * @param shardSyncIdleTimeMillis
 *            idle time, in milliseconds, handed to each shard sync task this manager creates
 * @param executorService
 *            executor that shard sync tasks are submitted to; must not be null
 * @param hierarchicalShardSyncer
 *            syncer handed to each shard sync task this manager creates; must not be null
 * @param metricsFactory
 *            metrics factory handed to each shard sync task this manager creates; must not be null
 * @throws NullPointerException
 *             if {@code executorService}, {@code hierarchicalShardSyncer} or {@code metricsFactory} is null
 *             (assumes {@code @NonNull} in this file is Lombok's, as elsewhere in the project)
 */
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
        InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
        boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis,
        @NonNull ExecutorService executorService, @NonNull HierarchicalShardSyncer hierarchicalShardSyncer,
        @NonNull MetricsFactory metricsFactory) {
    this.shardDetector = shardDetector;
    this.leaseRefresher = leaseRefresher;
    this.initialPositionInStream = initialPositionInStream;
    this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
    this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
    this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
    this.executorService = executorService;
    this.hierarchicalShardSyncer = hierarchicalShardSyncer;
    this.metricsFactory = metricsFactory;
}

private ConsumerTask currentTask;
private Future<TaskResult> future;

Expand Down Expand Up @@ -82,6 +142,7 @@ private synchronized boolean checkAndSubmitNextTask() {
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
hierarchicalShardSyncer,
metricsFactory),
metricsFactory);
future = executorService.submit(currentTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementFactory;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final ExecutorService executorService;
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;

private final long failoverTimeMillis;
private final long epsilonMillis;
private final int maxLeasesForWorker;
Expand Down Expand Up @@ -162,7 +166,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
}

/**
Expand Down Expand Up @@ -191,6 +195,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
Expand All @@ -203,6 +208,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer,
final TableCreatorCallback tableCreatorCallback) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
Expand All @@ -227,6 +233,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback;
}

Expand All @@ -253,6 +260,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
hierarchicalShardSyncer,
metricsFactory);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package software.amazon.kinesis.leases.exceptions;

import lombok.NonNull;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.metrics.MetricsScope;

/**
 * Helper class to sync leases with shards of the Kinesis stream.
 * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
 * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
 * and begun processing its child shards.
 *
 * <p>This class is a static facade: every call is forwarded to a single {@link HierarchicalShardSyncer}
 * instance shared by all callers of this class.</p>
 *
 * @deprecated This class will be removed in a future release. Use {@link HierarchicalShardSyncer} instead.
 */
@Deprecated
public class ShardSyncer {
    /** The one syncer instance that all static calls on this class are forwarded to. */
    private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();

    /** Not instantiable: this class only exposes static methods. */
    private ShardSyncer() {
    }

    /**
     * Forwards to {@link HierarchicalShardSyncer#checkAndCreateLeaseForNewShards} on the shared syncer instance,
     * passing all arguments through unchanged. The method is {@code static synchronized}, so concurrent calls
     * through this class are serialized on the class lock.
     *
     * @param shardDetector
     *            passed through to the shared syncer; must not be null
     * @param leaseRefresher
     *            passed through to the shared syncer
     * @param initialPosition
     *            passed through to the shared syncer
     * @param cleanupLeasesOfCompletedShards
     *            passed through to the shared syncer
     * @param ignoreUnexpectedChildShards
     *            passed through to the shared syncer
     * @param scope
     *            metrics scope passed through to the shared syncer
     * @throws DependencyException
     *             propagated from the shared syncer
     * @throws InvalidStateException
     *             propagated from the shared syncer
     * @throws ProvisionedThroughputException
     *             propagated from the shared syncer
     * @throws KinesisClientLibIOException
     *             propagated from the shared syncer
     * @deprecated This method will be removed in a future release. Call
     *             {@link HierarchicalShardSyncer#checkAndCreateLeaseForNewShards} on your own
     *             {@link HierarchicalShardSyncer} instance instead.
     */
    @Deprecated
    public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
            final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
            final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
            final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
            KinesisClientLibIOException {
        HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
                cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
argument.leaseRefresher(),
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
argument.metricsFactory());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
Expand Down Expand Up @@ -65,7 +66,8 @@ public class ShardConsumerArgument {
private final boolean ignoreUnexpectedChildShards;
@NonNull
private final ShardDetector shardDetector;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
}
Loading

0 comments on commit 14c6829

Please sign in to comment.