Skip to content

Commit

Permalink
Merge pull request #373 from splitio/impressions_toggle_storage_refac…
Browse files Browse the repository at this point in the history
…tors

[Impressions toggle] Storage refactors
  • Loading branch information
EmilianoSanchez authored Dec 12, 2024
2 parents e462bba + a771764 commit 5255f06
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 62 deletions.
4 changes: 1 addition & 3 deletions src/listeners/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { IResponse, ISplitApi } from '../services/types';
import { ISettings } from '../types';
import SplitIO from '../../types/splitio';
import { ImpressionsPayload } from '../sync/submitters/types';
import { OPTIMIZED, DEBUG, NONE } from '../utils/constants';
import { objectAssign } from '../utils/lang/objectAssign';
import { CLEANUP_REGISTERING, CLEANUP_DEREGISTERING } from '../logger/constants';
import { ISyncManager } from '../sync/types';
Expand Down Expand Up @@ -78,10 +77,9 @@ export class BrowserSignalListener implements ISignalListener {

// Flush impressions & events data if there is user consent
if (isConsentGranted(this.settings)) {
const sim = this.settings.sync.impressionsMode;
const extraMetadata = {
// sim stands for Sync/Split Impressions Mode
sim: sim === OPTIMIZED ? OPTIMIZED : sim === DEBUG ? DEBUG : NONE
sim: this.settings.sync.impressionsMode
};

this._flushData(events + '/testImpressions/beacon', this.storage.impressions, this.serviceApi.postTestImpressionsBulk, this.fromImpressionsCollector, extraMetadata);
Expand Down
9 changes: 5 additions & 4 deletions src/storages/inLocalStorage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { MySegmentsCacheInLocal } from './MySegmentsCacheInLocal';
import { DEFAULT_CACHE_EXPIRATION_IN_MILLIS } from '../../utils/constants/browser';
import { InMemoryStorageCSFactory } from '../inMemory/InMemoryStorageCS';
import { LOG_PREFIX } from './constants';
import { DEBUG, NONE, STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from '../inMemory/TelemetryCacheInMemory';
import { UniqueKeysCacheInMemoryCS } from '../inMemory/UniqueKeysCacheInMemoryCS';
import { getMatching } from '../../utils/key';
Expand All @@ -34,7 +34,7 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn
return InMemoryStorageCSFactory(params);
}

const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode } } } = params;
const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const matchingKey = getMatching(settings.core.key);
const keys = new KeyBuilderCS(prefix, matchingKey);
const expirationTimestamp = Date.now() - DEFAULT_CACHE_EXPIRATION_IN_MILLIS;
Expand All @@ -48,10 +48,10 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn
segments,
largeSegments,
impressions: new ImpressionsCacheInMemory(impressionsQueueSize),
impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined,
impressionCounts: new ImpressionCountsCacheInMemory(),
events: new EventsCacheInMemory(eventsQueueSize),
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemoryCS() : undefined,
uniqueKeys: new UniqueKeysCacheInMemoryCS(),

destroy() { },

Expand All @@ -66,6 +66,7 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn
impressionCounts: this.impressionCounts,
events: this.events,
telemetry: this.telemetry,
uniqueKeys: this.uniqueKeys,

destroy() { }
};
Expand Down
12 changes: 6 additions & 6 deletions src/storages/inMemory/InMemoryStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ImpressionsCacheInMemory } from './ImpressionsCacheInMemory';
import { EventsCacheInMemory } from './EventsCacheInMemory';
import { IStorageFactoryParams, IStorageSync } from '../types';
import { ImpressionCountsCacheInMemory } from './ImpressionCountsCacheInMemory';
import { DEBUG, LOCALHOST_MODE, NONE, STORAGE_MEMORY } from '../../utils/constants';
import { LOCALHOST_MODE, STORAGE_MEMORY } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from './TelemetryCacheInMemory';
import { UniqueKeysCacheInMemory } from './UniqueKeysCacheInMemory';

Expand All @@ -14,7 +14,7 @@ import { UniqueKeysCacheInMemory } from './UniqueKeysCacheInMemory';
* @param params - parameters required by EventsCacheSync
*/
export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageSync {
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params;
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { __splitFiltersValidation } } } = params;

const splits = new SplitsCacheInMemory(__splitFiltersValidation);
const segments = new SegmentsCacheInMemory();
Expand All @@ -23,10 +23,10 @@ export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageS
splits,
segments,
impressions: new ImpressionsCacheInMemory(impressionsQueueSize),
impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined,
impressionCounts: new ImpressionCountsCacheInMemory(),
events: new EventsCacheInMemory(eventsQueueSize),
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemory() : undefined,
uniqueKeys: new UniqueKeysCacheInMemory(),

destroy() { }
};
Expand All @@ -37,8 +37,8 @@ export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageS
const noopTrack = () => true;
storage.impressions.track = noopTrack;
storage.events.track = noopTrack;
if (storage.impressionCounts) storage.impressionCounts.track = noopTrack;
if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack;
storage.impressionCounts.track = noopTrack;
storage.uniqueKeys.track = noopTrack;
}

return storage;
Expand Down
13 changes: 7 additions & 6 deletions src/storages/inMemory/InMemoryStorageCS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ImpressionsCacheInMemory } from './ImpressionsCacheInMemory';
import { EventsCacheInMemory } from './EventsCacheInMemory';
import { IStorageSync, IStorageFactoryParams } from '../types';
import { ImpressionCountsCacheInMemory } from './ImpressionCountsCacheInMemory';
import { DEBUG, LOCALHOST_MODE, NONE, STORAGE_MEMORY } from '../../utils/constants';
import { LOCALHOST_MODE, STORAGE_MEMORY } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from './TelemetryCacheInMemory';
import { UniqueKeysCacheInMemoryCS } from './UniqueKeysCacheInMemoryCS';

Expand All @@ -14,7 +14,7 @@ import { UniqueKeysCacheInMemoryCS } from './UniqueKeysCacheInMemoryCS';
* @param params - parameters required by EventsCacheSync
*/
export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorageSync {
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params;
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize }, sync: { __splitFiltersValidation } } } = params;

const splits = new SplitsCacheInMemory(__splitFiltersValidation);
const segments = new MySegmentsCacheInMemory();
Expand All @@ -25,10 +25,10 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag
segments,
largeSegments,
impressions: new ImpressionsCacheInMemory(impressionsQueueSize),
impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined,
impressionCounts: new ImpressionCountsCacheInMemory(),
events: new EventsCacheInMemory(eventsQueueSize),
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemoryCS() : undefined,
uniqueKeys: new UniqueKeysCacheInMemoryCS(),

destroy() { },

Expand All @@ -42,6 +42,7 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag
impressionCounts: this.impressionCounts,
events: this.events,
telemetry: this.telemetry,
uniqueKeys: this.uniqueKeys,

destroy() { }
};
Expand All @@ -54,8 +55,8 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag
const noopTrack = () => true;
storage.impressions.track = noopTrack;
storage.events.track = noopTrack;
if (storage.impressionCounts) storage.impressionCounts.track = noopTrack;
if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack;
storage.impressionCounts.track = noopTrack;
storage.uniqueKeys.track = noopTrack;
}

return storage;
Expand Down
20 changes: 10 additions & 10 deletions src/storages/inRedis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { SplitsCacheInRedis } from './SplitsCacheInRedis';
import { SegmentsCacheInRedis } from './SegmentsCacheInRedis';
import { ImpressionsCacheInRedis } from './ImpressionsCacheInRedis';
import { EventsCacheInRedis } from './EventsCacheInRedis';
import { DEBUG, NONE, STORAGE_REDIS } from '../../utils/constants';
import { STORAGE_REDIS } from '../../utils/constants';
import { TelemetryCacheInRedis } from './TelemetryCacheInRedis';
import { UniqueKeysCacheInRedis } from './UniqueKeysCacheInRedis';
import { ImpressionCountsCacheInRedis } from './ImpressionCountsCacheInRedis';
Expand All @@ -30,19 +30,19 @@ export function InRedisStorage(options: InRedisStorageOptions = {}): IStorageAsy
const prefix = validatePrefix(options.prefix);

function InRedisStorageFactory(params: IStorageFactoryParams): IStorageAsync {
const { onReadyCb, settings, settings: { log, sync: { impressionsMode } } } = params;
const { onReadyCb, settings, settings: { log } } = params;
const metadata = metadataBuilder(settings);
const keys = new KeyBuilderSS(prefix, metadata);
const redisClient: RedisAdapter = new RD(log, options.options || {});
const telemetry = new TelemetryCacheInRedis(log, keys, redisClient);
const impressionCountsCache = impressionsMode !== DEBUG ? new ImpressionCountsCacheInRedis(log, keys.buildImpressionsCountKey(), redisClient) : undefined;
const uniqueKeysCache = impressionsMode === NONE ? new UniqueKeysCacheInRedis(log, keys.buildUniqueKeysKey(), redisClient) : undefined;
const impressionCountsCache = new ImpressionCountsCacheInRedis(log, keys.buildImpressionsCountKey(), redisClient);
const uniqueKeysCache = new UniqueKeysCacheInRedis(log, keys.buildUniqueKeysKey(), redisClient);

// subscription to Redis connect event in order to emit SDK_READY event on consumer mode
redisClient.on('connect', () => {
onReadyCb();
if (impressionCountsCache) impressionCountsCache.start();
if (uniqueKeysCache) uniqueKeysCache.start();
impressionCountsCache.start();
uniqueKeysCache.start();

// Synchronize config
telemetry.recordConfig();
Expand All @@ -60,10 +60,10 @@ export function InRedisStorage(options: InRedisStorageOptions = {}): IStorageAsy
// When using REDIS we should:
// 1- Disconnect from the storage
destroy(): Promise<void> {
let promises = [];
if (impressionCountsCache) promises.push(impressionCountsCache.stop());
if (uniqueKeysCache) promises.push(uniqueKeysCache.stop());
return Promise.all(promises).then(() => { redisClient.disconnect(); });
return Promise.all([
impressionCountsCache.stop(),
uniqueKeysCache.stop()
]).then(() => { redisClient.disconnect(); });
// @TODO check that caches works as expected when redisClient is disconnected
}
};
Expand Down
36 changes: 16 additions & 20 deletions src/storages/pluggable/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { EventsCachePluggable } from './EventsCachePluggable';
import { wrapperAdapter, METHODS_TO_PROMISE_WRAP } from './wrapperAdapter';
import { isObject } from '../../utils/lang';
import { getStorageHash, validatePrefix } from '../KeyBuilder';
import { CONSUMER_PARTIAL_MODE, DEBUG, NONE, STORAGE_PLUGGABLE } from '../../utils/constants';
import { CONSUMER_PARTIAL_MODE, STORAGE_PLUGGABLE } from '../../utils/constants';
import { ImpressionsCacheInMemory } from '../inMemory/ImpressionsCacheInMemory';
import { EventsCacheInMemory } from '../inMemory/EventsCacheInMemory';
import { ImpressionCountsCacheInMemory } from '../inMemory/ImpressionCountsCacheInMemory';
Expand Down Expand Up @@ -63,35 +63,31 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
const prefix = validatePrefix(options.prefix);

function PluggableStorageFactory(params: IStorageFactoryParams): IStorageAsync {
const { onReadyCb, settings, settings: { log, mode, sync: { impressionsMode }, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const { onReadyCb, settings, settings: { log, mode, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const metadata = metadataBuilder(settings);
const keys = new KeyBuilderSS(prefix, metadata);
const wrapper = wrapperAdapter(log, options.wrapper);

const isSyncronizer = mode === undefined; // If mode is not defined, the synchronizer is running
const isSynchronizer = mode === undefined; // If mode is not defined, the synchronizer is running
const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE;

const telemetry = shouldRecordTelemetry(params) || isSyncronizer ?
const telemetry = shouldRecordTelemetry(params) || isSynchronizer ?
isPartialConsumer ?
new TelemetryCacheInMemory() :
new TelemetryCachePluggable(log, keys, wrapper) :
undefined;

const impressionCountsCache = impressionsMode !== DEBUG || isSyncronizer ?
isPartialConsumer ?
new ImpressionCountsCacheInMemory() :
new ImpressionCountsCachePluggable(log, keys.buildImpressionsCountKey(), wrapper) :
undefined;
const impressionCountsCache = isPartialConsumer ?
new ImpressionCountsCacheInMemory() :
new ImpressionCountsCachePluggable(log, keys.buildImpressionsCountKey(), wrapper);

const uniqueKeysCache = impressionsMode === NONE || isSyncronizer ?
isPartialConsumer ?
settings.core.key === undefined ? new UniqueKeysCacheInMemory() : new UniqueKeysCacheInMemoryCS() :
new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper) :
undefined;
const uniqueKeysCache = isPartialConsumer ?
settings.core.key === undefined ? new UniqueKeysCacheInMemory() : new UniqueKeysCacheInMemoryCS() :
new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper);

// Connects to wrapper and emits SDK_READY event on main client
const connectPromise = wrapper.connect().then(() => {
if (isSyncronizer) {
if (isSynchronizer) {
// In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
Expand All @@ -106,8 +102,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if (impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if (uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if ((impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if ((uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
Expand All @@ -129,9 +125,9 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn

// Stop periodic flush and disconnect the underlying storage
destroy() {
return Promise.all(isSyncronizer ? [] : [
impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).stop && (impressionCountsCache as ImpressionCountsCachePluggable).stop(),
uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).stop && (uniqueKeysCache as UniqueKeysCachePluggable).stop(),
return Promise.all(isSynchronizer ? [] : [
(impressionCountsCache as ImpressionCountsCachePluggable).stop && (impressionCountsCache as ImpressionCountsCachePluggable).stop(),
(uniqueKeysCache as UniqueKeysCachePluggable).stop && (uniqueKeysCache as UniqueKeysCachePluggable).stop(),
]).then(() => wrapper.disconnect());
},

Expand Down
4 changes: 2 additions & 2 deletions src/storages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,10 @@ export interface IStorageBase<
splits: TSplitsCache,
segments: TSegmentsCache,
impressions: TImpressionsCache,
impressionCounts?: TImpressionsCountCache,
impressionCounts: TImpressionsCountCache,
events: TEventsCache,
telemetry?: TTelemetryCache,
uniqueKeys?: TUniqueKeysCache,
uniqueKeys: TUniqueKeysCache,
destroy(): void | Promise<void>,
shared?: (matchingKey: string, onReadyCb: (error?: any) => void) => this
}
Expand Down
6 changes: 2 additions & 4 deletions src/sync/submitters/impressionCountsSubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ export function impressionCountsSubmitterFactory(params: ISdkFactoryContextSync)
storage: { impressionCounts }
} = params;

if (impressionCounts) {
// retry impressions counts only once.
return submitterFactory(log, postTestImpressionsCount, impressionCounts, IMPRESSIONS_COUNT_RATE, 'impression counts', fromImpressionCountsCollector, 1);
}
// retry impressions counts only once.
return submitterFactory(log, postTestImpressionsCount, impressionCounts, IMPRESSIONS_COUNT_RATE, 'impression counts', fromImpressionCountsCollector, 1);
}
7 changes: 3 additions & 4 deletions src/sync/submitters/submitterManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ export function submitterManagerFactory(params: ISdkFactoryContextSync): ISubmit

const submitters = [
impressionsSubmitterFactory(params),
eventsSubmitterFactory(params)
eventsSubmitterFactory(params),
impressionCountsSubmitterFactory(params),
uniqueKeysSubmitterFactory(params)
];

const impressionCountsSubmitter = impressionCountsSubmitterFactory(params);
if (impressionCountsSubmitter) submitters.push(impressionCountsSubmitter);
const telemetrySubmitter = telemetrySubmitterFactory(params);
if (params.storage.uniqueKeys) submitters.push(uniqueKeysSubmitterFactory(params));

return {
// `onlyTelemetry` true if SDK is created with userConsent not GRANTED
Expand Down
5 changes: 2 additions & 3 deletions src/sync/submitters/uniqueKeysSubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ export function uniqueKeysSubmitterFactory(params: ISdkFactoryContextSync) {
const isClientSide = key !== undefined;
const postUniqueKeysBulk = isClientSide ? postUniqueKeysBulkCs : postUniqueKeysBulkSs;

const syncTask = submitterFactory(log, postUniqueKeysBulk, uniqueKeys!, UNIQUE_KEYS_RATE, DATA_NAME);
const syncTask = submitterFactory(log, postUniqueKeysBulk, uniqueKeys, UNIQUE_KEYS_RATE, DATA_NAME);

// register unique keys submitter to be executed when uniqueKeys cache is full
uniqueKeys!.setOnFullQueueCb(() => {
uniqueKeys.setOnFullQueueCb(() => {
if (syncTask.isRunning()) {
log.info(SUBMITTERS_PUSH_FULL_QUEUE, [DATA_NAME]);
syncTask.execute();
Expand All @@ -33,4 +33,3 @@ export function uniqueKeysSubmitterFactory(params: ISdkFactoryContextSync) {

return syncTask;
}

0 comments on commit 5255f06

Please sign in to comment.