bug: barrier_inflight_latency is too high and keeps growing #6571

Closed
Tracked by #6640
fuyufjh opened this issue Nov 24, 2022 · 16 comments · Fixed by #6943
Labels: priority/critical, type/bug

fuyufjh (Member) commented Nov 24, 2022

I’m running Nexmark with a self-made query, which consists of one Join and one Agg.
What could be the reason that the barrier latency keeps growing? It has become > 8 min.


The query is

create materialized view mv_q6s as
  SELECT
    MAX(B.price) AS final,
    A.id,
    A.seller
  FROM
    auction AS A,
    bid AS B
  WHERE
    A.id = B.auction
    and B.date_time between A.date_time and A.expires
  GROUP BY
    A.id,
    A.seller

Is it stuck? -- No

./risedev ctl meta pause

After pause, the barrier number starts to drop. So, it’s not stuck. The pause succeeded after 324.06s 😂 and the in-flight barriers were cleared.

[cargo-make] INFO - Build Done in 324.06 seconds.


So, what could be the reason?

fuyufjh transferred this issue from risingwavelabs/rfcs on Nov 24, 2022
github-actions bot added this to the release-0.1.15 milestone on Nov 24, 2022
BugenZhao (Member) commented Nov 24, 2022

After pause, the barrier number starts to drop.

Pause won't cut in line ahead of the 40 pending barriers, but it will pause concurrent checkpointing and prevent future barriers from being scheduled. Since the barrier number starts to drop, it's indeed not stuck. Maybe it's just caused by ingesting source data too slowly.

Why is avg larger than max? 👀

fuyufjh (Member, Author) commented Nov 25, 2022

Why is avg larger than max? 👀

avg is accurate.

In short, avg is calculated as sum/count, which is always exact, while the percentiles (p50, p90, p99, pmax) are estimated from a histogram, where the x-axis (i.e. the buckets) consists of latency ranges following an exponential distribution. The problem is that if the number of buckets is insufficient and a data point is too large or too small, it falls into an edge bucket, e.g. [-inf, 0.01ms] or [3.69min, +inf] (numbers made up), which leads to huge errors in the estimated percentiles.
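To illustrate with made-up numbers: suppose the largest finite bucket boundary is 60s while most samples actually take ~300s. All of those samples land in the [60s, +Inf] bucket, so histogram_quantile() can only report a value at the highest finite boundary, i.e. p50/p99/pmax all show up as ~60s, while avg = sum/count still correctly reports ~300s. That is how the plotted avg can exceed the plotted max.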

Reference: https://prometheus.io/docs/practices/histograms/

hzxa21 (Collaborator) commented Nov 28, 2022

I can consistently reproduce this issue with risedev d full. I observed abnormal streaming actor metrics:

  • Actor Execution Time for the last fragment (join -> agg -> mv) is consistently at ~1s. This means that the actor is stuck or almost stuck, given that this metric measures the rate of the actor's total tokio poll time (i.e. the max value of this metric should be 1s).

I originally suspected this is caused by high storage read latency, but the cache hit rate is high and the read tail latency shown in the metrics is not significant. I will continue investigating the issue.


hzxa21 (Collaborator) commented Nov 29, 2022

Actor traces:

--- Actor Traces ---
>> Actor 1
[captured 638.914706ms ago]
Actor 1: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.370004022s]
  Epoch 3438920976564224 [!!! 11.070000095s]
    MaterializeExecutor 10000007F (actor 1, executor 127) [!!! 11.070000095s]
      ProjectExecutor 10000007E (actor 1, executor 126) [!!! 11.070000095s]
        HashAggExecutor 10000007C (actor 1, executor 124) [!!! 11.070000095s]
          ProjectExecutor 10000007A (actor 1, executor 122) [!!! 8.480000077s]
            FilterExecutor 100000078 (actor 1, executor 120) [!!! 8.480000077s]
  Subtask [!!! 270.370004022s]
    HashJoinExecutor 100000076 (actor 1, executor 118) [!!! 3.760000039s]
      store_get [0ns]

>> Actor 2
[captured 292.524903ms ago]
Actor 2: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.710004024s]
  Epoch 3438921521037312 [!!! 1.190000012s]
    MaterializeExecutor 20000007F (actor 2, executor 127) [!!! 1.190000012s]
      ProjectExecutor 20000007E (actor 2, executor 126) [!!! 1.190000012s]
        HashAggExecutor 20000007C (actor 2, executor 124) [!!! 1.190000012s]
          ProjectExecutor 20000007A (actor 2, executor 122) [0ns]
            FilterExecutor 200000078 (actor 2, executor 120) [0ns]
  Subtask [!!! 270.710004024s]

>> Actor 3
[captured 535.509605ms ago]
Actor 3: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.470004023s]
  Epoch 3438920976564224 [!!! 11.750000099s]
    MaterializeExecutor 30000007F (actor 3, executor 127) [260.000003ms]
      ProjectExecutor 30000007E (actor 3, executor 126) [260.000003ms]
        HashAggExecutor 30000007C (actor 3, executor 124) [260.000003ms]
          store_get [0ns]
  Subtask [!!! 270.470004023s]

>> Actor 4
[captured 621.001006ms ago]
Actor 4: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.390004022s]
  Epoch 3438920976564224 [!!! 11.270000096s]
    MaterializeExecutor 40000007F (actor 4, executor 127) [490.000005ms]
      ProjectExecutor 40000007E (actor 4, executor 126) [490.000005ms]
        HashAggExecutor 40000007C (actor 4, executor 124) [490.000005ms]
          store_get [0ns]
  Subtask [!!! 270.390004022s]

>> Actor 5
[captured 125.374701ms ago]
Actor 5: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.880004026s]
  Epoch 3438937023578112 [!!! 11.570000099s]
    ProjectExecutor 500000070 (actor 5, executor 112) [!!! 9.000000082s]
      SourceExecutor 50000005C (actor 5, executor 92) [!!! 9.000000082s]
        source_recv_barrier [!!! 11.570000099s]

>> Actor 6
[captured 966.80971ms ago]
Actor 6: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.040004017s]
  Epoch 3438937023578112 [!!! 10.730000091s]
    ProjectExecutor 600000070 (actor 6, executor 112) [!!! 8.680000077s]
      SourceExecutor 60000005C (actor 6, executor 92) [!!! 8.680000077s]
        source_recv_barrier [!!! 10.730000091s]

>> Actor 7
[captured 321.697103ms ago]
Actor 7: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.690004024s]
  Epoch 3438937023578112 [!!! 11.380000097s]
    ProjectExecutor 700000070 (actor 7, executor 112) [!!! 8.66000008s]
      SourceExecutor 70000005C (actor 7, executor 92) [!!! 8.66000008s]
        source_recv_barrier [!!! 11.380000097s]

>> Actor 8
[captured 772.806408ms ago]
Actor 8: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.23000402s]
  Epoch 3438937023578112 [!!! 10.920000093s]
    ProjectExecutor 800000070 (actor 8, executor 112) [!!! 8.530000077s]
      SourceExecutor 80000005C (actor 8, executor 92) [!!! 8.530000077s]
        source_recv_barrier [!!! 10.920000093s]

>> Actor 9
[captured 719.894607ms ago]
Actor 9: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.280004021s]
  Epoch 3438937023578112 [590.000006ms]
    dispatch_chunk [550.000006ms]
      LocalOutput (actor 2) [550.000006ms]
[Detached 5]
  source_recv_barrier [590.000006ms]

>> Actor 10
[captured 111.684301ms ago]
Actor 10: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.890004026s]
  Epoch 3438937023578112 [!!! 1.250000013s]
    dispatch_chunk [!!! 1.180000012s]
      LocalOutput (actor 4) [!!! 1.180000012s]
[Detached 5]
  source_recv_barrier [!!! 1.250000013s]

>> Actor 11
[captured 671.793507ms ago]
Actor 11: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.330004021s]
  Epoch 3438937023578112 [!!! 10.580000091s]
    dispatch_chunk [!!! 9.640000084s]
      LocalOutput (actor 4) [750.000007ms]
[Detached 5]
  source_recv_barrier [!!! 10.580000091s]

>> Actor 12
[captured 121.743801ms ago]
Actor 12: `SELECT MAX(B.price) AS final, A.id, A.seller FROM auction AS A, bid AS B WHERE (A.id = B.auction) AND B.date_time BETWEEN A.date_time AND A.expires GROUP BY A.id, A.seller` [270.880004026s]
  Epoch 3438937023578112 [!!! 11.520000098s]
    ProjectExecutor C00000073 (actor 12, executor 115) [440.000005ms]
      SourceExecutor C00000058 (actor 12, executor 88) [440.000005ms]
        source_recv_barrier [!!! 11.520000098s]

--- RPC Traces ---
>> RPC 127.0.0.1:5688 (101)
[captured 39.3003ms ago]
/stream_service.StreamService/BarrierComplete:101 [214.000003729s]
  collect_barrier (epoch 3438923400871936) [!!! 214.000003729s]

>> RPC 127.0.0.1:5688 (103)
[captured 1.6252ms ago]
/stream_service.StreamService/BarrierComplete:103 [207.010003342s]
  collect_barrier (epoch 3438923762761728) [!!! 207.010003342s]

>> RPC 127.0.0.1:5688 (105)
[captured 538.283605ms ago]
/stream_service.StreamService/BarrierComplete:105 [200.000002976s]
  collect_barrier (epoch 3438924224135168) [!!! 200.000002976s]

>> RPC 127.0.0.1:5688 (107)
[captured 15.6766ms ago]
/stream_service.StreamService/BarrierComplete:107 [189.000001932s]
  collect_barrier (epoch 3438924647497728) [!!! 189.000001932s]

>> RPC 127.0.0.1:5688 (109)
[captured 597.400906ms ago]
/stream_service.StreamService/BarrierComplete:109 [186.000001697s]
  collect_barrier (epoch 3438925402537984) [!!! 186.000001697s]

>> RPC 127.0.0.1:5688 (111)
[captured 661.197406ms ago]
/stream_service.StreamService/BarrierComplete:111 [179.000001083s]
  collect_barrier (epoch 3438925561331712) [!!! 179.000001083s]

>> RPC 127.0.0.1:5688 (113)
[captured 643.721006ms ago]
/stream_service.StreamService/BarrierComplete:113 [171.000000277s]
  collect_barrier (epoch 3438926015758336) [!!! 171.000000277s]

>> RPC 127.0.0.1:5688 (115)
[captured 769.848208ms ago]
/stream_service.StreamService/BarrierComplete:115 [163.999999582s]
  collect_barrier (epoch 3438926541094912) [!!! 163.999999582s]

>> RPC 127.0.0.1:5688 (117)
[captured 704.509207ms ago]
/stream_service.StreamService/BarrierComplete:117 [155.999999444s]
  collect_barrier (epoch 3438926991392768) [!!! 155.999999444s]

>> RPC 127.0.0.1:5688 (119)
[captured 530.020705ms ago]
/stream_service.StreamService/BarrierComplete:119 [149.999999346s]
  collect_barrier (epoch 3438927520202752) [!!! 149.999999346s]

>> RPC 127.0.0.1:5688 (121)
[captured 456.127504ms ago]
/stream_service.StreamService/BarrierComplete:121 [143.999999274s]
  collect_barrier (epoch 3438927924953088) [!!! 143.999999274s]

>> RPC 127.0.0.1:5688 (123)
[captured 950.910509ms ago]
/stream_service.StreamService/BarrierComplete:123 [135.999999189s]
  collect_barrier (epoch 3438928323084288) [!!! 135.999999189s]

>> RPC 127.0.0.1:5688 (125)
[captured 985.98871ms ago]
/stream_service.StreamService/BarrierComplete:125 [129.999999125s]
  collect_barrier (epoch 3438928814866432) [!!! 129.999999125s]

>> RPC 127.0.0.1:5688 (127)
[captured 969.40781ms ago]
/stream_service.StreamService/BarrierComplete:127 [123.999999062s]
  collect_barrier (epoch 3438929205723136) [!!! 123.999999062s]

>> RPC 127.0.0.1:5688 (129)
[captured 427.070004ms ago]
/stream_service.StreamService/BarrierComplete:129 [112.999998946s]
  collect_barrier (epoch 3438929600053248) [!!! 112.999998946s]

>> RPC 127.0.0.1:5688 (131)
[captured 247.966202ms ago]
/stream_service.StreamService/BarrierComplete:131 [111.999998935s]
  collect_barrier (epoch 3438930356535296) [!!! 111.999998935s]

>> RPC 127.0.0.1:5688 (133)
[captured 793.225108ms ago]
/stream_service.StreamService/BarrierComplete:133 [103.999998838s]
  collect_barrier (epoch 3438930433802240) [!!! 103.999998838s]

>> RPC 127.0.0.1:5688 (136)
[captured 896.900209ms ago]
/stream_service.StreamService/BarrierComplete:136 [96.999998749s]
  collect_barrier (epoch 3438930922373120) [!!! 96.999998749s]

>> RPC 127.0.0.1:5688 (138)
[captured 139.489801ms ago]
/stream_service.StreamService/BarrierComplete:138 [88.999998677s]
  collect_barrier (epoch 3438931374309376) [!!! 88.999998677s]

>> RPC 127.0.0.1:5688 (140)
[captured 631.400406ms ago]
/stream_service.StreamService/BarrierComplete:140 [83.009998637s]
  collect_barrier (epoch 3438931948208128) [!!! 83.009998637s]

>> RPC 127.0.0.1:5688 (142)
[captured 951.031909ms ago]
/stream_service.StreamService/BarrierComplete:142 [76.009998606s]
  collect_barrier (epoch 3438932309311488) [!!! 75.999998606s]

>> RPC 127.0.0.1:5688 (144)
[captured 724.331607ms ago]
/stream_service.StreamService/BarrierComplete:144 [69.999998618s]
  collect_barrier (epoch 3438932747026432) [!!! 69.999998618s]

>> RPC 127.0.0.1:5688 (146)
[captured 617.180106ms ago]
/stream_service.StreamService/BarrierComplete:146 [62.999998581s]
  collect_barrier (epoch 3438933155184640) [!!! 62.999998581s]

>> RPC 127.0.0.1:5688 (148)
[captured 437.205304ms ago]
/stream_service.StreamService/BarrierComplete:148 [54.99999914s]
  collect_barrier (epoch 3438933620752384) [!!! 54.99999914s]

>> RPC 127.0.0.1:5688 (150)
[captured 801.369808ms ago]
/stream_service.StreamService/BarrierComplete:150 [47.999999877s]
  collect_barrier (epoch 3438934156967936) [!!! 47.999999877s]

>> RPC 127.0.0.1:5688 (152)
[captured 195.291802ms ago]
/stream_service.StreamService/BarrierComplete:152 [38.000000219s]
  collect_barrier (epoch 3438934591930368) [!!! 38.000000219s]

>> RPC 127.0.0.1:5688 (154)
[captured 589.118506ms ago]
/stream_service.StreamService/BarrierComplete:154 [37.000000211s]
  collect_barrier (epoch 3438935286874112) [!!! 37.000000211s]

>> RPC 127.0.0.1:5688 (156)
[captured 409.803304ms ago]
/stream_service.StreamService/BarrierComplete:156 [35.000000203s]
  collect_barrier (epoch 3438935326720000) [!!! 35.000000203s]

>> RPC 127.0.0.1:5688 (158)
[captured 689.820907ms ago]
/stream_service.StreamService/BarrierComplete:158 [32.000000187s]
  collect_barrier (epoch 3438935469522944) [!!! 32.000000187s]

>> RPC 127.0.0.1:5688 (160)
[captured 507.576805ms ago]
/stream_service.StreamService/BarrierComplete:160 [30.000000178s]
  collect_barrier (epoch 3438935647715328) [!!! 30.000000178s]

>> RPC 127.0.0.1:5688 (162)
[captured 315.319903ms ago]
/stream_service.StreamService/BarrierComplete:162 [28.000000154s]
  collect_barrier (epoch 3438935790649344) [!!! 28.000000154s]

>> RPC 127.0.0.1:5688 (164)
[captured 681.562507ms ago]
/stream_service.StreamService/BarrierComplete:164 [26.000000132s]
  collect_barrier (epoch 3438935934435328) [!!! 26.000000132s]

>> RPC 127.0.0.1:5688 (166)
[captured 759.699107ms ago]
/stream_service.StreamService/BarrierComplete:166 [23.000000096s]
  collect_barrier (epoch 3438936041586688) [!!! 23.000000096s]

>> RPC 127.0.0.1:5688 (168)
[captured 444.740304ms ago]
/stream_service.StreamService/BarrierComplete:168 [21.000000089s]
  collect_barrier (epoch 3438936232951808) [!!! 21.000000089s]

>> RPC 127.0.0.1:5688 (170)
[captured 696.196507ms ago]
/stream_service.StreamService/BarrierComplete:170 [11.000000094s]
  collect_barrier (epoch 3438936384798720) [!!! 11.000000094s]

>> RPC 127.0.0.1:5688 (171)
[captured 444.7µs ago]
<initial>

>> RPC 127.0.0.1:5688 (91)
[captured 230.813202ms ago]
/stream_service.StreamService/BarrierComplete:91 [248.020004116s]
  collect_barrier (epoch 3438920976564224) [!!! 248.020004116s]

>> RPC 127.0.0.1:5688 (93)
[captured 43.6223ms ago]
/stream_service.StreamService/BarrierComplete:93 [239.990004091s]
  collect_barrier (epoch 3438921521037312) [!!! 239.990004091s]

>> RPC 127.0.0.1:5688 (95)
[captured 184.907402ms ago]
/stream_service.StreamService/BarrierComplete:95 [233.000004065s]
  collect_barrier (epoch 3438922057842688) [!!! 233.000004065s]

>> RPC 127.0.0.1:5688 (97)
[captured 563.726205ms ago]
/stream_service.StreamService/BarrierComplete:97 [226.000004041s]
  collect_barrier (epoch 3438922507812864) [!!! 226.000004041s]

>> RPC 127.0.0.1:5688 (99)
[captured 562.578005ms ago]
/stream_service.StreamService/BarrierComplete:99 [219.000003921s]
  collect_barrier (epoch 3438922942054400) [!!! 219.000003921s]

[cargo-make] INFO - Build Done in 1.62 seconds.

hzxa21 (Collaborator) commented Nov 29, 2022

While the barrier in-flight latency is high, the source throughput is not extremely low in this issue. I suspect this is somehow related to the messages buffered when backpressure happens. With some experiments, I can conclude that this is indeed the cause, and the behaviour change comes from the permit-based backpressure introduced by #6170.

I deployed one CN using risedev dev full, so the exchange uses local channels. Here are the results:

  1. One commit before feat(streaming): permit-based back-pressure in exchange #6170 (915ac8):
    • avg barrier in-flight latency = ~20s
  2. feat(streaming): permit-based back-pressure in exchange #6170 (2ac0c8d) with MAX_CHUNK_PERMITS = 28682 (default):
    • avg barrier in-flight latency = ~2.7min
    • overall source throughput is similar to 1), but it is more balanced across the two sources.
  3. feat(streaming): permit-based back-pressure in exchange #6170 (2ac0c8d) with MAX_CHUNK_PERMITS = 2048:
    • avg barrier in-flight latency = ~30s
    • source throughput similar to 2)

In summary, the default setting in #6170 makes the number of buffered messages larger than before. This leads to more messages being processed between barriers when backpressure happens, so the barrier in-flight latency increases. It doesn't affect source throughput, because the buffer size doesn't affect the actor throughput.

IMO, we can decrease the default buffer size (i.e. lower MAX_CHUNK_PERMITS) as a quick fix, and in the future we can use some heuristics to tune it dynamically based on barrier in-flight latency and actor throughput.
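For reference, the mechanism can be pictured roughly like this (a minimal sketch with hypothetical names and structure, not the actual exchange code in #6170): the sender acquires one permit per row before sending a chunk, and the receiver returns the permits after consuming it, so at most the configured number of rows can be buffered in the channel.

use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};

// Hypothetical sketch: one permit per row. The semaphore is created with
// MAX_CHUNK_PERMITS permits and shared between both ends of the channel.
struct PermitSender<T> {
    tx: mpsc::UnboundedSender<(T, u32)>,
    permits: Arc<Semaphore>,
}

impl<T> PermitSender<T> {
    async fn send(&self, chunk: T, rows: u32) {
        // Back-pressure point: waits until the downstream has freed enough permits.
        self.permits.acquire_many(rows).await.unwrap().forget();
        let _ = self.tx.send((chunk, rows));
    }
}

struct PermitReceiver<T> {
    rx: mpsc::UnboundedReceiver<(T, u32)>,
    permits: Arc<Semaphore>,
}

impl<T> PermitReceiver<T> {
    async fn recv(&mut self) -> Option<T> {
        let (chunk, rows) = self.rx.recv().await?;
        // Return the permits so the upstream may send more data.
        self.permits.add_permits(rows as usize);
        Some(chunk)
    }
}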

fuyufjh (Member, Author) commented Nov 30, 2022

Great work!!! In short, the messages piled up in the exchange channels.

I agree that we may need to decrease MAX_CHUNK_PERMITS. 2048 is too small; perhaps we can start from half of the current MAX_CHUNK_PERMITS?

Some other ideas:

  1. If we implement estimate_size() for DataChunk, estimated size looks like a better kind of permit than the number of rows (see the sketch after this list).
  2. Is it possible to report the current permit number to Prometheus and show it in Grafana? The currently used permits divided by the total permits measures the degree of backpressure, which sounds like a pretty useful metric. The reporting can be done on every barrier.
  3. A random idea: if the number of in-flight barriers reaches the upper bound, i.e. 40, we may not only stop injecting barriers but also block the source executors from generating any new events. What do you guys think?
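For idea 1, a rough sketch of what size-based permits could look like (the trait and the call sites are hypothetical, not existing APIs):

// Hypothetical: estimate the in-memory size of a chunk in bytes.
trait EstimateSize {
    fn estimate_size(&self) -> usize;
}

// The exchange sender would then acquire permits proportional to bytes
// instead of rows, e.g.:
//
//     let bytes = chunk.estimate_size().min(u32::MAX as usize) as u32;
//     permits.acquire_many(bytes).await?.forget();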

BugenZhao (Member) commented Nov 30, 2022

Some ideas:

  • Why is the throughput of actors 1-4 only ~2000 rows per second? This seems too low, and what's the bottleneck? I'll investigate it later.
  • The current permit count of 32k is not that large. If we have an RTT of 20ms, performance will be affected by false back-pressure if the throughput is higher than 1600k rows per second (see the arithmetic after this list), which is comparable to the actual performance. We may try halving it, but I'm afraid this cannot resolve the problem entirely.
  • Making the permits size-based may also not work as expected :(. A chunk with a wide schema will be large in size, but that doesn't mean the throughput will be lower. It really depends on the workload.
  • A large exchange buffer should only make the first barriers large. After the buffer is filled, the following epochs should be small. However, we have a hard limit of 40 concurrent barriers; if we reach this limit, there'll be another large epoch, and this situation repeats over and over again. Will this lead to problems?
  • We do have a mechanism of pausing the source if a source executor has not received a new barrier for some time, introduced by @yezizp2012. But it is not that aggressive, and its target is to handle meta failures.
  • The dimension tables in the Nexmark generator should have fewer records. But since we poll the sources in a fair manner, the actual result is that 15-50x more dimension records are generated relative to the bids in the same period. This makes the workload unrealistic. Will this lead to problems?
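Rough arithmetic behind the second bullet, using the stated numbers: with a window of 32,768 row permits and a 20 ms round trip for returning them, the sender only starts to stall on permits (false back-pressure) once throughput exceeds 32,768 rows / 0.02 s ≈ 1.6M rows/s.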

hzxa21 (Collaborator) commented Nov 30, 2022

Why is the throughput of actors 1-4 only ~2000 rows per second? This seems too low, and what's the bottleneck? I'll investigate it later.

The CPU usage is constantly at 800%. In my setting (3 fragments on 1 CN with worker_node_parallelism=4), I think the bottleneck is the CPU.

The current permit count of 32k is not that large. If we have an RTT of 20ms, performance will be affected by false back-pressure if the throughput is higher than 1600k rows per second, which is comparable to the actual performance. We may try halving it, but I'm afraid this cannot resolve the problem entirely.

True. But in the experiment I did, only local channels are used, so the RTT is significantly smaller. In such a case, a 32K buffer can be a problem. Maybe we can use buffer_size = RTT x downstream_throughput to determine the buffer size, similar to basic TCP.
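For example, with purely illustrative numbers: a local RTT on the order of 1 ms and a downstream throughput of ~70k rows/s would give a bandwidth-delay product of about 70,000 x 0.001 s ≈ 70 rows, orders of magnitude below the 32K default, while a remote exchange with a 20 ms RTT at the same throughput would only need ~1,400 rows.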

A large exchange buffer should only make the first barriers large. After the buffer is filled, the following epochs should be small. However, we have a hard limit of 40 concurrent barriers; if we reach this limit, there'll be another large epoch, and this situation repeats over and over again. Will this lead to problems?

Agree. After we hit the in-flight barrier limit, the barrier injection interval stretches to the e2e barrier latency (>2min in this issue) instead of the configured barrier send interval (250ms). I think @fuyufjh's proposal of throttling the source when we hit the barrier limit makes sense. Extending that, we could bound the total size of messages ingested between two barriers to throttle the source.
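A minimal sketch of that idea, with hypothetical names and no relation to the actual source executor code: the source tracks how many rows it has emitted since the last barrier and stops pulling new events once a per-epoch budget is exhausted, resuming when the next barrier arrives.

// Hypothetical sketch: bound the rows ingested between two barriers.
struct EpochBudget {
    budget_per_epoch: usize,
    used: usize,
}

impl EpochBudget {
    /// A new barrier starts a new epoch, so the budget resets.
    fn on_barrier(&mut self) {
        self.used = 0;
    }

    /// Returns false when the source should stop ingesting until the next barrier.
    fn try_consume(&mut self, chunk_rows: usize) -> bool {
        if self.used + chunk_rows > self.budget_per_epoch {
            return false;
        }
        self.used += chunk_rows;
        true
    }
}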

fuyufjh (Member, Author) commented Nov 30, 2022

The dimension tables in the Nexmark generator should have fewer records. But since we poll the sources in a fair manner, the actual result is that 15-50x more dimension records are generated relative to the bids in the same period. This makes the workload unrealistic. Will this lead to problems?

It's indeed a potential problem that could lead to benchmark results far from real-world cases. 😇

lmatz added the type/bug and priority/critical labels on Nov 30, 2022
BugenZhao (Member) commented Nov 30, 2022

@wangrunji0408 points out that the data generator will sleep to meet the 46:3:1 ratio only if min.event.gap.in.ns is set to a non-zero value. As sleeping for a very short time is not accurate in tokio, I set the value to 1000 (so it sleeps at least 1000ns * 1024 = ~1ms for each chunk) and tested it... and it seemed to work well?

The throughput of actors 1-4 in my case is ~70000 rows per second, which is much higher than the result from @hzxa21. 🤔 This is interesting. Did we test under the debug profile before?


hzxa21 (Collaborator) commented Nov 30, 2022

I used nexmark.min.event.gap.in.ns = '100' in my test. Let me also try 1000. But I think it still makes sense for us to be robust enough to avoid prolonged barrier in-flight latency and not assume the source distribution is optimal.

hzxa21 (Collaborator) commented Nov 30, 2022

Setting nexmark.min.event.gap.in.ns = '1000' doesn't help in my environment (bb051f7a621237f929aca8637fe1fe5af711180e debug build + risedev d full).

Queries:

dev=> create source auction (id INTEGER, "item_name" VARCHAR, description VARCHAR, "initial_bid" INTEGER, reserve INTEGER, "date_time" TIMESTAMP, expires TIMESTAMP, seller INTEGER, category INTEGER)
with (
  connector = 'nexmark',
  nexmark.table.type = 'Auction',
  nexmark.split.num = '4',
  nexmark.min.event.gap.in.ns = '1000'
) row format json;
create source bid (auction INTEGER, bidder INTEGER, price INTEGER, "date_time" TIMESTAMP)
with (
  connector = 'nexmark',
  nexmark.table.type = 'Bid',
  nexmark.split.num = '4',
  nexmark.min.event.gap.in.ns = '1000'
) row format json;
CREATE_SOURCE
CREATE_SOURCE
dev=> create materialized view mv_q6s as
  SELECT
    MAX(B.price) AS final,
    A.id,
    A.seller
  FROM
    auction AS A,
    bid AS B
  WHERE
    A.id = B.auction
    and B.date_time between A.date_time and A.expires
  GROUP BY
    A.id,
    A.seller;
CREATE_MATERIALIZED_VIEW

BugenZhao (Member) commented:

I guess the main cause is the "debug build". 🤔 Maybe we should set a smaller number of initial permits in debug builds.

hzxa21 (Collaborator) commented Nov 30, 2022

I guess the main cause is the "debug build". 🤔 Maybe we should set a smaller number of initial permits in debug builds.

Indeed. I observed much higher actor throughput (70k rows/s) with a release build, and the barrier in-flight latency becomes normal.

fuyufjh (Member, Author) commented Dec 1, 2022

I guess the main cause is the "debug build". 🤔 Maybe we should set a smaller number of initial permits in debug builds.

The original test done by me was run against a release build.

KeXiangWang (Contributor) commented:

I'm designing a fine-grained backpressure mechanism to control the join actors' event consumption from the two sides. I will write a design document for this first.

mergify bot closed this as completed in #6943 on Dec 17, 2022
mergify bot pushed a commit that referenced this issue on Dec 17, 2022
See #6571 for background.

After some discussion with @BugenZhao @yezizp2012 @tabVersion, we realized that it's futile to attempt to "backpressure" the streaming jobs by limiting `in_flight_barrier_nums`. On the contrary, it makes things trickier: for example, the interval between barriers becomes much larger, which in turn results in uncontrolled memory consumption.

This PR removes the limit of `in_flight_barrier_nums` in a quick & dirty way, and reduces the permits of exchange channels to mitigate the high-latency issue.

Of course, the related code should be removed properly later; this is just a quick fix for the imminent version release.


Approved-By: BugenZhao
Approved-By: yezizp2012

Co-Authored-By: Eric Fu <eric@singularity-data.com>