From b4c0350a64e79a07e32fd2911b3ec2a7c57b9abd Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Mon, 9 Dec 2024 16:12:29 -0300 Subject: [PATCH 1/2] Refactor storages and submitter, now that 'impressionCounts' and 'uniqueKeys' caches will be required --- src/listeners/browser.ts | 4 +-- src/storages/inLocalStorage/index.ts | 9 ++--- src/storages/inMemory/InMemoryStorage.ts | 8 ++--- src/storages/inMemory/InMemoryStorageCS.ts | 9 ++--- src/storages/inRedis/index.ts | 20 +++++------ src/storages/pluggable/index.ts | 36 +++++++++---------- src/storages/types.ts | 4 +-- .../submitters/impressionCountsSubmitter.ts | 6 ++-- src/sync/submitters/submitterManager.ts | 7 ++-- src/sync/submitters/uniqueKeysSubmitter.ts | 5 ++- 10 files changed, 50 insertions(+), 58 deletions(-) diff --git a/src/listeners/browser.ts b/src/listeners/browser.ts index 12f13b74..ffd86004 100644 --- a/src/listeners/browser.ts +++ b/src/listeners/browser.ts @@ -8,7 +8,6 @@ import { IResponse, ISplitApi } from '../services/types'; import { ISettings } from '../types'; import SplitIO from '../../types/splitio'; import { ImpressionsPayload } from '../sync/submitters/types'; -import { OPTIMIZED, DEBUG, NONE } from '../utils/constants'; import { objectAssign } from '../utils/lang/objectAssign'; import { CLEANUP_REGISTERING, CLEANUP_DEREGISTERING } from '../logger/constants'; import { ISyncManager } from '../sync/types'; @@ -78,10 +77,9 @@ export class BrowserSignalListener implements ISignalListener { // Flush impressions & events data if there is user consent if (isConsentGranted(this.settings)) { - const sim = this.settings.sync.impressionsMode; const extraMetadata = { // sim stands for Sync/Split Impressions Mode - sim: sim === OPTIMIZED ? OPTIMIZED : sim === DEBUG ? DEBUG : NONE + sim: this.settings.sync.impressionsMode }; this._flushData(events + '/testImpressions/beacon', this.storage.impressions, this.serviceApi.postTestImpressionsBulk, this.fromImpressionsCollector, extraMetadata); diff --git a/src/storages/inLocalStorage/index.ts b/src/storages/inLocalStorage/index.ts index 871be592..c621141d 100644 --- a/src/storages/inLocalStorage/index.ts +++ b/src/storages/inLocalStorage/index.ts @@ -10,7 +10,7 @@ import { MySegmentsCacheInLocal } from './MySegmentsCacheInLocal'; import { DEFAULT_CACHE_EXPIRATION_IN_MILLIS } from '../../utils/constants/browser'; import { InMemoryStorageCSFactory } from '../inMemory/InMemoryStorageCS'; import { LOG_PREFIX } from './constants'; -import { DEBUG, NONE, STORAGE_LOCALSTORAGE } from '../../utils/constants'; +import { STORAGE_LOCALSTORAGE } from '../../utils/constants'; import { shouldRecordTelemetry, TelemetryCacheInMemory } from '../inMemory/TelemetryCacheInMemory'; import { UniqueKeysCacheInMemoryCS } from '../inMemory/UniqueKeysCacheInMemoryCS'; import { getMatching } from '../../utils/key'; @@ -34,7 +34,7 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn return InMemoryStorageCSFactory(params); } - const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode } } } = params; + const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params; const matchingKey = getMatching(settings.core.key); const keys = new KeyBuilderCS(prefix, matchingKey); const expirationTimestamp = Date.now() - DEFAULT_CACHE_EXPIRATION_IN_MILLIS; @@ -48,10 +48,10 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn segments, largeSegments, impressions: new ImpressionsCacheInMemory(impressionsQueueSize), - impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined, + impressionCounts: new ImpressionCountsCacheInMemory(), events: new EventsCacheInMemory(eventsQueueSize), telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined, - uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemoryCS() : undefined, + uniqueKeys: new UniqueKeysCacheInMemoryCS(), destroy() { }, @@ -66,6 +66,7 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn impressionCounts: this.impressionCounts, events: this.events, telemetry: this.telemetry, + uniqueKeys: this.uniqueKeys, destroy() { } }; diff --git a/src/storages/inMemory/InMemoryStorage.ts b/src/storages/inMemory/InMemoryStorage.ts index 565d37ba..b8833370 100644 --- a/src/storages/inMemory/InMemoryStorage.ts +++ b/src/storages/inMemory/InMemoryStorage.ts @@ -4,7 +4,7 @@ import { ImpressionsCacheInMemory } from './ImpressionsCacheInMemory'; import { EventsCacheInMemory } from './EventsCacheInMemory'; import { IStorageFactoryParams, IStorageSync } from '../types'; import { ImpressionCountsCacheInMemory } from './ImpressionCountsCacheInMemory'; -import { DEBUG, LOCALHOST_MODE, NONE, STORAGE_MEMORY } from '../../utils/constants'; +import { LOCALHOST_MODE, STORAGE_MEMORY } from '../../utils/constants'; import { shouldRecordTelemetry, TelemetryCacheInMemory } from './TelemetryCacheInMemory'; import { UniqueKeysCacheInMemory } from './UniqueKeysCacheInMemory'; @@ -14,7 +14,7 @@ import { UniqueKeysCacheInMemory } from './UniqueKeysCacheInMemory'; * @param params - parameters required by EventsCacheSync */ export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageSync { - const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params; + const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { __splitFiltersValidation } } } = params; const splits = new SplitsCacheInMemory(__splitFiltersValidation); const segments = new SegmentsCacheInMemory(); @@ -23,10 +23,10 @@ export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageS splits, segments, impressions: new ImpressionsCacheInMemory(impressionsQueueSize), - impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined, + impressionCounts: new ImpressionCountsCacheInMemory(), events: new EventsCacheInMemory(eventsQueueSize), telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined, - uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemory() : undefined, + uniqueKeys: new UniqueKeysCacheInMemory(), destroy() { } }; diff --git a/src/storages/inMemory/InMemoryStorageCS.ts b/src/storages/inMemory/InMemoryStorageCS.ts index 051bd9d6..09d62ac8 100644 --- a/src/storages/inMemory/InMemoryStorageCS.ts +++ b/src/storages/inMemory/InMemoryStorageCS.ts @@ -4,7 +4,7 @@ import { ImpressionsCacheInMemory } from './ImpressionsCacheInMemory'; import { EventsCacheInMemory } from './EventsCacheInMemory'; import { IStorageSync, IStorageFactoryParams } from '../types'; import { ImpressionCountsCacheInMemory } from './ImpressionCountsCacheInMemory'; -import { DEBUG, LOCALHOST_MODE, NONE, STORAGE_MEMORY } from '../../utils/constants'; +import { LOCALHOST_MODE, STORAGE_MEMORY } from '../../utils/constants'; import { shouldRecordTelemetry, TelemetryCacheInMemory } from './TelemetryCacheInMemory'; import { UniqueKeysCacheInMemoryCS } from './UniqueKeysCacheInMemoryCS'; @@ -14,7 +14,7 @@ import { UniqueKeysCacheInMemoryCS } from './UniqueKeysCacheInMemoryCS'; * @param params - parameters required by EventsCacheSync */ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorageSync { - const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params; + const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize }, sync: { __splitFiltersValidation } } } = params; const splits = new SplitsCacheInMemory(__splitFiltersValidation); const segments = new MySegmentsCacheInMemory(); @@ -25,10 +25,10 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag segments, largeSegments, impressions: new ImpressionsCacheInMemory(impressionsQueueSize), - impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined, + impressionCounts: new ImpressionCountsCacheInMemory(), events: new EventsCacheInMemory(eventsQueueSize), telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined, - uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemoryCS() : undefined, + uniqueKeys: new UniqueKeysCacheInMemoryCS(), destroy() { }, @@ -42,6 +42,7 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag impressionCounts: this.impressionCounts, events: this.events, telemetry: this.telemetry, + uniqueKeys: this.uniqueKeys, destroy() { } }; diff --git a/src/storages/inRedis/index.ts b/src/storages/inRedis/index.ts index 80474209..e548142d 100644 --- a/src/storages/inRedis/index.ts +++ b/src/storages/inRedis/index.ts @@ -6,7 +6,7 @@ import { SplitsCacheInRedis } from './SplitsCacheInRedis'; import { SegmentsCacheInRedis } from './SegmentsCacheInRedis'; import { ImpressionsCacheInRedis } from './ImpressionsCacheInRedis'; import { EventsCacheInRedis } from './EventsCacheInRedis'; -import { DEBUG, NONE, STORAGE_REDIS } from '../../utils/constants'; +import { STORAGE_REDIS } from '../../utils/constants'; import { TelemetryCacheInRedis } from './TelemetryCacheInRedis'; import { UniqueKeysCacheInRedis } from './UniqueKeysCacheInRedis'; import { ImpressionCountsCacheInRedis } from './ImpressionCountsCacheInRedis'; @@ -30,19 +30,19 @@ export function InRedisStorage(options: InRedisStorageOptions = {}): IStorageAsy const prefix = validatePrefix(options.prefix); function InRedisStorageFactory(params: IStorageFactoryParams): IStorageAsync { - const { onReadyCb, settings, settings: { log, sync: { impressionsMode } } } = params; + const { onReadyCb, settings, settings: { log } } = params; const metadata = metadataBuilder(settings); const keys = new KeyBuilderSS(prefix, metadata); const redisClient: RedisAdapter = new RD(log, options.options || {}); const telemetry = new TelemetryCacheInRedis(log, keys, redisClient); - const impressionCountsCache = impressionsMode !== DEBUG ? new ImpressionCountsCacheInRedis(log, keys.buildImpressionsCountKey(), redisClient) : undefined; - const uniqueKeysCache = impressionsMode === NONE ? new UniqueKeysCacheInRedis(log, keys.buildUniqueKeysKey(), redisClient) : undefined; + const impressionCountsCache = new ImpressionCountsCacheInRedis(log, keys.buildImpressionsCountKey(), redisClient); + const uniqueKeysCache = new UniqueKeysCacheInRedis(log, keys.buildUniqueKeysKey(), redisClient); // subscription to Redis connect event in order to emit SDK_READY event on consumer mode redisClient.on('connect', () => { onReadyCb(); - if (impressionCountsCache) impressionCountsCache.start(); - if (uniqueKeysCache) uniqueKeysCache.start(); + impressionCountsCache.start(); + uniqueKeysCache.start(); // Synchronize config telemetry.recordConfig(); @@ -60,10 +60,10 @@ export function InRedisStorage(options: InRedisStorageOptions = {}): IStorageAsy // When using REDIS we should: // 1- Disconnect from the storage destroy(): Promise { - let promises = []; - if (impressionCountsCache) promises.push(impressionCountsCache.stop()); - if (uniqueKeysCache) promises.push(uniqueKeysCache.stop()); - return Promise.all(promises).then(() => { redisClient.disconnect(); }); + return Promise.all([ + impressionCountsCache.stop(), + uniqueKeysCache.stop() + ]).then(() => { redisClient.disconnect(); }); // @TODO check that caches works as expected when redisClient is disconnected } }; diff --git a/src/storages/pluggable/index.ts b/src/storages/pluggable/index.ts index 372eeeb4..ee8b1872 100644 --- a/src/storages/pluggable/index.ts +++ b/src/storages/pluggable/index.ts @@ -8,7 +8,7 @@ import { EventsCachePluggable } from './EventsCachePluggable'; import { wrapperAdapter, METHODS_TO_PROMISE_WRAP } from './wrapperAdapter'; import { isObject } from '../../utils/lang'; import { getStorageHash, validatePrefix } from '../KeyBuilder'; -import { CONSUMER_PARTIAL_MODE, DEBUG, NONE, STORAGE_PLUGGABLE } from '../../utils/constants'; +import { CONSUMER_PARTIAL_MODE, STORAGE_PLUGGABLE } from '../../utils/constants'; import { ImpressionsCacheInMemory } from '../inMemory/ImpressionsCacheInMemory'; import { EventsCacheInMemory } from '../inMemory/EventsCacheInMemory'; import { ImpressionCountsCacheInMemory } from '../inMemory/ImpressionCountsCacheInMemory'; @@ -63,35 +63,31 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn const prefix = validatePrefix(options.prefix); function PluggableStorageFactory(params: IStorageFactoryParams): IStorageAsync { - const { onReadyCb, settings, settings: { log, mode, sync: { impressionsMode }, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params; + const { onReadyCb, settings, settings: { log, mode, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params; const metadata = metadataBuilder(settings); const keys = new KeyBuilderSS(prefix, metadata); const wrapper = wrapperAdapter(log, options.wrapper); - const isSyncronizer = mode === undefined; // If mode is not defined, the synchronizer is running + const isSynchronizer = mode === undefined; // If mode is not defined, the synchronizer is running const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE; - const telemetry = shouldRecordTelemetry(params) || isSyncronizer ? + const telemetry = shouldRecordTelemetry(params) || isSynchronizer ? isPartialConsumer ? new TelemetryCacheInMemory() : new TelemetryCachePluggable(log, keys, wrapper) : undefined; - const impressionCountsCache = impressionsMode !== DEBUG || isSyncronizer ? - isPartialConsumer ? - new ImpressionCountsCacheInMemory() : - new ImpressionCountsCachePluggable(log, keys.buildImpressionsCountKey(), wrapper) : - undefined; + const impressionCountsCache = isPartialConsumer ? + new ImpressionCountsCacheInMemory() : + new ImpressionCountsCachePluggable(log, keys.buildImpressionsCountKey(), wrapper); - const uniqueKeysCache = impressionsMode === NONE || isSyncronizer ? - isPartialConsumer ? - settings.core.key === undefined ? new UniqueKeysCacheInMemory() : new UniqueKeysCacheInMemoryCS() : - new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper) : - undefined; + const uniqueKeysCache = isPartialConsumer ? + settings.core.key === undefined ? new UniqueKeysCacheInMemory() : new UniqueKeysCacheInMemoryCS() : + new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper); // Connects to wrapper and emits SDK_READY event on main client const connectPromise = wrapper.connect().then(() => { - if (isSyncronizer) { + if (isSynchronizer) { // In standalone or producer mode, clear storage if SDK key or feature flag filter has changed return wrapper.get(keys.buildHashKey()).then((hash) => { const currentHash = getStorageHash(settings); @@ -106,8 +102,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn }); } else { // Start periodic flush of async storages if not running synchronizer (producer mode) - if (impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start(); - if (uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start(); + if ((impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start(); + if ((uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start(); if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig(); onReadyCb(); @@ -129,9 +125,9 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn // Stop periodic flush and disconnect the underlying storage destroy() { - return Promise.all(isSyncronizer ? [] : [ - impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).stop && (impressionCountsCache as ImpressionCountsCachePluggable).stop(), - uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).stop && (uniqueKeysCache as UniqueKeysCachePluggable).stop(), + return Promise.all(isSynchronizer ? [] : [ + (impressionCountsCache as ImpressionCountsCachePluggable).stop && (impressionCountsCache as ImpressionCountsCachePluggable).stop(), + (uniqueKeysCache as UniqueKeysCachePluggable).stop && (uniqueKeysCache as UniqueKeysCachePluggable).stop(), ]).then(() => wrapper.disconnect()); }, diff --git a/src/storages/types.ts b/src/storages/types.ts index 5d7b40b6..638c4606 100644 --- a/src/storages/types.ts +++ b/src/storages/types.ts @@ -439,10 +439,10 @@ export interface IStorageBase< splits: TSplitsCache, segments: TSegmentsCache, impressions: TImpressionsCache, - impressionCounts?: TImpressionsCountCache, + impressionCounts: TImpressionsCountCache, events: TEventsCache, telemetry?: TTelemetryCache, - uniqueKeys?: TUniqueKeysCache, + uniqueKeys: TUniqueKeysCache, destroy(): void | Promise, shared?: (matchingKey: string, onReadyCb: (error?: any) => void) => this } diff --git a/src/sync/submitters/impressionCountsSubmitter.ts b/src/sync/submitters/impressionCountsSubmitter.ts index 48131021..fb02045c 100644 --- a/src/sync/submitters/impressionCountsSubmitter.ts +++ b/src/sync/submitters/impressionCountsSubmitter.ts @@ -39,8 +39,6 @@ export function impressionCountsSubmitterFactory(params: ISdkFactoryContextSync) storage: { impressionCounts } } = params; - if (impressionCounts) { - // retry impressions counts only once. - return submitterFactory(log, postTestImpressionsCount, impressionCounts, IMPRESSIONS_COUNT_RATE, 'impression counts', fromImpressionCountsCollector, 1); - } + // retry impressions counts only once. + return submitterFactory(log, postTestImpressionsCount, impressionCounts, IMPRESSIONS_COUNT_RATE, 'impression counts', fromImpressionCountsCollector, 1); } diff --git a/src/sync/submitters/submitterManager.ts b/src/sync/submitters/submitterManager.ts index 9313048c..bcff5f1b 100644 --- a/src/sync/submitters/submitterManager.ts +++ b/src/sync/submitters/submitterManager.ts @@ -10,13 +10,12 @@ export function submitterManagerFactory(params: ISdkFactoryContextSync): ISubmit const submitters = [ impressionsSubmitterFactory(params), - eventsSubmitterFactory(params) + eventsSubmitterFactory(params), + impressionCountsSubmitterFactory(params), + uniqueKeysSubmitterFactory(params) ]; - const impressionCountsSubmitter = impressionCountsSubmitterFactory(params); - if (impressionCountsSubmitter) submitters.push(impressionCountsSubmitter); const telemetrySubmitter = telemetrySubmitterFactory(params); - if (params.storage.uniqueKeys) submitters.push(uniqueKeysSubmitterFactory(params)); return { // `onlyTelemetry` true if SDK is created with userConsent not GRANTED diff --git a/src/sync/submitters/uniqueKeysSubmitter.ts b/src/sync/submitters/uniqueKeysSubmitter.ts index f51c90b8..f10a1aca 100644 --- a/src/sync/submitters/uniqueKeysSubmitter.ts +++ b/src/sync/submitters/uniqueKeysSubmitter.ts @@ -19,10 +19,10 @@ export function uniqueKeysSubmitterFactory(params: ISdkFactoryContextSync) { const isClientSide = key !== undefined; const postUniqueKeysBulk = isClientSide ? postUniqueKeysBulkCs : postUniqueKeysBulkSs; - const syncTask = submitterFactory(log, postUniqueKeysBulk, uniqueKeys!, UNIQUE_KEYS_RATE, DATA_NAME); + const syncTask = submitterFactory(log, postUniqueKeysBulk, uniqueKeys, UNIQUE_KEYS_RATE, DATA_NAME); // register unique keys submitter to be executed when uniqueKeys cache is full - uniqueKeys!.setOnFullQueueCb(() => { + uniqueKeys.setOnFullQueueCb(() => { if (syncTask.isRunning()) { log.info(SUBMITTERS_PUSH_FULL_QUEUE, [DATA_NAME]); syncTask.execute(); @@ -33,4 +33,3 @@ export function uniqueKeysSubmitterFactory(params: ISdkFactoryContextSync) { return syncTask; } - From ac5659a8647c4c26610a9dbcfe4d3916d6e0fccb Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Wed, 11 Dec 2024 15:16:06 -0300 Subject: [PATCH 2/2] Polishing --- src/storages/inMemory/InMemoryStorage.ts | 4 ++-- src/storages/inMemory/InMemoryStorageCS.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/storages/inMemory/InMemoryStorage.ts b/src/storages/inMemory/InMemoryStorage.ts index b8833370..7ec099d1 100644 --- a/src/storages/inMemory/InMemoryStorage.ts +++ b/src/storages/inMemory/InMemoryStorage.ts @@ -37,8 +37,8 @@ export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageS const noopTrack = () => true; storage.impressions.track = noopTrack; storage.events.track = noopTrack; - if (storage.impressionCounts) storage.impressionCounts.track = noopTrack; - if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack; + storage.impressionCounts.track = noopTrack; + storage.uniqueKeys.track = noopTrack; } return storage; diff --git a/src/storages/inMemory/InMemoryStorageCS.ts b/src/storages/inMemory/InMemoryStorageCS.ts index 09d62ac8..bfaec159 100644 --- a/src/storages/inMemory/InMemoryStorageCS.ts +++ b/src/storages/inMemory/InMemoryStorageCS.ts @@ -55,8 +55,8 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag const noopTrack = () => true; storage.impressions.track = noopTrack; storage.events.track = noopTrack; - if (storage.impressionCounts) storage.impressionCounts.track = noopTrack; - if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack; + storage.impressionCounts.track = noopTrack; + storage.uniqueKeys.track = noopTrack; } return storage;