diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp index 7d4867a817be..f5e4dd7b1cd0 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp @@ -296,6 +296,27 @@ appendChunks(FixedParams * args, Chunk::UP chunk) } } +/* + * Wrapper for future unique pointer to chunk that drains the + * pending task if the future is still valid at destruction time. + */ +class FutureChunk { + std::future> _future; +public: + FutureChunk(std::future> future) + : _future(std::move(future)) + { + } + FutureChunk(const FutureChunk&) = delete; + FutureChunk(FutureChunk&&) = default; + ~FutureChunk() { + if (_future.valid()) { + _future.wait(); + } + } + std::unique_ptr get() { return _future.get(); } +}; + } void @@ -308,15 +329,19 @@ FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteDat assert(numChunks <= getNumChunks()); FixedParams fixedParams = {db, dest, lidReadGuard, getFileId().getId(), visitorProgress}; size_t limit = std::thread::hardware_concurrency(); - vespalib::ArrayQueue> queue; + vespalib::ArrayQueue queue; for (size_t chunkId(0); chunkId < numChunks; chunkId++) { std::promise promisedChunk; - std::future futureChunk = promisedChunk.get_future(); + FutureChunk futureChunk(promisedChunk.get_future()); auto task = vespalib::makeLambdaTask([promise = std::move(promisedChunk), chunkId, this]() mutable { - const ChunkInfo & cInfo(_chunkInfo[chunkId]); - vespalib::DataBuffer whole(0ul, ALIGNMENT); - FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize())); - promise.set_value(std::make_unique(chunkId, whole.getData(), whole.getDataLen())); + try { + const ChunkInfo & cInfo(_chunkInfo[chunkId]); + vespalib::DataBuffer whole(0ul, ALIGNMENT); + FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize())); + promise.set_value(std::make_unique(chunkId, whole.getData(), whole.getDataLen())); + } catch (...) { + promise.set_exception(std::current_exception()); + } }); executor.execute(CpuUsage::wrap(std::move(task), cpu_category));