[CCR] Added write buffer size limit #34797
Conversation
This limit is based on the size in bytes of the operations in the write buffer. If this limit is exceeded, then no more read operations will be coordinated until the size in bytes of the write buffer has dropped below the configured write buffer size limit. Renamed the existing `max_write_buffer_size` to `max_write_buffer_count` to indicate that the limit is count based. Closes elastic#34705
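To make the mechanism concrete, here is a minimal sketch of how a byte-based soft limit can gate further read coordination alongside the count-based one. The class and method names below are illustrative assumptions, not the actual `ShardFollowNodeTask` code.

```java
// Minimal sketch of the back-pressure check, assuming illustrative names;
// not the actual ShardFollowNodeTask implementation.
final class WriteBufferLimits {
    private final int maxWriteBufferCount;        // renamed from max_write_buffer_size
    private final long maxWriteBufferSizeInBytes; // new byte-based limit

    private int bufferedOperationCount = 0;
    private long bufferSizeInBytes = 0;

    WriteBufferLimits(int maxWriteBufferCount, long maxWriteBufferSizeInBytes) {
        this.maxWriteBufferCount = maxWriteBufferCount;
        this.maxWriteBufferSizeInBytes = maxWriteBufferSizeInBytes;
    }

    /** No further reads are coordinated while either limit is exceeded. */
    boolean shouldCoordinateReads() {
        return bufferedOperationCount < maxWriteBufferCount
            && bufferSizeInBytes < maxWriteBufferSizeInBytes;
    }

    /** Called when a read response is added to the write buffer. */
    void onOperationsBuffered(int count, long sizeInBytes) {
        bufferedOperationCount += count;
        bufferSizeInBytes += sizeInBytes;
    }

    /** Called when buffered operations have been written to the follower. */
    void onOperationsWritten(int count, long sizeInBytes) {
        bufferedOperationCount -= count;
        bufferSizeInBytes -= sizeInBytes;
    }
}
```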
Pinging @elastic/es-distributed
This looks great. I left some minor comments.
I noticed that the current implementation (not this PR) might fill the write buffer more than the limit. When we send a read-request, we don't limit the request count and size to the vacant slots of the write buffer. If the buffer has one byte (or one count) left, we still issue a full read-request. We can fix this in a follow-up if we feel we should.
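For reference, a rough sketch of the follow-up described above: sizing the next read request to whatever room is left in the write buffer. Everything here is hypothetical and deliberately not part of this PR.

```java
// Hypothetical sketch of the possible follow-up: clamp the next read request
// to the vacant capacity of the write buffer. Not part of this PR.
final class ReadRequestSizing {

    /** Byte budget for the next read request, bounded by the buffer's vacancy. */
    static long nextRequestSizeInBytes(long maxReadBatchSizeInBytes,
                                       long maxWriteBufferSizeInBytes,
                                       long bufferSizeInBytes) {
        long vacantBytes = Math.max(0L, maxWriteBufferSizeInBytes - bufferSizeInBytes);
        return Math.min(maxReadBatchSizeInBytes, vacantBytes);
    }

    /** Operation-count budget for the next read request. */
    static int nextRequestOperationCount(int maxReadBatchOperationCount,
                                         int maxWriteBufferCount,
                                         int bufferedOperationCount) {
        int vacantSlots = Math.max(0, maxWriteBufferCount - bufferedOperationCount);
        return Math.min(maxReadBatchOperationCount, vacantSlots);
    }
}
```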
```diff
@@ -327,7 +327,7 @@ private void followLeaderIndex(String autoFollowPattenName,
     followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
     followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
     followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
-    followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
+    followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
```
We forgot to pass the `maxWriteBufferSize` parameter.
whoops
fixed: 7af8ba2
```diff
@@ -85,6 +85,7 @@
     private long numberOfOperationsIndexed = 0;
     private long lastFetchTime = -1;
     private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
+    private long bufferSize = 0;
```
Should it be `bufferSizeInBytes`?
renamed: 9925984
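To make the bookkeeping concrete, a small sketch of keeping a byte counter in sync with the buffer shown in the diff above. The nested `Operation` interface stands in for `Translog.Operation`; the rest is illustrative, not the actual class.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

// Sketch only: keep bufferSizeInBytes in sync with the buffer so the byte
// limit can be checked without iterating the queue.
final class BufferAccounting {

    // Stand-in for Translog.Operation in this sketch.
    interface Operation {
        long seqNo();
        long estimateSize(); // size of the operation in bytes
    }

    private final Queue<Operation> buffer =
        new PriorityQueue<>(Comparator.comparingLong(Operation::seqNo));
    private long bufferSizeInBytes = 0;

    void add(Operation op) {
        buffer.add(op);
        bufferSizeInBytes += op.estimateSize();
    }

    Operation poll() {
        Operation op = buffer.poll();
        if (op != null) {
            bufferSizeInBytes -= op.estimateSize();
        }
        return op;
    }

    long sizeInBytes() {
        return bufferSizeInBytes;
    }
}
```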
```diff
@@ -208,6 +213,8 @@ private synchronized void coordinateWrites() {
                 break;
             }
         }
+        long opsSize = ops.stream().mapToLong(Translog.Operation::estimateSize).sum();
```
Can we move this into the loop above to avoid another pass over the operations?
removed: 980f1a5
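One way to avoid the extra pass, as suggested above, is to accumulate the size while the operations are drained from the buffer. A hedged sketch with illustrative names (not the actual commit):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch only: sum operation sizes while draining the buffer instead of a
// second pass with a stream. Operation stands in for Translog.Operation.
final class WriteBatching {

    interface Operation {
        long estimateSize();
    }

    static List<Operation> nextWriteBatch(Queue<Operation> buffer, long maxBatchSizeInBytes) {
        List<Operation> ops = new ArrayList<>();
        long opsSize = 0; // accumulated inside the loop
        Operation op;
        while ((op = buffer.peek()) != null) {
            long size = op.estimateSize();
            if (ops.isEmpty() == false && opsSize + size > maxBatchSizeInBytes) {
                break; // next operation would exceed the batch size
            }
            buffer.poll();
            ops.add(op);
            opsSize += size;
        }
        return ops;
    }
}
```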
@dnhatn Thanks for reviewing!
Yes, that is what it is currently doing. I don't see a real problem in the way the limit is currently enforced.
+1. The main rationale here was to keep things simple and treat the limit as a soft limit that is used to cause back pressure. No more.
LGTM
```diff
@@ -455,6 +466,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
             numConcurrentReads,
             numConcurrentWrites,
             buffer.size(),
+            bufferSizeInBytes,
```
nit: indentation
Makes sense. Thanks @martijnvg and @bleskes for explaining.
Note that this PR also changes the default for `max_write_buffer_count`. The reason behind this is that finding a default for this parameter is difficult, and this PR adds the `max_write_buffer_size` setting.
LGTM.