Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API for runtime parameters, e.g. for configuring federation upstream components. #150

Merged
merged 19 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,64 @@ resp, err := rmqc.DeleteShovel("/", "a.shovel")

```

### Operations on Runtime (vhost-scoped) Parameters

```golang
// list all runtime parameters
params, err := rmqc.ListRuntimeParameters()
// => []RuntimeParameter, error

// list all runtime parameters for a component
params, err := rmqc.ListRuntimeParametersFor("federation-upstream")
// => []RuntimeParameter, error

// list runtime parameters in a vhost
params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/")
// => []RuntimeParameter, error

// information about a runtime parameter
p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name")
// => *RuntimeParameter, error

// declare or update a runtime parameter
resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error

// remove a runtime parameter
resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name")
// => *http.Response, error

```

### Operations on Federation Upstreams

```golang
// list all federation upstreams
ups, err := rmqc.ListFederationUpstreams()
// => []FederationUpstream, error

// list federation upstreams in a vhost
ups, err := rmqc.ListFederationUpstreamsIn("/")
// => []FederationUpstream, error

// information about a federated upstream
up, err := rmqc.GetFederationUpstream("/", "name")
// => *FederationUpstream, error

// declare or update a federation upstream
resp, err := rmqc.PutFederationUpstream("/", "name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error

// delete an upstream
resp, err := rmqc.DeleteFederationUpstream("/", "name")
// => *http.Response, error

```

### Operations on cluster name
``` go
// Get cluster name
Expand Down
4 changes: 4 additions & 0 deletions bin/ci/before_build.bat
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ call %RABBITHOLE_RABBITMQCTL% set_permissions -p "rabbit/hole" guest ".*" ".*" "
REM Enable shovel plugin
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel_management

REM Enable shovel plugin
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation_management
4 changes: 4 additions & 0 deletions bin/ci/before_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ $CTL set_cluster_name rabbitmq@localhost
# Enable shovel plugin
$PLUGINS enable rabbitmq_shovel
$PLUGINS enable rabbitmq_shovel_management

# Enable federation plugin
$PLUGINS enable rabbitmq_federation
$PLUGINS enable rabbitmq_federation_management
52 changes: 52 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,58 @@ Managing Topic Permissions
resp, err := rmqc.DeleteTopicPermissionsIn("/", "my.user", "exchange")
// => *http.Response, err
Managing Runtime Parameters
// list all runtime parameters
params, err := rmqc.ListRuntimeParameters()
// => []RuntimeParameter, error
// list all runtime parameters for a component
params, err := rmqc.ListRuntimeParametersFor("federation-upstream")
// => []RuntimeParameter, error
// list runtime parameters in a vhost
params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/")
// => []RuntimeParameter, error
// information about a runtime parameter
p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name")
// => *RuntimeParameter, error
// declare or update a runtime parameter
resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error
// remove a runtime parameter
resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name")
// => *http.Response, error
Managing Federation Upstreams
// list all federation upstreams
ups, err := rmqc.ListFederationUpstreams()
// => []FederationUpstream, error
// list federation upstreams in a vhost
ups, err := rmqc.ListFederationUpstreamsIn("/")
// => []FederationUpstream, error
// information about a federated upstream
up, err := rmqc.GetFederationUpstream("/", "upstream-name")
// => *FederationUpstream, error
// declare or update a federation upstream
resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error
// delete an upstream
resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name")
// => *http.Response, error
Operations on cluster name
// Get cluster name
cn, err := rmqc.GetClusterName()
Expand Down
121 changes: 101 additions & 20 deletions federation.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package rabbithole

import (
"encoding/json"
"net/http"
"net/url"
)

// Federation definition: additional arguments
Expand All @@ -24,49 +22,132 @@ type FederationDefinition struct {

// Represents a configured Federation upstream.
type FederationUpstream struct {
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved
Name string `json:"name"`
Vhost string `json:"vhost"`
Component string `json:"component"`
Definition FederationDefinition `json:"value"`
}

const FederationUpstreamComponent string = "federation-upstream"

//
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
// GET /api/parameters/federation-upstream
//

// Updates a federation upstream
func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) {
fedUp := FederationUpstream{
Definition: fDef,
}
body, err := json.Marshal(fedUp)
// ListFederationUpstreams returns a list of all federation upstreams.
func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) {
params, err := c.ListRuntimeParametersFor(FederationUpstreamComponent)
if err != nil {
return nil, err
}

req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), body)
for _, p := range params {
up := paramToUpstream(&p)
ups = append(ups, *up)
}
return ups, nil
}

//
// GET /api/parameters/federation-upstream/{vhost}
//

// ListFederationUpstreamsIn returns a list of all federation upstreams in a vhost.
func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) {
params, err := c.ListRuntimeParametersIn(FederationUpstreamComponent, vhost)
if err != nil {
return nil, err
}

if res, err = executeRequest(c, req); err != nil {
for _, p := range params {
up := paramToUpstream(&p)
ups = append(ups, *up)
}
return ups, nil
}

//
// GET /api/parameters/federation-upstream/{vhost}/{upstream}
//

// GetFederationUpstream returns information about a federation upstream.
func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) {
p, err := c.GetRuntimeParameter(FederationUpstreamComponent, vhost, name)
if err != nil {
return nil, err
}
return paramToUpstream(p), nil
}

return res, nil
//
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
//

// PutFederationUpstream creates or updates a federation upstream configuration.
func (c *Client) PutFederationUpstream(vhost, name string, def FederationDefinition) (res *http.Response, err error) {
return c.PutRuntimeParameter(FederationUpstreamComponent, vhost, name, def)
}

//
// DELETE /api/parameters/federation-upstream/{vhost}/{name}
//

// Deletes a federation upstream.
func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), nil)
if err != nil {
return nil, err
// DeleteFederationUpstream removes a federation upstream.
func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) {
return c.DeleteRuntimeParameter(FederationUpstreamComponent, vhost, name)
}

// paramToUpstream maps from a RuntimeParameter structure to a FederationUpstream structure.
func paramToUpstream(p *RuntimeParameter) (up *FederationUpstream) {
up = &FederationUpstream{
Name: p.Name,
Vhost: p.Vhost,
Component: p.Component,
}

if res, err = executeRequest(c, req); err != nil {
return nil, err
def := FederationDefinition{}
m := p.Value.(map[string]interface{})

if v, ok := m["uri"].(string); ok {
def.Uri = v
}

if v, ok := m["expires"].(float64); ok {
def.Expires = int(v)
}

if v, ok := m["message-ttl"].(float64); ok {
def.MessageTTL = int32(v)
}

if v, ok := m["max-hops"].(float64); ok {
def.MaxHops = int(v)
}

if v, ok := m["prefetch-count"].(float64); ok {
def.PrefetchCount = int(v)
}

if v, ok := m["reconnect-delay"].(float64); ok {
def.ReconnectDelay = int(v)
}

if v, ok := m["ack-mode"].(string); ok {
def.AckMode = v
}

if v, ok := m["trust-user-id"].(bool); ok {
def.TrustUserId = v
}

if v, ok := m["exchange"].(string); ok {
def.Exchange = v
}

if v, ok := m["queue"].(string); ok {
def.Queue = v
}

return res, nil
up.Definition = def
return up
}
Loading