Skip to content

Commit

Permalink
MT-Broker: return retriable status code based on the state to leverag…
Browse files Browse the repository at this point in the history
…e retries (knative#8366)

* MT-Broker: return appropriate status code based on the state to leverage retries

The ingress or filter deployments were returning 400 even in the case
where a given resource (like trigger, broker, subscription) wasn't
found, however, this is a common case where the lister cache
hasn't caught up with the latest state.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Fix unit tests

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Dec 3, 2024
1 parent 1048ce4 commit 4ce14b5
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 7 deletions.
20 changes: 20 additions & 0 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"net/http"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -170,6 +172,11 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

trigger, err := h.getTrigger(triggerRef)
if apierrors.IsNotFound(err) {
h.logger.Info("Unable to find the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -245,6 +252,11 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
}

broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName)
if apierrors.IsNotFound(err) {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -290,6 +302,11 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
brokerNamespace = trigger.Namespace
}
broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName)
if apierrors.IsNotFound(err) {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.logger.Info("Unable to get the Broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand All @@ -310,6 +327,9 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
Audience: broker.Status.DeadLetterSinkAudience,
}
}
if target == nil {
return
}

reportArgs := &ReportArgs{
ns: trigger.Namespace,
Expand Down
8 changes: 5 additions & 3 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ import (
triggerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"

_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"

// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
)

const (
Expand Down Expand Up @@ -109,7 +111,7 @@ func TestReceiver(t *testing.T) {
expectedStatus: http.StatusBadRequest,
},
"Path too long": {
request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/extra", nil),
request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/uuid/extra/extra", nil),
expectedStatus: http.StatusBadRequest,
},
"Path without prefix": {
Expand All @@ -118,7 +120,7 @@ func TestReceiver(t *testing.T) {
},
"Trigger.Get fails": {
// No trigger exists, so the Get will fail.
expectedStatus: http.StatusBadRequest,
expectedStatus: http.StatusNotFound,
},
"Trigger doesn't have SubscriberURI": {
triggers: []*eventingv1.Trigger{
Expand Down
8 changes: 7 additions & 1 deletion pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/utils/ptr"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
Expand Down Expand Up @@ -226,6 +227,11 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

broker, err := h.getBroker(brokerName, brokerNamespace)
if apierrors.IsNotFound(err) {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
writer.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -315,7 +321,7 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
channelAddress, err := h.getChannelAddress(brokerObj)
if err != nil {
h.Logger.Warn("could not get channel address from broker", zap.Error(err))
return http.StatusBadRequest, kncloudevents.NoDuration
return http.StatusInternalServerError, kncloudevents.NoDuration
}

opts := []kncloudevents.SendOption{
Expand Down
8 changes: 5 additions & 3 deletions pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ import (
brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"

_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"

// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
)

const (
Expand Down Expand Up @@ -214,9 +216,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
method: nethttp.MethodPost,
uri: "/ns/name",
body: getValidEvent(),
statusCode: nethttp.StatusBadRequest,
statusCode: nethttp.StatusInternalServerError,
handler: handler(),
reporter: &mockReporter{StatusCode: nethttp.StatusBadRequest, EventDispatchTimeReported: false},
reporter: &mockReporter{StatusCode: nethttp.StatusInternalServerError, EventDispatchTimeReported: false},
defaulter: broker.TTLDefaulter(logger, 100),
brokers: []*eventingv1.Broker{
withUninitializedAnnotations(makeBroker("name", "ns")),
Expand Down

0 comments on commit 4ce14b5

Please sign in to comment.