
AsyncIO compression part 2 - added async read and asyncio to compression code #3022

Merged
yoniko merged 25 commits into facebook:dev from compression_asyncio_after_refactor on Jan 31, 2022

Conversation

yoniko (Contributor) commented Jan 22, 2022

PR 2/2 for asyncio compression.
This PR builds on the previous PR and adds:

  • ReadPool - a thread-pool-based reader for async input (a conceptual sketch follows this list).
  • AsyncIO code paths for compression in the zstd CLI (fileio.c).
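
For context, here is a minimal, self-contained sketch of the read-ahead idea using simple double buffering (hypothetical names; this is not the fileio_asyncio API, which manages a pool of IOJob_t buffers rather than just two): a worker thread keeps the next fread() in flight while the main thread consumes the previously filled buffer.

```c
/* Read-ahead sketch: double buffering with one worker thread.
 * Build: cc -pthread readahead_sketch.c -o readahead_sketch */
#include <pthread.h>
#include <stdio.h>

#define RA_BUFSIZE (128 * 1024)

typedef struct {
    FILE* f;
    unsigned char buf[RA_BUFSIZE];
    size_t loaded;               /* bytes the worker read into buf */
    int eof;                     /* set once fread() hits end of file */
} ReadJob;

/* worker entry point: fill one buffer with the next chunk of the file */
static void* readJobRun(void* opaque) {
    ReadJob* const job = (ReadJob*)opaque;
    job->loaded = fread(job->buf, 1, sizeof(job->buf), job->f);
    job->eof = feof(job->f);
    return NULL;
}

int main(int argc, char** argv) {
    if (argc < 2) { fprintf(stderr, "usage: %s FILE\n", argv[0]); return 1; }
    FILE* const f = fopen(argv[1], "rb");
    if (f == NULL) { perror("fopen"); return 1; }

    static ReadJob jobs[2];      /* one buffer in flight, one being consumed */
    jobs[0].f = jobs[1].f = f;

    pthread_t reader;
    pthread_create(&reader, NULL, readJobRun, &jobs[0]);   /* kick off the first read */

    unsigned long long total = 0;
    for (int cur = 0;; cur ^= 1) {
        pthread_join(reader, NULL);                        /* wait for the in-flight read */
        ReadJob* const ready = &jobs[cur];
        if (!ready->eof)                                   /* prefetch the next chunk */
            pthread_create(&reader, NULL, readJobRun, &jobs[cur ^ 1]);
        total += ready->loaded;                            /* "consume" (e.g. compress) ready->buf here */
        if (ready->eof) break;
    }
    printf("read %llu bytes\n", total);
    fclose(f);
    return 0;
}
```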

Benchmarks from a single desktop, comparing this branch and dev. "No cache" means I used dd to remove the file from fs caching before each run. multiple.zst is a 111MB file with multiple frames in it.

flag                                        --asyncio  --no-asyncio
input_file      comment
enwik8.zst      decompression                   0.168         0.206
                decompression no cache          0.167         0.213
                decompression no cache dev      0.172         0.214
multiple.zst    decompression                   0.888         1.076
                decompression no cache          0.896         1.100
                decompression no cache dev      0.929         1.112
silesia.tar.zst decompression                   0.273         0.368
                decompression no cache          0.276         0.379
                decompression no cache dev      0.292         0.388


flag                                  --asyncio  --asyncio --fast  --no-asyncio  --no-asyncio --fast
input_file  comment
enwik8      compression                   0.749             0.420         0.783                0.456
            compression no cache          0.751             0.420         0.797                0.470
            compression no cache dev      0.814             0.470         0.802                0.470
silesia.tar compression                   1.218             0.648         1.281                0.713
            compression no cache          1.227             0.646         1.294                0.749
            compression no cache dev      1.316             0.752         1.324                0.752

- Extracted asyncio code to fileio_asyncio.c/.h
- Moved type definitions to fileio_types.h
- Moved common macro definitions needed by both fileio.c and fileio_asyncio.c to fileio_common.h
@yoniko force-pushed the compression_asyncio_after_refactor branch from 61adb40 to f231a36 on January 22, 2022 00:36
- Added asyncio functionality for compression flow
- Added ReadPool for async reads, embedded to both comp and decomp flows
@yoniko force-pushed the compression_asyncio_after_refactor branch from 8e52ecd to 7366119 on January 24, 2022 22:24
@Cyan4973 (Contributor) commented:

Maybe rebase on top of dev after Part 1 gets merged;
this way only the changes related to Part 2 will be shown in the diff to review.

…ter_refactor

* origin/dev:
  AsyncIO compression part 1 - refactor of existing asyncio code (facebook#3021)
  cleanup double word in comment.
  slightly shortened status and summary lines in very verbose mode
  Change zstdless behavior to align with zless (facebook#2909)
  slightly shortened compression status update line
  More descriptive exclusion error; updated docs and copyright
  Typo (and missing commit)
  Suggestion from code review
  Python style change
  Fixed bugs found in other projects
  Updated README
  Test and tidy
  Feature parity with original shell script; needs further testing
  Work-in-progress; annotated types, added docs, parsed and resolved excluded files
  Using faster Python script to amalgamate
yoniko (Author) commented Jan 24, 2022

Maybe rebase on top of dev after Part 1 gets merged, this way only the changes related to Part 2 will be shown in the diff to review.

Merged; the changes now belong only to part 2.

programs/fileio_asyncio.c (resolved review threads not shown)
Comment on lines 455 to 456
} else
EXM_THROW(37, "Unexpected short read");
Contributor:

nit: Please enclose the else in brackets & indent.

yoniko (Author):

Fixed

Comment on lines 551 to 555
if(job->usedBufferSize > srcBufferRemainingSpace) {
memmove(ctx->srcBufferBase, ctx->srcBuffer, ctx->srcBufferLoaded);
ctx->srcBuffer = ctx->srcBufferBase;
}
memcpy(ctx->srcBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
Contributor:

Is there something you can do to avoid these memcpy()?

Do they actually happen in the normal case? E.g. if zstd is consuming the entire buffer do we ever hit this case?

If they don't happen for zstd, and only happen for other formats, then it probably isn't a big deal.

yoniko (Author):

Two possible ways to avoid memcpy (at the cost of code complexity) are:

  1. Continue holding the IOJob_t and referencing the buffer it owns. This doesn't remove the need for memcpy completely, as there are cases where the code asks for more bytes than are currently available, so we'd still need to coalesce two buffers. For that we could have another buffer allocated that is used only when strictly needed.
  2. Implement a ring buffer and have each IOJob_t reference a portion of this ring buffer.

The memcpy/memmove bothered me, but I believe these two solutions increase the complexity of the code without actually improving performance. memcpy should be cheap compared to our other operations (compression/decompression and IO). Am I wrong to assume memcpy is relatively cheap?

Contributor:

Am I wrong to assume memcpy is relatively cheap?

Relatively cheap, yes. Negligible, no.

Decompression runs at > 1GB/s. So adding an extra memcpy can be a 10% slowdown.

Compression can run at a few hundred MB/s, or GB/s with multithreading. So an extra memcpy can be a 5% slowdown.

Definitely worth benchmarking.

Could you have jobs+1 buffers, and reserve 1 buffer for reading? When you're finished reading, swap that buffer with the next job's buffer?

yoniko (Author) commented Jan 25, 2022:

Decompression runs at > 1GB/s. So adding an extra memcpy can be a 10% slowdown.

The memcpy can happen only for input, not for output, which should have a lower rate.
Other than that, up to now each such memcpy/memmove was instead an fread, which is a memcpy in the best case but might also involve a syscall (depending on buffering). While the memcpy can have a non-negligible cost, it is definitely cheaper than the previous implementation.

Could you have jobs+1 buffers, and reserve 1 buffer for reading? When you're finished reading, swap that buffer with the next jobs buffer?

As mentioned in our other thread, this is problematic because in some cases the user consumes only part of the buffer, leaving us with fewer bytes available than required by the next read. In such cases we'd need to take some bytes from the current buffer and join them with bytes from the next buffer. This is only possible using memcpy, or if the buffers are all part of one contiguous ring buffer (which also has edge cases toward the end of the buffer, unless we use some mmap tricks).

yoniko (Author):

In any case, I will give it a try, and we can judge it afterwards.

Contributor:

In such cases we'd need to take some bytes from the current buffer and join them with bytes from the next buffer

Not necessarily. You could loosen the contract that you always return the requested # of bytes. All compressors should have the ability to consume the input, even if it is smaller than the expected size.

So in the case where you have leftover bytes, instead of joining with the next job, just return a shorter read. Then, once it has consumed the tail of the current buffer, grab the next job's buffer.
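
For illustration, a small self-contained sketch of that loosened contract (hypothetical names, with the completed job buffers simulated by two constant strings; not the merged implementation): the reader hands out a window into the current job's buffer, possibly shorter than requested, and only switches to the next job's buffer once the current one is fully drained, so bytes are never memcpy-joined across buffers.

```c
#include <stdio.h>
#include <stddef.h>

typedef struct { const char* ptr; size_t size; } ReadView;

typedef struct {
    const char* const* jobBuffers;   /* completed job buffers, in order */
    const size_t*      jobSizes;
    size_t             nbJobs;
    size_t             jobIdx;       /* job currently being drained */
    size_t             pos;          /* consumption offset within that job */
} ReaderCtx;

/* Hand out a window into the current job's buffer, possibly shorter than
 * `requested`; never joins two buffers. The caller is assumed to consume
 * the whole window before asking for the next one. */
static ReadView ReaderCtx_next(ReaderCtx* ctx, size_t requested) {
    ReadView view = { NULL, 0 };
    while (ctx->jobIdx < ctx->nbJobs && ctx->pos == ctx->jobSizes[ctx->jobIdx]) {
        ctx->jobIdx++;               /* current buffer fully consumed: swap to the next job */
        ctx->pos = 0;
    }
    if (ctx->jobIdx == ctx->nbJobs) return view;           /* EOF */
    {   size_t const remaining = ctx->jobSizes[ctx->jobIdx] - ctx->pos;
        view.ptr  = ctx->jobBuffers[ctx->jobIdx] + ctx->pos;
        view.size = requested < remaining ? requested : remaining;
        ctx->pos += view.size;
        return view;
    }
}

int main(void) {
    const char*  bufs[2]  = { "hello ", "world\n" };
    const size_t sizes[2] = { 6, 6 };
    ReaderCtx ctx = { bufs, sizes, 2, 0, 0 };
    for (;;) {                       /* request 16 bytes each time, accept less */
        ReadView const v = ReaderCtx_next(&ctx, 16);
        if (v.size == 0) break;      /* two short reads: 6 bytes, then 6 bytes */
        fwrite(v.ptr, 1, v.size, stdout);
    }
    return 0;
}
```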

yoniko (Author):

Wouldn't that increase chances of memcpy to internal decompressor buffers though?

yoniko (Author):

Relatively cheap, yes. Negligible, no.

You are right there. perf shows 1-3.5% of time spent on memcpy.
Also interesting is that ZSTD_(de)compressStream spends even more time on memcpy. I wonder if this is related to the code getting less data than expected.

programs/fileio_asyncio.h (resolved review thread not shown)
@yoniko force-pushed the compression_asyncio_after_refactor branch from 76f5cd2 to 4cac292 on January 26, 2022 02:19
yoniko (Author) commented Jan 26, 2022

Added benchmarking information and removed most of the memcpy calls.

@@ -1130,13 +1143,15 @@ FIO_compressLz4Frame(cRess_t* ress,
LZ4F_preferences_t prefs;
LZ4F_compressionContext_t ctx;

IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
Contributor:

minor (coding style):
If the writeJob pointer never changes after initialization, you should const it: IOJob_t* const writeJob = ....
It sends a signal to the reviewer (or the next contributor) that this pointer will never change its value later on.
When applied consistently, its absence becomes a signal that this pointer will change in the future, and should therefore receive extra attention.

yoniko (Author):

It actually does change later on with

AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);

/* AIO_ReadPool_executeReadJob:
* Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
IOJob_t* job = (IOJob_t*) opaque;
Contributor:

minor: it looks to me that this pointer never changes, so it should be IOJob_t* const

yoniko (Author):

Added consts throughout the new patch, I believe I found all occurrences that it could apply to.

…ter_refactor

* origin/dev:
  fixed minor compression difference in btlazy2
  updated all names to offBase convention
  change the offset|repcode sumtype format to match offBase
@@ -186,7 +186,7 @@ fi

println "\n===> simple tests "

datagen > tmp
datagen -g500K > tmp
Contributor:

Any specific reason for this change?

yoniko (Author):

Yes.
One of the following tests checks that we fail with "-M2K". That test started failing (i.e. decompression succeeded) when I introduced the changes in this branch.
The reason is that I increased the CLI's buffer sizes (and changed the buffer usage pattern), resulting in fewer mallocs during decompression. Increasing the data size forces the lib to allocate space for the window. (Note: this is as far as I currently understand the code; I might have missed something in this explanation.)

Contributor:

OK,
so maybe it is this -M2K test (or series of tests) that should generate and use its own specific source,
rather than impacting a generic sample used by many other tests.

yoniko (Author):

Done

@Cyan4973 (Contributor) commented:

This is a good rewrite.
I believe it contributes positively to the readability of the source code.
It does so while providing new features, and improving performance measurably.
Great PR.

@Cyan4973 (Contributor) left a review comment:

Only some very minor comments on coding style

@yoniko force-pushed the compression_asyncio_after_refactor branch from bed2a44 to 0f5f6ed on January 26, 2022 18:53
if (ret != Z_OK)
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
{ size_t const cSize = ress->dstBufferSize - strm.avail_out;
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
Contributor:

nit: This needs indentation

yoniko (Author):

Fixed.

@@ -1146,27 +1161,27 @@ FIO_compressLz4Frame(cRess_t* ress,
#if LZ4_VERSION_NUMBER >= 10600
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
#endif
assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize);
assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);

{
size_t readSize;
Contributor:

nit: Please rename this variable, it is super close to the parameter U64* readsize.

yoniko (Author):

Good point!
I just removed it, as it was actually useless here.

Comment on lines -1433 to -1435
if (ferror(srcFile)) {
EXM_THROW(26, "Read error : I/O error");
}
Contributor:

Based on my understanding of the code, we should be checking ferror() whenever we call fread() in the current implementation, no matter if we use --async-io or --no-async-io. Is that correct?

I just want to verify because this check is absolutely critical.

And by moving it to the fread(), we now get coverage for other formats.

yoniko (Author):

Based on my understanding of the code, we should be checking ferror() whenever we call fread() in the current implementation, no matter if we use --async-io or --no-async-io. Is that correct?

Yes, the flows are almost the same in both cases. The difference is that the async code queues the execution function to a worker thread, while the sync version calls the execution function directly (see AIO_ReadPool_executeReadJob and AIO_IOPool_enqueueJob).
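
To make the shared path concrete, here is a simplified, self-contained sketch of that dispatch pattern (hypothetical names, and a single worker thread standing in for the thread pool; the real functions in programs/fileio_asyncio.c are AIO_ReadPool_executeReadJob and AIO_IOPool_enqueueJob):

```c
/* Build: cc -pthread dispatch_sketch.c -o dispatch_sketch */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
    FILE*  file;
    char   buffer[64 * 1024];
    size_t usedBufferSize;
} ReadJob;

/* the one execution function shared by both modes, so the
 * fread()/ferror() handling is written (and exercised) in a single place */
static void* executeReadJob(void* opaque) {
    ReadJob* const job = (ReadJob*)opaque;
    job->usedBufferSize = fread(job->buffer, 1, sizeof(job->buffer), job->file);
    if (ferror(job->file)) { perror("read error"); exit(1); }
    return NULL;
}

/* async mode hands the job to a worker thread; sync mode calls the very
 * same function directly (a create+join stands in for a real thread pool) */
static void enqueueReadJob(ReadJob* job, int asyncEnabled) {
    if (asyncEnabled) {
        pthread_t worker;
        pthread_create(&worker, NULL, executeReadJob, job);
        pthread_join(worker, NULL);
    } else {
        executeReadJob(job);
    }
}

int main(int argc, char** argv) {
    if (argc < 2) { fprintf(stderr, "usage: %s FILE\n", argv[0]); return 1; }
    static ReadJob job;
    job.file = fopen(argv[1], "rb");
    if (job.file == NULL) { perror("fopen"); return 1; }
    enqueueReadJob(&job, /* asyncEnabled */ 1);
    printf("first chunk: %zu bytes\n", job.usedBufferSize);
    fclose(job.file);
    return 0;
}
```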

assert(feof(finput));

AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
assert(ress->readCtx->reachedEof);
Contributor:

This can't happen, right? Assuming the code is correct.

yoniko (Author):

Actually the code wasn't correct, but the assert didn't trigger because 64KB is less than our input size (128KB). I fixed it.

As it is now, it shouldn't trigger and might be redundant (I just replaced the previous assert because it was easy).
We should only exit the loop if ress->readCtx->srcBufferLoaded==0.
Since we fill the buffer right before this is checked, that can only happen if we reached EOF or encountered a read error.
Read errors, in turn, will panic before we get to this point.
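
For clarity, a self-contained illustration of the loop shape being described (hypothetical names and a trivial pass-through "consume" step; not the actual fileio.c loop): the source buffer is refilled right before the exit check, so leaving the loop with 0 loaded bytes implies EOF, and read errors abort before the check is ever reached.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
    FILE*  file;
    char   srcBuffer[64 * 1024];
    size_t srcBufferLoaded;
    int    reachedEof;
} ReadCtx;

static void fillSrcBuffer(ReadCtx* ctx) {
    ctx->srcBufferLoaded = fread(ctx->srcBuffer, 1, sizeof(ctx->srcBuffer), ctx->file);
    if (ferror(ctx->file)) { perror("read error"); exit(1); }   /* panics before the check below */
    if (feof(ctx->file)) ctx->reachedEof = 1;
}

int main(int argc, char** argv) {
    if (argc < 2) { fprintf(stderr, "usage: %s FILE\n", argv[0]); return 1; }
    static ReadCtx ctx;
    ctx.file = fopen(argv[1], "rb");
    if (ctx.file == NULL) { perror("fopen"); return 1; }
    unsigned long long total = 0;
    for (;;) {
        fillSrcBuffer(&ctx);                    /* refill right before the exit check */
        if (ctx.srcBufferLoaded == 0) break;    /* only possible once EOF is reached */
        total += ctx.srcBufferLoaded;           /* "consume" the loaded bytes */
    }
    assert(ctx.reachedEof);                     /* hence the assert cannot fire */
    printf("consumed %llu bytes\n", total);
    fclose(ctx.file);
    return 0;
}
```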

job = (IOJob_t*) malloc(sizeof(IOJob_t));
buffer = malloc(bufferSize);
IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
void* const buffer = malloc(bufferSize);
if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory");
Contributor:

nit: Indentation

yoniko (Author):

Fixed.
I think these annoying indentation issues happened when I split my original branch into two; while copy-pasting some code, my editor decided that no indentation was the correct indentation. :(

if (ctx->srcBufferLoaded >= n)
return 0;

/* We still have bytes loaded, but enough to satisfy caller. We need to get the next job
Contributor:

nit: "but not enough"

yoniko (Author):

Fixed

@yoniko merged commit cc0657f into facebook:dev Jan 31, 2022
@Cyan4973 mentioned this pull request Feb 9, 2023
Hello71 added a commit to Hello71/zstd that referenced this pull request Feb 11, 2023
Bytef and uInt are zlib types, not available when zlib is disabled

Fixes: 1598e6c ("Async write for decompression")
Fixes: cc0657f ("AsyncIO compression part 2 - added async read and asyncio to compression code (facebook#3022)")
yoniko pushed a commit that referenced this pull request Feb 14, 2023 (same fix as above)