Skip to content

Commit

Permalink
[FIX] direct get APIs can contain duplicate Nats-* headers, because…
Browse files Browse the repository at this point in the history
… it simply appends JSON bytes to existing headers. If the message was onboarded on a republish, this will contain system headers. Since headers are not ordered and can contain multiple values for the same header, this can break KV clients as well as create ambiguity on the stream that yielded the message.

Fix #4573
  • Loading branch information
aricart committed Oct 4, 2023
1 parent 195227e commit 7138157
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 0 deletions.
13 changes: 13 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3968,6 +3968,19 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra
return rsi
}

func removeStreamIdentityHeaders(hdr []byte) []byte {
if hdr == nil {
return hdr
}
if idx := bytes.Index(hdr, []byte(JSHeaderPrefix)); idx == -1 {
return hdr
}
for _, h := range []string{JSStream, JSSequence, JSTimeStamp, JSSubject} {
hdr = removeHeaderIfPresent(hdr, h)
}
return hdr
}

// Will remove a header if present.
func removeHeaderIfPresent(hdr []byte, key string) []byte {
start := bytes.Index(hdr, []byte(key))
Expand Down
111 changes: 111 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21931,3 +21931,114 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
},
})
}

func TestJetStreamNoDuplicateHeadersOnDirectGet(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

name := nuid.Next()

req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("%s.>", name)},
RePublish: &RePublish{
Source: ">",
Destination: fmt.Sprintf("X%s.>", name),
},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

targetStream := fmt.Sprintf("%s-2", name)
req = StreamConfig{
Name: targetStream,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("X%s.>", name)},
AllowDirect: true,
}
reqJson, err = json.Marshal(req)
require_NoError(t, err)
_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second)
require_NoError(t, err)

// Now publish a message to the source stream.
sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data")

getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream)

r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second)
require_NoError(t, err)

require_Equal(t, len(r.Header.Values(JSStream)), 1)
require_Equal(t, len(r.Header.Values(JSSubject)), 1)
require_Equal(t, len(r.Header.Values(JSSequence)), 1)
require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1)
}

func TestRemoveAllJetStreamHeadersIfPresent(t *testing.T) {
// copied private fn from nats.go
headerBytes := func(h nats.Header) ([]byte, error) {
var hdr []byte
if len(h) == 0 {
return hdr, nil
}

var b bytes.Buffer
_, err := b.WriteString(hdrLine)
if err != nil {
return nil, err
}

err = http.Header(h).Write(&b)
if err != nil {
return nil, err
}

_, err = b.WriteString("\r\n")
if err != nil {
return nil, err
}

return b.Bytes(), nil
}

// test nil input
v := removeStreamIdentityHeaders(nil)
if v != nil {
t.Fatalf("expected headers to be nil")
}

h := nats.Header{}
// expect empty to become nil
hb, err := headerBytes(h)
require_NoError(t, err)
hb = removeStreamIdentityHeaders(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect Nats-Stream|Sequence|Subject|Time-Stamp to be removed
h.Add(JSStream, "test")
h.Add(JSSequence, "1")
h.Add(JSSubject, "test.bar")
h.Add(JSTimeStamp, "something")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeStreamIdentityHeaders(hb)
if hb != nil {
t.Fatalf("expected headers to be nil")
}

// expect one header to remain
h.Add("Something-Else", "hey")
hb, err = headerBytes(h)
require_NoError(t, err)
hb = removeStreamIdentityHeaders(hb)
require_Equal(t, "NATS/1.0\r\nSomething-Else: hey\r\n\r\n", string(hb))
}
8 changes: 8 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ const (

// Headers for republished messages and direct gets.
const (
JSHeaderPrefix = "Nats-"
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
Expand Down Expand Up @@ -4051,6 +4052,8 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano)))
} else {
hdr = copyBytes(hdr)
// streams that onboarded via republish could contain the original stream headers
hdr = removeStreamIdentityHeaders(hdr)
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, sm.subj)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))
Expand Down Expand Up @@ -4399,6 +4402,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}

// if we are NOT doing subject remapping, this is a message to on-board it shouldn't store Nats-* headers
if tsubj == _EMPTY_ {
hdr = removeStreamIdentityHeaders(hdr)
}

// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
Expand Down

0 comments on commit 7138157

Please sign in to comment.