Skip to content

Commit

Permalink
Skip refresh for unused segments in metadata cache (apache#16990) (ap…
Browse files Browse the repository at this point in the history
…ache#17079)

* Skip refresh for unused segments in metadata cache

* Cover the condition where a used segment missing schema is marked for refresh

* Fix test
  • Loading branch information
findingrish committed Sep 18, 2024
1 parent c462e10 commit a63ac25
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,16 @@ public RowSignature buildDataSourceRowSignature(final String dataSource)
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);

ImmutableDruidDataSource druidDataSource =
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());

if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
// mark it for refresh only if it is used
// however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,102 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
Assert.assertEquals(0, metadatas.size());
}

@Test
public void testUnusedSegmentIsNotRefreshed() throws InterruptedException, IOException
{
String dataSource = "xyz";
CountDownLatch latch = new CountDownLatch(1);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
throws IOException
{
super.refresh(segmentsToRefresh, dataSourcesToRebuild);
latch.countDown();
}
};

List<DataSegment> segments = ImmutableList.of(
newSegment(dataSource, 1),
newSegment(dataSource, 2),
newSegment(dataSource, 3)
);

final DruidServer historicalServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
.findAny()
.orElse(null);

Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();

ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));
segmentStatsMap.put(segments.get(1).getId(), new SegmentMetadata(20L, "fp"));
segmentStatsMap.put(segments.get(2).getId(), new SegmentMetadata(20L, "fp"));

ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
schemaPayloadMap.put("fp", new SchemaPayload(RowSignature.builder().add("c1", ColumnType.DOUBLE).build()));
segmentSchemaCache.updateFinalizedSegmentSchema(
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

schema.addSegment(historicalServerMetadata, segments.get(0));
schema.addSegment(historicalServerMetadata, segments.get(1));
schema.addSegment(historicalServerMetadata, segments.get(2));

serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
serverView.addSegment(segments.get(1), ServerType.HISTORICAL);
serverView.addSegment(segments.get(2), ServerType.HISTORICAL);

schema.onLeaderStart();
schema.awaitInitialization();

Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));

// make segment3 unused
segmentStatsMap = new ImmutableMap.Builder<>();
segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));

segmentSchemaCache.updateFinalizedSegmentSchema(
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
segmentMap.put(segments.get(0).getId(), segments.get(0));
segmentMap.put(segments.get(1).getId(), segments.get(1));

ImmutableDruidDataSource druidDataSource =
new ImmutableDruidDataSource(
"xyz",
Collections.emptyMap(),
segmentMap
);

Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(ArgumentMatchers.anyString()))
.thenReturn(druidDataSource);

Set<SegmentId> segmentsToRefresh = segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
segmentsToRefresh.remove(segments.get(1).getId());
segmentsToRefresh.remove(segments.get(2).getId());

schema.refresh(segmentsToRefresh, Sets.newHashSet(dataSource));

Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
Expand Down

0 comments on commit a63ac25

Please sign in to comment.