Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Kinesis Sink #81

Merged
merged 1 commit into from
Jul 30, 2020
Merged

Conversation

abhishekd0907
Copy link
Contributor

What was the issue?

Kinesis sink was experiencing slowness in writing records as raised in Issue #47

Issue Analysis

Method flushSync was being called at every step of the iterator. flushSync adds a sleep time of at least 500 milliseconds because of its implementation.

public void flushSync() {
        while (getOutstandingRecordsCount() > 0) {
            flush();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) { }
        }
    }

Proposed Solution

We can flush records before task completion and not at every step of the iterator. These code changes also reduce sleep time and make changes related to error handling.

@itsvikramagr
Copy link
Contributor

@ggeorgiadis - can you please help us review this PR?

@@ -238,6 +238,7 @@ private[kinesis] object KinesisSourceProvider extends Logging {
private[kinesis] val SINK_RECORD_MAX_BUFFERED_TIME = "kinesis.executor.recordmaxbufferedtime"
private[kinesis] val SINK_MAX_CONNECTIONS = "kinesis.executor.maxconnections"
private[kinesis] val SINK_AGGREGATION_ENABLED = "kinesis.executor.aggregationenabled"
private[kinesis] val SINK_FLUSH_WAIT_TIME_MILLIS = "kniesis.executor.flushwaittimemillis"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo here

Copy link
Contributor Author

@abhishekd0907 abhishekd0907 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed the main change in this PR,

We need to remove the call to flushSync() in the below code which is causing the slowness.

def sendData(partitionKey: String, data: Array[Byte]): String = {
    var sentSeqNumbers = new String
    val future = producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap(data))
    val kinesisCallBack = new FutureCallback[UserRecordResult]() {

      override def onFailure(t: Throwable): Unit = {
        logError(s"Writing to  $streamName failed due to ${t.getCause}")
        if (failedWrite == null && t!= null) {
          failedWrite = t
          logError(s"Writing to  $streamName failed due to ${t.getCause}")
        }
      }

      override def onSuccess(result: UserRecordResult): Unit = {
        val shardId = result.getShardId
        sentSeqNumbers = result.getSequenceNumber
      }
    }
    Futures.addCallback(future, kinesisCallBack)
    producer.flushSync()
    sentSeqNumbers
  }

Will raise a new PR for this
cc @itsvikramagr

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants