Skip to content

Commit

Permalink
Adding denies $KV.>/$OBJ.> along leaf connections on differing domain (
Browse files Browse the repository at this point in the history
…#2916)

* Adding denies $KV.>/$OBJ.> along leaf connections on differing domain

Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel authored Mar 9, 2022
1 parent 5a97ee6 commit 9a2da9e
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 26 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.14.4
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d h1:zJf4l8Kp67RIZhoVeniSLZs69SHNgjLHz0aNsqPPlx8=
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
12 changes: 4 additions & 8 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,18 +586,14 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error {
// Check if we have a Domain specified.
// If so add in a subject mapping that will allow local connected clients to reach us here as well.
if opts := s.getOpts(); opts.JetStreamDomain != _EMPTY_ {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
found := false
mappings := generateJSMappingTable(opts.JetStreamDomain)
a.mu.RLock()
for _, m := range a.mappings {
if src == m.src {
found = true
break
}
delete(mappings, m.src)
}
a.mu.RUnlock()
if !found {
if err := a.AddMapping(src, jsAllAPI); err != nil {
for src, dest := range mappings {
if err := a.AddMapping(src, dest); err != nil {
s.Errorf("Error adding JetStream domain mapping: %v", err)
}
}
Expand Down
29 changes: 28 additions & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,34 @@ const (
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
)

var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI}
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}

func generateJSMappingTable(domain string) map[string]string {
mappings := map[string]string{}
// This set of mappings is very very very ugly.
// It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
// For optics $KV and $OBJ where made to be independent subject spaces.
// As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
// This is very unfortunate!!!
// Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
// Especially since the actual API for say KV, does use stream create from JS.
// To avoid overlaps KV and OBJ views append the prefix to their API.
// (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
// This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
for srcMappingSuffix, to := range map[string]string{
"INFO": JSApiAccountInfo,
"STREAM.>": "$JS.API.STREAM.>",
"CONSUMER.>": "$JS.API.CONSUMER.>",
"META.>": "$JS.API.META.>",
"SERVER.>": "$JS.API.SERVER.>",
"$KV.>": "$KV.>",
"$OBJ.>": "$OBJ.>",
} {
mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
}
return mappings
}

// JSMaxDescription is the maximum description length for streams and consumers.
const JSMaxDescriptionLen = 4 * 1024
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6875,7 +6875,7 @@ func TestJetStreamClusterDomainsWithNoJSHub(t *testing.T) {
// Client based API - Connected to the core cluster with no JS but account has JS.
s := c.randomServer()
// Make sure the JS interest from the LNs has made it to this server.
checkSubInterest(t, s, "NOJS", "$JS.SPOKE.API.>", time.Second)
checkSubInterest(t, s, "NOJS", "$JS.SPOKE.API.INFO", time.Second)
nc, _ := jsClientConnect(t, s, nats.UserInfo("nojs", "p"))
defer nc.Close()

Expand Down
27 changes: 18 additions & 9 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,8 +1251,8 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
meta.Campaign()
}
} else {
c.Noticef("JetStream Not Extended, adding deny %q for account %q", jsAllAPI, accName)
c.mergeDenyPermissionsLocked(both, []string{jsAllAPI})
c.Noticef("JetStream Not Extended, adding deny %+v for account %q", denyAllClientJs, accName)
c.mergeDenyPermissionsLocked(both, denyAllClientJs)
}
blockMappingOutgoing = true
} else if acc == sysAcc {
Expand All @@ -1274,19 +1274,21 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
// If the system account is shared, jsAllAPI traffic will go through the system account.
// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
// If the system account is NOT shared, jsAllAPI traffic has no business
c.Noticef("Adding deny %q for account %q", jsAllAPI, accName)
c.mergeDenyPermissionsLocked(both, []string{jsAllAPI})
c.Noticef("Adding deny %+v for account %q", denyAllClientJs, accName)
c.mergeDenyPermissionsLocked(both, denyAllClientJs)
}
// If we have a specified JetStream domain we will want to add a mapping to
// allow access cross domain for each non-system account.
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
if err := acc.AddMapping(src, jsAllAPI); err != nil {
c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
} else {
c.Noticef("Adding JetStream Domain Mapping %q to account %q", src, accName)
for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
if err := acc.AddMapping(src, dest); err != nil {
c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
} else {
c.Noticef("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
}
}
if blockMappingOutgoing {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
// This is a guard against a miss-config with two identical domain names and will only cover some forms
// of this issue, not all of them.
Expand Down Expand Up @@ -1496,6 +1498,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
}
// Likewise for mappings.
for _, m := range acc.mappings {
if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
continue
}
ims = append(ims, m.src)
}

Expand Down Expand Up @@ -1695,6 +1701,9 @@ func (c *client) forceAddToSmap(subj string) {
c.mu.Lock()
defer c.mu.Unlock()

if c.leaf.smap == nil {
return
}
n := c.leaf.smap[subj]
if n != 0 {
return
Expand Down
97 changes: 97 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5183,3 +5183,100 @@ func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
natsPub(t, cln1, "foo", []byte("hello"))
natsNexMsg(t, sln2, time.Second)
}

func TestLeafNodeJetStreamDomainMapCrossTalk(t *testing.T) {
accs := `
accounts :{
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
SYS:{ users:[ {user:s1,password:s1}]},
}
system_account: SYS
`

sd1 := createDir(t, JetStreamStoreDir)
defer os.RemoveAll(sd1)
confA := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
%s
jetstream: { domain: da, store_dir: '%s', max_mem: 50Mb, max_file: 50Mb }
leafnodes: {
listen: 127.0.0.1:-1
no_advertise: true
authorization: {
timeout: 0.5
}
}
`, accs, sd1)))
defer removeFile(t, confA)
sA, _ := RunServerWithConfig(confA)
defer sA.Shutdown()

sd2 := createDir(t, JetStreamStoreDir)
defer os.RemoveAll(sd2)
confL := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
%s
jetstream: { domain: dl, store_dir: '%s', max_mem: 50Mb, max_file: 50Mb }
leafnodes:{
no_advertise: true
remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},
{url:nats://s1:s1@127.0.0.1:%d, account: SYS}]
}
`, accs, sd2, sA.opts.LeafNode.Port, sA.opts.LeafNode.Port)))
defer removeFile(t, confL)
sL, _ := RunServerWithConfig(confL)
defer sL.Shutdown()

ncA := natsConnect(t, sA.ClientURL(), nats.UserInfo("a1", "a1"))
defer ncA.Close()
ncL := natsConnect(t, sL.ClientURL(), nats.UserInfo("a1", "a1"))
defer ncL.Close()

test := func(jsA, jsL nats.JetStreamContext) {
kvA, err := jsA.CreateKeyValue(&nats.KeyValueConfig{Bucket: "bucket"})
require_NoError(t, err)
kvL, err := jsL.CreateKeyValue(&nats.KeyValueConfig{Bucket: "bucket"})
require_NoError(t, err)

_, err = kvA.Put("A", nil)
require_NoError(t, err)
_, err = kvL.Put("L", nil)
require_NoError(t, err)

// check for unwanted cross talk
_, err = kvA.Get("A")
require_NoError(t, err)
_, err = kvA.Get("l")
require_Error(t, err)
require_True(t, err == nats.ErrKeyNotFound)

_, err = kvL.Get("A")
require_Error(t, err)
require_True(t, err == nats.ErrKeyNotFound)
_, err = kvL.Get("L")
require_NoError(t, err)

err = jsA.DeleteKeyValue("bucket")
require_NoError(t, err)
err = jsL.DeleteKeyValue("bucket")
require_NoError(t, err)
}

jsA, err := ncA.JetStream()
require_NoError(t, err)
jsL, err := ncL.JetStream()
require_NoError(t, err)
test(jsA, jsL)

jsAL, err := ncA.JetStream(nats.Domain("dl"))
require_NoError(t, err)
jsLA, err := ncL.JetStream(nats.Domain("da"))
require_NoError(t, err)
test(jsAL, jsLA)

jsAA, err := ncA.JetStream(nats.Domain("da"))
require_NoError(t, err)
jsLL, err := ncL.JetStream(nats.Domain("dl"))
require_NoError(t, err)
test(jsAA, jsLL)
}
10 changes: 6 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,10 +1405,12 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account {
if jsEnabled {
s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName)
} else if defDomain != _EMPTY_ {
dest := fmt.Sprintf(jsDomainAPI, defDomain)
s.Noticef("Adding default domain mapping %q -> %q to account %q %p", jsAllAPI, dest, accName, acc)
if err := acc.AddMapping(jsAllAPI, dest); err != nil {
s.Errorf("Error adding JetStream default domain mapping: %v", err)
for src, dest := range generateJSMappingTable(defDomain) {
// flip src and dest around so the domain is inserted
s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc)
if err := acc.AddMapping(dest, src); err != nil {
s.Errorf("Error adding JetStream default domain mapping: %v", err)
}
}
}
}
Expand Down

0 comments on commit 9a2da9e

Please sign in to comment.