
Unify the compression API in Velox #7471

Open
marin-ma opened this issue Nov 8, 2023 · 39 comments
Labels
enhancement New feature or request

Comments

@marin-ma
Contributor

marin-ma commented Nov 8, 2023

Description

Context

Within the current Velox implementation, there are three distinct modules utilizing different compression codecs and methods:

Parquet datasource - Uses Arrow codec

  • Supported codecs: lz4, zstd, gzip/zlib, snappy (lzo, brotli, bz2 are supported in Arrow but not ported to Velox)
  • Header file: velox/dwio/parquet/writer/arrow/util/Compression.h

DWRF datasource - Uses a self-defined streaming compressor/decompressor

  • Supported codecs:
    • Compressor: zstd, zlib
    • Decompressor: lz4, lzo, zstd, zlib, snappy
  • Header file: velox/dwio/common/compression/Compression.h

Spill - Uses Folly Codec

  • Supported codecs: lz4, zstd, gzip, zlib, snappy
  • Header file: velox/common/compression/Compression.h

Proposal

It would be better to unify the compression strategy across these modules. Given that the Arrow compression API has already been integrated into Velox along with the parquet writer, it would be more efficient to embrace the Arrow compression API instead of creating a new Codec API from scratch:

  1. Broad codec support: Most commonly used codecs have been implemented, as listed above. They also cover all codecs used by the other two modules.
  2. Well-defined interface: The Arrow compression API offers a Codec interface for batch data compression and a Compressor/Decompressor interface for streaming data compression. It also supports configuring the parameters of different compression algorithms via CodecOptions.
  3. Easily extendable: Apart from using arrow::Status/Result for return codes, the API is general-purpose and easy to extend. We have already implemented compression codecs backed by QAT/IAA accelerators and successfully used them in our project Gluten. With a unified Arrow compression API, we could contribute these enhancements directly to Velox.
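To make the proposed shape concrete, here is a rough sketch of what a unified codec interface modeled on Arrow's could look like. All names (`Codec`, `IdentityCodec`, `roundTrip`) are hypothetical, not actual Velox or Arrow APIs, and the pass-through codec merely stands in for a real lz4/zstd/zlib/snappy wrapper:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical unified codec interface, loosely modeled on Arrow's Codec.
class Codec {
 public:
  virtual ~Codec() = default;

  // Upper bound on the compressed size for `inputLength` input bytes.
  virtual uint64_t maxCompressedLength(uint64_t inputLength) const = 0;

  // Both return the number of bytes written to `output`.
  virtual uint64_t compress(
      const uint8_t* input,
      uint64_t inputLength,
      uint8_t* output,
      uint64_t outputLength) = 0;

  virtual uint64_t decompress(
      const uint8_t* input,
      uint64_t inputLength,
      uint8_t* output,
      uint64_t outputLength) = 0;
};

// Pass-through "codec" used only to demonstrate the interface.
class IdentityCodec : public Codec {
 public:
  uint64_t maxCompressedLength(uint64_t inputLength) const override {
    return inputLength;
  }

  uint64_t compress(
      const uint8_t* input,
      uint64_t inputLength,
      uint8_t* output,
      uint64_t /*outputLength*/) override {
    std::memcpy(output, input, inputLength);
    return inputLength;
  }

  uint64_t decompress(
      const uint8_t* input,
      uint64_t inputLength,
      uint8_t* output,
      uint64_t /*outputLength*/) override {
    std::memcpy(output, input, inputLength);
    return inputLength;
  }
};

// Round-trips a string through any Codec implementation.
std::string roundTrip(Codec& codec, const std::string& data) {
  std::vector<uint8_t> compressed(codec.maxCompressedLength(data.size()));
  auto compressedSize = codec.compress(
      reinterpret_cast<const uint8_t*>(data.data()),
      data.size(),
      compressed.data(),
      compressed.size());
  std::string out(data.size(), '\0');
  codec.decompress(
      compressed.data(),
      compressedSize,
      reinterpret_cast<uint8_t*>(&out[0]),
      out.size());
  return out;
}
```

Any backend (software library or accelerator) that satisfies this interface would then be usable interchangeably by Parquet, DWRF, and spill.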
@marin-ma marin-ma added the enhancement New feature or request label Nov 8, 2023
@marin-ma
Contributor Author

marin-ma commented Nov 8, 2023

cc: @FelixYBW

@FelixYBW
Contributor

FelixYBW commented Nov 8, 2023

@pedroerp Any plan to unify the codecs in Velox? It doesn't make sense that we have three codec definitions in current Velox. In Gluten shuffle we currently use Arrow's. In spill we use Folly's.

It's the first step toward adding de-/compression accelerators in Velox.

@mbasmanova @oerling

@mbasmanova
Contributor

CC: @majetideepak

@mbasmanova
Contributor

CC: @xiaoxmeng

@marin-ma
Contributor Author

marin-ma commented Nov 9, 2023

@yaqi-zhao Based on the discussion in #7445, since the Arrow codec currently doesn't support async compression, I would suggest we unify the compression API first, and then extend the API with interfaces for async.

@yaqi-zhao
Contributor

@yaqi-zhao Based on the discussion in #7445, since the Arrow codec currently doesn't support async compression, I would suggest we unify the compression API first, and then extend the API with interfaces for async.

I have tried to add an interface for async decompression (class AsyncDecompressor) in PR #6176. What worries me now is how to design an interface for different hardware, since the storage of the async status may vary across hardware. Another concern is that the upper-level caller may need massive changes to adopt the async interface.

@george-gu-2021

Could you initiate a design proposal to facilitate the review? Thanks! @yaqi-zhao , @marin-ma

@yaqi-zhao
Contributor

yaqi-zhao commented Nov 9, 2023

Could you initiate a design proposal to facilitate the review? Thanks! @yaqi-zhao , @marin-ma

@george-gu-2021 I only have an initial idea about how to design the interface for IAA; you can refer to class AsyncDecompressor in PR #6176.

class AsyncDecompressor {
 public:
  AsyncDecompressor() = default;
  virtual ~AsyncDecompressor() = default;

  virtual int decompress(
      const char* src,
      uint64_t srcLength,
      char* dest,
      uint64_t destLength) = 0;
  virtual bool waitResult(int job_id) = 0;
  virtual void releaseJob(int job_id) = 0;
};

With the concern I raised above, I don't know how to design a universal API for all hardware. Maybe @marin-ma @FelixYBW already have some insights.

@marin-ma
Contributor Author

marin-ma commented Nov 9, 2023

@yaqi-zhao It took me some time to review #6176. Here are some initial thoughts:

  1. The QplJobPool should be moved into the common module, which can be addressed after moving the unified compression API into the common module.

  2. A module that supports async compression should not be bound to a specific codec, e.g. the Parquet PageReader should not contain anything directly related to QPL.

  3. If a codec like IAACodec uses any external resources, it should be responsible for holding the resources and managing their lifetime. The codec is then considered "stateful", similar to a streaming compressor. Hence, we can move the QPL-related state from PageReader into the IAACodec and avoid creating the codec each time we call its API from different functions: https://github.com/facebookincubator/velox/pull/6176/files#diff-58ef2bd79ff4173b5a384a3016d6f7bd7d4b237876251cc9d9910e162993eaaaR1147-R1151

  4. for different hardware since the storage of the async status may be varied for different hardware.

    Could you elaborate on what kind of async status should be handled? I would think the status is either successful or failed. If it fails, the caller should be responsible for handling the status, deciding whether to fall back or throw an exception (or return an error code). But in any case the status should be general, not qplxxx
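Point 3 above (a stateful codec owning its hardware resources) can be sketched with plain RAII. `JobPool` and `StatefulCodec` below are hypothetical stand-ins for illustration, not QPL or Velox APIs:

```cpp
#include <cassert>

// Stand-in for a pool of hardware jobs (e.g. QPL's job pool).
class JobPool {
 public:
  int acquire() {
    ++liveJobs_;
    return nextId_++;
  }
  void release(int /*jobId*/) {
    --liveJobs_;
  }
  int liveJobs() const {
    return liveJobs_;
  }

 private:
  int nextId_{0};
  int liveJobs_{0};
};

// A "stateful" codec that owns its hardware resource for its whole lifetime,
// instead of the caller acquiring and releasing a job on every call.
class StatefulCodec {
 public:
  explicit StatefulCodec(JobPool& pool)
      : pool_(pool), jobId_(pool.acquire()) {}
  ~StatefulCodec() {
    pool_.release(jobId_);
  }

  // A decompress() method would reuse jobId_ rather than re-acquiring it.
  int jobId() const {
    return jobId_;
  }

 private:
  JobPool& pool_;
  int jobId_;
};
```

The destructor guarantees the job is returned to the pool, so callers like PageReader never touch QPL-specific state directly.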

@marin-ma
Contributor Author

marin-ma commented Nov 9, 2023

@george-gu-2021 @yaqi-zhao The detailed implementation of the async compression API is somewhat beyond the scope of this discussion. I would suggest we unify the compression module as the first step, and then extend different compression codecs in the common module, thereby making the IAA codec available for shuffle, spill, etc.

Besides, we can have not only the async IAA codec but also a synchronous one.

@marin-ma
Contributor Author

marin-ma commented Nov 9, 2023

@majetideepak @xiaoxmeng

Do you have any insights on the proposal of replacing the dwrf/folly compression with Arrow's? The Arrow compression API is already mature and can satisfy the requirements of the other modules, so I'm planning to work on a PR to replace the codecs under velox/common/compression with the Arrow codec implementation, but there are two paths we can choose:

  1. Remove the original implementations under velox/common/compression and simply replace them with
    velox/dwio/parquet/writer/arrow/util/. Since the Codec returns Arrow return codes, the caller should handle them, e.g. throwing an exception for arrow::Status::Invalid.
  2. Still remove the original implementations under velox/common/compression. Use the design of the Arrow compression API, but remove the Arrow return codes and throw exceptions from the codec on error. The compression API would then no longer depend on Arrow.

I would appreciate your suggestions and comments. Thanks!

@mbasmanova
Contributor

I would suggest we unify the compression module in the first step

That would be nice. I didn't realize we have 3 implementations.

@mbasmanova
Contributor

@marin-ma I'm a bit concerned about using Arrow's implementation directly. At a minimum we'd need to rewrite it to match the coding style and design patterns of Velox and replace the status with a Velox-specific one. Can we maybe start by replacing dwrf/folly compression with velox/common/compression? Are there any differences between these 2 implementations?

@marin-ma
Contributor Author

marin-ma commented Nov 9, 2023

@mbasmanova

Can we maybe start by replacing dwrf/folly compression with velox/common/compression.

Thank you for your advice!

Are there any differences in these 2 implementations?

Yes. The supported codecs are different, as listed in the issue description. Also, dwrf supports configuring a few codec-specific parameters, while folly doesn't.

@mbasmanova
Contributor

@marin-ma

Yes. The supported codecs are different, as listed in the issue description. Also, dwrf supports configuring a few codec-specific parameters, while folly doesn't.

Got it. What are the "few codec-specific parameters"? If we need these, then maybe you can start looking at replacing these 2 implementations with a single unified one.

@marin-ma
Contributor Author

marin-ma commented Nov 9, 2023

@mbasmanova

Got it. What are the "few codec-specific parameters"?

The CompressionOptions struct defined here:

struct CompressionOptions {
  /// Format-specific compression/decompression options.
  union Format {
    struct {
      /// Window bits determines the history buffer size and whether a
      /// header/trailer is added to the compression block.
      int windowBits;
      /// Compression level determines the compression ratio. Zlib supports
      /// values ranging from 0 (no compression) to 9 (max compression).
      int32_t compressionLevel;
    } zlib;
    struct {
      int32_t compressionLevel;
    } zstd;
    struct {
      bool isHadoopFrameFormat;
    } lz4_lzo;
  } format;
  uint32_t compressionThreshold;
};
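For illustration, a writer might populate these options for zlib roughly as follows. The struct is repeated (minus comments) so the snippet stands alone, and the chosen values are examples, not Velox defaults:

```cpp
#include <cassert>
#include <cstdint>

// Repeated from the struct above so this snippet is self-contained.
struct CompressionOptions {
  union Format {
    struct {
      int windowBits;
      int32_t compressionLevel;
    } zlib;
    struct {
      int32_t compressionLevel;
    } zstd;
    struct {
      bool isHadoopFrameFormat;
    } lz4_lzo;
  } format;
  uint32_t compressionThreshold;
};

CompressionOptions makeZlibOptions() {
  CompressionOptions options{};
  // 15 window bits (32 KiB history) with a zlib header; level 6 is a
  // common middle-ground between speed and ratio.
  options.format.zlib.windowBits = 15;
  options.format.zlib.compressionLevel = 6;
  // Skip compression for payloads smaller than this many bytes.
  options.compressionThreshold = 256;
  return options;
}
```

Folly's codec wrapper exposes no equivalent knobs, which is why unifying on the richer interface matters here.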

@pedroerp
Contributor

pedroerp commented Nov 9, 2023

@marin-ma thanks for putting together the proposal. A suggested sequence of steps could be:

  1. Define a unified synchronous compression API for Velox.
  2. Ensure it supports all needed codecs. Ideally some or all of them should be optional, depending on library availability (like in Arrow).
  3. Make sure all synchronous call sites in Velox use it.
  4. Define an async compression API. (@yaqi-zhao in Velox we usually use folly::Futures for async programming. This API could be similar to the synchronous one, but returning a Future instead).
  5. Integrate async accelerators with this API - like QPL.
  6. Ensure call sites can properly leverage async compression (where needed).
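Since folly itself is only named in this thread, a std::future analogue can illustrate the shape step 4 suggests. This is a sketch under assumptions: a real Velox implementation would return a folly::SemiFuture, and the pass-through copy here merely stands in for actual decompression:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <future>
#include <stdexcept>

// std::future stand-in for the folly::Future-based API suggested in step 4.
// Returns the number of decompressed bytes; errors surface via the future.
std::future<uint64_t> decompressAsync(
    const char* src,
    uint64_t srcLength,
    char* dest,
    uint64_t destLength) {
  return std::async(std::launch::deferred, [=]() -> uint64_t {
    if (srcLength > destLength) {
      // The error travels inside the future instead of throwing at the
      // call site, mirroring how folly propagates exceptions via Try.
      throw std::runtime_error("destination buffer too small");
    }
    // Pass-through "decompression" for illustration only.
    std::memcpy(dest, src, srcLength);
    return srcLength;
  });
}
```

The call site submits work, keeps the future, and calls `.get()` only when the decompressed bytes are actually needed.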

@yaqi-zhao
Contributor

yaqi-zhao commented Nov 13, 2023

@pedroerp What do you think about defining an async (de)compressor like this?

class AsyncDecompressor {
 public:
  AsyncDecompressor() = default;

  virtual ~AsyncDecompressor() = default;

  virtual folly::Future<int> decompress(
      const char* src,
      uint64_t srcLength,
      char* dest,
      uint64_t destLength) = 0;
};

@marin-ma
Contributor Author

marin-ma commented Nov 13, 2023

@pedroerp What do you think about defining an async (de)compressor like this?

class AsyncDecompressor {
 public:
  AsyncDecompressor() = default;

  virtual ~AsyncDecompressor() = default;

  virtual folly::Future<int> decompress(
      const char* src,
      uint64_t srcLength,
      char* dest,
      uint64_t destLength) = 0;
};

I would think returning SemiFuture instead of Future would be better. Then the caller can decide whether to use .via() to execute on another Executor, or .get() to execute inline. The API would look like:

  folly::SemiFuture<uint64_t> decompressAsync(
      const char* src,
      uint64_t srcLength,
      char* dest,
      uint64_t destLength);

And the implementation for an IAA decompressor:

folly::SemiFuture<uint64_t> GzipIAADecompressor::decompressAsync(
    const char* src,
    uint64_t srcLength,
    char* dest,
    uint64_t destLength) {
  // Acquire HW resources.
  ...
  // Submit job.
  status = qpl_submit_job(job);
  // If it fails, return a SemiFuture made ready with an exception.
  // If it succeeds, add a `waitResult` callback.
  if (status != QPL_STS_OK) {
    qpl_job_pool.releaseJob(deflate_job.first);
    return folly::makeSemiFutureWith([status]() -> uint64_t {
      throw std::runtime_error(
          "Cannot submit job, error status " + std::to_string(status));
    });
  }
  return folly::makeSemiFuture().deferValue(
      [this, job](auto&&) -> uint64_t { return waitResult(job); });
}

For the caller, in the stage of pre-decompressing the page:

...
  auto future = decompressor->decompressAsync(
      (const char*)pageData,
      compressedSize,
      uncompressedData->asMutable<char>(),
      uncompressedSize);
  if (future.isReady()) {
    // Likely a submission failure; surface it to the caller.
    auto result = std::move(future).getTry();
    if (result.hasException()) {
      return false;
    }
    // Re-wrap the already-available result so it can be stored below.
    future = folly::makeSemiFuture(std::move(result));
  }
  // Store the SemiFuture.
  decompressionFuture_ = std::move(future);
...

For the caller, getting the decompressed result:

// If decompressionFuture_ has value.
auto decompressedSize = std::move(decompressionFuture_).get(); // Execute `waitResult` callback inline.

@marin-ma
Contributor Author

I plan to submit a series of PRs implementing the compression codec API. The submission sequence may change based on practical considerations. The proposed order is:

  1. Add Compression Codec API. Add support for LZ4_FRAME, LZ4_RAW, and LZ4_HADOOP codecs. Include unit tests.
  2. Add support for GZIP/ZLIB Codec
  3. Add support for ZSTD codec.
  4. Add support for SNAPPY Codec.
  5. Integrate LZO Codec, reuse the existing implementation, primarily for use in the parquet reader. Currently, only decompression is available for LZO.
  6. Replace the decompression methods in dwio and parquet reader with the new API. Move the API to velox/common/compression and remove velox/dwio/common/compression.
  7. Replace the compression in the arrow parquet writer with the new API. Remove dependencies related to arrow compression.
  8. Add Asynchronous Compression API.
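As background for the LZ4_HADOOP codec in step 1: the Hadoop LZ4 frame prefixes each block with the uncompressed and compressed lengths as big-endian 32-bit integers, so the codec must decode those prefixes portably. A sketch (the helper name is hypothetical):

```cpp
#include <cassert>
#include <cstdint>

// The Hadoop LZ4 frame stores, before each compressed block, the
// uncompressed length and the compressed length as big-endian uint32.
// This reads one such length regardless of host endianness.
uint32_t readBigEndianUint32(const uint8_t* p) {
  return (static_cast<uint32_t>(p[0]) << 24) |
      (static_cast<uint32_t>(p[1]) << 16) |
      (static_cast<uint32_t>(p[2]) << 8) |
      static_cast<uint32_t>(p[3]);
}
```

A LZ4_HADOOP decompressor would read two such prefixes, then hand the remaining bytes to raw LZ4 block decompression.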

@majetideepak
Collaborator

Late to the discussion, but +1.
We could consider keeping arrow::Status as is and letting the client handle it. There is some discussion on adding Status to Velox in #6323.
It is easier to start the migration with leaf components such as compression.

CC: @karteekmurthys

@marin-ma
Contributor Author

I plan to submit a series of PRs for the implementation of compression codecs API. The submission sequence may change based on practical considerations. The proposed order is:

  1. Add Compression Codec API. Add support for LZ4_FRAME, LZ4_RAW, and LZ4_HADOOP codecs. Include unit tests.
  2. Add support for GZIP/ZLIB Codec
  3. Add support for ZSTD codec.
  4. Add support for SNAPPY Codec.
  5. Integrate LZO Codec, reuse the existing implementation, primarily for use in the parquet reader. Currently, only decompression is available for LZO.
  6. Replace the decompression methods in dwio and parquet reader with the new API. Move the API to velox/common/compression and remove velox/dwio/common/compression.
  7. Replace the compression in the arrow parquet writer with the new API. Remove dependencies related to arrow compression.
  8. Add Asynchronous Compression API.

I've created some PRs to add the unified API, which are now ready for review. The initial patch #7589 covers the first goal.
Since goals 2-5 depend on goal 1, I've submitted a draft PR #7603 implementing the remaining compression codecs. If necessary, I can split it into smaller PRs for easier review. I would greatly appreciate your review and feedback on these PRs when you have a moment. Thank you for your time.
@pedroerp @mbasmanova @majetideepak

@yingsu00
Collaborator

yingsu00 commented Dec 1, 2023

@marin-ma @FelixYBW Hi, sorry I didn't see this in time. Just some background: the Arrow codec was only used in the Parquet writer (in the data sink, not the data source), which we recently moved into Velox (Velox used to call into the Arrow Parquet writer). The Parquet reader (in the data source) was an in-house one that only uses the Velox codec, and with #5914 and #6105 it now uses the common dwio::common::PagedInputStream, which in turn uses "velox/dwio/common/compression/Compression.h". For the writer, the plan was to replace the Arrow codecs with Velox ones where Velox already supports them. This was described in #6901. And yes, adding the missing async codec interface/implementation in Velox would be great!

@FelixYBW
Contributor

FelixYBW commented Dec 1, 2023

Thank you, @yingsu00 . We should use the same one for spill and shuffle as well. Can you review #7589?

@pedroerp
Contributor

pedroerp commented Dec 22, 2023

We could consider keeping Arrow::status as is and let the client handle it. There is some discussion on adding Status to Velox here #6323

I just merged today a first cut of a Status class for Velox which we could use if we don't want to rely on exceptions here.

#8084

@FelixYBW
Contributor

Thank you. @marin-ma will update the PR.

@yaqi-zhao
Contributor

@FelixYBW @marin-ma Is there any progress on this issue? I see the PR hasn't been updated in a long time.

@marin-ma
Contributor Author

@FelixYBW @marin-ma Is there any progress on this issue? I see the PR hasn't been updated in a long time.

This PR is pending the second PR, which adds the Result return class described in #8084.

@yaqi-zhao
Contributor

I just merged today a first cut of a Status class for Velox which we could use if we don't want to rely on exceptions here.

@marin-ma It seems PR #8084 was merged a month ago. Is there any progress on PR #7589?

@FelixYBW
Contributor

@marin-ma It seems PR #8084 was merged a month ago. Is there any progress on PR #7589?

There are 3 PRs planned, as shown in the comments of #8084, copied below. #8084 is the first one; we are waiting for the second one.

The first one introduces a Status object that carries the (success or error) status of an operation, based on arrow::Status.
The second will add a Result object able to represent a value returned from an operation along with a Status code. This is also based on arrow::Result.
The third one will refactor our Parquet writer to use these objects and remove more dependencies on Arrow.

@yaqi-zhao
Contributor

@FelixYBW @marin-ma
I saw that PR #7589 has been marked as stale. Will the PR still be reviewed and merged in the future?

@marin-ma
Contributor Author

@yaqi-zhao According to #8084 (comment), the Result implementation is in design. We're waiting for it to land first.

@yaqi-zhao
Contributor

@yaqi-zhao According to #8084 (comment), the Result implementation is in design. We're waiting for it to land first.

@marin-ma Got it, thanks!

@pedroerp
Contributor

@yaqi-zhao is that blocking this work? It fell off my radar, but I can take a stab at it soon if it's needed here.

@yaqi-zhao
Contributor

yaqi-zhao commented Mar 18, 2024

I can take a stab at it soon if this is needed here.

@pedroerp Yes, #6176 depends on this issue. Here is the previous comment asking for #6176 to wait for #7471 to be merged first.

@pedroerp
Contributor

I see. I think my question is whether this Issue depends on the "Result" implementation design.

@yaqi-zhao
Contributor

I see. I think my question is whether this Issue depends on the "Result" implementation design.

I'm not sure. According to @marin-ma's previous comments, this issue is pending the "Result" implementation design.

@marin-ma
Contributor Author

@pedroerp Should I go ahead with #7589 and use the Status return code instead of throwing exceptions? If that's ok with you, we may consider changing the API to use the Result type in the future once the Result implementation is ready.

@pedroerp
Contributor

@marin-ma Yes, we should use Status and Expected to avoid throwing exceptions in hot paths. @mbasmanova has recently added a few examples of how that can be used.
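A minimal sketch of that pattern, assuming a much-simplified Status. The real class that landed in #8084 is modeled on arrow::Status and is richer than this; the point is only that errors flow back as values rather than thrown exceptions:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Simplified stand-in for a Velox-style Status object. An empty message
// means success; a non-empty message carries the error.
class Status {
 public:
  static Status ok() {
    return Status{""};
  }
  static Status invalid(std::string message) {
    return Status{std::move(message)};
  }
  bool isOk() const {
    return message_.empty();
  }
  const std::string& message() const {
    return message_;
  }

 private:
  explicit Status(std::string message) : message_(std::move(message)) {}
  std::string message_;
};

// Hot-path decompression entry point that reports failure without throwing.
Status decompress(uint64_t srcLength, uint64_t destLength) {
  if (srcLength > destLength) {
    return Status::invalid("destination buffer too small");
  }
  // ... perform decompression into the destination buffer ...
  return Status::ok();
}
```

Callers check `isOk()` and decide locally whether to fall back, propagate, or convert the status to an exception at a non-hot boundary.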
