Skip to content

Commit

Permalink
Fix broken test by unsubscribing via ReferenceCountedLivenessReferent…
Browse files Browse the repository at this point in the history
…#destroy
  • Loading branch information
nbauernfeind committed Oct 22, 2024
1 parent c29282c commit 312f12d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public abstract class SourceTable<IMPL_TYPE extends SourceTable<IMPL_TYPE>> exte
/**
* The update source object for refreshing locations and location sizes.
*/
private Runnable locationChangePoller;
private LocationChangePoller locationChangePoller;

/**
* Construct a new disk-backed table.
Expand Down Expand Up @@ -312,6 +312,7 @@ protected void destroy() {
if (updateSourceRegistrar != null) {
if (locationChangePoller != null) {
updateSourceRegistrar.removeSource(locationChangePoller);
locationChangePoller.locationBuffer.reset();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public synchronized LocationUpdate processPending() {
if (tableLocationProvider.supportsSubscriptions()) {
tableLocationProvider.subscribe(this);
} else {
// NB: Providers that don't support subscriptions don't tick - this single call to run is
// NB: Providers that don't support subscriptions don't tick - this single call to refresh is
// sufficient.
tableLocationProvider.refresh();
tableLocationProvider.getTableLocationKeys().forEach(this::handleTableLocationKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class RegionedColumnSourceManager extends LivenessArtifact implements Col
private final Map<String, ? extends ColumnSource<?>> sharedColumnSources =
Collections.unmodifiableMap(columnSources);


/**
* State for table locations that have been added, but have never been found to exist with non-zero size.
*/
Expand Down Expand Up @@ -421,6 +422,19 @@ public final synchronized boolean isEmpty() {
return sharedColumnSources;
}

@Override
protected void destroy() {
super.destroy();
for (final EmptyTableLocationEntry entry : emptyTableLocations.values()) {
entry.subscriptionBuffer.reset();
}
emptyTableLocations.clear();
for (final IncludedTableLocationEntry entry : includedTableLocations.values()) {
entry.subscriptionBuffer.reset();
}
includedTableLocations.clear();
}

/**
* State keeper for a table location and its subscription buffer if it hasn't been found to have a non-null,
* non-zero size yet.
Expand Down

0 comments on commit 312f12d

Please sign in to comment.