diff --git a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml b/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml index 60f2f8adac..9ea959f78a 100644 --- a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml +++ b/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml @@ -21,7 +21,7 @@ event_collect_timeout: "120s" maximum_size: "2mb" aggregate_threshold: - maximum_size: "256kb" + maximum_size: "128mb" flush_capacity_ratio: 0 object_key: path_prefix: "${getMetadata(\"s3_partition_key\")}" @@ -63,7 +63,11 @@ sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" acknowledgments: true delete_s3_objects_on_read: true + disable_s3_metadata_in_event: true scan: + folder_partitions: + depth: [ "<>.source.documentdb.s3_prefix>>" ] + max_objects_per_ownership: 50 buckets: - bucket: name: "<<$.<>.source.documentdb.s3_bucket>>" diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java index 3ce75f2d14..71bf92efe1 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java @@ -67,7 +67,7 @@ void setUp() { .build(); bucket = System.getProperty("tests.s3source.bucket"); s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket); - eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY); + eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, false); buffer = mock(Buffer.class); recordsReceived = 0; diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java index bbff028167..cebd91daea 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java @@ -131,7 +131,7 @@ void setUp() { .build(); bucket = System.getProperty("tests.s3source.bucket"); s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket); - eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY); + eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, s3SourceConfig.isDeleteS3MetadataInEvent()); buffer = mock(Buffer.class); recordsReceived = 0; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifier.java index 6bd37ba436..8e384a72c8 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifier.java @@ -14,15 +14,19 @@ class EventMetadataModifier implements BiConsumer { private static final String BUCKET_FIELD_NAME = "bucket"; private static final String KEY_FIELD_NAME = "key"; private final String baseKey; + private final boolean deleteS3MetadataInEvent; - EventMetadataModifier(final String metadataRootKey) { + EventMetadataModifier(final String metadataRootKey, boolean deleteS3MetadataInEvent) { baseKey = generateBaseKey(metadataRootKey); + this.deleteS3MetadataInEvent = deleteS3MetadataInEvent; } @Override public void accept(final Event event, final S3ObjectReference s3ObjectReference) { - event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName()); - event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey()); + if(!deleteS3MetadataInEvent) { + event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName()); + event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey()); + } } private static String generateBaseKey(String metadataRootKey) { diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3Source.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3Source.java index c0b844123e..1d272c36f6 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3Source.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3Source.java @@ -86,7 +86,7 @@ public void start(Buffer> buffer) { final S3ObjectRequest.Builder s3ObjectRequestBuilder = new S3ObjectRequest.Builder(buffer, s3SourceConfig.getNumberOfRecordsToAccumulate(), s3SourceConfig.getBufferTimeout(), s3ObjectPluginMetrics); final BiConsumer eventMetadataModifier = new EventMetadataModifier( - s3SourceConfig.getMetadataRootKey()); + s3SourceConfig.getMetadataRootKey(), s3SourceConfig.isDeleteS3MetadataInEvent()); final S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3ClientBuilderFactory.getS3Client(), pluginMetrics); if (s3SelectOptional.isPresent()) { diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java index 1bb62865b7..0e8d7108b3 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java @@ -97,6 +97,9 @@ public class S3SourceConfig { @JsonProperty("delete_s3_objects_on_read") private boolean deleteS3ObjectsOnRead = false; + @JsonProperty("disable_s3_metadata_in_event") + private boolean deleteS3MetadataInEvent = false; + @AssertTrue(message = "A codec is required for reading objects.") boolean isCodecProvidedWhenNeeded() { if(s3SelectOptions == null) @@ -189,4 +192,8 @@ public Map getBucketOwners() { public String getDefaultBucketOwner() { return defaultBucketOwner; } + + public boolean isDeleteS3MetadataInEvent() { + return deleteS3MetadataInEvent; + } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifierTest.java index 4ca46bb923..1eab96160f 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/EventMetadataModifierTest.java @@ -13,7 +13,9 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import static org.mockito.ArgumentMatchers.anyString; import org.mockito.Mock; +import static org.mockito.Mockito.never; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.event.Event; @@ -41,13 +43,13 @@ void setUp() { key = UUID.randomUUID().toString(); } - private EventMetadataModifier createObjectUnderTest(final String metadataRootKey) { - return new EventMetadataModifier(metadataRootKey); + private EventMetadataModifier createObjectUnderTest(final String metadataRootKey, final Boolean isDeleteS3MetadataInEvent) { + return new EventMetadataModifier(metadataRootKey, isDeleteS3MetadataInEvent); } @Test void constructor_throws_if_metadataRootKey_is_null() { - assertThrows(NullPointerException.class, () -> createObjectUnderTest(null)); + assertThrows(NullPointerException.class, () -> createObjectUnderTest(null, null)); } @ParameterizedTest @@ -56,12 +58,21 @@ void accept_sets_correct_S3_bucket_and_key(final String metadataKey, final Strin when(s3ObjectReference.getBucketName()).thenReturn(bucketName); when(s3ObjectReference.getKey()).thenReturn(key); - createObjectUnderTest(metadataKey).accept(event, s3ObjectReference); + createObjectUnderTest(metadataKey, false).accept(event, s3ObjectReference); verify(event).put(expectedRootKey + "bucket", bucketName); verify(event).put(expectedRootKey + "key", key); } + @ParameterizedTest + @ArgumentsSource(KeysArgumentsProvider.class) + void accept_does_not_set_correct_S3_bucket_and_key(final String metadataKey, final String expectedRootKey) { + createObjectUnderTest(metadataKey, true).accept(event, s3ObjectReference); + + verify(event, never()).put(anyString(), anyString()); + verify(event, never()).put(anyString(), anyString()); + } + static class KeysArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(final ExtensionContext context) {