kf:seek signals rdkafka-error: Local: Erroneous state #67

Closed
Uthar opened this issue Oct 26, 2022 · 4 comments

Uthar commented Oct 26, 2022

Maybe I don't know how to use it, but this fails for me:

(let* ((config '("bootstrap.servers" "localhost:1234" "group.id" "test-lisp-consumer" "enable.auto.commit" "false"))
       (consumer (make-instance 'kf:consumer
                                :conf config
                                :serde #'babel:octets-to-string)))
  (kf:assign consumer (list (cons "my-topic" 0)))
  (kf:seek consumer "my-topic" 0 10 2000)
  consumer)

Weirdly, doing this as separate steps in the REPL works:

(setf consumer (make-instance 'kf:consumer
                              :conf config
                              :serde #'babel:octets-to-string))
(kf:assign consumer (list (cons "my-topic" 0)))
(kf:seek consumer "my-topic" 0 10 2000)
(kf:poll consumer 2000)

Uthar commented Oct 26, 2022

I can put in (sleep 5) around the assign and seek calls, and then it works, but it feels hacky. Is there a way to explicitly sync?
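
One less arbitrary alternative to a fixed sleep, assuming the failure is transient, is to retry the call until it succeeds or a deadline passes. The sketch below is plain Common Lisp; call-with-retry is a hypothetical helper, not part of cl-rdkafka, and it catches any error rather than only the library's condition type.

```lisp
;; Hypothetical helper (not part of cl-rdkafka): call THUNK until it returns
;; without signalling an error, or until TIMEOUT seconds have elapsed.
(defun call-with-retry (thunk &key (timeout 10) (interval 0.5))
  (let ((deadline (+ (get-internal-real-time)
                     (* timeout internal-time-units-per-second))))
    (loop
      (handler-case (return (funcall thunk))
        ;; Assumption: every error is treated as retryable. Narrowing this
        ;; clause to the library's own condition type would be safer.
        (error (condition)
          ;; Give up and re-signal the last error once the deadline has passed.
          (when (>= (get-internal-real-time) deadline)
            (error condition))
          (sleep interval))))))
```

With the consumer from the snippet above, the seek could then be wrapped as (call-with-retry (lambda () (kf:seek consumer "my-topic" 0 10 2000))). This only hides the race rather than synchronising with the assignment, so it is a workaround, not a fix.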

Uthar commented Oct 26, 2022

Also, why does (sleep 5) work, but (sleep 4) and (sleep 3) do not?

Uthar commented Oct 26, 2022

Seems related to this: https://github.com/edenhill/librdkafka/wiki/Manually-setting-the-consumer-start-offset

Currently this API doesn't allow setting the offset in seek, right?

For me, this works:

--- src/high-level/consumer.lisp
+++ src/high-level/consumer.lisp
@@ -400,7 +400,7 @@
   (with-slots (rd-kafka-consumer) consumer
     (with-toppar-list
         toppar-list
-        (alloc-toppar-list partitions :topic #'car :partition #'cdr)
+        (alloc-toppar-list partitions :topic #'first :partition #'second :offset #'third)
       (let ((err (cl-rdkafka/ll:rd-kafka-assign rd-kafka-consumer toppar-list)))
         (unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)
           (error (make-rdkafka-error err)))))))

Diff finished.  Wed Oct 26 13:31:12 2022
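
The patch above also changes the shape of each element in the partitions argument: car/cdr read a (topic . partition) cons cell, while first/second/third read a (topic partition offset) list. The sketch below illustrates the difference in plain Common Lisp; spec-fields is a hypothetical stand-in for the accessor arguments and does not call alloc-toppar-list or any other cl-rdkafka code.

```lisp
;; Hypothetical stand-in: extract fields from SPEC using the given accessors,
;; mirroring the :topic / :partition / :offset keywords seen in the diff.
(defun spec-fields (spec &key (topic #'car) (partition #'cdr) offset)
  (list :topic (funcall topic spec)
        :partition (funcall partition spec)
        :offset (and offset (funcall offset spec))))

;; Original accessors on a cons cell, as in (list (cons "my-topic" 0)).
(spec-fields (cons "my-topic" 0))
;; => (:TOPIC "my-topic" :PARTITION 0 :OFFSET NIL)

;; Patched accessors on a three-element list.
(spec-fields (list "my-topic" 0 10)
             :topic #'first :partition #'second :offset #'third)
;; => (:TOPIC "my-topic" :PARTITION 0 :OFFSET 10)
```

One consequence worth noting: with the patched accessors, a caller that still passes (cons "my-topic" 0) would typically get a type error, because second tries to take the car of 0.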

SahilKang added a commit that referenced this issue Dec 31, 2022

Thanks for pointing this out. I've released v1.2.0, which adds offset support to the kf:assign and kf:assignment methods.
