Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/config/chunks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ chunks:
- common/localstack-test
- statestore
- statestore-committer-core
- statestore-committer
- statestore-lambda
- metrics
- example-iterators
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/chunk-common.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ on:
- 'java/common/localstack-test/**'
- 'java/statestore/**'
- 'java/statestore-committer-core/**'
- 'java/statestore-committer/**'
- 'java/statestore-lambda/**'
- 'java/statestore/**'
- 'java/metrics/**'
Expand Down
4 changes: 4 additions & 0 deletions code-style/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
</Or>
</Match>
<Match>
<Class name="sleeper.statestore.committer.MultiThreadedStateStoreCommitter" />
<Bug pattern="DM_GC" />
</Match>
</FindBugsFilter>
1 change: 1 addition & 0 deletions docs/deployment/docker-images.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ are listed here.
| bulk-import-runner | EksBulkImportStack | false |
| compaction-job-execution | CompactionStack | true |
| bulk-export-task-execution | BulkExportStack | false |
| statestore-committer | | false |


## Lambda images
Expand Down
4 changes: 4 additions & 0 deletions docs/usage/properties/instance/user/table_state.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ The following instance properties relate to handling the state of Sleeper tables
| sleeper.statestore.transaction.follower.memory.mb | The amount of memory in MB for the lambda that follows the state store transaction log to trigger updates. | | true |
| sleeper.tables.index.dynamo.pointintimerecovery | This specifies whether point in time recovery is enabled for the Sleeper table index. This is set on the DynamoDB tables. | false | true |
| sleeper.tables.index.dynamo.consistent.reads | This specifies whether queries and scans against the table index DynamoDB tables are strongly consistent. | true | false |
| sleeper.statestore.committer.platform | The platform that the State Store Committer will be deployed to for execution.<br>Valid values are: [lambda, ec2]<br> | LAMBDA | true |
| sleeper.statestore.committer.lambda.memory.mb | The amount of memory in MB for the lambda that commits state store updates. | | true |
| sleeper.statestore.committer.lambda.timeout.seconds | The timeout for the lambda that commits state store updates in seconds. | 900 | true |
| sleeper.statestore.committer.batch.size | The number of state store updates to be sent to the state store committer lambda in one invocation. This will be the batch size for a lambda as an SQS FIFO event source. This can be a maximum of 10. | 10 | true |
| sleeper.statestore.committer.concurrency.reserved | The reserved concurrency for the state store committer lambda.<br>Presently this value defaults to 10 to align with expectations around table efficiency.<br>This is to ensure that state store operations can still be applied to at least 10 tables, even when concurrency is used up in the account.<br>See reserved concurrency overview at: https://docs.aws.amazon.com/lambda/latest/dg/configuration-concurrency.html | 10 | true |
| sleeper.statestore.committer.concurrency.max | The maximum given concurrency allowed for the state store committer lambda.<br>See maximum concurrency overview at: https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/ | | true |
| sleeper.statestore.committer.ec2.type | The EC2 instance type that the multi-threaded state store committer should be deployed onto. | m8g.xlarge | true |
| sleeper.statestore.committer.ec2.min.heap.target.amount | The minimum amount of heap space that the committer will try to keep available. This affects how many state stores can be cached in memory. | 100M | false |
| sleeper.statestore.committer.ec2.min.heap.target.percentage | The percentage of the total heap space that the committer should try to keep available, as a minimum. This affects how many state stores can be cached in memory. | 1 | false |
15 changes: 15 additions & 0 deletions example/full/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ sleeper.tables.index.dynamo.pointintimerecovery=false
# consistent.
sleeper.tables.index.dynamo.consistent.reads=true

# The platform that the State Store Committer will be deployed to for execution.
# Valid values are: [lambda, ec2]
sleeper.statestore.committer.platform=LAMBDA

# The amount of memory in MB for the lambda that commits state store updates.
sleeper.statestore.committer.lambda.memory.mb=4096

Expand All @@ -280,6 +284,17 @@ sleeper.statestore.committer.concurrency.reserved=10
# https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/
sleeper.statestore.committer.concurrency.max=10

# The EC2 instance type that the multi-threaded state store committer should be deployed onto.
sleeper.statestore.committer.ec2.type=m8g.xlarge

# The minimum amount of heap space that the committer will try to keep available. This affects how
# many state stores can be cached in memory.
sleeper.statestore.committer.ec2.min.heap.target.amount=100M

# The percentage of the total heap space that the committer should try to keep available, as a
# minimum. This affects how many state stores can be cached in memory.
sleeper.statestore.committer.ec2.min.heap.target.percentage=1


## The following instance properties relate to standard ingest.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,29 @@

import software.amazon.awscdk.Duration;
import software.amazon.awscdk.NestedStack;
import software.amazon.awscdk.services.autoscaling.AutoScalingGroup;
import software.amazon.awscdk.services.autoscaling.UpdatePolicy;
import software.amazon.awscdk.services.ec2.IVpc;
import software.amazon.awscdk.services.ec2.InstanceType;
import software.amazon.awscdk.services.ec2.LaunchTemplate;
import software.amazon.awscdk.services.ec2.UserData;
import software.amazon.awscdk.services.ec2.Vpc;
import software.amazon.awscdk.services.ec2.VpcLookupOptions;
import software.amazon.awscdk.services.ecr.IRepository;
import software.amazon.awscdk.services.ecr.Repository;
import software.amazon.awscdk.services.ecs.AmiHardwareType;
import software.amazon.awscdk.services.ecs.AsgCapacityProvider;
import software.amazon.awscdk.services.ecs.Cluster;
import software.amazon.awscdk.services.ecs.ContainerDefinitionOptions;
import software.amazon.awscdk.services.ecs.ContainerImage;
import software.amazon.awscdk.services.ecs.Ec2Service;
import software.amazon.awscdk.services.ecs.Ec2TaskDefinition;
import software.amazon.awscdk.services.ecs.EcsOptimizedImage;
import software.amazon.awscdk.services.iam.Effect;
import software.amazon.awscdk.services.iam.IGrantable;
import software.amazon.awscdk.services.iam.PolicyStatement;
import software.amazon.awscdk.services.iam.Role;
import software.amazon.awscdk.services.iam.ServicePrincipal;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.logs.ILogGroup;
Expand All @@ -37,18 +57,26 @@
import sleeper.cdk.stack.ingest.IngestTrackerResources;
import sleeper.cdk.util.TrackDeadLetters;
import sleeper.cdk.util.Utils;
import sleeper.core.deploy.DockerDeployment;
import sleeper.core.deploy.LambdaHandler;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.instance.TableStateProperty;
import sleeper.core.properties.model.StateStoreCommitterPlatform;
import sleeper.core.util.EnvironmentUtils;

import java.util.List;
import java.util.Map;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.REGION;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_ARN;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_EVENT_SOURCE_ID;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_ARN;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.VERSION;
import static sleeper.core.properties.instance.CommonProperty.ID;
import static sleeper.core.properties.instance.CommonProperty.VPC_ID;
import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_BATCH_SIZE;
import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_LAMBDA_CONCURRENCY_MAXIMUM;
import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_LAMBDA_CONCURRENCY_RESERVED;
Expand Down Expand Up @@ -79,10 +107,15 @@ public StateStoreCommitterStack(
SleeperLambdaCode lambdaCode = jars.lambdaCode(jarsBucket);

commitQueue = sqsQueueForStateStoreCommitter(policiesStack, deadLetters);
lambdaToCommitStateStoreUpdates(

if (this.instanceProperties.getEnumValue(TableStateProperty.STATESTORE_COMMITTER_PLATFORM, StateStoreCommitterPlatform.class).equals(StateStoreCommitterPlatform.EC2)) {
ecsTaskToCommitStateStoreUpdates(loggingStack, configBucketStack, tableIndexStack, stateStoreStacks, commitQueue);
} else {
lambdaToCommitStateStoreUpdates(
loggingStack, policiesStack, lambdaCode,
configBucketStack, tableIndexStack, stateStoreStacks,
compactionTracker, ingestTracker);
}
}

private Queue sqsQueueForStateStoreCommitter(ManagedPoliciesStack policiesStack, TrackDeadLetters deadLetters) {
Expand Down Expand Up @@ -116,6 +149,91 @@ private Queue sqsQueueForStateStoreCommitter(ManagedPoliciesStack policiesStack,
return queue;
}

private void ecsTaskToCommitStateStoreUpdates(LoggingStack loggingStack, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks, Queue commitQueue) {
String instanceId = this.instanceProperties.get(ID);

IVpc vpc = Vpc.fromLookup(this, "vpc", VpcLookupOptions.builder()
.vpcId(this.instanceProperties.get(VPC_ID))
.build());
String clusterName = String.join("-", "sleeper",
Utils.cleanInstanceId(this.instanceProperties), "statestore-commit-cluster");
Cluster cluster = Cluster.Builder.create(this, "cluster")
.clusterName(clusterName)
.vpc(vpc)
.build();

String ec2InstanceType = this.instanceProperties.get(TableStateProperty.STATESTORE_COMMITTER_EC2_INSTANCE_TYPE);

cluster.addAsgCapacityProvider(AsgCapacityProvider.Builder.create(this, "capacity-provider")
.autoScalingGroup(AutoScalingGroup.Builder.create(this, "asg")
.launchTemplate(LaunchTemplate.Builder.create(this, "instance-template")
.instanceType(new InstanceType(ec2InstanceType))
.machineImage(EcsOptimizedImage.amazonLinux2023(AmiHardwareType.ARM))
.requireImdsv2(true)
.userData(UserData.forLinux())
.role(Role.Builder.create(this, "role")
.assumedBy(ServicePrincipal.fromStaticServicePrincipleName("ec2.amazonaws.com"))
.build())
.build())
.vpc(vpc)
.minCapacity(0)
.maxCapacity(1)
.desiredCapacity(1)
.minHealthyPercentage(0)
.maxHealthyPercentage(100)
.defaultInstanceWarmup(Duration.seconds(30))
.newInstancesProtectedFromScaleIn(false)
.updatePolicy(UpdatePolicy.rollingUpdate())
.build())
.instanceWarmupPeriod(30)
.enableManagedTerminationProtection(false)
.build());

IRepository repository = Repository.fromRepositoryName(this, "ecr", DockerDeployment.STATESTORE_COMMITTER.getEcrRepositoryName(this.instanceProperties));
ContainerImage containerImage = ContainerImage.fromEcrRepository(repository, this.instanceProperties.get(VERSION));

ILogGroup logGroup = loggingStack.getLogGroup(LogGroupRef.STATESTORE_COMMITTER);

Map<String, String> environmentVariables = EnvironmentUtils.createDefaultEnvironment(this.instanceProperties);
environmentVariables.put(Utils.AWS_REGION, this.instanceProperties.get(REGION));

if (this.instanceProperties.getEnumValue(TableStateProperty.STATESTORE_COMMITTER_PLATFORM, StateStoreCommitterPlatform.class).equals(StateStoreCommitterPlatform.EC2)) {
Ec2TaskDefinition taskDefinition = Ec2TaskDefinition.Builder.create(this, "task-definition")
.family(String.join("-", Utils.cleanInstanceId(this.instanceProperties), "StateStoreCommitterOnEC2"))
.build();

taskDefinition.addContainer("committer", ContainerDefinitionOptions.builder()
.containerName("committer")
.image(containerImage)
.command(List.of(instanceId, commitQueue.getQueueUrl()))
.environment(environmentVariables)
.memoryReservationMiB(1024)
.logging(Utils.createECSContainerLogDriver(logGroup))
.build());

commitQueue.grantConsumeMessages(taskDefinition.getTaskRole());
configBucketStack.grantRead(taskDefinition.getTaskRole());
tableIndexStack.grantRead(taskDefinition.getTaskRole());
stateStoreStacks.grantReadWriteAllFilesAndPartitions(taskDefinition.getTaskRole());

Ec2Service.Builder.create(this, "service")
.cluster(cluster)
.taskDefinition(taskDefinition)
.desiredCount(1)
.build();

} else {
throw new IllegalArgumentException(
"Unknown value for " +
TableStateProperty.STATESTORE_COMMITTER_PLATFORM.getPropertyName() + ": " +
this.instanceProperties.getEnumValue(
TableStateProperty.STATESTORE_COMMITTER_PLATFORM,
StateStoreCommitterPlatform.class
)
);
}
}

private void lambdaToCommitStateStoreUpdates(
LoggingStack loggingStack, ManagedPoliciesStack policiesStack, SleeperLambdaCode lambdaCode,
ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public List<StackDockerImage> getAllImagesToUpload() {

private Stream<StackDockerImage> dockerDeploymentImages(Collection<OptionalStack> stacks) {
return dockerDeployments.stream()
.filter(deployment -> stacks.contains(deployment.getOptionalStack()))
.filter(deployment -> deployment.getOptionalStack() == null || stacks.contains(deployment.getOptionalStack()))
.map(StackDockerImage::fromDockerDeployment);
}

Expand Down
Loading
Loading