Skip to content

Commit

Permalink
Merge branch 'main' into 964-pass-message-to-malformed-event
Browse files Browse the repository at this point in the history
  • Loading branch information
raza-matillion authored Oct 21, 2023
2 parents 0f7087a + 566d1be commit bda8308
Show file tree
Hide file tree
Showing 10 changed files with 385 additions and 26 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ jobs:
ports:
- 4222:4222

jetstream:
image: bitnami/nats:latest
env:
NATS_EXTRA_ARGS: "--jetstream --port 4223"
ports:
- 4223:4223

amqp:
image: scholzj/qpid-dispatch
env:
Expand Down
98 changes: 96 additions & 2 deletions protocol/nats_jetstream/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@ package nats_jetstream
import (
"bytes"
"context"
"fmt"
"strings"

"github.com/nats-io/nats.go"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
)

const (
// see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md
prefix = "ce-"
contentTypeHeader = "content-type"
)

var specs = spec.WithPrefix(prefix)

// Message implements binding.Message by wrapping an *nats.Msg.
// This message *can* be read several times safely
type Message struct {
Expand All @@ -24,8 +35,15 @@ type Message struct {

// NewMessage wraps an *nats.Msg in a binding.Message.
// The returned message *can* be read several times safely
// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header.
func NewMessage(msg *nats.Msg) *Message {
return &Message{Msg: msg, encoding: binding.EncodingStructured}
encoding := binding.EncodingStructured
if msg.Header != nil {
if msg.Header.Get(specs.PrefixedSpecVersionName()) != "" {
encoding = binding.EncodingBinary
}
}
return &Message{Msg: msg, encoding: encoding}
}

var _ binding.Message = (*Message)(nil)
Expand All @@ -37,15 +55,91 @@ func (m *Message) ReadEncoding() binding.Encoding {

// ReadStructured transfers a structured-mode event to a StructuredWriter.
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.encoding != binding.EncodingStructured {
return binding.ErrNotStructured
}
return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data))
}

// ReadBinary transfers a binary-mode event to an BinaryWriter.
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
return binding.ErrNotBinary
if m.encoding != binding.EncodingBinary {
return binding.ErrNotBinary
}

version := m.GetVersion()
if version == nil {
return binding.ErrNotBinary
}

var err error
for k, v := range m.Msg.Header {
headerValue := v[0]
if strings.HasPrefix(k, prefix) {
attr := version.Attribute(k)
if attr != nil {
err = encoder.SetAttribute(attr, headerValue)
} else {
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue)
}
} else if k == contentTypeHeader {
err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue)
}
if err != nil {
return err
}
}

if m.Msg.Data != nil {
err = encoder.SetData(bytes.NewBuffer(m.Msg.Data))
}

return err
}

// Finish *must* be called when message from a Receiver can be forgotten by the receiver.
func (m *Message) Finish(err error) error {
return nil
}

// GetAttribute implements binding.MessageMetadataReader
func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) {
key := withPrefix(attributeKind.String())
if m.Msg.Header != nil {
headerValue := m.Msg.Header.Get(key)
if headerValue != "" {
version := m.GetVersion()
return version.Attribute(key), headerValue
}
}
return nil, nil
}

// GetExtension implements binding.MessageMetadataReader
func (m *Message) GetExtension(name string) interface{} {
key := withPrefix(name)
if m.Msg.Header != nil {
headerValue := m.Msg.Header.Get(key)
if headerValue != "" {
return headerValue
}
}
return nil
}

// GetVersion looks for specVersion header and returns a Version object
func (m *Message) GetVersion() spec.Version {
if m.Msg.Header == nil {
return nil
}
versionValue := m.Msg.Header.Get(specs.PrefixedSpecVersionName())
if versionValue == "" {
return nil
}
return specs.Version(versionValue)
}

// withPrefix prepends the prefix to the attribute name
func withPrefix(attributeName string) string {
return fmt.Sprintf("%s%s", prefix, attributeName)
}
54 changes: 42 additions & 12 deletions protocol/nats_jetstream/v2/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ package nats_jetstream
import (
"context"
"encoding/json"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"
"testing"

"github.com/cloudevents/sdk-go/v2/binding/spec"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/test"
"github.com/nats-io/nats.go"
)

var (
outBinaryMessage = bindingtest.MockBinaryMessage{}
outBinaryMessage = bindingtest.MockBinaryMessage{
Metadata: map[spec.Attribute]interface{}{},
Extensions: map[string]interface{}{},
}
outStructMessage = bindingtest.MockStructuredMessage{}

testEvent = test.FullEvent()
Expand All @@ -31,18 +36,44 @@ var (
Subject: "hello",
Data: binaryData,
}
binaryConsumerMessage = &nats.Msg{
Subject: "hello",
Data: testEvent.Data(),
Header: nats.Header{
"ce-type": {testEvent.Type()},
"ce-source": {testEvent.Source()},
"ce-id": {testEvent.ID()},
"ce-time": {test.Timestamp.String()},
"ce-specversion": {"1.0"},
"ce-dataschema": {test.Schema.String()},
"ce-datacontenttype": {"text/json"},
"ce-subject": {"receiverTopic"},
"ce-exta": {"someext"},
},
}
)

func TestNewMessage(t *testing.T) {
tests := []struct {
name string
consumerMessage *nats.Msg
expectedEncoding binding.Encoding
name string
consumerMessage *nats.Msg
expectedEncoding binding.Encoding
expectedStructuredError error
expectedBinaryError error
}{
{
name: "Structured encoding",
consumerMessage: structuredConsumerMessage,
expectedEncoding: binding.EncodingStructured,
name: "Structured encoding",
consumerMessage: structuredConsumerMessage,
expectedEncoding: binding.EncodingStructured,
expectedStructuredError: nil,
expectedBinaryError: binding.ErrNotBinary,
},
{
name: "Binary encoding",
consumerMessage: binaryConsumerMessage,
expectedEncoding: binding.EncodingBinary,
expectedStructuredError: binding.ErrNotStructured,
expectedBinaryError: nil,
},
}
for _, tt := range tests {
Expand All @@ -52,17 +83,16 @@ func TestNewMessage(t *testing.T) {
t.Errorf("Error in NewMessage!")
}
err := got.ReadBinary(context.TODO(), &outBinaryMessage)
if err == nil {
t.Errorf("Response in ReadBinary should err")
if err != tt.expectedBinaryError {
t.Errorf("ReadBinary err:%s", err.Error())
}
err = got.ReadStructured(context.TODO(), &outStructMessage)
if err != nil {
if err != tt.expectedStructuredError {
t.Errorf("ReadStructured err:%s", err.Error())
}
if got.ReadEncoding() != tt.expectedEncoding {
t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding())
}

})
}
}
14 changes: 14 additions & 0 deletions protocol/nats_jetstream/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ func NatsOptions(opts ...nats.Option) []nats.Option {
// ProtocolOption is the function signature required to be considered an nats.ProtocolOption.
type ProtocolOption func(*Protocol) error

func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption {
return func(p *Protocol) error {
p.consumerOptions = opts
return nil
}
}

func WithSenderOptions(opts ...SenderOption) ProtocolOption {
return func(p *Protocol) error {
p.senderOptions = opts
return nil
}
}

type SenderOption func(*Sender) error

type ConsumerOption func(*Consumer) error
Expand Down
13 changes: 7 additions & 6 deletions protocol/nats_jetstream/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package nats_jetstream

import (
"context"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"

Expand All @@ -18,11 +19,11 @@ import (
type Protocol struct {
Conn *nats.Conn

Consumer *Consumer
//consumerOptions []ConsumerOption
Consumer *Consumer
consumerOptions []ConsumerOption

Sender *Sender
//senderOptions []SenderOption
Sender *Sender
senderOptions []SenderOption

connOwned bool // whether this protocol created the nats connection
}
Expand Down Expand Up @@ -55,11 +56,11 @@ func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject st
return nil, err
}

if p.Consumer, err = NewConsumerFromConn(conn, stream, receiveSubject, jsOpts, subOpts); err != nil {
if p.Consumer, err = NewConsumerFromConn(conn, stream, receiveSubject, jsOpts, subOpts, p.consumerOptions...); err != nil {
return nil, err
}

if p.Sender, err = NewSenderFromConn(conn, stream, sendSubject, jsOpts); err != nil {
if p.Sender, err = NewSenderFromConn(conn, stream, sendSubject, jsOpts, p.senderOptions...); err != nil {
return nil, err
}

Expand Down
12 changes: 10 additions & 2 deletions protocol/nats_jetstream/v2/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,18 @@ func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...b
}()

writer := new(bytes.Buffer)
if err = WriteMsg(ctx, in, writer, transformers...); err != nil {
header, err := WriteMsg(ctx, in, writer, transformers...)
if err != nil {
return err
}
_, err = s.Jsm.Publish(s.Subject, writer.Bytes())

natsMsg := &nats.Msg{
Subject: s.Subject,
Data: writer.Bytes(),
Header: header,
}

_, err = s.Jsm.PublishMsg(natsMsg)

return err
}
Expand Down
Loading

0 comments on commit bda8308

Please sign in to comment.