Exception CloudEventRWException between KafkaSource and Broker #780

Closed
piomin opened this issue Apr 2, 2021 · 2 comments · Fixed by #852

Assignees: slinkydeveloper
Labels: kind/bug (Categorizes issue or PR as related to a bug.)

Comments


piomin commented Apr 2, 2021

Describe the bug
I created the following Knative objects:

```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  annotations:
    eventing.knative.dev/broker.class: Kafka
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve
```

But after sending messages to the Kafka topic, I got the following exception on the KafkaSource -> Broker path:

```
{"@timestamp":"2021-04-01T14:59:18.233Z","@version":"1","message":"Failed to send record path=/serverless/default","logger_name":"dev.knative.eventing.kafka.broker.receiver.RequestMapper","thread_name":"vert.x-eventloop-thread-0","level":"WARN","level_value":30000,"stack_trace":"io.cloudevents.rw.CloudEventRWException: Invalid extensions name: kafkaheadercontenttype\n\tat io.cloudevents.rw.CloudEventRWException.newInvalidExtensionName(CloudEventRWException.java:122)\n\tat io.cloudevents.core.impl.BaseCloudEventBuilder.withExtension(BaseCloudEventBuilder.java:103)\n\tat io.cloudevents.core.v1.CloudEventBuilder.withContextAttribute(CloudEventBuilder.java:176)\n\tat io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl.lambda$read$0(BaseGenericBinaryMessageReaderImpl.java:66)\n\tat io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl.lambda$forEachHeader$0(BinaryVertxMessageReaderImpl.java:58)\n\tat io.vertx.core.http.impl.headers.HeadersMultiMap.forEach(HeadersMultiMap.java:285)\n\tat io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl.forEachHeader(BinaryVertxMessageReaderImpl.java:58)\n\tat io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl.read(BaseGenericBinaryMessageReaderImpl.java:58)\n\tat io.cloudevents.core.CloudEventUtils.toEvent(CloudEventUtils.java:91)\n\tat io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:116)\n\tat io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:102)\n\tat io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:35)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)\n\tat io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)\n\tat io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)\n\tat io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:83)\n\tat io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:199)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)\n\tat io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)\n\tat io.vertx.core.http.impl.HttpEventHandler.handleEnd(HttpEventHandler.java:79)\n\tat io.vertx.core.http.impl.Http1xServerRequest.onEnd(Http1xServerRequest.java:549)\n\tat io.vertx.core.http.impl.Http1xServerRequest.handleEnd(Http1xServerRequest.java:535)\n\tat io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:73)\n\tat io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:189)\n\tat io.vertx.core.http.impl.Http1xServerConnection.onEnd(Http1xServerConnection.java:199)\n\tat io.vertx.core.http.impl.Http1xServerConnection.onContent(Http1xServerConnection.java:186)\n\tat io.vertx.core.http.impl.Http1xServerConnection.handleOther(Http1xServerConnection.java:156)\n\tat io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:144)\n\tat io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:153)\n\tat io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:148)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)\n\tat io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:101)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n","path":"/serverless/default"}
```

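For context on the rejected name: the receiver rebuilds the incoming binary-mode CloudEvent from the HTTP headers, and the CloudEvents Java SDK's attribute-name validation rejects the `kafkaheadercontenttype` extension that the KafkaSource attached (most likely because it exceeds the 20-character attribute-name limit older SDK releases enforced). The snippet below is a minimal, hypothetical reproduction against the SDK alone (assuming a cloudevents-core release from that era, e.g. 2.0.x), not the broker's actual code path:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import java.net.URI;

public class ExtensionNameRepro {
    public static void main(String[] args) {
        // Stand-in for what the receiver does while reading the request:
        // every ce-* header becomes a context attribute on the builder.
        // With an older cloudevents-core release, withExtension() throws
        // CloudEventRWException: Invalid extensions name: kafkaheadercontenttype
        CloudEvent event = CloudEventBuilder.v1()
                .withId("1")                               // illustrative values
                .withType("dev.example.order")
                .withSource(URI.create("/order-events"))
                .withExtension("kafkaheadercontenttype", "application/json")
                .build();
        System.out.println(event);
    }
}
```

An SDK release that includes cloudevents/sdk-java#366 (linked in the comments below) relaxes this validation, so the same call is expected to succeed there.
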
Expected behavior
Events should be delivered to the broker, and then routed by the trigger to the downstream service.

To Reproduce
Create the Broker, KafkaSource, and Trigger objects above, then send a few messages to the `order-events` topic.
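
Any producer that attaches a header to the record should reproduce it: the KafkaSource appears to copy record headers onto the outgoing CloudEvent as `kafkaheader...`-prefixed extension attributes, which is where the rejected name above comes from. Here is a minimal sketch using the plain Apache Kafka client; the bootstrap address and topic come from the manifests above, while the payload and the `contentType` header are assumptions for illustration:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class OrderEventsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap.kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Illustrative order payload; the header is what ends up mapped to
            // the "kafkaheadercontenttype" extension attribute downstream.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("order-events", "order-1", "{\"orderId\":1,\"status\":\"NEW\"}");
            record.headers().add("contentType", "application/json".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
            producer.flush();
        }
    }
}
```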

Knative release version
v0.21

piomin added the kind/bug label Apr 2, 2021

pierDipi (Member) commented Apr 2, 2021

A PR: cloudevents/sdk-java#366

/cc @slinkydeveloper

pierDipi (Member) commented Apr 2, 2021

Thanks for reporting!

slinkydeveloper self-assigned this Apr 14, 2021
pierDipi pushed a commit to pierDipi/eventing-kafka-broker that referenced this issue Aug 21, 2023
pierDipi pushed a commit to pierDipi/eventing-kafka-broker that referenced this issue Sep 20, 2023
Cali0707 added a commit to Cali0707/eventing-kafka-broker that referenced this issue Sep 26, 2023; its commit message follows:

* Make SecretSpec field of consumers Auth omitempty (knative-extensions#780)

* Expose init offset and schedule metrics for ConsumerGroup reconciler (knative-extensions#790) (knative-extensions#791)

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Fix channel finalizer logic (knative-extensions#3295) (knative-extensions#795)

Signed-off-by: Calum Murray <cmurray@redhat.com>
Co-authored-by: Calum Murray <cmurray@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* [release-v1.10] SRVKE-958: Cache init offsets results (knative-extensions#817)

* Cache init offsets results

When there is high load and multiple consumer group schedule calls,
we get many `dial tcp 10.130.4.8:9092: i/o timeout` errors when
trying to connect to Kafka.
This leads to increased "time to readiness" for consumer groups.

The downside of caching is that, in the case where partitions
increase while the result is cached, we won't initialize
the offsets of the new partitions.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Add autoscaler leader log patch

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Scheduler handle overcommitted pods (knative-extensions#820)

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Set consumer and consumergroups finalizers when creating them (knative-extensions#823)

It is possible that a deleted Consumer or ConsumerGroup might
be reconciled and never finalized when it is deleted before
the finalizer is set.
This happens because the Knative-generated reconciler uses
patch (as opposed to update) for setting the finalizer,
and patch doesn't have any optimistic concurrency controls.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Clean up reserved from resources that have been scheduled (knative-extensions#830)

In a recent testing run, we noticed that a scheduled
`ConsumerGroup` [1] (see placements) was being considered as
having reserved replicas in a different pod [2].

That makes the scheduler think there is no space, even though
the autoscaler says there is enough space to hold every
virtual replica.

[1]
```
$ k describe consumergroups -n ks-multi-ksvc-0 c9ee3490-5b4b-4d11-87af-8cb2219d9fe3
Name:         c9ee3490-5b4b-4d11-87af-8cb2219d9fe3
Namespace:    ks-multi-ksvc-0
...
Status:
  Conditions:
    Last Transition Time:  2023-09-06T19:58:27Z
    Reason:                Autoscaler is disabled
    Status:                True
    Type:                  Autoscaler
    Last Transition Time:  2023-09-06T21:41:13Z
    Status:                True
    Type:                  Consumers
    Last Transition Time:  2023-09-06T19:58:27Z
    Status:                True
    Type:                  ConsumersScheduled
    Last Transition Time:  2023-09-06T21:41:13Z
    Status:                True
    Type:                  Ready
  Observed Generation:     1
  Placements:
    Pod Name:      kafka-source-dispatcher-6
    Vreplicas:     4
    Pod Name:      kafka-source-dispatcher-7
    Vreplicas:     4
  Replicas:        8
  Subscriber Uri:  http://receiver5-2.ks-multi-ksvc-0.svc.cluster.local
Events:            <none>
```

[2]
```
    "ks-multi-ksvc-0/c9ee3490-5b4b-4d11-87af-8cb2219d9fe3": {
      "kafka-source-dispatcher-3": 8
    },
```

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Ignore unknown fields in data plane contract (knative-extensions#3335) (knative-extensions#828)

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Signed-off-by: Calum Murray <cmurray@redhat.com>
Co-authored-by: Martin Gencur <mgencur@redhat.com>
Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
Co-authored-by: Calum Murray <cmurray@redhat.com>
Co-authored-by: OpenShift Cherrypick Robot <openshift-cherrypick-robot@redhat.com>