Skip to content

Commit

Permalink
IHollowUpdatePlanner
Browse files Browse the repository at this point in the history
  • Loading branch information
mengqwang committed Aug 16, 2024
1 parent 51d3338 commit 424e4af
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class HollowClientUpdater {

private volatile HollowDataHolder hollowDataHolderVolatile;

private final HollowUpdatePlanner planner;
private final IHollowUpdatePlanner planner;
private final CompletableFuture<Long> initialLoad;
private boolean forceDoubleSnapshot = false;
private final FailedTransitionTracker failedTransitionTracker;
Expand Down Expand Up @@ -73,7 +73,20 @@ public HollowClientUpdater(HollowConsumer.BlobRetriever transitionCreator,
HollowConsumer.ObjectLongevityDetector objectLongevityDetector,
HollowConsumerMetrics metrics,
HollowMetricsCollector<HollowConsumerMetrics> metricsCollector) {
this.planner = new HollowUpdatePlanner(transitionCreator, doubleSnapshotConfig);
this(refreshListeners, apiFactory, doubleSnapshotConfig, hashCodeFinder, memoryMode, objectLongevityConfig, objectLongevityDetector, metrics, metricsCollector, new HollowUpdatePlanner(transitionCreator, doubleSnapshotConfig));
}

public HollowClientUpdater(List<HollowConsumer.RefreshListener> refreshListeners,
HollowAPIFactory apiFactory,
HollowConsumer.DoubleSnapshotConfig doubleSnapshotConfig,
HollowObjectHashCodeFinder hashCodeFinder,
MemoryMode memoryMode,
HollowConsumer.ObjectLongevityConfig objectLongevityConfig,
HollowConsumer.ObjectLongevityDetector objectLongevityDetector,
HollowConsumerMetrics metrics,
HollowMetricsCollector<HollowConsumerMetrics> metricsCollector,
IHollowUpdatePlanner planner) {
this.planner = planner;
this.failedTransitionTracker = new FailedTransitionTracker();
this.staleReferenceDetector = new StaleHollowReferenceDetector(objectLongevityConfig, objectLongevityDetector);
// Create a copy of the listeners, removing any duplicates
Expand Down Expand Up @@ -117,7 +130,6 @@ public synchronized boolean updateTo(long requestedVersion) throws Throwable {
return updateTo(new HollowConsumer.VersionInfo(requestedVersion));
}
public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersionInfo) throws Throwable {
metrics.setLastRefreshStartNs(System.nanoTime());
long requestedVersion = requestedVersionInfo.getVersion();
if (requestedVersion == getCurrentVersionId()) {
if (requestedVersion == HollowConstants.VERSION_NONE && hollowDataHolderVolatile == null) {
Expand Down Expand Up @@ -146,9 +158,9 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion

try {
HollowUpdatePlan updatePlan = shouldCreateSnapshotPlan(requestedVersionInfo)
? planner.planInitializingUpdate(requestedVersion)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
doubleSnapshotConfig.allowDoubleSnapshot());
? planner.planUpdate(HollowConstants.VERSION_NONE, requestedVersion, true)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
doubleSnapshotConfig.allowDoubleSnapshot());

for (HollowConsumer.RefreshListener listener : localListeners)
if (listener instanceof HollowConsumer.TransitionAwareRefreshListener)
Expand Down Expand Up @@ -206,7 +218,6 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
metricsCollector.collect(metrics);

initialLoad.complete(getCurrentVersionId()); // only set the first time
metrics.setLastRefreshEndNs(System.nanoTime());
return getCurrentVersionId() == requestedVersion;
} catch(Throwable th) {
forceDoubleSnapshotNextUpdate();
Expand All @@ -218,14 +229,13 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion

// intentionally omitting a call to initialLoad.completeExceptionally(th), for producers
// that write often a consumer has a chance to try another snapshot that might succeed
metrics.setLastRefreshEndNs(System.nanoTime());

throw th;
}
}

public synchronized void addRefreshListener(HollowConsumer.RefreshListener refreshListener,
HollowConsumer c) {
HollowConsumer c) {
if (refreshListener instanceof HollowConsumer.RefreshRegistrationListener) {
if (!refreshListeners.contains(refreshListener)) {
((HollowConsumer.RefreshRegistrationListener)refreshListener).onBeforeAddition(c);
Expand All @@ -237,7 +247,7 @@ public synchronized void addRefreshListener(HollowConsumer.RefreshListener refre
}

public synchronized void removeRefreshListener(HollowConsumer.RefreshListener refreshListener,
HollowConsumer c) {
HollowConsumer c) {
if (refreshListeners.remove(refreshListener)) {
if (refreshListener instanceof HollowConsumer.RefreshRegistrationListener) {
((HollowConsumer.RefreshRegistrationListener)refreshListener).onAfterRemoval(c);
Expand All @@ -248,7 +258,7 @@ public synchronized void removeRefreshListener(HollowConsumer.RefreshListener re
public long getCurrentVersionId() {
HollowDataHolder hollowDataHolderLocal = hollowDataHolderVolatile;
return hollowDataHolderLocal != null ? hollowDataHolderLocal.getCurrentVersion()
: HollowConstants.VERSION_NONE;
: HollowConstants.VERSION_NONE;
}

public void forceDoubleSnapshotNextUpdate() {
Expand All @@ -260,14 +270,14 @@ public void forceDoubleSnapshotNextUpdate() {
*/
boolean shouldCreateSnapshotPlan(HollowConsumer.VersionInfo incomingVersionInfo) {
if (getCurrentVersionId() == HollowConstants.VERSION_NONE
|| (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) {
|| (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) {
return true;
}

if (doubleSnapshotConfig.doubleSnapshotOnSchemaChange() == true) {
// double snapshot on schema change relies on presence of a header tag in incoming version metadata
if (incomingVersionInfo.getAnnouncementMetadata() == null
|| !incomingVersionInfo.getAnnouncementMetadata().isPresent()) {
|| !incomingVersionInfo.getAnnouncementMetadata().isPresent()) {
LOG.warning("Double snapshots on schema change are enabled and its functioning depends on " +
"visibility into incoming version's schema through metadata but NO metadata was available " +
"for version " + incomingVersionInfo.getVersion() + ". Check that the mechanism that triggered " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* The HollowUpdatePlanner defines the logic responsible for interacting with a {@link HollowBlobRetriever}
* to create a {@link HollowUpdatePlan}.
*/
public class HollowUpdatePlanner {
public class HollowUpdatePlanner implements IHollowUpdatePlanner {

private final HollowConsumer.BlobRetriever transitionCreator;
private final HollowConsumer.DoubleSnapshotConfig doubleSnapshotConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.netflix.hollow.api.client;

public interface IHollowUpdatePlanner {
public HollowUpdatePlan planUpdate(long currentVersion, long desiredVersion, boolean allowSnapshot) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.hollow.api.client.FailedTransitionTracker;
import com.netflix.hollow.api.client.HollowAPIFactory;
import com.netflix.hollow.api.client.HollowClientUpdater;
import com.netflix.hollow.api.client.IHollowUpdatePlanner;
import com.netflix.hollow.api.client.StaleHollowReferenceDetector;
import com.netflix.hollow.api.codegen.HollowAPIClassJavaGenerator;
import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever;
Expand Down Expand Up @@ -190,16 +191,30 @@ protected <B extends Builder<B>> HollowConsumer(B builder) {
// duplicated with HollowConsumer(...) constructor above. We cannot chain constructor calls because that
// constructor subscribes to the announcement watcher and we have more setup to do first
this.metrics = new HollowConsumerMetrics();
this.updater = new HollowClientUpdater(builder.blobRetriever,
builder.refreshListeners,
builder.apiFactory,
builder.doubleSnapshotConfig,
builder.hashCodeFinder,
builder.memoryMode,
builder.objectLongevityConfig,
builder.objectLongevityDetector,
metrics,
builder.metricsCollector);
if (builder.hollowUpdatePlanner != null) {
this.updater = new HollowClientUpdater(
builder.refreshListeners,
builder.apiFactory,
builder.doubleSnapshotConfig,
builder.hashCodeFinder,
builder.memoryMode,
builder.objectLongevityConfig,
builder.objectLongevityDetector,
metrics,
builder.metricsCollector,
builder.hollowUpdatePlanner);
} else {
this.updater = new HollowClientUpdater(builder.blobRetriever,
builder.refreshListeners,
builder.apiFactory,
builder.doubleSnapshotConfig,
builder.hashCodeFinder,
builder.memoryMode,
builder.objectLongevityConfig,
builder.objectLongevityDetector,
metrics,
builder.metricsCollector);
}
updater.setFilter(builder.typeFilter);
if(builder.skipTypeShardUpdateWithNoAdditions)
updater.setSkipShardUpdateWithNoAdditions(true);
Expand Down Expand Up @@ -1085,6 +1100,7 @@ public static class Builder<B extends HollowConsumer.Builder<B>> {
protected MemoryMode memoryMode = MemoryMode.ON_HEAP;
protected HollowMetricsCollector<HollowConsumerMetrics> metricsCollector;
protected boolean skipTypeShardUpdateWithNoAdditions = false;
protected IHollowUpdatePlanner hollowUpdatePlanner = null;

public B withBlobRetriever(HollowConsumer.BlobRetriever blobRetriever) {
this.blobRetriever = blobRetriever;
Expand Down Expand Up @@ -1156,6 +1172,11 @@ public B withRefreshListeners(HollowConsumer.RefreshListener... refreshListeners
return (B)this;
}

public B withHollowUpdatePlanner(IHollowUpdatePlanner hollowUpdatePlanner) {
this.hollowUpdatePlanner = hollowUpdatePlanner;
return (B)this;
}

/**
* Provide the code generated API class that extends {@link HollowAPI} with one or more types
* cached for direct field reads.
Expand Down

0 comments on commit 424e4af

Please sign in to comment.