Skip to content

Commit

Permalink
[ADDED] Lookup stream name by subject (#1292)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Jun 8, 2023
1 parent 10a535a commit f554c44
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 1 deletion.
3 changes: 3 additions & 0 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ var (
// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.').
ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"}

// ErrInvalidSubject is returned when the provided subject name is invalid.
ErrInvalidSubject JetStreamError = &jsError{message: "invalid subject name"}

// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.').
ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}

Expand Down
45 changes: 45 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -68,6 +69,8 @@ type (
UpdateStream(context.Context, StreamConfig) (Stream, error)
// Stream returns a [Stream] hook for a given stream name
Stream(context.Context, string) (Stream, error)
// StreamNameBySubject returns a stream name stream listening on given subject
StreamNameBySubject(context.Context, string) (string, error)
// DeleteStream removes a stream with given name
DeleteStream(context.Context, string) error
// ListStreams returns StreamInfoLister enabling iterating over a channel of stream infos
Expand Down Expand Up @@ -186,8 +189,14 @@ type (
apiPaged
Streams []string `json:"streams"`
}

streamsRequest struct {
Subject string `json:"subject,omitempty"`
}
)

var subjectRegexp = regexp.MustCompile(`^[^ >]*[>]?$`)

// New returns a enw JetStream instance
//
// Available options:
Expand Down Expand Up @@ -494,6 +503,16 @@ func validateStreamName(stream string) error {
return nil
}

func validateSubject(subject string) error {
if subject == "" {
return fmt.Errorf("%w: %s", ErrInvalidSubject, "subject cannot be empty")
}
if !subjectRegexp.MatchString(subject) {
return fmt.Errorf("%w: %s", ErrInvalidSubject, subject)
}
return nil
}

func (js *jetStream) AccountInfo(ctx context.Context) (*AccountInfo, error) {
var resp accountInfoResponse

Expand Down Expand Up @@ -591,6 +610,32 @@ func (js *jetStream) StreamNames(ctx context.Context) StreamNameLister {
return l
}

func (js *jetStream) StreamNameBySubject(ctx context.Context, subject string) (string, error) {
if err := validateSubject(subject); err != nil {
return "", err
}
streamsSubject := apiSubj(js.apiPrefix, apiStreams)

r := &streamsRequest{Subject: subject}
req, err := json.Marshal(r)
if err != nil {
return "", err
}
var resp streamNamesResponse
_, err = js.apiRequestJSON(ctx, streamsSubject, &resp, req)
if err != nil {
return "", err
}
if resp.Error != nil {
return "", resp.Error
}
if len(resp.Streams) == 0 {
return "", ErrStreamNotFound
}

return resp.Streams[0], nil
}

// Name returns a channel allowing retrieval of stream names returned by [StreamNames]
func (s *streamLister) Name() <-chan string {
return s.names
Expand Down
36 changes: 36 additions & 0 deletions jetstream/message_test.go → jetstream/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package jetstream

import (
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -80,3 +81,38 @@ func TestMessageMetadata(t *testing.T) {
})
}
}

func TestValidateSubject(t *testing.T) {
tests := []struct {
subject string
withError bool
}{
{"test.A", false},
{"test.*", false},
{"*", false},
{"*.*", false},
{"test.*.A", false},
{"test.>", false},
{">", false},
{">.", true},
{"test.>.A", true},
{"", true},
{"test A", true},
}

for _, test := range tests {
tName := fmt.Sprintf("subj=%s,err=%t", test.subject, test.withError)
t.Run(tName, func(t *testing.T) {
err := validateSubject(test.subject)
if test.withError {
if err == nil {
t.Fatal("Expected error; got nil")
}
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
})
}
}
78 changes: 78 additions & 0 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,3 +928,81 @@ func TestJetStream_DeleteConsumer(t *testing.T) {
})
}
}

func TestStreamNameBySubject(t *testing.T) {
tests := []struct {
name string
subject string
withError error
expected string
}{
{
name: "get stream name by subject explicit",
subject: "FOO.123",
expected: "foo",
},
{
name: "get stream name by subject with wildcard",
subject: "BAR.*",
expected: "bar",
},
{
name: "match more than one stream, return the first one",
subject: ">",
expected: "",
},
{
name: "stream not found",
subject: "BAR.XYZ",
withError: jetstream.ErrStreamNotFound,
},
{
name: "invalid subject",
subject: "FOO.>.123",
withError: jetstream.ErrInvalidSubject,
},
}

srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "bar", Subjects: []string{"BAR.ABC"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
name, err := js.StreamNameBySubject(ctx, test.subject)
if test.withError != nil {
if err == nil || !errors.Is(err, test.withError) {
t.Fatalf("Expected error: %v; got: %v", test.withError, err)
}
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if test.expected != "" && name != test.expected {
t.Fatalf("Unexpected stream name; want: %s; got: %s", test.expected, name)
}

})
}
}
2 changes: 1 addition & 1 deletion micro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ var (
// this regular expression is suggested regexp for semver validation: https://semver.org/
semVerRegexp = regexp.MustCompile(`^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$`)
nameRegexp = regexp.MustCompile(`^[A-Za-z0-9\-_]+$`)
subjectRegexp = regexp.MustCompile(`^[^ >]+[>]?$`)
subjectRegexp = regexp.MustCompile(`^[^ >]*[>]?$`)
)

// Common errors returned by the Service framework.
Expand Down

0 comments on commit f554c44

Please sign in to comment.