remove the unnecessary exchange before blackhole sink #7377

Closed · Tracked by #7289 · lmatz opened this issue Jan 13, 2023 · 7 comments

@lmatz (Contributor) commented Jan 13, 2023:

nexmark q1 #7353

If we use a sink instead of a materialized view, the plan is:

 StreamSink
 └─StreamExchange { dist: HashShard(_row_id) }
   └─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 1:Int32), (0.908:Decimal * Field(bid, 2:Int32)), Field(bid, 5:Int32), _row_id] }
     └─StreamFilter { predicate: (event_type = 2:Int32) }
       └─StreamRowIdGen { row_id_index: 4 }
         └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
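
For reference, a sketch of the sink definition that presumably produces this plan (mirroring the q1 query quoted later in this thread; the connector properties are assumed):

CREATE SINK nexmark_q1 AS
SELECT
    auction,
    bidder,
    0.908 * price AS price,
    date_time
FROM bid
WITH ( connector = 'blackhole', format = 'append_only' );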

while Flink's plan is:

Sink(table=[default_catalog.default_database.discard_sink], fields=[auction, bidder, price, dateTime, extra])
+- Calc(select=[bid.auction AS auction, bid.bidder AS bidder, CAST((0.908 * bid.price) AS DECIMAL(23, 3)) AS price, CAST(dateTime AS TIMESTAMP(3)) AS dateTime, bid.extra AS extra], where=[(event_type = 2)])
   +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
      +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
         +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
@BugenZhao (Member):

Good catch! Furthermore, we can even remove the row_id generation, as we don't need a key for this whole job. I believe the fundamental problem to resolve is allowing non-Hash-shard distributed fragments in both the graph builder and the scheduler on the meta service, instead of applying hacks on Source as is done now. 🤔
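
For illustration, a hypothetical sketch of the plan this would enable, with neither the Exchange nor the RowIdGen, since an append-only blackhole sink needs no shuffle and no key:

 StreamSink
 └─StreamProject { exprs: [...] }
   └─StreamFilter { predicate: (event_type = 2:Int32) }
     └─StreamSource { source: "nexmark", columns: [...] }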

@neverchanje (Contributor) commented Jan 20, 2023:

I guess the problem is that the sink's parallelism is different from the source's. As a result, the scheduler has to inject a shuffle to redistribute the data.

@fuyufjh (Member) commented Feb 6, 2023:

After #7621, this can be solved, because the distribution can be HASH(row_id) (at least for Q1).
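
Concretely, if the Sink can accept the HashShard(_row_id) distribution that the upstream fragment already provides, the Exchange becomes a no-op and can be dropped while keeping row id generation. A hypothetical sketch, i.e. the plan above minus the Exchange:

 StreamSink
 └─StreamProject { exprs: [...] }
   └─StreamFilter { predicate: (event_type = 2:Int32) }
     └─StreamRowIdGen { row_id_index: 4 }
       └─StreamSource { source: "nexmark", columns: [...] }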

@lmatz (Contributor, Author) commented Mar 15, 2023:

It seems to be done(?), and a new problem has appeared: #7353 (comment)

@shanicky (Contributor) commented Mar 20, 2023:

After #8532:

dev=> explain CREATE SINK nexmark_q1 AS
SELECT
    auction,
    bidder,
    0.908 * price AS price,
    date_time
FROM bid
WITH ( connector = 'blackhole', format = 'append_only' );
                                                             QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
 StreamSink { type: upsert, columns: [auction, bidder, price, date_time], pk: [] }
 └─StreamProject { exprs: [auction, bidder, $expr1, date_time] }
   └─StreamProject { exprs: [auction, bidder, (0.908:Decimal * price) as $expr1, date_time, _row_id] }
     └─StreamRowIdGen { row_id_index: 7 }
       └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date

@BugenZhao (Member):

It's worth noting that #8099 has accidentally removed the Exchange for append-only sources (and achieved the goal?), since the hidden _row_id primary key column is pruned before enforcing the distribution of Sink. 😄

@lmatz (Contributor, Author) commented Mar 20, 2023:

Then let's close this and work out #8577 later.

@lmatz closed this as completed Mar 20, 2023