Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger related to KafkaBroker creation failed #720

Closed
piomin opened this issue Mar 11, 2021 · 12 comments · Fixed by #747
Closed

Trigger related to KafkaBroker creation failed #720

piomin opened this issue Mar 11, 2021 · 12 comments · Fixed by #747

Comments

@piomin
Copy link

piomin commented Mar 11, 2021

Describe the bug
When I'm trying to create a trigger related to KafkaBroker I receive a following error:
"broker not found in data plane config map knative-eventing/kafka-broker-brokers-triggers"

It creates a new ConfigMap knative-eventing/kafka-broker-brokers-triggers, but it is empty and I can't find any reference what I should to put there.

Expected behavior
Trigger is created without any errors

To Reproduce
Create a trigger related to Kafka broker

Knative release version
0.21

@piomin piomin added the kind/bug Categorizes issue or PR as related to a bug. label Mar 11, 2021
@pierDipi
Copy link
Member

pierDipi commented Mar 11, 2021

Can you provide a reproducer and share the status of the trigger?
Does it become ready after the error?

That configmap is internal only and should not be touched.

@piomin
Copy link
Author

piomin commented Mar 12, 2021

Yes. Here's the definition of Trigger.

apiVersion: v1
items:
- apiVersion: eventing.knative.dev/v1
  kind: Trigger
  metadata:
    annotations:
      eventing.knative.dev/creator: docker-for-desktop
      eventing.knative.dev/lastModifier: docker-for-desktop
      kubectl.kubernetes.io/last-applied-configuration: |
        {"apiVersion":"eventing.knative.dev/v1","kind":"Trigger","metadata":{"annotations":{},"name":"my-service-trigger","namespace":"serverless"},"spec":{"broker":"default","filter":{"attributes":{"kafkaheadercetype":"order"}},"subscriber":{"ref":{"apiVersion":"sources.knative.dev/v1beta1","kind":"KafkaSource","name":"kafka-source"}}}}
    creationTimestamp: "2021-03-11T07:48:27Z"
    finalizers:
    - kafka.triggers.eventing.knative.dev
    generation: 1
    labels:
      eventing.knative.dev/broker: default
    managedFields:
    - apiVersion: eventing.knative.dev/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:metadata:
          f:finalizers:
            .: {}
            v:"kafka.triggers.eventing.knative.dev": {}
        f:status:
          .: {}
          f:conditions: {}
          f:observedGeneration: {}
      manager: kafka-controller
      operation: Update
      time: "2021-03-11T07:48:27Z"
    - apiVersion: eventing.knative.dev/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:metadata:
          f:annotations:
            .: {}
            f:kubectl.kubernetes.io/last-applied-configuration: {}
        f:spec:
          .: {}
          f:broker: {}
          f:filter:
            .: {}
            f:attributes:
              .: {}
              f:kafkaheadercetype: {}
          f:subscriber:
            .: {}
            f:ref:
              .: {}
              f:apiVersion: {}
              f:kind: {}
              f:name: {}
      manager: kubectl-client-side-apply
      operation: Update
      time: "2021-03-11T07:48:27Z"
    name: my-service-trigger
    namespace: serverless
    resourceVersion: "1062797"
    selfLink: /apis/eventing.knative.dev/v1/namespaces/serverless/triggers/my-service-trigger
    uid: 2f859a23-da1a-47ee-b2a6-25fbfcb4ea11
  spec:
    broker: default
    filter:
      attributes:
        kafkaheadercetype: order
    subscriber:
      ref:
        apiVersion: sources.knative.dev/v1beta1
        kind: KafkaSource
        name: kafka-source
        namespace: serverless
  status:
    conditions:
    - lastTransitionTime: "2021-03-11T07:48:27Z"
      message: 'config map: knative-eventing/kafka-broker-brokers-triggers'
      reason: Broker not found in data plane map
      status: "False"
      type: BrokerReady
    - lastTransitionTime: "2021-03-11T07:48:27Z"
      status: Unknown
      type: DependencyReady
    - lastTransitionTime: "2021-03-11T07:48:27Z"
      message: 'config map: knative-eventing/kafka-broker-brokers-triggers'
      reason: Broker not found in data plane map
      status: "False"
      type: Ready
    - lastTransitionTime: "2021-03-11T07:48:27Z"
      status: Unknown
      type: SubscriberResolved
    - lastTransitionTime: "2021-03-11T07:48:27Z"
      status: Unknown
      type: SubscriptionReady
    observedGeneration: 1
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""

@pierDipi
Copy link
Member

Can you share the broker called default too?

@piomin
Copy link
Author

piomin commented Mar 12, 2021

Yes. Here it is.

apiVersion: v1
items:
- apiVersion: eventing.knative.dev/v1
  kind: Broker
  metadata:
    annotations:
      eventing.knative.dev/broker.class: Kafka
      eventing.knative.dev/creator: docker-for-desktop
      eventing.knative.dev/lastModifier: docker-for-desktop
      kubectl.kubernetes.io/last-applied-configuration: |
        {"apiVersion":"eventing.knative.dev/v1","kind":"Broker","metadata":{"annotations":{"eventing.knative.dev/broker.class":"Kafka"},"name":"default","namespace":"serverless"},"spec":{"config":{"apiVersion":"v1","kind":"ConfigMap","name":"kafka-broker-config","namespace":"knative-eventing"}}}
    creationTimestamp: "2021-03-10T13:30:22Z"
    finalizers:
    - brokers.eventing.knative.dev
    generation: 1
    managedFields:
    - apiVersion: eventing.knative.dev/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:metadata:
          f:annotations:
            .: {}
            f:eventing.knative.dev/broker.class: {}
            f:kubectl.kubernetes.io/last-applied-configuration: {}
        f:spec:
          .: {}
          f:config:
            .: {}
            f:apiVersion: {}
            f:kind: {}
            f:name: {}
            f:namespace: {}
      manager: kubectl-client-side-apply
      operation: Update
      time: "2021-03-10T13:30:22Z"
    - apiVersion: eventing.knative.dev/v1
      fieldsType: FieldsV1
      fieldsV1:
        f:metadata:
          f:finalizers:
            .: {}
            v:"brokers.eventing.knative.dev": {}
        f:status:
          .: {}
          f:address: {}
          f:conditions: {}
          f:observedGeneration: {}
      manager: kafka-controller
      operation: Update
      time: "2021-03-12T07:58:16Z"
    name: default
    namespace: serverless
    resourceVersion: "1566626"
    selfLink: /apis/eventing.knative.dev/v1/namespaces/serverless/brokers/default
    uid: b5efb85f-5ea3-453f-ba72-3edf58ad135b
  spec:
    config:
      apiVersion: v1
      kind: ConfigMap
      name: kafka-broker-config
      namespace: knative-eventing
  status:
    address: {}
    conditions:
    - lastTransitionTime: "2021-03-10T13:30:23Z"
      status: Unknown
      type: Addressable
    - lastTransitionTime: "2021-03-10T13:30:23Z"
      status: Unknown
      type: ConfigMapUpdated
    - lastTransitionTime: "2021-03-10T13:30:23Z"
      status: "True"
      type: ConfigParsed
    - lastTransitionTime: "2021-03-10T13:30:23Z"
      status: "True"
      type: DataPlaneAvailable
    - lastTransitionTime: "2021-03-12T07:58:16Z"
      message: 'kafka server: Replication-factor is invalid. - Replication factor:
        3 larger than available brokers: 1.'
      reason: 'Failed to create topic: knative-broker-serverless-default'
      status: "False"
      type: Ready
    - lastTransitionTime: "2021-03-12T07:58:16Z"
      message: 'kafka server: Replication-factor is invalid. - Replication factor:
        3 larger than available brokers: 1.'
      reason: 'Failed to create topic: knative-broker-serverless-default'
      status: "False"
      type: TopicReady
    observedGeneration: 1
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""

@pierDipi
Copy link
Member

Replication factor 3 larger than available brokers 1.
You need to change the replication factor in the ConfigMap referenced by the broker.spec.config and set it to 1.

@piomin
Copy link
Author

piomin commented Mar 12, 2021

Ok, sorry. My mistake. After fixing it works fine.
But can attach it to the KafkaSource? It seems I can't.

I have a KafkaSource defined as below:

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

If I want to filter events I should attach Trigger to the Knative Service like below or to the KafkaSource?

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      kafkaheadercetype: order
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

@pierDipi
Copy link
Member

It should be:

Source (KafkaSource) -> Broker -> Trigger

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  annotations:
    eventing.knative.dev/broker.class: Kafka
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      kafkaheadercetype: order
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

Ok, sorry. My mistake

No problem, I have actually collected 2 action items thanks to this issue:

  1. improve error message propagation from broker to trigger
  2. add a note on the replication factor value

@pierDipi pierDipi added kind/question and removed kind/bug Categorizes issue or PR as related to a bug. labels Mar 12, 2021
@piomin
Copy link
Author

piomin commented Mar 15, 2021

Ok. Thanks for help!

@piomin piomin closed this as completed Mar 15, 2021
pierDipi added a commit to pierDipi/docs-1 that referenced this issue Mar 15, 2021
We've got a bug [1] report for a misconfiguration of 
`default.topic.replication.factor`.
This PR adds a note that could alert users on the
value of this configuration.

[1] knative-extensions/eventing-kafka-broker#720
pierDipi added a commit to pierDipi/docs-1 that referenced this issue Mar 15, 2021
We've got a bug [1] report for a misconfiguration of
`default.topic.replication.factor`.
This PR adds a note that could alert users on the
value of this configuration.

[1] knative-extensions/eventing-kafka-broker#720
knative-prow-robot pushed a commit to knative/docs that referenced this issue Mar 15, 2021
* Add a note on replication factor value

We've got a bug [1] report for a misconfiguration of
`default.topic.replication.factor`.
This PR adds a note that could alert users on the
value of this configuration.

[1] knative-extensions/eventing-kafka-broker#720

* Update docs/eventing/broker/kafka-broker.md

Co-authored-by: Ashleigh Brennan <40172997+abrennan89@users.noreply.github.com>

Co-authored-by: Ashleigh Brennan <40172997+abrennan89@users.noreply.github.com>
knative-prow-robot pushed a commit to knative-prow-robot/docs that referenced this issue Mar 15, 2021
We've got a bug [1] report for a misconfiguration of
`default.topic.replication.factor`.
This PR adds a note that could alert users on the
value of this configuration.

[1] knative-extensions/eventing-kafka-broker#720
knative-prow-robot added a commit to knative/docs that referenced this issue Mar 16, 2021
* Add a note on replication factor value

We've got a bug [1] report for a misconfiguration of
`default.topic.replication.factor`.
This PR adds a note that could alert users on the
value of this configuration.

[1] knative-extensions/eventing-kafka-broker#720

* Update docs/eventing/broker/kafka-broker.md

Co-authored-by: Ashleigh Brennan <40172997+abrennan89@users.noreply.github.com>

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
Co-authored-by: Ashleigh Brennan <40172997+abrennan89@users.noreply.github.com>
pierDipi added a commit to pierDipi/eventing-kafka-broker that referenced this issue Mar 19, 2021
In knative-extensions#720
I noticed that our error propagation isn't helpful for troubleshooting,
since when a broker is not ready, the error in the trigger ready
condition is overridden by "Broker not found in data plane map".

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
knative-prow-robot pushed a commit that referenced this issue Mar 22, 2021
In #720
I noticed that our error propagation isn't helpful for troubleshooting,
since when a broker is not ready, the error in the trigger ready
condition is overridden by "Broker not found in data plane map".

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
@piomin
Copy link
Author

piomin commented Apr 1, 2021

@pierDipi I had back to this after some time. I set exactly the configuration you send me. But I've got an exception between KafkaSource and Broker. In the broker logs I see the following exception. I do not set that header: kafkaheadercontenttype. It is probably set by the KafkaSource. Do you have any idea what's the problem?

{"@timestamp":"2021-04-01T14:59:18.233Z","@version":"1","message":"Failed to send record path=/serverless/default","logger_name":"dev.knative.eventing.kafka.broker.receiver.RequestMapper","thread_name":"vert.x-eventloop-thread-0","level":"WARN","level_value":30000,"stack_trace":"io.cloudevents.rw.CloudEventRWException: Invalid extensions name: kafkaheadercontenttype\n\tat io.cloudevents.rw.CloudEventRWException.newInvalidExtensionName(CloudEventRWException.java:122)\n\tat io.cloudevents.core.impl.BaseCloudEventBuilder.withExtension(BaseCloudEventBuilder.java:103)\n\tat io.cloudevents.core.v1.CloudEventBuilder.withContextAttribute(CloudEventBuilder.java:176)\n\tat io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl.lambda$read$0(BaseGenericBinaryMessageReaderImpl.java:66)\n\tat io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl.lambda$forEachHeader$0(BinaryVertxMessageReaderImpl.java:58)\n\tat io.vertx.core.http.impl.headers.HeadersMultiMap.forEach(HeadersMultiMap.java:285)\n\tat io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl.forEachHeader(BinaryVertxMessageReaderImpl.java:58)\n\tat io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl.read(BaseGenericBinaryMessageReaderImpl.java:58)\n\tat io.cloudevents.core.CloudEventUtils.toEvent(CloudEventUtils.java:91)\n\tat io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:116)\n\tat io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:102)\n\tat io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:35)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)\n\tat io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)\n\tat io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)\n\tat io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:83)\n\tat io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:199)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)\n\tat io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)\n\tat io.vertx.core.http.impl.HttpEventHandler.handleEnd(HttpEventHandler.java:79)\n\tat io.vertx.core.http.impl.Http1xServerRequest.onEnd(Http1xServerRequest.java:549)\n\tat io.vertx.core.http.impl.Http1xServerRequest.handleEnd(Http1xServerRequest.java:535)\n\tat io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:73)\n\tat io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:189)\n\tat io.vertx.core.http.impl.Http1xServerConnection.onEnd(Http1xServerConnection.java:199)\n\tat io.vertx.core.http.impl.Http1xServerConnection.onContent(Http1xServerConnection.java:186)\n\tat io.vertx.core.http.impl.Http1xServerConnection.handleOther(Http1xServerConnection.java:156)\n\tat io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:144)\n\tat io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:153)\n\tat io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:148)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)\n\tat io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:101)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n","path":"/serverless/default"}

@piomin piomin reopened this Apr 1, 2021
@pierDipi
Copy link
Member

pierDipi commented Apr 1, 2021

It's a bug in the SDK, I'll send a PR there.

@pierDipi
Copy link
Member

pierDipi commented Apr 1, 2021

Can you open a different issue since it's a different bug?

@piomin
Copy link
Author

piomin commented Apr 2, 2021

Of course. I created #780

@piomin piomin closed this as completed Apr 2, 2021
creydr added a commit to creydr/knative-eventing-kafka-broker that referenced this issue Jul 6, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants