Skip to content

Commit

Permalink
Update Sequence to use correct conditions.
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott Nichols committed May 7, 2021
1 parent f1f4615 commit 820b051
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 226 deletions.
5 changes: 5 additions & 0 deletions cmd/schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
"log"

"knative.dev/hack/schema/commands"
Expand All @@ -42,6 +43,10 @@ func main() {
registry.Register(&sourcesv1.ApiServerSource{})
registry.Register(&sourcesv1.SinkBinding{})

// Flows
registry.Register(&flowsv1.Sequence{})
registry.Register(&flowsv1.Parallel{})

if err := commands.New("knative.dev/eventing").Execute(); err != nil {
log.Fatal("Error during command execution: ", err)
}
Expand Down
298 changes: 168 additions & 130 deletions config/core/resources/sequence.yaml

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,6 @@ gomodules.xyz/jsonpatch/v2 v2.1.0 h1:Phva6wqu+xR//Njw6iorylFFgn/z547tw5Ne3HZPQ+k
gomodules.xyz/jsonpatch/v2 v2.1.0/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3mwe7XcUU=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6 h1:4WsZyVtkthqrHTbDCJfiTs8IWNYE4uvsSDgaV6xpp+o=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
Expand Down
34 changes: 18 additions & 16 deletions pkg/apis/flows/v1/sequence_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch

}
for i, c := range channels {
// Mark the Sequence address as the Address of the first channel.
if i == 0 {
ss.setAddress(c.Status.Address)
}

ss.ChannelStatuses[i] = SequenceChannelStatus{
Channel: corev1.ObjectReference{
APIVersion: c.APIVersion,
Expand All @@ -130,20 +135,16 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch
Namespace: c.Namespace,
},
}
// TODO: Once the addressable has a real status to dig through, use that here instead of
// addressable, because it might be addressable but not ready.
address := c.Status.AddressStatus.Address
if address != nil {
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue}

if ready := c.Status.GetCondition(apis.ConditionReady); ready != nil {
ss.ChannelStatuses[i].ReadyCondition = *ready
if !ready.IsTrue() {
allReady = false
}
} else {
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"}
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionUnknown, Reason: "NoReady", Message: "Channel does not have Ready condition"}
allReady = false
}

// Mark the Sequence address as the Address of the first channel.
if i == 0 {
ss.setAddress(address)
}
}
if allReady {
sCondSet.Manage(ss).MarkTrue(SequenceConditionChannelsReady)
Expand All @@ -153,22 +154,23 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch
}

func (ss *SequenceStatus) MarkChannelsNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionChannelsReady, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionChannelsReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkSubscriptionsNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionSubscriptionsReady, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionSubscriptionsReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkAddressableNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionAddressable, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionAddressable, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) setAddress(address *duckv1.Addressable) {
if address == nil || address.URL == nil {
sCondSet.Manage(ss).MarkFalse(SequenceConditionAddressable, "emptyAddress", "addressable is nil")
ss.Address = duckv1.Addressable{}
sCondSet.Manage(ss).MarkUnknown(SequenceConditionAddressable, "emptyAddress", "addressable is nil")
} else {
ss.AddressStatus.Address = &duckv1.Addressable{URL: address.URL}
ss.Address = duckv1.Addressable{URL: address.URL}
sCondSet.Manage(ss).MarkTrue(SequenceConditionAddressable)
}
}
107 changes: 76 additions & 31 deletions pkg/apis/flows/v1/sequence_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,16 @@ func getChannelable(ready bool) *eventingduckv1.Channelable {
}

if ready {
c.Status.SetConditions([]apis.Condition{{
Type: apis.ConditionReady,
Status: corev1.ConditionTrue,
}})
c.Status.Address = &duckv1.Addressable{URL: URL}
} else {
c.Status.SetConditions([]apis.Condition{{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
}})
}

return &c
Expand Down Expand Up @@ -212,7 +221,7 @@ func TestSequencePropagateSubscriptionStatuses(t *testing.T) {
}{{
name: "empty",
subs: []*messagingv1.Subscription{},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "empty status",
subs: []*messagingv1.Subscription{{
Expand All @@ -227,19 +236,19 @@ func TestSequencePropagateSubscriptionStatuses(t *testing.T) {
Status: messagingv1.SubscriptionStatus{},
},
},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one subscription not ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one subscription ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: corev1.ConditionTrue,
}, {
name: "one subscription ready, one not",
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "two subscriptions ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
Expand Down Expand Up @@ -267,19 +276,19 @@ func TestSequencePropagateChannelStatuses(t *testing.T) {
}{{
name: "empty",
channels: []*eventingduckv1.Channelable{},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one channelable not ready",
channels: []*eventingduckv1.Channelable{getChannelable(false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one channelable ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
want: corev1.ConditionTrue,
}, {
name: "one channelable ready, one not",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "two channelables ready",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
Expand All @@ -293,7 +302,7 @@ func TestSequencePropagateChannelStatuses(t *testing.T) {
got := ps.GetCondition(SequenceConditionChannelsReady).Status
want := test.want
if want != got {
t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got)
t.Errorf("unexpected conditions: want=%q, got=%q", want, got)
}
})
}
Expand Down Expand Up @@ -358,44 +367,80 @@ func TestSequenceReady(t *testing.T) {

func TestSequencePropagateSetAddress(t *testing.T) {
URL := apis.HTTP("example.com")
URL2 := apis.HTTP("another.example.com")
tests := []struct {
name string
address *duckv1.Addressable
want *duckv1.Addressable
wantStatus corev1.ConditionStatus
name string
status SequenceStatus
address *duckv1.Addressable
want duckv1.Addressable
wantStatus corev1.ConditionStatus
wantAddress string
}{{
name: "nil",
status: SequenceStatus{},
address: nil,
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "empty",
status: SequenceStatus{},
address: &duckv1.Addressable{},
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "URL",
address: &duckv1.Addressable{URL: URL},
want: &duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
name: "URL",
status: SequenceStatus{},
address: &duckv1.Addressable{URL: URL},
want: duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
wantAddress: "http://example.com",
}, {
name: "New URL",
status: SequenceStatus{
Address: duckv1.Addressable{
URL: URL2,
},
},
address: &duckv1.Addressable{URL: URL},
want: duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
wantAddress: "http://example.com",
}, {
name: "Clear URL",
status: SequenceStatus{
Address: duckv1.Addressable{
URL: URL,
},
},
address: nil,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "nil",
status: SequenceStatus{},
address: &duckv1.Addressable{URL: nil},
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := SequenceStatus{}
ps.setAddress(test.address)
got := ps.Address
if diff := cmp.Diff(test.want, got, ignoreAllButTypeAndStatus); diff != "" {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

tt.status.setAddress(tt.address)
got := tt.status.Address
if diff := cmp.Diff(tt.want, got, ignoreAllButTypeAndStatus); diff != "" {
t.Error("unexpected address (-want, +got) =", diff)
}
gotStatus := ps.GetCondition(SequenceConditionAddressable).Status
if test.wantStatus != gotStatus {
t.Errorf("unexpected conditions (-want, +got) = %v %v", test.wantStatus, gotStatus)
gotStatus := tt.status.GetCondition(SequenceConditionAddressable).Status
if tt.wantStatus != gotStatus {
t.Errorf("unexpected conditions (-want, +got) = %v %v", tt.wantStatus, gotStatus)
}
gotAddress := ""
if got.URL != nil {
gotAddress = got.URL.String()
}
if diff := cmp.Diff(tt.wantAddress, gotAddress); diff != "" {
t.Error("unexpected address.url (-want, +got) =", diff)
}
})
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/flows/v1/sequence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ type SequenceStatus struct {
// Matches the Spec.Steps array in the order.
ChannelStatuses []SequenceChannelStatus `json:"channelStatuses"`

// AddressStatus is the starting point to this Sequence. Sending to this
// Address is the starting point to this Sequence. Sending to this
// will target the first subscriber.
// It generally has the form {channel}.{namespace}.svc.{cluster domain name}
duckv1.AddressStatus `json:",inline"`
// +optional
Address duckv1.Addressable `json:"address,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/flows/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 820b051

Please sign in to comment.