Skip to content

Commit

Permalink
Merge pull request awslabs#48 from ashwing/ltr_1_hsyncperstreamconfig
Browse files Browse the repository at this point in the history
Hierachical stream syncer per stream config changes
  • Loading branch information
ashwing authored Jun 10, 2020
2 parents 291d1b4 + b323e7c commit 61aeae3
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public class Scheduler implements Runnable {
private final Function<StreamConfig, ShardDetector> shardDetectorProvider;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider;
private final long schedulerInitializationBackoffTimeMillis;
private final LeaderDecider leaderDecider;
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();
Expand Down Expand Up @@ -284,8 +284,7 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
// TODO : LTR : Check if this needs to be per stream.
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(isMultiStreamMode);
this.hierarchicalShardSyncerProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
Expand Down Expand Up @@ -922,7 +921,7 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
ignoreUnexpetedChildShards,
shardDetectorProvider.apply(streamConfig),
aggregatorUtil,
hierarchicalShardSyncer,
hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class HierarchicalShardSyncer {

private final boolean isMultiStreamMode;

private String streamIdentifier = "";
private final String streamIdentifier;

private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
Expand All @@ -84,10 +84,12 @@ public class HierarchicalShardSyncer {

public HierarchicalShardSyncer() {
isMultiStreamMode = false;
streamIdentifier = "SingleStreamMode";
}

public HierarchicalShardSyncer(final boolean isMultiStreamMode) {
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
this.isMultiStreamMode = isMultiStreamMode;
this.streamIdentifier = streamIdentifier;
}

private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
Expand Down Expand Up @@ -118,7 +120,6 @@ public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDet
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, garbageCollectLeases,
Expand All @@ -132,7 +133,6 @@ public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDet
final MetricsScope scope, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {

this.streamIdentifier = shardDetector.streamIdentifier().serialize();
//TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191

if (!CollectionUtils.isNullOrEmpty(latestShards)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,19 +276,6 @@ private HierarchicalShardSyncer hierarchicalShardSyncer() {
return hierarchicalShardSyncer;
}

/**
* Vends HierarchicalShardSyncer based on MultiStreamingMode. With MultiStreamMode shard syncer creates
* leases to accommodate more than one stream.
* @param isMultiStreamingMode
* @return HierarchicalShardSyncer
*/
public HierarchicalShardSyncer hierarchicalShardSyncer(boolean isMultiStreamingMode) {
if(hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer(isMultiStreamingMode);
}
return hierarchicalShardSyncer;
}

@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
Expand Down Expand Up @@ -351,12 +338,13 @@ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer lease
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(isMultiStreamingMode),
hierarchicalShardSyncer(),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
leaseSerializer,
customShardDetectorProvider());
customShardDetectorProvider(),
isMultiStreamingMode);
}
return leaseManagementFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final ExecutorService executorService;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer;
@NonNull
private final LeaseSerializer leaseSerializer;
@NonNull
Expand All @@ -82,6 +82,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean isMultiStreamMode;

/**
* Constructor.
Expand Down Expand Up @@ -207,7 +208,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
*/
@Deprecated
Expand All @@ -221,14 +222,14 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
deprecatedHierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}

/**
Expand Down Expand Up @@ -257,7 +258,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
*/
Expand All @@ -272,15 +273,15 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
}

/**
Expand Down Expand Up @@ -309,7 +310,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
Expand All @@ -325,7 +326,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {

this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
Expand All @@ -334,7 +335,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}

/**
Expand Down Expand Up @@ -362,7 +363,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
Expand All @@ -376,16 +377,16 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null);
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null, false);
this.streamConfig = streamConfig;
}

Expand All @@ -412,11 +413,13 @@ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, f
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchicalShardSyncer
* @param deprecatedHierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
Expand All @@ -427,9 +430,9 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider) {
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
Expand All @@ -451,12 +454,13 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.deprecatedHierarchicalShardSyncer = deprecatedHierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
}

@Override
Expand All @@ -481,8 +485,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
hierarchicalShardSyncer,
executorService, deprecatedHierarchicalShardSyncer,
metricsFactory);
}

Expand All @@ -501,7 +504,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
hierarchicalShardSyncer,
new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()),
metricsFactory);
}

Expand Down
Loading

0 comments on commit 61aeae3

Please sign in to comment.