Skip to content

Added RBS support for SSE classes #547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: rbs-consumer
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser,
flagSetsFilter);
ruleBasedSegmentParser, flagSetsFilter, ruleBasedSegmentCache);
_syncManager.start();

// DestroyOnShutDown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void stopPeriodicFetching() {
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
//No-Op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void stopPeriodicFetching() {
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {
FetchResult fetchResult = _splitFetcher.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(true).build());
if (fetchResult.isSuccess()){
_log.debug("Refresh feature flags completed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.experiments.RuleBasedSegmentParser;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
Expand All @@ -17,6 +18,7 @@
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
import io.split.engine.sse.workers.Worker;

import io.split.storages.RuleBasedSegmentCache;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
Expand Down Expand Up @@ -79,9 +81,11 @@ public static PushManagerImp build(Synchronizer synchronizer,
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer,
FlagSetsFilter flagSetsFilter) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer,
telemetryRuntimeProducer, flagSetsFilter);
FlagSetsFilter flagSetsFilter,
RuleBasedSegmentCache ruleBasedSegmentCache,
RuleBasedSegmentParser ruleBasedSegmentParser) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import io.split.client.SplitClientConfig;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.RuleBasedSegmentParser;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.RuleBasedSegmentCache;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
Expand Down Expand Up @@ -89,12 +91,15 @@ public static SyncManagerImp build(SplitTasks splitTasks,
TelemetrySynchronizer telemetrySynchronizer,
SplitClientConfig config,
SplitParser splitParser,
FlagSetsFilter flagSetsFilter) {
RuleBasedSegmentParser ruleBasedSegmentParser,
FlagSetsFilter flagSetsFilter,
RuleBasedSegmentCache ruleBasedSegmentCache) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
splitFetcher,
splitCacheProducer,
segmentCacheProducer,
ruleBasedSegmentCache,
config.streamingRetryDelay(),
config.streamingFetchMaxRetries(),
config.failedAttemptsBeforeLogging(),
Expand All @@ -109,7 +114,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
config.getThreadFactory(),
splitParser,
splitCacheProducer,
flagSetsFilter);
flagSetsFilter,
ruleBasedSegmentCache,
ruleBasedSegmentParser);

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public interface Synchronizer {
boolean syncAll();
void startPeriodicFetching();
void stopPeriodicFetching();
void refreshSplits(Long targetChangeNumber);
void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber);
void localKillSplit(SplitKillNotification splitKillNotification);
void refreshSegment(String segmentName, Long targetChangeNumber);
void startPeriodicDataRecording();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.storages.RuleBasedSegmentCacheProducer;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
Expand All @@ -34,6 +35,7 @@ public class SynchronizerImp implements Synchronizer {
private final SplitFetcher _splitFetcher;
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
private final SplitCacheProducer _splitCacheProducer;
private final RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer;
private final SegmentCacheProducer segmentCacheProducer;
private final ImpressionsManager _impressionManager;
private final EventsTask _eventsTask;
Expand All @@ -48,6 +50,7 @@ public SynchronizerImp(SplitTasks splitTasks,
SplitFetcher splitFetcher,
SplitCacheProducer splitCacheProducer,
SegmentCacheProducer segmentCacheProducer,
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer,
int onDemandFetchRetryDelayMs,
int onDemandFetchMaxRetries,
int failedAttemptsBeforeLogging,
Expand All @@ -56,6 +59,7 @@ public SynchronizerImp(SplitTasks splitTasks,
_splitFetcher = checkNotNull(splitFetcher);
_segmentSynchronizationTaskImp = checkNotNull(splitTasks.getSegmentSynchronizationTask());
_splitCacheProducer = checkNotNull(splitCacheProducer);
_ruleBasedSegmentCacheProducer = checkNotNull(ruleBasedSegmentCacheProducer);
this.segmentCacheProducer = checkNotNull(segmentCacheProducer);
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
Expand Down Expand Up @@ -103,7 +107,7 @@ private static class SyncResult {
private final FetchResult _fetchResult;
}

private SyncResult attemptSplitsSync(long targetChangeNumber,
private SyncResult attemptSplitsSync(long targetChangeNumber, long ruleBasedSegmentChangeNumber,
FetchOptions opts,
Function<Void, Long> nextWaitMs,
int maxRetries) {
Expand All @@ -114,7 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
return new SyncResult(false, remainingAttempts, fetchResult);
}
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber())) {
return new SyncResult(true, remainingAttempts, fetchResult);
} else if (remainingAttempts <= 0) {
return new SyncResult(false, remainingAttempts, fetchResult);
Expand All @@ -130,9 +135,11 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
}

@Override
public void refreshSplits(Long targetChangeNumber) {
public void refreshSplits(Long targetChangeNumber, Long ruleBasedSegmentChangeNumber) {

if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
if ((targetChangeNumber != 0 && targetChangeNumber <= _splitCacheProducer.getChangeNumber()) ||
(ruleBasedSegmentChangeNumber != 0 && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer.getChangeNumber()) ||
(ruleBasedSegmentChangeNumber == 0 && targetChangeNumber == 0)) {
return;
}

Expand All @@ -142,7 +149,7 @@ public void refreshSplits(Long targetChangeNumber) {
.flagSetsFilter(_sets)
.build();

SyncResult regularResult = attemptSplitsSync(targetChangeNumber, opts,
SyncResult regularResult = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, opts,
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);

int attempts = _onDemandFetchMaxRetries - regularResult.remainingAttempts();
Expand All @@ -157,7 +164,7 @@ public void refreshSplits(Long targetChangeNumber) {
_log.info(String.format("No changes fetched after %s attempts. Will retry bypassing CDN.", attempts));
FetchOptions withCdnBypass = new FetchOptions.Builder(opts).targetChangeNumber(targetChangeNumber).build();
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, withCdnBypass,
SyncResult withCDNBypassed = attemptSplitsSync(targetChangeNumber, ruleBasedSegmentChangeNumber, withCdnBypass,
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);

int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed._remainingAttempts;
Expand All @@ -175,7 +182,7 @@ public void localKillSplit(SplitKillNotification splitKillNotification) {
if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) {
_splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(),
splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber(), 0L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableList;
import io.split.engine.matchers.AttributeMatcher;
import io.split.engine.matchers.RuleBasedSegmentMatcher;
import io.split.engine.matchers.UserDefinedSegmentMatcher;

import java.util.HashSet;
Expand Down Expand Up @@ -243,6 +244,15 @@ public Set<String> getSegmentsNames() {
.collect(Collectors.toSet());
}

public Set<String> getRuleBasedSegmentsNames() {
return parsedConditions().stream()
.flatMap(parsedCondition -> parsedCondition.matcher().attributeMatchers().stream())
.filter(ParsedSplit::isRuleBasedSegmentMatcher)
.map(ParsedSplit::asRuleBasedSegmentMatcherForEach)
.map(RuleBasedSegmentMatcher::getSegmentName)
.collect(Collectors.toSet());
}

private static boolean isSegmentMatcher(AttributeMatcher attributeMatcher) {
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof UserDefinedSegmentMatcher;
}
Expand All @@ -251,4 +261,11 @@ private static UserDefinedSegmentMatcher asSegmentMatcherForEach(AttributeMatche
return (UserDefinedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
}

private static boolean isRuleBasedSegmentMatcher(AttributeMatcher attributeMatcher) {
return ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate() instanceof RuleBasedSegmentMatcher;
}

private static RuleBasedSegmentMatcher asRuleBasedSegmentMatcherForEach(AttributeMatcher attributeMatcher) {
return (RuleBasedSegmentMatcher) ((AttributeMatcher.NegatableMatcher) attributeMatcher.matcher()).delegate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.split.engine.matchers.LessThanOrEqualToSemverMatcher;
import io.split.engine.matchers.InListSemverMatcher;
import io.split.engine.matchers.BetweenSemverMatcher;
import io.split.engine.matchers.RuleBasedSegmentMatcher;
import io.split.engine.matchers.collections.ContainsAllOfSetMatcher;
import io.split.engine.matchers.collections.ContainsAnyOfSetMatcher;
import io.split.engine.matchers.collections.EqualToSetMatcher;
Expand Down Expand Up @@ -183,6 +184,11 @@ public static AttributeMatcher toMatcher(Matcher matcher) {
checkNotNull(matcher.betweenStringMatcherData, "betweenStringMatcherData is required for BETWEEN_SEMVER matcher type");
delegate = new BetweenSemverMatcher(matcher.betweenStringMatcherData.start, matcher.betweenStringMatcherData.end);
break;
case IN_RULE_BASED_SEGMENT:
checkNotNull(matcher.userDefinedSegmentMatcherData);
String ruleBasedSegmentName = matcher.userDefinedSegmentMatcherData.segmentName;
delegate = new RuleBasedSegmentMatcher(ruleBasedSegmentName);
break;
default:
throw new IllegalArgumentException("Unknown matcher type: " + matcher.matcherType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.split.engine.sse.dtos.RawMessageNotification;
import io.split.engine.sse.dtos.SegmentChangeNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
import io.split.engine.sse.exceptions.EventParsingException;

public class NotificationParserImp implements NotificationParser {
Expand Down Expand Up @@ -48,6 +49,8 @@ private IncomingNotification parseNotification(GenericNotificationData genericNo
switch (genericNotificationData.getType()) {
case SPLIT_UPDATE:
return new FeatureFlagChangeNotification(genericNotificationData);
case RB_SEGMENT_UPDATE:
return new RuleBasedSegmentChangeNotification(genericNotificationData);
case SPLIT_KILL:
return new SplitKillNotification(genericNotificationData);
case SEGMENT_UPDATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;

public interface NotificationProcessor {
void process(IncomingNotification notification);
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification);
void processSplitKill(SplitKillNotification splitKillNotification);
void processSegmentUpdate(long changeNumber, String segmentName);
void processStatus(StatusNotification statusNotification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.dtos.RuleBasedSegmentChangeNotification;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.Worker;

Expand Down Expand Up @@ -41,6 +42,11 @@ public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNo
_featureFlagsWorker.addToQueue(featureFlagChangeNotification);
}

@Override
public void processRuleBasedSegmentUpdate(RuleBasedSegmentChangeNotification ruleBasedSegmentChangeNotification) {
_featureFlagsWorker.addToQueue(ruleBasedSegmentChangeNotification);
}

@Override
public void processSplitKill(SplitKillNotification splitKillNotification) {
_featureFlagsWorker.kill(splitKillNotification);
Expand Down
Loading