Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make both Shovel and Federation upstream URI use URISet #194

Merged
merged 2 commits into from
May 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
## Changes Between 2.8.0 and 2.9.0 (in development)

No changes yet.
This release contains **minor breaking public API changes**.

### Support for Lists of Federation Upstream URIs

Federation definition now uses a dedicated type, `URISet`, to represent
a set of URIs that will be tried sequentially until the link
can successfully connect and authenticate:

``` go
def1 := FederationDefinition{
Uri: URISet{"amqp://hostname/%2f"},
}
```

`URISet` has now replaced `ShovelURISet`:

``` go
sDef := ShovelDefinition{
SourceURI: URISet([]string{"amqp://127.0.0.1/%2f"}),
SourceQueue: "mySourceQueue",
DestinationURI: ShovelURISet([]string{"amqp://host1/%2f"}),
DestinationQueue: "myDestQueue",
AddForwardHeaders: true,
AckMode: "on-confirm",
DeleteAfter: "never",
}
```

GitHub issues: [#193](https://github.com/michaelklishin/rabbit-hole/pull/193), [#194](https://github.com/michaelklishin/rabbit-hole/pull/194)

## Changes Between 2.7.0 and 2.8.0 (Apr 12, 2021)

Expand Down
34 changes: 32 additions & 2 deletions common.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package rabbithole

import "strconv"
import (
"encoding/json"
"strconv"
)

// Properties are extra arguments as a map (on queues, bindings, etc)
type Properties map[string]interface{}

// Port used by RabbitMQ or clients
type Port int

// UnmarshalJSON deserialises
// UnmarshalJSON deserialises a port that can be an integer or string
func (p *Port) UnmarshalJSON(b []byte) error {
stringValue := string(b)
var parsed int64
Expand Down Expand Up @@ -69,3 +72,30 @@ type MessageStats struct {
DropUnroutable int64 `json:"drop_unroutable"`
DropUnroutableDetails RateDetails `json:"drop_unroutable_details"`
}

// URISet represents a set of URIs used by Shovel, Federation, and so on.
// The URIs from this set are tried until one of them succeeds
// (a shovel or federation link successfully connects and authenticates with it)
type URISet []string

// UnmarshalJSON can unmarshal a single URI string or a list of
// URI strings
func (s *URISet) UnmarshalJSON(b []byte) error {
// the value is a single URI, a string
if b[0] == '"' {
var uri string
if err := json.Unmarshal(b, &uri); err != nil {
return err
}
*s = []string{uri}
return nil
}

// the value is a list
var uris []string
if err := json.Unmarshal(b, &uris); err != nil {
return err
}
*s = uris
return nil
}
8 changes: 4 additions & 4 deletions federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// FederationDefinition represents settings
// that will be used by federation links.
type FederationDefinition struct {
Uri []string `json:"uri"`
Uri URISet `json:"uri"`
Expires int `json:"expires,omitempty"`
MessageTTL int32 `json:"message-ttl"`
MaxHops int `json:"max-hops,omitempty"`
Expand Down Expand Up @@ -38,7 +38,7 @@ const FederationUpstreamComponent string = "federation-upstream"

// ListFederationUpstreams returns a list of all federation upstreams.
func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) {
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent)
req, err := newGETRequest(c, "parameters/"+FederationUpstreamComponent)
if err != nil {
return []FederationUpstream{}, err
}
Expand All @@ -56,7 +56,7 @@ func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error)

// ListFederationUpstreamsIn returns a list of all federation upstreams in a vhost.
func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) {
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent + "/" + url.PathEscape(vhost))
req, err := newGETRequest(c, "parameters/"+FederationUpstreamComponent+"/"+url.PathEscape(vhost))
if err != nil {
return []FederationUpstream{}, err
}
Expand All @@ -74,7 +74,7 @@ func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstre

// GetFederationUpstream returns information about a federation upstream.
func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) {
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent + "/" +url.PathEscape(vhost)+ "/" +url.PathEscape(name))
req, err := newGETRequest(c, "parameters/"+FederationUpstreamComponent+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name))
if err != nil {
return nil, err
}
Expand Down
28 changes: 14 additions & 14 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2260,13 +2260,13 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when there are upstreams", func() {
It("returns the list of upstreams", func() {
def1 := FederationDefinition{
Uri: []string{"amqp://server-name/%2f"},
Uri: URISet{"amqp://server-name/%2f"},
}
_, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", def1)
Ω(err).Should(BeNil())

def2 := FederationDefinition{
Uri: []string{"amqp://example.com/%2f"},
Uri: URISet{"amqp://example.com/%2f"},
}
_, err = rmqc.PutFederationUpstream("/", "upstream2", def2)
Ω(err).Should(BeNil())
Expand Down Expand Up @@ -2306,14 +2306,14 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"

def1 := FederationDefinition{
Uri: []string{"amqp://server-name/%2f"},
Uri: URISet{"amqp://server-name/%2f"},
}

_, err := rmqc.PutFederationUpstream(vh, "vhost-upstream1", def1)
Ω(err).Should(BeNil())

def2 := FederationDefinition{
Uri: []string{"amqp://example.com/%2f"},
Uri: URISet{"amqp://example.com/%2f"},
}

_, err = rmqc.PutFederationUpstream(vh, "vhost-upstream2", def2)
Expand Down Expand Up @@ -2647,8 +2647,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f", "amqp://localhost/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f", "amqp://localhost/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2704,8 +2704,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2751,8 +2751,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2800,8 +2800,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"
sn := "temporary"

ssu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
sdu := ShovelURISet([]string{"amqp://127.0.0.1/%2f"})
ssu := URISet([]string{"amqp://127.0.0.1/%2f"})
sdu := URISet([]string{"amqp://127.0.0.1/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
Expand Down Expand Up @@ -2895,9 +2895,9 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(err).Should(BeNil())

sDef := ShovelDefinition{
SourceURI: ShovelURISet([]string{"amqp://127.0.0.1/%2f"}),
SourceURI: URISet([]string{"amqp://127.0.0.1/%2f"}),
SourceQueue: "mySourceQueue",
DestinationURI: ShovelURISet([]string{"amqp://127.0.0.1/%2f"}),
DestinationURI: URISet([]string{"amqp://127.0.0.1/%2f"}),
DestinationQueue: "myDestQueue",
AddForwardHeaders: true,
AckMode: "on-confirm",
Expand Down
31 changes: 2 additions & 29 deletions shovels.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,10 @@ func (d *DeleteAfter) UnmarshalJSON(b []byte) error {
return nil
}

// ShovelURISet represents a set of URIs used by Shovel.
// The URIs from this set are tried until one of them succeeds
// (Shovel successfully connects and authenticates with it)
type ShovelURISet []string

// UnmarshalJSON can unmarshal a single URI string or a list of
// URI strings
func (s *ShovelURISet) UnmarshalJSON(b []byte) error {
// the value is a single URI, a string
if b[0] == '"' {
var uri string
if err := json.Unmarshal(b, &uri); err != nil {
return err
}
*s = []string{uri}
return nil
}

// the value is a list
var uris []string
if err := json.Unmarshal(b, &uris); err != nil {
return err
}
*s = uris
return nil
}

// ShovelDefinition contains the details of the shovel configuration
type ShovelDefinition struct {
DestinationURI ShovelURISet `json:"dest-uri"`
SourceURI ShovelURISet `json:"src-uri"`
DestinationURI URISet `json:"dest-uri"`
SourceURI URISet `json:"src-uri"`

AckMode string `json:"ack-mode,omitempty"`
AddForwardHeaders bool `json:"add-forward-headers,omitempty"`
Expand Down
10 changes: 5 additions & 5 deletions unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ var _ = Describe("Unit tests", func() {
})
})

Context("ShovelURISet marshalling", func() {
Context("URISet marshalling", func() {
It("unmarshalls a single string", func() {
var us ShovelURISet
var us URISet
bs := []byte("\"amqp://127.0.0.1:5672\"")
err := us.UnmarshalJSON(bs)
Ω(err).ShouldNot(HaveOccurred())

Ω(us).Should(Equal(ShovelURISet([]string{"amqp://127.0.0.1:5672"})))
Ω(us).Should(Equal(URISet([]string{"amqp://127.0.0.1:5672"})))
})

It("unmarshalls a list of strings", func() {
var us ShovelURISet
var us URISet
bs := []byte("[\"amqp://127.0.0.1:5672\", \"amqp://localhost:5672\"]")
err := us.UnmarshalJSON(bs)
Ω(err).ShouldNot(HaveOccurred())

Ω(us).Should(Equal(ShovelURISet([]string{"amqp://127.0.0.1:5672", "amqp://localhost:5672"})))
Ω(us).Should(Equal(URISet([]string{"amqp://127.0.0.1:5672", "amqp://localhost:5672"})))
})
})
})