diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java index ad8667eb4..25402b172 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java @@ -71,4 +71,12 @@ public synchronized List getAllSegments() { public synchronized void setAllSegments(List allSegments) { this.allSegments = allSegments; } + + public Boolean equals(ODPConfig toCompare) { + return getApiHost().equals(toCompare.getApiHost()) && getApiKey().equals(toCompare.getApiKey()) && getAllSegments().equals(toCompare.allSegments); + } + + public synchronized ODPConfig getClone() { + return new ODPConfig(apiKey, apiHost, allSegments); + } } diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java index 903bcf663..de7001ca8 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java @@ -17,6 +17,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.beans.Transient; import java.util.Collections; import java.util.Map; @@ -64,4 +65,21 @@ public Map getData() { public void setData(Map data) { this.data = data; } + + @Transient + public Boolean isDataValid() { + for (Object entry: this.data.values()) { + if ( + !( entry instanceof String + || entry instanceof Integer + || entry instanceof Long + || entry instanceof Boolean + || entry instanceof Float + || entry instanceof Double + || entry == null)) { + return false; + } + } + return true; + } } diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java index 7cc601f29..ab4ce301e 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -50,7 +50,7 @@ public class ODPEventManager { // The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here // because `LinkedBlockingQueue` itself is thread safe. - private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) { this(odpConfig, apiManager, null, null, null); @@ -71,7 +71,9 @@ public void start() { } public void updateSettings(ODPConfig odpConfig) { - this.odpConfig = odpConfig; + if (!this.odpConfig.equals(odpConfig) && eventQueue.offer(new FlushEvent(this.odpConfig))) { + this.odpConfig = odpConfig; + } } public void identifyUser(@Nullable String vuid, String userId) { @@ -85,6 +87,10 @@ public void identifyUser(@Nullable String vuid, String userId) { } public void sendEvent(ODPEvent event) { + if (!event.isDataValid()) { + logger.error("ODP event send failed (ODP data is not valid)"); + return; + } event.setData(augmentCommonData(event.getData())); processEvent(event); } @@ -137,7 +143,7 @@ private class EventDispatcherThread extends Thread { public void run() { while (true) { try { - ODPEvent nextEvent; + Object nextEvent; // If batch has events, set the timeout to remaining time for flush interval, // otherwise wait for the new event indefinitely @@ -158,12 +164,17 @@ public void run() { continue; } + if (nextEvent instanceof FlushEvent) { + flush(((FlushEvent) nextEvent).getOdpConfig()); + continue; + } + if (currentBatch.size() == 0) { // Batch starting, create a new flush time nextFlushTime = new Date().getTime() + flushInterval; } - currentBatch.add(nextEvent); + currentBatch.add((ODPEvent) nextEvent); if (currentBatch.size() >= batchSize) { flush(); @@ -176,7 +187,7 @@ public void run() { logger.debug("Exiting ODP Event Dispatcher Thread."); } - private void flush() { + private void flush(ODPConfig odpConfig) { if (odpConfig.isReady()) { String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; @@ -192,8 +203,23 @@ private void flush() { currentBatch.clear(); } + private void flush() { + flush(odpConfig); + } + public void signalStop() { shouldStop = true; } } + + private static class FlushEvent { + private final ODPConfig odpConfig; + public FlushEvent(ODPConfig odpConfig) { + this.odpConfig = odpConfig.getClone(); + } + + public ODPConfig getOdpConfig() { + return odpConfig; + } + } } diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPManager.java new file mode 100644 index 000000000..cb7e04f99 --- /dev/null +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPManager.java @@ -0,0 +1,61 @@ +/** + * + * Copyright 2022, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.optimizely.ab.odp; + +import javax.annotation.Nonnull; +import java.util.List; + +public class ODPManager { + private volatile ODPConfig odpConfig; + private final ODPSegmentManager segmentManager; + private final ODPEventManager eventManager; + + public ODPManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) { + this(odpConfig, new ODPSegmentManager(odpConfig, apiManager), new ODPEventManager(odpConfig, apiManager)); + } + + public ODPManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPSegmentManager segmentManager, @Nonnull ODPEventManager eventManager) { + this.odpConfig = odpConfig; + this.segmentManager = segmentManager; + this.eventManager = eventManager; + this.eventManager.start(); + } + + public ODPSegmentManager getSegmentManager() { + return segmentManager; + } + + public ODPEventManager getEventManager() { + return eventManager; + } + + public Boolean updateSettings(String apiHost, String apiKey, List allSegments) { + ODPConfig newConfig = new ODPConfig(apiKey, apiHost, allSegments); + if (!odpConfig.equals(newConfig)) { + odpConfig = newConfig; + eventManager.updateSettings(odpConfig); + segmentManager.resetCache(); + segmentManager.updateSettings(odpConfig); + return true; + } + return false; + } + + public void close() { + eventManager.stop(); + } +} diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPSegmentManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPSegmentManager.java index ffda9c19c..352c4ec8f 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPSegmentManager.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPSegmentManager.java @@ -34,7 +34,7 @@ public class ODPSegmentManager { private final ODPApiManager apiManager; - private final ODPConfig odpConfig; + private volatile ODPConfig odpConfig; private final Cache> segmentsCache; @@ -105,4 +105,12 @@ public List getQualifiedSegments(ODPUserKey userKey, String userValue, L private String getCacheKey(String userKey, String userValue) { return userKey + "-$-" + userValue; } + + public void updateSettings(ODPConfig odpConfig) { + this.odpConfig = odpConfig; + } + + public void resetCache() { + segmentsCache.reset(); + } } diff --git a/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java index 7be51e415..fd4287e0f 100644 --- a/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java +++ b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java @@ -216,10 +216,36 @@ public void applyUpdatedODPConfigWhenAvailable() throws InterruptedException { Thread.sleep(500); Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); eventManager.updateSettings(new ODPConfig("new-key", "http://www.new-odp-host.com")); - Thread.sleep(1500); + + // Should immediately Flush current batch with old ODP config when settings are changed + Thread.sleep(100); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + + // New events should use new config + for (int i = 0; i < 10; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(100); Mockito.verify(mockApiManager, times(1)).sendEvents(eq("new-key"), eq("http://www.new-odp-host.com/v3/events"), any()); } + @Test + public void validateEventData() { + ODPEvent event = new ODPEvent("type", "action", null, null); + Map data = new HashMap<>(); + + data.put("String", "string Value"); + data.put("Integer", 100); + data.put("Float", 33.89); + data.put("Boolean", true); + data.put("null", null); + event.setData(data); + assertTrue(event.isDataValid()); + + data.put("RandomObject", new Object()); + assertFalse(event.isDataValid()); + } + private ODPEvent getEvent(int id) { Map identifiers = new HashMap<>(); identifiers.put("identifier1", "value1-" + id); diff --git a/core-api/src/test/java/com/optimizely/ab/odp/ODPManagerTest.java b/core-api/src/test/java/com/optimizely/ab/odp/ODPManagerTest.java new file mode 100644 index 000000000..924c88836 --- /dev/null +++ b/core-api/src/test/java/com/optimizely/ab/odp/ODPManagerTest.java @@ -0,0 +1,123 @@ +/** + * + * Copyright 2022, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.optimizely.ab.odp; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.util.Arrays; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +public class ODPManagerTest { + private static final String API_RESPONSE = "{\"data\":{\"customer\":{\"audiences\":{\"edges\":[{\"node\":{\"name\":\"segment1\",\"state\":\"qualified\"}},{\"node\":{\"name\":\"segment2\",\"state\":\"qualified\"}}]}}}}"; + + @Mock + ODPApiManager mockApiManager; + + @Mock + ODPEventManager mockEventManager; + + @Mock + ODPSegmentManager mockSegmentManager; + + @Before + public void setup() { + mockApiManager = mock(ODPApiManager.class); + mockEventManager = mock(ODPEventManager.class); + mockSegmentManager = mock(ODPSegmentManager.class); + } + + @Test + public void shouldStartEventManagerWhenODPManagerIsInitialized() { + ODPConfig config = new ODPConfig("test-key", "test-host"); + new ODPManager(config, mockSegmentManager, mockEventManager); + verify(mockEventManager, times(1)).start(); + } + + @Test + public void shouldStopEventManagerWhenCloseIsCalled() { + ODPConfig config = new ODPConfig("test-key", "test-host"); + ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager); + + // Stop is not called in the default flow. + verify(mockEventManager, times(0)).stop(); + + odpManager.close(); + // stop should be called when odpManager is closed. + verify(mockEventManager, times(1)).stop(); + } + + @Test + public void shouldUseNewSettingsInEventManagerWhenODPConfigIsUpdated() throws InterruptedException { + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(200); + ODPConfig config = new ODPConfig("test-key", "test-host", Arrays.asList("segment1", "segment2")); + ODPManager odpManager = new ODPManager(config, mockApiManager); + + odpManager.getEventManager().identifyUser("vuid", "fsuid"); + Thread.sleep(2000); + verify(mockApiManager, times(1)) + .sendEvents(eq("test-key"), eq("test-host/v3/events"), any()); + + odpManager.updateSettings("test-host-updated", "test-key-updated", Arrays.asList("segment1")); + odpManager.getEventManager().identifyUser("vuid", "fsuid"); + Thread.sleep(1200); + verify(mockApiManager, times(1)) + .sendEvents(eq("test-key-updated"), eq("test-host-updated/v3/events"), any()); + } + + @Test + public void shouldUseNewSettingsInSegmentManagerWhenODPConfigIsUpdated() { + Mockito.when(mockApiManager.fetchQualifiedSegments(anyString(), anyString(), anyString(), anyString(), anyList())) + .thenReturn(API_RESPONSE); + ODPConfig config = new ODPConfig("test-key", "test-host", Arrays.asList("segment1", "segment2")); + ODPManager odpManager = new ODPManager(config, mockApiManager); + + odpManager.getSegmentManager().getQualifiedSegments("test-id"); + verify(mockApiManager, times(1)) + .fetchQualifiedSegments(eq("test-key"), eq("test-host/v3/graphql"), any(), any(), any()); + + odpManager.updateSettings("test-host-updated", "test-key-updated", Arrays.asList("segment1")); + odpManager.getSegmentManager().getQualifiedSegments("test-id"); + verify(mockApiManager, times(1)) + .fetchQualifiedSegments(eq("test-key-updated"), eq("test-host-updated/v3/graphql"), any(), any(), any()); + } + + @Test + public void shouldGetEventManager() { + ODPConfig config = new ODPConfig("test-key", "test-host"); + ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager); + assertNotNull(odpManager.getEventManager()); + + odpManager = new ODPManager(config, mockApiManager); + assertNotNull(odpManager.getEventManager()); + } + + @Test + public void shouldGetSegmentManager() { + ODPConfig config = new ODPConfig("test-key", "test-host"); + ODPManager odpManager = new ODPManager(config, mockSegmentManager, mockEventManager); + assertNotNull(odpManager.getSegmentManager()); + + odpManager = new ODPManager(config, mockApiManager); + assertNotNull(odpManager.getSegmentManager()); + } +}