diff --git a/.github/config/chunks.yaml b/.github/config/chunks.yaml
index 14b21df6c12..1cda1123698 100644
--- a/.github/config/chunks.yaml
+++ b/.github/config/chunks.yaml
@@ -31,6 +31,7 @@ chunks:
- common/localstack-test
- statestore
- statestore-committer-core
+ - statestore-committer
- statestore-lambda
- metrics
- example-iterators
diff --git a/.github/workflows/chunk-common.yaml b/.github/workflows/chunk-common.yaml
index edffa93a9cb..61df88298e8 100644
--- a/.github/workflows/chunk-common.yaml
+++ b/.github/workflows/chunk-common.yaml
@@ -22,6 +22,7 @@ on:
- 'java/common/localstack-test/**'
- 'java/statestore/**'
- 'java/statestore-committer-core/**'
+ - 'java/statestore-committer/**'
- 'java/statestore-lambda/**'
- 'java/statestore/**'
- 'java/metrics/**'
diff --git a/code-style/spotbugs-exclude.xml b/code-style/spotbugs-exclude.xml
index da612bf353a..bf1b7bc438d 100644
--- a/code-style/spotbugs-exclude.xml
+++ b/code-style/spotbugs-exclude.xml
@@ -26,4 +26,8 @@
+
+
+
+
diff --git a/docs/deployment/docker-images.md b/docs/deployment/docker-images.md
index b1e1739d5f7..bdf974c4163 100644
--- a/docs/deployment/docker-images.md
+++ b/docs/deployment/docker-images.md
@@ -27,6 +27,7 @@ are listed here.
| bulk-import-runner | EksBulkImportStack | false |
| compaction-job-execution | CompactionStack | true |
| bulk-export-task-execution | BulkExportStack | false |
+| statestore-committer | | false |
## Lambda images
diff --git a/docs/usage/properties/instance/user/table_state.md b/docs/usage/properties/instance/user/table_state.md
index e05fad60d4a..85a5d0ab685 100644
--- a/docs/usage/properties/instance/user/table_state.md
+++ b/docs/usage/properties/instance/user/table_state.md
@@ -32,8 +32,12 @@ The following instance properties relate to handling the state of Sleeper tables
| sleeper.statestore.transaction.follower.memory.mb | The amount of memory in MB for the lambda that follows the state store transaction log to trigger updates. | | true |
| sleeper.tables.index.dynamo.pointintimerecovery | This specifies whether point in time recovery is enabled for the Sleeper table index. This is set on the DynamoDB tables. | false | true |
| sleeper.tables.index.dynamo.consistent.reads | This specifies whether queries and scans against the table index DynamoDB tables are strongly consistent. | true | false |
+| sleeper.statestore.committer.platform | The platform that the State Store Committer will be deployed to for execution.
Valid values are: [lambda, ec2]
| LAMBDA | true |
| sleeper.statestore.committer.lambda.memory.mb | The amount of memory in MB for the lambda that commits state store updates. | | true |
| sleeper.statestore.committer.lambda.timeout.seconds | The timeout for the lambda that commits state store updates in seconds. | 900 | true |
| sleeper.statestore.committer.batch.size | The number of state store updates to be sent to the state store committer lambda in one invocation. This will be the batch size for a lambda as an SQS FIFO event source. This can be a maximum of 10. | 10 | true |
| sleeper.statestore.committer.concurrency.reserved | The reserved concurrency for the state store committer lambda.
Presently this value defaults to 10 to align with expectations around table efficiency.
This is to ensure that state store operations can still be applied to at least 10 tables, even when concurrency is used up in the account.
See reserved concurrency overview at: https://docs.aws.amazon.com/lambda/latest/dg/configuration-concurrency.html | 10 | true |
| sleeper.statestore.committer.concurrency.max | The maximum given concurrency allowed for the state store committer lambda.
See maximum concurrency overview at: https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/ | | true |
+| sleeper.statestore.committer.ec2.type | The EC2 instance type that the multi-threaded state store committer should be deployed onto. | m8g.xlarge | true |
+| sleeper.statestore.committer.ec2.min.heap.target.amount | The minimum amount of heap space that the committer will try to keep available. This affects how many state stores can be cached in memory. | 100M | false |
+| sleeper.statestore.committer.ec2.min.heap.target.percentage | The percentage of the total heap space that the committer should try to keep available, as a minimum. This affects how many state stores can be cached in memory. | 1 | false |
diff --git a/example/full/instance.properties b/example/full/instance.properties
index c05bf322386..8c340c39c8f 100644
--- a/example/full/instance.properties
+++ b/example/full/instance.properties
@@ -257,6 +257,10 @@ sleeper.tables.index.dynamo.pointintimerecovery=false
# consistent.
sleeper.tables.index.dynamo.consistent.reads=true
+# The platform that the State Store Committer will be deployed to for execution.
+# Valid values are: [lambda, ec2]
+sleeper.statestore.committer.platform=LAMBDA
+
# The amount of memory in MB for the lambda that commits state store updates.
sleeper.statestore.committer.lambda.memory.mb=4096
@@ -280,6 +284,17 @@ sleeper.statestore.committer.concurrency.reserved=10
# https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/
sleeper.statestore.committer.concurrency.max=10
+# The EC2 instance type that the multi-threaded state store committer should be deployed onto.
+sleeper.statestore.committer.ec2.type=m8g.xlarge
+
+# The minimum amount of heap space that the committer will try to keep available. This affects how
+# many state stores can be cached in memory.
+sleeper.statestore.committer.ec2.min.heap.target.amount=100M
+
+# The percentage of the total heap space that the committer should try to keep available, as a
+# minimum. This affects how many state stores can be cached in memory.
+sleeper.statestore.committer.ec2.min.heap.target.percentage=1
+
## The following instance properties relate to standard ingest.
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/core/StateStoreCommitterStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/core/StateStoreCommitterStack.java
index ddae01e112b..80501f25635 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/core/StateStoreCommitterStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/core/StateStoreCommitterStack.java
@@ -17,9 +17,29 @@
import software.amazon.awscdk.Duration;
import software.amazon.awscdk.NestedStack;
+import software.amazon.awscdk.services.autoscaling.AutoScalingGroup;
+import software.amazon.awscdk.services.autoscaling.UpdatePolicy;
+import software.amazon.awscdk.services.ec2.IVpc;
+import software.amazon.awscdk.services.ec2.InstanceType;
+import software.amazon.awscdk.services.ec2.LaunchTemplate;
+import software.amazon.awscdk.services.ec2.UserData;
+import software.amazon.awscdk.services.ec2.Vpc;
+import software.amazon.awscdk.services.ec2.VpcLookupOptions;
+import software.amazon.awscdk.services.ecr.IRepository;
+import software.amazon.awscdk.services.ecr.Repository;
+import software.amazon.awscdk.services.ecs.AmiHardwareType;
+import software.amazon.awscdk.services.ecs.AsgCapacityProvider;
+import software.amazon.awscdk.services.ecs.Cluster;
+import software.amazon.awscdk.services.ecs.ContainerDefinitionOptions;
+import software.amazon.awscdk.services.ecs.ContainerImage;
+import software.amazon.awscdk.services.ecs.Ec2Service;
+import software.amazon.awscdk.services.ecs.Ec2TaskDefinition;
+import software.amazon.awscdk.services.ecs.EcsOptimizedImage;
import software.amazon.awscdk.services.iam.Effect;
import software.amazon.awscdk.services.iam.IGrantable;
import software.amazon.awscdk.services.iam.PolicyStatement;
+import software.amazon.awscdk.services.iam.Role;
+import software.amazon.awscdk.services.iam.ServicePrincipal;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.logs.ILogGroup;
@@ -37,18 +57,26 @@
import sleeper.cdk.stack.ingest.IngestTrackerResources;
import sleeper.cdk.util.TrackDeadLetters;
import sleeper.cdk.util.Utils;
+import sleeper.core.deploy.DockerDeployment;
import sleeper.core.deploy.LambdaHandler;
import sleeper.core.properties.instance.InstanceProperties;
+import sleeper.core.properties.instance.TableStateProperty;
+import sleeper.core.properties.model.StateStoreCommitterPlatform;
import sleeper.core.util.EnvironmentUtils;
import java.util.List;
+import java.util.Map;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.REGION;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_ARN;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_EVENT_SOURCE_ID;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_ARN;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.VERSION;
+import static sleeper.core.properties.instance.CommonProperty.ID;
+import static sleeper.core.properties.instance.CommonProperty.VPC_ID;
import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_BATCH_SIZE;
import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_LAMBDA_CONCURRENCY_MAXIMUM;
import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_LAMBDA_CONCURRENCY_RESERVED;
@@ -79,10 +107,15 @@ public StateStoreCommitterStack(
SleeperLambdaCode lambdaCode = jars.lambdaCode(jarsBucket);
commitQueue = sqsQueueForStateStoreCommitter(policiesStack, deadLetters);
- lambdaToCommitStateStoreUpdates(
+
+ if (this.instanceProperties.getEnumValue(TableStateProperty.STATESTORE_COMMITTER_PLATFORM, StateStoreCommitterPlatform.class).equals(StateStoreCommitterPlatform.EC2)) {
+ ecsTaskToCommitStateStoreUpdates(loggingStack, configBucketStack, tableIndexStack, stateStoreStacks, commitQueue);
+ } else {
+ lambdaToCommitStateStoreUpdates(
loggingStack, policiesStack, lambdaCode,
configBucketStack, tableIndexStack, stateStoreStacks,
compactionTracker, ingestTracker);
+ }
}
private Queue sqsQueueForStateStoreCommitter(ManagedPoliciesStack policiesStack, TrackDeadLetters deadLetters) {
@@ -116,6 +149,91 @@ private Queue sqsQueueForStateStoreCommitter(ManagedPoliciesStack policiesStack,
return queue;
}
+    /**
+     * Deploys the state store committer as an ECS service running on EC2, consuming the given commit queue.
+     * <p>
+     * Creates an ECS cluster in the instance's VPC, backed by an auto scaling group of at most one EC2 instance,
+     * and runs a single task from the state store committer Docker image. The container is started with the
+     * instance ID and the commit queue URL as its command arguments.
+     * <p>
+     * The constructor only calls this when the committer platform is EC2, so the platform is not checked again
+     * here. The machine image is fixed to the ARM variant of the ECS-optimised AMI, so the configured EC2
+     * instance type needs to be an ARM type.
+     *
+     * @param loggingStack      supplies the log group used by the committer container
+     * @param configBucketStack the task role is granted read on this
+     * @param tableIndexStack   the task role is granted read on this
+     * @param stateStoreStacks  the task role is granted read/write on all files and partitions
+     * @param commitQueue       the queue of commits; the task role is granted permission to consume from it
+     */
+    private void ecsTaskToCommitStateStoreUpdates(LoggingStack loggingStack, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks, Queue commitQueue) {
+        String instanceId = this.instanceProperties.get(ID);
+
+        IVpc vpc = Vpc.fromLookup(this, "vpc", VpcLookupOptions.builder()
+                .vpcId(this.instanceProperties.get(VPC_ID))
+                .build());
+        String clusterName = String.join("-", "sleeper",
+                Utils.cleanInstanceId(this.instanceProperties), "statestore-commit-cluster");
+        Cluster cluster = Cluster.Builder.create(this, "cluster")
+                .clusterName(clusterName)
+                .vpc(vpc)
+                .build();
+
+        String ec2InstanceType = this.instanceProperties.get(TableStateProperty.STATESTORE_COMMITTER_EC2_INSTANCE_TYPE);
+
+        cluster.addAsgCapacityProvider(AsgCapacityProvider.Builder.create(this, "capacity-provider")
+                .autoScalingGroup(AutoScalingGroup.Builder.create(this, "asg")
+                        .launchTemplate(LaunchTemplate.Builder.create(this, "instance-template")
+                                .instanceType(new InstanceType(ec2InstanceType))
+                                // ARM AMI is hard-coded, so the instance type above must be ARM-based
+                                .machineImage(EcsOptimizedImage.amazonLinux2023(AmiHardwareType.ARM))
+                                .requireImdsv2(true)
+                                .userData(UserData.forLinux())
+                                .role(Role.Builder.create(this, "role")
+                                        .assumedBy(ServicePrincipal.fromStaticServicePrincipleName("ec2.amazonaws.com"))
+                                        .build())
+                                .build())
+                        .vpc(vpc)
+                        .minCapacity(0)
+                        .maxCapacity(1)
+                        .desiredCapacity(1)
+                        .minHealthyPercentage(0)
+                        .maxHealthyPercentage(100)
+                        .defaultInstanceWarmup(Duration.seconds(30))
+                        .newInstancesProtectedFromScaleIn(false)
+                        .updatePolicy(UpdatePolicy.rollingUpdate())
+                        .build())
+                .instanceWarmupPeriod(30)
+                .enableManagedTerminationProtection(false)
+                .build());
+
+        IRepository repository = Repository.fromRepositoryName(this, "ecr", DockerDeployment.STATESTORE_COMMITTER.getEcrRepositoryName(this.instanceProperties));
+        ContainerImage containerImage = ContainerImage.fromEcrRepository(repository, this.instanceProperties.get(VERSION));
+
+        ILogGroup logGroup = loggingStack.getLogGroup(LogGroupRef.STATESTORE_COMMITTER);
+
+        Map<String, String> environmentVariables = EnvironmentUtils.createDefaultEnvironment(this.instanceProperties);
+        environmentVariables.put(Utils.AWS_REGION, this.instanceProperties.get(REGION));
+
+        Ec2TaskDefinition taskDefinition = Ec2TaskDefinition.Builder.create(this, "task-definition")
+                .family(String.join("-", Utils.cleanInstanceId(this.instanceProperties), "StateStoreCommitterOnEC2"))
+                .build();
+
+        taskDefinition.addContainer("committer", ContainerDefinitionOptions.builder()
+                .containerName("committer")
+                .image(containerImage)
+                // Passed to the image's entrypoint as its arguments
+                .command(List.of(instanceId, commitQueue.getQueueUrl()))
+                .environment(environmentVariables)
+                .memoryReservationMiB(1024)
+                .logging(Utils.createECSContainerLogDriver(logGroup))
+                .build());
+
+        commitQueue.grantConsumeMessages(taskDefinition.getTaskRole());
+        configBucketStack.grantRead(taskDefinition.getTaskRole());
+        tableIndexStack.grantRead(taskDefinition.getTaskRole());
+        stateStoreStacks.grantReadWriteAllFilesAndPartitions(taskDefinition.getTaskRole());
+
+        Ec2Service.Builder.create(this, "service")
+                .cluster(cluster)
+                .taskDefinition(taskDefinition)
+                .desiredCount(1)
+                .build();
+    }
+
private void lambdaToCommitStateStoreUpdates(
LoggingStack loggingStack, ManagedPoliciesStack policiesStack, SleeperLambdaCode lambdaCode,
ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks,
diff --git a/java/clients/src/main/java/sleeper/clients/deploy/container/DockerImageConfiguration.java b/java/clients/src/main/java/sleeper/clients/deploy/container/DockerImageConfiguration.java
index dbe002e3b4a..dcbf20b8ef0 100644
--- a/java/clients/src/main/java/sleeper/clients/deploy/container/DockerImageConfiguration.java
+++ b/java/clients/src/main/java/sleeper/clients/deploy/container/DockerImageConfiguration.java
@@ -113,7 +113,7 @@ public List getAllImagesToUpload() {
private Stream dockerDeploymentImages(Collection stacks) {
return dockerDeployments.stream()
- .filter(deployment -> stacks.contains(deployment.getOptionalStack()))
+ // A deployment with no optional stack is always included; otherwise its stack must be one of those given.
+ .filter(deployment -> deployment.getOptionalStack() == null || stacks.contains(deployment.getOptionalStack()))
.map(StackDockerImage::fromDockerDeployment);
}
diff --git a/java/clients/src/main/java/sleeper/clients/table/TakeAllTablesOffline.java b/java/clients/src/main/java/sleeper/clients/table/TakeAllTablesOffline.java
new file mode 100644
index 00000000000..799897feb23
--- /dev/null
+++ b/java/clients/src/main/java/sleeper/clients/table/TakeAllTablesOffline.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sleeper.clients.table;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import sleeper.configuration.properties.S3InstanceProperties;
+import sleeper.configuration.properties.S3TableProperties;
+import sleeper.core.properties.instance.InstanceProperties;
+import sleeper.core.properties.table.TableProperties;
+import sleeper.core.properties.table.TablePropertiesStore;
+
+import static sleeper.configuration.utils.AwsV2ClientHelper.buildAwsV2Client;
+import static sleeper.core.properties.table.TableProperty.TABLE_ONLINE;
+
+/**
+ * Takes every online Sleeper table offline, for one or more Sleeper instances.
+ */
+public class TakeAllTablesOffline {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TakeAllTablesOffline.class);
+
+    private final S3Client s3Client;
+    private final DynamoDbClient dynamoClient;
+
+    public TakeAllTablesOffline(S3Client s3Client, DynamoDbClient dynamoClient) {
+        this.s3Client = s3Client;
+        this.dynamoClient = dynamoClient;
+    }
+
+    /**
+     * Sets every online table in a Sleeper instance to be offline, and saves the updated table properties.
+     *
+     * @param instanceId the ID of the Sleeper instance whose tables should be taken offline
+     */
+    public void takeAllOffline(String instanceId) {
+        InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(this.s3Client, instanceId);
+        TablePropertiesStore tablePropertiesStore = S3TableProperties.createStore(instanceProperties, this.s3Client, this.dynamoClient);
+
+        // Snapshot the online tables first: each save changes the set of online tables, so this avoids
+        // depending on how the underlying stream behaves while the store is being updated.
+        tablePropertiesStore.streamOnlineTableIds().toList().forEach(table -> {
+            TableProperties tableProperties = tablePropertiesStore.loadByName(table.getTableName());
+            tableProperties.set(TABLE_ONLINE, "false");
+            tablePropertiesStore.save(tableProperties);
+            LOGGER.info("Successfully took table offline {}:{}", instanceId, table.getTableName());
+        });
+    }
+
+    /**
+     * Takes all tables offline in each of the Sleeper instances given as arguments.
+     *
+     * @param args one or more Sleeper instance IDs
+     */
+    public static void main(String[] args) {
+        if (args.length == 0) {
+            System.out.println("Usage: <instance-id> [<instance-id> ...]");
+            // Nothing to do, so stop here rather than building AWS clients that would go unused
+            return;
+        }
+
+        try (
+                S3Client s3Client = buildAwsV2Client(S3Client.builder());
+                DynamoDbClient dynamoClient = buildAwsV2Client(DynamoDbClient.builder())) {
+            TakeAllTablesOffline offliner = new TakeAllTablesOffline(s3Client, dynamoClient);
+            for (String instanceId : args) {
+                offliner.takeAllOffline(instanceId);
+            }
+        }
+    }
+
+}
diff --git a/java/core/src/main/java/sleeper/core/deploy/DockerDeployment.java b/java/core/src/main/java/sleeper/core/deploy/DockerDeployment.java
index d87abe2ab73..0ec1c4410fb 100644
--- a/java/core/src/main/java/sleeper/core/deploy/DockerDeployment.java
+++ b/java/core/src/main/java/sleeper/core/deploy/DockerDeployment.java
@@ -51,6 +51,9 @@ public class DockerDeployment {
.deploymentName("bulk-export-task-execution")
.optionalStack(OptionalStack.BulkExportStack)
.add();
+ // Docker deployment for the state store committer. No optionalStack(...) is set on this builder.
+ // NOTE(review): presumably getOptionalStack() is then null for this deployment - confirm against the builder.
+ public static final DockerDeployment STATESTORE_COMMITTER = builder()
+ .deploymentName("statestore-committer")
+ .add();
private final String deploymentName;
private final OptionalStack optionalStack;
diff --git a/java/core/src/main/java/sleeper/core/properties/instance/TableStateProperty.java b/java/core/src/main/java/sleeper/core/properties/instance/TableStateProperty.java
index 5fd9bb0a057..4342093bf07 100644
--- a/java/core/src/main/java/sleeper/core/properties/instance/TableStateProperty.java
+++ b/java/core/src/main/java/sleeper/core/properties/instance/TableStateProperty.java
@@ -17,11 +17,13 @@
import sleeper.core.properties.SleeperPropertyIndex;
import sleeper.core.properties.model.SleeperPropertyValueUtils;
+import sleeper.core.properties.model.StateStoreCommitterPlatform;
import java.util.List;
import static sleeper.core.properties.instance.CommonProperty.DEFAULT_LAMBDA_CONCURRENCY_MAXIMUM;
import static sleeper.core.properties.instance.CommonProperty.DEFAULT_LAMBDA_CONCURRENCY_RESERVED;
+import static sleeper.core.properties.model.SleeperPropertyValueUtils.describeEnumValuesInLowerCase;
/**
* Definitions of instance properties relating to handling the state of Sleeper tables.
@@ -237,6 +239,14 @@ private static UserDefinedInstancePropertyImpl.Builder propertyBuilder(String pr
.validationPredicate(SleeperPropertyValueUtils::isTrueOrFalse)
.propertyGroup(InstancePropertyGroup.TABLE_STATE)
.build();
+ // Selects where the state store committer is deployed. Values are validated case-insensitively by
+ // StateStoreCommitterPlatform.isValid, and the default is the name of the LAMBDA enum constant.
+ // Changing this property requires a CDK deploy.
+ UserDefinedInstanceProperty STATESTORE_COMMITTER_PLATFORM = Index.propertyBuilder("sleeper.statestore.committer.platform")
+ .description("The platform that the State Store Committer will be deployed to for execution.\n" +
+ "Valid values are: " + describeEnumValuesInLowerCase(StateStoreCommitterPlatform.class) + "\n")
+ .defaultValue(StateStoreCommitterPlatform.LAMBDA.toString())
+ .validationPredicate(StateStoreCommitterPlatform::isValid)
+ .propertyGroup(InstancePropertyGroup.TABLE_STATE)
+ .runCdkDeployWhenChanged(true)
+ .build();
UserDefinedInstanceProperty STATESTORE_COMMITTER_LAMBDA_MEMORY_IN_MB = Index.propertyBuilder("sleeper.statestore.committer.lambda.memory.mb")
.description("The amount of memory in MB for the lambda that commits state store updates.")
.defaultProperty(DEFAULT_TABLE_STATE_LAMBDA_MEMORY)
@@ -270,4 +280,23 @@ private static UserDefinedInstancePropertyImpl.Builder propertyBuilder(String pr
"See maximum concurrency overview at: https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/")
.propertyGroup(InstancePropertyGroup.TABLE_STATE)
.runCdkDeployWhenChanged(true).build();
+ // EC2 instance type for the committer. Must be a non-empty string, and changing it requires a CDK deploy.
+ // NOTE(review): StateStoreCommitterStack pairs this with the ARM variant of the ECS-optimised AMI, so a
+ // non-ARM instance type would presumably fail to launch - confirm, and consider validating it here.
+ UserDefinedInstanceProperty STATESTORE_COMMITTER_EC2_INSTANCE_TYPE = Index.propertyBuilder("sleeper.statestore.committer.ec2.type")
+ .description("The EC2 instance type that the multi-threaded state store committer should be deployed onto.")
+ .defaultValue("m8g.xlarge")
+ .runCdkDeployWhenChanged(true)
+ .validationPredicate(SleeperPropertyValueUtils::isNonNullNonEmptyString)
+ .propertyGroup(InstancePropertyGroup.TABLE_STATE)
+ .build();
+ // Absolute free-heap target for the committer, validated as a number of bytes (e.g. the default "100M").
+ // This property is not flagged runCdkDeployWhenChanged.
+ UserDefinedInstanceProperty STATESTORE_COMMITTER_EC2_MIN_FREE_HEAP_TARGET_AMOUNT = Index.propertyBuilder("sleeper.statestore.committer.ec2.min.heap.target.amount")
+ .description("The minimum amount of heap space that the committer will try to keep available. This affects how many state stores can be cached in memory.")
+ .defaultValue("100M")
+ .validationPredicate(SleeperPropertyValueUtils::isValidNumberOfBytes)
+ .propertyGroup(InstancePropertyGroup.TABLE_STATE)
+ .build();
+    // Relative free-heap target for the committer, as a whole-number percentage of the total heap.
+    // This property is not flagged runCdkDeployWhenChanged.
+    UserDefinedInstanceProperty STATESTORE_COMMITTER_EC2_MIN_FREE_HEAP_TARGET_PERCENTAGE = Index.propertyBuilder("sleeper.statestore.committer.ec2.min.heap.target.percentage")
+            .description("The percentage of the total heap space that the committer should try to keep available, as a minimum. This affects how many state stores can be cached in memory.")
+            .defaultValue("1")
+            // More than 100% of the heap can never be free, so reject such values up front.
+            // Long.parseLong is only reached once isPositiveLong has accepted the value.
+            .validationPredicate(value -> SleeperPropertyValueUtils.isPositiveLong(value) && Long.parseLong(value) <= 100)
+            .propertyGroup(InstancePropertyGroup.TABLE_STATE)
+            .build();
}
diff --git a/java/core/src/main/java/sleeper/core/properties/model/StateStoreCommitterPlatform.java b/java/core/src/main/java/sleeper/core/properties/model/StateStoreCommitterPlatform.java
new file mode 100644
index 00000000000..185e0555e14
--- /dev/null
+++ b/java/core/src/main/java/sleeper/core/properties/model/StateStoreCommitterPlatform.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.core.properties.model;
+
+import org.apache.commons.lang3.EnumUtils;
+
+/**
+ * The execution environments that the multi-threaded state store committer can be deployed onto.
+ */
+public enum StateStoreCommitterPlatform {
+    LAMBDA,
+    EC2;
+
+    /**
+     * Tests whether a value names one of the supported platforms. The comparison ignores case.
+     *
+     * @param  value the value to test
+     * @return       true if the value matches a platform
+     */
+    public static boolean isValid(String value) {
+        Class<StateStoreCommitterPlatform> platformType = StateStoreCommitterPlatform.class;
+        return EnumUtils.isValidEnumIgnoreCase(platformType, value);
+    }
+}
diff --git a/java/core/src/main/java/sleeper/core/statestore/StateStoreProvider.java b/java/core/src/main/java/sleeper/core/statestore/StateStoreProvider.java
index 6dc9de5f829..9fe5cfa3494 100644
--- a/java/core/src/main/java/sleeper/core/statestore/StateStoreProvider.java
+++ b/java/core/src/main/java/sleeper/core/statestore/StateStoreProvider.java
@@ -65,6 +65,28 @@ public StateStore getStateStore(TableProperties tableProperties) {
return tableIdToStateStoreCache.get(tableId);
}
+    /**
+     * Evicts the cached state store for a Sleeper table, looked up by the table ID held in its properties.
+     *
+     * @param  tableProperties the properties of the Sleeper table
+     * @return                 true if a state store for the table was cached and has now been removed
+     */
+    public boolean removeStateStoreFromCache(TableProperties tableProperties) {
+        return removeStateStoreFromCache(tableProperties.get(TABLE_ID));
+    }
+
+    /**
+     * Remove a specific table's state store from the cache.
+     * <p>
+     * The cached state store and the table's tracked ID are both removed unconditionally, so the two stay in
+     * step even if only one of them held the table.
+     *
+     * @param  tableId Sleeper table ID
+     * @return         true if the state store for the requested Sleeper table was in the cache and has been removed
+     */
+    public boolean removeStateStoreFromCache(String tableId) {
+        boolean wasCached = this.tableIdToStateStoreCache.remove(tableId) != null;
+        // Always drop the tracked ID too. The result is based on the cache alone, so it no longer
+        // depends on whether the ID happened to be tracked as well.
+        this.tableIds.remove(tableId);
+        return wasCached;
+    }
+
/**
* Creates an instance of the state store client for a Sleeper table. Implemented by {@link StateStoreFactory}.
*/
diff --git a/java/distribution/src/main/assembly/zip.xml b/java/distribution/src/main/assembly/zip.xml
index 97a66bfbe48..a6e5475d41e 100644
--- a/java/distribution/src/main/assembly/zip.xml
+++ b/java/distribution/src/main/assembly/zip.xml
@@ -86,6 +86,13 @@
*.jar
+
+ ${project.basedir}/../statestore-committer/docker
+ scripts/docker/statestore-committer
+
+ *.jar
+
+
@@ -124,6 +131,13 @@
scripts/docker/bulk-export-task-execution
bulk-export-task-execution.jar
+
+
+ ${project.basedir}/../statestore-committer/target/statestore-committer-${project.version}-utility.jar
+
+ scripts/docker/statestore-committer
+ statestore-committer.jar
+
${project.basedir}/../compaction/compaction-job-creation-lambda/target/compaction-job-creation-lambda-${project.version}-utility.jar
diff --git a/java/pom.xml b/java/pom.xml
index de5eef9547d..c3e65c97ff3 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -31,6 +31,7 @@
clients
statestore
statestore-committer-core
+ statestore-committer
statestore-lambda
configuration
foreign-bridge
diff --git a/java/statestore-committer/docker/Dockerfile b/java/statestore-committer/docker/Dockerfile
new file mode 100644
index 00000000000..1ba4a2e6bbb
--- /dev/null
+++ b/java/statestore-committer/docker/Dockerfile
@@ -0,0 +1,20 @@
+# Copyright 2022-2025 Crown Copyright
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Image for the state store committer. Runs on the Amazon Corretto 17 (headless) base image.
+FROM amazoncorretto:17-al2023-headless
+
+# The application jar and the launcher script that starts the JVM with it
+COPY statestore-committer.jar /statestore-committer.jar
+COPY run.sh /run.sh
+RUN ["chmod", "+x", "/run.sh"]
+
+# Exec-form entrypoint: arguments given to the container as its command are passed on to run.sh
+ENTRYPOINT ["/run.sh"]
diff --git a/java/statestore-committer/docker/run.sh b/java/statestore-committer/docker/run.sh
new file mode 100644
index 00000000000..19aee2da9b2
--- /dev/null
+++ b/java/statestore-committer/docker/run.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+# Copyright 2022-2025 Crown Copyright
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Start the committer, sizing the JVM heap at 90% of the memory available to the container.
+# 'exec' replaces this shell with the JVM, so stop signals sent to the container reach Java directly.
+# "$@" passes each script argument through unchanged, without re-splitting on whitespace.
+exec java \
+    -Xlog:gc::time \
+    -XX:InitialRAMPercentage=90.0 \
+    -XX:MaxRAMPercentage=90.0 \
+    -cp /statestore-committer.jar \
+    sleeper.statestore.committer.MultiThreadedStateStoreCommitter \
+    "$@"
diff --git a/java/statestore-committer/pom.xml b/java/statestore-committer/pom.xml
new file mode 100644
index 00000000000..fd9fff21734
--- /dev/null
+++ b/java/statestore-committer/pom.xml
@@ -0,0 +1,64 @@
+
+
+
+
+ aws
+ sleeper
+ 0.35.0-SNAPSHOT
+
+ 4.0.0
+
+ statestore-committer
+
+
+
+ commons-io
+ commons-io
+ 2.21.0
+
+
+ software.amazon.awssdk
+ sqs
+
+
+
+ sleeper
+ configuration
+ ${project.parent.version}
+
+
+ sleeper
+ statestore
+ ${project.parent.version}
+
+
+ sleeper
+ statestore-committer-core
+ ${project.parent.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+
diff --git a/java/statestore-committer/src/main/java/sleeper/statestore/committer/MultiThreadedStateStoreCommitter.java b/java/statestore-committer/src/main/java/sleeper/statestore/committer/MultiThreadedStateStoreCommitter.java
new file mode 100644
index 00000000000..dcb9edd7421
--- /dev/null
+++ b/java/statestore-committer/src/main/java/sleeper/statestore/committer/MultiThreadedStateStoreCommitter.java
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.statestore.committer;
+
+import com.google.common.collect.Streams;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
+
+import sleeper.configuration.properties.S3InstanceProperties;
+import sleeper.configuration.properties.S3TableProperties;
+import sleeper.core.properties.instance.InstanceProperties;
+import sleeper.core.properties.table.TablePropertiesProvider;
+import sleeper.core.statestore.StateStoreProvider;
+import sleeper.core.statestore.commit.StateStoreCommitRequest;
+import sleeper.core.statestore.commit.StateStoreCommitRequestSerDe;
+import sleeper.core.statestore.transactionlog.transaction.TransactionSerDeProvider;
+import sleeper.core.util.LoggedDuration;
+import sleeper.core.util.PollWithRetries;
+import sleeper.dynamodb.tools.DynamoDBUtils;
+import sleeper.statestore.StateStoreFactory;
+import sleeper.statestore.committer.StateStoreCommitter.RequestHandle;
+import sleeper.statestore.transactionlog.S3TransactionBodyStore;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static sleeper.configuration.utils.AwsV2ClientHelper.buildAwsV2Client;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
+import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_EC2_MIN_FREE_HEAP_TARGET_AMOUNT;
+import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_COMMITTER_EC2_MIN_FREE_HEAP_TARGET_PERCENTAGE;
+import static sleeper.core.properties.instance.TableStateProperty.STATESTORE_PROVIDER_CACHE_SIZE;
+
+/**
+ * Applies asynchronous commits to state stores in a manner optimised for multi core execution environments.
+ * <p>
+ * Commit requests are received from the state store committer SQS queue and grouped by Sleeper table. Each
+ * table's group is applied on its own asynchronous task, and a new task for a table is only started once the
+ * previous task for that table has finished. Requests that were applied are deleted from the queue, and requests
+ * that failed have their visibility timeout reset so that they can be received again.
+ */
+public class MultiThreadedStateStoreCommitter {
+    public static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadedStateStoreCommitter.class);
+
+    private final S3Client s3Client;
+    private final DynamoDbClient dynamoClient;
+    private final SqsClient sqsClient;
+    private final String configBucketName;
+    private String qUrl;
+    private StateStoreCommitRequestSerDe serDe;
+    private S3TransactionBodyStore transactionBodyStore;
+    private StateStoreProvider stateStoreProvider;
+    private StateStoreCommitter committer;
+    private PollWithRetries throttlingRetriesConfig;
+    private long heapSpaceToKeepFree;
+    // Table IDs ordered from least recently to most recently submitted for processing
+    private final LinkedList<String> processedTableOrder = new LinkedList<>();
+    // The most recently submitted processing task for each table ID
+    private final Map<String, CompletableFuture<Instant>> tableFutures = new HashMap<>();
+
+    /**
+     * Creates a committer that loads its configuration immediately, reading the queue URL from the instance
+     * properties.
+     *
+     * @param s3Client         client used to load instance properties, table properties and transaction bodies
+     * @param dynamoClient     client used for table properties and state stores
+     * @param sqsClient        client used to receive, delete and re-queue commit requests
+     * @param configBucketName name of the S3 bucket that holds the instance properties
+     */
+    public MultiThreadedStateStoreCommitter(S3Client s3Client, DynamoDbClient dynamoClient, SqsClient sqsClient, String configBucketName) {
+        this(s3Client, dynamoClient, sqsClient, configBucketName, null);
+        this.init();
+    }
+
+    /**
+     * Creates a committer that does not load its configuration until {@link #run()} needs it.
+     *
+     * @param s3Client         client used to load instance properties, table properties and transaction bodies
+     * @param dynamoClient     client used for table properties and state stores
+     * @param sqsClient        client used to receive, delete and re-queue commit requests
+     * @param configBucketName name of the S3 bucket that holds the instance properties
+     * @param qUrl             URL of the SQS queue to read commit requests from, or null or empty to use the
+     *                         URL held in the instance properties
+     */
+    public MultiThreadedStateStoreCommitter(S3Client s3Client, DynamoDbClient dynamoClient, SqsClient sqsClient, String configBucketName, String qUrl) {
+        this.s3Client = s3Client;
+        this.dynamoClient = dynamoClient;
+        this.sqsClient = sqsClient;
+        this.configBucketName = configBucketName;
+        this.qUrl = qUrl;
+    }
+
+    /**
+     * Loads the instance properties from the config bucket and builds everything needed to apply commits.
+     * Also resolves the queue URL when one was not supplied, and works out how much heap space to keep free.
+     *
+     * @throws IllegalArgumentException if the configured amount of heap to keep free exceeds the maximum heap size
+     */
+    private void init() {
+        InstanceProperties instanceProperties = S3InstanceProperties.loadFromBucket(this.s3Client, this.configBucketName);
+        // Disable fixed size state store caching with FIFO expiry by setting the cache size to a large number!
+        // TODO: Should this property be updated to allow the cache to be disabled by setting to -1?
+        instanceProperties.set(STATESTORE_PROVIDER_CACHE_SIZE, "1000000");
+
+        // If the URL to the SQS queue hasn't already been provided, retrieve it from instance properties
+        if (this.qUrl == null || this.qUrl.isEmpty()) {
+            this.qUrl = instanceProperties.get(STATESTORE_COMMITTER_QUEUE_URL);
+        }
+
+        // The target is whichever is larger: the fixed amount, or the percentage of the maximum heap size
+        long maxHeapSize = Runtime.getRuntime().maxMemory();
+        long heapSpaceAmountToKeepFree = instanceProperties.getBytes(STATESTORE_COMMITTER_EC2_MIN_FREE_HEAP_TARGET_AMOUNT);
+        long heapSpacePercToKeepFree = (maxHeapSize / 100) * instanceProperties.getLong(STATESTORE_COMMITTER_EC2_MIN_FREE_HEAP_TARGET_PERCENTAGE);
+        this.heapSpaceToKeepFree = Math.max(heapSpaceAmountToKeepFree, heapSpacePercToKeepFree);
+
+        if (this.heapSpaceToKeepFree > maxHeapSize) {
+            throw new IllegalArgumentException("This state store committer has been configured to keep at least " +
+                    FileUtils.byteCountToDisplaySize(this.heapSpaceToKeepFree) +
+                    " of heap available, but the maximum allowed heap size is only " +
+                    FileUtils.byteCountToDisplaySize(maxHeapSize) +
+                    "!");
+        }
+        LOGGER.info("Will aim to keep {} of heap space available for use",
+                FileUtils.byteCountToDisplaySize(this.heapSpaceToKeepFree));
+
+        TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, this.s3Client, this.dynamoClient);
+        this.serDe = new StateStoreCommitRequestSerDe(tablePropertiesProvider);
+
+        StateStoreFactory stateStoreFactory = StateStoreFactory.forCommitterProcess(instanceProperties, this.s3Client, this.dynamoClient);
+        this.stateStoreProvider = new StateStoreProvider(instanceProperties, stateStoreFactory);
+        this.transactionBodyStore = new S3TransactionBodyStore(instanceProperties, this.s3Client, TransactionSerDeProvider.from(tablePropertiesProvider));
+        this.committer = new StateStoreCommitter(
+                tablePropertiesProvider,
+                this.stateStoreProvider,
+                this.transactionBodyStore);
+        this.throttlingRetriesConfig = PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(5), Duration.ofMinutes(10));
+    }
+
+    /**
+     * Apply asynchronous commits from the committer SQS queue.
+     * <p>
+     * Polls the queue until an exception is thrown. On failure it waits for every task that has been started to
+     * finish, and then rethrows the exception that stopped the loop.
+     *
+     * @throws IllegalArgumentException if no queue URL was supplied and none is held in the instance properties
+     * @throws Exception                the error that stopped polling, or the first task failure found on shutdown
+     */
+    public void run() throws Exception {
+        // The queue URL is needed before anything can be received. If it was not supplied then it can only
+        // come from the instance properties, so they must be loaded now rather than on the first message.
+        if (this.committer == null && (this.qUrl == null || this.qUrl.isEmpty())) {
+            this.init();
+        }
+        if (this.qUrl == null || this.qUrl.isEmpty()) {
+            throw new IllegalArgumentException("Missing URL to State Store Committer Queue!");
+        }
+
+        Exception err = null;
+        Instant startedAt = Instant.now();
+        Instant lastReceivedCommitsAt = Instant.now();
+
+        try {
+            while (true) {
+                ReceiveMessageResponse response = this.sqsClient.receiveMessage(ReceiveMessageRequest.builder()
+                        .queueUrl(this.qUrl)
+                        .maxNumberOfMessages(10)
+                        .waitTimeSeconds(20)
+                        // TODO: Need to deal with commits potentially taking longer than this visibility threshold
+                        .visibilityTimeout(15 * 60)
+                        .build());
+
+                if (response.hasMessages()) {
+                    lastReceivedCommitsAt = Instant.now();
+                    // Configuration is loaded lazily when the queue URL was supplied to the constructor
+                    if (this.committer == null) {
+                        this.init();
+                    }
+                }
+
+                LOGGER.info("Received {} messages from queue, have been running for {}, last received commits {} ago",
+                        response.messages().size(),
+                        LoggedDuration.withShortOutput(startedAt, Instant.now()),
+                        LoggedDuration.withShortOutput(lastReceivedCommitsAt, Instant.now()));
+
+                Map<String, List<StateStoreCommitRequestWithSqsReceipt>> messagesByTableId = response.messages().stream()
+                        .map(message -> {
+                            LOGGER.trace("Received message: {}", message);
+                            StateStoreCommitRequest request = this.serDe.fromJson(message.body());
+                            LOGGER.trace("Received request: {}", request);
+                            return new StateStoreCommitRequestWithSqsReceipt(request, message.receiptHandle());
+                        })
+                        .collect(Collectors.groupingBy(request -> request.getCommitRequest().getTableId()));
+
+                // Try to make sure there is going to be enough heap space available to process these commits
+                this.ensureEnoughHeapSpaceAvailable(messagesByTableId.keySet());
+
+                messagesByTableId.forEach((tableId, requestsWithHandle) -> {
+                    LOGGER.info("Received {} requests for table: {}", requestsWithHandle.size(), tableId);
+
+                    // Wait until processing of previous commits for this table has finished
+                    CompletableFuture<Instant> previousTask = this.tableFutures.get(tableId);
+                    if (previousTask != null) {
+                        try {
+                            previousTask.get();
+                        } catch (Exception e) {
+                            if (e instanceof InterruptedException) {
+                                // Keep the interrupt visible to whatever handles the shutdown
+                                Thread.currentThread().interrupt();
+                            }
+                            throw new IllegalStateException("Exception was thrown during the processing of the previous batch of commits for table " + tableId + ":", e);
+                        }
+                    }
+
+                    // Apply the commits for each table on a separate thread
+                    CompletableFuture<Instant> task = CompletableFuture.supplyAsync(() -> {
+                        this.processMessagesForTable(tableId, requestsWithHandle);
+                        return Instant.now();
+                    });
+
+                    this.tableFutures.put(tableId, task);
+                    // Move the table to the end of the list, marking it as the most recently used
+                    this.processedTableOrder.remove(tableId);
+                    this.processedTableOrder.add(tableId);
+                });
+            }
+        } catch (Exception e) {
+            LOGGER.error("Caught exception, starting graceful shutdown:", e);
+            err = e;
+        } finally {
+            long pendingTaskCount = this.tableFutures.values().stream().filter(task -> !task.isDone()).count();
+            if (pendingTaskCount > 0) {
+                LOGGER.info("Requests for {} tables are still being processed, waiting for them to finish...", pendingTaskCount);
+            }
+            // join() rethrows the failure of a task that completed exceptionally. Each task is awaited on its
+            // own so that one failure neither stops the wait for the others nor hides the original error.
+            for (Map.Entry<String, CompletableFuture<Instant>> tableTask : this.tableFutures.entrySet()) {
+                try {
+                    tableTask.getValue().join();
+                } catch (RuntimeException e) {
+                    LOGGER.error("Processing of requests for table {} did not complete successfully:", tableTask.getKey(), e);
+                    if (err == null) {
+                        err = e;
+                    }
+                }
+            }
+            LOGGER.info("All pending requests have been actioned");
+            if (err != null) {
+                throw err;
+            }
+        }
+    }
+
+    /**
+     * Works out how much more heap space could be used before the maximum heap size is reached.
+     *
+     * @return the unused part of the current heap plus the amount the heap is still allowed to grow by, in bytes
+     */
+    private static long availableHeapSpace() {
+        Runtime runtime = Runtime.getRuntime();
+        return runtime.freeMemory() + runtime.maxMemory() - runtime.totalMemory();
+    }
+
+    /**
+     * Removes idle state stores from the cache, least recently used first, until the target amount of heap space
+     * is available. When no state store can be removed it waits and tries again, unless the thread is interrupted.
+     *
+     * @param requiredTableIds IDs of the tables that are about to be processed, which must stay in the cache
+     */
+    private void ensureEnoughHeapSpaceAvailable(Set<String> requiredTableIds) {
+        Runtime runtime = Runtime.getRuntime();
+        LOGGER.debug("UsedMem: {} FreeMem: {} HeapSize: {} MaxMem: {}",
+                FileUtils.byteCountToDisplaySize(runtime.totalMemory() - runtime.freeMemory()),
+                FileUtils.byteCountToDisplaySize(runtime.freeMemory()),
+                FileUtils.byteCountToDisplaySize(runtime.totalMemory()),
+                FileUtils.byteCountToDisplaySize(runtime.maxMemory()));
+
+        long availableMemory = availableHeapSpace();
+
+        while (availableMemory < this.heapSpaceToKeepFree) {
+            LOGGER.info("Removing old state stores from cache as limited memory available: {}", FileUtils.byteCountToDisplaySize(availableMemory));
+
+            // Find a state store that we can remove from the in-mem cache that we haven't used for a while
+            Optional<String> tableIdToUncache = this.processedTableOrder.stream()
+                    // Don't remove a state store from the cache that we are about to use!
+                    .filter(tableId -> !requiredTableIds.contains(tableId))
+                    // Only remove a state store that we have finished using
+                    .filter(tableId -> this.tableFutures.get(tableId).isDone())
+                    .findFirst();
+
+            if (tableIdToUncache.isPresent()) {
+                LOGGER.info("Removing state store for table {} from cache", tableIdToUncache.get());
+                this.stateStoreProvider.removeStateStoreFromCache(tableIdToUncache.get());
+                this.processedTableOrder.remove(tableIdToUncache.get());
+            } else {
+                LOGGER.error("Couldn't find any candidate state stores to remove from memory. All must currently be in use, will wait and try again...");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Restore the flag and stop waiting. Carrying on would make every later sleep fail at once.
+                    Thread.currentThread().interrupt();
+                    LOGGER.warn("Interrupted while waiting for state stores to become idle, no longer trying to free heap space");
+                    return;
+                }
+            }
+
+            System.gc();
+
+            availableMemory = availableHeapSpace();
+            LOGGER.info("Memory now available: {}", FileUtils.byteCountToDisplaySize(availableMemory));
+        }
+    }
+
+    /**
+     * Applies a batch of commit requests for one table, then updates the SQS queue to match the outcome.
+     *
+     * @param  tableId            ID of the table that all of the requests are for
+     * @param  requestsWithHandle the commit requests, each paired with the receipt handle of its SQS message
+     * @return                    always true
+     */
+    private boolean processMessagesForTable(String tableId, List<StateStoreCommitRequestWithSqsReceipt> requestsWithHandle) {
+        Instant startedAt = Instant.now();
+        LOGGER.info("Processing {} requests for table: {} ...", requestsWithHandle.size(), tableId);
+
+        this.committer.applyBatch(
+                operation -> DynamoDBUtils.retryOnThrottlingException(this.throttlingRetriesConfig, operation),
+                requestsWithHandle.stream().map(StateStoreCommitRequestWithSqsReceipt::getHandle).collect(Collectors.toList()));
+
+        // The failure callback of each request has run by now, so the requests can be split by outcome
+        Map<Boolean, List<StateStoreCommitRequestWithSqsReceipt>> requestResults = requestsWithHandle.stream()
+                .collect(Collectors.partitioningBy(StateStoreCommitRequestWithSqsReceipt::failed));
+        this.deleteAppliedRequests(tableId, requestResults.get(false));
+        this.makeFailedRequestsVisible(tableId, requestResults.get(true));
+
+        LOGGER.info("Finished processing {} messages for table {} in {} ...",
+                requestsWithHandle.size(),
+                tableId,
+                LoggedDuration.withShortOutput(startedAt, Instant.now()));
+        return true;
+    }
+
+    /**
+     * Deletes the SQS messages of requests that were applied successfully. Does nothing for an empty list.
+     *
+     * @param tableId            ID of the table that the requests are for
+     * @param successfulRequests the requests that were applied
+     */
+    private void deleteAppliedRequests(String tableId, List<StateStoreCommitRequestWithSqsReceipt> successfulRequests) {
+        if (successfulRequests.isEmpty()) {
+            return;
+        }
+        LOGGER.debug("Deleting {} requests for table {} as they have been successfully applied", successfulRequests.size(), tableId);
+        List<DeleteMessageBatchRequestEntry> entries = Streams.mapWithIndex(successfulRequests.stream(), (request, index) ->
+                DeleteMessageBatchRequestEntry.builder()
+                        .id(request.getCommitRequest().getTableId() + "-" + index)
+                        .receiptHandle(request.getSqsReceipt())
+                        .build())
+                .collect(Collectors.toList());
+        DeleteMessageBatchResponse deleteResponse = this.sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder()
+                .queueUrl(this.qUrl)
+                .entries(entries)
+                .build());
+
+        if (!deleteResponse.failed().isEmpty()) {
+            LOGGER.error("Failed to delete {} requests for table {} from SQS queue! Successfully deleted {} requests: {}",
+                    deleteResponse.failed().size(),
+                    tableId,
+                    deleteResponse.successful().size(),
+                    deleteResponse.failed());
+        } else {
+            LOGGER.debug("Successfully deleted {} requests for table {} from SQS queue", deleteResponse.successful().size(), tableId);
+        }
+    }
+
+    /**
+     * Sets the visibility timeout of failed requests to zero so that they can be received again straight away.
+     * Does nothing for an empty list.
+     *
+     * @param tableId        ID of the table that the requests are for
+     * @param failedRequests the requests that could not be applied
+     */
+    private void makeFailedRequestsVisible(String tableId, List<StateStoreCommitRequestWithSqsReceipt> failedRequests) {
+        if (failedRequests.isEmpty()) {
+            return;
+        }
+        LOGGER.debug("Resetting the message visibility of {} requests for table {} so they are immediately available for reprocessing", failedRequests.size(), tableId);
+
+        // TODO: Better retry handling
+        try {
+            Thread.sleep(250);
+        } catch (InterruptedException e) {
+            // The pause is best effort, so carry on, but leave the interrupt for the owner of the thread to see
+            Thread.currentThread().interrupt();
+        }
+
+        List<ChangeMessageVisibilityBatchRequestEntry> entries = Streams.mapWithIndex(failedRequests.stream(), (request, index) ->
+                ChangeMessageVisibilityBatchRequestEntry.builder()
+                        .id(request.getCommitRequest().getTableId() + "-" + index)
+                        .receiptHandle(request.getSqsReceipt())
+                        .visibilityTimeout(0)
+                        .build())
+                .collect(Collectors.toList());
+        ChangeMessageVisibilityBatchResponse changeVisibilityResponse = this.sqsClient.changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest.builder()
+                .queueUrl(this.qUrl)
+                .entries(entries)
+                .build());
+
+        if (!changeVisibilityResponse.failed().isEmpty()) {
+            LOGGER.error("Failed to change visibility of {} requests for table {} in SQS queue! (successfully change visibility of {} requests):\n{}",
+                    changeVisibilityResponse.failed().size(),
+                    tableId,
+                    changeVisibilityResponse.successful().size(),
+                    changeVisibilityResponse.failed().stream()
+                            .map(Object::toString)
+                            .collect(Collectors.joining("\n")));
+        } else {
+            LOGGER.debug("Successfully changed visibility of {} requests for table {} in SQS queue", changeVisibilityResponse.successful().size(), tableId);
+        }
+    }
+
+    /**
+     * Pairs a commit request with the receipt handle of the SQS message it arrived in, and records whether
+     * applying it failed. The receipt handle is what is later used to delete the message or to reset its
+     * visibility, and the commit request on its own does not carry it.
+     */
+    private static class StateStoreCommitRequestWithSqsReceipt {
+
+        private final StateStoreCommitRequest commitRequest;
+        private final String sqsReceipt;
+        private boolean failed = false;
+
+        private StateStoreCommitRequestWithSqsReceipt(StateStoreCommitRequest request, String sqsReceipt) {
+            this.commitRequest = request;
+            this.sqsReceipt = sqsReceipt;
+        }
+
+        private StateStoreCommitRequest getCommitRequest() {
+            return this.commitRequest;
+        }
+
+        /**
+         * Wraps the commit request in a handle whose failure callback logs the error and marks this request
+         * as failed.
+         *
+         * @return the handle to pass to the committer
+         */
+        private RequestHandle getHandle() {
+            return RequestHandle.withCallbackOnFail(this.commitRequest, e -> {
+                LOGGER.error("Error whilst processing state store commit request for table: {}", this.commitRequest.getTableId(), e);
+                this.failed = true;
+            });
+        }
+
+        private String getSqsReceipt() {
+            return this.sqsReceipt;
+        }
+
+        private boolean failed() {
+            return this.failed;
+        }
+
+    }
+
+    /**
+     * Runs the committer from the command line until it fails.
+     *
+     * @param  args      the instance ID, optionally followed by the URL of the SQS queue to read from
+     * @throws Exception if the committer stops because of an error
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1 || args.length > 2) {
+            throw new IllegalArgumentException("Syntax: " + MultiThreadedStateStoreCommitter.class.getSimpleName() + " <instance-id> [<committer-queue-url>]");
+        }
+
+        String instanceId = args[0];
+        // When no queue URL is given, run() looks it up in the instance properties
+        String qUrl = args.length > 1 ? args[1] : null;
+
+        try (
+                S3Client s3Client = buildAwsV2Client(S3Client.builder());
+                DynamoDbClient dynamoClient = buildAwsV2Client(DynamoDbClient.builder());
+                SqsClient sqsClient = buildAwsV2Client(SqsClient.builder());
+        ) {
+            String configBucketName = InstanceProperties.getConfigBucketFromInstanceId(instanceId);
+            MultiThreadedStateStoreCommitter committer = new MultiThreadedStateStoreCommitter(s3Client, dynamoClient, sqsClient, configBucketName, qUrl);
+            committer.run();
+        }
+    }
+
+}
diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/RunWriteRandomDataTaskOnECSForAllOnlineTables.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/RunWriteRandomDataTaskOnECSForAllOnlineTables.java
new file mode 100644
index 00000000000..c58ad00f272
--- /dev/null
+++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/RunWriteRandomDataTaskOnECSForAllOnlineTables.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.systemtest.drivers.ingest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.ecs.EcsClient;
+import software.amazon.awssdk.services.ecs.model.RunTaskResponse;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import sleeper.configuration.properties.S3TableProperties;
+import sleeper.core.properties.table.TablePropertiesProvider;
+import sleeper.core.properties.table.TableProperty;
+import sleeper.systemtest.configuration.SystemTestDataGenerationJob;
+import sleeper.systemtest.configuration.SystemTestProperties;
+import sleeper.systemtest.drivers.ingest.json.TasksJson;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * Starts tasks that write random data to every online table in a Sleeper instance, by running
+ * {@link RunWriteRandomDataTaskOnECS} once for each table.
+ */
+public class RunWriteRandomDataTaskOnECSForAllOnlineTables {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RunWriteRandomDataTaskOnECSForAllOnlineTables.class);
+
+    private final S3Client s3Client;
+    private final DynamoDbClient dynamoClient;
+    private final EcsClient ecsClient;
+
+    public RunWriteRandomDataTaskOnECSForAllOnlineTables(S3Client s3Client, DynamoDbClient dynamoClient, EcsClient ecsClient) {
+        this.s3Client = s3Client;
+        this.dynamoClient = dynamoClient;
+        this.ecsClient = ecsClient;
+    }
+
+    /**
+     * Submits a data generation job for each online table in the instance.
+     *
+     * @param  instanceId          ID of the Sleeper instance whose system test properties and tables are used
+     * @param  numWritersPerTable  number of tasks to run for each table
+     * @param  numIngestsPerWriter number of ingests set on each data generation job
+     * @param  recordsPerIngest    number of rows per ingest set on each data generation job
+     * @return                     the responses from running the tasks, for all tables combined
+     */
+    public List<RunTaskResponse> run(String instanceId, int numWritersPerTable, int numIngestsPerWriter, long recordsPerIngest) {
+        SystemTestProperties systemTestProperties = SystemTestProperties.loadFromS3GivenInstanceId(this.s3Client, instanceId);
+        TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(systemTestProperties, this.s3Client, this.dynamoClient);
+        RunWriteRandomDataTaskOnECS runner = new RunWriteRandomDataTaskOnECS(systemTestProperties, this.ecsClient, this.s3Client);
+
+        return tablePropertiesProvider.streamOnlineTables()
+                .flatMap(tableProperties -> {
+                    String tableName = tableProperties.get(TableProperty.TABLE_NAME);
+                    LOGGER.info("Submitting an ingest job with {} writers for table {}:{}", numWritersPerTable, instanceId, tableName);
+                    SystemTestDataGenerationJob job = SystemTestDataGenerationJob.builder()
+                            .instanceProperties(systemTestProperties)
+                            .tableName(tableName)
+                            .numberOfIngests(numIngestsPerWriter)
+                            .rowsPerIngest(recordsPerIngest)
+                            .build();
+                    List<RunTaskResponse> response = runner.runTasks(numWritersPerTable, job);
+                    return response.stream();
+                })
+                .toList();
+    }
+
+    /**
+     * Runs the tasks from the command line, optionally writing the responses to a file.
+     *
+     * @param  args        the instance ID, then optionally the number of writers per table (default 1), the
+     *                     number of ingests per writer (default 1), the number of records per ingest (default
+     *                     10,000,000), and the path of a file to write the task responses to
+     * @throws IOException declared for a failure to write the output file
+     */
+    public static void main(String[] args) throws IOException {
+        if (args.length < 1 || args.length > 5) {
+            System.out.println("Usage: <instance-id> [<writers-per-table>] [<ingests-per-writer>] [<records-per-ingest>] [<output-file>]");
+            // Exit with a failure status so that calling scripts can detect the misuse
+            System.exit(1);
+        }
+
+        String instanceId = args[0];
+        int numWritersPerTable = args.length > 1 ? Integer.parseInt(args[1]) : 1;
+        int numIngestsPerWriter = args.length > 2 ? Integer.parseInt(args[2]) : 1;
+        long recordsPerIngest = args.length > 3 ? Long.parseLong(args[3]) : 10_000_000;
+
+        try (
+                S3Client s3Client = S3Client.create();
+                DynamoDbClient dynamoClient = DynamoDbClient.create();
+                EcsClient ecsClient = EcsClient.create();
+        ) {
+            RunWriteRandomDataTaskOnECSForAllOnlineTables runner = new RunWriteRandomDataTaskOnECSForAllOnlineTables(s3Client, dynamoClient, ecsClient);
+            List<RunTaskResponse> responses = runner.run(instanceId, numWritersPerTable, numIngestsPerWriter, recordsPerIngest);
+            if (args.length > 4) {
+                TasksJson.writeToFile(responses, Paths.get(args[4]));
+            }
+        }
+    }
+}
diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/RunWriteRandomDataTaskOnECSForMultipleNewTables.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/RunWriteRandomDataTaskOnECSForMultipleNewTables.java
new file mode 100644
index 00000000000..bbf335c76dc
--- /dev/null
+++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/RunWriteRandomDataTaskOnECSForMultipleNewTables.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sleeper.systemtest.drivers.ingest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.ecs.EcsClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import sleeper.clients.table.AddTable;
+import sleeper.clients.table.TakeAllTablesOffline;
+import sleeper.configuration.properties.S3InstanceProperties;
+import sleeper.configuration.properties.S3TableProperties;
+import sleeper.core.properties.PropertiesUtils;
+import sleeper.core.properties.instance.CommonProperty;
+import sleeper.core.properties.instance.InstanceProperties;
+import sleeper.core.properties.table.TableProperties;
+import sleeper.core.properties.table.TablePropertiesStore;
+import sleeper.core.properties.table.TableProperty;
+import sleeper.core.schema.Schema;
+import sleeper.core.statestore.StateStoreProvider;
+import sleeper.statestore.StateStoreFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static sleeper.configuration.utils.AwsV2ClientHelper.buildAwsV2Client;
+
+/**
+ * Takes every existing table in a Sleeper instance offline, creates a number of new tables, and then starts
+ * tasks that write random data to the tables that are online.
+ */
+public class RunWriteRandomDataTaskOnECSForMultipleNewTables {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RunWriteRandomDataTaskOnECSForMultipleNewTables.class);
+
+    private final S3Client s3Client;
+    private final DynamoDbClient dynamoClient;
+    private final EcsClient ecsClient;
+
+    public RunWriteRandomDataTaskOnECSForMultipleNewTables(S3Client s3Client, DynamoDbClient dynamoClient, EcsClient ecsClient) {
+        this.s3Client = s3Client;
+        this.dynamoClient = dynamoClient;
+        this.ecsClient = ecsClient;
+    }
+
+    /**
+     * Takes all tables in the instance offline.
+     *
+     * @param instanceId ID of the Sleeper instance
+     */
+    public void takeAllTablesOffline(String instanceId) {
+        TakeAllTablesOffline offliner = new TakeAllTablesOffline(this.s3Client, this.dynamoClient);
+        offliner.takeAllOffline(instanceId);
+    }
+
+    /**
+     * Adds new tables to the instance. The tables are named "table-", then the current date and time to the
+     * minute, then "-" and a number counting up from 1.
+     *
+     * @param  instanceId          ID of the Sleeper instance
+     * @param  tableCount          number of tables to create
+     * @param  tablePropertiesFile properties file to load for each table
+     * @param  schemaFile          file to load the schema of each table from
+     * @param  splitPointsFile     value to set as the split points file of each table
+     * @return                     the names of the tables that were created, in creation order
+     * @throws RuntimeException    if any table could not be created
+     * @throws IOException         declared by the signature
+     */
+    public List<String> createTables(String instanceId, int tableCount, Path tablePropertiesFile, Path schemaFile, String splitPointsFile) throws IOException {
+        InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(this.s3Client, instanceId);
+        String tablePrefix = "table-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyMMdd-HHmm")) + '-';
+        // These depend only on the instance, so they are built once and shared by every table
+        TablePropertiesStore tablePropertiesStore = S3TableProperties.createStore(instanceProperties, this.s3Client, this.dynamoClient);
+        StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, this.s3Client, this.dynamoClient);
+
+        return IntStream.rangeClosed(1, tableCount).mapToObj(i -> {
+            String tableName = tablePrefix + i;
+
+            try {
+                // Each table gets its own properties object, loaded fresh from the file
+                TableProperties tableProperties = new TableProperties(instanceProperties, PropertiesUtils.loadProperties(tablePropertiesFile));
+                tableProperties.set(TableProperty.TABLE_NAME, tableName);
+                tableProperties.setSchema(Schema.load(schemaFile));
+                tableProperties.set(TableProperty.SPLIT_POINTS_FILE, splitPointsFile);
+
+                new AddTable(instanceProperties, tableProperties, tablePropertiesStore, stateStoreProvider).run();
+                LOGGER.info("Added table {}:{}", instanceProperties.get(CommonProperty.ID), tableName);
+            } catch (Exception e) {
+                throw new RuntimeException("Unable to create table " + instanceProperties.get(CommonProperty.ID) + ":" + tableName, e);
+            }
+
+            return tableName;
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * Takes all existing tables offline, creates the new tables, and submits ingest tasks for the online tables.
+     *
+     * @param  instanceId          ID of the Sleeper instance
+     * @param  tableCount          number of tables to create
+     * @param  tablePropertiesFile properties file to load for each new table
+     * @param  schemaFile          file to load the schema of each new table from
+     * @param  splitPointsFile     value to set as the split points file of each new table
+     * @param  numWritersPerTable  number of tasks to run for each table
+     * @param  numIngestsPerWriter number of ingests for each task
+     * @param  recordsPerIngest    number of records for each ingest
+     * @throws IOException         declared by {@link #createTables}
+     */
+    public void run(
+            String instanceId, int tableCount,
+            Path tablePropertiesFile, Path schemaFile, String splitPointsFile,
+            int numWritersPerTable, int numIngestsPerWriter, long recordsPerIngest) throws IOException {
+
+        LOGGER.info("Taking tables offline");
+        this.takeAllTablesOffline(instanceId);
+
+        LOGGER.info("Creating {} tables", tableCount);
+        this.createTables(instanceId, tableCount, tablePropertiesFile, schemaFile, splitPointsFile);
+
+        LOGGER.info("Submitting ingest tasks");
+        new RunWriteRandomDataTaskOnECSForAllOnlineTables(this.s3Client, this.dynamoClient, this.ecsClient)
+                .run(instanceId, numWritersPerTable, numIngestsPerWriter, recordsPerIngest);
+    }
+
+    /**
+     * Runs the whole process from the command line.
+     *
+     * @param  args        the instance ID, table count, table properties file, schema file and split points
+     *                     file, then optionally the number of writers per table (default 1), the number of
+     *                     ingests per writer (default 1) and the number of records per ingest (default 10,000,000)
+     * @throws IOException declared by {@link #run}
+     */
+    public static void main(String[] args) throws IOException {
+        if (args.length < 5) {
+            System.out.println("Usage: <instance-id> <table-count> <table-properties-file> <schema-file> <split-points-file> " +
+                    "[<writers-per-table>] [<ingests-per-writer>] [<records-per-ingest>]");
+            System.exit(1);
+        }
+
+        String instanceId = args[0];
+        int tableCount = Integer.parseInt(args[1]);
+        Path tablePropertiesFile = Path.of(args[2]);
+        Path schemaFile = Path.of(args[3]);
+        String splitPointsFile = args[4];
+
+        int numWritersPerTable = args.length > 5 ? Integer.parseInt(args[5]) : 1;
+        int numIngestsPerWriter = args.length > 6 ? Integer.parseInt(args[6]) : 1;
+        long recordsPerIngest = args.length > 7 ? Long.parseLong(args[7]) : 10_000_000;
+
+        try (
+                S3Client s3Client = buildAwsV2Client(S3Client.builder());
+                DynamoDbClient dynamoClient = buildAwsV2Client(DynamoDbClient.builder());
+                EcsClient ecsClient = buildAwsV2Client(EcsClient.builder());
+        ) {
+            new RunWriteRandomDataTaskOnECSForMultipleNewTables(s3Client, dynamoClient, ecsClient)
+                    .run(
+                            instanceId, tableCount,
+                            tablePropertiesFile, schemaFile, splitPointsFile,
+                            numWritersPerTable, numIngestsPerWriter, recordsPerIngest);
+        }
+    }
+
+}
diff --git a/scripts/templates/instanceproperties.template b/scripts/templates/instanceproperties.template
index 81088d90713..4f22da7c82e 100644
--- a/scripts/templates/instanceproperties.template
+++ b/scripts/templates/instanceproperties.template
@@ -271,6 +271,10 @@ sleeper.tables.index.dynamo.pointintimerecovery=false
# consistent.
sleeper.tables.index.dynamo.consistent.reads=true
+# The platform that the State Store Committer will be deployed to for execution.
+# Valid values are: [lambda, ec2]
+sleeper.statestore.committer.platform=LAMBDA
+
# The amount of memory in MB for the lambda that commits state store updates.
sleeper.statestore.committer.lambda.memory.mb=4096
@@ -294,6 +298,17 @@ sleeper.statestore.committer.concurrency.reserved=10
# https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/
sleeper.statestore.committer.concurrency.max=10
+# The EC2 instance type that the multi-threaded state store committer should be deployed onto.
+sleeper.statestore.committer.ec2.type=m8g.xlarge
+
+# The minimum amount of heap space that the committer will try to keep available. This affects how
+# many state stores can be cached in memory.
+sleeper.statestore.committer.ec2.min.heap.target.amount=100M
+
+# The percentage of the total heap space that the committer should try to keep available, as a
+# minimum. This affects how many state stores can be cached in memory.
+sleeper.statestore.committer.ec2.min.heap.target.percentage=1
+
## The following instance properties relate to standard ingest.