-
Notifications
You must be signed in to change notification settings - Fork 600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bus monitor polish #382
Bus monitor polish #382
Conversation
pkg/buses/gcppubsub/bus.go
Outdated
@@ -105,6 +112,8 @@ func (b *PubSubBus) CreateOrUpdateSubscription(sub *channelsv1alpha1.Subscriptio | |||
return err | |||
} | |||
|
|||
// DeleteSubscription removes a Subscription from Cloud Pub/Sub for a Knative |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I bet this is a bug based on the CreateOrUpdate comment: or idempotently updates a Subscription if it already exists.
which means it will delete adopted topics where we don't mean to. We should be adding something to the metadata that states if we did real work on the remote topic/subscription or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Buses should be in total control of their infrastructure, the subscription is prefixed to avoid collisions with a non bus-created resources. While collisions are possible (need to fix #253), it should be rare and is certainly not a feature.
pkg/buses/gcppubsub/bus.go
Outdated
@@ -122,6 +131,8 @@ func (b *PubSubBus) DeleteSubscription(sub *channelsv1alpha1.Subscription) error | |||
return subscription.Delete(ctx) | |||
} | |||
|
|||
// SendEventToTopic sends a message to the Cloud Pub/Sub Topic backing the | |||
// Channel. | |||
func (b *PubSubBus) SendEventToTopic(channel *channelsv1alpha1.Channel, message *buses.Message) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this suppose to be a public method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
pkg/buses/gcppubsub/bus.go
Outdated
// ReceiveEvents will receive events from a Cloud Pub/Sub Subscription for a | ||
// Knative Subscription. This method will not block, but will continue to | ||
// receive events until either this method or StopReceiveEvents is called for | ||
// the same Subscription. | ||
func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters buses.ResolvedParameters) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this suppose to be a public method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
/lgtm Minor comments mostly not related to this change but to the code in general. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Letting @n3wscott finish reviewing and lgtm)
sInitial := parameters[InitialOffset] | ||
if sInitial == Oldest { | ||
func resolveInitialOffset(parameters buses.ResolvedParameters) (int64, error) { | ||
sInitial := parameters[initialOffset] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switch or a map would be slightly cleaner here; i.e.
const saramaOffset = map[string]int64{
"newest": saramaNewest,
"oldest": saramaOldest,
}
func resolveInitialOffset(...) (int64, error) {
offset, ok := saramaOffset[parameters[initialOffset]
if !ok {
return 0, fmt.Errorf(...)
}
return offset, nil
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
switched to a switch
pkg/buses/monitor.go
Outdated
glog.Warningf("Could not update status: %v", errS) | ||
if !equality.Semantic.DeepEqual(subscription.Status, subscriptionCopy.Status) { | ||
// status has changed, update subscription | ||
_, errS := monitor.eventingClient.ChannelsV1alpha1().Subscriptions(subscription.Namespace).Update(subscriptionCopy) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels strange to be tracking two error
s here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is a bit funky. We want always want to update the resource status, if the original err being recorded in the resource status, we want to return that error since it's more meaningful. If the update failed, we still want to return that error to trigger the work queue to process the resource again as the update can fail because of a failed optimistic lock.
Will add a comment in the code
pkg/buses/references.go
Outdated
func (r *BusReference) String() string { | ||
return fmt.Sprintf("%s/%s", r.Namespace, r.Name) | ||
if len(r.Namespace) != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a slight preference for r.Namespace != ""
; it makes bugs like #377 slightly easier to catch, IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Clarify and solidify the public interfaces for a Bus. Common behavior has moved into composable units that can individually tests or injected into the bus for testing. Bus implementors generally only need to provide custom implementations for up to six methods defined by EventHandlerFuncs. Core components: - [Bus|Channel|Subscription]Reference: decouples bus from the raw k8s resources - Cache: lookup and presist raw resource types for a reference - EventHandlerFuncs: the contract for bus implementations - Reconciler: reads resource changes from the API server, saves the resource in the Cache and emits events to the EventHandlerFuncs - MessageDispatcher: sends messages to a destination over the event delivery protocol - MessageReceiver: receive messages for a channel on the bus over the event delivery protocol Behavior changes: - avoid unnessesary status updates if the status has not changed - avoid supressing errors when updating status - avoid supressing updates for unchanged resources as the local copy may not be fully reconciled
a4eba78
to
3be8c55
Compare
@n3wscott @evankanderson PTAL I addressed your feedback and also pulled in the other refactoring I've been working on. The diff is rather scary, but mostly is moving items around to create components that are similar, more cleanly defined and testable. There's more to do (especially adding tests and polishing rough spots), but I like the general direction and it's a better foundation. From the commit message:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like there are new directories, can you confirm there are new tests in each new directory?
pkg/buses/bus.go
Outdated
// EventHandlerFuncs are used to be notified when a subscription is created, | ||
// updated or removed, or a message is received. | ||
func NewBusDispatcher(busRef BusReference, handlerFuncs EventHandlerFuncs, opts *BusOpts) BusDispatcher { | ||
bus := &bus{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would invert this method to have bus := $bus{...}
after the opts validation has happened, and then that condenses the calls like bus.cache = opts.Cache
into just setting inside the struct creation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do, although it's not quite as clean as you'd sugest because there's a cyclic dependency between the bus and the receiver I still need to untangle.
pkg/buses/bus.go
Outdated
handlerFuncs: handlerFuncs, | ||
} | ||
|
||
if opts.Cache == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opts is allowed to be nil, so you should wrap all the defaulting with a nil check on opts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
} | ||
|
||
func (b *bus) dispatchMessage(subscription *channelsv1alpha1.Subscription, message *Message) error { | ||
subscriber := subscription.Spec.Subscriber |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can subscription be nil here? should we check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't think of a reason to why it should be allowed to be empty. I'll add a check to the validator.
pkg/buses/cache.go
Outdated
// AddChannel adds, or updates, the provided channel to the cache for later | ||
// retrieal by its reference. | ||
func (c *Cache) AddChannel(channel *channelsv1alpha1.Channel) { | ||
channelRef := NewChannelReference(channel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nil check channel? return error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adding a check and test
pkg/buses/cache.go
Outdated
|
||
// RemoveChannel removes the provided channel from the cache. | ||
func (c *Cache) RemoveChannel(channel *channelsv1alpha1.Channel) { | ||
channelRef := NewChannelReference(channel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nil check channel? return error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adding a check and test
return nil | ||
} | ||
|
||
glog.Infof("Delete topic %q\n", topicID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this refactor is the correct time to move to zap
or just plain log
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, but in a follow-up PR
pkg/buses/gcppubsub/bus.go
Outdated
return err | ||
} else if exists { | ||
// TODO update subscription configuration | ||
// _, err := subscription.Update(b.ctx, pubsub.SubscriptionConfigToUpdate{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems important?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not important right now because there are no bus params, thus the pub/sub subscription will never need to be mutated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
amending the current comment to make it more clear
pkg/buses/handler_funcs.go
Outdated
err = h.ProvisionFunc(channelRef, parameters) | ||
channelCopy := channel.DeepCopy() | ||
var cond *channelsv1alpha1.ChannelCondition | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to see the if error != nil
block directly after the setting of the involved err variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in this case you could use the
if err := h.ProvisionFunc(channelRef, parameters); err != nil {
...
} else {
...
}
style
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh i see, you use this error much further down as well...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updating
pkg/buses/handler_funcs.go
Outdated
_, errS := reconciler.eventingClient.ChannelsV1alpha1().Channels(channel.Namespace).Update(channelCopy) | ||
if errS != nil { | ||
glog.Warningf("Could not update channel status: %v", errS) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is pretty strange. It feels real weird to have two errors and not report both of them. Maybe smush them together as a string in a new error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will smoosh them into a single error
pkg/buses/handler_funcs.go
Outdated
func (h EventHandlerFuncs) onUnprovision(channel *channelsv1alpha1.Channel, reconciler *Reconciler) error { | ||
if h.UnprovisionFunc != nil { | ||
channelRef := NewChannelReference(channel) | ||
err := h.UnprovisionFunc(channelRef) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
general style would have me changing this to:
if err := h.UnprovisionFunc(channelRef); err != nil {
reconciler.RecordChannelEventf(channelRef, corev1.EventTypeWarning, errResourceSync, "Error unprovisioning channel: %s", err)
// skip updating status conditions since the channel was deleted
return err
} else {
reconciler.RecordChannelEventf(channelRef, corev1.EventTypeNormal, successSynced, "Channel unprovisioned successfully")
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updating
@n3wscott PTAL |
/lgtm I think this is a really nice refactoring. |
for approval: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
pkg/buses/handler_funcs.go
Outdated
} | ||
|
||
func (h EventHandlerFuncs) onSubscribe(subscription *channelsv1alpha1.Subscription, reconciler *Reconciler) error { | ||
if h.SubscribeFunc != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, convert this to early-return:
if h.SubscribeFunc == nil {
return nil
}
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: evankanderson, scothis The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
- return early from handlerfuncs if func is nil
@evankanderson PTAL. I resolved the conflict with master and addressed your comment |
The following is the coverage report on pkg/.
|
/lgtm |
Follow-up from knative#382 (comment)
Follow-up from #382 (comment)
Adding back the incorrectly removed serving/servrless content
* Apply vendor patches on make generate-release Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> * Align create-release-branch Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> --------- Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
…native#394) * Apply vendor patches on make generate-release * Align create-release-branch --------- Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
* Apply vendor patches on make generate-release Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> * Align create-release-branch Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com> --------- Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
General bus enhancements lifted from #378, since that PR is blocked.
/assign @n3wscott