Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Jul 28, 2020
1 parent 0980504 commit 4fa9f36
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 170 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ require (
k8s.io/apimachinery v0.18.6
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
k8s.io/utils v0.0.0-20200124190032-861946025e34
knative.dev/eventing v0.16.1-0.20200726092658-9c16bdbec2f3
knative.dev/pkg v0.0.0-20200727081359-9a051b3decfd
knative.dev/serving v0.16.1-0.20200727085159-0fb04711c130
knative.dev/eventing v0.16.1-0.20200727160458-c65c9de63208
knative.dev/pkg v0.0.0-20200727154558-deb6b33d2a6c
knative.dev/serving v0.16.1-0.20200727165458-628bea7657d6
knative.dev/test-infra v0.0.0-20200725213358-8557dab80d7a
)

Expand Down
28 changes: 9 additions & 19 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+
cloud.google.com/go/pubsub v1.2.0 h1:Lpy6hKgdcl7a3WGSfJIFmxmcdjSpP6OmBEfcOv1Y680=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.5.0/go.mod h1:ZEwJccE3z93Z2HWvstpri00jOg7oO4UZDtKhwDwqF0w=
cloud.google.com/go/pubsub v1.6.0/go.mod h1:I6DkrTv7tKIvDQTZt+6rAFo1446FEoVDJeLXTu4pCcE=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
Expand Down Expand Up @@ -1456,7 +1455,6 @@ golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200720211630-cb9d2d5c5666/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200722175500-76b94024e4b6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200724161237-0e2f3a69832c h1:UIcGWL6/wpCfyGuJnRFJRurA+yj8RrW7Q6x2YMCXt6c=
golang.org/x/sys v0.0.0-20200724161237-0e2f3a69832c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -1561,10 +1559,8 @@ golang.org/x/tools v0.0.0-20200527183253-8e7acdbce89d/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200601175630-2caf76543d99/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200626171337-aa94e735be7f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200706234117-b22de6825cf7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200709181711-e327e1019dfe/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200713011307-fd294ab11aed/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200721223218-6123e77877b2/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200723000907-a7c6fd066f6d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200724172932-b5fc9d354d99 h1:OHn441rq5CeM5r1xJ0OmY7lfdTvnedi6k+vQiI7G9b8=
Expand Down Expand Up @@ -1656,10 +1652,8 @@ google.golang.org/genproto v0.0.0-20200527145253-8367513e4ece/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200626011028-ee7919e894b5/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200707001353-8e8330bf89df/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200709005830-7a2ca40e9dc3/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200711021454-869866162049/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200721032028-5044d0edf986/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200722002428-88e341933a54/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200724131911-43cab4749ae7 h1:AWgNCmk2V5HZp9AiCDRBExX/b9I0Ey9F8STHDZlhCC4=
google.golang.org/genproto v0.0.0-20200724131911-43cab4749ae7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down Expand Up @@ -1821,30 +1815,26 @@ k8s.io/legacy-cloud-providers v0.17.4/go.mod h1:FikRNoD64ECjkxO36gkDgJeiQWwyZTuB
k8s.io/metrics v0.17.2/go.mod h1:3TkNHET4ROd+NfzNxkjoVfQ0Ob4iZnaHmSEA4vYpwLw=
k8s.io/test-infra v0.0.0-20200514184223-ba32c8aae783/go.mod h1:bW6thaPZfL2hW7ecjx2WYwlP9KQLM47/xIJyttkVk5s=
k8s.io/test-infra v0.0.0-20200617221206-ea73eaeab7ff/go.mod h1:L3+cRvwftUq8IW1TrHji5m3msnc4uck/7LsE/GR/aZk=
k8s.io/test-infra v0.0.0-20200721115715-1af01ef6b4c8/go.mod h1:4cRZlOy5Ka3Ym/orCmNWL2dsE39pN0xHFT0WFrZe2HQ=
k8s.io/test-infra v0.0.0-20200722010006-526277bee528/go.mod h1:4cRZlOy5Ka3Ym/orCmNWL2dsE39pN0xHFT0WFrZe2HQ=
k8s.io/test-infra v0.0.0-20200723132140-b02d194a2d64/go.mod h1:4cRZlOy5Ka3Ym/orCmNWL2dsE39pN0xHFT0WFrZe2HQ=
k8s.io/test-infra v0.0.0-20200724210216-b2cdf8399d01/go.mod h1:4cRZlOy5Ka3Ym/orCmNWL2dsE39pN0xHFT0WFrZe2HQ=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20200124190032-861946025e34 h1:HjlUD6M0K3P8nRXmr2B9o4F9dUy9TCj/aEpReeyi6+k=
k8s.io/utils v0.0.0-20200124190032-861946025e34/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg=
knative.dev/caching v0.0.0-20200724160858-bf86c4f25164/go.mod h1:ynA34AdJ/8sxpUxUUcel/cVUesoIG80S3qyclqth7xQ=
knative.dev/eventing v0.16.1-0.20200726092658-9c16bdbec2f3 h1:xaoOQaJXayHRxkTKNP5smtfpCCylXjpBzOX+Oc/B5to=
knative.dev/eventing v0.16.1-0.20200726092658-9c16bdbec2f3/go.mod h1:D7x216MQznRHiu3V7Y5I29/xCRkHxl9i21T6LrZPQt8=
knative.dev/caching v0.0.0-20200724214558-d562fe0c500c/go.mod h1:+YKPYiz3a0Ge+5sq/8EcF5eITBs751g3B2ENE2vNa8Y=
knative.dev/eventing v0.16.1-0.20200727160458-c65c9de63208 h1:WgbSDfIZ8oLiE07ZpqYJ00yMPZwb615kovG6PvD7Am8=
knative.dev/eventing v0.16.1-0.20200727160458-c65c9de63208/go.mod h1:m99rpuvhfPakRIwmEWd8p0uApdEzpgsn/H9KhGPJ4O4=
knative.dev/eventing-contrib v0.11.2/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
knative.dev/networking v0.0.0-20200724160858-a48770266a29 h1:qBxok3mlcLsFPKWL/qLdIdBiRrIuNHtuTaHC4yeOloQ=
knative.dev/networking v0.0.0-20200724160858-a48770266a29/go.mod h1:NYdbzk8QhBB3O6BtJaw76Wjelyw8hLZVqGxd1702gF8=
knative.dev/networking v0.0.0-20200724220358-c886e8fc54c6 h1:gpANrGM5schC7q6KaKUQaAhxfVrjTxHFpLh4Ql4UuWw=
knative.dev/networking v0.0.0-20200724220358-c886e8fc54c6/go.mod h1:0jwxfJ0NdsCBhVFfnJ+V/MzE50kH/mo7Rd5vj2L5vDY=
knative.dev/pkg v0.0.0-20200207155214-fef852970f43/go.mod h1:pgODObA1dTyhNoFxPZTTjNWfx6F0aKsKzn+vaT9XO/Q=
knative.dev/pkg v0.0.0-20200723060257-ae9c3f7fa8d3/go.mod h1:pOJ+tu5k/SgHPGmecWqFguxJcIZozyDWlTlyldGcYGM=
knative.dev/pkg v0.0.0-20200724055557-c36f46cc8c80/go.mod h1:GtoTEupsOzadgRKT4GgPWukbhAcINwDDGcKibTyd1Fk=
knative.dev/pkg v0.0.0-20200724211057-f21f66204a5c/go.mod h1:jVP9M8uft3EYhWF8K00zKiNCvyfGy3tm0L2w9edCRqw=
knative.dev/pkg v0.0.0-20200727081359-9a051b3decfd h1:PcffTKYEFwjgDTterGM7I0B1MFXIOxntrk0hZGHsk4I=
knative.dev/pkg v0.0.0-20200727081359-9a051b3decfd/go.mod h1:Zv+oUY4uQLV7ZiBHmFuD7WH4IPU0HcEVnrZbHfc/n/A=
knative.dev/serving v0.16.1-0.20200727085159-0fb04711c130 h1:ASUOm8+ug9eYDsDKlSXWmykYGwxeBa8ggiruPwgZ3zQ=
knative.dev/serving v0.16.1-0.20200727085159-0fb04711c130/go.mod h1:OMXgx4aGPQyj+WuGKfwPIakMXEt52coVjLNQQeameqc=
knative.dev/test-infra v0.0.0-20200721175154-c98db9bd4d5d/go.mod h1:kzRhTm5L08eDQFRl8NKSAN93lz6IZWQMs+2TjTCN+VA=
knative.dev/test-infra v0.0.0-20200722142057-3ca910b5a25e/go.mod h1:oHmDsPmq+zcc3b+Z94Kgmrz1JnmZEz36jmKuvL2Lw7o=
knative.dev/pkg v0.0.0-20200727154558-deb6b33d2a6c h1:PEvxg+ycPTQX+1vog44BC7eNgliXnFkXoqIk07w68OY=
knative.dev/pkg v0.0.0-20200727154558-deb6b33d2a6c/go.mod h1:Zv+oUY4uQLV7ZiBHmFuD7WH4IPU0HcEVnrZbHfc/n/A=
knative.dev/serving v0.16.1-0.20200727165458-628bea7657d6 h1:4f658k4Gl7WcBku4Z9j/dUYXM91p9PMv0e8veOZrBrA=
knative.dev/serving v0.16.1-0.20200727165458-628bea7657d6/go.mod h1:e94C+9CjYEzbS2/WnWyYz+MfU7MRh1TvKjHBAYNOdVw=
knative.dev/test-infra v0.0.0-20200723182457-517b66ba19c1/go.mod h1:Y3sNbLSTA11kRcLvTzRsZCxO+GQuw7KV7+PL+iD+9CA=
knative.dev/test-infra v0.0.0-20200724213858-d5ec9cdc6b33/go.mod h1:Y3sNbLSTA11kRcLvTzRsZCxO+GQuw7KV7+PL+iD+9CA=
knative.dev/test-infra v0.0.0-20200725213358-8557dab80d7a h1:Coxwj+5PN0WCrRh2Dxf33Nu2QteF4hcy55qw+AJKMSw=
Expand Down
23 changes: 6 additions & 17 deletions kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
nethttp "net/http"
"net/url"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -130,7 +129,6 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
if r := recover(); r != nil {
c.logger.Warn("Panic happened while handling a message",
zap.String("topic", consumerMessage.Topic),
zap.String("sub", string(c.sub.UID)),
zap.Any("panic value", r),
)
}
Expand All @@ -139,27 +137,18 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
if message.ReadEncoding() == binding.EncodingUnknown {
return false, errors.New("received a message with unknown encoding")
}
var destination *url.URL
if !c.sub.SubscriberURI.IsEmpty() {
destination = c.sub.SubscriberURI.URL()
}
var reply *url.URL
if !c.sub.ReplyURI.IsEmpty() {
reply = c.sub.ReplyURI.URL()
}
var deadLetter *url.URL
if c.sub.Delivery != nil && c.sub.Delivery.DeadLetterSink != nil && !c.sub.Delivery.DeadLetterSink.URI.IsEmpty() {
deadLetter = c.sub.Delivery.DeadLetterSink.URI.URL()
}

c.logger.Debug("Going to dispatch the message",
zap.String("topic", consumerMessage.Topic),
zap.String("sub", string(c.sub.UID)),
zap.Any("destination", c.sub.Subscriber),
zap.Any("reply", c.sub.Reply),
zap.Any("deadLetter", c.sub.DeadLetter),
)

ctx, span := startTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic)
defer span.End()

err := c.dispatcher.DispatchMessage(ctx, message, nil, destination, reply, deadLetter)
err := c.dispatcher.DispatchMessage(ctx, message, nil, c.sub.Subscriber, c.sub.Reply, c.sub.DeadLetter)

// NOTE: only return `true` here if DispatchMessage actually delivered the message.
return err == nil, err
Expand Down Expand Up @@ -299,7 +288,7 @@ func (d *KafkaDispatcher) Start(ctx context.Context) error {
// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub subscription) error {
d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.SubscriberSpec))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", sub.Namespace, channelRef.Name, sub.Name)
Expand Down
24 changes: 4 additions & 20 deletions kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@ func init() {
_ = kafkaScheme.AddToScheme(scheme.Scheme)
}

const (
// ReconcilerName is the name of the reconciler.
ReconcilerName = "KafkaChannels"

// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "kafka-ch-dispatcher"

// Name of the corev1.Events emitted from the reconciliation process.
channelReconciled = "ChannelReconciled"
channelReconcileFailed = "ChannelReconcileFailed"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
)

// Reconciler reconciles Kafka Channels.
type Reconciler struct {
kafkaDispatcher *dispatcher.KafkaDispatcher
Expand Down Expand Up @@ -233,21 +219,16 @@ func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1alpha1.KafkaChannel)
newSubs := make([]fanout.Subscription, len(c.Spec.Subscribable.Subscribers))
for i, source := range c.Spec.Subscribable.Subscribers {

_ = source.ConvertTo(context.TODO(), &newSubs[i])

// Extract retry configuration
retryConfig := kncloudevents.NoRetries()
if source.Delivery != nil {
delivery := &eventingduckv1.DeliverySpec{}
_ = source.Delivery.ConvertTo(context.TODO(), delivery)

_retryConfig, err := kncloudevents.RetryConfigFromDeliverySpec(*delivery)
if err == nil {
retryConfig = _retryConfig
newSubs[i].RetryConfig = &_retryConfig
}
}
newSubs[i].RetryConfig = retryConfig

}
channelConfig.FanoutConfig = fanout.Config{
AsyncHandler: true,
Expand All @@ -257,6 +238,9 @@ func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1alpha1.KafkaChannel)
return &channelConfig
}

type Config struct {
}

// newConfigFromKafkaChannels creates a new Config from the list of kafka channels.
func (r *Reconciler) newConfigFromKafkaChannels(channels []*v1alpha1.KafkaChannel) *multichannelfanout.Config {
cc := make([]multichannelfanout.ChannelConfig, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ package fanout

import (
"context"
"encoding/json"
"errors"
"fmt"
nethttp "net/http"
"net/url"
"time"
Expand All @@ -36,7 +34,6 @@ import (
"go.uber.org/zap"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
)
Expand All @@ -46,16 +43,10 @@ const (
)

type Subscription struct {
eventingduck.SubscriberSpec
RetryConfig kncloudevents.RetryConfig
}

func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.SubscriberSpec)
}

func (s *Subscription) UnmarshalJSON(bytes []byte) error {
return json.Unmarshal(bytes, &s.SubscriberSpec)
Subscriber *url.URL
Reply *url.URL
DeadLetter *url.URL
RetryConfig *kncloudevents.RetryConfig
}

// Config for a fanout.MessageHandler.
Expand Down Expand Up @@ -96,23 +87,35 @@ func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDisp
}
handler.receiver = receiver

for i := range config.Subscriptions {
retriesConfig, err := retriesOf(config.Subscriptions[i].SubscriberSpec)
if err != nil {
return nil, fmt.Errorf("failed to create retries config from SubscriberSpec: %w", err)
}
config.Subscriptions[i].RetryConfig = retriesConfig
}

return handler, nil
}

func retriesOf(spec eventingduck.SubscriberSpec) (kncloudevents.RetryConfig, error) {
delivery := &eventingduckv1.DeliverySpec{}
func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscription, error) {
var destination *url.URL
if sub.SubscriberURI != nil {
destination = sub.SubscriberURI.URL()
}

var reply *url.URL
if sub.ReplyURI != nil {
reply = sub.ReplyURI.URL()
}

var deadLetter *url.URL
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
deadLetter = sub.Delivery.DeadLetterSink.URI.URL()
}

_ = spec.ConvertTo(context.Background(), delivery)
var retryConfig *kncloudevents.RetryConfig
if sub.Delivery != nil {
if rc, err := kncloudevents.RetryConfigFromDeliverySpec(*sub.Delivery); err != nil {
return nil, err
} else {
retryConfig = &rc
}
}

return kncloudevents.RetryConfigFromDeliverySpec(*delivery)
return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil
}

func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
Expand Down Expand Up @@ -198,21 +201,13 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M
// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) error {

var destination *url.URL
if sub.SubscriberURI != nil {
destination = sub.SubscriberURI.URL()
}

var reply *url.URL
if sub.ReplyURI != nil {
reply = sub.ReplyURI.URL()
}

var deadLetterURL *url.URL
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
deadLetterURL = sub.Delivery.DeadLetterSink.URI.URL()
}

return f.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetterURL, &sub.RetryConfig)
return f.dispatcher.DispatchMessageWithRetries(
ctx,
message,
additionalHeaders,
sub.Subscriber,
sub.Reply,
sub.DeadLetter,
sub.RetryConfig,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
// Config for a multichannelfanout.Handler.
type Config struct {
// The configuration of each channel in this handler.
ChannelConfigs []ChannelConfig `json:"channelConfigs"`
ChannelConfigs []ChannelConfig
}

// ChannelConfig is the configuration for a single Channel.
type ChannelConfig struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
HostName string `json:"hostname"`
FanoutConfig fanout.Config `json:"fanoutConfig"`
Namespace string
Name string
HostName string
FanoutConfig fanout.Config
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func channelCRDHasRequiredLabels(client *testlib.Client, channel metav1.TypeMeta
// label of messaging.knative.dev/subscribable: "true"
// label of duck.knative.dev/addressable: "true"

validateRequiredLabels(client, channel, channelLabels)
ValidateRequiredLabels(client, channel, channelLabels)
}

func channelCRDHasProperCategory(st *testing.T, client *testlib.Client, channel metav1.TypeMeta) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
testlib "knative.dev/eventing/test/lib"
)

func validateRequiredLabels(client *testlib.Client, object metav1.TypeMeta, labels map[string]string) {
func ValidateRequiredLabels(client *testlib.Client, object metav1.TypeMeta, labels map[string]string) {
for k, v := range labels {
if !objectHasRequiredLabel(client, object, k, v) {
client.T.Fatalf("can't find label '%s=%s' in CRD %q", k, v, object)
Expand Down
Loading

0 comments on commit 4fa9f36

Please sign in to comment.