Skip to content

Commit

Permalink
Merge pull request #194 from michaelklishin/federation-uri-set
Browse files Browse the repository at this point in the history
Make both Shovel and Federation upstream URI use URISet
  • Loading branch information
michaelklishin authored May 29, 2021
2 parents 4a874c4 + 87fa387 commit 7b2c1c2
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 55 deletions.
30 changes: 29 additions & 1 deletion ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
## Changes Between 2.8.0 and 2.9.0 (in development)

No changes yet.
This release contains **minor breaking public API changes**.

### Support for Lists of Federation Upstream URIs

Federation definition now uses a dedicated type, `URISet`, to represent
a set of URIs that will be tried sequentially until the link
can successfully connect and authenticate:

``` go
def1 := FederationDefinition{
Uri: URISet{"amqp://hostname/%2f"},
}
```

`URISet` has now replaced `ShovelURISet`:

``` go
sDef := ShovelDefinition{
SourceURI: URISet([]string{"amqp://127.0.0.1/%2f"}),
SourceQueue: "mySourceQueue",
DestinationURI: ShovelURISet([]string{"amqp://host1/%2f"}),
DestinationQueue: "myDestQueue",
AddForwardHeaders: true,
AckMode: "on-confirm",
DeleteAfter: "never",
}
```

GitHub issues: [#193](https://github.com/michaelklishin/rabbit-hole/pull/193), [#194](https://github.com/michaelklishin/rabbit-hole/pull/194)

## Changes Between 2.7.0 and 2.8.0 (Apr 12, 2021)

Expand Down
34 changes: 32 additions & 2 deletions common.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package rabbithole

import "strconv"
import (
"encoding/json"
"strconv"
)

// Properties are extra arguments as a map (on queues, bindings, etc)
type Properties map[string]interface{}

// Port used by RabbitMQ or clients
type Port int

// UnmarshalJSON deserialises
// UnmarshalJSON deserialises a port that can be an integer or string
func (p *Port) UnmarshalJSON(b []byte) error {
stringValue := string(b)
var parsed int64
Expand Down Expand Up @@ -69,3 +72,30 @@ type MessageStats struct {
DropUnroutable int64 `json:"drop_unroutable"`
DropUnroutableDetails RateDetails `json:"drop_unroutable_details"`
}

// URISet represents a set of URIs used by Shovel, Federation, and so on.
// The URIs from this set are tried until one of them succeeds
// (a shovel or federation link successfully connects and authenticates with it)
type URISet []string

// UnmarshalJSON can unmarshal a single URI string or a list of
// URI strings
func (s *URISet) UnmarshalJSON(b []byte) error {
// the value is a single URI, a string
if b[0] == '"' {
var uri string
if err := json.Unmarshal(b, &uri); err != nil {
return err
}
*s = []string{uri}
return nil
}

// the value is a list
var uris []string
if err := json.Unmarshal(b, &uris); err != nil {
return err
}
*s = uris
return nil
}
8 changes: 4 additions & 4 deletions federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// FederationDefinition represents settings
// that will be used by federation links.
type FederationDefinition struct {
Uri []string `json:"uri"`
Uri URISet `json:"uri"`
Expires int `json:"expires,omitempty"`
MessageTTL int32 `json:"message-ttl"`
MaxHops int `json:"max-hops,omitempty"`
Expand Down Expand Up @@ -38,7 +38,7 @@ const FederationUpstreamComponent string = "federation-upstream"

// ListFederationUpstreams returns a list of all federation upstreams.
func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) {
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent)
req, err := newGETRequest(c, "parameters/"+FederationUpstreamComponent)
if err != nil {
return []FederationUpstream{}, err
}
Expand All @@ -56,7 +56,7 @@ func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error)

// ListFederationUpstreamsIn returns a list of all federation upstreams in a vhost.
func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) {
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent + "/" + url.PathEscape(vhost))
req, err := newGETRequest(c, "parameters/"+FederationUpstreamComponent+"/"+url.PathEscape(vhost))
if err != nil {
return []FederationUpstream{}, err
}
Expand All @@ -74,7 +74,7 @@ func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstre

// GetFederationUpstream returns information about a federation upstream.
func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) {
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent + "/" +url.PathEscape(vhost)+ "/" +url.PathEscape(name))
req, err := newGETRequest(c, "parameters/"+FederationUpstreamComponent+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name))
if err != nil {
return nil, err
}
Expand Down
28 changes: 14 additions & 14 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2260,13 +2260,13 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when there are upstreams", func() {
It("returns the list of upstreams", func() {
def1 := FederationDefinition{
Uri: []string{"amqp://server-name/%2f"},
Uri: URISet{"amqp://server-name/%2f"},
}
_, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", def1)
Ω(err).Should(BeNil())

def2 := FederationDefinition{
Uri: []string{"amqp://example.com/%2f"},
Uri: URISet{"amqp://example.com/%2f"},
}
_, err = rmqc.PutFederationUpstream("/", "upstream2", def2)
Ω(err).Should(BeNil())
Expand Down Expand Up @@ -2306,14 +2306,14 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"

def1 := FederationDefinition{
Uri: []string{"amqp://server-name/%2f"},
Uri: URISet{"amqp://server-name/%2f"},
}

_, err := rmqc.PutFederationUpstream(vh, "vhost-upstream1", def1)
Ω(err).Should(BeNil())

def2 := FederationDefinition{
Uri: []string{"amqp://example.com/%2f"},
Uri: URISet{"amqp://example.com/%2f"},
}

_, err = rmqc.PutFederationUpstream(vh, "vhost-upstream2", def2)
Expand Down Expand Up @@ -2647,8 +2647,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f", "amqp://localhost/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f", "amqp://localhost/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2704,8 +2704,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2751,8 +2751,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2800,8 +2800,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2895,9 +2895,9 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(err).Should(BeNil())

sDef := ShovelDefinition{
SourceURI: ShovelURISet([]string{"amqp://127.0.0.1/%2f"}),
SourceURI: URISet([]string{"amqp://127.0.0.1/%2f"}),
SourceQueue: "mySourceQueue",
DestinationURI: ShovelURISet([]string{"amqp://127.0.0.1/%2f"}),
DestinationURI: URISet([]string{"amqp://127.0.0.1/%2f"}),
DestinationQueue: "myDestQueue",
AddForwardHeaders: true,
AckMode: "on-confirm",
Expand Down
31 changes: 2 additions & 29 deletions shovels.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,10 @@ func (d *DeleteAfter) UnmarshalJSON(b []byte) error {
return nil
}

// ShovelURISet represents a set of URIs used by Shovel.
// The URIs from this set are tried until one of them succeeds
// (Shovel successfully connects and authenticates with it)
type ShovelURISet []string

// UnmarshalJSON can unmarshal a single URI string or a list of
// URI strings
func (s *ShovelURISet) UnmarshalJSON(b []byte) error {
// the value is a single URI, a string
if b[0] == '"' {
var uri string
if err := json.Unmarshal(b, &uri); err != nil {
return err
}
*s = []string{uri}
return nil
}

// the value is a list
var uris []string
if err := json.Unmarshal(b, &uris); err != nil {
return err
}
*s = uris
return nil
}

// ShovelDefinition contains the details of the shovel configuration
type ShovelDefinition struct {
DestinationURI ShovelURISet `json:"dest-uri"`
SourceURI ShovelURISet `json:"src-uri"`
DestinationURI URISet `json:"dest-uri"`
SourceURI URISet `json:"src-uri"`

AckMode string `json:"ack-mode,omitempty"`
AddForwardHeaders bool `json:"add-forward-headers,omitempty"`
Expand Down
10 changes: 5 additions & 5 deletions unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ var _ = Describe("Unit tests", func() {
})
})

Context("ShovelURISet marshalling", func() {
Context("URISet marshalling", func() {
It("unmarshalls a single string", func() {
var us ShovelURISet
var us URISet
bs := []byte("\"amqp://127.0.0.1:5672\"")
err := us.UnmarshalJSON(bs)
Ω(err).ShouldNot(HaveOccurred())

Ω(us).Should(Equal(ShovelURISet([]string{"amqp://127.0.0.1:5672"})))
Ω(us).Should(Equal(URISet([]string{"amqp://127.0.0.1:5672"})))
})

It("unmarshalls a list of strings", func() {
var us ShovelURISet
var us URISet
bs := []byte("[\"amqp://127.0.0.1:5672\", \"amqp://localhost:5672\"]")
err := us.UnmarshalJSON(bs)
Ω(err).ShouldNot(HaveOccurred())

Ω(us).Should(Equal(ShovelURISet([]string{"amqp://127.0.0.1:5672", "amqp://localhost:5672"})))
Ω(us).Should(Equal(URISet([]string{"amqp://127.0.0.1:5672", "amqp://localhost:5672"})))
})
})
})

0 comments on commit 7b2c1c2

Please sign in to comment.