Skip to content

Commit

Permalink
docs: fix multiple examples for commit after write to elasticsearch (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
efgpinto committed Jun 16, 2023
1 parent 6494e86 commit 35d9abf
Showing 1 changed file with 3 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,8 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

"kafka-example - store documents and pass Responses with passThrough in bulk" in {

//#kafka-example
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// After we've written them to Elastic in bulk, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
Expand Down Expand Up @@ -284,7 +283,6 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
.runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
//#kafka-example
flushAndRefresh(connectionSettings, indexName)

// Make sure all messages was committed to kafka
Expand All @@ -295,9 +293,8 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

"kafka-example - store documents and pass Responses with passThrough skipping some w/ NOP" in {

//#kafka-example
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// Of those, only some will be written to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
Expand Down Expand Up @@ -343,7 +340,6 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
.runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
//#kafka-example
flushAndRefresh(connectionSettings, indexName)

// Make sure all messages was committed to kafka
Expand All @@ -355,9 +351,8 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

"kafka-example - skip all NOP documents and pass Responses with passThrough" in {

//#kafka-example
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// After we skip all NOP docs, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
Expand Down Expand Up @@ -402,7 +397,6 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
.runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
//#kafka-example
flushAndRefresh(connectionSettings, indexName)

// Make sure all messages was committed to kafka
Expand Down

0 comments on commit 35d9abf

Please sign in to comment.