
Conversation

@original-brownbear
Contributor

These responses can grow quite large. We dealt with this fact on the reader side by retaining pooled buffers when reading the agg responses from the wire. On the sending side, though, we still allocate unpooled buffers for serialization because the serialized payload needs a size prefix. We could probably make this more efficient by changing the wire format to some kind of chunked encoding in a follow-up, but that would require additional complex changes on the reader side. For now, this PR serializes the bytes to pooled buffers right away and then copies them to the outbound buffer page by page, releasing each page as soon as it has been copied. This cuts the peak memory use of sending these messages almost in half and removes the unpooled buffer allocations. This is particularly useful when a data node holds many shards for an aggs query and might send a number of these responses in parallel and/or in rapid succession.

relates #77466 and #72370
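
As a rough illustration of the copy-and-release pattern described above (a minimal sketch with made-up Page/release stand-ins, not the actual Elasticsearch recycler code), the idea is to write the size prefix and then stream each pooled page into the outbound buffer, handing the page back to its pool as soon as it has been copied:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Minimal sketch of the pattern: copy pooled pages to the outbound stream one
    // page at a time, releasing each page right after it has been copied so the
    // temporary serialization buffer shrinks as the copy proceeds.
    class PageByPageCopySketch {

        // Hypothetical stand-in for a recycled page of bytes.
        record Page(byte[] bytes, int length) {}

        static void copyAndRelease(Deque<Page> pages, int totalSize, OutputStream out) throws IOException {
            writeVInt(out, totalSize); // size prefix, as required by the current wire format
            while (pages.isEmpty() == false) {
                Page page = pages.pollFirst();      // detach the page from the temporary buffer
                out.write(page.bytes(), 0, page.length());
                release(page);                      // hand the page back to the pool immediately
            }
        }

        // Placeholder: a real implementation would return the page to a recycler.
        static void release(Page page) {}

        // Variable-length int encoding, 7 bits per byte (same shape as Lucene/ES vInts).
        static void writeVInt(OutputStream out, int value) throws IOException {
            while ((value & ~0x7F) != 0) {
                out.write((value & 0x7F) | 0x80);
                value >>>= 7;
            }
            out.write(value);
        }

        public static void main(String[] args) throws IOException {
            Deque<Page> pages = new ArrayDeque<>();
            pages.add(new Page(new byte[] { 1, 2, 3 }, 3));
            pages.add(new Page(new byte[] { 4, 5 }, 2));
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            copyAndRelease(pages, 5, out);
            System.out.println(out.size()); // 6 = 1-byte vInt prefix + 5 payload bytes
        }
    }

The real change operates on the recycler's Recycler.V<BytesRef> pages, as shown in the review excerpt further down.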

@elasticmachine added the Team:Analytics and Team:Distributed (Obsolete) labels Mar 17, 2022
@elasticmachine
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@elasticsearchmachine
Collaborator

Hi @original-brownbear, I've created a changelog YAML for you.

Member

@nik9000 left a comment


This seems like a good idea. As would a chunked write.

I don't know the recycler stuff well enough to approve the PR though.

Contributor

@DaveCTurner left a comment


I wonder, is there a big obstacle to adding support for chunking here? Would seem nicer than this stop-gap.

I suggested a shorter implementation too (not tested it tho, beware)

Comment on lines 132 to 154
final int size = tmp.size();
writeVInt(size);
final int bytesInLastPage;
final int remainder = size % pageSize;
final int adjustment;
if (remainder != 0) {
    adjustment = 1;
    bytesInLastPage = remainder;
} else {
    adjustment = 0;
    bytesInLastPage = pageSize;
}
final int pageCount = (size / tmp.pageSize) + adjustment;
for (int i = 0; i < pageCount - 1; i++) {
    Recycler.V<BytesRef> p = tmp.pages.get(i);
    final BytesRef b = p.v();
    writeBytes(b.bytes, b.offset, b.length);
    tmp.pages.set(i, null).close();
}
Recycler.V<BytesRef> p = tmp.pages.get(pageCount - 1);
final BytesRef b = p.v();
writeBytes(b.bytes, b.offset, bytesInLastPage);
tmp.pages.set(pageCount - 1, null).close();
Contributor

Do we need to special-case the last page like that?

Suggested change
int size = tmp.size();
writeVInt(size);
int tmpPage = 0;
while (size > 0) {
    final Recycler.V<BytesRef> p = tmp.pages.get(tmpPage);
    final BytesRef b = p.v();
    final int writeSize = Math.min(size, b.length);
    writeBytes(b.bytes, b.offset, writeSize);
    tmp.pages.set(tmpPage, null).close();
    size -= writeSize;
    tmpPage++;
}

Contributor Author

tested this and it works fine :) => updated

@original-brownbear
Contributor Author

I wonder, is there a big obstacle to adding support for chunking here?

It makes the read side quite a bit trickier because we have to join the chunks there somehow, whereas right now we just need to read a single slice and we're good. Also, given that we had performance complaints about the read side here in the past (can't find the issue right now, but it was literally about the impact of a slightly slower readLong), I'm a little hesitant about adding chunking here without a strong benchmark that would show a performance change from it.

=> I don't think it's an insurmountable challenge, but it's not something I'd have time for right now. I just found this while doing some sizing experiments around aggs, and this is what I can do as far as a 30-minute fix goes :)

Also, maybe a stop-gap is fine here for now because we're hopefully going to have better message chunking support in the not-too-distant future anyway, so we can address this when adding that. I generally have my doubts about these messages anyway: they seem larger than necessary (as in tens of MBs, or even 100 MB+ in some cases), and it might be better to look into whether that's absolutely necessary before making the format really complicated.
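
For context on the "single slice" point: a hedged sketch of what the read path boils down to today (the helper and the generic Writeable.Reader parameter are mine for illustration, not code from this PR; in the real code the pooled slice may be retained and deserialized lazily rather than released right away):

    // Hedged sketch only: the agg payload arrives as one length-prefixed slice, so
    // the reader retains a single pooled reference and deserializes straight from
    // it -- there is nothing to stitch together on this path.
    static <T> T readFromSingleSlice(StreamInput in, Writeable.Reader<T> reader) throws IOException {
        try (ReleasableBytesReference slice = in.readReleasableBytesReference()) {
            return reader.read(slice.streamInput());
        }
    }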

@DaveCTurner
Contributor

DaveCTurner commented Mar 17, 2022

I'm just thinking about something like this:

    public ReleasableBytesReference readChunkedReleasableBytesReference() throws IOException {
        if (getVersion().onOrAfter(Version.V_8_2_0)) {
            final var chunks = new ArrayList<ReleasableBytesReference>();
            try {
                do {
                    final var chunk = readReleasableBytesReference();
                    if (chunk.length() == 0) {
                        final var chunksArray = chunks.toArray(ReleasableBytesReference[]::new);
                        final var result = new ReleasableBytesReference(
                            CompositeBytesReference.of(chunksArray),
                            Releasables.wrap(chunksArray)
                        );
                        chunks.clear();
                        return result;
                    } else {
                        chunks.add(chunk);
                    }
                } while (true);
            } finally {
                Releasables.close(chunks);
            }
        } else {
            return readReleasableBytesReference();
        }
    }

It wouldn't have to be chunking individual pages (although even that might not be so bad)
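
For comparison, a hypothetical write-side counterpart (the method name and chunk-size parameter are made up; it assumes writeBytesReference writes a vInt length followed by the bytes) could emit bounded chunks and finish with the zero-length terminator the read loop above waits for:

    // Hypothetical sketch of the matching write side: emit length-prefixed chunks of
    // bounded size, then an empty chunk as terminator, so the sender never has to
    // hold (or size-prefix) the whole serialized response in one unpooled buffer.
    public void writeChunkedBytesReference(BytesReference bytes, int chunkSize) throws IOException {
        int offset = 0;
        final int length = bytes.length();
        while (offset < length) {
            final int toWrite = Math.min(chunkSize, length - offset);
            writeBytesReference(bytes.slice(offset, toWrite)); // vInt length + bytes
            offset += toWrite;
        }
        writeBytesReference(BytesArray.EMPTY); // zero-length chunk terminates the read loop
    }

Whatever the chunk size, this is what would put the O(1) bound on sender-side memory mentioned later in the thread.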

@original-brownbear
Contributor Author

@DaveCTurner the performance issue is that we have larger chunks inbound than outbound:

[screenshot omitted]

so doing the above would likely be felt on reading. This is something that would be more fun to address if we could just move the outbound path closer to Netty and write in larger-than-16k chunks there as well.
Note again, all that chunking would get us as things stand today is saving one memory copy outbound, but it would cost us on the coordinator, which has to read more of these messages than each writer individually produces. => Instinctively, I'd rather optimize for the read side and only go for tricky chunking later if it's a real win in heap usage on the data nodes.

Contributor

@DaveCTurner left a comment


We could write in 1MiB chunks too; they don't need to be single-page jobs - really anything to put an O(1) bound on the additional memory usage on the sender side. But I'm not accounting for other changes you're planning here, so this LGTM as an interim measure.

@original-brownbear
Contributor Author

Thanks Nik + David!

@original-brownbear original-brownbear merged commit c492583 into elastic:master Mar 24, 2022
@original-brownbear original-brownbear deleted the reduce-agg-send-memory branch March 24, 2022 09:43
@original-brownbear original-brownbear restored the reduce-agg-send-memory branch April 18, 2023 20:40

Labels

:Analytics/Aggregations, :Distributed Coordination/Network, >enhancement, Team:Analytics, Team:Distributed (Obsolete), v8.2.0
