
feat(query): unify pipeline for all inputs with format. #7613

Merged (22 commits) on Sep 20, 2022

Conversation

@youngsofun (Member) commented Sep 14, 2022

I hereby agree to the terms of the CLA available at: https://databend.rs/dev/policies/cla/

Summary

part of #7732

Unify the pipeline for all inputs (COPY INTO, streaming load, ClickHouse INSERT with format).

The insight is that a sync Deserializer cannot do async reads, so it is better to feed it with aligned RowBatches (including those in columnar format, like RowGroups in Parquet). These RowBatches are independent of one another.

This also prepares for distributed COPY by splitting files into splits early:

  • big TSV/NDJSON files without compression, as well as Parquet/ORC/Arrow, can be read in parallel

For row-based formats:

  • Reader (a task, not a Processor): each split has its own reader task, reading small ReadBatches
  • Aligner (Processor): aligns small ReadBatches into small RowBatches
  • Deserializer (Processor): merges small RowBatches into large DataBlocks (avoiding a separate Compact step)

An MPMC channel between the stages provides task sharing and load balancing.
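A minimal sketch of this shape (not the actual Databend code; `crossbeam-channel` stands in for the MPMC channel, and `ReadBatch`/`RowBatch`/`DataBlock` are simplified stand-ins for the real types):

```rust
// A minimal sketch, not the real Databend code: the stage and type names
// (ReadBatch, RowBatch, DataBlock) are simplified stand-ins.
use crossbeam_channel::{unbounded, Receiver, Sender};
use std::thread;

struct ReadBatch(Vec<u8>);        // raw bytes from a split, not row-aligned
struct RowBatch { data: Vec<u8> } // bytes ending on a row boundary
struct DataBlock { rows: usize }  // merged result of many RowBatches

// Reader task (one per split): emits small ReadBatches.
fn reader(split: &'static [u8], tx: Sender<ReadBatch>) {
    for chunk in split.chunks(4) {
        tx.send(ReadBatch(chunk.to_vec())).unwrap();
    }
}

// Aligner: buffers bytes until they end on '\n', then emits an aligned RowBatch.
fn aligner(rx: Receiver<ReadBatch>, tx: Sender<RowBatch>) {
    let mut buf = Vec::new();
    for ReadBatch(bytes) in rx {
        buf.extend_from_slice(&bytes);
        if let Some(pos) = buf.iter().rposition(|&b| b == b'\n') {
            let data: Vec<u8> = buf.drain(..=pos).collect();
            tx.send(RowBatch { data }).unwrap();
        }
    }
}

// Deserializer: merges small RowBatches into one large DataBlock,
// so no separate Compact step is needed afterwards.
fn deserializer(rx: Receiver<RowBatch>) -> DataBlock {
    let rows = rx
        .iter()
        .map(|b| b.data.iter().filter(|&&c| c == b'\n').count())
        .sum();
    DataBlock { rows }
}

fn main() {
    let (read_tx, read_rx) = unbounded::<ReadBatch>();
    let (row_tx, row_rx) = unbounded::<RowBatch>();

    // MPMC: two Aligner workers pull from the same channel, so whichever
    // worker is free takes the next batch (task sharing / balancing).
    for _ in 0..2 {
        let (rx, tx) = (read_rx.clone(), row_tx.clone());
        thread::spawn(move || aligner(rx, tx));
    }
    drop(row_tx);

    thread::spawn(move || reader(b"a\t1\nb\t2\nc\t3\n", read_tx));
    println!("rows = {}", deserializer(row_rx).rows);
}
```

Because both Aligner workers clone the same Receiver, whichever one is idle picks up the next ReadBatch; that is the sharing/balancing the MPMC channel buys.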

Parquet files in streaming load fit this pattern too.

We end up with two kinds of pipelines: a 1-stage pipeline (Deserializer only) for Parquet/ORC/Arrow in COPY INTO, and a 2-stage pipeline (Aligner and Deserializer) for all other cases, including every format in streaming load.
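As a sketch (the enums and names below are illustrative, not the actual Databend types), the selection logic amounts to:

```rust
// Illustrative only: which stages a pipeline gets, per format and source.
#[derive(Debug)]
enum FormatKind { Parquet, Orc, Arrow, Tsv, Csv, Ndjson }
#[derive(Debug)]
enum SourceKind { CopyInto, StreamingLoad }

fn stages(format: FormatKind, source: SourceKind) -> &'static [&'static str] {
    match (format, source) {
        // Columnar splits (e.g. Parquet RowGroups) arrive row-aligned,
        // so COPY INTO goes straight to the Deserializer.
        (FormatKind::Parquet | FormatKind::Orc | FormatKind::Arrow, SourceKind::CopyInto) => {
            &["Deserializer"]
        }
        // Everything else, including all formats in streaming load,
        // needs the Aligner first.
        _ => &["Aligner", "Deserializer"],
    }
}

fn main() {
    println!("{:?}", stages(FormatKind::Tsv, SourceKind::StreamingLoad));
}
```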

Other optimizations:

  • for text formats: store row ends in the RowBatch, which makes deserializing each row easier and helps isolate per-row errors (see the sketch after this list)
    • for \r\n line endings, look for \n only
  • the Reader, Aligner, and Deserializer can be pipelined for the same split
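A sketch of the row-end bookkeeping (illustrative, not the actual RowBatch code): the Aligner scans for `\n` only and records each row's end offset; a trailing `\r` is trimmed per row, so `\r\n` needs no separate search, and a bad row can be reported by index without rescanning the batch:

```rust
// Illustrative sketch: record row ends while scanning for '\n' only;
// the '\r' of "\r\n" is trimmed per row, not searched for.
fn row_ends(data: &[u8]) -> Vec<usize> {
    data.iter()
        .enumerate()
        .filter_map(|(i, &b)| (b == b'\n').then_some(i + 1))
        .collect()
}

fn rows(data: &[u8], ends: &[usize]) -> Vec<Vec<u8>> {
    let mut start = 0;
    ends.iter()
        .map(|&end| {
            let mut row = &data[start..end - 1]; // drop the '\n'
            if row.last() == Some(&b'\r') {
                row = &row[..row.len() - 1];     // drop the '\r' of "\r\n"
            }
            start = end;
            row.to_vec()
        })
        .collect()
}

fn main() {
    let data = b"a\t1\r\nb\t2\n";
    let ends = row_ends(data);
    assert_eq!(rows(data, &ends), vec![b"a\t1".to_vec(), b"b\t2".to_vec()]);
}
```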

This PR (migrating the existing capabilities):

  • new pipeline framework
  • TSV basics
    • Aligner
    • decompression
    • header handling
    • diagnostics
    • Deserializer
  • Parquet
  • NDJSON
  • CSV (new implementation)
  • replace existing paths
    • COPY (async init of InputContext in read_partitions)


mergify bot added the pr-feature label Sep 14, 2022
@youngsofun marked this pull request as draft September 14, 2022 16:05
@BohuTANG mentioned this pull request Sep 15, 2022
@youngsofun force-pushed the fmt branch 6 times, most recently from 4c75403 to 998ecf0, September 16, 2022 03:09

@youngsofun (Member Author) commented:

> This PR could be very large. Do we need to split it into multiple? — @Xuanwo

I regret it...
Let's delay streaming load/ClickHouse to the next PR.
All formats are almost done for COPY, but there are still some bugs to fix.

The failing tests:
/runner/_work/databend/databend/tests/suites/1_stateful/04_mini_dataset/04_0000_mini_ontime.sh
/runner/_work/databend/databend/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh

@youngsofun mentioned this pull request Sep 19, 2022
@youngsofun (Member Author) commented Sep 19, 2022

Explanation for commit 6749386:

According to https://clickhouse.com/docs/en/interfaces/formats/#tabseparated-data-formatting, the TSV parser should unescape \'. After unescaping, 'k sorts first in SQL26.
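A minimal sketch of the unescaping in question (illustrative only; it covers just a handful of the escapes the ClickHouse TSV format defines, and the fallback for unknown escapes is an assumption, not the real parser's behavior):

```rust
// Illustrative only: handles a subset of ClickHouse TSV escapes,
// including \' which this fix adds.
fn tsv_unescape(field: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(field.len());
    let mut it = field.iter().copied();
    while let Some(b) = it.next() {
        if b != b'\\' {
            out.push(b);
            continue;
        }
        match it.next() {
            Some(b'n') => out.push(b'\n'),
            Some(b't') => out.push(b'\t'),
            Some(b'r') => out.push(b'\r'),
            Some(b'\\') => out.push(b'\\'),
            Some(b'\'') => out.push(b'\''), // the case fixed here
            // Assumed fallback: keep unknown escapes verbatim.
            Some(other) => { out.push(b'\\'); out.push(other); }
            None => out.push(b'\\'),
        }
    }
    out
}

fn main() {
    assert_eq!(tsv_unescape(br"\'k"), b"'k".to_vec());
}
```

Since ' is byte 0x27, smaller than any digit or letter, the unescaped 'k sorts ahead of the other values in a byte-wise ORDER BY, which is why SQL26's ordering changed.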


The TSV format and the hits dataset are now fixed. I will fix CSV tomorrow.

@youngsofun marked this pull request as ready for review September 20, 2022 02:05
@youngsofun (Member Author) commented Sep 20, 2022

The only remaining error is in 04_0001_mini_hits:

====== SQL6 ======
-3644
+3655

I cannot reproduce it when copying from S3 (MinIO). Trying to reproduce with a local HTTP server:

python -m http.server 8888

COPY INTO hits FROM 'http://localhost:8888/hits_100k.tsv' ...

Do you know why the connection is refused?

It is also very slow to read https://repo.databend.rs/dataset/stateful/hits_100k.tsv: it takes over 40s to read a 1MB batch on my Mac (the file is about 80MB), maybe due to a slow network.

The read is done here; is there anything that needs improvement?
https://github.com/datafuselabs/databend/pull/7613/files#diff-ba9052c805116a0caed03edd0785ffac124fffcde449b3f45f03aa1d130c171dR232

cc @Xuanwo

Update: solved by changing the HTTP port to 80.

@youngsofun (Member Author) commented Sep 20, 2022

Since the toolchain was updated in #7741, it may lead to new clippy errors, so I rebased, checked clippy locally, and added a simple fix commit: 4effb76.

mergify bot merged commit de04ec6 into databendlabs:main Sep 20, 2022