Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add coordinator API for unused segments #14846

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/api-reference/legacy-metadata-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ Returns a list of all segments for a datasource with the full segment metadata a
Returns full segment metadata for a specific segment as stored in the metadata store, if the segment is used. If the
segment is unused, or is unknown, a 404 response is returned.

`GET /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments/{segmentId}?includeUnused`

Returns full segment metadata for a specific segment as stored in the metadata store. If the is unknown, a 404 response
is returned.

`GET /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments`

Returns a list of all segments, overlapping with any of given intervals, for a datasource as stored in the metadata store. Request body is array of string IS0 8601 intervals like `[interval1, interval2,...]`—for example, `["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
final DataSegment dataSegment;
try {
dataSegment = FutureUtils.get(
coordinatorClient.fetchUsedSegment(
coordinatorClient.fetchSegment(
segmentId.getDataSource(),
segmentId.toString()
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testConcurrency()
private class TestCoordinatorClientImpl extends NoopCoordinatorClient
{
@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId)
{
for (final DataSegment segment : segments) {
if (segment.getDataSource().equals(dataSource) && segment.getId().toString().equals(segmentId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ public DataSegment retrieveUsedSegmentForId(final String id)
return null;
}

@Override
public DataSegment retrieveSegmentForId(final String id)
{
throw new UnsupportedOperationException();
}

public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public interface CoordinatorClient
*/
ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId);

/**
* Fetches segment metadata for the given dataSource and segmentId.
*/
ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should rename the fetchUsedSegment call and rename that to fetchSegment taking a boolean "includeUnused".

The callers in most places can set this variable to false.

We can javadoc the behavior changes with this parameter.

WDYT ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it a separate API instead of a parameter since this probably shouldn't be used in most other places, so adding an extra parameter for every current call seemed untidy. I am okay with changing it to a parameter since it would cut down on the interface though.

Copy link
Contributor

@cryptoe cryptoe Aug 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there are 3 places in production+ test code where you would have to make the changes. I guess it should be fine?

Screenshot 2023-08-18 at 10 21 20 AM


/**
* Fetches segment metadata for the given dataSource and intervals.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String
);
}

@Override
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId)
{
final String path = StringUtils.format(
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused",
StringUtils.urlEncode(dataSource),
StringUtils.urlEncode(segmentId)
);

return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), DataSegment.class)
);
}

@Override
public ListenableFuture<List<DataSegment>> fetchUsedSegments(String dataSource, List<Interval> intervals)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,25 @@ SegmentPublishResult commitMetadataOnly(
void deleteSegments(Set<DataSegment> segments);

/**
* Retrieve the segment for a given id from the metadata store. Return null if no such used segment exists
* Retrieve the used segment for a given id from the metadata store. Return null if no such used segment exists
*
* @param id The segment id
*
* @return DataSegment corresponding to given id
* @return DataSegment used segment corresponding to given id
*/
DataSegment retrieveUsedSegmentForId(String id);

/**
* Retrieve the segment for a given id from the metadata store. Return null if no such segment exists
* <br>
* This also returns unused segments. If only used segments are needed, use {@link #retrieveUsedSegmentForId(String)}
* instead. Unused segments could be deleted by a kill task at any time. This exists mainly to provice a consistent
* view of the metadata, for example, in calls from MSQ controller and worker.
*
* @param id The segment id
*
* @return DataSegment segment corresponding to given id
*/
DataSegment retrieveSegmentForId(String id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should merge this method with DataSegment retrieveUsedSegmentForId(String id); by taking the same boolean parameter as discussed above.


}
Original file line number Diff line number Diff line change
Expand Up @@ -1895,6 +1895,18 @@ public DataSegment retrieveUsedSegmentForId(final String id)
);
}

@Override
public DataSegment retrieveSegmentForId(final String id)
{
return connector.retryTransaction(
(handle, status) ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveSegmentForId(id),
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}

private static class PendingSegmentsRecord
{
private final String sequenceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,29 @@ public DataSegment retrieveUsedSegmentForId(String id)
return null;
}

/**
* Retrieve the segment for a given id if it exists in the metadata store and null otherwise
*/
public DataSegment retrieveSegmentForId(String id)
{

final String query = "SELECT payload FROM %s WHERE id = :id";

final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(query, dbTables.getSegmentsTable()))
.bind("id", id);

final ResultIterator<DataSegment> resultIterator =
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();

if (resultIterator.hasNext()) {
return resultIterator.next();
}

return null;
}

private CloseableIterator<DataSegment> retrieveSegments(
final String dataSource,
final Collection<Interval> intervals,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,10 @@ public Response getUsedSegmentsInDataSourceForIntervals(
@Path("/datasources/{dataSourceName}/segments/{segmentId}")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response getUsedSegment(
public Response getSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
@PathParam("segmentId") String segmentId,
@QueryParam("includeUnused") @Nullable String includeUnused
)
{
ImmutableDruidDataSource dataSource = segmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSourceName);
Expand All @@ -296,7 +297,12 @@ public Response getUsedSegment(
}
}
// fallback to db
DataSegment segment = metadataStorageCoordinator.retrieveUsedSegmentForId(segmentId);
DataSegment segment;
if (includeUnused != null) {
segment = metadataStorageCoordinator.retrieveSegmentForId(segmentId);
} else {
segment = metadataStorageCoordinator.retrieveUsedSegmentForId(segmentId);
}
if (segment != null) {
return Response.status(Response.Status.OK).entity(segment).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,31 @@ public void test_fetchUsedSegment() throws Exception
);
}

@Test
public void test_fetchSegment() throws Exception
{
final DataSegment segment =
DataSegment.builder()
.dataSource("xyz")
.interval(Intervals.of("2000/3000"))
.version("1")
.shardSpec(new NumberedShardSpec(0, 1))
.size(1)
.build();

serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(segment)
);

Assert.assertEquals(
segment,
coordinatorClient.fetchSegment("xyz", "def").get()
);
}

@Test
public void test_fetchUsedSegments() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<List<DataSegment>> fetchUsedSegments(String dataSource, List<Interval> intervals)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,21 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException
Assert.assertEquals(2, metadataUpdateCounter.get());
}

@Test
public void testRetrieveUsedSegmentForId()
{
insertUsedSegments(ImmutableSet.of(defaultSegment));
Assert.assertEquals(defaultSegment, coordinator.retrieveUsedSegmentForId(defaultSegment.getId().toString()));
}

@Test
public void testRetrieveSegmentForId()
{
insertUsedSegments(ImmutableSet.of(defaultSegment));
markAllSegmentsUnused(ImmutableSet.of(defaultSegment));
Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString()));
}

@Test
public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public void setUp()
Mockito.doReturn(null)
.when(storageCoordinator)
.retrieveUsedSegmentForId(segments[5].getId().toString());
Mockito.doReturn(segments[5])
.when(storageCoordinator)
.retrieveSegmentForId(segments[5].getId().toString());

metadataResource = new MetadataResource(
segmentsMetadataManager,
Expand All @@ -120,23 +123,27 @@ public void testGetAllSegmentsWithOvershadowedStatus()
}

@Test
public void testGetUsedSegment()
public void testGetSegment()
{
// Available in snapshot
Assert.assertEquals(
segments[0],
metadataResource.getUsedSegment(segments[0].getDataSource(), segments[0].getId().toString()).getEntity()
metadataResource.getSegment(segments[0].getDataSource(), segments[0].getId().toString(), null).getEntity()
);

// Unavailable in snapshot, but available in metadata
Assert.assertEquals(
segments[4],
metadataResource.getUsedSegment(segments[4].getDataSource(), segments[4].getId().toString()).getEntity()
metadataResource.getSegment(segments[4].getDataSource(), segments[4].getId().toString(), null).getEntity()
);

// Unavailable in both snapshot and metadata
// Unavailable and unused
Assert.assertNull(
metadataResource.getUsedSegment(segments[5].getDataSource(), segments[5].getId().toString()).getEntity()
metadataResource.getSegment(segments[5].getDataSource(), segments[5].getId().toString(), null).getEntity()
);
Assert.assertEquals(
segments[5],
metadataResource.getSegment(segments[5].getDataSource(), segments[5].getId().toString(), "includeUnused").getEntity()
);
}

Expand Down