Skip to content

Commit

Permalink
feat: Added ODPManager implementation (#489)
Browse files Browse the repository at this point in the history
## Summary
Added ODPManager Implementation which does the following.
1. Initializes and provides access to ODPEventManager and ODPSegmentManager
2. Provides updated ODPConfig settings to event manager and segment manager.
3. Stops Event Manager thread when closed.

## Test plan
1. Manually tested thoroughly
2. Added unit tests.

## Issues
[FSSDK-8388](https://jira.sso.episerver.net/browse/FSSDK-8388)
  • Loading branch information
zashraf1985 authored Oct 20, 2022
1 parent 913b8e4 commit 7428c4c
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 7 deletions.
8 changes: 8 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@ public synchronized List<String> getAllSegments() {
public synchronized void setAllSegments(List<String> allSegments) {
this.allSegments = allSegments;
}

public Boolean equals(ODPConfig toCompare) {
return getApiHost().equals(toCompare.getApiHost()) && getApiKey().equals(toCompare.getApiKey()) && getAllSegments().equals(toCompare.allSegments);
}

public synchronized ODPConfig getClone() {
return new ODPConfig(apiKey, apiHost, allSegments);
}
}
18 changes: 18 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.beans.Transient;
import java.util.Collections;
import java.util.Map;

Expand Down Expand Up @@ -64,4 +65,21 @@ public Map<String, Object> getData() {
public void setData(Map<String, Object> data) {
this.data = data;
}

@Transient
public Boolean isDataValid() {
for (Object entry: this.data.values()) {
if (
!( entry instanceof String
|| entry instanceof Integer
|| entry instanceof Long
|| entry instanceof Boolean
|| entry instanceof Float
|| entry instanceof Double
|| entry == null)) {
return false;
}
}
return true;
}
}
36 changes: 31 additions & 5 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ODPEventManager {

// The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here
// because `LinkedBlockingQueue` itself is thread safe.
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<Object> eventQueue = new LinkedBlockingQueue<>();

public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
this(odpConfig, apiManager, null, null, null);
Expand All @@ -71,7 +71,9 @@ public void start() {
}

public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
if (!this.odpConfig.equals(odpConfig) && eventQueue.offer(new FlushEvent(this.odpConfig))) {
this.odpConfig = odpConfig;
}
}

public void identifyUser(@Nullable String vuid, String userId) {
Expand All @@ -85,6 +87,10 @@ public void identifyUser(@Nullable String vuid, String userId) {
}

public void sendEvent(ODPEvent event) {
if (!event.isDataValid()) {
logger.error("ODP event send failed (ODP data is not valid)");
return;
}
event.setData(augmentCommonData(event.getData()));
processEvent(event);
}
Expand Down Expand Up @@ -137,7 +143,7 @@ private class EventDispatcherThread extends Thread {
public void run() {
while (true) {
try {
ODPEvent nextEvent;
Object nextEvent;

// If batch has events, set the timeout to remaining time for flush interval,
// otherwise wait for the new event indefinitely
Expand All @@ -158,12 +164,17 @@ public void run() {
continue;
}

if (nextEvent instanceof FlushEvent) {
flush(((FlushEvent) nextEvent).getOdpConfig());
continue;
}

if (currentBatch.size() == 0) {
// Batch starting, create a new flush time
nextFlushTime = new Date().getTime() + flushInterval;
}

currentBatch.add(nextEvent);
currentBatch.add((ODPEvent) nextEvent);

if (currentBatch.size() >= batchSize) {
flush();
Expand All @@ -176,7 +187,7 @@ public void run() {
logger.debug("Exiting ODP Event Dispatcher Thread.");
}

private void flush() {
private void flush(ODPConfig odpConfig) {
if (odpConfig.isReady()) {
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch);
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH;
Expand All @@ -192,8 +203,23 @@ private void flush() {
currentBatch.clear();
}

private void flush() {
flush(odpConfig);
}

public void signalStop() {
shouldStop = true;
}
}

private static class FlushEvent {
private final ODPConfig odpConfig;
public FlushEvent(ODPConfig odpConfig) {
this.odpConfig = odpConfig.getClone();
}

public ODPConfig getOdpConfig() {
return odpConfig;
}
}
}
61 changes: 61 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
*
* Copyright 2022, Optimizely
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.optimizely.ab.odp;

import javax.annotation.Nonnull;
import java.util.List;

public class ODPManager {
private volatile ODPConfig odpConfig;
private final ODPSegmentManager segmentManager;
private final ODPEventManager eventManager;

public ODPManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
this(odpConfig, new ODPSegmentManager(odpConfig, apiManager), new ODPEventManager(odpConfig, apiManager));
}

public ODPManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPSegmentManager segmentManager, @Nonnull ODPEventManager eventManager) {
this.odpConfig = odpConfig;
this.segmentManager = segmentManager;
this.eventManager = eventManager;
this.eventManager.start();
}

public ODPSegmentManager getSegmentManager() {
return segmentManager;
}

public ODPEventManager getEventManager() {
return eventManager;
}

public Boolean updateSettings(String apiHost, String apiKey, List<String> allSegments) {
ODPConfig newConfig = new ODPConfig(apiKey, apiHost, allSegments);
if (!odpConfig.equals(newConfig)) {
odpConfig = newConfig;
eventManager.updateSettings(odpConfig);
segmentManager.resetCache();
segmentManager.updateSettings(odpConfig);
return true;
}
return false;
}

public void close() {
eventManager.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ODPSegmentManager {

private final ODPApiManager apiManager;

private final ODPConfig odpConfig;
private volatile ODPConfig odpConfig;

private final Cache<List<String>> segmentsCache;

Expand Down Expand Up @@ -105,4 +105,12 @@ public List<String> getQualifiedSegments(ODPUserKey userKey, String userValue, L
private String getCacheKey(String userKey, String userValue) {
return userKey + "-$-" + userValue;
}

public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
}

public void resetCache() {
segmentsCache.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,36 @@ public void applyUpdatedODPConfigWhenAvailable() throws InterruptedException {
Thread.sleep(500);
Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());
eventManager.updateSettings(new ODPConfig("new-key", "http://www.new-odp-host.com"));
Thread.sleep(1500);

// Should immediately Flush current batch with old ODP config when settings are changed
Thread.sleep(100);
Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());

// New events should use new config
for (int i = 0; i < 10; i++) {
eventManager.sendEvent(getEvent(i));
}
Thread.sleep(100);
Mockito.verify(mockApiManager, times(1)).sendEvents(eq("new-key"), eq("http://www.new-odp-host.com/v3/events"), any());
}

@Test
public void validateEventData() {
ODPEvent event = new ODPEvent("type", "action", null, null);
Map<String, Object> data = new HashMap<>();

data.put("String", "string Value");
data.put("Integer", 100);
data.put("Float", 33.89);
data.put("Boolean", true);
data.put("null", null);
event.setData(data);
assertTrue(event.isDataValid());

data.put("RandomObject", new Object());
assertFalse(event.isDataValid());
}

private ODPEvent getEvent(int id) {
Map<String, String> identifiers = new HashMap<>();
identifiers.put("identifier1", "value1-" + id);
Expand Down
123 changes: 123 additions & 0 deletions core-api/src/test/java/com/optimizely/ab/odp/ODPManagerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
*
* Copyright 2022, Optimizely
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.optimizely.ab.odp;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.util.Arrays;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;

public class ODPManagerTest {
private static final String API_RESPONSE = "{\"data\":{\"customer\":{\"audiences\":{\"edges\":[{\"node\":{\"name\":\"segment1\",\"state\":\"qualified\"}},{\"node\":{\"name\":\"segment2\",\"state\":\"qualified\"}}]}}}}";

@Mock
ODPApiManager mockApiManager;

@Mock
ODPEventManager mockEventManager;

@Mock
ODPSegmentManager mockSegmentManager;

@Before
public void setup() {
mockApiManager = mock(ODPApiManager.class);
mockEventManager = mock(ODPEventManager.class);
mockSegmentManager = mock(ODPSegmentManager.class);
}

@Test
public void shouldStartEventManagerWhenODPManagerIsInitialized() {
ODPConfig config = new ODPConfig("test-key", "test-host");
new ODPManager(config, mockSegmentManager, mockEventManager);
verify(mockEventManager, times(1)).start();
}

@Test
public void shouldStopEventManagerWhenCloseIsCalled() {
ODPConfig config = new ODPConfig("test-key", "test-host");
ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager);

// Stop is not called in the default flow.
verify(mockEventManager, times(0)).stop();

odpManager.close();
// stop should be called when odpManager is closed.
verify(mockEventManager, times(1)).stop();
}

@Test
public void shouldUseNewSettingsInEventManagerWhenODPConfigIsUpdated() throws InterruptedException {
Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(200);
ODPConfig config = new ODPConfig("test-key", "test-host", Arrays.asList("segment1", "segment2"));
ODPManager odpManager = new ODPManager(config, mockApiManager);

odpManager.getEventManager().identifyUser("vuid", "fsuid");
Thread.sleep(2000);
verify(mockApiManager, times(1))
.sendEvents(eq("test-key"), eq("test-host/v3/events"), any());

odpManager.updateSettings("test-host-updated", "test-key-updated", Arrays.asList("segment1"));
odpManager.getEventManager().identifyUser("vuid", "fsuid");
Thread.sleep(1200);
verify(mockApiManager, times(1))
.sendEvents(eq("test-key-updated"), eq("test-host-updated/v3/events"), any());
}

@Test
public void shouldUseNewSettingsInSegmentManagerWhenODPConfigIsUpdated() {
Mockito.when(mockApiManager.fetchQualifiedSegments(anyString(), anyString(), anyString(), anyString(), anyList()))
.thenReturn(API_RESPONSE);
ODPConfig config = new ODPConfig("test-key", "test-host", Arrays.asList("segment1", "segment2"));
ODPManager odpManager = new ODPManager(config, mockApiManager);

odpManager.getSegmentManager().getQualifiedSegments("test-id");
verify(mockApiManager, times(1))
.fetchQualifiedSegments(eq("test-key"), eq("test-host/v3/graphql"), any(), any(), any());

odpManager.updateSettings("test-host-updated", "test-key-updated", Arrays.asList("segment1"));
odpManager.getSegmentManager().getQualifiedSegments("test-id");
verify(mockApiManager, times(1))
.fetchQualifiedSegments(eq("test-key-updated"), eq("test-host-updated/v3/graphql"), any(), any(), any());
}

@Test
public void shouldGetEventManager() {
ODPConfig config = new ODPConfig("test-key", "test-host");
ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager);
assertNotNull(odpManager.getEventManager());

odpManager = new ODPManager(config, mockApiManager);
assertNotNull(odpManager.getEventManager());
}

@Test
public void shouldGetSegmentManager() {
ODPConfig config = new ODPConfig("test-key", "test-host");
ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager);
assertNotNull(odpManager.getSegmentManager());

odpManager = new ODPManager(config, mockApiManager);
assertNotNull(odpManager.getSegmentManager());
}
}

0 comments on commit 7428c4c

Please sign in to comment.