Skip to content

Commit

Permalink
Add new S3 source option and modify docdb template (#4492)
Browse files Browse the repository at this point in the history
Signed-off-by: srigovs <srigovs@amazon.com>
  • Loading branch information
srikanthjg authored May 6, 2024
1 parent 44d689f commit f6a06a0
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
event_collect_timeout: "120s"
maximum_size: "2mb"
aggregate_threshold:
maximum_size: "256kb"
maximum_size: "128mb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
Expand Down Expand Up @@ -63,7 +63,11 @@
sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_header_overrides>>"
acknowledgments: true
delete_s3_objects_on_read: true
disable_s3_metadata_in_event: true
scan:
folder_partitions:
depth: [ "<<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>" ]
max_objects_per_ownership: 50
buckets:
- bucket:
name: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, false);

buffer = mock(Buffer.class);
recordsReceived = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, s3SourceConfig.isDeleteS3MetadataInEvent());

buffer = mock(Buffer.class);
recordsReceived = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ class EventMetadataModifier implements BiConsumer<Event, S3ObjectReference> {
private static final String BUCKET_FIELD_NAME = "bucket";
private static final String KEY_FIELD_NAME = "key";
private final String baseKey;
private final boolean deleteS3MetadataInEvent;

EventMetadataModifier(final String metadataRootKey) {
EventMetadataModifier(final String metadataRootKey, boolean deleteS3MetadataInEvent) {
baseKey = generateBaseKey(metadataRootKey);
this.deleteS3MetadataInEvent = deleteS3MetadataInEvent;
}

@Override
public void accept(final Event event, final S3ObjectReference s3ObjectReference) {
event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey());
if(!deleteS3MetadataInEvent) {
event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey());
}
}

private static String generateBaseKey(String metadataRootKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void start(Buffer<Record<Event>> buffer) {
final S3ObjectRequest.Builder s3ObjectRequestBuilder = new S3ObjectRequest.Builder(buffer, s3SourceConfig.getNumberOfRecordsToAccumulate(),
s3SourceConfig.getBufferTimeout(), s3ObjectPluginMetrics);
final BiConsumer<Event, S3ObjectReference> eventMetadataModifier = new EventMetadataModifier(
s3SourceConfig.getMetadataRootKey());
s3SourceConfig.getMetadataRootKey(), s3SourceConfig.isDeleteS3MetadataInEvent());
final S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3ClientBuilderFactory.getS3Client(), pluginMetrics);

if (s3SelectOptional.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public class S3SourceConfig {
@JsonProperty("delete_s3_objects_on_read")
private boolean deleteS3ObjectsOnRead = false;

@JsonProperty("disable_s3_metadata_in_event")
private boolean deleteS3MetadataInEvent = false;

@AssertTrue(message = "A codec is required for reading objects.")
boolean isCodecProvidedWhenNeeded() {
if(s3SelectOptions == null)
Expand Down Expand Up @@ -189,4 +192,8 @@ public Map<String, String> getBucketOwners() {
public String getDefaultBucketOwner() {
return defaultBucketOwner;
}

public boolean isDeleteS3MetadataInEvent() {
return deleteS3MetadataInEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.mockito.ArgumentMatchers.anyString;
import org.mockito.Mock;
import static org.mockito.Mockito.never;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;

Expand Down Expand Up @@ -41,13 +43,13 @@ void setUp() {
key = UUID.randomUUID().toString();
}

private EventMetadataModifier createObjectUnderTest(final String metadataRootKey) {
return new EventMetadataModifier(metadataRootKey);
private EventMetadataModifier createObjectUnderTest(final String metadataRootKey, final Boolean isDeleteS3MetadataInEvent) {
return new EventMetadataModifier(metadataRootKey, isDeleteS3MetadataInEvent);
}

@Test
void constructor_throws_if_metadataRootKey_is_null() {
assertThrows(NullPointerException.class, () -> createObjectUnderTest(null));
assertThrows(NullPointerException.class, () -> createObjectUnderTest(null, null));
}

@ParameterizedTest
Expand All @@ -56,12 +58,21 @@ void accept_sets_correct_S3_bucket_and_key(final String metadataKey, final Strin
when(s3ObjectReference.getBucketName()).thenReturn(bucketName);
when(s3ObjectReference.getKey()).thenReturn(key);

createObjectUnderTest(metadataKey).accept(event, s3ObjectReference);
createObjectUnderTest(metadataKey, false).accept(event, s3ObjectReference);

verify(event).put(expectedRootKey + "bucket", bucketName);
verify(event).put(expectedRootKey + "key", key);
}

@ParameterizedTest
@ArgumentsSource(KeysArgumentsProvider.class)
void accept_does_not_set_correct_S3_bucket_and_key(final String metadataKey, final String expectedRootKey) {
createObjectUnderTest(metadataKey, true).accept(event, s3ObjectReference);

verify(event, never()).put(anyString(), anyString());
verify(event, never()).put(anyString(), anyString());
}

static class KeysArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down

0 comments on commit f6a06a0

Please sign in to comment.