Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability to bundle all records from one micro-batch into PutRecords #86

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

leslieyanyan
Copy link

@leslieyanyan leslieyanyan commented Aug 7, 2020

The current Kinesis sink only sends one record into Kinesis stream each time, the writing speed is very slow.
With the changes in this PR, we could bundle all records from one micro-batch into PutRecords. We've tested the changes in our production environment, the writing speed and efficiency improved a lot when enablingkinesis.executor.sink.bundle.records

@itsvikramagr
Copy link
Contributor

@leslieyanyan - thanks for the PR.

Kinesis Sink was indeed very slow. @abhishekd0907 has taken a shot in reducing the latency in this PR #81. We are using KPL underneath which takes care of aggregation and sending multiple records in the same API call. Can you try the last master code and see if the new change in making a difference.

@leslieyanyan
Copy link
Author

@itsvikramagr Thank you for the suggestion.

  1. We tried to run jobs with latest master, however the speed was still very slow.
  2. The current implementation initializes FutureCallback for every record, https://github.com/qubole/kinesis-sql/blob/master/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala#L66 If my understanding is correct, it is only sending one record each time. The added method bundleExecute in this pr is more like the official example: https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-writing.html, which is sending multiple records in the same API call.


private def bundleExecute(iterator: Iterator[InternalRow]): Unit = {

val groupedIterator: iterator.GroupedIterator[InternalRow] = iterator.grouped(490)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is 490 here? Should it be configurable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

README.md Outdated
@@ -149,7 +149,8 @@ Refering $SPARK_HOME to the Spark installation directory.
| kinesis.executor.recordMaxBufferedTime | 1000 (millis) | Specify the maximum buffered time of a record |
| kinesis.executor.maxConnections | 1 | Specify the maximum connections to Kinesis |
| kinesis.executor.aggregationEnabled | true | Specify if records should be aggregated before sending them to Kinesis |
| kniesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End |
| kinesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End |
| kinesis.executor.sink.bundle.records | false | Bundle all records from one micro-batch into PutRecords |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have added "kinesis.executor.recordTtl" - can we add details about this config here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

Futures.addCallback(future, kinesisCallBack)

producer.flushSync()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leslieyanyan @itsvikramagr
The slowness is on account of this function call producer.flushSync(). Please refer my comment here: #81 (review)

The new code in this PR is showing improved performance because method sendBundledData() doesn't have this function call producer.flushSync()

We'll need to separately evaluate how much performance impact we're getting by using GroupedIterator instead of normal iterator.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants