Skip to content

Commit

Permalink
Clarify flags
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Aug 6, 2024
1 parent 8d73a28 commit 9d5c060
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ private void drainQueue() {
/**
* A {@link ChunkedRestResponseBodyPart} which will yield all currently-available chunks by consuming entries from {@link #entryQueue}.
* There is only ever at most one active instance of this class at any time, in the sense that one such instance becoming inactive
* <i>happens-before</i> the creation of the next instance.
* <i>happens-before</i> the creation of the next instance. One of these parts may send chunks for more than one entry.
*/
private final class AvailableChunksZipResponseBodyPart implements ChunkedRestResponseBodyPart {

Expand All @@ -288,8 +288,16 @@ private final class AvailableChunksZipResponseBodyPart implements ChunkedRestRes
*/
private ChunkedRestResponseBodyPart bodyPart;

private boolean isPartComplete;
private boolean isLastPart;
/**
* True when we have run out of compressed chunks ready for immediate transmission, so the response is paused, but we expect to send
* more data later.
*/
private boolean isResponsePaused;

/**
* True when we have sent the zip file footer, or the response was cancelled.
*/
private boolean isResponseComplete;

/**
* A listener which is created when there are no more available chunks, so transmission is paused, subscribed to in
Expand All @@ -311,14 +319,17 @@ private final class AvailableChunksZipResponseBodyPart implements ChunkedRestRes
this.bodyPart = bodyPart;
}

/**
* @return whether this part of the compressed response is complete
*/
@Override
public boolean isPartComplete() {
return isPartComplete;
return isResponsePaused || isResponseComplete;
}

@Override
public boolean isLastPart() {
return isLastPart;
return isResponseComplete;
}

@Override
Expand Down Expand Up @@ -369,7 +380,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec

do {
writeNextBytes(sizeHint, recycler, releasables);
} while (isPartComplete == false && chunkStream.size() < sizeHint);
} while (isResponseComplete == false && isResponsePaused == false && chunkStream.size() < sizeHint);

assert (releasables == nextReleasablesCache) == releasables.isEmpty();
assert nextReleasablesCache.isEmpty();
Expand All @@ -389,8 +400,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
}
} else {
// request aborted, nothing more to send (queue is being cleared by queueRefs#closeInternal)
isPartComplete = true;
isLastPart = true;
isResponseComplete = true;
return new ReleasableBytesReference(BytesArray.EMPTY, () -> {});
}
} catch (Exception e) {
Expand Down Expand Up @@ -450,7 +460,7 @@ private void finishCurrentPart(ArrayList<Releasable> releasables) throws IOExcep
// The current entry is complete, but the next entry isn't available yet, so we pause transmission. This means we are no
// longer an active AvailableChunksZipResponseBodyPart, so any concurrent calls to enqueueEntry() at this point will now
// spawn a new AvailableChunksZipResponseBodyPart to take our place.
isPartComplete = true;
isResponsePaused = true;
assert getNextPartListener == null;
assert nextAvailableChunksListener != null;
// Calling our getNextPart() will eventually yield the next body part supplied to enqueueEntry():
Expand All @@ -475,7 +485,7 @@ private void finishCurrentPart(ArrayList<Releasable> releasables) throws IOExcep
// available. It doesn't affect correctness if the next part is already available, it's just a little less efficient to make
// a new AvailableChunksZipResponseBodyPart in that case. That's ok, entries can coalesce all the available parts together
// themselves if efficiency really matters.
isPartComplete = true;
isResponsePaused = true;
assert getNextPartListener == null;
// Calling our getNextPart() will eventually yield the next body part from the current entry:
getNextPartListener = SubscribableListener.newForked(
Expand All @@ -488,8 +498,7 @@ private void finishResponse(ArrayList<Releasable> releasables) throws IOExceptio
assert zipEntry == null;
assert entryQueue.isEmpty() : entryQueue.size();
zipOutputStream.finish();
isPartComplete = true;
isLastPart = true;
isResponseComplete = true;
transferCurrentEntryReleasable(releasables);
assert getNextPartListener == null;
}
Expand Down

0 comments on commit 9d5c060

Please sign in to comment.