Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added ODPManager implementation #489

Merged
merged 8 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@ public synchronized List<String> getAllSegments() {
public synchronized void setAllSegments(List<String> allSegments) {
this.allSegments = allSegments;
}

public Boolean equals(ODPConfig toCompare) {
return getApiHost().equals(toCompare.getApiHost()) && getApiKey().equals(toCompare.getApiKey()) && getAllSegments().equals(toCompare.allSegments);
}

public synchronized ODPConfig getClone() {
return new ODPConfig(apiKey, apiHost, allSegments);
}
}
18 changes: 18 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.beans.Transient;
import java.util.Collections;
import java.util.Map;

Expand Down Expand Up @@ -64,4 +65,21 @@ public Map<String, Object> getData() {
public void setData(Map<String, Object> data) {
this.data = data;
}

@Transient
public Boolean isDataValid() {
for (Object entry: this.data.values()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null is also valid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (
!( entry instanceof String
|| entry instanceof Integer
|| entry instanceof Long
|| entry instanceof Boolean
|| entry instanceof Float
|| entry instanceof Double
|| entry == null)) {
return false;
}
}
return true;
}
}
36 changes: 31 additions & 5 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ODPEventManager {

// The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here
// because `LinkedBlockingQueue` itself is thread safe.
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<Object> eventQueue = new LinkedBlockingQueue<>();

public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
this(odpConfig, apiManager, null, null, null);
Expand All @@ -71,7 +71,9 @@ public void start() {
}

public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
if (!this.odpConfig.equals(odpConfig) && eventQueue.offer(new FlushEvent(this.odpConfig))) {
this.odpConfig = odpConfig;
}
}

public void identifyUser(@Nullable String vuid, String userId) {
Expand All @@ -85,6 +87,10 @@ public void identifyUser(@Nullable String vuid, String userId) {
}

public void sendEvent(ODPEvent event) {
if (!event.isDataValid()) {
logger.error("ODP event send failed (ODP data is not valid)");
return;
}
event.setData(augmentCommonData(event.getData()));
processEvent(event);
}
Expand Down Expand Up @@ -137,7 +143,7 @@ private class EventDispatcherThread extends Thread {
public void run() {
while (true) {
try {
ODPEvent nextEvent;
Object nextEvent;

// If batch has events, set the timeout to remaining time for flush interval,
// otherwise wait for the new event indefinitely
Expand All @@ -158,12 +164,17 @@ public void run() {
continue;
}

if (nextEvent instanceof FlushEvent) {
flush(((FlushEvent) nextEvent).getOdpConfig());
continue;
}

if (currentBatch.size() == 0) {
// Batch starting, create a new flush time
nextFlushTime = new Date().getTime() + flushInterval;
}

currentBatch.add(nextEvent);
currentBatch.add((ODPEvent) nextEvent);

if (currentBatch.size() >= batchSize) {
flush();
Expand All @@ -176,7 +187,7 @@ public void run() {
logger.debug("Exiting ODP Event Dispatcher Thread.");
}

private void flush() {
private void flush(ODPConfig odpConfig) {
if (odpConfig.isReady()) {
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch);
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH;
Expand All @@ -192,8 +203,23 @@ private void flush() {
currentBatch.clear();
}

private void flush() {
flush(odpConfig);
}

public void signalStop() {
shouldStop = true;
}
}

private static class FlushEvent {
private final ODPConfig odpConfig;
public FlushEvent(ODPConfig odpConfig) {
this.odpConfig = odpConfig.getClone();
}

public ODPConfig getOdpConfig() {
return odpConfig;
}
}
}
61 changes: 61 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
*
* Copyright 2022, Optimizely
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.optimizely.ab.odp;

import javax.annotation.Nonnull;
import java.util.List;

public class ODPManager {
private volatile ODPConfig odpConfig;
private final ODPSegmentManager segmentManager;
private final ODPEventManager eventManager;

public ODPManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
this(odpConfig, new ODPSegmentManager(odpConfig, apiManager), new ODPEventManager(odpConfig, apiManager));
}

public ODPManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPSegmentManager segmentManager, @Nonnull ODPEventManager eventManager) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customization of segmentCache size/timeout and segmentCache itself as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to support for "disable" entire ODP functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two constructors.

  1. First one only takes the two mandatory arguments of ODPConfig and ApiManager and initializes everything else on default values.
  2. If you want to customize anything at all, use the second constructor which expects you to pass everything from the outside. This is where you will have a chance to initialize your own SegmentManager with your own cacheOptions.

I will recommend to keep these two options for now. We can provide other combinations at the level of factory methods to create the SDK instance later as needed. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of handling disable / enable at optimizely level. When enabled, ODP object will be there and it will always work. When disabled, there will be no ODP instance at all. What do you suggest?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two constructors.

  1. First one only takes the two mandatory arguments of ODPConfig and ApiManager and initializes everything else on default values.
  2. If you want to customize anything at all, use the second constructor which expects you to pass everything from the outside. This is where you will have a chance to initialize your own SegmentManager with your own cacheOptions.

I will recommend to keep these two options for now. We can provide other combinations at the level of factory methods to create the SDK instance later as needed. What do you think?

I see adjusting cache size and timeout more likely options for clients rather than full Cache and OdpManager customazations. Providing that path with factory will be also a good solution too.

Copy link
Contributor

@jaeopt jaeopt Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of handling disable / enable at optimizely level. When enabled, ODP object will be there and it will always work. When disabled, there will be no ODP instance at all. What do you suggest?

I was concerned about "if odpManager == null" checking in many places we need odpManager. We can try wrap those checking in OdpManager. Other than that, disabling at the top level is also a good option too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would advise to defer this discussion till the top level PR. We can then see if it looks good at top level other wise i will add this to ODPManager. what do you say?

this.odpConfig = odpConfig;
this.segmentManager = segmentManager;
this.eventManager = eventManager;
this.eventManager.start();
}

public ODPSegmentManager getSegmentManager() {
return segmentManager;
}

public ODPEventManager getEventManager() {
return eventManager;
}

public Boolean updateSettings(String apiHost, String apiKey, List<String> allSegments) {
ODPConfig newConfig = new ODPConfig(apiKey, apiHost, allSegments);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we flush events in queue before updating apiKey/apiHost? It should not be common but when clients change the ODP account, all events in the queue should be flushed to the old account.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

if (!odpConfig.equals(newConfig)) {
odpConfig = newConfig;
eventManager.updateSettings(odpConfig);
segmentManager.resetCache();
segmentManager.updateSettings(odpConfig);
return true;
}
return false;
}

public void close() {
eventManager.stop();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have top-level fetchSegments and sendEvent apis. Is it a plan to open directly from OdpEventManager and OdpSegmentManager?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is expected to filter out api requests in this level (like "disable" discards all requests). Also sendEvent data type should be validated here (are we doing this in OdpEventManager?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disable: I am expecting disable to be handled in optimizely object which will never go to ODPManager at all if disabled.

SendEvent Data validation: I am not sure what validation needs to be done. But if there is one, i would like to add that to eventManager itself. What do you suggest?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See TDD which defines accepted types for event data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

about top level fetch and event methods, SegmentManager and EventManager provide some overrides. If i add all those overrides, it looks redundant. I thought it made more sense to provide access to segmentManager and eventManager directly through getters to use those methods.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ODPSegmentManager {

private final ODPApiManager apiManager;

private final ODPConfig odpConfig;
private volatile ODPConfig odpConfig;

private final Cache<List<String>> segmentsCache;

Expand Down Expand Up @@ -105,4 +105,12 @@ public List<String> getQualifiedSegments(ODPUserKey userKey, String userValue, L
private String getCacheKey(String userKey, String userValue) {
return userKey + "-$-" + userValue;
}

public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
}

public void resetCache() {
segmentsCache.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,36 @@ public void applyUpdatedODPConfigWhenAvailable() throws InterruptedException {
Thread.sleep(500);
Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());
eventManager.updateSettings(new ODPConfig("new-key", "http://www.new-odp-host.com"));
Thread.sleep(1500);

// Should immediately Flush current batch with old ODP config when settings are changed
Thread.sleep(100);
Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());

// New events should use new config
for (int i = 0; i < 10; i++) {
eventManager.sendEvent(getEvent(i));
}
Thread.sleep(100);
Mockito.verify(mockApiManager, times(1)).sendEvents(eq("new-key"), eq("http://www.new-odp-host.com/v3/events"), any());
}

@Test
public void validateEventData() {
ODPEvent event = new ODPEvent("type", "action", null, null);
Map<String, Object> data = new HashMap<>();

data.put("String", "string Value");
data.put("Integer", 100);
data.put("Float", 33.89);
data.put("Boolean", true);
data.put("null", null);
event.setData(data);
assertTrue(event.isDataValid());

data.put("RandomObject", new Object());
assertFalse(event.isDataValid());
}

private ODPEvent getEvent(int id) {
Map<String, String> identifiers = new HashMap<>();
identifiers.put("identifier1", "value1-" + id);
Expand Down
123 changes: 123 additions & 0 deletions core-api/src/test/java/com/optimizely/ab/odp/ODPManagerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
*
* Copyright 2022, Optimizely
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.optimizely.ab.odp;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.util.Arrays;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;

public class ODPManagerTest {
private static final String API_RESPONSE = "{\"data\":{\"customer\":{\"audiences\":{\"edges\":[{\"node\":{\"name\":\"segment1\",\"state\":\"qualified\"}},{\"node\":{\"name\":\"segment2\",\"state\":\"qualified\"}}]}}}}";

@Mock
ODPApiManager mockApiManager;

@Mock
ODPEventManager mockEventManager;

@Mock
ODPSegmentManager mockSegmentManager;

@Before
public void setup() {
mockApiManager = mock(ODPApiManager.class);
mockEventManager = mock(ODPEventManager.class);
mockSegmentManager = mock(ODPSegmentManager.class);
}

@Test
public void shouldStartEventManagerWhenODPManagerIsInitialized() {
ODPConfig config = new ODPConfig("test-key", "test-host");
new ODPManager(config, mockSegmentManager, mockEventManager);
verify(mockEventManager, times(1)).start();
}

@Test
public void shouldStopEventManagerWhenCloseIsCalled() {
ODPConfig config = new ODPConfig("test-key", "test-host");
ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager);

// Stop is not called in the default flow.
verify(mockEventManager, times(0)).stop();

odpManager.close();
// stop should be called when odpManager is closed.
verify(mockEventManager, times(1)).stop();
}

@Test
public void shouldUseNewSettingsInEventManagerWhenODPConfigIsUpdated() throws InterruptedException {
Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(200);
ODPConfig config = new ODPConfig("test-key", "test-host", Arrays.asList("segment1", "segment2"));
ODPManager odpManager = new ODPManager(config, mockApiManager);

odpManager.getEventManager().identifyUser("vuid", "fsuid");
Thread.sleep(2000);
verify(mockApiManager, times(1))
.sendEvents(eq("test-key"), eq("test-host/v3/events"), any());

odpManager.updateSettings("test-host-updated", "test-key-updated", Arrays.asList("segment1"));
odpManager.getEventManager().identifyUser("vuid", "fsuid");
Thread.sleep(1200);
verify(mockApiManager, times(1))
.sendEvents(eq("test-key-updated"), eq("test-host-updated/v3/events"), any());
}

@Test
public void shouldUseNewSettingsInSegmentManagerWhenODPConfigIsUpdated() {
Mockito.when(mockApiManager.fetchQualifiedSegments(anyString(), anyString(), anyString(), anyString(), anyList()))
.thenReturn(API_RESPONSE);
ODPConfig config = new ODPConfig("test-key", "test-host", Arrays.asList("segment1", "segment2"));
ODPManager odpManager = new ODPManager(config, mockApiManager);

odpManager.getSegmentManager().getQualifiedSegments("test-id");
verify(mockApiManager, times(1))
.fetchQualifiedSegments(eq("test-key"), eq("test-host/v3/graphql"), any(), any(), any());

odpManager.updateSettings("test-host-updated", "test-key-updated", Arrays.asList("segment1"));
odpManager.getSegmentManager().getQualifiedSegments("test-id");
verify(mockApiManager, times(1))
.fetchQualifiedSegments(eq("test-key-updated"), eq("test-host-updated/v3/graphql"), any(), any(), any());
}

@Test
public void shouldGetEventManager() {
ODPConfig config = new ODPConfig("test-key", "test-host");
ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager);
assertNotNull(odpManager.getEventManager());

odpManager = new ODPManager(config, mockApiManager);
assertNotNull(odpManager.getEventManager());
}

@Test
public void shouldGetSegmentManager() {
ODPConfig config = new ODPConfig("test-key", "test-host");
ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager);
assertNotNull(odpManager.getSegmentManager());

odpManager = new ODPManager(config, mockApiManager);
assertNotNull(odpManager.getSegmentManager());
}
}