perf: nexmark q1 #7353

Closed
lmatz opened this issue Jan 12, 2023 · 10 comments

@lmatz
Contributor

lmatz commented Jan 12, 2023

query:

CREATE MATERIALIZED VIEW nexmark_q1
AS
SELECT
    auction,
    bidder,
    0.908 * price as price,
    date_time
FROM bid;

plan:

 StreamMaterialize { columns: [auction, bidder, price, date_time, _row_id(hidden)], pk_columns: [_row_id] }
 └─StreamExchange { dist: HashShard(_row_id) }
   └─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 1:Int32), (0.908:Decimal * Field(bid, 2:Int32)), Field(bid, 5:Int32), _row_id] }
     └─StreamFilter { predicate: (event_type = 2:Int32) }
       └─StreamRowIdGen { row_id_index: 4 }
         └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }

This one is stateless. It is probably not a good benchmark query,
because there is virtually no computation and no I/O to do.

@fuyufjh
Member

fuyufjh commented Jan 12, 2023

I expect Q1 to improve a lot after we refactor the NexMark source, which will be done as a by-product of risingwavelabs/rfcs#31.

@lmatz
Contributor Author

lmatz commented Jan 12, 2023

Oh, the evaluation is done by consuming from Kafka, with an external data generator producing a large amount of data in advance.

@fuyufjh
Member

fuyufjh commented Jan 12, 2023

Another related optimization is to remove StreamExchange { dist: HashShard(_row_id) }. The exchange is needed because our row_ids are generated randomly, while StreamMaterialize requires its input to be distributed by HashShard(row_id). We are considering introducing a special RowID data type and overriding its hash function, so that each row_id is generated to already match HashShard(row_id) and the Exchange operator can be avoided. cc. @st1page @TennyZhuang
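A minimal sketch of that idea, in Rust. Everything here is hypothetical: the 64-bit layout (timestamp, 10-bit vnode, 12-bit sequence), the RowId type, and the owner vnode-to-actor mapping are assumptions for illustration, not RisingWave's actual code. If the generating actor stamps one of its own vnodes into each id and the "hash" used by HashShard(row_id) simply reads those bits back, every row is already on the actor that owns its vnode, so the exchange moves nothing.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical 64-bit layout: [timestamp_ms | vnode: 10 bits | seq: 12 bits].
const VNODE_BITS: u32 = 10;
const SEQ_BITS: u32 = 12;
const VNODE_COUNT: u64 = 1 << VNODE_BITS;

/// Hypothetical special RowID type whose routing bypasses the generic hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct RowId(u64);

impl RowId {
    fn new(ts_ms: u64, vnode: u64, seq: u64) -> Self {
        assert!(vnode < VNODE_COUNT && seq < (1 << SEQ_BITS));
        RowId((ts_ms << (VNODE_BITS + SEQ_BITS)) | (vnode << SEQ_BITS) | seq)
    }

    /// The "overridden hash": read the embedded vnode back out.
    fn vnode(self) -> u64 {
        (self.0 >> SEQ_BITS) & (VNODE_COUNT - 1)
    }
}

/// What a generic hash of the raw u64 would pick instead (for comparison).
fn generic_vnode(id: RowId) -> u64 {
    let mut h = DefaultHasher::new();
    id.0.hash(&mut h);
    h.finish() % VNODE_COUNT
}

/// Assumed vnode-to-actor assignment, for illustration only.
fn owner(vnode: u64, actors: u64) -> u64 {
    vnode % actors
}

fn main() {
    let actors = 4;
    let (mut moved_embedded, mut moved_generic) = (0, 0);
    for actor in 0..actors {
        // Each actor stamps only vnodes it owns into the ids it generates.
        let owned = (0..VNODE_COUNT).filter(|&v| owner(v, actors) == actor);
        for (seq, vnode) in owned.enumerate() {
            let id = RowId::new(1_000, vnode, seq as u64);
            // A row must cross the network if its target actor differs from the generating one.
            if owner(id.vnode(), actors) != actor {
                moved_embedded += 1;
            }
            if owner(generic_vnode(id), actors) != actor {
                moved_generic += 1;
            }
        }
    }
    // moved_embedded is always 0 here: the exchange would be a no-op.
    println!("rows needing exchange: embedded = {moved_embedded}, generic hash = {moved_generic}");
}
```

With a generic hash, most rows land on some other actor's vnode, which is exactly the shuffle the StreamExchange performs today.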

@lmatz
Contributor Author

lmatz commented Jan 12, 2023

@huangjw806 will use a blackhole sink in future evaluations.

Therefore, without the MV, we will remove the exchange and see the new numbers.

@BugenZhao
Member

> @huangjw806 will use black hole sink in the future evaluation.

> Therefore, without mv, we will remove the exchange and see the new number.

The sink is rewritten from the materialize node in the optimizer, so I'm afraid it will follow the same hash distribution. 🤔 cc @yuhao-su

@yuhao-su
Contributor

> Therefore, without mv, we will remove the exchange and see the new number.

Should sink be parallelized in any case?

> The sink is rewritten from the materialize node in the optimizer

Actually, it's done in the planner.

@lmatz
Contributor Author

lmatz commented Jan 13, 2023

> Should sink be parallelized in any case?

If a sink with parallelism 1 cannot reach maximum throughput while the downstream system is far from saturated, then increasing the parallelism makes sense, I suppose.

@lmatz
Contributor Author

lmatz commented Mar 15, 2023

Query:

CREATE sink nexmark_q1
AS
SELECT
    auction,
    bidder,
    0.908 * price as price,
    date_time
FROM bid with ( connector = 'blackhole', format = 'append_only' );

Plan:

 StreamSink { type: append-only, columns: [auction, bidder, price, date_time] }
 └─StreamProject { exprs: [$expr1, $expr2, $expr3, $expr4] }
   └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr1, Field(bid, 1:Int32) as $expr2, (0.908:Decimal * Field(bid, 2:Int32)) as $expr3, Field(bid, 5:Int32) as $expr4, _row_id] }
     └─StreamFilter { predicate: (event_type = 2:Int32) }
       └─StreamRowIdGen { row_id_index: 4 }
         └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
(6 rows)

two consecutive StreamProjects 🤔
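The two projections can in principle be folded into one by substituting the inner expressions into the outer ones. Below is a minimal Rust sketch of that rewrite on a hypothetical toy expression type (Expr, with the literal kept as a string instead of a real Decimal); it is not RisingWave's planner API. Column indices follow the plan above: bid is source column 3 and _row_id is column 4.

```rust
/// Toy expression tree, hypothetical; not RisingWave's planner types.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    /// Reference to a column of the operator's input.
    InputRef(usize),
    /// Field(struct_expr, index), like Field(bid, 2) in the plan.
    Field(Box<Expr>, usize),
    /// Literal kept as text for simplicity (the plan uses Decimal).
    Lit(&'static str),
    Mul(Box<Expr>, Box<Expr>),
}

/// Replace every InputRef in `e` with the inner projection's expression it points at.
fn substitute(e: &Expr, inner: &[Expr]) -> Expr {
    match e {
        Expr::InputRef(i) => inner[*i].clone(),
        Expr::Field(s, idx) => Expr::Field(Box::new(substitute(s, inner)), *idx),
        Expr::Lit(v) => Expr::Lit(*v),
        Expr::Mul(a, b) => Expr::Mul(Box::new(substitute(a, inner)), Box::new(substitute(b, inner))),
    }
}

/// Project(outer) on top of Project(inner) becomes a single Project.
fn merge_projects(outer: &[Expr], inner: &[Expr]) -> Vec<Expr> {
    outer.iter().map(|e| substitute(e, inner)).collect()
}

/// Helper for Field(InputRef(col), idx).
fn field(col: usize, idx: usize) -> Expr {
    Expr::Field(Box::new(Expr::InputRef(col)), idx)
}

fn main() {
    // Inner project from the plan: $expr1..$expr4 plus _row_id.
    let inner = vec![
        field(3, 0),
        field(3, 1),
        Expr::Mul(Box::new(Expr::Lit("0.908")), Box::new(field(3, 2))),
        field(3, 5),
        Expr::InputRef(4),
    ];
    // Outer project just forwards $expr1..$expr4 and drops _row_id.
    let outer: Vec<Expr> = (0..4).map(Expr::InputRef).collect();
    let merged = merge_projects(&outer, &inner);
    println!("{merged:?}");
}
```

Here the merged projection is just the first four inner expressions. A real rule would also have to avoid duplicating an expensive inner expression that the outer projection references more than once.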

@shanicky
Contributor

After #8532:

MATERIALIZED VIEW

dev=> explain CREATE MATERIALIZED VIEW nexmark_q1
AS
SELECT
    auction,
    bidder,
    0.908 * price as price,
    date_time
FROM bid;
                                                             QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [auction, bidder, price, date_time, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check" }
 └─StreamProject { exprs: [auction, bidder, (0.908:Decimal * price) as $expr1, date_time, _row_id] }
   └─StreamRowIdGen { row_id_index: 7 }
     └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
(4 rows)

@shanicky
Contributor


Based on a simple benchmark, and assuming my testing method is correct, removing the Exchange increased the throughput of Nexmark Q1 by about 20%. However, our RowId generation mechanism only uses the first allocated vnode, so it needs to be modified before we can get more accurate results.
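One possible shape for that modification, sketched in Rust: a hypothetical generator that rotates through every vnode allocated to the actor instead of always stamping the first one. The bit layout, RowIdGen, next_id, and vnode_of are illustrative assumptions, not RisingWave's implementation. Timestamps are assumed non-decreasing, and None signals that the current millisecond's id space is used up (a real generator would wait for the clock to advance).

```rust
// Hypothetical 64-bit layout: [timestamp_ms | vnode: 10 bits | seq: 12 bits].
const VNODE_BITS: u32 = 10;
const SEQ_BITS: u32 = 12;

/// Hypothetical row id generator that round-robins over the actor's vnodes.
struct RowIdGen {
    vnodes: Vec<u64>, // vnodes allocated to this actor
    next: usize,      // index into `vnodes` for the next row
    seq: u64,         // sequence within the current millisecond
    last_ts: u64,     // millisecond the counters belong to
}

impl RowIdGen {
    fn new(vnodes: Vec<u64>) -> Self {
        assert!(!vnodes.is_empty(), "actor must own at least one vnode");
        assert!(vnodes.iter().all(|&v| v < (1 << VNODE_BITS)));
        Self { vnodes, next: 0, seq: 0, last_ts: 0 }
    }

    /// Returns None when every (vnode, seq) pair of this millisecond is used.
    fn next_id(&mut self, ts_ms: u64) -> Option<u64> {
        if ts_ms != self.last_ts {
            // New millisecond: restart the rotation and the sequence.
            self.last_ts = ts_ms;
            self.next = 0;
            self.seq = 0;
        }
        if self.seq >= (1 << SEQ_BITS) {
            return None;
        }
        let vnode = self.vnodes[self.next];
        let id = (ts_ms << (VNODE_BITS + SEQ_BITS)) | (vnode << SEQ_BITS) | self.seq;
        // Move to the next vnode; bump the sequence once all vnodes were used.
        self.next += 1;
        if self.next == self.vnodes.len() {
            self.next = 0;
            self.seq += 1;
        }
        Some(id)
    }
}

/// Extract the vnode bits from an id built with the layout above.
fn vnode_of(id: u64) -> u64 {
    (id >> SEQ_BITS) & ((1 << VNODE_BITS) - 1)
}

fn main() {
    let mut g = RowIdGen::new(vec![3, 7, 11]);
    let picked: Vec<u64> = (0..6).map(|_| vnode_of(g.next_id(1_000).unwrap())).collect();
    println!("{picked:?}"); // [3, 7, 11, 3, 7, 11]
}
```

Rotating like this spreads rows evenly across all of the actor's vnodes, and each (vnode, seq) pair is issued at most once per millisecond, so ids stay unique.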

@fuyufjh fuyufjh closed this as completed Mar 22, 2023