Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Impressions toggle] Storage refactors #373

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/listeners/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { IResponse, ISplitApi } from '../services/types';
import { ISettings } from '../types';
import SplitIO from '../../types/splitio';
import { ImpressionsPayload } from '../sync/submitters/types';
import { OPTIMIZED, DEBUG, NONE } from '../utils/constants';
import { objectAssign } from '../utils/lang/objectAssign';
import { CLEANUP_REGISTERING, CLEANUP_DEREGISTERING } from '../logger/constants';
import { ISyncManager } from '../sync/types';
Expand Down Expand Up @@ -78,10 +77,9 @@ export class BrowserSignalListener implements ISignalListener {

// Flush impressions & events data if there is user consent
if (isConsentGranted(this.settings)) {
const sim = this.settings.sync.impressionsMode;
const extraMetadata = {
// sim stands for Sync/Split Impressions Mode
sim: sim === OPTIMIZED ? OPTIMIZED : sim === DEBUG ? DEBUG : NONE
sim: this.settings.sync.impressionsMode
};

this._flushData(events + '/testImpressions/beacon', this.storage.impressions, this.serviceApi.postTestImpressionsBulk, this.fromImpressionsCollector, extraMetadata);
Expand Down
9 changes: 5 additions & 4 deletions src/storages/inLocalStorage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { MySegmentsCacheInLocal } from './MySegmentsCacheInLocal';
import { DEFAULT_CACHE_EXPIRATION_IN_MILLIS } from '../../utils/constants/browser';
import { InMemoryStorageCSFactory } from '../inMemory/InMemoryStorageCS';
import { LOG_PREFIX } from './constants';
import { DEBUG, NONE, STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from '../inMemory/TelemetryCacheInMemory';
import { UniqueKeysCacheInMemoryCS } from '../inMemory/UniqueKeysCacheInMemoryCS';
import { getMatching } from '../../utils/key';
Expand All @@ -34,7 +34,7 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn
return InMemoryStorageCSFactory(params);
}

const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode } } } = params;
const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const matchingKey = getMatching(settings.core.key);
const keys = new KeyBuilderCS(prefix, matchingKey);
const expirationTimestamp = Date.now() - DEFAULT_CACHE_EXPIRATION_IN_MILLIS;
Expand All @@ -48,10 +48,10 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn
segments,
largeSegments,
impressions: new ImpressionsCacheInMemory(impressionsQueueSize),
impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined,
impressionCounts: new ImpressionCountsCacheInMemory(),
events: new EventsCacheInMemory(eventsQueueSize),
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemoryCS() : undefined,
uniqueKeys: new UniqueKeysCacheInMemoryCS(),

destroy() { },

Expand All @@ -66,6 +66,7 @@ export function InLocalStorage(options: InLocalStorageOptions = {}): IStorageSyn
impressionCounts: this.impressionCounts,
events: this.events,
telemetry: this.telemetry,
uniqueKeys: this.uniqueKeys,

destroy() { }
};
Expand Down
12 changes: 6 additions & 6 deletions src/storages/inMemory/InMemoryStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ImpressionsCacheInMemory } from './ImpressionsCacheInMemory';
import { EventsCacheInMemory } from './EventsCacheInMemory';
import { IStorageFactoryParams, IStorageSync } from '../types';
import { ImpressionCountsCacheInMemory } from './ImpressionCountsCacheInMemory';
import { DEBUG, LOCALHOST_MODE, NONE, STORAGE_MEMORY } from '../../utils/constants';
import { LOCALHOST_MODE, STORAGE_MEMORY } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from './TelemetryCacheInMemory';
import { UniqueKeysCacheInMemory } from './UniqueKeysCacheInMemory';

Expand All @@ -14,7 +14,7 @@ import { UniqueKeysCacheInMemory } from './UniqueKeysCacheInMemory';
* @param params - parameters required by EventsCacheSync
*/
export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageSync {
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params;
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { __splitFiltersValidation } } } = params;

const splits = new SplitsCacheInMemory(__splitFiltersValidation);
const segments = new SegmentsCacheInMemory();
Expand All @@ -23,10 +23,10 @@ export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageS
splits,
segments,
impressions: new ImpressionsCacheInMemory(impressionsQueueSize),
impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined,
impressionCounts: new ImpressionCountsCacheInMemory(),
events: new EventsCacheInMemory(eventsQueueSize),
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemory() : undefined,
uniqueKeys: new UniqueKeysCacheInMemory(),

destroy() { }
};
Expand All @@ -37,8 +37,8 @@ export function InMemoryStorageFactory(params: IStorageFactoryParams): IStorageS
const noopTrack = () => true;
storage.impressions.track = noopTrack;
storage.events.track = noopTrack;
if (storage.impressionCounts) storage.impressionCounts.track = noopTrack;
if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack;
storage.impressionCounts.track = noopTrack;
storage.uniqueKeys.track = noopTrack;
}

return storage;
Expand Down
13 changes: 7 additions & 6 deletions src/storages/inMemory/InMemoryStorageCS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ImpressionsCacheInMemory } from './ImpressionsCacheInMemory';
import { EventsCacheInMemory } from './EventsCacheInMemory';
import { IStorageSync, IStorageFactoryParams } from '../types';
import { ImpressionCountsCacheInMemory } from './ImpressionCountsCacheInMemory';
import { DEBUG, LOCALHOST_MODE, NONE, STORAGE_MEMORY } from '../../utils/constants';
import { LOCALHOST_MODE, STORAGE_MEMORY } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from './TelemetryCacheInMemory';
import { UniqueKeysCacheInMemoryCS } from './UniqueKeysCacheInMemoryCS';

Expand All @@ -14,7 +14,7 @@ import { UniqueKeysCacheInMemoryCS } from './UniqueKeysCacheInMemoryCS';
* @param params - parameters required by EventsCacheSync
*/
export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorageSync {
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params;
const { settings: { scheduler: { impressionsQueueSize, eventsQueueSize }, sync: { __splitFiltersValidation } } } = params;

const splits = new SplitsCacheInMemory(__splitFiltersValidation);
const segments = new MySegmentsCacheInMemory();
Expand All @@ -25,10 +25,10 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag
segments,
largeSegments,
impressions: new ImpressionsCacheInMemory(impressionsQueueSize),
impressionCounts: impressionsMode !== DEBUG ? new ImpressionCountsCacheInMemory() : undefined,
impressionCounts: new ImpressionCountsCacheInMemory(),
events: new EventsCacheInMemory(eventsQueueSize),
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemoryCS() : undefined,
uniqueKeys: new UniqueKeysCacheInMemoryCS(),

destroy() { },

Expand All @@ -42,6 +42,7 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag
impressionCounts: this.impressionCounts,
events: this.events,
telemetry: this.telemetry,
uniqueKeys: this.uniqueKeys,

destroy() { }
};
Expand All @@ -54,8 +55,8 @@ export function InMemoryStorageCSFactory(params: IStorageFactoryParams): IStorag
const noopTrack = () => true;
storage.impressions.track = noopTrack;
storage.events.track = noopTrack;
if (storage.impressionCounts) storage.impressionCounts.track = noopTrack;
if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack;
storage.impressionCounts.track = noopTrack;
storage.uniqueKeys.track = noopTrack;
}

return storage;
Expand Down
20 changes: 10 additions & 10 deletions src/storages/inRedis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { SplitsCacheInRedis } from './SplitsCacheInRedis';
import { SegmentsCacheInRedis } from './SegmentsCacheInRedis';
import { ImpressionsCacheInRedis } from './ImpressionsCacheInRedis';
import { EventsCacheInRedis } from './EventsCacheInRedis';
import { DEBUG, NONE, STORAGE_REDIS } from '../../utils/constants';
import { STORAGE_REDIS } from '../../utils/constants';
import { TelemetryCacheInRedis } from './TelemetryCacheInRedis';
import { UniqueKeysCacheInRedis } from './UniqueKeysCacheInRedis';
import { ImpressionCountsCacheInRedis } from './ImpressionCountsCacheInRedis';
Expand All @@ -30,19 +30,19 @@ export function InRedisStorage(options: InRedisStorageOptions = {}): IStorageAsy
const prefix = validatePrefix(options.prefix);

function InRedisStorageFactory(params: IStorageFactoryParams): IStorageAsync {
const { onReadyCb, settings, settings: { log, sync: { impressionsMode } } } = params;
const { onReadyCb, settings, settings: { log } } = params;
const metadata = metadataBuilder(settings);
const keys = new KeyBuilderSS(prefix, metadata);
const redisClient: RedisAdapter = new RD(log, options.options || {});
const telemetry = new TelemetryCacheInRedis(log, keys, redisClient);
const impressionCountsCache = impressionsMode !== DEBUG ? new ImpressionCountsCacheInRedis(log, keys.buildImpressionsCountKey(), redisClient) : undefined;
const uniqueKeysCache = impressionsMode === NONE ? new UniqueKeysCacheInRedis(log, keys.buildUniqueKeysKey(), redisClient) : undefined;
const impressionCountsCache = new ImpressionCountsCacheInRedis(log, keys.buildImpressionsCountKey(), redisClient);
const uniqueKeysCache = new UniqueKeysCacheInRedis(log, keys.buildUniqueKeysKey(), redisClient);

// subscription to Redis connect event in order to emit SDK_READY event on consumer mode
redisClient.on('connect', () => {
onReadyCb();
if (impressionCountsCache) impressionCountsCache.start();
if (uniqueKeysCache) uniqueKeysCache.start();
impressionCountsCache.start();
uniqueKeysCache.start();

// Synchronize config
telemetry.recordConfig();
Expand All @@ -60,10 +60,10 @@ export function InRedisStorage(options: InRedisStorageOptions = {}): IStorageAsy
// When using REDIS we should:
// 1- Disconnect from the storage
destroy(): Promise<void> {
let promises = [];
if (impressionCountsCache) promises.push(impressionCountsCache.stop());
if (uniqueKeysCache) promises.push(uniqueKeysCache.stop());
return Promise.all(promises).then(() => { redisClient.disconnect(); });
return Promise.all([
impressionCountsCache.stop(),
uniqueKeysCache.stop()
]).then(() => { redisClient.disconnect(); });
// @TODO check that caches works as expected when redisClient is disconnected
}
};
Expand Down
36 changes: 16 additions & 20 deletions src/storages/pluggable/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { EventsCachePluggable } from './EventsCachePluggable';
import { wrapperAdapter, METHODS_TO_PROMISE_WRAP } from './wrapperAdapter';
import { isObject } from '../../utils/lang';
import { getStorageHash, validatePrefix } from '../KeyBuilder';
import { CONSUMER_PARTIAL_MODE, DEBUG, NONE, STORAGE_PLUGGABLE } from '../../utils/constants';
import { CONSUMER_PARTIAL_MODE, STORAGE_PLUGGABLE } from '../../utils/constants';
import { ImpressionsCacheInMemory } from '../inMemory/ImpressionsCacheInMemory';
import { EventsCacheInMemory } from '../inMemory/EventsCacheInMemory';
import { ImpressionCountsCacheInMemory } from '../inMemory/ImpressionCountsCacheInMemory';
Expand Down Expand Up @@ -63,35 +63,31 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
const prefix = validatePrefix(options.prefix);

function PluggableStorageFactory(params: IStorageFactoryParams): IStorageAsync {
const { onReadyCb, settings, settings: { log, mode, sync: { impressionsMode }, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const { onReadyCb, settings, settings: { log, mode, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const metadata = metadataBuilder(settings);
const keys = new KeyBuilderSS(prefix, metadata);
const wrapper = wrapperAdapter(log, options.wrapper);

const isSyncronizer = mode === undefined; // If mode is not defined, the synchronizer is running
const isSynchronizer = mode === undefined; // If mode is not defined, the synchronizer is running
const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE;

const telemetry = shouldRecordTelemetry(params) || isSyncronizer ?
const telemetry = shouldRecordTelemetry(params) || isSynchronizer ?
isPartialConsumer ?
new TelemetryCacheInMemory() :
new TelemetryCachePluggable(log, keys, wrapper) :
undefined;

const impressionCountsCache = impressionsMode !== DEBUG || isSyncronizer ?
isPartialConsumer ?
new ImpressionCountsCacheInMemory() :
new ImpressionCountsCachePluggable(log, keys.buildImpressionsCountKey(), wrapper) :
undefined;
const impressionCountsCache = isPartialConsumer ?
new ImpressionCountsCacheInMemory() :
new ImpressionCountsCachePluggable(log, keys.buildImpressionsCountKey(), wrapper);

const uniqueKeysCache = impressionsMode === NONE || isSyncronizer ?
isPartialConsumer ?
settings.core.key === undefined ? new UniqueKeysCacheInMemory() : new UniqueKeysCacheInMemoryCS() :
new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper) :
undefined;
const uniqueKeysCache = isPartialConsumer ?
settings.core.key === undefined ? new UniqueKeysCacheInMemory() : new UniqueKeysCacheInMemoryCS() :
new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper);

// Connects to wrapper and emits SDK_READY event on main client
const connectPromise = wrapper.connect().then(() => {
if (isSyncronizer) {
if (isSynchronizer) {
// In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
Expand All @@ -106,8 +102,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if (impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if (uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if ((impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if ((uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
Expand All @@ -129,9 +125,9 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn

// Stop periodic flush and disconnect the underlying storage
destroy() {
return Promise.all(isSyncronizer ? [] : [
impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).stop && (impressionCountsCache as ImpressionCountsCachePluggable).stop(),
uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).stop && (uniqueKeysCache as UniqueKeysCachePluggable).stop(),
return Promise.all(isSynchronizer ? [] : [
(impressionCountsCache as ImpressionCountsCachePluggable).stop && (impressionCountsCache as ImpressionCountsCachePluggable).stop(),
(uniqueKeysCache as UniqueKeysCachePluggable).stop && (uniqueKeysCache as UniqueKeysCachePluggable).stop(),
]).then(() => wrapper.disconnect());
},

Expand Down
4 changes: 2 additions & 2 deletions src/storages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,10 @@ export interface IStorageBase<
splits: TSplitsCache,
segments: TSegmentsCache,
impressions: TImpressionsCache,
impressionCounts?: TImpressionsCountCache,
impressionCounts: TImpressionsCountCache,
events: TEventsCache,
telemetry?: TTelemetryCache,
uniqueKeys?: TUniqueKeysCache,
uniqueKeys: TUniqueKeysCache,
destroy(): void | Promise<void>,
shared?: (matchingKey: string, onReadyCb: (error?: any) => void) => this
}
Expand Down
6 changes: 2 additions & 4 deletions src/sync/submitters/impressionCountsSubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ export function impressionCountsSubmitterFactory(params: ISdkFactoryContextSync)
storage: { impressionCounts }
} = params;

if (impressionCounts) {
// retry impressions counts only once.
return submitterFactory(log, postTestImpressionsCount, impressionCounts, IMPRESSIONS_COUNT_RATE, 'impression counts', fromImpressionCountsCollector, 1);
}
// retry impressions counts only once.
return submitterFactory(log, postTestImpressionsCount, impressionCounts, IMPRESSIONS_COUNT_RATE, 'impression counts', fromImpressionCountsCollector, 1);
}
7 changes: 3 additions & 4 deletions src/sync/submitters/submitterManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ export function submitterManagerFactory(params: ISdkFactoryContextSync): ISubmit

const submitters = [
impressionsSubmitterFactory(params),
eventsSubmitterFactory(params)
eventsSubmitterFactory(params),
impressionCountsSubmitterFactory(params),
uniqueKeysSubmitterFactory(params)
];

const impressionCountsSubmitter = impressionCountsSubmitterFactory(params);
if (impressionCountsSubmitter) submitters.push(impressionCountsSubmitter);
const telemetrySubmitter = telemetrySubmitterFactory(params);
if (params.storage.uniqueKeys) submitters.push(uniqueKeysSubmitterFactory(params));

return {
// `onlyTelemetry` true if SDK is created with userConsent not GRANTED
Expand Down
5 changes: 2 additions & 3 deletions src/sync/submitters/uniqueKeysSubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ export function uniqueKeysSubmitterFactory(params: ISdkFactoryContextSync) {
const isClientSide = key !== undefined;
const postUniqueKeysBulk = isClientSide ? postUniqueKeysBulkCs : postUniqueKeysBulkSs;

const syncTask = submitterFactory(log, postUniqueKeysBulk, uniqueKeys!, UNIQUE_KEYS_RATE, DATA_NAME);
const syncTask = submitterFactory(log, postUniqueKeysBulk, uniqueKeys, UNIQUE_KEYS_RATE, DATA_NAME);

// register unique keys submitter to be executed when uniqueKeys cache is full
uniqueKeys!.setOnFullQueueCb(() => {
uniqueKeys.setOnFullQueueCb(() => {
if (syncTask.isRunning()) {
log.info(SUBMITTERS_PUSH_FULL_QUEUE, [DATA_NAME]);
syncTask.execute();
Expand All @@ -33,4 +33,3 @@ export function uniqueKeysSubmitterFactory(params: ISdkFactoryContextSync) {

return syncTask;
}

Loading