Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify AcknowledgementSet add API to accept EventHandle instead of Event #4948

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
* dropped, etc)
*/
public interface AcknowledgementSet {
/**
* Adds an event handle to the acknowledgement set. Assigns initial reference
* count of 1.
*
* @param eventHandle event handle to be added
* @since 2.11
*/
void add(EventHandle eventHandle);

/**
* Adds an event to the acknowledgement set. Assigns initial reference
Expand All @@ -29,7 +37,9 @@ public interface AcknowledgementSet {
* @param event event to be added
* @since 2.2
*/
public void add(Event event);
default void add(Event event) {
add(event.getEventHandle());
}

/**
* Aquires a reference to the event by incrementing the reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public boolean release(boolean result) {
return returnValue;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef : acknowledgementSetRefList) {
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}
}
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() {
return acknowledgementSet != null;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}

@Override
public void acquireReference() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,12 @@ public interface InternalEventHandle {
*/
void acquireReference();

/**
* Adds new event handle to the acknowledgement sets associated
* with this event handle
* @param eventHandle event handle to add
* @since 2.11
*/
void addEventHandle(EventHandle eventHandle);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.acknowledgements;

import org.junit.jupiter.api.Test;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.junit.jupiter.api.BeforeEach;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;

public class AcknowledgementSetTests {

Event event;
EventHandle eventHandle;
AcknowledgementSet acknowledgementSet;

@BeforeEach
public void setup() {
event = mock(Event.class);
eventHandle = mock(EventHandle.class);
when(event.getEventHandle()).thenReturn(eventHandle);
acknowledgementSet = spy(AcknowledgementSet.class);
}

@Test
public void testAcknowledgementSetAdd() {
acknowledgementSet.add(event);
verify(acknowledgementSet).add(eventHandle);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,18 @@ void testWithOnReleaseHandler() {

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
acknowledgementSet1 = mock(AcknowledgementSet.class);
acknowledgementSet2 = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet1);
eventHandle.addAcknowledgementSet(acknowledgementSet2);
AggregateEventHandle eventHandle2 = new AggregateEventHandle(now);
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet1).add(eventHandle2);
verify(acknowledgementSet2).add(eventHandle2);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import org.mockito.Mock;

import java.time.Instant;
Expand All @@ -39,7 +38,6 @@ void testBasic() {
void testWithAcknowledgementSet() {
acknowledgementSet = mock(AcknowledgementSet.class);
when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
doNothing().when(acknowledgementSet).acquire(any(EventHandle.class));
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
Expand Down Expand Up @@ -75,5 +73,16 @@ void testWithOnReleaseHandler() {
assertThat(count, equalTo(1));

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
acknowledgementSet = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet);
DefaultEventHandle eventHandle2 = new DefaultEventHandle(now);
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet).add(eventHandle2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,18 +72,13 @@ public void checkProgress() {
}

@Override
public void add(Event event) {
public void add(EventHandle eventHandle) {
lock.lock();
try {
if (event instanceof JacksonEvent) {
EventHandle eventHandle = event.getEventHandle();
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
}
}
InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.opensearch.dataprepper.core.parser.DataFlowComponent;
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
Expand Down Expand Up @@ -95,13 +94,13 @@ public Record getRecord(final Record record) {
final Event recordEvent = (Event) record.getData();
JacksonEvent newRecordEvent;
Record newRecord;
DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
InternalEventHandle internalHandle = (InternalEventHandle)recordEvent.getEventHandle();
if (internalHandle != null && internalHandle.hasAcknowledgementSet()) {
final EventMetadata eventMetadata = recordEvent.getMetadata();
final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap());
newRecordEvent = (JacksonEvent) eventBuilder.build();

eventHandle.getAcknowledgementSet().add(newRecordEvent);
internalHandle.addEventHandle(newRecordEvent.getEventHandle());
newRecord = new Record<>(newRecordEvent);
acquireEventReference(newRecord);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception {

@Test
void testDefaultAcknowledgementInvalidAcquire() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.add(event.getEventHandle());
defaultAcknowledgementSet.complete();
DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest();
defaultAcknowledgementSet.acquire(handle2);
Expand Down Expand Up @@ -247,7 +247,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception {
Duration.ofSeconds(1)
);
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.add(event2);
defaultAcknowledgementSet.add(event2.getEventHandle());
defaultAcknowledgementSet.complete();
lenient().doAnswer(a -> {
AcknowledgementSet acknowledgementSet = a.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand Down Expand Up @@ -241,10 +242,10 @@ void test_one_record_with_acknowledgements_and_multi_components() {
attachEventHandlesToRecordsIn(eventHandles);
try {
doAnswer((i) -> {
JacksonEvent e1 = (JacksonEvent) i.getArgument(0);
((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1);
EventHandle handle = (EventHandle) i.getArgument(0);
((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1);
return null;
}).when(acknowledgementSet1).add(any(JacksonEvent.class));
}).when(acknowledgementSet1).add(any(EventHandle.class));
} catch (Exception e){}

eventBuilder = mock(EventBuilder.class);
Expand Down Expand Up @@ -279,10 +280,10 @@ void test_multiple_records_with_acknowledgements_and_multi_components() {
attachEventHandlesToRecordsIn(eventHandles);
try {
doAnswer((i) -> {
JacksonEvent e1 = (JacksonEvent) i.getArgument(0);
((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1);
EventHandle handle = (EventHandle) i.getArgument(0);
((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1);
return null;
}).when(acknowledgementSet1).add(any(JacksonEvent.class));
}).when(acknowledgementSet1).add(any(EventHandle.class));
} catch (Exception e){}

eventBuilder = mock(EventBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void testProcessRecordsWithAcknowledgementsEnabled()
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void setUp() throws Exception {
lenient().doAnswer(a -> {
numEventsAdded++;
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));
bucketName = UUID.randomUUID().toString();
key = UUID.randomUUID().toString();
when(s3ObjectReference.getBucketName()).thenReturn(bucketName);
Expand Down Expand Up @@ -196,6 +196,8 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg
numEventsAdded = 0;
doAnswer(a -> {
Record record = mock(Record.class);
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
Consumer c = (Consumer)a.getArgument(2);
c.accept(record);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void setup() {
lenient().doAnswer(a -> {
numEventsAdded++;
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));
final String bucketName = UUID.randomUUID().toString();
final String objectKey = UUID.randomUUID().toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ protected Record createNewRecordFromEvent(final Event recordEvent, String splitV

protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) {
DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
eventHandle.getAcknowledgementSet().add(recordEvent);
if (eventHandle != null) {
eventHandle.addEventHandle(recordEvent.getEventHandle());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public void testAddToAcknowledgementSetFromOriginEvent() {

DefaultEventHandle spyEventHandle = (DefaultEventHandle) spyEvent.getEventHandle();
// Verify that the add method is called on the acknowledgement set
verify(spyEventHandle).getAcknowledgementSet();
verify(spyEventHandle).addEventHandle(recordEvent.getEventHandle());

AcknowledgementSet spyAckSet = spyEventHandle.getAcknowledgementSet();
DefaultEventHandle eventHandle = (DefaultEventHandle) recordEvent.getEventHandle();
Expand Down
Loading