Skip to content

Commit

Permalink
Refactor a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 committed Feb 3, 2025
1 parent d102f6a commit 9711878
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis

private final BlockBlobSidecarsTrackerFactory trackerFactory;

private final boolean isSuperNode;
private final Consumer<List<DataColumnSidecar>> dataColumnSidecarPublisher;
private final KZG kzg;

private final Subscribers<RequiredBlockRootSubscriber> requiredBlockRootSubscribers =
Subscribers.create(true);
private final Subscribers<RequiredBlockRootDroppedSubscriber>
Expand All @@ -131,10 +135,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis

private final BlockImportChannel blockImportChannel;

private final boolean isSuperNode;
private final AtomicBoolean isActiveSuperNode = new AtomicBoolean(false);
private final Consumer<List<DataColumnSidecar>> dataColumnSidecarPublisher;
private final Supplier<KZG> kzgSupplier;

BlockBlobSidecarsTrackersPoolImpl(
final BlockImportChannel blockImportChannel,
Expand All @@ -151,7 +152,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final UInt64 futureSlotTolerance,
final int maxTrackers,
final boolean isSuperNode,
final Supplier<KZG> kzgSupplier,
final KZG kzg,
final Consumer<List<DataColumnSidecar>> dataColumnSidecarPublisher) {
super(spec, futureSlotTolerance, historicalSlotTolerance);
this.blockImportChannel = blockImportChannel;
Expand All @@ -167,7 +168,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
this.poolStatsCounters = poolStatsCounters;
this.trackerFactory = BlockBlobSidecarsTracker::new;
this.isSuperNode = isSuperNode;
this.kzgSupplier = kzgSupplier;
this.kzg = kzg;
this.dataColumnSidecarPublisher = dataColumnSidecarPublisher;

initMetrics(sizeGauge, poolStatsCounters);
Expand All @@ -190,7 +191,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final int maxTrackers,
final BlockBlobSidecarsTrackerFactory trackerFactory,
final boolean isSuperNode,
final Supplier<KZG> kzgSupplier,
final KZG kzg,
final Consumer<List<DataColumnSidecar>> dataColumnSidecarPublisher) {
super(spec, futureSlotTolerance, historicalSlotTolerance);
this.blockImportChannel = blockImportChannel;
Expand All @@ -206,7 +207,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
this.poolStatsCounters = poolStatsCounters;
this.trackerFactory = trackerFactory;
this.isSuperNode = isSuperNode;
this.kzgSupplier = kzgSupplier;
this.kzg = kzg;
this.dataColumnSidecarPublisher = dataColumnSidecarPublisher;

initMetrics(sizeGauge, poolStatsCounters);
Expand Down Expand Up @@ -304,7 +305,7 @@ private void publishRecoveredDataColumnSidecars(
blockBlobSidecarsTracker.getBlobSidecars().values().stream()
.map(BlobSidecar::getBlob)
.toList(),
kzgSupplier.get());
kzg);
LOG.info(
"Publishing {} data column sidecars for {}",
dataColumnSidecars.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Function<BlobSidecar, SafeFuture<Void>> blobSidecarGossipPublisher,
final boolean isSuperNode,
final Supplier<KZG> kzgSupplier,
final KZG kzg,
final Consumer<List<DataColumnSidecar>> dataColumnSidecarPublisher) {
return createPoolForBlockBlobSidecarsTrackers(
blockImportChannel,
Expand All @@ -142,7 +142,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE,
DEFAULT_MAX_BLOCKS,
isSuperNode,
kzgSupplier,
kzg,
dataColumnSidecarPublisher);
}

Expand All @@ -159,7 +159,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final UInt64 futureBlockTolerance,
final int maxTrackers,
final boolean isSuperNode,
final Supplier<KZG> kzgSupplier,
final KZG kzg,
final Consumer<List<DataColumnSidecar>> dataColumnSidecarPublisher) {
return new BlockBlobSidecarsTrackersPoolImpl(
blockImportChannel,
Expand All @@ -176,7 +176,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
futureBlockTolerance,
maxTrackers,
isSuperNode,
kzgSupplier,
kzg,
dataColumnSidecarPublisher);
}

Expand All @@ -195,7 +195,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final int maxItems,
final BlockBlobSidecarsTrackerFactory trackerFactory,
final boolean isSuperNode,
final Supplier<KZG> kzgSupplier,
final KZG kzg,
final Consumer<List<DataColumnSidecar>> dataColumnSidecarPublisher) {
return new BlockBlobSidecarsTrackersPoolImpl(
blockImportChannel,
Expand All @@ -213,7 +213,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
maxItems,
trackerFactory,
isSuperNode,
kzgSupplier,
kzg,
dataColumnSidecarPublisher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class BlockBlobSidecarsTrackersPoolImplFuluTest {
maxItems,
BlockBlobSidecarsTracker::new,
true,
() -> kzg,
kzg,
dataColumnSidecarPublisher);

private UInt64 currentSlot = historicalTolerance.times(2);
Expand Down Expand Up @@ -143,7 +143,7 @@ public void onNewBlockNonSuperNode_shouldIgnoreFuluBlocks() {
maxItems,
BlockBlobSidecarsTracker::new,
false,
() -> KZG.NOOP,
KZG.NOOP,
__ -> {});
final SignedBeaconBlock block =
dataStructureUtil.randomSignedBeaconBlock(currentSlot.longValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class BlockBlobSidecarsTrackersPoolImplTest {
maxItems,
this::trackerFactory,
false,
() -> KZG.NOOP,
KZG.NOOP,
__ -> {});

private UInt64 currentSlot = historicalTolerance.times(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ protected void initBlockBlobSidecarsTrackersPool() {
() -> blobSidecarValidator,
blobSidecarGossipChannel::publishBlobSidecar,
isSuperNode,
() -> kzg,
kzg,
dataColumnSidecarPublisher);
eventChannels.subscribe(FinalizedCheckpointChannel.class, pool);
blockBlobSidecarsTrackersPool = pool;
Expand Down

0 comments on commit 9711878

Please sign in to comment.