Allow users to pass task payload via deep storage instead of environment variable #14887
Conversation
Resolved review threads:
processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java
processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java (outdated)
...overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java (outdated)
...overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java (outdated)
@georgew5656 @cryptoe I think the way it is implemented now, behind the feature flag, is better. For most use cases it is much better to just pass the task.json directly, since it is much faster; also, for our customers we use a deep storage that is slow and less featured than S3, so this is a feature we would only turn on if necessary. Additionally, our deep storage provider won't allow us to do batch deletes, so the cleaner approach to removing the task files is not great. I know it's a special case for us, but it is always better to pass something directly when possible than to use indirection, disregarding our use case. LGTM overall, but one feature request: have AbstractTask delete the task.json file at the end of the task instead of leaving it up to the cleaner. Quite a few folks run their own k8s clusters in their own datacenters; while relying on the cleaner works for cloud providers, it might not work for everyone else. This can be done in another PR, or I can put one up after this PR is merged if needed.
I didn't really want to break anyone who doesn't want to use deep storage for task payloads, for whatever reason. I think maybe later on, if this gets used in production a bit more and the performance is okay, we could consider flipping it on as the default.
Thanks @churromorales and @georgew5656 for the responses. I was more worried about adding another config that needs to be set by end users.
LGTM generally, left some new comments along with my previous one #14887 (comment)
Resolved review thread (outdated): ...rlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
I have concerns about the config change as well. In a single PR, adding one config doesn't seem like much, but after a year's worth of such work you suddenly realize that the feature has become very complex to tune and use. Is there a way to check whether the deep storage supports storing the payload and, if not, fall back to the environment variable? That would let us change the default easily.
Why not just check the size of the task.json: if it's larger than MAX_SIZE, put it in deep storage; if it's smaller, just use the env variable? I agree 100% about Druid in general having too many configuration options; it makes it hard to remember everything you need to include. When creating this feature, the whole goal I had in mind was to have this work with as few configuration options as possible.
@abhishekagarwal87 @churromorales I just put up a new version of the PR that removes the configs and uses deep storage to pass the task payload if the task size is too large. This is more of an "opinionated" choice, but in this case I think it makes sense; let me know what you guys think.
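A minimal sketch of the size check described above. The class name, the 100KB threshold, and the gzip-plus-Base64 encoding are assumptions (the thread only says the payload is passed compressed), not the PR's actual code:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

class TaskPayloadSizeCheckSketch
{
  // Stay well under the ~128KB per-variable limit common on Linux.
  private static final int MAX_ENV_VARIABLE_SIZE = 100 * 1024;

  // Returns true if the compressed, encoded payload is small enough to ship as
  // the TASK_JSON env variable; otherwise it should go to deep storage instead.
  static boolean fitsInEnvVariable(String taskJson) throws IOException
  {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
      gzip.write(taskJson.getBytes(StandardCharsets.UTF_8));
    }
    // Env variables carry text, so measure the Base64-encoded length.
    return Base64.getEncoder().encodeToString(baos.toByteArray()).length() < MAX_ENV_VARIABLE_SIZE;
  }
}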
Resolved review thread (outdated): ...overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "task.json");
try {
  FileUtils.writeStringToFile(file.toFile(), mapper.writeValueAsString(task), Charset.defaultCharset());
  taskLogs.pushTaskPayload(task.getId(), file.toFile());
Is it possible that a log cleanup job removes the task payload from deep storage while the task is still in progress? How are these payloads cleaned up from deep storage?
The task reads the file to disk on startup, so I wouldn't be worried about the log cleanup job removing it that soon. We are relying on the log cleanup job to clean up deep storage.
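A minimal sketch of the peon-side startup read described in this reply. PayloadSource is a stand-in for Druid's TaskLogs, whose streamTaskPayload method appears in the snippets in this review (there it returns a Guava Optional; java.util.Optional is used here only to keep the sketch self-contained):

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Stand-in for Druid's TaskLogs interface.
interface PayloadSource
{
  Optional<InputStream> streamTaskPayload(String taskId) throws IOException;
}

class PeonPayloadFetchSketch
{
  // Copies the payload from deep storage to the local task.json path the peon reads on startup.
  static void fetchTaskPayload(PayloadSource source, String taskId, Path taskJsonFile) throws IOException
  {
    InputStream in = source.streamTaskPayload(taskId)
        .orElseThrow(() -> new IOException("No task payload found for task " + taskId));
    try (InputStream payload = in) {
      Files.copy(payload, taskJsonFile, StandardCopyOption.REPLACE_EXISTING);
    }
  }
}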
Resolved review thread: ...overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
  public K8sTaskAdapter(
      KubernetesClientApi client,
      KubernetesTaskRunnerConfig taskRunnerConfig,
      TaskConfig taskConfig,
      StartupLoggingConfig startupLoggingConfig,
      DruidNode node,
-     ObjectMapper mapper
+     ObjectMapper mapper,
+     TaskLogs taskLogs
The name of this class has been confusing, since it does so much more than deal with task logs. Could be fixed in some other PR someday.
Resolved review threads (outdated):
...rlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java (two threads)
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
processing/src/main/java/org/apache/druid/tasklogs/TaskPayloadManager.java
Thank you for addressing the comments, @georgew5656. Looks good to me except for the exception handling in some places.
Resolved review threads (outdated): processing/src/main/java/org/apache/druid/error/InternalError.java (three threads)
{
  com.google.common.base.Optional<InputStream> taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
  if (!taskBody.isPresent()) {
    throw InternalError.exception("Could not load task payload for job [%s]", from.getMetadata().getName());
Is there an action you can associate with this error message? For example, should the user verify that the overlord is successfully uploading task JSONs to deep storage?
Updated the message.
{
  Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
  if (annotations == null) {
    throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
Can this be replaced with DruidException.defensive()?
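For reference, a sketch of the suggested replacement, assuming DruidException.defensive accepts a format string plus arguments the way the IOE constructor above does:

throw DruidException.defensive("No annotations found on pod spec for job [%s]", from.getMetadata().getName());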
Almost there. Can you look into the test failures?
Resolved review thread (outdated): ...rlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
…a/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
I think the only failing tests are coverage checks now, so we should be good.
…ent variable (apache#14887)

This change is meant to fix an issue where passing too large a task payload to the MM-less task runner causes the peon to fail to start up, because the payload is passed (compressed) as an environment variable (TASK_JSON). On Linux systems the limit for an environment variable is commonly 128KB; on Windows systems it is smaller still. Setting an environment variable longer than this results in a bunch of "Argument list too long" errors.
Description
(1) Problem
The goal of this patch is to prevent larger tasks from failing with MM-less ingestion due to the TASK_JSON environment variable being too large, as described above.
(2) Solution
Part 1. Optionally stop setting TASK_JSON
To address the immediate problem (setting environment variables that are too large), I added an additional config for the KubernetesTaskRunner (druid.indexer.runner.taskPayloadAsEnvVariable) that defaults to true but can optionally be set to false. Setting this config to false causes the K8s adapters to not set the task payload as the TASK_JSON env variable, which prevents the Jobs from failing to come up.
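A hypothetical overlord runtime.properties fragment for this flag (illustrative only; as noted earlier in the conversation, a later revision of the PR removed this config in favor of an automatic size check):

# Disable passing the task payload as the TASK_JSON env variable (illustrative)
druid.indexer.runner.taskPayloadAsEnvVariable=false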
Part 2. We still need to pass the task.json payload to the peons somehow. I explored three options for this and ended up going with the solution below.
Push the task payload into task logs deep storage and have the peon read the payload.
I ended up going with this option because it was the simplest to implement and the most future-proof (no worry about task payloads growing larger than 1MB). The task logs killer will automatically delete the task.json in deep storage alongside the task logs whenever it runs.
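A self-contained sketch of the overlord-side push, mirroring the diff excerpt earlier in this review. PayloadSink stands in for Druid's TaskLogs, and deleting the local temp file in the finally block is an assumption (the excerpt above does not show the cleanup):

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Stand-in for Druid's TaskLogs; pushTaskPayload appears in the diff above.
interface PayloadSink
{
  void pushTaskPayload(String taskId, File payload) throws IOException;
}

class OverlordPayloadPushSketch
{
  // Serializes the task JSON to a temp file and hands it to deep storage.
  static void pushTaskPayload(PayloadSink sink, String taskId, String taskJson) throws IOException
  {
    Path file = Files.createTempFile(taskId, "task.json");
    try {
      Files.write(file, taskJson.getBytes(StandardCharsets.UTF_8));
      sink.pushTaskPayload(taskId, file.toFile());
    }
    finally {
      // Assumption: the local copy is no longer needed once pushed.
      Files.deleteIfExists(file);
    }
  }
}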
Changes Made
(3) Alternative solutions to passing the task.json payload
Using k8s configmaps to store the task payload and then mounting them onto the created peon pods.
I decided not to go with this option because ConfigMaps still have a 1MB size limit, and I was concerned about the KubernetesTaskRunner having to manage a bunch of ConfigMaps in addition to Jobs. Having that many ConfigMaps also pollutes K8s metadata, making it hard to see anything else going on when you're looking at ConfigMaps.
Updating CliPeon to use the getTaskPayload endpoint on the overlord to pull the task.json payload on startup. This didn't work because we currently have a Guice injector in the peon that requires the task.json to be available at injection time. In order to pull the task.json from the overlord, we would need to use the ServiceLocator class, which is only available once the peon lifecycle has already started (after injection). Changing this would have required many changes to the code, so I didn't want to do it. Additionally, I would have had to deprecate the toTask interface on the overlord, since there would be no way for the overlord to turn a K8s Job back into a task definition.
Release note
Key changed/added classes in this PR
CliPeon
KubernetesPeonLifecycle
PodTemplateTaskAdapter
K8sTaskAdapter
S3TaskLogs
TaskLogs
I can add some more documentation to this PR later but I wanted to get some feedback on this approach before doing so.