Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fixes apache#1715
- TaskLockBox has a set of active tasks
- lock requests throws exception for if they are from a task not in
active task set.
- TaskQueue is responsible for updating the active task set on
tasklockbox

fix apache#1715

fixes apache#1715
- TaskLockBox has a set of active tasks
- lock requests throws exception for if they are from a task not in
active task set.
- TaskQueue is responsible for updating the active task set on
tasklockbox

review comment

remove duplicate line

use ISE instead

organise imports
  • Loading branch information
nishantmonu51 committed Sep 24, 2015
1 parent 35caa75 commit b638400
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -50,9 +47,12 @@
import java.util.TreeMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/**
* Remembers which tasks have locked which intervals. Tasks are permitted to lock an interval if no other task
* Remembers which activeTasks have locked which intervals. Tasks are permitted to lock an interval if no other task
* outside their group has locked an overlapping interval for the same datasource. When a task locks an interval,
* it is assigned a version string that it can use to publish segments.
*/
Expand All @@ -66,6 +66,10 @@ public class TaskLockbox

private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);

// Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
// this set should be accessed under the giant lock.
private final Set<String> activeTasks = Sets.newHashSet();

@Inject
public TaskLockbox(
TaskStorage taskStorage
Expand Down Expand Up @@ -103,8 +107,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
}
};
running.clear();
activeTasks.clear();
// Bookkeeping for a log message at the end
final Set<String> uniqueTaskIds = Sets.newHashSet();
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = taskAndLock.lhs;
Expand All @@ -114,7 +118,7 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
uniqueTaskIds.add(task.getId());
activeTasks.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
savedTaskLock.getInterval(),
Expand Down Expand Up @@ -147,9 +151,9 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
}
}
log.info(
"Synced %,d locks for %,d tasks from storage (%,d locks ignored).",
"Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
taskLockCount,
uniqueTaskIds.size(),
activeTasks.size(),
storedLocks.size() - taskLockCount
);
} finally {
Expand All @@ -170,10 +174,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{
giant.lock();

try {
Optional<TaskLock> taskLock;

while (!(taskLock = tryLock(task, interval)).isPresent()) {
lockReleaseCondition.await();
}
Expand All @@ -192,6 +194,7 @@ public TaskLock lock(final Task task, final Interval interval) throws Interrupte
* @param interval interval to lock
*
* @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/
public Optional<TaskLock> tryLock(final Task task, final Interval interval)
{
Expand All @@ -210,12 +213,16 @@ public Optional<TaskLock> tryLock(final Task task, final Interval interval)
* @param preferredVersion use this version string if one has not yet been assigned
*
* @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/
private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
{
giant.lock();

try {
if(!activeTasks.contains(task.getId())){
throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
Expand Down Expand Up @@ -310,13 +317,13 @@ public List<TaskLock> findLocksForTask(final Task task)
try {
return Lists.transform(
findLockPossesForTask(task), new Function<TaskLockPosse, TaskLock>()
{
@Override
public TaskLock apply(TaskLockPosse taskLockPosse)
{
return taskLockPosse.getTaskLock();
}
}
{
@Override
public TaskLock apply(TaskLockPosse taskLockPosse)
{
return taskLockPosse.getTaskLock();
}
}
);
} finally {
giant.unlock();
Expand All @@ -338,7 +345,7 @@ public void unlock(final Task task, final Interval interval)
final String dataSource = task.getDataSource();
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);

// So we can alert if tasks try to release stuff they don't have
// So we can alert if activeTasks try to release stuff they don't have
boolean removed = false;

if(dsRunning != null) {
Expand Down Expand Up @@ -388,17 +395,22 @@ public void unlock(final Task task, final Interval interval)
}

/**
* Release all locks for a task. Does nothing if the task is not currently locked.
* Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
*
* @param task task to unlock
*/
public void unlock(final Task task)
public void remove(final Task task)
{
giant.lock();

try {
for(final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
unlock(task, taskLockPosse.getTaskLock().getInterval());
try {
log.info("Removing task[%s] from activeTasks", task.getId());
for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
unlock(task, taskLockPosse.getTaskLock().getInterval());
}
}
finally {
activeTasks.remove(task.getId());
}
}
finally {
Expand Down Expand Up @@ -503,6 +515,17 @@ public TaskLockPosse apply(Interval interval)
}
}

public void add(Task task)
{
giant.lock();
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
} finally {
giant.unlock();
}
}

private static class TaskLockPosse
{
final private TaskLock taskLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
Expand All @@ -44,6 +45,8 @@
import io.druid.metadata.EntryExistsException;
import io.druid.query.DruidMetrics;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -53,6 +56,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;

/**
* Interface between task producers and the task runner.
Expand Down Expand Up @@ -308,7 +312,7 @@ public boolean add(final Task task) throws EntryExistsException
// If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue. So don't catch it.
taskStorage.insert(task, TaskStatus.running(task.getId()));
tasks.add(task);
addTaskInternal(task);
managementMayBeNecessary.signalAll();
return true;
}
Expand All @@ -317,6 +321,18 @@ public boolean add(final Task task) throws EntryExistsException
}
}

// Should always be called after taking giantLock
private void addTaskInternal(final Task task){
tasks.add(task);
taskLockbox.add(task);
}

// Should always be called after taking giantLock
private void removeTaskInternal(final Task task){
taskLockbox.remove(task);
tasks.remove(task);
}

/**
* Shuts down a task if it has not yet finished.
*
Expand Down Expand Up @@ -378,7 +394,7 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus)
for (int i = tasks.size() - 1; i >= 0; i--) {
if (tasks.get(i).getId().equals(task.getId())) {
removed++;
tasks.remove(i);
removeTaskInternal(tasks.get(i));
break;
}
}
Expand All @@ -397,7 +413,6 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus)
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else {
taskStorage.setStatus(taskStatus);
taskLockbox.unlock(task);
log.info("Task done: %s", task);
managementMayBeNecessary.signalAll();
}
Expand Down Expand Up @@ -498,15 +513,35 @@ private void syncFromStorage()

try {
if (active) {
final List<Task> newTasks = taskStorage.getActiveTasks();
final Map<String,Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
final int tasksSynced = newTasks.size();
final Map<String,Task> oldTasks = toTaskIDMap(tasks);

// Calculate differences on IDs instead of Task Objects.
Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
for(String taskID : commonIds){
newTasks.remove(taskID);
oldTasks.remove(taskID);
}
Collection<Task> addedTasks = newTasks.values();
Collection<Task> removedTasks = oldTasks.values();

// Clean up removed Tasks
for(Task task : removedTasks){
removeTaskInternal(task);
}

// Add newly Added tasks to the queue
for(Task task : addedTasks){
addTaskInternal(task);
}

log.info(
"Synced %,d tasks from storage (%,d tasks added, %,d tasks removed).",
newTasks.size(),
Sets.difference(Sets.newHashSet(newTasks), Sets.newHashSet(tasks)).size(),
Sets.difference(Sets.newHashSet(tasks), Sets.newHashSet(newTasks)).size()
"Synced %d tasks from storage (%d tasks added, %d tasks removed).",
tasksSynced,
addedTasks.size(),
removedTasks.size()
);
tasks.clear();
tasks.addAll(newTasks);
managementMayBeNecessary.signalAll();
} else {
log.info("Not active. Skipping storage sync.");
Expand All @@ -520,4 +555,13 @@ private void syncFromStorage()
giant.unlock();
}
}

private static Map<String,Task> toTaskIDMap(List<Task> taskList){
Map<String,Task> rv = Maps.newHashMap();
for(Task task : taskList){
rv.put(task.getId(), task);
}
return rv;
}

}
Loading

0 comments on commit b638400

Please sign in to comment.