Skip to content

Commit

Permalink
Merge branch 'master' into sse-example
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 authored Jun 12, 2024
2 parents d184aa8 + a91dc03 commit 92689ff
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion _examples/basic/1-your-first-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ server_1 | 2019/08/29 19:41:28 received event {ID:8}
server_1 | 2019/08/29 19:41:29 received event {ID:9}
```

Open another termial and take a look at Kafka topics to see that all messages are there. The initial events should be present on the `events` topic:
Open another terminal and take a look at Kafka topics to see that all messages are there. The initial events should be present on the `events` topic:

```bash
> docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic events
Expand Down
2 changes: 1 addition & 1 deletion components/cqrs/command_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestCommandProcessor_non_pointer_command(t *testing.T) {
assert.IsType(t, cqrs.NonPointerError{}, errors.Cause(err))
}

// TestCommandProcessor_multiple_same_command_handlers checks, that we don't register multiple handlers for the same commend.
// TestCommandProcessor_multiple_same_command_handlers checks, that we don't register multiple handlers for the same command.
func TestCommandProcessor_multiple_same_command_handlers(t *testing.T) {
ts := NewTestServices()

Expand Down
17 changes: 15 additions & 2 deletions components/requestreply/backend_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type PubSubBackendModifyNotificationMessageFn func(msg *message.Message, params

type PubSubBackendOnListenForReplyFinishedFn func(ctx context.Context, params PubSubBackendSubscribeParams)

type ReplyPublishErrorHandler func(replyTopic string, notificationMsg *message.Message, err error) error

type PubSubBackendConfig struct {
Publisher message.Publisher
SubscriberConstructor PubSubBackendSubscriberConstructorFn
Expand All @@ -85,10 +87,15 @@ type PubSubBackendConfig struct {
OnListenForReplyFinished PubSubBackendOnListenForReplyFinishedFn

// AckCommandErrors determines if the command should be acked or nacked when handler returns an error.
// Command will be always nacked, when sending reply fails.
// Command will be nacked by default when sending reply fails, you can control this behaviour with the
// ReplyPublishErrorHandler config option.
// You should use this option instead of cqrs.CommandProcessorConfig.AckCommandHandlingErrors, as it's aware
// if error was returned by handler or sending reply failed.
AckCommandErrors bool

// ReplyPublishErrorHandler if not nil will be invoked when sending the reply fails. If it returns an error
// the command will ba nacked.
ReplyPublishErrorHandler ReplyPublishErrorHandler
}

func (p *PubSubBackendConfig) setDefaults() {
Expand Down Expand Up @@ -245,7 +252,13 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba
return errors.Wrap(err, "cannot generate request/reply notify topic")
}

if err := p.config.Publisher.Publish(replyTopic, notificationMsg); err != nil {
err = p.config.Publisher.Publish(replyTopic, notificationMsg)
if err != nil {
if p.config.ReplyPublishErrorHandler != nil {
err = p.config.ReplyPublishErrorHandler(replyTopic, notificationMsg, err)
}
}
if err != nil {
return errors.Wrap(err, "cannot publish command executed message")
}

Expand Down
2 changes: 1 addition & 1 deletion dev/coverage.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/sh
########
# Source: https://gist.github.com/lwolf/3764a3b6cd08387e80aa6ca3b9534b8a
# originaly from https://github.com/mlafeldt/chef-runner/blob/v0.7.0/script/coverage
# originally from https://github.com/mlafeldt/chef-runner/blob/v0.7.0/script/coverage
#######
# Generate test coverage statistics for Go packages.
#
Expand Down
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/render-md.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
{{% load-snippet-partial (...) %}}
{{% /render-md %}}

bacause it is rendered as raw text by default
because it is rendered as raw text by default
*/}}
{{.Inner}}
2 changes: 1 addition & 1 deletion message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (r *Router) decorateHandlerSubscriber(h *handler) error {
return nil
}

// addHandlerContext enriches the contex with values that are relevant within this handler's context.
// addHandlerContext enriches the context with values that are relevant within this handler's context.
func (h *handler) addHandlerContext(messages ...*Message) {
for i, msg := range messages {
ctx := msg.Context()
Expand Down
2 changes: 1 addition & 1 deletion message/router/middleware/deduplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
const MessageHasherReadLimitMinimum = 64

// ExpiringKeyRepository is a state container for checking the
// existance of a key in a certain time window.
// existence of a key in a certain time window.
// All operations must be safe for concurrent use.
type ExpiringKeyRepository interface {
// IsDuplicate returns `true` if the key
Expand Down
4 changes: 2 additions & 2 deletions message/subscriber/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
)

// BulkRead reads provided amount of messages from the provided channel, until a timeout occurrs or the limit is reached.
// BulkRead reads provided amount of messages from the provided channel, until a timeout occurs or the limit is reached.
func BulkRead(messagesCh <-chan *message.Message, limit int, timeout time.Duration) (receivedMessages message.Messages, all bool) {
MessagesLoop:
for len(receivedMessages) < limit {
Expand All @@ -27,7 +27,7 @@ MessagesLoop:
}

// BulkReadWithDeduplication reads provided number of messages from the provided channel, ignoring duplicates,
// until a timeout occurrs or the limit is reached.
// until a timeout occurs or the limit is reached.
func BulkReadWithDeduplication(messagesCh <-chan *message.Message, limit int, timeout time.Duration) (receivedMessages message.Messages, all bool) {
receivedIDs := map[string]struct{}{}

Expand Down
2 changes: 1 addition & 1 deletion slog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *SlogLoggerAdapter) Debug(msg string, fields LogFields) {
func (s *SlogLoggerAdapter) Trace(msg string, fields LogFields) {
s.slog.Log(
// Void context, following the slog example
// as it treats context slighly differently from
// as it treats context slightly differently from
// normal usage, minding contextual
// values, but ignoring contextual deadline.
// See the [slog] package documentation
Expand Down

0 comments on commit 92689ff

Please sign in to comment.