Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new S3 source option and modify docdb template #4492

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
event_collect_timeout: "120s"
maximum_size: "2mb"
aggregate_threshold:
maximum_size: "256kb"
maximum_size: "128mb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
Expand Down Expand Up @@ -63,7 +63,11 @@
sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_header_overrides>>"
acknowledgments: true
delete_s3_objects_on_read: true
disable_s3_metadata_in_event: true
scan:
folder_partitions:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to add an integration test for the transform. Not needed in this PR.

depth: [ "<<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>" ]
max_objects_per_ownership: 50
buckets:
- bucket:
name: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, false);

buffer = mock(Buffer.class);
recordsReceived = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY, s3SourceConfig.isDeleteS3MetadataInEvent());

buffer = mock(Buffer.class);
recordsReceived = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ class EventMetadataModifier implements BiConsumer<Event, S3ObjectReference> {
private static final String BUCKET_FIELD_NAME = "bucket";
private static final String KEY_FIELD_NAME = "key";
private final String baseKey;
private final boolean deleteS3MetadataInEvent;

EventMetadataModifier(final String metadataRootKey) {
EventMetadataModifier(final String metadataRootKey, boolean deleteS3MetadataInEvent) {
baseKey = generateBaseKey(metadataRootKey);
this.deleteS3MetadataInEvent = deleteS3MetadataInEvent;
}

@Override
public void accept(final Event event, final S3ObjectReference s3ObjectReference) {
event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey());
if(!deleteS3MetadataInEvent) {
event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey());
}
}

private static String generateBaseKey(String metadataRootKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void start(Buffer<Record<Event>> buffer) {
final S3ObjectRequest.Builder s3ObjectRequestBuilder = new S3ObjectRequest.Builder(buffer, s3SourceConfig.getNumberOfRecordsToAccumulate(),
s3SourceConfig.getBufferTimeout(), s3ObjectPluginMetrics);
final BiConsumer<Event, S3ObjectReference> eventMetadataModifier = new EventMetadataModifier(
s3SourceConfig.getMetadataRootKey());
s3SourceConfig.getMetadataRootKey(), s3SourceConfig.isDeleteS3MetadataInEvent());
final S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3ClientBuilderFactory.getS3Client(), pluginMetrics);

if (s3SelectOptional.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public class S3SourceConfig {
@JsonProperty("delete_s3_objects_on_read")
private boolean deleteS3ObjectsOnRead = false;

@JsonProperty("disable_s3_metadata_in_event")
private boolean deleteS3MetadataInEvent = false;

@AssertTrue(message = "A codec is required for reading objects.")
boolean isCodecProvidedWhenNeeded() {
if(s3SelectOptions == null)
Expand Down Expand Up @@ -189,4 +192,8 @@ public Map<String, String> getBucketOwners() {
public String getDefaultBucketOwner() {
return defaultBucketOwner;
}

public boolean isDeleteS3MetadataInEvent() {
return deleteS3MetadataInEvent;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: rename the variable and the function to use disableS3MetadataInEvent, so they match the `disable_s3_metadata_in_event` property name.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.mockito.ArgumentMatchers.anyString;
import org.mockito.Mock;
import static org.mockito.Mockito.never;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;

Expand Down Expand Up @@ -41,13 +43,13 @@ void setUp() {
key = UUID.randomUUID().toString();
}

private EventMetadataModifier createObjectUnderTest(final String metadataRootKey) {
return new EventMetadataModifier(metadataRootKey);
private EventMetadataModifier createObjectUnderTest(final String metadataRootKey, final Boolean isDeleteS3MetadataInEvent) {
return new EventMetadataModifier(metadataRootKey, isDeleteS3MetadataInEvent);
}

@Test
void constructor_throws_if_metadataRootKey_is_null() {
    assertThrows(NullPointerException.class, () -> createObjectUnderTest(null));
    // Pass a real boolean for the flag: a null Boolean would be auto-unboxed when it is
    // handed to the constructor's primitive boolean parameter and would throw a
    // NullPointerException on its own, so the test would pass without ever exercising
    // the null metadataRootKey check.
    assertThrows(NullPointerException.class, () -> createObjectUnderTest(null, false));
}

@ParameterizedTest
Expand All @@ -56,12 +58,21 @@ void accept_sets_correct_S3_bucket_and_key(final String metadataKey, final Strin
when(s3ObjectReference.getBucketName()).thenReturn(bucketName);
when(s3ObjectReference.getKey()).thenReturn(key);

createObjectUnderTest(metadataKey).accept(event, s3ObjectReference);
createObjectUnderTest(metadataKey, false).accept(event, s3ObjectReference);

verify(event).put(expectedRootKey + "bucket", bucketName);
verify(event).put(expectedRootKey + "key", key);
}

/**
 * Verifies that when the modifier is created with the S3-metadata flag set to {@code true},
 * {@code accept} does not write any String key/value pair to the event.
 *
 * @param metadataKey     metadata root key supplied by {@link KeysArgumentsProvider}
 * @param expectedRootKey supplied by the same provider; not needed here because nothing
 *                        is expected to be written under any key
 */
@ParameterizedTest
@ArgumentsSource(KeysArgumentsProvider.class)
void accept_does_not_set_correct_S3_bucket_and_key(final String metadataKey, final String expectedRootKey) {
    createObjectUnderTest(metadataKey, true).accept(event, s3ObjectReference);

    // One verification is enough: the matcher pair already covers both the bucket put and
    // the key put, so repeating the identical verify adds nothing.
    verify(event, never()).put(anyString(), anyString());
}

static class KeysArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down
Loading