Skip to content

Conversation

@vagetablechicken
Copy link
Contributor

@vagetablechicken vagetablechicken commented Mar 19, 2020

Ref #2780 (comment)

ImplementaItion Notes

NodeChannel

  1. _cur_batch -> _pending_batches: when _cur_batch is filled up, move it to _pending_batches.
  2. add_row() just produce batches.
  3. try_send_and_fetch_status() tries to consume one pending batch. If has in flight packet, skip send in this round.

So we can add one sender thread to be in charge of all node channels try_send.

IndexChannel

  1. init(), open() stay the same.
  2. Use for_each_node_channel() to expose the detailed changes of NodeChannel.(It's more easy to read & modify)

Sender thread

See func OlapTableSink::_send_batch_process()

Why use polling?

If we use wait/notify, it will notify when generate a new batch. We can't skip sending this batch, coz it won't notify the same batch again. So wait/notify can't avoid blocking simply.
So I choose polling.
It's wasting to continuously try_send(), but it's difficult to set the suitable polling interval. Thus, I add std::this_thread::yield() to give up the time slice, give priority to other process/threads (if there are other process/threads waiting in the queue).

@imay
Copy link
Contributor

imay commented Mar 19, 2020

Hi @vagetablechicken

Do you have some benchmark for this PR?

@vagetablechicken
Copy link
Contributor Author

vagetablechicken commented Mar 19, 2020

Hi @vagetablechicken

Do you have some benchmark for this PR?

I only have two test samples now, testing environment building is in progress in our team.

I use broker load, and the scan node wasn't do mem limit(my test code doesn't contain a80e9bf). If scan node waits when mem limit exceeded, it must take longer.

5 be:

  • E5-2630v2*2 CPU
  • 8*16G Mem
test sample origin ver(avg) nonblocking ver(avg)
{"ScannedRows":92095001,"TaskNumber":1,"FileNumber":8,"FileSize":2798532731} 10min3s 6min43s
{"ScannedRows":951503983,"TaskNumber":1,"FileNumber":300,"FileSize":68560716133} 4h25min 2h6min

@morningman
Copy link
Contributor

Compilation failed:
LOG(DEBUG) << name() << " queue_push_ns: " << _queue_push_lock_ns;

There is no LOG(DEBUG) macro

@vagetablechicken
Copy link
Contributor Author

Compilation failed:
LOG(DEBUG) << name() << " queue_push_ns: " << _queue_push_lock_ns;

There is no LOG(DEBUG) macro

Oops, my bad. Fixed.

morningman
morningman previously approved these changes Mar 26, 2020
@morningman
Copy link
Contributor

Hi @vagetablechicken , please resolve the conflict.
And Hi @imay , do you have any other comment?

@vagetablechicken
Copy link
Contributor Author

vagetablechicken commented Mar 27, 2020

I wanna change yeild() to sleep(), to avoid CPU busy. I need to prove "even sleep() after each loop, non-blocking sink is faster than blocking one" through testing.
So in the first step, I want the sleep interval to be the config param of BE, and I'll give a default value.

One more thing, the add_row() may be blocked cause mem limit exceeded, I'll add the blocking time to time profile.

Your comments are welcome.

@vagetablechicken
Copy link
Contributor Author

vagetablechicken commented Apr 9, 2020

@imay @morningman
Sorry about my change is a bit late.
I've changed yeild() to sleep(). And the default interval time is 10ms.
Why 10ms, it's calculated through the avg time of add batch RPCs.

If the case is large, it get faster obviously, so I focus on little cases this time.
I tested two little cases, and got the logs about add batch time. e.g.
node add batch time(ms)/wait lock time(ms)/num: {10003:(76703)(0)(4336)} {10002:(87961)(0)(4921)} {10006:(80490)(0)(4570)} {10005:(71098)(0)(3984)} {10004:(87736)(0)(4686)}

So avg time = add batch time / num. And if you think the network time delay is not negligible, you can add it.
In my tests, the avg times are tens of milliseconds. So for the faster sink, I set it to 10ms.

And the test result(interval=10ms) is

test sample origin ver(avg) nonblocking ver(avg)
{"ScannedRows":28582315,"TaskNumber":1,"FileNumber":3,"FileSize":569042681} 4min37s 3min27s
{"ScannedRows":100967318,"TaskNumber":1,"FileNumber":9,"FileSize":3096583144} 12min27s 8min27s

const NodeInfo* node_info() const { return _node_info; }
std::string print_load_info() const { return _load_info; }
std::string name() const {
return "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer strings::Substitute in gutils/strings/Substitute.h.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

including gutils/strings/Substitute.h will occur redefinition error

In file included from /root/incubator-doris/be/src/gutil/strings/stringpiece.h:128:0,
                 from /root/incubator-doris/be/src/gutil/strings/substitute.h:9,
                 from /root/incubator-doris/be/src/exec/tablet_sink.cpp:28:
/root/incubator-doris/be/src/gutil/hash/hash.h:251:26: error: redefinition of 'struct __gnu_cxx::hash<Type*>'
 template<class T> struct hash<T*> {
                          ^~~~~~~~
In file included from /root/incubator-doris/thirdparty/installed/include/butil/containers/flat_map.h:101:0,
                 from /root/incubator-doris/be/src/service/brpc.h:48,
                 from /root/incubator-doris/be/src/util/ref_count_closure.h:24,
                 from /root/incubator-doris/be/src/exec/tablet_sink.h:36,
                 from /root/incubator-doris/be/src/exec/tablet_sink.cpp:18:
/root/incubator-doris/thirdparty/installed/include/butil/containers/hash_tables.h:262:8: note: previous definition of 'struct __gnu_cxx::hash<Type*>'
 struct hash<Type*> {
        ^~~~~~~~~~~
In file included from /root/incubator-doris/be/src/gutil/strings/stringpiece.h:128:0,
                 from /root/incubator-doris/be/src/gutil/strings/substitute.h:9,
                 from /root/incubator-doris/be/src/exec/tablet_sink.cpp:28:
/root/incubator-doris/be/src/gutil/hash/hash.h:309:8: error: redefinition of 'struct __gnu_cxx::hash<std::pair<_T1, _T2> >'
 struct hash<pair<First, Second> > {
        ^~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from /root/incubator-doris/thirdparty/installed/include/butil/containers/flat_map.h:101:0,
                 from /root/incubator-doris/be/src/service/brpc.h:48,
                 from /root/incubator-doris/be/src/util/ref_count_closure.h:24,
                 from /root/incubator-doris/be/src/exec/tablet_sink.h:36,
                 from /root/incubator-doris/be/src/exec/tablet_sink.cpp:18:
/root/incubator-doris/thirdparty/installed/include/butil/containers/hash_tables.h:256:8: note: previous definition of 'struct __gnu_cxx::hash<std::pair<_T1, _T2> >'
 struct hash<std::pair<Type1, Type2> > {

I will initialize the name_string in init(), to avoid string building in name()

@vagetablechicken vagetablechicken force-pushed the non-blocking-sink branch 2 times, most recently from d130528 to c9c2741 Compare May 6, 2020 04:03
huangwei5 and others added 6 commits May 6, 2020 15:00
Co-Authored-By: Zhao Chun <buaa.zhaoc@gmail.com>
Co-Authored-By: Zhao Chun <buaa.zhaoc@gmail.com>
@imay imay self-assigned this May 6, 2020
Copy link
Contributor

@imay imay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM.
If there is no other comments, I will merge it tomorrow.

@imay imay added approved Indicates a PR has been approved by one committer. area/load Issues or PRs related to all kinds of load kind/feature Categorizes issue or PR as related to a new feature. labels May 6, 2020
@imay imay merged commit 94539e7 into apache:master May 7, 2020
yangzhg added a commit to yangzhg/doris that referenced this pull request May 7, 2020
@EmmyMiao87 EmmyMiao87 mentioned this pull request Aug 17, 2020
acelyc111 pushed a commit to acelyc111/incubator-doris that referenced this pull request Jan 20, 2021
ImplementaItion Notes
NodeChannel
_cur_batch -> _pending_batches: when _cur_batch is filled up, move it to _pending_batches.
add_row() just produce batches.
try_send_and_fetch_status() tries to consume one pending batch. If has in flight packet, skip send in this round.
So we can add one sender thread to be in charge of all node channels try_send.

IndexChannel
init(), open() stay the same.
Use for_each_node_channel() to expose the detailed changes of NodeChannel.(It's more easy to read & modify)
Sender thread
See func OlapTableSink::_send_batch_process()

Why use polling?
If we use wait/notify, it will notify when generate a new batch. We can't skip sending this batch, coz it won't notify the same batch again. So wait/notify can't avoid blocking simply.
So I choose polling.
It's wasting to continuously try_send(), but it's difficult to set the suitable polling interval. Thus, I add std::this_thread::yield() to give up the time slice, give priority to other process/threads (if there are other process/threads waiting in the queue).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved Indicates a PR has been approved by one committer. area/load Issues or PRs related to all kinds of load kind/feature Categorizes issue or PR as related to a new feature.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants