diff --git a/programs/README.md b/programs/README.md index 5570f90c3b4..b88cf78d71a 100644 --- a/programs/README.md +++ b/programs/README.md @@ -164,6 +164,7 @@ Advanced arguments : --filelist FILE : read list of files to operate upon from FILE --output-dir-flat DIR : processed files are stored into DIR --output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure +--[no-]asyncio : use asynchronous IO (default: enabled) --[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate). -- : All arguments after "--" are treated as files diff --git a/programs/fileio.c b/programs/fileio.c index 2066096d233..524c60292c0 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -255,7 +255,7 @@ FIO_prefs_t* FIO_createPreferences(void) ret->literalCompressionMode = ZSTD_ps_auto; ret->excludeCompressedFiles = 0; ret->allowBlockDevices = 0; - ret->asyncIO = 0; + ret->asyncIO = AIO_supported(); return ret; } @@ -814,16 +814,12 @@ static int FIO_removeMultiFilesWarning(FIO_ctx_t* const fCtx, const FIO_prefs_t* * Compression ************************************************************************/ typedef struct { - FILE* srcFile; - FILE* dstFile; - void* srcBuffer; - size_t srcBufferSize; - void* dstBuffer; - size_t dstBufferSize; void* dictBuffer; size_t dictBufferSize; const char* dictFileName; ZSTD_CStream* cctx; + WritePoolCtx_t *writeCtx; + ReadPoolCtx_t *readCtx; } cRess_t; /** ZSTD_cycleLog() : @@ -872,9 +868,6 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, if (ress.cctx == NULL) EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx", strerror(errno)); - ress.srcBufferSize = ZSTD_CStreamInSize(); - ress.srcBuffer = malloc(ress.srcBufferSize); - ress.dstBufferSize = ZSTD_CStreamOutSize(); /* need to update memLimit before calling createDictBuffer * because of memLimit check inside it */ @@ -882,10 +875,10 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize; FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel); } - ress.dstBuffer = malloc(ress.dstBufferSize); ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs); /* works with dictFileName==NULL */ - if (!ress.srcBuffer || !ress.dstBuffer) - EXM_THROW(31, "allocation error : not enough memory"); + + ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize()); + ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize()); /* Advanced parameters, including dictionary */ if (dictFileName && (ress.dictBuffer==NULL)) @@ -948,9 +941,9 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, static void FIO_freeCResources(const cRess_t* const ress) { - free(ress->srcBuffer); - free(ress->dstBuffer); free(ress->dictBuffer); + AIO_WritePool_free(ress->writeCtx); + AIO_ReadPool_free(ress->readCtx); ZSTD_freeCStream(ress->cctx); /* never fails */ } @@ -963,6 +956,7 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no { unsigned long long inFileSize = 0, outFileSize = 0; z_stream strm; + IOJob_t *writeJob = NULL; if (compressionLevel > Z_BEST_COMPRESSION) compressionLevel = Z_BEST_COMPRESSION; @@ -978,51 +972,58 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret); } } + writeJob = AIO_WritePool_acquireJob(ress->writeCtx); strm.next_in = 0; strm.avail_in = 0; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; while (1) { int ret; if (strm.avail_in == 0) { - size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile); - if (inSize == 0) break; - inFileSize += inSize; - strm.next_in = (z_const unsigned char*)ress->srcBuffer; - strm.avail_in = (uInt)inSize; + AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); + if (ress->readCtx->srcBufferLoaded == 0) break; + inFileSize += ress->readCtx->srcBufferLoaded; + strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; + strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; + } + + { + size_t const availBefore = strm.avail_in; + ret = deflate(&strm, Z_NO_FLUSH); + AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); } - ret = deflate(&strm, Z_NO_FLUSH); + if (ret != Z_OK) EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret); - { size_t const cSize = ress->dstBufferSize - strm.avail_out; + { size_t const cSize = writeJob->bufferSize - strm.avail_out; if (cSize) { - if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize) - EXM_THROW(73, "Write error : cannot write to output file : %s ", strerror(errno)); + writeJob->usedBufferSize = cSize; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += cSize; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; - } } + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; + } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) { DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", - (unsigned)(inFileSize>>20), - (double)outFileSize/inFileSize*100) + (unsigned)(inFileSize>>20), + (double)outFileSize/inFileSize*100) } else { DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%% ", - (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20), - (double)outFileSize/inFileSize*100); - } } + (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20), + (double)outFileSize/inFileSize*100); + } } while (1) { int const ret = deflate(&strm, Z_FINISH); - { size_t const cSize = ress->dstBufferSize - strm.avail_out; + { size_t const cSize = writeJob->bufferSize - strm.avail_out; if (cSize) { - if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize) - EXM_THROW(75, "Write error : %s ", strerror(errno)); + writeJob->usedBufferSize = cSize; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += cSize; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; - } } + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; + } } if (ret == Z_STREAM_END) break; if (ret != Z_BUF_ERROR) EXM_THROW(77, "zstd: %s: deflate error %d \n", srcFileName, ret); @@ -1033,6 +1034,8 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret); } } *readsize = inFileSize; + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); return outFileSize; } #endif @@ -1048,6 +1051,7 @@ FIO_compressLzmaFrame(cRess_t* ress, lzma_stream strm = LZMA_STREAM_INIT; lzma_action action = LZMA_RUN; lzma_ret ret; + IOJob_t *writeJob = NULL; if (compressionLevel < 0) compressionLevel = 0; if (compressionLevel > 9) compressionLevel = 9; @@ -1065,31 +1069,37 @@ FIO_compressLzmaFrame(cRess_t* ress, EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret); } + writeJob =AIO_WritePool_acquireJob(ress->writeCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; strm.next_in = 0; strm.avail_in = 0; - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; while (1) { if (strm.avail_in == 0) { - size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile); - if (inSize == 0) action = LZMA_FINISH; + size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); + if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH; inFileSize += inSize; - strm.next_in = (BYTE const*)ress->srcBuffer; - strm.avail_in = inSize; + strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; + strm.avail_in = ress->readCtx->srcBufferLoaded; + } + + { + size_t const availBefore = strm.avail_in; + ret = lzma_code(&strm, action); + AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); } - ret = lzma_code(&strm, action); if (ret != LZMA_OK && ret != LZMA_STREAM_END) EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret); - { size_t const compBytes = ress->dstBufferSize - strm.avail_out; + { size_t const compBytes = writeJob->bufferSize - strm.avail_out; if (compBytes) { - if (fwrite(ress->dstBuffer, 1, compBytes, ress->dstFile) != compBytes) - EXM_THROW(85, "Write error : %s", strerror(errno)); + writeJob->usedBufferSize = compBytes; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += compBytes; - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = writeJob->bufferSize; } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%", @@ -1105,6 +1115,9 @@ FIO_compressLzmaFrame(cRess_t* ress, lzma_end(&strm); *readsize = inFileSize; + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); + return outFileSize; } #endif @@ -1130,15 +1143,18 @@ FIO_compressLz4Frame(cRess_t* ress, LZ4F_preferences_t prefs; LZ4F_compressionContext_t ctx; + IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx); + LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); if (LZ4F_isError(errorCode)) EXM_THROW(31, "zstd: failed to create lz4 compression context"); memset(&prefs, 0, sizeof(prefs)); - assert(blockSize <= ress->srcBufferSize); + assert(blockSize <= ress->readCtx->base.jobBufferSize); - prefs.autoFlush = 1; + /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */ + prefs.autoFlush = 0; prefs.compressionLevel = compressionLevel; prefs.frameInfo.blockMode = LZ4F_blockLinked; prefs.frameInfo.blockSizeID = LZ4F_max64KB; @@ -1146,27 +1162,25 @@ FIO_compressLz4Frame(cRess_t* ress, #if LZ4_VERSION_NUMBER >= 10600 prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize; #endif - assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize); + assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize); { - size_t readSize; - size_t headerSize = LZ4F_compressBegin(ctx, ress->dstBuffer, ress->dstBufferSize, &prefs); + size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs); if (LZ4F_isError(headerSize)) EXM_THROW(33, "File header generation failed : %s", LZ4F_getErrorName(headerSize)); - if (fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile) != headerSize) - EXM_THROW(34, "Write error : %s (cannot write header)", strerror(errno)); + writeJob->usedBufferSize = headerSize; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += headerSize; /* Read first block */ - readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile); - inFileSize += readSize; + inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); /* Main Loop */ - while (readSize>0) { - size_t const outSize = LZ4F_compressUpdate(ctx, - ress->dstBuffer, ress->dstBufferSize, - ress->srcBuffer, readSize, NULL); + while (ress->readCtx->srcBufferLoaded) { + size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded); + size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize, + ress->readCtx->srcBuffer, inSize, NULL); if (LZ4F_isError(outSize)) EXM_THROW(35, "zstd: %s: lz4 compression failed : %s", srcFileName, LZ4F_getErrorName(outSize)); @@ -1182,33 +1196,29 @@ FIO_compressLz4Frame(cRess_t* ress, } /* Write Block */ - { size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile); - if (sizeCheck != outSize) - EXM_THROW(36, "Write error : %s", strerror(errno)); - } + writeJob->usedBufferSize = outSize; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); /* Read next block */ - readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile); - inFileSize += readSize; + AIO_ReadPool_consumeBytes(ress->readCtx, inSize); + inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); } - if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName); /* End of Stream mark */ - headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL); + headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL); if (LZ4F_isError(headerSize)) EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s", srcFileName, LZ4F_getErrorName(headerSize)); - { size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile); - if (sizeCheck != headerSize) - EXM_THROW(39, "Write error : %s (cannot write end of stream)", - strerror(errno)); - } + writeJob->usedBufferSize = headerSize; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += headerSize; } *readsize = inFileSize; LZ4F_freeCompressionContext(ctx); + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); return outFileSize; } @@ -1223,8 +1233,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, int compressionLevel, U64* readsize) { cRess_t const ress = *ressPtr; - FILE* const srcFile = ress.srcFile; - FILE* const dstFile = ress.dstFile; + IOJob_t *writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx); + U64 compressedfilesize = 0; ZSTD_EndDirective directive = ZSTD_e_continue; U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; @@ -1269,12 +1279,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, do { size_t stillToFlush; /* Fill input Buffer */ - size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); - ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; + size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize()); + ZSTD_inBuffer inBuff = { ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 }; DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize); *readsize += inSize; - if ((inSize == 0) || (*readsize == fileSize)) + if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize)) directive = ZSTD_e_end; stillToFlush = 1; @@ -1282,9 +1292,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, || (directive == ZSTD_e_end && stillToFlush != 0) ) { size_t const oldIPos = inBuff.pos; - ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; + ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 }; size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx); CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive)); + AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos); /* count stats */ inputPresented++; @@ -1293,12 +1304,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, /* Write compressed stream */ DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", - (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos); + (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos); if (outBuff.pos) { - size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); - if (sizeCheck != outBuff.pos) - EXM_THROW(25, "Write error : %s (cannot write compressed block)", - strerror(errno)); + writeJob->usedBufferSize = outBuff.pos; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); compressedfilesize += outBuff.pos; } @@ -1430,14 +1439,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, } /* while ((inBuff.pos != inBuff.size) */ } while (directive != ZSTD_e_end); - if (ferror(srcFile)) { - EXM_THROW(26, "Read error : I/O error"); - } if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) { EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B", (unsigned long long)*readsize, (unsigned long long)fileSize); } + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx); + return compressedfilesize; } @@ -1538,7 +1547,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx, /*! FIO_compressFilename_dstFile() : - * open dstFileName, or pass-through if ress.dstFile != NULL, + * open dstFileName, or pass-through if ress.file != NULL, * then start compression with FIO_compressFilename_internal(). * Manages source removal (--rm) and file permissions transfer. * note : ress.srcFile must be != NULL, @@ -1557,8 +1566,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, int result; stat_t statbuf; int transferMTime = 0; - assert(ress.srcFile != NULL); - if (ress.dstFile == NULL) { + FILE *dstFile; + assert(AIO_ReadPool_getFile(ress.readCtx) != NULL); + if (AIO_WritePool_getFile(ress.writeCtx) == NULL) { int dstFilePermissions = DEFAULT_FILE_PERMISSIONS; if ( strcmp (srcFileName, stdinmark) && strcmp (dstFileName, stdoutmark) @@ -1570,8 +1580,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, closeDstFile = 1; DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName); - ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); - if (ress.dstFile==NULL) return 1; /* could not open dstFileName */ + dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); + if (dstFile==NULL) return 1; /* could not open dstFileName */ + AIO_WritePool_setFile(ress.writeCtx, dstFile); /* Must only be added after FIO_openDstFile() succeeds. * Otherwise we may delete the destination file if it already exists, * and the user presses Ctrl-C when asked if they wish to overwrite. @@ -1582,13 +1593,10 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); if (closeDstFile) { - FILE* const dstFile = ress.dstFile; - ress.dstFile = NULL; - clearHandler(); DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName); - if (fclose(dstFile)) { /* error closing dstFile */ + if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */ DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result=1; } @@ -1634,6 +1642,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, int compressionLevel) { int result; + FILE* srcFile; DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName); /* ensure src is not a directory */ @@ -1657,13 +1666,13 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, return 0; } - ress.srcFile = FIO_openSrcFile(prefs, srcFileName); - if (ress.srcFile == NULL) return 1; /* srcFile could not be opened */ + srcFile = FIO_openSrcFile(prefs, srcFileName); + if (srcFile == NULL) return 1; /* srcFile could not be opened */ + AIO_ReadPool_setFile(ress.readCtx, srcFile); result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); + AIO_ReadPool_closeFile(ress.readCtx); - fclose(ress.srcFile); - ress.srcFile = NULL; if ( prefs->removeSrcFile /* --rm */ && result == 0 /* success */ && strcmp(srcFileName, stdinmark) /* exception : don't erase stdin */ @@ -1810,23 +1819,24 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, /* init */ assert(outFileName != NULL || suffix != NULL); if (outFileName != NULL) { /* output into a single destination (stdout typically) */ + FILE *dstFile; if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) { FIO_freeCResources(&ress); return 1; } - ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); - if (ress.dstFile == NULL) { /* could not open outFileName */ + dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); + if (dstFile == NULL) { /* could not open outFileName */ error = 1; } else { + AIO_WritePool_setFile(ress.writeCtx, dstFile); for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) { status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if (fclose(ress.dstFile)) + if (AIO_WritePool_closeFile(ress.writeCtx)) EXM_THROW(29, "Write error (%s) : cannot properly close %s", strerror(errno), outFileName); - ress.dstFile = NULL; } } else { if (outMirroredRootDirName) @@ -1882,13 +1892,10 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, /* ************************************************************************** * Decompression ***************************************************************************/ - typedef struct { - void* srcBuffer; - size_t srcBufferSize; - size_t srcBufferLoaded; ZSTD_DStream* dctx; WritePoolCtx_t *writeCtx; + ReadPoolCtx_t *readCtx; } dRess_t; static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName) @@ -1906,11 +1913,6 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) ); CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag)); - ress.srcBufferSize = ZSTD_DStreamInSize(); - ress.srcBuffer = malloc(ress.srcBufferSize); - if (!ress.srcBuffer) - EXM_THROW(61, "Allocation error : not enough memory"); - /* dictionary */ { void* dictBuffer; size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs); @@ -1919,6 +1921,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi } ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize()); + ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize()); return ress; } @@ -1926,47 +1929,31 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi static void FIO_freeDResources(dRess_t ress) { CHECK( ZSTD_freeDStream(ress.dctx) ); - free(ress.srcBuffer); AIO_WritePool_free(ress.writeCtx); -} - -/* FIO_consumeDSrcBuffer: - * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */ -static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) { - assert(ress->srcBufferLoaded >= len); - ress->srcBufferLoaded -= len; - memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded); + AIO_ReadPool_free(ress.readCtx); } /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode @return : 0 (no error) */ -static int FIO_passThrough(const FIO_prefs_t* const prefs, - FILE* foutput, FILE* finput, - void* buffer, size_t bufferSize, - size_t alreadyLoaded) +static int FIO_passThrough(dRess_t *ress) { - size_t const blockSize = MIN(64 KB, bufferSize); - size_t readFromInput; - unsigned storedSkips = 0; - - /* assumption : ress->srcBufferLoaded bytes already loaded and stored within buffer */ - { size_t const sizeCheck = fwrite(buffer, 1, alreadyLoaded, foutput); - if (sizeCheck != alreadyLoaded) { - DISPLAYLEVEL(1, "Pass-through write error : %s\n", strerror(errno)); - return 1; - } } - - do { - readFromInput = fread(buffer, 1, blockSize, finput); - storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips); - } while (readFromInput == blockSize); - if (ferror(finput)) { - DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno)); - return 1; + size_t const blockSize = MIN(MIN(64 KB, ZSTD_DStreamInSize()), ZSTD_DStreamOutSize()); + IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx); + AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); + + while(ress->readCtx->srcBufferLoaded) { + size_t writeSize; + writeSize = MIN(blockSize, ress->readCtx->srcBufferLoaded); + assert(writeSize <= writeJob->bufferSize); + memcpy(writeJob->buffer, ress->readCtx->srcBuffer, writeSize); + writeJob->usedBufferSize = writeSize; + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); + AIO_ReadPool_consumeBytes(ress->readCtx, writeSize); + AIO_ReadPool_fillBuffer(ress->readCtx, blockSize); } - assert(feof(finput)); - - AIO_fwriteSparseEnd(prefs, foutput, storedSkips); + assert(ress->readCtx->reachedEof); + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); return 0; } @@ -1984,7 +1971,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs, return; /* Try to decode the frame header */ - err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded); + err = ZSTD_getFrameHeader(&header, ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded); if (err == 0) { unsigned long long const windowSize = header.windowSize; unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0); @@ -2007,7 +1994,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs, */ #define FIO_ERROR_FRAME_DECODING ((unsigned long long)(-2)) static unsigned long long -FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, +FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, const FIO_prefs_t* const prefs, const char* srcFileName, U64 alreadyDecoded) /* for multi-frames streams */ @@ -2023,16 +2010,11 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only); /* Header loading : ensures ZSTD_getFrameHeader() will succeed */ - { size_t const toDecode = ZSTD_FRAMEHEADERSIZE_MAX; - if (ress->srcBufferLoaded < toDecode) { - size_t const toRead = toDecode - ress->srcBufferLoaded; - void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded; - ress->srcBufferLoaded += fread(startPosition, 1, toRead, finput); - } } + AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_FRAMEHEADERSIZE_MAX); /* Main decompression Loop */ while (1) { - ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 }; + ZSTD_inBuffer inBuff = { ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, 0 }; ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 }; size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff); const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2; @@ -2054,7 +2036,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, if (srcFileNameSize > 18) { const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15; DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: ...%s : %.*f%s... ", - fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix); + fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix); } else { DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: %s : %.*f%s... ", fCtx->currFileIdx+1, fCtx->nbFilesTotal, srcFileName, hrs.precision, hrs.value, hrs.suffix); @@ -2064,23 +2046,21 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, srcFileName, hrs.precision, hrs.value, hrs.suffix); } - FIO_consumeDSrcBuffer(ress, inBuff.pos); + AIO_ReadPool_consumeBytes(ress->readCtx, inBuff.pos); if (readSizeHint == 0) break; /* end of frame */ /* Fill input buffer */ - { size_t const toDecode = MIN(readSizeHint, ress->srcBufferSize); /* support large skippable frames */ - if (ress->srcBufferLoaded < toDecode) { - size_t const toRead = toDecode - ress->srcBufferLoaded; /* > 0 */ - void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded; - size_t const readSize = fread(startPosition, 1, toRead, finput); + { size_t const toDecode = MIN(readSizeHint, ZSTD_DStreamInSize()); /* support large skippable frames */ + if (ress->readCtx->srcBufferLoaded < toDecode) { + size_t const readSize = AIO_ReadPool_fillBuffer(ress->readCtx, toDecode); if (readSize==0) { DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n", - srcFileName); + srcFileName); + AIO_WritePool_releaseIoJob(writeJob); return FIO_ERROR_FRAME_DECODING; } - ress->srcBufferLoaded += readSize; - } } } + } } } AIO_WritePool_releaseIoJob(writeJob); AIO_WritePool_sparseWriteEnd(ress->writeCtx); @@ -2091,7 +2071,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, #ifdef ZSTD_GZDECOMPRESS static unsigned long long -FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) +FIO_decompressGzFrame(dRess_t* ress, const char* srcFileName) { unsigned long long outFileSize = 0; z_stream strm; @@ -2111,16 +2091,16 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) writeJob = AIO_WritePool_acquireJob(ress->writeCtx); strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; - strm.avail_in = (uInt)ress->srcBufferLoaded; - strm.next_in = (z_const unsigned char*)ress->srcBuffer; + strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; + strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; for ( ; ; ) { int ret; if (strm.avail_in == 0) { - ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile); - if (ress->srcBufferLoaded == 0) flush = Z_FINISH; - strm.next_in = (z_const unsigned char*)ress->srcBuffer; - strm.avail_in = (uInt)ress->srcBufferLoaded; + AIO_ReadPool_consumeAndRefill(ress->readCtx); + if (ress->readCtx->srcBufferLoaded == 0) flush = Z_FINISH; + strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; + strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; } ret = inflate(&strm, flush); if (ret == Z_BUF_ERROR) { @@ -2143,7 +2123,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) if (ret == Z_STREAM_END) break; } - FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in); + AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in); if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */ && (decodingError==0) ) { @@ -2158,7 +2138,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) #ifdef ZSTD_LZMADECOMPRESS static unsigned long long -FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, +FIO_decompressLzmaFrame(dRess_t* ress, const char* srcFileName, int plain_lzma) { unsigned long long outFileSize = 0; @@ -2186,16 +2166,16 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, writeJob = AIO_WritePool_acquireJob(ress->writeCtx); strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; - strm.next_in = (BYTE const*)ress->srcBuffer; - strm.avail_in = ress->srcBufferLoaded; + strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; + strm.avail_in = ress->readCtx->srcBufferLoaded; for ( ; ; ) { lzma_ret ret; if (strm.avail_in == 0) { - ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile); - if (ress->srcBufferLoaded == 0) action = LZMA_FINISH; - strm.next_in = (BYTE const*)ress->srcBuffer; - strm.avail_in = ress->srcBufferLoaded; + AIO_ReadPool_consumeAndRefill(ress->readCtx); + if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH; + strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; + strm.avail_in = ress->readCtx->srcBufferLoaded; } ret = lzma_code(&strm, action); @@ -2219,7 +2199,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, if (ret == LZMA_STREAM_END) break; } - FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in); + AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in); lzma_end(&strm); AIO_WritePool_releaseIoJob(writeJob); AIO_WritePool_sparseWriteEnd(ress->writeCtx); @@ -2229,8 +2209,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, #ifdef ZSTD_LZ4DECOMPRESS static unsigned long long -FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, - const char* srcFileName) +FIO_decompressLz4Frame(dRess_t* ress, const char* srcFileName) { unsigned long long filesize = 0; LZ4F_errorCode_t nextToLoad = 4; @@ -2248,34 +2227,27 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, /* Main Loop */ for (;nextToLoad;) { - size_t readSize; size_t pos = 0; size_t decodedBytes = writeJob->bufferSize; int fullBufferDecoded = 0; /* Read input */ - nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded); - readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile); - if(!readSize && ferror(srcFile)) { - DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName); - decodingError=1; - break; - } - if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */ - ress->srcBufferLoaded += readSize; + AIO_ReadPool_fillBuffer(ress->readCtx, nextToLoad); + if(!ress->readCtx->srcBufferLoaded) break; /* reached end of file */ - while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */ + while ((pos < ress->readCtx->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */ /* Decode Input (at least partially) */ - size_t remaining = ress->srcBufferLoaded - pos; + size_t remaining = ress->readCtx->srcBufferLoaded - pos; decodedBytes = writeJob->bufferSize; - nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL); + nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->readCtx->srcBuffer)+pos, + &remaining, NULL); if (LZ4F_isError(nextToLoad)) { DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n", srcFileName, LZ4F_getErrorName(nextToLoad)); decodingError = 1; nextToLoad = 0; break; } pos += remaining; - assert(pos <= ress->srcBufferLoaded); + assert(pos <= ress->readCtx->srcBufferLoaded); fullBufferDecoded = decodedBytes == writeJob->bufferSize; /* Write Block */ @@ -2290,7 +2262,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, if (!nextToLoad) break; } - FIO_consumeDSrcBuffer(ress, pos); + AIO_ReadPool_consumeBytes(ress->readCtx, pos); } if (nextToLoad!=0) { DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName); @@ -2314,23 +2286,20 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, * 1 : error */ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, - dRess_t ress, FILE* srcFile, - const FIO_prefs_t* const prefs, - const char* dstFileName, const char* srcFileName) + dRess_t ress, const FIO_prefs_t* const prefs, + const char* dstFileName, const char* srcFileName) { unsigned readSomething = 0; unsigned long long filesize = 0; - assert(srcFile != NULL); /* for each frame */ for ( ; ; ) { /* check magic number -> version */ size_t const toRead = 4; - const BYTE* const buf = (const BYTE*)ress.srcBuffer; - if (ress.srcBufferLoaded < toRead) /* load up to 4 bytes for header */ - ress.srcBufferLoaded += fread((char*)ress.srcBuffer + ress.srcBufferLoaded, - (size_t)1, toRead - ress.srcBufferLoaded, srcFile); - if (ress.srcBufferLoaded==0) { + const BYTE* buf; + AIO_ReadPool_fillBuffer(ress.readCtx, toRead); + buf = (const BYTE*)ress.readCtx->srcBuffer; + if (ress.readCtx->srcBufferLoaded==0) { if (readSomething==0) { /* srcFile is empty (which is invalid) */ DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName); return 1; @@ -2338,17 +2307,17 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, break; /* no more input */ } readSomething = 1; /* there is at least 1 byte in srcFile */ - if (ress.srcBufferLoaded < toRead) { + if (ress.readCtx->srcBufferLoaded < toRead) { DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName); return 1; } - if (ZSTD_isFrame(buf, ress.srcBufferLoaded)) { - unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, srcFile, prefs, srcFileName, filesize); + if (ZSTD_isFrame(buf, ress.readCtx->srcBufferLoaded)) { + unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, prefs, srcFileName, filesize); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; } else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */ #ifdef ZSTD_GZDECOMPRESS - unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName); + unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2358,7 +2327,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, } else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */ || (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */ #ifdef ZSTD_LZMADECOMPRESS - unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD); + unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFileName, buf[0] != 0xFD); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2367,7 +2336,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, #endif } else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) { #ifdef ZSTD_LZ4DECOMPRESS - unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName); + unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2375,10 +2344,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, return 1; #endif } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */ - return FIO_passThrough(prefs, - AIO_WritePool_getFile(ress.writeCtx), srcFile, - ress.srcBuffer, ress.srcBufferSize, - ress.srcBufferLoaded); + return FIO_passThrough(&ress); } else { DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName); return 1; @@ -2398,15 +2364,14 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, } /** FIO_decompressDstFile() : - open `dstFileName`, - or path-through if ress.dstFile is already != 0, + open `dstFileName`, or pass-through if writeCtx's file is already != 0, then start decompression process (FIO_decompressFrames()). @return : 0 : OK 1 : operation aborted */ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, - dRess_t ress, FILE* srcFile, + dRess_t ress, const char* dstFileName, const char* srcFileName) { int result; @@ -2438,7 +2403,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, addHandler(dstFileName); } - result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName); + result = FIO_decompressFrames(fCtx, ress, prefs, dstFileName, srcFileName); if (releaseDstFile) { clearHandler(); @@ -2479,9 +2444,11 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs srcFile = FIO_openSrcFile(prefs, srcFileName); if (srcFile==NULL) return 1; - ress.srcBufferLoaded = 0; + AIO_ReadPool_setFile(ress.readCtx, srcFile); + + result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName); - result = FIO_decompressDstFile(fCtx, prefs, ress, srcFile, dstFileName, srcFileName); + AIO_ReadPool_setFile(ress.readCtx, NULL); /* Close file */ if (fclose(srcFile)) { diff --git a/programs/fileio_asyncio.c b/programs/fileio_asyncio.c index 868720a1da2..332292bbc1b 100644 --- a/programs/fileio_asyncio.c +++ b/programs/fileio_asyncio.c @@ -1,5 +1,5 @@ /* - * Copyright (c) Yann Collet, Facebook, Inc. + * Copyright (c) Facebook, Inc. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the @@ -29,7 +29,8 @@ /** AIO_fwriteSparse() : * @return : storedSkips, * argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */ -unsigned AIO_fwriteSparse(FILE* file, +static unsigned +AIO_fwriteSparse(FILE* file, const void* buffer, size_t bufferSize, const FIO_prefs_t* const prefs, unsigned storedSkips) @@ -45,7 +46,7 @@ unsigned AIO_fwriteSparse(FILE* file, if (!prefs->sparseFileSupport) { /* normal write */ size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); if (sizeCheck != bufferSize) - EXM_THROW(70, "Write error : cannot write decoded block : %s", + EXM_THROW(70, "Write error : cannot write block : %s", strerror(errno)); return 0; } @@ -77,7 +78,7 @@ unsigned AIO_fwriteSparse(FILE* file, storedSkips = 0; /* write the rest */ if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) - EXM_THROW(93, "Write error : cannot write decoded block : %s", + EXM_THROW(93, "Write error : cannot write block : %s", strerror(errno)); } ptrT += seg0SizeT; @@ -106,7 +107,8 @@ unsigned AIO_fwriteSparse(FILE* file, return storedSkips; } -void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) +static void +AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) { if (prefs->testMode) assert(storedSkips == 0); if (storedSkips>0) { @@ -127,17 +129,25 @@ void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned st * AsyncIO functionality ************************************************************************/ +/* AIO_supported: + * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */ +int AIO_supported(void) { +#ifdef ZSTD_MULTITHREAD + return 1; +#else + return 0; +#endif +} + /* *********************************** * General IoPool implementation *************************************/ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) { - void *buffer; - IOJob_t *job; - job = (IOJob_t*) malloc(sizeof(IOJob_t)); - buffer = malloc(bufferSize); + IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t)); + void* const buffer = malloc(bufferSize); if(!job || !buffer) - EXM_THROW(101, "Allocation error : not enough memory"); + EXM_THROW(101, "Allocation error : not enough memory"); job->buffer = buffer; job->bufferSize = bufferSize; job->usedBufferSize = 0; @@ -151,49 +161,47 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) { /* AIO_IOPool_createThreadPool: * Creates a thread pool and a mutex for threaded IO pool. * Displays warning if asyncio is requested but MT isn't available. */ -static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) { +static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) { ctx->threadPool = NULL; if(prefs->asyncIO) { if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL)) - EXM_THROW(102,"Failed creating write availableJobs mutex"); + EXM_THROW(102,"Failed creating write availableJobs mutex"); /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ assert(MAX_IO_JOBS >= 2); ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2); if (!ctx->threadPool) - EXM_THROW(104, "Failed creating writer thread pool"); + EXM_THROW(104, "Failed creating writer thread pool"); } } /* AIO_IOPool_init: * Allocates and sets and a new write pool including its included availableJobs. */ -static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) { +static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) { int i; AIO_IOPool_createThreadPool(ctx, prefs); ctx->prefs = prefs; ctx->poolFunction = poolFunction; - ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1; + ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2; ctx->availableJobsCount = ctx->totalIoJobs; for(i=0; i < ctx->availableJobsCount; i++) { ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize); } + ctx->jobBufferSize = bufferSize; ctx->file = NULL; } /* AIO_IOPool_releaseIoJob: * Releases an acquired job back to the pool. Doesn't execute the job. */ -static void AIO_IOPool_releaseIoJob(IOJob_t *job) { - IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx; - if(ctx->threadPool) { +static void AIO_IOPool_releaseIoJob(IOJob_t* job) { + IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx; + if(ctx->threadPool) ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); - assert(ctx->availableJobsCount < MAX_IO_JOBS); - ctx->availableJobs[ctx->availableJobsCount++] = job; + assert(ctx->availableJobsCount < ctx->totalIoJobs); + ctx->availableJobs[ctx->availableJobsCount++] = job; + if(ctx->threadPool) ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); - } else { - assert(ctx->availableJobsCount == 0); - ctx->availableJobsCount++; - } } /* AIO_IOPool_join: @@ -225,19 +233,15 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) { /* AIO_IOPool_acquireJob: * Returns an available io job to be used for a future io. */ -static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) { +static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) { IOJob_t *job; assert(ctx->file != NULL || ctx->prefs->testMode); - if(ctx->threadPool) { + if(ctx->threadPool) ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); - assert(ctx->availableJobsCount > 0); - job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount]; + assert(ctx->availableJobsCount > 0); + job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount]; + if(ctx->threadPool) ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); - } else { - assert(ctx->availableJobsCount == 1); - ctx->availableJobsCount--; - job = (IOJob_t*)ctx->availableJobs[0]; - } job->usedBufferSize = 0; job->file = ctx->file; job->offset = 0; @@ -249,22 +253,22 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) { * Sets the destination file for future files in the pool. * Requires completion of all queues write jobs and release of all otherwise acquired jobs. * Also requires ending of sparse write if a previous file was used in sparse mode. */ -static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) { +static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) { assert(ctx!=NULL); AIO_IOPool_join(ctx); assert(ctx->availableJobsCount == ctx->totalIoJobs); ctx->file = file; } -static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) { +static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) { return ctx->file; } /* AIO_IOPool_enqueueJob: * Enqueues an io job for execution. * The queued job shouldn't be used directly after queueing it. */ -static void AIO_IOPool_enqueueJob(IOJob_t *job) { - IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx; +static void AIO_IOPool_enqueueJob(IOJob_t* job) { + IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx; if(ctx->threadPool) POOL_add(ctx->threadPool, ctx->poolFunction, job); else @@ -277,7 +281,7 @@ static void AIO_IOPool_enqueueJob(IOJob_t *job) { /* AIO_WritePool_acquireJob: * Returns an available write job to be used for a future write. */ -IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) { +IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) { return AIO_IOPool_acquireJob(&ctx->base); } @@ -294,7 +298,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) { /* AIO_WritePool_sparseWriteEnd: * Ends sparse writes to the current file. * Blocks on completion of all current write jobs before executing. */ -void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) { +void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) { assert(ctx != NULL); if(ctx->base.threadPool) POOL_joinJobs(ctx->base.threadPool); @@ -306,28 +310,28 @@ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) { * Sets the destination file for future writes in the pool. * Requires completion of all queues write jobs and release of all otherwise acquired jobs. * Also requires ending of sparse write if a previous file was used in sparse mode. */ -void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) { +void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) { AIO_IOPool_setFile(&ctx->base, file); assert(ctx->storedSkips == 0); } /* AIO_WritePool_getFile: * Returns the file the writePool is currently set to write to. */ -FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) { +FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) { return AIO_IOPool_getFile(&ctx->base); } /* AIO_WritePool_releaseIoJob: * Releases an acquired job back to the pool. Doesn't execute the job. */ -void AIO_WritePool_releaseIoJob(IOJob_t *job) { +void AIO_WritePool_releaseIoJob(IOJob_t* job) { AIO_IOPool_releaseIoJob(job); } /* AIO_WritePool_closeFile: * Ends sparse write and closes the writePool's current file and sets the file to NULL. * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ -int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) { - FILE *dstFile = ctx->base.file; +int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) { + FILE* const dstFile = ctx->base.file; assert(dstFile!=NULL || ctx->base.prefs->testMode!=0); AIO_WritePool_sparseWriteEnd(ctx); AIO_IOPool_setFile(&ctx->base, NULL); @@ -337,16 +341,16 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) { /* AIO_WritePool_executeWriteJob: * Executes a write job synchronously. Can be used as a function for a thread pool. */ static void AIO_WritePool_executeWriteJob(void* opaque){ - IOJob_t* job = (IOJob_t*) opaque; - WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx; + IOJob_t* const job = (IOJob_t*) opaque; + WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx; ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips); AIO_IOPool_releaseIoJob(job); } /* AIO_WritePool_create: * Allocates and sets and a new write pool including its included jobs. */ -WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) { - WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t)); +WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) { + WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t)); if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize); ctx->storedSkips = 0; @@ -363,3 +367,256 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) { assert(ctx->storedSkips==0); free(ctx); } + + +/* *********************************** + * ReadPool implementation + *************************************/ +static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) { + int i; + for(i=0; icompletedJobsCount; i++) { + IOJob_t* job = (IOJob_t*) ctx->completedJobs[i]; + AIO_IOPool_releaseIoJob(job); + } + ctx->completedJobsCount = 0; +} + +static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) { + ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx; + if(ctx->base.threadPool) + ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex); + assert(ctx->completedJobsCount < MAX_IO_JOBS); + ctx->completedJobs[ctx->completedJobsCount++] = job; + if(ctx->base.threadPool) { + ZSTD_pthread_cond_signal(&ctx->jobCompletedCond); + ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex); + } +} + +/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked: + * Looks through the completed jobs for a job matching the waitingOnOffset and returns it, + * if job wasn't found returns NULL. + * IMPORTANT: assumes ioJobsMutex is locked. */ +static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) { + IOJob_t *job = NULL; + int i; + /* This implementation goes through all completed jobs and looks for the one matching the next offset. + * While not strictly needed for a single threaded reader implementation (as in such a case we could expect + * reads to be completed in order) this implementation was chosen as it better fits other asyncio + * interfaces (such as io_uring) that do not provide promises regarding order of completion. */ + for (i=0; icompletedJobsCount; i++) { + job = (IOJob_t *) ctx->completedJobs[i]; + if (job->offset == ctx->waitingOnOffset) { + ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount]; + return job; + } + } + return NULL; +} + +/* AIO_ReadPool_numReadsInFlight: + * Returns the number of IO read jobs currrently in flight. */ +static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) { + const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1); + return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld); +} + +/* AIO_ReadPool_getNextCompletedJob: + * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset. + * Would block. */ +static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) { + IOJob_t *job = NULL; + if (ctx->base.threadPool) + ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex); + + job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx); + + /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */ + while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) { + assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */ + ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex); + job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx); + } + + if(job) { + assert(job->offset == ctx->waitingOnOffset); + ctx->waitingOnOffset += job->usedBufferSize; + } + + if (ctx->base.threadPool) + ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex); + return job; +} + + +/* AIO_ReadPool_executeReadJob: + * Executes a read job synchronously. Can be used as a function for a thread pool. */ +static void AIO_ReadPool_executeReadJob(void* opaque){ + IOJob_t* const job = (IOJob_t*) opaque; + ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx; + if(ctx->reachedEof) { + job->usedBufferSize = 0; + AIO_ReadPool_addJobToCompleted(job); + return; + } + job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file); + if(job->usedBufferSize < job->bufferSize) { + if(ferror(job->file)) { + EXM_THROW(37, "Read error"); + } else if(feof(job->file)) { + ctx->reachedEof = 1; + } else { + EXM_THROW(37, "Unexpected short read"); + } + } + AIO_ReadPool_addJobToCompleted(job); +} + +static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) { + IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base); + job->offset = ctx->nextReadOffset; + ctx->nextReadOffset += job->bufferSize; + AIO_IOPool_enqueueJob(job); +} + +static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) { + int i; + for (i = 0; i < ctx->base.availableJobsCount; i++) { + AIO_ReadPool_enqueueRead(ctx); + } +} + +/* AIO_ReadPool_setFile: + * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. + * Waits for all current enqueued tasks to complete if a previous file was set. */ +void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) { + assert(ctx!=NULL); + AIO_IOPool_join(&ctx->base); + AIO_ReadPool_releaseAllCompletedJobs(ctx); + if (ctx->currentJobHeld) { + AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld); + ctx->currentJobHeld = NULL; + } + AIO_IOPool_setFile(&ctx->base, file); + ctx->nextReadOffset = 0; + ctx->waitingOnOffset = 0; + ctx->srcBuffer = ctx->coalesceBuffer; + ctx->srcBufferLoaded = 0; + ctx->reachedEof = 0; + if(file != NULL) + AIO_ReadPool_startReading(ctx); +} + +/* AIO_ReadPool_create: + * Allocates and sets and a new readPool including its included jobs. + * bufferSize should be set to the maximal buffer we want to read at a time, will also be used + * as our basic read size. */ +ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) { + ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t)); + if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); + AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize); + + ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2); + ctx->srcBuffer = ctx->coalesceBuffer; + ctx->srcBufferLoaded = 0; + ctx->completedJobsCount = 0; + ctx->currentJobHeld = NULL; + + if(ctx->base.threadPool) + if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL)) + EXM_THROW(103,"Failed creating write jobCompletedCond mutex"); + + return ctx; +} + +/* AIO_ReadPool_free: + * Frees and releases a readPool and its resources. Closes source file. */ +void AIO_ReadPool_free(ReadPoolCtx_t* ctx) { + if(AIO_ReadPool_getFile(ctx)) + AIO_ReadPool_closeFile(ctx); + if(ctx->base.threadPool) + ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond); + AIO_IOPool_destroy(&ctx->base); + free(ctx->coalesceBuffer); + free(ctx); +} + +/* AIO_ReadPool_consumeBytes: + * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ +void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) { + assert(n <= ctx->srcBufferLoaded); + ctx->srcBufferLoaded -= n; + ctx->srcBuffer += n; +} + +/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext: + * Release the current held job and get the next one, returns NULL if no next job available. */ +static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) { + if (ctx->currentJobHeld) { + AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld); + ctx->currentJobHeld = NULL; + AIO_ReadPool_enqueueRead(ctx); + } + ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx); + return (IOJob_t*) ctx->currentJobHeld; +} + +/* AIO_ReadPool_fillBuffer: + * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller). + * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file. + * Return value is the number of bytes added to the buffer. + * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */ +size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) { + IOJob_t *job; + int useCoalesce = 0; + if(n > ctx->base.jobBufferSize) + n = ctx->base.jobBufferSize; + + /* We are good, don't read anything */ + if (ctx->srcBufferLoaded >= n) + return 0; + + /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job + * and coalesce the remaining bytes with the next job's buffer */ + if (ctx->srcBufferLoaded > 0) { + useCoalesce = 1; + memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded); + ctx->srcBuffer = ctx->coalesceBuffer; + } + + /* Read the next chunk */ + job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx); + if(!job) + return 0; + if(useCoalesce) { + assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize); + memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize); + ctx->srcBufferLoaded += job->usedBufferSize; + } + else { + ctx->srcBuffer = (U8 *) job->buffer; + ctx->srcBufferLoaded = job->usedBufferSize; + } + return job->usedBufferSize; +} + +/* AIO_ReadPool_consumeAndRefill: + * Consumes the current buffer and refills it with bufferSize bytes. */ +size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) { + AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded); + return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize); +} + +/* AIO_ReadPool_getFile: + * Returns the current file set for the read pool. */ +FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) { + return AIO_IOPool_getFile(&ctx->base); +} + +/* AIO_ReadPool_closeFile: + * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ +int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) { + FILE* const file = AIO_ReadPool_getFile(ctx); + AIO_ReadPool_setFile(ctx, NULL); + return fclose(file); +} diff --git a/programs/fileio_asyncio.h b/programs/fileio_asyncio.h index 3e91164c558..bf07f85947b 100644 --- a/programs/fileio_asyncio.h +++ b/programs/fileio_asyncio.h @@ -1,5 +1,5 @@ /* - * Copyright (c) Yann Collet, Facebook, Inc. + * Copyright (c) Facebook, Inc. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the @@ -28,7 +28,7 @@ typedef struct { /* These struct fields should be set only on creation and not changed afterwards */ POOL_ctx* threadPool; int totalIoJobs; - FIO_prefs_t* prefs; + const FIO_prefs_t* prefs; POOL_function poolFunction; /* Controls the file we currently write to, make changes only by using provided utility functions */ @@ -39,8 +39,36 @@ typedef struct { ZSTD_pthread_mutex_t ioJobsMutex; void* availableJobs[MAX_IO_JOBS]; int availableJobsCount; + size_t jobBufferSize; } IOPoolCtx_t; +typedef struct { + IOPoolCtx_t base; + + /* State regarding the currently read file */ + int reachedEof; + U64 nextReadOffset; + U64 waitingOnOffset; + + /* We may hold an IOJob object as needed if we actively expose its buffer. */ + void *currentJobHeld; + + /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in + * the first of them. Shouldn't be accessed from outside ot utility functions. */ + U8 *coalesceBuffer; + + /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might + * change when consuming / refilling buffer. */ + U8 *srcBuffer; + size_t srcBufferLoaded; + + /* We need to know what tasks completed so we can use their buffers when their time comes. + * Should only be accessed after locking base.ioJobsMutex . */ + void* completedJobs[MAX_IO_JOBS]; + int completedJobsCount; + ZSTD_pthread_cond_t jobCompletedCond; +} ReadPoolCtx_t; + typedef struct { IOPoolCtx_t base; unsigned storedSkips; @@ -59,15 +87,10 @@ typedef struct { U64 offset; } IOJob_t; -/** AIO_fwriteSparse() : -* @return : storedSkips, -* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */ -unsigned AIO_fwriteSparse(FILE* file, - const void* buffer, size_t bufferSize, - const FIO_prefs_t* const prefs, - unsigned storedSkips); +/* AIO_supported: + * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */ +int AIO_supported(void); -void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips); /* AIO_WritePool_releaseIoJob: * Releases an acquired job back to the pool. Doesn't execute the job. */ @@ -97,7 +120,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file); /* AIO_WritePool_getFile: * Returns the file the writePool is currently set to write to. */ -FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx); +FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx); /* AIO_WritePool_closeFile: * Ends sparse write and closes the writePool's current file and sets the file to NULL. @@ -107,12 +130,50 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx); /* AIO_WritePool_create: * Allocates and sets and a new write pool including its included jobs. * bufferSize should be set to the maximal buffer we want to write to at a time. */ -WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize); +WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize); /* AIO_WritePool_free: * Frees and releases a writePool and its resources. Closes destination file. */ void AIO_WritePool_free(WritePoolCtx_t* ctx); +/* AIO_ReadPool_create: + * Allocates and sets and a new readPool including its included jobs. + * bufferSize should be set to the maximal buffer we want to read at a time, will also be used + * as our basic read size. */ +ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize); + +/* AIO_ReadPool_free: + * Frees and releases a readPool and its resources. Closes source file. */ +void AIO_ReadPool_free(ReadPoolCtx_t* ctx); + +/* AIO_ReadPool_consumeBytes: + * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ +void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n); + +/* AIO_ReadPool_fillBuffer: + * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initalized bufferSize). + * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file. + * Return value is the number of bytes added to the buffer. + * Note that srcBuffer might have up to 2 times bufferSize bytes. */ +size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n); + +/* AIO_ReadPool_consumeAndRefill: + * Consumes the current buffer and refills it with bufferSize bytes. */ +size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx); + +/* AIO_ReadPool_setFile: + * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. + * Waits for all current enqueued tasks to complete if a previous file was set. */ +void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file); + +/* AIO_ReadPool_getFile: + * Returns the current file set for the read pool. */ +FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx); + +/* AIO_ReadPool_closeFile: + * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ +int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx); + #if defined (__cplusplus) } #endif diff --git a/programs/fileio_common.h b/programs/fileio_common.h index d33c19d7bd1..282c2f13b4d 100644 --- a/programs/fileio_common.h +++ b/programs/fileio_common.h @@ -1,5 +1,5 @@ /* - * Copyright (c) Yann Collet, Facebook, Inc. + * Copyright (c) Facebook, Inc. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the diff --git a/programs/fileio_types.h b/programs/fileio_types.h index 1909ab1ab5a..cf566aa2063 100644 --- a/programs/fileio_types.h +++ b/programs/fileio_types.h @@ -1,5 +1,5 @@ /* - * Copyright (c) Yann Collet, Facebook, Inc. + * Copyright (c) Facebook, Inc. * All rights reserved. * * This source code is licensed under both the BSD-style license (found in the @@ -70,4 +70,4 @@ typedef struct FIO_prefs_s { int allowBlockDevices; } FIO_prefs_t; -#endif /* FILEIO_TYPES_HEADER */ \ No newline at end of file +#endif /* FILEIO_TYPES_HEADER */ diff --git a/programs/zstdcli.c b/programs/zstdcli.c index fd563e1c24d..4355e047281 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -46,6 +46,7 @@ # include "zstdcli_trace.h" #endif #include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */ +#include "fileio_asyncio.h" /*-************************************ @@ -179,7 +180,8 @@ static void usage_advanced(const char* programName) #ifdef UTIL_HAS_MIRRORFILELIST DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n"); #endif - + if (AIO_supported()) + DISPLAYOUT( "--[no-]asyncio : use asynchronous IO (default: enabled) \n"); #ifndef ZSTD_NOCOMPRESS DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)"); @@ -242,9 +244,6 @@ static void usage_advanced(const char* programName) DISPLAYOUT( " -l : print information about zstd compressed files \n"); DISPLAYOUT( "--test : test compressed file integrity \n"); DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); -#ifdef ZSTD_MULTITHREAD - DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n"); -#endif # if ZSTD_SPARSE_DEFAULT DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n"); # else @@ -1445,6 +1444,7 @@ int main(int argCount, const char* argv[]) FIO_setTargetCBlockSize(prefs, targetCBlockSize); FIO_setSrcSizeHint(prefs, srcSizeHint); FIO_setLiteralCompressionMode(prefs, literalCompressionMode); + FIO_setSparseWrite(prefs, 0); if (adaptMin > cLevel) cLevel = adaptMin; if (adaptMax < cLevel) cLevel = adaptMax; diff --git a/tests/playTests.sh b/tests/playTests.sh index f19291a6063..64164d5bb6a 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -259,10 +259,13 @@ zstd -dc - < tmp.zst > $INTOVOID zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used zstd -d - < tmp.zst > $INTOVOID println "test : impose memory limitation (must fail)" -zstd -d -f tmp.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed" -zstd -d -f tmp.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command -zstd -d -f tmp.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command -zstd -d -f tmp.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command +datagen -g500K > tmplimit +zstd -f tmplimit +zstd -d -f tmplimit.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed" +zstd -d -f tmplimit.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command +zstd -d -f tmplimit.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command +zstd -d -f tmplimit.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command +rm -f tmplimit tmplimit.zst println "test : overwrite protection" zstd -q tmp && die "overwrite check failed!" println "test : force overwrite" @@ -1590,11 +1593,11 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then exit 1 fi -println "\n===> zstd asyncio decompression tests " +println "\n===> zstd asyncio tests " addFrame() { datagen -g2M -s$2 >> tmp_uncompressed - datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst + datagen -g2M -s$2 | zstd -1 --format=$1 >> tmp_compressed.zst } addTwoFrames() {