From 3f3a1fe0e81e79a58b4bda9a6b1ae455fe3f18ba Mon Sep 17 00:00:00 2001 From: Ville Aikas <11279988+vaikas@users.noreply.github.com> Date: Thu, 15 Apr 2021 14:27:03 -0700 Subject: [PATCH] Use CRDs instead of client libraries with RabbitMQ cluster operator (#271) * Have Broker use rabbitmq CRDs * check the status of resources. if rabbitmq operator is missing, remove finalizers * dump rabbit ns logs too * dump rabbit resources * wait for pods before messaging topology * break the Broker tests into two * swap direct / dlq order * try using crds for trigger also * status updates * actually set dependency to true if binding / queue become ready * proper ownerrefs * right bindingkey * grab the latest operator * source is not dlx * more recent rabbitmqcluster, eventing * recombine broker tests again * update unit tests to match new failure behaviour * test name match new exchange name * missed some other places * lasigh * testnames match * revert to previous version * oh jeez... * try v0.5.1 * try v0.6.0 --- .github/workflows/kind-conformance.yaml | 53 +++-- .github/workflows/kind-e2e.yaml | 51 ++-- .../200-rabbitmq-broker-clusterrole.yaml | 1 + pkg/reconciler/broker/broker.go | 217 ++++++++++++++++-- pkg/reconciler/broker/broker_test.go | 6 +- pkg/reconciler/broker/controller.go | 14 ++ pkg/reconciler/broker/controller_test.go | 2 + pkg/reconciler/broker/resources/exchange.go | 18 +- .../broker/resources/exchange_test.go | 6 +- .../broker/resources/ingress_test.go | 2 +- pkg/reconciler/testing/listers.go | 6 + pkg/reconciler/trigger/controller.go | 18 +- pkg/reconciler/trigger/controller_test.go | 3 + pkg/reconciler/trigger/resources/binding.go | 72 +++++- .../trigger/resources/binding_test.go | 8 +- pkg/reconciler/trigger/resources/queue.go | 56 ++++- pkg/reconciler/trigger/trigger.go | 169 ++++++++++++-- pkg/reconciler/trigger/trigger_test.go | 8 +- 18 files changed, 614 insertions(+), 96 deletions(-) diff --git a/.github/workflows/kind-conformance.yaml b/.github/workflows/kind-conformance.yaml index 84f2ad5f91..0a147c78f5 100644 --- a/.github/workflows/kind-conformance.yaml +++ b/.github/workflows/kind-conformance.yaml @@ -17,7 +17,7 @@ on: jobs: ko-resolve: - name: e2e tests + name: conformance tests runs-on: ubuntu-latest strategy: fail-fast: false # Keep running if one leg fails. @@ -28,10 +28,13 @@ jobs: - v1.20.0 eventing-version: - - v0.21.0 + - v0.22.0 rabbitmq-operator-version: - - v1.4.0 + - v1.6.0 + + rabbitmq-messaging-topology-operator-version: + - v0.6.0 # Map between K8s and KinD versions. # This is attempting to make it a bit clearer what's being tested. @@ -54,6 +57,8 @@ jobs: RABBITMQ_SYSTEM_NAMESPACE: rabbitmq-system # Where the Rabbitmq source is installed RABBITMQ_SOURCE_NAMESPACE: knative-sources + # Where the Cert Manager gets installed + CERT_MANAGER_NAMESPACE: cert-manager KIND_CLUSTER_NAME: kind steps: @@ -137,14 +142,21 @@ jobs: set -x kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.2.0/cert-manager.yaml - # Sleep for 120 so things come up - sleep 120 + + - name: Wait for Cert Manager Ready + working-directory: ./src/knative.dev/${{ github.event.repository.name }} + run: | + set -e + source ./vendor/knative.dev/hack/infra-library.sh + wait_until_pods_running ${CERT_MANAGER_NAMESPACE} + # Even though pods are running, seems like every now and then things are not ready in the next step. + sleep 15 - name: Install RabbitMQ Messaging Topology Operator run: | set -x - kubectl apply -f https://github.com/rabbitmq/messaging-topology-operator/releases/latest/download/messaging-topology-operator-with-certmanager.yaml + kubectl apply -f https://github.com/rabbitmq/messaging-topology-operator/releases/download/${{ matrix.rabbitmq-messaging-topology-operator-version }}/messaging-topology-operator-with-certmanager.yaml - name: Install Knative Eventing run: | @@ -197,19 +209,30 @@ jobs: echo "===================== RabbitMQClusters =====================" kubectl get RabbitMQCluster --all-namespaces=true -oyaml + echo "===================== RabbitMQ Exchanges =====================" + kubectl get exchange.rabbitmq.com --all-namespaces=true -oyaml + + echo "===================== RabbitMQ Queues =====================" + kubectl get queue.rabbitmq.com --all-namespaces=true -oyaml + + echo "===================== RabbitMQ Bindings =====================" + kubectl get binding.rabbitmq.com --all-namespaces=true -oyaml + echo "===================== K8s Events ===========================" kubectl get events --all-namespaces=true -oyaml echo "===================== Pod Logs =============================" - namespace=knative-eventing - for pod in $(kubectl get pod -n $namespace | awk '{print $1}'); do - for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do - echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}" - kubectl logs -n $namespace "${pod}" -c "${container}" || true - echo "----------------------------------------------------------" - echo "Namespace, Pod, Container (Previous instance): ${namespace}, ${pod}, ${container}" - kubectl logs -p -n $namespace "${pod}" -c "${container}" || true - echo "============================================================" + namespaces=(knative-eventing rabbitmq-system) + for namespace in ${namespaces[@]}; do + for pod in $(kubectl get pod -n $namespace | awk '{print $1}'); do + for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do + echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}" + kubectl logs -n $namespace "${pod}" -c "${container}" || true + echo "----------------------------------------------------------" + echo "Namespace, Pod, Container (Previous instance): ${namespace}, ${pod}, ${container}" + kubectl logs -p -n $namespace "${pod}" -c "${container}" || true + echo "============================================================" + done done done diff --git a/.github/workflows/kind-e2e.yaml b/.github/workflows/kind-e2e.yaml index ecbdd02910..a66879215c 100644 --- a/.github/workflows/kind-e2e.yaml +++ b/.github/workflows/kind-e2e.yaml @@ -30,6 +30,9 @@ jobs: rabbitmq-operator-version: - v1.6.0 + rabbitmq-messaging-topology-operator-version: + - v0.6.0 + # Map between K8s and KinD versions. # This is attempting to make it a bit clearer what's being tested. # See: https://github.com/kubernetes-sigs/kind/releases/tag/v0.9.0 @@ -51,6 +54,8 @@ jobs: RABBITMQ_SYSTEM_NAMESPACE: rabbitmq-system # Where the Rabbitmq source is installed RABBITMQ_SOURCE_NAMESPACE: knative-sources + # Where the Cert Manager gets installed + CERT_MANAGER_NAMESPACE: cert-manager KIND_CLUSTER_NAME: kind steps: @@ -131,14 +136,21 @@ jobs: set -x kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.2.0/cert-manager.yaml - # Sleep for 120 so things come up - sleep 120 + + - name: Wait for Cert Manager Ready + working-directory: ./src/knative.dev/${{ github.event.repository.name }} + run: | + set -e + source ./vendor/knative.dev/hack/infra-library.sh + wait_until_pods_running ${CERT_MANAGER_NAMESPACE} + # Even though pods are running, seems like every now and then things are not ready in the next step. + sleep 15 - name: Install RabbitMQ Messaging Topology Operator run: | set -x - kubectl apply -f https://github.com/rabbitmq/messaging-topology-operator/releases/latest/download/messaging-topology-operator-with-certmanager.yaml + kubectl apply -f https://github.com/rabbitmq/messaging-topology-operator/releases/download/${{ matrix.rabbitmq-messaging-topology-operator-version }}/messaging-topology-operator-with-certmanager.yaml - name: Install Knative Eventing run: | @@ -192,7 +204,7 @@ jobs: # Run the TestKoPublish e2e test. go test -v -race -count=1 -timeout=15m -tags=e2e ./test/e2e/... -run 'TestKoPublish' - # Run only Broker specific tests due to large resource requirements of rabbitmqcluster + # Run only Broker specific tests due to large resource requirements of rabbitmqcluster - name: Run Broker e2e Tests working-directory: ./src/knative.dev/${{ github.event.repository.name }} run: | @@ -201,7 +213,7 @@ jobs: # Run the tests tagged as e2e on the KinD cluster that have Broker in the test name go test -v -race -count=1 -timeout=15m -tags=e2e ./test/e2e/... -run 'Test.*Broker.*' - # Run source test + # Run source test - name: Run source e2e Tests working-directory: ./src/knative.dev/${{ github.event.repository.name }} run: | @@ -224,19 +236,30 @@ jobs: echo "===================== RabbitMQClusters =====================" kubectl get RabbitMQCluster --all-namespaces=true -oyaml + echo "===================== RabbitMQ Exchanges =====================" + kubectl get exchange.rabbitmq.com --all-namespaces=true -oyaml + + echo "===================== RabbitMQ Queues =====================" + kubectl get queue.rabbitmq.com --all-namespaces=true -oyaml + + echo "===================== RabbitMQ Bindings =====================" + kubectl get binding.rabbitmq.com --all-namespaces=true -oyaml + echo "===================== K8s Events ===========================" kubectl get events --all-namespaces=true -oyaml echo "===================== Pod Logs =============================" - namespace=knative-eventing - for pod in $(kubectl get pod -n $namespace | awk '{print $1}'); do - for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do - echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}" - kubectl logs -n $namespace "${pod}" -c "${container}" || true - echo "----------------------------------------------------------" - echo "Namespace, Pod, Container (Previous instance): ${namespace}, ${pod}, ${container}" - kubectl logs -p -n $namespace "${pod}" -c "${container}" || true - echo "============================================================" + namespaces=(knative-eventing rabbitmq-system) + for namespace in ${namespaces[@]}; do + for pod in $(kubectl get pod -n $namespace | awk '{print $1}'); do + for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do + echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}" + kubectl logs -n $namespace "${pod}" -c "${container}" || true + echo "----------------------------------------------------------" + echo "Namespace, Pod, Container (Previous instance): ${namespace}, ${pod}, ${container}" + kubectl logs -p -n $namespace "${pod}" -c "${container}" || true + echo "============================================================" + done done done diff --git a/config/broker/200-rabbitmq-broker-clusterrole.yaml b/config/broker/200-rabbitmq-broker-clusterrole.yaml index 81a0e5772e..90c5ee744e 100644 --- a/config/broker/200-rabbitmq-broker-clusterrole.yaml +++ b/config/broker/200-rabbitmq-broker-clusterrole.yaml @@ -75,6 +75,7 @@ rules: - apiGroups: - rabbitmq.com resources: + - bindings - exchanges - queues verbs: diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 4e4a017b02..20edab1c86 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -18,6 +18,7 @@ package broker import ( "context" + "fmt" "net/http" "go.uber.org/zap" @@ -35,6 +36,7 @@ import ( "knative.dev/pkg/logging" "knative.dev/pkg/network" + "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" rabbitclientset "github.com/rabbitmq/messaging-topology-operator/pkg/generated/clientset/versioned" rabbitlisters "github.com/rabbitmq/messaging-topology-operator/pkg/generated/listers/rabbitmq.com/v1alpha2" dialer "knative.dev/eventing-rabbitmq/pkg/amqp" @@ -66,6 +68,8 @@ type Reconciler struct { deploymentLister appsv1listers.DeploymentLister rabbitLister apisduck.InformerFactory exchangeLister rabbitlisters.ExchangeLister + queueLister rabbitlisters.QueueLister + bindingLister rabbitlisters.BindingLister ingressImage string ingressServiceAccountName string @@ -124,9 +128,7 @@ var rabbitBrokerCondSet = apis.NewLivingConditionSet( // messaging-topology-operator or the libraries. func isUsingOperator(b *eventingv1.Broker) bool { if b != nil && b.Spec.Config != nil { - return false - // TODO: vaikas: Enable this to use CRD based reconciler. - // return b.Spec.Config.Kind == "RabbitmqCluster" + return b.Spec.Config.Kind == "RabbitmqCluster" } return false } @@ -144,6 +146,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk } if isUsingOperator(b) { + args.RabbitMQCluster = b.Spec.Config.Name err := r.reconcileUsingCRD(ctx, b, args) if err != nil { return err @@ -160,9 +163,18 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk } func (r *Reconciler) FinalizeKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event { + if isUsingOperator(b) { + // Everything gets cleaned up by garbage collection in this case. + return nil + } args, err := r.getExchangeArgs(ctx, b) if err != nil { - return err + // TODO: Problem here is that depending on the kind of error we get back, say there's no + // RabbitMQ cluster anymore would mean that we couldn't necessarily delete underlying Rabbit + // resources. But leaving the Broker around seems worse because the user would have to manually + // remove the finalizer. So, log it and allow the removal of the resource. + logging.FromContext(ctx).Errorw("Failed to get Exchange args, there might be leaked rabbit resources", zap.Error(err)) + return nil } if err := resources.DeleteExchange(args); err != nil { logging.FromContext(ctx).Errorw("Problem deleting exchange", zap.Error(err)) @@ -320,39 +332,192 @@ func (r *Reconciler) reconcileDLQBinding(ctx context.Context, b *eventingv1.Brok return nil } -func (r *Reconciler) reconcileExchange(ctx context.Context, args *resources.ExchangeArgs) error { +func (r *Reconciler) reconcileExchange(ctx context.Context, args *resources.ExchangeArgs) (*v1alpha2.Exchange, error) { want := resources.NewExchange(ctx, args) current, err := r.exchangeLister.Exchanges(args.Broker.Namespace).Get(resources.ExchangeName(args.Broker, args.DLX)) if apierrs.IsNotFound(err) { - _, err = r.rabbitClientSet.RabbitmqV1alpha2().Exchanges(args.Broker.Namespace).Create(ctx, want, metav1.CreateOptions{}) - if err != nil { - return err - } + logging.FromContext(ctx).Debugw("Creating rabbitmq exchange", zap.String("exchange name", want.Name)) + return r.rabbitClientSet.RabbitmqV1alpha2().Exchanges(args.Broker.Namespace).Create(ctx, want, metav1.CreateOptions{}) } else if err != nil { - return err + return nil, err } else if !equality.Semantic.DeepDerivative(want.Spec, current.Spec) { // Don't modify the informers copy. desired := current.DeepCopy() desired.Spec = want.Spec - _, err = r.rabbitClientSet.RabbitmqV1alpha2().Exchanges(args.Broker.Namespace).Update(ctx, desired, metav1.UpdateOptions{}) - if err != nil { - return err - } + return r.rabbitClientSet.RabbitmqV1alpha2().Exchanges(args.Broker.Namespace).Update(ctx, desired, metav1.UpdateOptions{}) } - return nil + return current, nil +} + +func (r *Reconciler) reconcileQueue(ctx context.Context, b *eventingv1.Broker) (*v1alpha2.Queue, error) { + queueName := triggerresources.CreateBrokerDeadLetterQueueName(b) + // Create a Queue for the DLX + args := &triggerresources.QueueArgs{ + QueueName: queueName, + } + + want := triggerresources.NewQueue(ctx, b, args) + current, err := r.queueLister.Queues(b.Namespace).Get(queueName) + if apierrs.IsNotFound(err) { + logging.FromContext(ctx).Debugw("Creating rabbitmq exchange", zap.String("queue name", want.Name)) + return r.rabbitClientSet.RabbitmqV1alpha2().Queues(b.Namespace).Create(ctx, want, metav1.CreateOptions{}) + } else if err != nil { + return nil, err + } else if !equality.Semantic.DeepDerivative(want.Spec, current.Spec) { + // Don't modify the informers copy. + desired := current.DeepCopy() + desired.Spec = want.Spec + return r.rabbitClientSet.RabbitmqV1alpha2().Queues(b.Namespace).Update(ctx, desired, metav1.UpdateOptions{}) + } + return current, nil +} + +func (r *Reconciler) reconcileBinding(ctx context.Context, b *eventingv1.Broker) (*v1alpha2.Binding, error) { + // We can use the same name for queue / binding to keep things simpler + bindingName := triggerresources.CreateBrokerDeadLetterQueueName(b) + + bindingArgs := &triggerresources.BindingArgs{ + Broker: b, + BindingName: bindingName, + BindingKey: b.Name, + RoutingKey: "", + SourceName: resources.ExchangeName(b, true), + QueueName: triggerresources.CreateBrokerDeadLetterQueueName(b), + } + + want, err := triggerresources.NewBinding(ctx, b, bindingArgs) + if err != nil { + return nil, fmt.Errorf("failed to create the binding spec: %w", err) + } + current, err := r.bindingLister.Bindings(b.Namespace).Get(bindingName) + if apierrs.IsNotFound(err) { + logging.FromContext(ctx).Infow("Creating rabbitmq binding", zap.String("binding name", want.Name)) + return r.rabbitClientSet.RabbitmqV1alpha2().Bindings(b.Namespace).Create(ctx, want, metav1.CreateOptions{}) + } else if err != nil { + return nil, err + } else if !equality.Semantic.DeepDerivative(want.Spec, current.Spec) { + // Don't modify the informers copy. + desired := current.DeepCopy() + desired.Spec = want.Spec + return r.rabbitClientSet.RabbitmqV1alpha2().Bindings(b.Namespace).Update(ctx, desired, metav1.UpdateOptions{}) + } + return current, nil } func (r *Reconciler) reconcileUsingCRD(ctx context.Context, b *eventingv1.Broker, args *resources.ExchangeArgs) error { - err := r.reconcileExchange(ctx, args) + logging.FromContext(ctx).Info("Reconciling exchange") + exchange, err := r.reconcileExchange(ctx, args) if err != nil { + MarkExchangeFailed(&b.Status, "ExchangeFailure", "Failed to reconcile exchange: %s", err) return err } + if exchange != nil { + if !isReady(exchange.Status.Conditions) { + logging.FromContext(ctx).Warnf("Exchange %q is not ready", exchange.Name) + return nil + } + } args.DLX = true - err = r.reconcileExchange(ctx, args) + + logging.FromContext(ctx).Info("Reconciling DLX exchange") + dlxExchange, err := r.reconcileExchange(ctx, args) if err != nil { + MarkExchangeFailed(&b.Status, "ExchangeFailure", "Failed to reconcile DLX exchange: %s", err) return err } + if dlxExchange != nil { + if !isReady(dlxExchange.Status.Conditions) { + logging.FromContext(ctx).Warnf("DLX exchange %q is not ready", dlxExchange.Name) + MarkExchangeFailed(&b.Status, "ExchangeFailure", "DLX exchange is not ready") + return nil + } + } MarkExchangeReady(&b.Status) + + logging.FromContext(ctx).Info("Reconciling queue") + queue, err := r.reconcileQueue(ctx, b) + if err != nil { + MarkDLXFailed(&b.Status, "QueueFailure", "Failed to reconcile Dead Letter Queue: %s", err) + return err + } + if queue != nil { + if !isReady(queue.Status.Conditions) { + logging.FromContext(ctx).Warnf("Queue %q is not ready", queue.Name) + MarkDLXFailed(&b.Status, "QueueFailure", "Dead Letter Queue is not ready") + return nil + } + } + + MarkDLXReady(&b.Status) + + logging.FromContext(ctx).Info("Reconciling binding") + binding, err := r.reconcileBinding(ctx, b) + if err != nil { + MarkDeadLetterSinkFailed(&b.Status, "DLQ binding", "%v", err) + return err + } + if binding != nil { + if !isReady(binding.Status.Conditions) { + logging.FromContext(ctx).Warnf("Binding %q is not ready", binding.Name) + MarkDeadLetterSinkFailed(&b.Status, "DLQ binding", "DLQ binding is not ready") + return nil + } + } + MarkDeadLetterSinkReady(&b.Status) + + // TODO: These are copy & paste, we should hoist them out. + s := resources.MakeSecret(args) + if err := r.reconcileSecret(ctx, s); err != nil { + logging.FromContext(ctx).Errorw("Problem reconciling Secret", zap.Error(err)) + MarkSecretFailed(&b.Status, "SecretFailure", "Failed to reconcile secret: %s", err) + return err + } + MarkSecretReady(&b.Status) + + if err := r.reconcileIngressDeployment(ctx, b); err != nil { + logging.FromContext(ctx).Errorw("Problem reconciling ingress Deployment", zap.Error(err)) + MarkIngressFailed(&b.Status, "DeploymentFailure", "Failed to reconcile deployment: %s", err) + return err + } + + ingressEndpoints, err := r.reconcileIngressService(ctx, b) + if err != nil { + logging.FromContext(ctx).Errorw("Problem reconciling ingress Service", zap.Error(err)) + MarkIngressFailed(&b.Status, "ServiceFailure", "Failed to reconcile service: %s", err) + return err + } + PropagateIngressAvailability(&b.Status, ingressEndpoints) + + SetAddress(&b.Status, &apis.URL{ + Scheme: "http", + Host: network.GetServiceHostname(ingressEndpoints.GetName(), ingressEndpoints.GetNamespace()), + }) + + // If there's a Dead Letter Sink, then create a dispatcher for it. Note that this is for + // the whole broker, unlike for the Trigger, where we create one dispatcher per Trigger. + var dlsURI *apis.URL + if b.Spec.Delivery != nil && b.Spec.Delivery.DeadLetterSink != nil { + dlsURI, err = r.uriResolver.URIFromDestinationV1(ctx, *b.Spec.Delivery.DeadLetterSink, b) + if err != nil { + logging.FromContext(ctx).Error("Unable to get the DeadLetterSink URI", zap.Error(err)) + MarkDeadLetterSinkFailed(&b.Status, "Unable to get the DeadLetterSink's URI", "%v", err) + return err + } + + // TODO(vaikas): Set the custom annotation for resolved URI?... + // TODO(vaikas): Should this be a first level BrokerStatus field? + } + + // Note that if we didn't actually resolve the URI above, as in it's left as nil it's ok to pass here + // it deals with it properly. + if err := r.reconcileDLXDispatchercherDeployment(ctx, b, dlsURI); err != nil { + logging.FromContext(ctx).Error("Problem reconciling DLX dispatcher Deployment", zap.Error(err)) + MarkDeadLetterSinkFailed(&b.Status, "DeploymentFailure", "%v", err) + return err + } + + // So, at this point the Broker is ready and everything should be solid + // for the triggers to act upon. return nil } @@ -391,8 +556,8 @@ func (r *Reconciler) reconcileUsingLibraries(ctx context.Context, b *eventingv1. MarkDLXReady(&b.Status) if err := r.reconcileDLQBinding(ctx, b); err != nil { - logging.FromContext(ctx).Error("Problem reconciling DLX dispatcher Deployment", zap.Error(err)) - MarkDeadLetterSinkFailed(&b.Status, "DeploymentFailure", "%v", err) + logging.FromContext(ctx).Error("Problem reconciling DLX Binding", zap.Error(err)) + MarkDeadLetterSinkFailed(&b.Status, "DLQ binding", "%v", err) return err } MarkDeadLetterSinkReady(&b.Status) @@ -450,3 +615,17 @@ func (r *Reconciler) reconcileUsingLibraries(ctx context.Context, b *eventingv1. // for the triggers to act upon. return nil } + +func isReady(conditions []v1alpha2.Condition) bool { + numConditions := len(conditions) + // If there are no conditions at all, the resource probably hasn't been reconciled yet => not ready + if numConditions == 0 { + return false + } + for _, c := range conditions { + if c.Status == corev1.ConditionTrue { + numConditions-- + } + } + return numConditions == 0 +} diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index d1534ced4d..f6e31309ac 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -174,10 +174,6 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions, WithBrokerDeletionTimestamp), }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", `secrets "test-secret" not found`), - }, - WantErr: true, }, { Name: "Broker deleted", Key: testKey, @@ -728,6 +724,8 @@ func TestReconcile(t *testing.T) { dispatcherImage: dispatcherImage, rabbitClientSet: fakerabbitclient.Get(ctx), exchangeLister: listers.GetExchangeLister(), + queueLister: listers.GetQueueLister(), + bindingLister: listers.GetBindingLister(), rabbitLister: rabbitduck.Get(ctx), } return broker.NewReconciler(ctx, logger, diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index 1870f7aced..8dbc5b4d0f 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -32,7 +32,9 @@ import ( eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" dialer "knative.dev/eventing-rabbitmq/pkg/amqp" + bindinginformer "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/binding" exchangeinformer "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/exchange" + queueinformer "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/queue" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" "knative.dev/eventing/pkg/duck" @@ -86,6 +88,8 @@ func NewController( endpointsInformer := endpointsinformer.Get(ctx) rabbitInformer := rabbit.Get(ctx) exchangeInformer := exchangeinformer.Get(ctx) + queueInformer := queueinformer.Get(ctx) + bindingInformer := bindinginformer.Get(ctx) r := &Reconciler{ eventingClientSet: eventingclient.Get(ctx), @@ -104,6 +108,8 @@ func NewController( dispatcherImage: env.DispatcherImage, rabbitClientSet: rabbitmqclient.Get(ctx), exchangeLister: exchangeInformer.Lister(), + queueLister: queueInformer.Lister(), + bindingLister: bindingInformer.Lister(), } impl := brokerreconciler.NewImpl(ctx, r, env.BrokerClass, func(impl *controller.Impl) controller.Options { @@ -146,5 +152,13 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + queueInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Broker")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + bindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Broker")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) return impl } diff --git a/pkg/reconciler/broker/controller_test.go b/pkg/reconciler/broker/controller_test.go index dc4f52bbde..94b8b51bc9 100644 --- a/pkg/reconciler/broker/controller_test.go +++ b/pkg/reconciler/broker/controller_test.go @@ -26,7 +26,9 @@ import ( // Fake injection informers _ "knative.dev/eventing-rabbitmq/pkg/client/injection/ducks/duck/v1beta1/rabbit/fake" _ "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/client/fake" + _ "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/binding/fake" _ "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/exchange/fake" + _ "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/queue/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/conditions/fake" diff --git a/pkg/reconciler/broker/resources/exchange.go b/pkg/reconciler/broker/resources/exchange.go index be7ebd0c46..a1669e0a21 100644 --- a/pkg/reconciler/broker/resources/exchange.go +++ b/pkg/reconciler/broker/resources/exchange.go @@ -44,7 +44,7 @@ type ExchangeArgs struct { } func NewExchange(ctx context.Context, args *ExchangeArgs) *rabbitv1alpha2.Exchange { - exchangeName := kmeta.ChildName(ExchangeName(args.Broker, args.DLX), string(args.Broker.GetUID())) + exchangeName := ExchangeName(args.Broker, args.DLX) return &rabbitv1alpha2.Exchange{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Broker.Namespace, @@ -127,12 +127,18 @@ func DeleteExchange(args *ExchangeArgs) error { ) } -// ExchangeName constructs a name given a Broker and whether this is a DLX or not. -// Format is broker.Namespace/broker.Name for normal exchanges and -// broker.Namespace/broker.Name/DLX for DLX exchanges. +// ExchangeName constructs a name given a Broker. +// Format is broker.Namespace.broker.Name for normal exchanges and +// broker.Namespace.broker.Name.DLX for DLX exchanges. func ExchangeName(b *eventingv1.Broker, DLX bool) string { + var exchangeBase string if DLX { - return fmt.Sprintf("%s/knative-%s/DLX", b.Namespace, b.Name) + exchangeBase = fmt.Sprintf("%s.%s.dlx", b.Namespace, b.Name) + } else { + exchangeBase = fmt.Sprintf("%s.%s", b.Namespace, b.Name) + } - return fmt.Sprintf("%s/knative-%s", b.Namespace, b.Name) + foo := kmeta.ChildName(exchangeBase, string(b.GetUID())) + fmt.Printf("TODO: Fix this and use consistently to avoid collisions, worth doing? %s\n", foo) + return exchangeBase } diff --git a/pkg/reconciler/broker/resources/exchange_test.go b/pkg/reconciler/broker/resources/exchange_test.go index cfb6be1b5b..f52baa8b2b 100644 --- a/pkg/reconciler/broker/resources/exchange_test.go +++ b/pkg/reconciler/broker/resources/exchange_test.go @@ -39,7 +39,7 @@ func TestExchangeDeclaration(t *testing.T) { assert.Equal(t, createdExchange["auto_delete"], false) assert.Equal(t, createdExchange["internal"], false) assert.Equal(t, createdExchange["type"], "headers") - assert.Equal(t, createdExchange["name"], fmt.Sprintf("%s/knative-%s", namespace, brokerName)) + assert.Equal(t, createdExchange["name"], fmt.Sprintf("%s.%s", namespace, brokerName)) } func TestIncompatibleExchangeDeclarationFailure(t *testing.T) { @@ -47,7 +47,7 @@ func TestIncompatibleExchangeDeclarationFailure(t *testing.T) { rabbitContainer := testrabbit.AutoStartRabbit(t, ctx) defer testrabbit.TerminateContainer(t, ctx, rabbitContainer) brokerName := "x-change" - exchangeName := fmt.Sprintf("%s/knative-%s", namespace, brokerName) + exchangeName := fmt.Sprintf("%s.%s", namespace, brokerName) testrabbit.CreateExchange(t, ctx, rabbitContainer, exchangeName, "direct") _, err := resources.DeclareExchange(dialer.RealDialer, &resources.ExchangeArgs{ @@ -68,7 +68,7 @@ func TestExchangeDeletion(t *testing.T) { rabbitContainer := testrabbit.AutoStartRabbit(t, ctx) defer testrabbit.TerminateContainer(t, ctx, rabbitContainer) brokerName := "x-change" - exchangeName := fmt.Sprintf("%s/knative-%s", namespace, brokerName) + exchangeName := fmt.Sprintf("%s.%s", namespace, brokerName) testrabbit.CreateExchange(t, ctx, rabbitContainer, exchangeName, "headers") err := resources.DeleteExchange(&resources.ExchangeArgs{ diff --git a/pkg/reconciler/broker/resources/ingress_test.go b/pkg/reconciler/broker/resources/ingress_test.go index 21272eaff0..f3095c185a 100644 --- a/pkg/reconciler/broker/resources/ingress_test.go +++ b/pkg/reconciler/broker/resources/ingress_test.go @@ -94,7 +94,7 @@ func TestMakeIngressDeployment(t *testing.T) { }, }, { Name: "EXCHANGE_NAME", - Value: ns + "/knative-" + brokerName, + Value: ns + "." + brokerName, }}, Ports: []corev1.ContainerPort{{ ContainerPort: 8080, diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index d3142df566..8e8e37a585 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -209,3 +209,9 @@ func (l *Listers) GetRabbitObjects() []runtime.Object { func (l *Listers) GetExchangeLister() rabbitlisters.ExchangeLister { return rabbitlisters.NewExchangeLister(l.indexerFor(&rabbitv1alpha2.Exchange{})) } +func (l *Listers) GetQueueLister() rabbitlisters.QueueLister { + return rabbitlisters.NewQueueLister(l.indexerFor(&rabbitv1alpha2.Queue{})) +} +func (l *Listers) GetBindingLister() rabbitlisters.BindingLister { + return rabbitlisters.NewBindingLister(l.indexerFor(&rabbitv1alpha2.Binding{})) +} diff --git a/pkg/reconciler/trigger/controller.go b/pkg/reconciler/trigger/controller.go index 33d9c409d5..3134f93d0e 100644 --- a/pkg/reconciler/trigger/controller.go +++ b/pkg/reconciler/trigger/controller.go @@ -20,6 +20,7 @@ import ( "context" "log" + rabbitmqclient "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/client" eventingclient "knative.dev/eventing/pkg/client/injection/client" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/injection/clients/dynamicclient" @@ -32,6 +33,7 @@ import ( v1 "knative.dev/eventing/pkg/apis/eventing/v1" dialer "knative.dev/eventing-rabbitmq/pkg/amqp" + bindinginformer "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/binding" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" @@ -41,6 +43,8 @@ import ( "knative.dev/pkg/client/injection/ducks/duck/v1/source" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" + queueinformer "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/queue" + "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -72,6 +76,8 @@ func NewController( brokerInformer := brokerinformer.Get(ctx) deploymentInformer := deploymentinformer.Get(ctx) triggerInformer := triggerinformer.Get(ctx) + queueInformer := queueinformer.Get(ctx) + bindingInformer := bindinginformer.Get(ctx) r := &Reconciler{ eventingClientSet: eventingclient.Get(ctx), @@ -84,6 +90,9 @@ func NewController( dispatcherServiceAccountName: env.DispatcherServiceAccount, brokerClass: env.BrokerClass, dialerFunc: dialer.RealDialer, + queueLister: queueInformer.Lister(), + bindingLister: bindingInformer.Lister(), + rabbitClientSet: rabbitmqclient.Get(ctx), } impl := triggerreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { @@ -100,6 +109,14 @@ func NewController( FilterFunc: controller.FilterControllerGK(v1.Kind("Trigger")), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + queueInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterControllerGK(v1.Kind("Trigger")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + bindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterControllerGK(v1.Kind("Trigger")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) brokerInformer.Informer().AddEventHandler(controller.HandleAll( func(obj interface{}) { @@ -133,6 +150,5 @@ func NewController( } }, )) - return impl } diff --git a/pkg/reconciler/trigger/controller_test.go b/pkg/reconciler/trigger/controller_test.go index 728f1a0ed7..58deacfae6 100644 --- a/pkg/reconciler/trigger/controller_test.go +++ b/pkg/reconciler/trigger/controller_test.go @@ -25,6 +25,9 @@ import ( // Fake injection informers _ "knative.dev/eventing-rabbitmq/pkg/client/injection/ducks/duck/v1beta1/rabbit/fake" + _ "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/client/fake" + _ "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/binding/fake" + _ "knative.dev/eventing-rabbitmq/pkg/client/injection/rabbitmq.com/informers/rabbitmq.com/v1alpha2/queue/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" diff --git a/pkg/reconciler/trigger/resources/binding.go b/pkg/reconciler/trigger/resources/binding.go index ef52b48161..1ed303bad0 100644 --- a/pkg/reconciler/trigger/resources/binding.go +++ b/pkg/reconciler/trigger/resources/binding.go @@ -17,6 +17,8 @@ limitations under the License. package resources import ( + "context" + "encoding/json" "fmt" "io/ioutil" "net" @@ -24,6 +26,11 @@ import ( "net/url" "reflect" + rabbitv1alpha2 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "knative.dev/pkg/kmeta" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) @@ -38,13 +45,73 @@ const ( type BindingArgs struct { Trigger *eventingv1.Trigger Broker *eventingv1.Broker // only for DLQ + BindingName string + BindingKey string RoutingKey string + SourceName string BrokerURL string RabbitmqManagementPort int AdminURL string QueueName string // only for DLQ } +func NewBinding(ctx context.Context, broker *eventingv1.Broker, args *BindingArgs) (*rabbitv1alpha2.Binding, error) { + var or metav1.OwnerReference + if args.Trigger != nil { + or = *kmeta.NewControllerRef(args.Trigger) + } else { + or = *kmeta.NewControllerRef(broker) + } + arguments := map[string]string{ + "x-match": "all", + BindingKey: args.BindingKey, + } + if args.Trigger != nil && args.Trigger.Spec.Filter != nil { + for key, val := range args.Trigger.Spec.Filter.Attributes { + arguments[key] = val + } + } + argumentsJson, err := json.Marshal(arguments) + if err != nil { + return nil, fmt.Errorf("failed to encode binding arguments %+v : %s", argumentsJson, err) + } + + binding := &rabbitv1alpha2.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: broker.Namespace, + Name: args.BindingName, + OwnerReferences: []metav1.OwnerReference{or}, + Labels: BindingLabels(broker), + }, + Spec: rabbitv1alpha2.BindingSpec{ + Vhost: "/", + Source: args.SourceName, + Destination: args.QueueName, + DestinationType: "queue", + RoutingKey: args.RoutingKey, + Arguments: &runtime.RawExtension{ + Raw: argumentsJson, + }, + + // TODO: We had before also internal / nowait set to false. Are these in Arguments, + // or do they get sane defaults that we can just work with? + // TODO: This one has to exist in the same namespace as this exchange. + RabbitmqClusterReference: rabbitv1alpha2.RabbitmqClusterReference{ + Name: broker.Spec.Config.Name, + }, + }, + } + return binding, nil +} + +// BindingLabels generates the labels present on the Queue linking the Broker / Trigger to the +// Binding. +func BindingLabels(b *eventingv1.Broker) map[string]string { + return map[string]string{ + "eventing.knative.dev/broker": b.Name, + } +} + // MakeBinding declares the Binding from the Broker's Exchange to the Trigger's Queue. func MakeBinding(transport http.RoundTripper, args *BindingArgs) error { uri, err := url.Parse(args.BrokerURL) @@ -197,10 +264,9 @@ func managementPort(args *BindingArgs) int { return DefaultManagementPort } -// ExchangeName derives the Exchange name from the Broker name func ExchangeName(namespace, brokerName string, DLX bool) string { if DLX { - return fmt.Sprintf("%s/knative-%s/DLX", namespace, brokerName) + return fmt.Sprintf("%s.%s.dlx", namespace, brokerName) } - return fmt.Sprintf("%s/knative-%s", namespace, brokerName) + return fmt.Sprintf("%s.%s", namespace, brokerName) } diff --git a/pkg/reconciler/trigger/resources/binding_test.go b/pkg/reconciler/trigger/resources/binding_test.go index 3efe928fee..33162ff776 100644 --- a/pkg/reconciler/trigger/resources/binding_test.go +++ b/pkg/reconciler/trigger/resources/binding_test.go @@ -36,7 +36,7 @@ func TestBindingDeclaration(t *testing.T) { qualifiedQueueName := namespace + "-" + queueName testrabbit.CreateDurableQueue(t, ctx, rabbitContainer, qualifiedQueueName) brokerName := "some-broker" - exchangeName := namespace + "/" + "knative-" + brokerName + exchangeName := namespace + "." + brokerName testrabbit.CreateExchange(t, ctx, rabbitContainer, exchangeName, "headers") err := resources.MakeBinding(nil, &resources.BindingArgs{ @@ -78,7 +78,7 @@ func TestBindingDLQDeclaration(t *testing.T) { queueName := "queue-and-a" testrabbit.CreateDurableQueue(t, ctx, rabbitContainer, queueName) brokerName := "some-broker" - exchangeName := namespace + "/" + "knative-" + brokerName + "/DLX" + exchangeName := namespace + "." + brokerName + ".dlx" testrabbit.CreateExchange(t, ctx, rabbitContainer, exchangeName, "headers") err := resources.MakeDLQBinding(nil, &resources.BindingArgs{ @@ -135,8 +135,8 @@ func TestMissingExchangeBindingDeclarationFailure(t *testing.T) { }, }) - assert.ErrorContains(t, err, `Failed to declare Binding: Error 404 (not_found): no exchange 'foobar/knative-some-broke-herr' in vhost '/'`) - assert.ErrorContains(t, err, fmt.Sprintf("no exchange '%s/knative-%s'", namespace, brokerName)) + assert.ErrorContains(t, err, `Failed to declare Binding: Error 404 (not_found): no exchange 'foobar.some-broke-herr' in vhost '/'`) + assert.ErrorContains(t, err, fmt.Sprintf("no exchange '%s.%s'", namespace, brokerName)) } func asMap(t *testing.T, value interface{}) map[string]interface{} { diff --git a/pkg/reconciler/trigger/resources/queue.go b/pkg/reconciler/trigger/resources/queue.go index 59e6735cf9..1e5309faf1 100644 --- a/pkg/reconciler/trigger/resources/queue.go +++ b/pkg/reconciler/trigger/resources/queue.go @@ -17,11 +17,16 @@ limitations under the License. package resources import ( + "context" "fmt" + rabbitv1alpha2 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" "github.com/streadway/amqp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" dialer "knative.dev/eventing-rabbitmq/pkg/amqp" "knative.dev/eventing-rabbitmq/pkg/reconciler/io" + "knative.dev/pkg/kmeta" "github.com/NeowayLabs/wabbit" @@ -30,12 +35,59 @@ import ( // QueueArgs are the arguments to create a Trigger's RabbitMQ Queue. type QueueArgs struct { - QueueName string - RabbitmqURL string + QueueName string + RabbitmqURL string + RabbitmqCluster string + // If the queue is for Trigger, this holds the trigger so we can create a proper Owner Ref + Trigger *eventingv1.Trigger // If non-empty, wire the queue into this DLX. DLX string } +func NewQueue(ctx context.Context, b *eventingv1.Broker, args *QueueArgs) *rabbitv1alpha2.Queue { + var or metav1.OwnerReference + if args.Trigger != nil { + or = *kmeta.NewControllerRef(args.Trigger) + } else { + or = *kmeta.NewControllerRef(b) + } + q := &rabbitv1alpha2.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: b.Namespace, + Name: args.QueueName, + OwnerReferences: []metav1.OwnerReference{or}, + Labels: QueueLabels(b), + }, + Spec: rabbitv1alpha2.QueueSpec{ + // Why is the name in the Spec again? Is this different from the ObjectMeta.Name? If not, + // maybe it should be removed? + Name: args.QueueName, + Durable: true, + AutoDelete: false, + // TODO: We had before also internal / nowait set to false. Are these in Arguments, + // or do they get sane defaults that we can just work with? + // TODO: This one has to exist in the same namespace as this exchange. + RabbitmqClusterReference: rabbitv1alpha2.RabbitmqClusterReference{ + Name: b.Spec.Config.Name, + }, + }, + } + if args.DLX != "" { + q.Spec.Arguments = &runtime.RawExtension{ + Raw: []byte(`{"x-dead-letter-exchange":"` + args.DLX + `"}`), + } + } + return q +} + +// QueueLabels generates the labels present on the Queue linking the Broker / Trigger to the +// Queue. +func QueueLabels(b *eventingv1.Broker) map[string]string { + return map[string]string{ + "eventing.knative.dev/broker": b.Name, + } +} + func CreateBrokerDeadLetterQueueName(b *eventingv1.Broker) string { // TODO(vaikas): https://github.com/knative-sandbox/eventing-rabbitmq/issues/61 // return fmt.Sprintf("%s/%s/DLQ", b.Namespace, b.Name) diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index f2edcbdcb0..d03e12a778 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" + "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" "go.uber.org/zap" v1 "k8s.io/api/apps/v1" @@ -36,6 +37,8 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/logging" + rabbitclientset "github.com/rabbitmq/messaging-topology-operator/pkg/generated/clientset/versioned" + rabbitlisters "github.com/rabbitmq/messaging-topology-operator/pkg/generated/listers/rabbitmq.com/v1alpha2" dialer "knative.dev/eventing-rabbitmq/pkg/amqp" "knative.dev/eventing-rabbitmq/pkg/reconciler/trigger/resources" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" @@ -56,11 +59,14 @@ type Reconciler struct { eventingClientSet clientset.Interface dynamicClientSet dynamic.Interface kubeClientSet kubernetes.Interface + rabbitClientSet rabbitclientset.Interface // listers index properties about resources deploymentLister appsv1listers.DeploymentLister brokerLister eventinglisters.BrokerLister triggerLister eventinglisters.TriggerLister + queueLister rabbitlisters.QueueLister + bindingLister rabbitlisters.BindingLister dispatcherImage string dispatcherServiceAccountName string @@ -87,6 +93,15 @@ type Reconciler struct { var _ triggerreconciler.Interface = (*Reconciler)(nil) var _ triggerreconciler.Finalizer = (*Reconciler)(nil) +// isUsingOperator checks the Spec for a Broker and determines if we should be using the +// messaging-topology-operator or the libraries. +func isUsingOperator(b *eventingv1.Broker) bool { + if b != nil && b.Spec.Config != nil { + return b.Spec.Config.Kind == "RabbitmqCluster" + } + return false +} + func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) pkgreconciler.Event { logging.FromContext(ctx).Debug("Reconciling", zap.Any("Trigger", t)) @@ -141,32 +156,67 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p // Note that we always create DLX with this queue because you can't // change them later. + // Note we use the same name for queue & binding for consistency. + queueName := resources.CreateTriggerQueueName(t) queueArgs := &resources.QueueArgs{ - QueueName: resources.CreateTriggerQueueName(t), + QueueName: queueName, RabbitmqURL: rabbitmqURL, DLX: brokerresources.ExchangeName(broker, true), } + if !isUsingOperator(broker) { + queue, err := resources.DeclareQueue(r.dialerFunc, queueArgs) + if err != nil { + logging.FromContext(ctx).Error("Problem declaring Trigger Queue", zap.Error(err)) + t.Status.MarkDependencyFailed("QueueFailure", "%v", err) + return err + } + logging.FromContext(ctx).Info("Created rabbitmq queue", zap.Any("queue", queue)) + + err = resources.MakeBinding(r.transport, &resources.BindingArgs{ + Trigger: t, + RoutingKey: "", + BrokerURL: rabbitmqURL, + AdminURL: r.adminURL, + }) + if err != nil { + logging.FromContext(ctx).Error("Problem declaring Trigger Queue Binding", zap.Error(err)) + t.Status.MarkDependencyFailed("BindingFailure", "%v", err) + return err + } + } else { + queue, err := r.reconcileQueue(ctx, broker, t) + if err != nil { + logging.FromContext(ctx).Error("Problem reconciling Trigger Queue", zap.Error(err)) + t.Status.MarkDependencyFailed("QueueFailure", "%v", err) + return err + } + if queue != nil { + if !isReady(queue.Status.Conditions) { + logging.FromContext(ctx).Warnf("Queue %q is not ready", queue.Name) + t.Status.MarkDependencyFailed("QueueFailure", "Queue %q is not ready", queue.Name) + return nil + } + } - queue, err := resources.DeclareQueue(r.dialerFunc, queueArgs) - logging.FromContext(ctx).Info("Created rabbitmq queue", zap.Any("queue", queue)) - if err != nil { - logging.FromContext(ctx).Error("Problem declaring Trigger Queue", zap.Error(err)) - t.Status.MarkDependencyFailed("QueueFailure", "%v", err) - return err - } + logging.FromContext(ctx).Info("Reconciled rabbitmq queue", zap.Any("queue", queue)) - err = resources.MakeBinding(r.transport, &resources.BindingArgs{ - Trigger: t, - RoutingKey: "", - BrokerURL: rabbitmqURL, - AdminURL: r.adminURL, - }) - if err != nil { - logging.FromContext(ctx).Error("Problem declaring Trigger Queue Binding", zap.Error(err)) - t.Status.MarkDependencyFailed("BindingFailure", "%v", err) - return err - } + binding, err := r.reconcileBinding(ctx, broker, t) + if err != nil { + logging.FromContext(ctx).Error("Problem reconciling Trigger Queue Binding", zap.Error(err)) + t.Status.MarkDependencyFailed("BindingFailure", "%v", err) + return err + } + if binding != nil { + if !isReady(binding.Status.Conditions) { + logging.FromContext(ctx).Warnf("Binding %q is not ready", binding.Name) + t.Status.MarkDependencyFailed("BindingFailure", "Binding %q is not ready", binding.Name) + return nil + } + } + logging.FromContext(ctx).Info("Reconciled rabbitmq binding", zap.Any("binding", binding)) + t.Status.MarkDependencySucceeded() + } if t.Spec.Subscriber.Ref != nil { // To call URIFromDestination(dest apisv1alpha1.Destination, parent interface{}), dest.Ref must have a Namespace // We will use the Namespace of Trigger as the Namespace of dest.Ref @@ -225,13 +275,18 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, t *eventingv1.Trigger) pk return nil } + if isUsingOperator(broker) { + // Everything gets cleaned up by garbage collection in this case. + return nil + } + rabbitmqURL, err := r.rabbitmqURL(ctx, t) // If there's no secret, we can't delete the queue. Deleting an object should not require creation // of a secret, and for example if the namespace is being deleted, there's nothing we can do. - // For now, return an error, the user can always just manually remove a finalizer. + // For now, return nil rather than leave the Trigger around. if err != nil { logging.FromContext(ctx).Errorf("Failed to fetch rabbitmq secret while finalizing, leaking a queue %s/%s", t.Namespace, t.Name) - return err + return nil } err = resources.DeleteQueue(r.dialerFunc, &resources.QueueArgs{ @@ -358,3 +413,75 @@ func (r *Reconciler) rabbitmqURL(ctx context.Context, t *eventingv1.Trigger) (st } return string(val), nil } + +func (r *Reconciler) reconcileQueue(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger) (*v1alpha2.Queue, error) { + queueName := resources.CreateTriggerQueueName(t) + args := &resources.QueueArgs{ + QueueName: queueName, + DLX: brokerresources.ExchangeName(b, true), + Trigger: t, + } + + want := resources.NewQueue(ctx, b, args) + logging.FromContext(ctx).Infow("WANT QUEUE", zap.Any("queue", want)) + current, err := r.queueLister.Queues(b.Namespace).Get(queueName) + if apierrs.IsNotFound(err) { + logging.FromContext(ctx).Debugw("Creating rabbitmq exchange", zap.String("queue name", want.Name)) + return r.rabbitClientSet.RabbitmqV1alpha2().Queues(b.Namespace).Create(ctx, want, metav1.CreateOptions{}) + } else if err != nil { + return nil, err + } else if !equality.Semantic.DeepDerivative(want.Spec, current.Spec) { + // Don't modify the informers copy. + desired := current.DeepCopy() + desired.Spec = want.Spec + return r.rabbitClientSet.RabbitmqV1alpha2().Queues(b.Namespace).Update(ctx, desired, metav1.UpdateOptions{}) + } + return current, nil +} + +func (r *Reconciler) reconcileBinding(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger) (*v1alpha2.Binding, error) { + // We can use the same name for queue / binding to keep things simpler + bindingName := resources.CreateTriggerQueueName(t) + + bindingArgs := &resources.BindingArgs{ + Broker: b, + Trigger: t, + BindingName: bindingName, + BindingKey: t.Name, + RoutingKey: "", + SourceName: brokerresources.ExchangeName(b, false), + QueueName: bindingName, + } + + want, err := resources.NewBinding(ctx, b, bindingArgs) + if err != nil { + return nil, fmt.Errorf("failed to create the binding spec: %w", err) + } + current, err := r.bindingLister.Bindings(b.Namespace).Get(bindingName) + if apierrs.IsNotFound(err) { + logging.FromContext(ctx).Infow("Creating rabbitmq binding", zap.String("binding name", want.Name)) + return r.rabbitClientSet.RabbitmqV1alpha2().Bindings(b.Namespace).Create(ctx, want, metav1.CreateOptions{}) + } else if err != nil { + return nil, err + } else if !equality.Semantic.DeepDerivative(want.Spec, current.Spec) { + // Don't modify the informers copy. + desired := current.DeepCopy() + desired.Spec = want.Spec + return r.rabbitClientSet.RabbitmqV1alpha2().Bindings(b.Namespace).Update(ctx, desired, metav1.UpdateOptions{}) + } + return current, nil +} + +func isReady(conditions []v1alpha2.Condition) bool { + numConditions := len(conditions) + // If there are no conditions at all, the resource probably hasn't been reconciled yet => not ready + if numConditions == 0 { + return false + } + for _, c := range conditions { + if c.Status == corev1.ConditionTrue { + numConditions-- + } + } + return numConditions == 0 +} diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 6921a491fe..27bd470cb9 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -187,7 +187,7 @@ func TestReconcile(t *testing.T) { WithTriggerSubscriberURI(subscriberURI)), }, }, { - Name: "Trigger delete fails - with finalizer - no secret", + Name: "Trigger delete succeeds - with finalizer - no secret so no undeletable resources", Key: testKey, Objects: []runtime.Object{ broker.NewBroker(brokerName, testNS, @@ -196,10 +196,12 @@ func TestReconcile(t *testing.T) { broker.WithInitBrokerConditions), triggerWithFinalizerReady(), }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchRemoveFinalizers(testNS, triggerName), + }, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", `secrets "test-broker-broker-rabbit" not found`), + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "test-trigger" finalizers`), }, - WantErr: true, }, { Name: "Trigger deleted - with finalizer and secret", Key: testKey,