Skip to content

Commit

Permalink
Adding IsUndelivered to help the readability of results from the prot…
Browse files Browse the repository at this point in the history
…ocol layers (#542)

* Adding Undelivered to help the readability of results from the protocol layers

Signed-off-by: Scott Nichols <snichols@vmware.com>

* IsUndelivered

Signed-off-by: Scott Nichols <snichols@vmware.com>
  • Loading branch information
Scott Nichols authored Jun 26, 2020
1 parent dbeb62d commit 38de413
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 33 deletions.
4 changes: 3 additions & 1 deletion samples/amqp/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ func main() {
log.Fatalf("Failed to set data: %v", err)
}

if result := c.Send(context.Background(), event); !cloudevents.IsACK(result) {
if result := c.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
log.Fatalf("Failed to send: %v", result)
} else if cloudevents.IsNACK(result) {
log.Printf("Event not accepted: %v", result)
}
time.Sleep(100 * time.Millisecond)
}
Expand Down
5 changes: 2 additions & 3 deletions samples/gochan/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ func main() {
"message": "Hello, World!",
})

res := c.Send(ctx, e)
if !cloudevents.IsACK(res) {
if res := c.Send(ctx, e); cloudevents.IsUndelivered(res) {
log.Printf("[sender] failed to send: %v", res)
} else {
log.Printf("[sender] sent: %d", i)
log.Printf("[sender] sent: %d, accepted: %t", i, cloudevents.IsACK(res))
}
}
// Wait for the timeout.
Expand Down
2 changes: 1 addition & 1 deletion samples/http/requester-with-custom-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func main() {
Message: "Hello world!",
})

if resp, res := c.Request(context.TODO(), event); !cloudevents.IsACK(res) {
if resp, res := c.Request(context.TODO(), event); cloudevents.IsUndelivered(res) {
log.Printf("Failed to request: %v", res)
} else if resp != nil {
fmt.Printf("Response:\n%s\n", resp)
Expand Down
2 changes: 1 addition & 1 deletion samples/http/requester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func main() {
ctx = cloudevents.WithEncodingStructured(ctx)
}

if resp, res := c.Request(ctx, event); !cloudevents.IsACK(res) {
if resp, res := c.Request(ctx, event); cloudevents.IsUndelivered(res) {
log.Printf("Failed to request: %v", res)
} else if resp != nil {
fmt.Printf("Response:\n%s\n", resp)
Expand Down
6 changes: 4 additions & 2 deletions samples/http/sender-retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ func send10(ctx context.Context, c cloudevents.Client) {
"message": "Hello, World!",
})

if result := c.Send(ctx, e); !cloudevents.IsACK(result) {
if result := c.Send(ctx, e); cloudevents.IsUndelivered(result) {
log.Printf("Failed to send: %s", result.Error())
} else {
} else if cloudevents.IsACK(result) {
log.Printf("Sent: %d", i)
} else if cloudevents.IsNACK(result) {
log.Printf("Sent but not accepted: %s", result.Error())
}
time.Sleep(50 * time.Millisecond)
}
Expand Down
2 changes: 1 addition & 1 deletion samples/http/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
})

res := c.Send(ctx, e)
if !cloudevents.IsACK(res) {
if cloudevents.IsUndelivered(res) {
log.Printf("Failed to send: %v", res)
} else {
var httpResult *cehttp.Result
Expand Down
5 changes: 2 additions & 3 deletions samples/kafka/sender-receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ func main() {
"message": "Hello, World!",
})

err := c.Send(context.Background(), e)
if err != nil {
if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", err)
} else {
log.Printf("sent: %d", i)
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
}

Expand Down
5 changes: 2 additions & 3 deletions samples/kafka/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ func main() {
"message": "Hello, World!",
})

err := c.Send(context.Background(), e)
if err != nil {
if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", err)
} else {
log.Printf("sent: %d", i)
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
}
}
6 changes: 4 additions & 2 deletions samples/nats/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ func main() {
Message: fmt.Sprintf("Hello, %s!", contentType),
})

if result := c.Send(context.Background(), e); !cloudevents.IsACK(result) {
log.Fatalf("failed to send: %v", result)
if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", err)
} else {
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
time.Sleep(100 * time.Millisecond)
}
Expand Down
6 changes: 3 additions & 3 deletions samples/pubsub/multisender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func main() {
Message: "HELLO " + topic,
})

err = c.Send(ctx, event)

if err != nil {
if result := c.Send(ctx, event); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", err)
os.Exit(1)
} else {
log.Printf("sent, accepted: %t", cloudevents.IsACK(result))
}
}

Expand Down
6 changes: 3 additions & 3 deletions samples/pubsub/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func main() {
Message: "HELLO",
})

err = c.Send(context.Background(), event)

if err != nil {
if result := c.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", err)
os.Exit(1)
} else {
log.Printf("sent, accepted: %t", cloudevents.IsACK(result))
}

os.Exit(0)
Expand Down
7 changes: 3 additions & 4 deletions samples/stan/sender-receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ func main() {
"message": "Hello, World!",
})

result := c.Send(context.Background(), e)
if !cloudevents.IsACK(result) {
log.Printf("failed to send: %v", result)
if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", err)
} else {
log.Printf("sent: %d", i)
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions samples/stan/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ func main() {
"message": "Hello, World!",
})

result := c.Send(context.Background(), e)
if !cloudevents.IsACK(result) {
log.Printf("failed to send: %v", result)
if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", err)
} else {
log.Printf("sent: %d", i)
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
}
}
5 changes: 3 additions & 2 deletions v2/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ var (
ResultACK = protocol.ResultACK
ResultNACK = protocol.ResultNACK

IsACK = protocol.IsACK
IsNACK = protocol.IsNACK
IsACK = protocol.IsACK
IsNACK = protocol.IsNACK
IsUndelivered = protocol.IsUndelivered

// HTTP Results

Expand Down
10 changes: 10 additions & 0 deletions v2/protocol/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewResult(messageFmt string, args ...interface{}) Result {
return fmt.Errorf(messageFmt, args...) // TODO: look at adding ACK/Nak support.
}

// IsACK true means the recipient acknowledged the event.
func IsACK(target Result) bool {
// special case, nil target also means ACK.
if target == nil {
Expand All @@ -47,10 +48,19 @@ func IsACK(target Result) bool {
return ResultIs(target, ResultACK)
}

// IsNACK true means the recipient did not acknowledge the event.
func IsNACK(target Result) bool {
return ResultIs(target, ResultNACK)
}

// IsUndelivered true means the target result is not an ACK/NACK, but some other
// error unrelated to delivery not from the intended recipient. Likely target
// is an error that represents some part of the protocol is misconfigured or
// the event that was attempting to be sent was invalid.
func IsUndelivered(target Result) bool {
return !ResultIs(target, ResultACK) && !ResultIs(target, ResultNACK)
}

var (
ResultACK = NewReceipt(true, "")
ResultNACK = NewReceipt(false, "")
Expand Down

0 comments on commit 38de413

Please sign in to comment.