Make ChunksCoordinator in full control of following leader shard
* A single ChunksCoordinator is now in charge of following a shard and
keeps coordinating until the persistent task is stopped. Previously, a
ChunksCoordinator's job was to process a finite number of chunks, after
which a new ChunksCoordinator instance was created to process the next
chunks.
* Instead of consuming the chunks queue once and waiting for all workers to
complete, a background task now continuously adds chunks to the queue, so
that the workers never run out of chunks to process while the leader shard
has unprocessed write operations (see the sketch below).

Relates to elastic#30086
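As a rough, self-contained sketch of the pattern described above (not the actual Elasticsearch classes; the names below, such as PollingChunksCoordinator and processChunk, are hypothetical, and a LongSupplier stands in for fetchGlobalCheckpoint), the coordinator can be modeled as a long-lived object that polls the leader's global checkpoint on a fixed interval, appends new [from, to] chunk ranges to a shared queue, and tops a bounded worker pool back up whenever new chunks arrive:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Illustrative sketch only; not the ChunksCoordinator from this commit.
final class PollingChunksCoordinator {

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
    private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
    private final AtomicInteger activeWorkers = new AtomicInteger();
    private final AtomicLong lastPolledCheckpoint = new AtomicLong(-1);

    private final LongSupplier leaderCheckpoint; // stands in for fetchGlobalCheckpoint
    private final long chunkSize;
    private final int maxWorkers;
    private volatile boolean running = true;

    PollingChunksCoordinator(LongSupplier leaderCheckpoint, long chunkSize, int maxWorkers) {
        this.leaderCheckpoint = leaderCheckpoint;
        this.chunkSize = chunkSize;
        this.maxWorkers = maxWorkers;
    }

    void start() {
        // Poll the leader on a fixed interval (the commit uses 3 seconds)
        // instead of tearing the coordinator down after a finite batch.
        executor.scheduleWithFixedDelay(this::poll, 0, 3, TimeUnit.SECONDS);
    }

    private void poll() {
        if (running == false) {
            return;
        }
        long to = leaderCheckpoint.getAsLong();
        long from = lastPolledCheckpoint.get();
        if (to > from) {
            createChunks(from + 1, to);
            lastPolledCheckpoint.set(to);
            spawnWorkers(); // restart workers that exited on an empty queue
        }
    }

    private void createChunks(long from, long to) {
        for (long i = from; i <= to; i += chunkSize) {
            chunks.add(new long[]{i, Math.min(i + chunkSize - 1, to)});
        }
    }

    private void spawnWorkers() {
        // Top the pool back up to the configured ceiling; workers decrement
        // activeWorkers and exit once the queue runs dry.
        int workersToStart = maxWorkers - activeWorkers.get();
        for (int i = 0; i < workersToStart; i++) {
            activeWorkers.incrementAndGet();
            executor.execute(this::drainChunks);
        }
    }

    private void drainChunks() {
        long[] chunk;
        while (running && (chunk = chunks.poll()) != null) {
            processChunk(chunk[0], chunk[1]); // placeholder for ChunkProcessor
        }
        activeWorkers.decrementAndGet();
    }

    private void processChunk(long from, long to) {
        System.out.println("processing ops [" + from + ".." + to + "]");
    }

    void stop() {
        running = false;
        executor.shutdown();
    }
}
```

The actual ChunksCoordinator in the diff below additionally routes failures to the persistent task via task::markAsFailed, checks task::isRunning before each step, and advances the follower's processed global checkpoint monotonically as chunks complete.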
martijnvg committed Jun 5, 2018
1 parent 530089f commit d2d88d2
Showing 2 changed files with 148 additions and 105 deletions.
@@ -30,7 +30,6 @@
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -56,7 +55,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
@@ -69,7 +67,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
static final int PROCESSOR_RETRY_LIMIT = 16;
static final int DEFAULT_CONCURRENT_PROCESSORS = 1;
static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE;
private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
private static final TimeValue CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL = TimeValue.timeValueSeconds(3);

private final Client client;
private final ThreadPool threadPool;
@@ -130,58 +128,20 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param
void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
if (task.isRunning() == false) {
// TODO: need better cancellation control
return;
}

final ShardId leaderShard = params.getLeaderShardId();
final ShardId followerShard = params.getFollowShardId();
fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
logger.debug("{} no write operations to fetch", followerShard);
retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
leaderGlobalCheckPoint, followGlobalCheckPoint);
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
leaderGlobalCheckPoint, followGlobalCheckPoint);
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, threadPool, imdVersionChecker, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, task::markAsFailed,
task::isRunning, task::updateProcessedGlobalCheckpoint);
coordinator.start(followGlobalCheckPoint, leaderGlobalCheckPoint);
}, task::markAsFailed);
}

private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
}

@Override
protected void doRun() throws Exception {
prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
}
});
}

private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
private static void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
@@ -201,37 +161,53 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
static class ChunksCoordinator {

private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);

private final Client followerClient;
private final Client leaderClient;
private final Executor ccrExecutor;
private final ThreadPool threadPool;
private final IndexMetadataVersionChecker imdVersionChecker;

private final long batchSize;
private final int concurrentProcessors;
private final int maxConcurrentWorker;
private final long processorMaxTranslogBytes;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;

private final CountDown countDown;
private final Consumer<Exception> failureHandler;
private final Supplier<Boolean> stateSupplier;
private final LongConsumer processedGlobalCheckpointUpdater;

private final AtomicInteger activeWorkers;
private final AtomicLong lastPolledGlobalCheckpoint;
private final AtomicLong lastProcessedGlobalCheckPoint;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();

ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
ChunksCoordinator(Client followerClient,
Client leaderClient,
ThreadPool threadPool,
IndexMetadataVersionChecker imdVersionChecker,
long batchSize,
int maxConcurrentWorker,
long processorMaxTranslogBytes,
ShardId leaderShard,
ShardId followerShard,
Consumer<Exception> failureHandler,
Supplier<Boolean> runningSuppler,
LongConsumer processedGlobalCheckpointUpdater) {
this.followerClient = followerClient;
this.leaderClient = leaderClient;
this.ccrExecutor = ccrExecutor;
this.threadPool = threadPool;
this.imdVersionChecker = imdVersionChecker;
this.batchSize = batchSize;
this.concurrentProcessors = concurrentProcessors;
this.maxConcurrentWorker = maxConcurrentWorker;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
this.countDown = new CountDown(concurrentProcessors);
this.failureHandler = failureHandler;
this.stateSupplier = runningSuppler;
this.processedGlobalCheckpointUpdater = processedGlobalCheckpointUpdater;
this.activeWorkers = new AtomicInteger();
this.lastPolledGlobalCheckpoint = new AtomicLong();
this.lastProcessedGlobalCheckPoint = new AtomicLong();
}

void createChucks(long from, long to) {
@@ -242,56 +218,105 @@ void createChucks(long from, long to) {
}
}

void start() {
void update() {
schedule(() -> {
if (stateSupplier.get() == false) {
return;
}

fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
long followerGlobalCheckpoint = lastPolledGlobalCheckpoint.get();
if (leaderGlobalCheckPoint != followerGlobalCheckpoint) {
assert followerGlobalCheckpoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followerGlobalCheckpoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
createChucks(lastPolledGlobalCheckpoint.get(), leaderGlobalCheckPoint);
initiateChunkWorkers();
} else {
LOGGER.debug("{} no write operations to fetch", followerShard);
}
update();
}, failureHandler);
});
}

void start(long followerGlobalCheckpoint, long leaderGlobalCheckPoint) {
createChucks(followerGlobalCheckpoint, leaderGlobalCheckPoint);
lastPolledGlobalCheckpoint.set(leaderGlobalCheckPoint);
LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors",
leaderShard, chunks.size(), concurrentProcessors);
for (int i = 0; i < concurrentProcessors; i++) {
ccrExecutor.execute(new AbstractRunnable() {
leaderShard, chunks.size(), maxConcurrentWorker);
initiateChunkWorkers();
update();
}

void initiateChunkWorkers() {
int workersToStart = maxConcurrentWorker - activeWorkers.get();
if (workersToStart == 0) {
LOGGER.debug("{} No new chunk workers were started", followerShard);
return;
}

LOGGER.debug("{} Starting [{}] new chunk workers", followerShard, workersToStart);
for (int i = 0; i < workersToStart; i++) {
threadPool.executor(Ccr.CCR_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e);
postProcessChuck(e);
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", followerShard), e);
failureHandler.accept(e);
}

@Override
protected void doRun() throws Exception {
processNextChunk();
}
});
activeWorkers.incrementAndGet();
}
}

void processNextChunk() {
if (stateSupplier.get() == false) {
return;
}

long[] chunk = chunks.poll();
if (chunk == null) {
postProcessChuck(null);
int activeWorkers = this.activeWorkers.decrementAndGet();
LOGGER.debug("{} No more chunks to process, active workers [{}]", leaderShard, activeWorkers);
return;
}
LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
Consumer<Exception> processorHandler = e -> {
if (e == null) {
LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
if (lastPolledGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) == chunk[1]) {
processedGlobalCheckpointUpdater.accept(chunk[1]);
}
processNextChunk();
} else {
LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]",
leaderShard, chunk[0], chunk[1]), e);
postProcessChuck(e);
failureHandler.accept(e);
}
};
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
leaderShard, followerShard, processorHandler);
processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
}

void postProcessChuck(Exception e) {
if (failureHolder.compareAndSet(null, e) == false) {
Exception firstFailure = failureHolder.get();
firstFailure.addSuppressed(e);
}
if (countDown.countDown()) {
handler.accept(failureHolder.get());
}
void schedule(Runnable runnable) {
threadPool.schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
failureHandler.accept(e);
}

@Override
protected void doRun() throws Exception {
runnable.run();
}
});
}

Queue<long[]> getChunks() {
