-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add coordinator API for unused segments #14846
Add coordinator API for unused segments #14846
Conversation
Do we really want to fetch the information of unused segments in the worker task? Wouldn't the task just discard the segment once it realises that the fetched segment is unused? |
The controller already has a picture of the metadata containing only used segments at that point and assigns segments to the workers. On the worker side, we just want to fetch the segment if it has not been deleted completely, even if it is not used anymore. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR.
Left some comments.
/** | ||
* Fetches segment metadata for the given dataSource and segmentId. | ||
*/ | ||
ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel we should rename the fetchUsedSegment call and rename that to fetchSegment taking a boolean "includeUnused".
The callers in most places can set this variable to false.
We can javadoc the behavior changes with this parameter.
WDYT ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made it a separate API instead of a parameter since this probably shouldn't be used in most other places, so adding an extra parameter for every current call seemed untidy. I am okay with changing it to a parameter since it would cut down on the interface though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* | ||
* @return DataSegment segment corresponding to given id | ||
*/ | ||
DataSegment retrieveSegmentForId(String id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should merge this method with DataSegment retrieveUsedSegmentForId(String id);
by taking the same boolean parameter as discussed above.
@kfaraz - Here is a situation. Let's say you launch an MSQ job to read data from a druid table. At this time T0, controller has a view of the used segments that make up the table. Controller will take the ids of these segments and distribute to worker. As workers operating on the segment at time T1, the segment is no longer available. Because the interval, this segment corresponds to, has been compacted. Worker could have discarded the segment if the segment just falls out of the retention window. But here, workers should instead find those new segments. Instead of taking that route, we are going to let the job run on the previous snapshot of the system. For that to happen, MSQ workers need to find those segments that have been marked unused. |
Thanks for the clarification, @abhishekagarwal87 !
If the segments of the previous snapshot (v1) have already been marked unused, that implies that a new version (v2) has completely overshadowed v1. At this point, if we continue with v1 in an MSQ replace job, wouldn't that mean that we would overwrite v2 and potentially miss out on some data that was present in v2 but not in v1? Also, to avoid this, would a better alternative be to fetch the list of used and non-overshadowed segments in the controller task. IIUC, in the case where new segments of v2 are created (that result in overshadowing of v1) after the controller task has already started should cause the MSQ workers to fail anyway. cc: @cryptoe |
Yes, it would. Only the controller decides which version to use (which it does by fetching the list of used and non-overshadowed segments). This change is only to the getSegmentById API, which the worker uses with the id from the list of segments which have already been fetched. This is currently the case as well, however, this is causing an issue where the worker is not able to access any segments marked as unused after the controller fetches the segment. This PR changes this behaviour.
That is currently the case, however, this is causing an issue where some tasks like compaction/reindex can stop any MSQ tasks from running. Is there a reason to prefer failing the task over providing the results from the snapshot that the MSQ task was started? I think the snapshot approach makes sense. |
Adding on to what @adarshsanjeev said, There are 3 distinct usecases:
|
Thanks for the clarification, @adarshsanjeev , @cryptoe . So I assume this PR is to ensure that cases 1 and 2 work correctly? |
Exactly 🚀 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left minor comments.
Changes LGTM!!
{ | ||
final String path = StringUtils.format( | ||
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s", | ||
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s%s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks better to read. We can adjust %s to true/false based on the flag passed. Wdyt?
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s%s", | |
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused=%s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added it this way for consistency with the other APIs. The current pattern is to add a flag without value and check if it has been passed or not. ("includeOvershadowedStatus", "full", etc). Do we want to start moving away from this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like
druid/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
Line 57 in 82a8529
"/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with that. Changing to this pattern.
|
||
taskContextOverridesBuilder.put( | ||
MultiStageQueryContext.CTX_IS_REINDEX, | ||
MSQControllerTask.isReindexTask(task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets rename this to isReplaceTask ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's only for a replace task which reads from the datasource it is replacing, so wouldn't reindex be better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaceInputDataSourceTask? How does that sound ?
There is a current issue due to inconsistent metadata between worker and controller in MSQ. A controller can receive one set of segments, which are then marked as unused by, say, a compaction job. The worker would be unable to get the segment information as
MetadataResource
.This PR resolves this by allowing the worker to fetch the segment information even if it is unused.
Release Notes
includeUnused
as an optional parameter to theorg.apache.druid.server.http.MetadataResource#getSegment
coordinator API. The API will also return unused segments if the parameter is set.This PR has: