diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 40092d4b02fb4..2920f4e218bed 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -85,7 +85,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long numberOfOperationsIndexed = 0; private long lastFetchTime = -1; private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); - private long bufferSize = 0; + private long bufferSizeInBytes = 0; private final LinkedHashMap> fetchExceptions; private volatile ElasticsearchException fatalException; @@ -184,8 +184,8 @@ private boolean hasReadBudget() { params.getFollowShardId(), numConcurrentReads); return false; } - if (bufferSize >= params.getMaxWriteBufferSize().getBytes()) { - LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSize); + if (bufferSizeInBytes >= params.getMaxWriteBufferSize().getBytes()) { + LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSizeInBytes); return false; } if (buffer.size() > params.getMaxWriteBufferCount()) { @@ -214,7 +214,7 @@ private synchronized void coordinateWrites() { } } long opsSize = ops.stream().mapToLong(Translog.Operation::estimateSize).sum(); - bufferSize -= opsSize; + bufferSizeInBytes -= opsSize; numConcurrentWrites++; LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(), ops.get(ops.size() - 1).seqNo(), ops.size()); @@ -293,7 +293,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar .mapToLong(Translog.Operation::estimateSize) .sum(); buffer.addAll(operations); - bufferSize += operationsSize; + bufferSizeInBytes += operationsSize; final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo(); assert maxSeqNo == Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong(); @@ -467,7 +467,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() { numConcurrentReads, numConcurrentWrites, buffer.size(), - bufferSize, + bufferSizeInBytes, currentMappingVersion, totalFetchTimeMillis, totalFetchTookTimeMillis,