Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy parameters adaptation (part 1 - ZSTD_c_stableInBuffer) #2974

Merged
merged 13 commits into from
Jan 27, 2022
Merged

Conversation

Cyan4973
Copy link
Contributor

@Cyan4973 Cyan4973 commented Jan 5, 2022

This is merely a way to "prove" that lazy parameters adaptation can work,
though this implementation only makes it happen when ZSTD_c_stableInBuffer is enabled.
ZSTD_c_stableInBuffer pushes the responsibility of buffering to the user,
making lazy parameters adaptation simpler to achieve.

As a (favorable) side effect, it extends the nb of use cases supported by ZSTD_c_stableInBuffer.
Specifically, this parameter is now compatible with "appending" scenarios,
where more data is continuously added after compression has started,
instead of being restricted to presenting the entire input immediately at start.

It also makes ZSTD_c_stableInBuffer compatible with the regular streaming interface
aka initCStream, compressStream, flushStream and endStream
(was previously limited to ZSTD_compressStream2( , , , zstd_e_end) only).

@Cyan4973 Cyan4973 changed the title Lazy parameters adaptation (part 1) Lazy parameters adaptation (part 1 - ZSTD_c_stableInBuffer) Jan 5, 2022
@@ -6156,17 +6163,23 @@ size_t ZSTD_compressSequences(ZSTD_CCtx* const cctx, void* dst, size_t dstCapaci
/*====== Finalize ======*/

/*! ZSTD_flushStream() :
* Note : not compatible with ZSTD_c_stableInBuffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment accurate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, that's a leftover, to be fixed

Comment on lines 5647 to 5692
size_t const inputSize = input->size - input->pos; /* no obligation to start from pos==0 */
if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */
&& (endOp == ZSTD_e_continue) /* no flush requested, more input to come */
&& (inputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */
/* just wait, allows lazy adaptation of compression parameters */
cctx->expectedInBuffer = *input;
return ZSTD_FRAMEHEADERSIZE_MIN(cctx->requestedParams.format); /* at least some header to produce */
}
Copy link
Contributor

@terrelln terrelln Jan 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this is correct. I get that the use case of streaming + stable input is somewhat contrived, and that it isn't super useful in practice. But...

Zstd guarantees to make forward progress. Either consume some input or produce some output. This breaks that contract, since input is not modified.

A loop constructed like this should not infinite loop:

while (!(input empty || output empty || finished))
  ZSTD_compressStream(cctx, &output, &input);

Instead, we should save the initial input->pos, and update input->pos = input->size.

Additionally, we shouldn't directly set cctx->expectedInBuffer, ZSTD_setBufferExpectations(cctx, output, input) should be used instead. The call on line 5656 can probably be moved up to handle both lazy & non-lazy adaptation, but only for the first call of lazy adaptation.

Lastly, since we are now modifying the input, we should make sure that we are calling FORWARD_IF_ERROR(ZSTD_checkBufferStability(cctx, output, input, endOp), "invalid buffers"); in the lazy-adaptation mode.

Copy link
Contributor Author

@Cyan4973 Cyan4973 Jan 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zstd guarantees to make forward progress.

Forward progress was indeed one of my concerns when writing that code.

Some nuances here : there are limits to forward progress "guarantees" that zstd provides, even with current code.
As a perhaps obvious example, if a user continuously presents an empty input, then there is nothing to do. Same thing when a user continuously presents an output buffer with no more available space to flush into, then the streaming interface cannot progress. In both cases, repetitively calling ZSTD_compressStream() will just loop forever.

But it's okay, I guess we can classify above scenarios as user errors, that users should be able to debug, while the scenario you advocate for is different. What you mean is that, if it is possible to make some forward progresses, aka there is room in the output buffer and there is some input to ingest, then zstd should make some forward progress.
In this instance, instead of returning immediately without doing anything, zstd should fake progress by pretending having consumed the input, even though it hasn't, so that the suggested loop condition, which relies on forward progress, can exit.

I think that's fair, so that's what latest commit in this PR provides.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't directly set cctx->expectedInBuffer, ZSTD_setBufferExpectations(cctx, output, input) should be used instead.

ZSTD_setBufferExpectations() relies on cctx->appliedParams which is not initialized yet since at this stage ZSTD_CCtx_init_compressStream2() has not been invoked yet.

@felixhandte
Copy link
Contributor

I think we probably want to hold this until after the release, yeah?

@Cyan4973
Copy link
Contributor Author

Cyan4973 commented Jan 5, 2022

I think we probably want to hold this until after the release, yeah?

It's not critical for the release.

@Cyan4973
Copy link
Contributor Author

ping, this should be ready for review and merge now

@Cyan4973
Copy link
Contributor Author

ping 2

if (cctx->savedInPosPlusOne == 0) cctx->savedInPosPlusOne = input->pos + 1;
cctx->expectedInBuffer = *input;
/* pretend input was consumed, to give a sense forward progress */
input[0].pos = input[0].size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not input->pos = input->size? It seems strange to access it as an array.

lib/compress/zstd_compress.c Show resolved Hide resolved
@Cyan4973
Copy link
Contributor Author

Cyan4973 commented Jan 25, 2022

OK, so having investigated the issue in more details, it's more complex than initially anticipated. And the necessity to "pretend" consuming all input data (even though it's not) doesn't help.

The scenario suggested by @terrelln does indeed fail, as predicted. This scenario was part of the test suite, but only the error codes were checked. A full roundtrip comparison exposes the missing bytes.

This one use case can be fixed. But then, other cases appear, and it becomes more complex.

What about :

  • ingest ~200 bytes
  • flush that
  • ingest 130 KB, continue mode

After the flush() operation, a frame must be started, and the initial 200 bytes of input must be effectively consumed (the interface was only "pretending" to have consumed it previously).
OK, but then what about the second ingest operation ?
Well, now it depends if we want the behavior to be equivalent to buffered mode, which I presume we want.
In this case, we should not ingest the full 130 KB, as it would be equivalent to flush mode, resulting in a different output. Only full blocks, aka 128 KB, can be ingested and compressed. The next 2 KB must not be ingested.
But on the other hand, the interface needs to "pretend" that the full input was consumed, as a way to allow the user to test this condition and not be stuck into an infinite loop.
So now, this situation needs to be continuously tracked. This goes beyond just the first block.

Now, this can be solved too. It's just that the topic goes beyond "late adaptation of compression parameters", it becomes "how to allow continue mode in combination with ZSTD_stableInBuffer", which is quite an extension of scope. This is acceptable if the complexity is low, but not if it impacts too much code in the core library.

So I'll have to spend a bit of time to figure out how to do that without impacting too much the library. And if not, then maybe just drop it. This was more a "late parameter adaptation demo" anyway.

@Cyan4973 Cyan4973 force-pushed the fix2966_part3 branch 3 times, most recently from 25794fa to df78010 Compare January 25, 2022 17:36
@Cyan4973
Copy link
Contributor Author

Good news, the new version shipped this morning seems to fix the issues reported above.
As importantly, it does so without increasing complexity.
The new formulation looks actually simpler to read and explain, and is more general.

Copy link
Contributor

@terrelln terrelln left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR looks good to me, just need to change some asserts to RETURN_ERROR_IF(), and a few more tests.

The approach seems right, and simplifies the usage of ZSTD_c_stableInBuffer.

DEBUGLOG(5, "ZSTD_compressStream_generic, flush=%i, srcSize = %zu", (int)flushMode, input->size - input->pos);
assert(zcs != NULL);
if (zcs->appliedParams.inBufferMode == ZSTD_bm_stable) {
assert(input->pos >= zcs->stableIn_notConsumed);
Copy link
Contributor

@terrelln terrelln Jan 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a RETURN_ERROR_IF() right? Since we're validating user input, not library assumptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function ZSTD_compressStream_generic() is invoked after ZSTD_checkBufferStability(),
which should have already validated that ZSTD_c_stableInBuffer condition is correctly respected,
aka input->pos is stable and must not have been directly manipulated by the user.

So if we reach that assert() condition, it looks to me that this would be a programmatic error, not (just) user input.

@@ -5645,8 +5669,27 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,

/* transparent initialization stage */
if (cctx->streamStage == zcss_init) {
FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, input->size), "CompressStream2 initialization failed");
Copy link
Contributor

@terrelln terrelln Jan 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you point out in the comment inputSize = input->size - input->pos. So this is actually a bug, and we'll get the pledgedSrcSize wrong.

Can you add a test case for the scenario where input->pos starts off as non-zero? It should fail before the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't get pledgedSrcSize wrong because it's based on totalInputSize, which correctly counts from initial pos, wether it is 0 or non-0, by summing the inputSize across invocations.

Point taken for the input->pos != 0 start condition test.

Copy link
Contributor

@terrelln terrelln Jan 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about the previous version of the code. It is buggy before your patch, and correct after.

You fixed the bug, so I would love to see the test case, so it doesn't accidentally get re-introduced.

if (cctx->stableIn_notConsumed) {
assert(cctx->appliedParams.inBufferMode == ZSTD_bm_stable);
/* some early data was skipped - make it available for consumption */
assert(input->pos >= cctx->stableIn_notConsumed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be a RETURN_ERROR_IF().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same idea, ZSTD_checkBufferStability() is invoked just before this paragraph, and should already take care of user input issues. So if we nonetheless hit this assert() at this position, seems like it's a programmatic error.

Comment on lines 5679 to 5682
assert(input->src == cctx->expectedInBuffer.src);
assert(input->pos == cctx->expectedInBuffer.size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be RETURN_ERROR_IF() since we're validating user input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in this case, these checks are front line, they are effectively validating user input. They should return errors.

ZSTD_inBuffer const nullInput = { NULL, 0, 0 };
int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable);
ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput;
input.size = input.pos; /* do not ingest more input during flush */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be an assert(input.size == input.pos) instead?

I believe that we should always have consumed the entire input, is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with ZSTD_c_stableInBuffer, it's possible to meet a situation where some input has not been consumed.

This happens if output buffer size is not large enough, in which case, the streaming state machine will be stopped during a flush stage, with some input not yet consumed.
After that ZSTD_flushStream() does not present inBuffer anymore, so it's not possible to update inBuffer->pos. ZSTD_flushStream() can only flush what's left in the output buffer, not ingest a bit more.
This situation is a bit different from ZSTD_compressStream2(cctx, outbuf, inbuf, ZSTD_e_flush), which allows ingesting a bit more input as part of the flush operation.

Comment on lines 6222 to 6226
ZSTD_inBuffer const nullInput = { NULL, 0, 0 };
int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable);
ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you move this into a small helper function that is shared between ZSTD_flushStream() and ZSTD_endStream()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 888 to 895
CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) );
inBuf.size = 200;
CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) );
CHECK_Z( ZSTD_flushStream(cctx, &outBuf) );
inBuf.size = inputSize;
CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) );
CHECK(ZSTD_endStream(cctx, &outBuf) != 0, "compression should be successful and fully flushed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you also test ZSTD_compressStream2(), since it isn't going through the exact same code-path as ZSTD_{compress,flush,end}Stream() is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

on streaming compression implementation.
effectively makes ZSTD_c_stableInput compatible ZSTD_compressStream()
and zstd_e_continue operation mode.
including flushStream().

Now the only condition is for `input.size` to continuously grow.
specifically, there is no obligation to start streaming compression with pos=0.
stableSrc mode is now compatible with this setup.
as suggested by @terrelln.

Also : commented zstreamtest more
to ensure ZSTD_stableInBuffer is tested/
@Cyan4973
Copy link
Contributor Author

I believe final commit answers all remaining issues.

Copy link
Contributor

@terrelln terrelln left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just one minor nit about consistency of the new error code

@@ -38,6 +38,7 @@ const char* ERR_getErrorString(ERR_enum code)
case PREFIX(tableLog_tooLarge): return "tableLog requires too much memory : unsupported";
case PREFIX(maxSymbolValue_tooLarge): return "Unsupported max Symbol Value : too large";
case PREFIX(maxSymbolValue_tooSmall): return "Specified maxSymbolValue is too small";
case PREFIX(stabilityCondition_notRespected): return "pledged buffer stability condition is not respected";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZSTD_checkBufferStability() is currently using srcBuffer_wrong and dstBuffer_wrong. If you're going to add this code, can you please switch them over too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good idea

@Cyan4973 Cyan4973 merged commit bad7f82 into dev Jan 27, 2022
@Cyan4973 Cyan4973 deleted the fix2966_part3 branch January 13, 2023 04:28
@Cyan4973 Cyan4973 mentioned this pull request Feb 9, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants