
Priority based task locking #1679

Closed
wants to merge 2 commits into from

Conversation

pjain1
Member

@pjain1 pjain1 commented Aug 27, 2015

This PR corresponds to issue #1513.

Design details -

  • The task priority is used for acquiring a lock on an interval for a datasource. Tasks with higher priority can preempt lower-priority tasks for the same datasource and interval if run concurrently.
    The flow for acquiring and upgrading locks by a Task during its TaskLifeCycle is as follows (sketched in code below) -
    • Check for TaskLocks on the same datasource and overlapping interval
      • If one or more TaskLocks have the same or higher priority, then stop and retry after some time
      • If no TaskLock is present, or all the TaskLocks are of lower priority, then check whether any of them is an exclusive lock
        • If not, revoke all the lower-priority TaskLocks and create a new TaskLock
        • If so, stop and retry after some time
    • Before publishing the segments, upgrade the TaskLock to an exclusive lock
      • If the upgrade succeeds, publish the segments and return a success status for the Task
      • If not, return a failure status for the Task
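
For illustration, a minimal sketch of this acquire flow in Java (the method and the findLocks/revoke/createLock helpers are hypothetical stand-ins, not the actual TaskLockbox API):

    // Sketch of the acquire flow above; helpers and names are illustrative.
    private boolean tryAcquireLock(Task task, Interval interval)
    {
      List<TaskLock> conflicts = findLocks(task.getDataSource(), interval);

      for (TaskLock held : conflicts) {
        // A conflicting lock of same or higher priority wins; caller retries later.
        if (held.getPriority() >= task.getLockPriority()) {
          return false;
        }
        // Exclusive (upgraded) locks cannot be preempted regardless of priority.
        if (held.isExclusiveLock()) {
          return false;
        }
      }
      // All conflicting locks are lower priority and preemptible: revoke them and lock.
      for (TaskLock held : conflicts) {
        revoke(held);
      }
      createLock(task, interval);
      return true;
    }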

Tasks with no priority specified get a default priority based on the task type:

  • Default priorities by task type (illustrated in the sketch below):
    • Realtime Index Task - 75
    • Hadoop/Index Task - 50
    • Merge/Append Task - 25
    • Other Tasks - 0
  • The higher the number, the higher the priority.
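
As an illustration only (the real defaults live in the individual Task implementations, and the type strings here are assumptions):

    // Hypothetical helper mapping a task type to its default lock priority.
    static int defaultLockPriority(String taskType)
    {
      switch (taskType) {
        case "index_realtime":
          return 75;
        case "index_hadoop":
        case "index":
          return 50;
        case "merge":
        case "append":
          return 25;
        default:
          return 0;
      }
    }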

For example, if a Hadoop Index task is running and a Realtime Index task starts that wants to publish a segment for the same (or an overlapping) interval of the same datasource, then the Realtime task will override the task locks of the Hadoop Index task. Consequently, the Hadoop Index task will fail before publishing its segment.

Note - There is no need to set this property; a task automatically gets a default priority based on its type. However, the default priority can be overridden by setting lockPriority inside the context property like this -

  "context" {
    "lockPriority" : "80"
  }
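
For example, a complete task spec carrying the override might look like this (all fields other than context are illustrative):

  {
    "type" : "index_hadoop",
    "spec" : { ... },
    "context" : {
      "lockPriority" : "80"
    }
  }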

Major Implementation details -

  • Two new fields have been added to TaskLock namely priority and preemptive.
  • Modified and refactored tryLock method in TaskLockbox
  • Added upgradeLock method in TaskLockbox, MetaStorageActionHandler, SQLMetaStorageActionHandler, TaskStorage, MetadataTaskStorage and HeapMemoryTaskStorage
  • New Task Action named LockUpgradeAction has been added
  • Check for preemption of locks has been added in TaskActionToolbox which is performed before publishing the segments in SegmentInsertAction
  • Changed Task implementations to handle priority, preemption and check for lock validity
  • Modified ThreadPoolTaskRunner a bit to be helpful in testing lock override
  • Modified TaskSerdeTest to check for priority
  • Modified Task related docs
  • Added unit tests for TaskLock overriding in TaskLifecycleTest
  • Added a task named TestIndexTask, useful for unit testing

Possible future enhancement - Proactively shut down preempted tasks instead of waiting for them to eventually fail because their TaskLock has been revoked.

Edit 08/05/2016 -

The priority locking feature is now configurable; it is off by default and can be enabled by setting the runtime property druid.indexer.taskLockboxVersion to v2.
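
That is, in the runtime properties of the overlord and peons:

  druid.indexer.taskLockboxVersion=v2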

  • TaskLockbox is now an interface with two implementations - TaskLockboxV1, which is the same as the previous TaskLockbox, and TaskLockboxV2, which does priority-based locking. One of them is injected at runtime in CliOverlord and CliPeon depending on druid.indexer.taskLockboxVersion.
  • TaskLockbox has a new method boolean setTaskLockCriticalState(Task task, Interval interval, TaskLockCriticalState taskLockCriticalState) meant for upgrading locks when priority-based locking is used. TaskLockboxV1 always returns true for this method (see the sketch after this list).
  • TaskLock has two new fields, priority and upgraded. For TaskLockboxV1 the corresponding values are always 0 and true; for TaskLockboxV2 they depend on the task.
  • Each task calls setTaskLockCriticalState before publishing segments. However, for TaskLockboxV1 the method always returns true and the extra overhead is just an HTTP call to the overlord. For TaskLockboxV2 it does the actual work of setting the TaskLock state.
  • The Task interface has an int getLockPriority() method, which I guess is OK.
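
A sketch of how the two implementations diverge on that method (shapes inferred from the description above; only the relevant method is shown):

    // Inferred sketch; the real interface has many more methods.
    public interface TaskLockbox
    {
      boolean setTaskLockCriticalState(Task task, Interval interval, TaskLockCriticalState taskLockCriticalState);
    }

    public class TaskLockboxV1 implements TaskLockbox
    {
      @Override
      public boolean setTaskLockCriticalState(Task task, Interval interval, TaskLockCriticalState taskLockCriticalState)
      {
        // V1 does no priority locking, so the upgrade trivially succeeds.
        return true;
      }
    }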

@@ -31,19 +31,24 @@
private final String dataSource;
private final Interval interval;
private final String version;
private final Integer priority;
private boolean exclusiveLock;
Member

should this be volatile?
I would prefer making it part of the constructor args.
also I think serde should preserve the status of whether the lock was exclusive or not.

Member Author

Yes, making it volatile makes sense... serde is preserving the exclusive lock state; I can make it more explicit by putting a @JsonProperty annotation on setExclusiveLock(). Making it part of the constructor can help in testing - is there any other reason for your preference? In any case, tasks have to use LockUpgradeAction to set it to true.
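
A sketch of what that would look like on TaskLock at this point in the review (assumed shape; the field later became a constructor argument when TaskLock was made immutable):

    private volatile boolean exclusiveLock;

    @JsonProperty
    public boolean isExclusiveLock()
    {
      return exclusiveLock;
    }

    @JsonProperty
    public void setExclusiveLock(boolean exclusiveLock)
    {
      this.exclusiveLock = exclusiveLock;
    }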

@nishantmonu51
Member

#1604 adds a taskContext.
How do you feel about making priority part of the taskContext?

@himanshug
Contributor

@nishantmonu51 FWIW, I was skeptical about having priority come from the input json as I couldn't think of any case where the user should mess around with task priority.
If we do make priority changeable by the user, then I think it makes sense for it to be part of the taskContext introduced in #1604; however, that PR will then block this one.
@gianm what do you think?

@pjain1
Member Author

pjain1 commented Sep 2, 2015

@nishantmonu51 I guess it can be done. How do you want to collaborate on it? Should I wait for your PR to get merged?

@gianm
Contributor

gianm commented Sep 3, 2015

I think it's ok to let people set their own priorities. I don't think it's necessary but I think it's ok if we want to go that way.

About exclusivity- I don't see why we need non-exclusive locks. Why not invalidate all lower priority locks when a higher priority request comes in? There still needs to be some method that upgrades a lock to non-preemptible, but I think it would be okay to have them always be exclusive. That should make life simpler in the lockbox.

Btw, I get confused talking about "higher" and "lower" priority when smaller-numbered priorities are higher…

@pjain1
Member Author

pjain1 commented Sep 3, 2015

@gianm LockUpgradeAction is the mechanism by which a lock can become non-preemptible or "exclusive". And yes, when a higher-priority task comes in and all the other conflicting tasks have lower-priority non-exclusive (non-preemptive) locks, then all the lower-priority locks will be revoked, see this. Thus, the lower-priority tasks will eventually fail. So, if I understood your comment correctly, it is doing exactly what you have mentioned. Am I missing something?
Yes... higher priorities have lower numbers; it is like a ranking system. I can change it if it is not intuitive.

@pjain1
Member Author

pjain1 commented Sep 3, 2015

@gianm and one more thing - what is your preference about making priority a part of the context (#1604)?

@gianm
Contributor

gianm commented Sep 3, 2015

@pjain1 Ok, I see. I was probably just confused by the names then. In my mind all locks in this PR are "exclusive" (in that there will never be two different shared locks for an interval) but some are preemptible and some are not. So IMO "non-preemptible" or "uninterruptible" is a better word than "exclusive".

It's also kind of confusing that "canAcquireLock" actually makes changes to the locks. IMO either that method should not make changes, or it can make changes but then it should have a different name.

For the context stuff, I don't have a strong preference. I think it's ok to let people set their own priorities. I don't think it's necessary but I think it's ok if we want to go that way. It's also something that could be added later, because tasks could have a default priority that is overridden by the json.

@pjain1
Member Author

pjain1 commented Sep 8, 2015

@gianm I took care of your comments. I have a question though: the integration test that I wrote submits 3 Index tasks with different priorities, in an order such that only the last one, which has the highest priority, succeeds, as it revokes the TaskLocks of the first two. However, sometimes after the last task succeeds, the ExecutorLifeCycle for one of the first two tasks will start, and it calls isReady for that task again here. Thus, the task will acquire the lock again, as the last task with the highest priority has already succeeded. Do you think this is correct behavior and the task can succeed, or should a task always fail once its lock has been revoked?
I was also wondering why there is a call to isReady again in ExecutorLifeCycle; wouldn't the TaskQueue ensure for all tasks (even in local mode) that the task is ready before running it via the task runner? There's a comment saying "Won't hurt in remote mode, and is required for setting up locks in local mode:" but I don't understand why it is required in local mode. What am I missing here? BTW I removed the isReady call in ExecutorLifeCycle and ran it through our CI pipeline (which runs the integration tests as well) and it passed.

@pjain1
Member Author

pjain1 commented Sep 8, 2015

The isReady call in ExecutorLifeCycle was introduced in this commit - 70c1535

@pjain1
Member Author

pjain1 commented Sep 8, 2015

I have removed the integration test from this PR for now.

@pjain1
Member Author

pjain1 commented Sep 9, 2015

@gianm I thought about the integration test and discussed it with @himanshug. It will actually be hard to write a fully deterministic integration test for lock overriding unless changes are made to the Druid code, which is not worth it, so I will just skip the integration test for now. In any case, I am still curious about the call to isReady in ExecutorLifeCycle.

@gianm
Contributor

gianm commented Sep 9, 2015

@pjain1 The comment about "local mode" means just running a peon by itself, with no overlord and no middle manager. At one point we thought that was something that would be good to make possible, and we used it somewhat often for testing things. I think it is required for that, since nothing else is going to call isReady in that mode (there's no overlord and no task queue).

IMO since there is no real guarantee about what order tasks will attempt to run in, the sequencing you described is actually totally fine as long as "acquire the lock again" means the lower priority task actually got a new lock with a higher version than the previously-run higher-priority task. The lock versions absolutely must be increasing over time as tasks run, even if they don't run in the order in which they were originally submitted.

@pjain1
Member Author

pjain1 commented Sep 9, 2015

@gianm Ah...I see that makes sense. Yes the newly acquired lock will have higher version.

@pjain1
Member Author

pjain1 commented Sep 9, 2015

Found some problems in handling error scenarios when overlord restarts or leadership changes. Closing the PR, will reopen once done.

@pjain1 pjain1 closed this Sep 9, 2015
@drcrallen
Contributor

@pjain1 Task priority was also addressed in #984, where it is used as part of threading priority.

I went back and forth with @xvrl a few times on what priority means, and here's what we originally came up with:

  1. Priority should follow the convention for query priority, where higher numbers mean higher priority, and lower numbers mean lower priority.
  2. Priority of 0 or missing should mean use default.

As such, I think there are two issues here. One is about preemption at all, and the other is about preemption rules.

As per #1513 preemption at all seems to be something that is desired. So the question that is outstanding is how to define rules for preemption.

There are a few other task engines that have "can I be preempted" and "what is my priority" as two independent constructs, and I think that would make sense to consider here. In such a case, a lock can only be pre-empted if it is set to be ABLE to be preempted, and if the preempting task has a higher priority on the preemption scale. If a task is NOT flagged as preempt-able, OR is flagged as preempt-able but the other task is at a lower rank on the priority scale, then no preemption would occur.

If two tasks are vying for resources (be it a lock or an execution slot in the cluster) it would be nice if we had a unified "priority" that was independent of preemption.
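
In code form, the rule being proposed might read like this (hypothetical names; isPreemptible() does not exist in this PR, where preemptability is derived from priority):

    // Hypothetical predicate: preemption requires an explicitly preemptible
    // lock AND a strictly higher-priority claimant.
    static boolean canPreempt(Task claimant, TaskLock held)
    {
      return held.isPreemptible() && claimant.getLockPriority() > held.getPriority();
    }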

@pjain1
Member Author

pjain1 commented Sep 9, 2015

@drcrallen Just to recap: right now "can I be preempted" is a function of priority. So a task is preempt-able when a higher-priority task comes in, except when the task is working in a critical section and has upgraded its lock to be non-preempt-able.
What you are proposing here is to separate these concerns, so who decides whether the task can be preempted? One way would be to make all tasks preempt-able unless the user specifically sets non-preempt-able in the task json.
As for the convention for the priority number, we can model it the same way as query priority. I originally modeled the priority numbers the way the nice utility does.

@drcrallen
Contributor

@pjain1 to maintain current behavior, the default would be that a task is not preempt-able unless it is specified that it can be.

@himanshug
Contributor

@drcrallen changing the notion of what is high/low priority sounds good.

did you want the "can I be preempted" check only for backwards compatibility? If that is the case then we can merge this PR in druid-0.9 only.

or do you believe tasks should always be non-preemptable by default?

@drcrallen
Contributor

@himanshug (thinking out loud here) it seems to me that it is reasonable for a task execution service to take all reasonable steps necessary to ensure a task

  1. Starts execution and
  2. Is not terminated prematurely

As such it is not immediately obvious to me that the actual preemption should be part of the executing service, as opposed to part of some external monitoring and coordination, whereas task priority under general resource contention (number 1 from the list) makes sense to have as part of the task executor.

For example, I can see a scenario where the coordinator evaluates running tasks, kills inferior priority locked tasks, then requests the newer higher priority tasks be run, but operates independently of the actual executing service. In this scenario the coordinator can either 1. Requeue the task or 2. Fail the task and submit a new one later.

I still think "can I be preempted" is a separate concept from general task priority because lock preemption is a very specific type of cluster resource contention.

Does that make sense?

@drcrallen
Contributor

In another scenario imagine you have a limited set of cluster resources as workers, and some set of tasks that are running and more that need to be run.

Should preemption based on arbitrary cluster resources follow the same rules as lock preemption?

Since we operate on a Lambda architecture, it makes sense to me that you can have a hadoop task that is SLA-critical and thus should not be terminated, but that you don't want realtime tasks to preempt. In such a case I think it would make sense for new realtime tasks not to preempt the SLA-critical hadoop task, or at least to have a way to specify that the task is not preempt-able under normal preemption conditions.

@himanshug
Contributor

@drcrallen for the specific case of an SLA-critical hadoop task, the user is allowed to submit it with a high enough priority (by configuring priority in the submitted json) that realtime tasks cannot preempt it.

That said, just to correct my understanding, is this what you are proposing?

  1. in addition to getPriority(), add another method called isPreemptable() in Task.java
  2. and in the acquire/upgrade lock methods, check both the priority and the isPreemptable() flag to decide whether to give the lock to the new task or not.

@himanshug
Contributor

we had a bit of a chat with @cheddar as well about this, and it appears the existing way is OK; we believe even priority shouldn't be user-configurable (that is, a realtime task will always preempt a batch indexing task).
Let us talk more about this in the next syncup meeting.

@pjain1
Member Author

pjain1 commented Sep 10, 2015

Reopening the PR -

  1. Changed the priority order semantics - the higher the number, the higher the priority
  2. Made TaskLock immutable
  3. Handled the error scenarios of overlord failures and restarts
  4. Added unit tests for failure scenarios

As of now, preemptability is a function of priority only, as that part is still under discussion.

@pjain1 pjain1 reopened this Sep 10, 2015
lockReleaseCondition.await();
}

tasksWaitingForLock.remove(task);
Contributor

Should this be in a try/finally? It's possible for lockReleaseCondition.await() or tryLock to throw exceptions while a task is waiting, and in that case tasksWaitingForLock won't get updated. I think the task should still get removed.

Member Author

yes, you are correct... moved it to finally
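
A sketch of the resulting shape (the surrounding tryLock loop is abridged, so the exact condition is an assumption):

    tasksWaitingForLock.add(task);
    try {
      while (!tryLock(task, interval)) {
        lockReleaseCondition.await();
      }
    }
    finally {
      // Always clean up, even if await() or tryLock throws while waiting.
      tasksWaitingForLock.remove(task);
    }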

@fjy
Contributor

fjy commented Jun 15, 2016

@pjain1 just to follow up here, did we ever decide on how to create an enable/disable flag for this feature?

@pjain1
Member Author

pjain1 commented Jun 15, 2016

@fjy sorry, I have been busy with other things. I will think about it, discuss with others, and update the thread. This PR does not need to be a blocker for 0.9.2.

@fjy fjy modified the milestones: 0.9.3, 0.9.2 Jun 16, 2016
@pjain1
Member Author

pjain1 commented Aug 5, 2016

The priority locking feature is now configurable; it is off by default and can be enabled by setting the runtime property druid.indexer.taskLockboxVersion to v2.

  • TaskLockbox is now an interface with two implementations - TaskLockboxV1, which is the same as the previous TaskLockbox, and TaskLockboxV2, which does priority-based locking. One of them is injected at runtime in CliOverlord and CliPeon depending on druid.indexer.taskLockboxVersion.
  • TaskLockbox has a new method boolean setTaskLockCriticalState(Task task, Interval interval, TaskLockCriticalState taskLockCriticalState) meant for upgrading locks when priority-based locking is used. TaskLockboxV1 always returns true for this method.
  • TaskLock has two new fields, priority and upgraded. For TaskLockboxV1 the corresponding values are always 0 and true; for TaskLockboxV2 they depend on the task.
  • Each task calls setTaskLockCriticalState before publishing segments. However, for TaskLockboxV1 the method always returns true and the extra overhead is just an HTTP call to the overlord. For TaskLockboxV2 it does the actual work of setting the TaskLock state.
  • The Task interface has an int getLockPriority() method, which I guess is OK.

@pjain1 pjain1 closed this Aug 5, 2016
@pjain1 pjain1 reopened this Aug 5, 2016
 - Task priority is used to acquire a lock on an interval for a datasource.
 - Tasks with higher priority can preempt lower-priority tasks for the same datasource and interval if run concurrently.
@pjain1
Member Author

pjain1 commented Aug 5, 2016

Transient failure -

Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.562 sec <<< FAILURE! - in io.druid.curator.announcement.AnnouncerTest
testSessionKilled(io.druid.curator.announcement.AnnouncerTest)  Time elapsed: 3.227 sec  <<< ERROR!
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /test1
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1212)
    at org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:304)
    at org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:293)
    at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:108)
    at org.apache.curator.framework.imps.GetDataBuilderImpl.pathInForeground(GetDataBuilderImpl.java:290)
    at org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:281)
    at org.apache.curator.framework.imps.GetDataBuilderImpl$1.forPath(GetDataBuilderImpl.java:105)
    at org.apache.curator.framework.imps.GetDataBuilderImpl$1.forPath(GetDataBuilderImpl.java:65)
    at io.druid.curator.announcement.AnnouncerTest.testSessionKilled(AnnouncerTest.java:170)

restarting the build

@pjain1 pjain1 closed this Aug 5, 2016
@pjain1 pjain1 reopened this Aug 5, 2016
@fjy
Contributor

fjy commented Nov 8, 2016

@pjain1 @himanshug what is the status of this PR?

@gianm
Contributor

gianm commented Nov 29, 2016

@pjain1 @himanshug do you all still need/want this? Should we direct attention back here for 0.9.3?

@fjy fjy removed this from the 0.9.3 milestone Dec 9, 2016
@jihoonson jihoonson mentioned this pull request Jul 15, 2017
@gianm
Contributor

gianm commented Nov 14, 2018

Revived and implemented as part of #4550.

@gianm gianm closed this Nov 14, 2018
seoeun25 pushed a commit to seoeun25/incubator-druid that referenced this pull request Jan 10, 2020
seoeun25 pushed a commit to seoeun25/incubator-druid that referenced this pull request Jan 10, 2020