Skip to content

Commit afaa8f6

Browse files
authored
Merge pull request #1869 from hashicorp/NET-6966-peer-listing
[NET-6966] consul-template support for listing peerings
2 parents 8fdab02 + 970b431 commit afaa8f6

9 files changed

+441
-0
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# Unreleased Changes
2+
3+
NEW FEATURES:
4+
* Add support for listing Consul peers [NET-6966](https://hashicorp.atlassian.net/browse/NET-6966)
5+
16
## v0.36.0 (January 3, 2024)
27

38
IMPROVEMENTS:

dependency/consul_peering.go

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Copyright (c) HashiCorp, Inc.
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
package dependency
5+
6+
import (
7+
"context"
8+
"encoding/gob"
9+
"fmt"
10+
"github.com/hashicorp/consul/api"
11+
"github.com/pkg/errors"
12+
"log"
13+
"net/url"
14+
"regexp"
15+
"sort"
16+
"time"
17+
)
18+
19+
var (
20+
// Ensure implements
21+
_ Dependency = (*ListPeeringQuery)(nil)
22+
23+
// ListPeeringQueryRe is the regular expression to use.
24+
ListPeeringQueryRe = regexp.MustCompile(`\A` + queryRe + `\z`)
25+
)
26+
27+
func init() {
28+
gob.Register([]*Peering{})
29+
gob.Register([]*PeeringStreamStatus{})
30+
gob.Register([]*PeeringRemoteInfo{})
31+
}
32+
33+
// ListPeeringQuery fetches all peering for a Consul cluster.
34+
// https://developer.hashicorp.com/consul/api-docs/peering#list-all-peerings
35+
type ListPeeringQuery struct {
36+
stopCh chan struct{}
37+
38+
partition string
39+
}
40+
41+
// Peering represent the response of the Consul peering API.
42+
type Peering struct {
43+
ID string
44+
Name string
45+
Partition string
46+
Meta map[string]string
47+
PeeringState string
48+
PeerID string
49+
PeerServerName string
50+
PeerServerAddresses []string
51+
StreamStatus PeeringStreamStatus
52+
Remote PeeringRemoteInfo
53+
}
54+
55+
type PeeringStreamStatus struct {
56+
ImportedServices []string
57+
ExportedServices []string
58+
LastHeartbeat *time.Time
59+
LastReceive *time.Time
60+
LastSend *time.Time
61+
}
62+
63+
type PeeringRemoteInfo struct {
64+
Partition string
65+
Datacenter string
66+
}
67+
68+
func NewListPeeringQuery(s string) (*ListPeeringQuery, error) {
69+
if s != "" && !ListPeeringQueryRe.MatchString(s) {
70+
return nil, fmt.Errorf("list.peering: invalid format: %q", s)
71+
}
72+
73+
m := regexpMatch(ListPeeringQueryRe, s)
74+
75+
queryParams, err := GetConsulQueryOpts(m, "list.peering")
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
return &ListPeeringQuery{
81+
stopCh: make(chan struct{}, 1),
82+
partition: queryParams.Get(QueryPartition),
83+
}, nil
84+
}
85+
86+
func (l *ListPeeringQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
87+
select {
88+
case <-l.stopCh:
89+
return nil, nil, ErrStopped
90+
default:
91+
}
92+
93+
opts = opts.Merge(&QueryOptions{
94+
ConsulPartition: l.partition,
95+
})
96+
97+
log.Printf("[TRACE] %s: GET %s", l, &url.URL{
98+
Path: "/v1/peerings",
99+
RawQuery: opts.String(),
100+
})
101+
102+
// list peering is a blocking API, so making sure the ctx passed while calling it
103+
// times out after the default wait time.
104+
ctx, cancel := context.WithTimeout(context.Background(), DefaultContextTimeout)
105+
defer cancel()
106+
107+
p, meta, err := clients.Consul().Peerings().List(ctx, opts.ToConsulOpts())
108+
if err != nil {
109+
return nil, nil, errors.Wrap(err, l.String())
110+
}
111+
112+
log.Printf("[TRACE] %s: returned %d results", l, len(p))
113+
114+
peers := make([]*Peering, 0, len(p))
115+
for _, peering := range p {
116+
peers = append(peers, toPeering(peering))
117+
}
118+
119+
// sort so that the result is deterministic
120+
sort.Stable(ByPeer(peers))
121+
122+
rm := &ResponseMetadata{
123+
LastIndex: meta.LastIndex,
124+
LastContact: meta.LastContact,
125+
}
126+
127+
return peers, rm, nil
128+
}
129+
130+
func toPeering(p *api.Peering) *Peering {
131+
return &Peering{
132+
ID: p.ID,
133+
Name: p.Name,
134+
Partition: p.Partition,
135+
Meta: p.Meta,
136+
PeeringState: string(p.State),
137+
PeerID: p.PeerID,
138+
PeerServerName: p.PeerServerName,
139+
PeerServerAddresses: p.PeerServerAddresses,
140+
StreamStatus: PeeringStreamStatus{
141+
ImportedServices: p.StreamStatus.ImportedServices,
142+
ExportedServices: p.StreamStatus.ExportedServices,
143+
LastHeartbeat: p.StreamStatus.LastHeartbeat,
144+
LastReceive: p.StreamStatus.LastReceive,
145+
LastSend: p.StreamStatus.LastSend,
146+
},
147+
Remote: PeeringRemoteInfo{
148+
Partition: p.Remote.Partition,
149+
Datacenter: p.Remote.Datacenter,
150+
},
151+
}
152+
}
153+
154+
func (l *ListPeeringQuery) String() string {
155+
partitionStr := l.partition
156+
157+
if len(partitionStr) > 0 {
158+
partitionStr = fmt.Sprintf("?partition=%s", partitionStr)
159+
} else {
160+
return "list.peerings"
161+
}
162+
163+
return fmt.Sprintf("list.peerings%s", partitionStr)
164+
}
165+
166+
func (l *ListPeeringQuery) Stop() {
167+
close(l.stopCh)
168+
}
169+
170+
func (l *ListPeeringQuery) Type() Type {
171+
return TypeConsul
172+
}
173+
174+
func (l *ListPeeringQuery) CanShare() bool {
175+
return false
176+
}
177+
178+
// ByPeer is a sortable list of peerings in this order:
179+
// 1. State
180+
// 2. Partition
181+
// 3. Name
182+
type ByPeer []*Peering
183+
184+
func (p ByPeer) Len() int { return len(p) }
185+
func (p ByPeer) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
186+
187+
// Less if peer names are cluster-2, cluster-12, cluster-1
188+
// our sorting will be cluster-1, cluster-12, cluster-2
189+
func (p ByPeer) Less(i, j int) bool {
190+
if p[i].PeeringState == p[j].PeeringState {
191+
if p[i].Partition == p[j].Partition {
192+
return p[i].Name < p[j].Name
193+
}
194+
return p[i].Partition < p[j].Partition
195+
}
196+
return p[i].PeeringState < p[j].PeeringState
197+
}

dependency/consul_peering_test.go

+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package dependency
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestListPeeringsQuery(t *testing.T) {
11+
cases := []struct {
12+
name string
13+
i string
14+
exp *ListPeeringQuery
15+
err bool
16+
}{
17+
{
18+
"empty",
19+
"",
20+
&ListPeeringQuery{},
21+
false,
22+
},
23+
{
24+
"invalid query param (unsupported key)",
25+
"?unsupported=foo",
26+
nil,
27+
true,
28+
},
29+
{
30+
"peerings",
31+
"peerings",
32+
nil,
33+
true,
34+
},
35+
{
36+
"partition",
37+
"?partition=foo",
38+
&ListPeeringQuery{
39+
partition: "foo",
40+
},
41+
false,
42+
},
43+
}
44+
45+
for i, tc := range cases {
46+
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
47+
act, err := NewListPeeringQuery(tc.i)
48+
if (err != nil) != tc.err {
49+
t.Fatal(err)
50+
}
51+
52+
if act != nil {
53+
act.stopCh = nil
54+
}
55+
56+
assert.Equal(t, tc.exp, act)
57+
})
58+
}
59+
}
60+
61+
func TestListPeeringsQuery_Fetch(t *testing.T) {
62+
cases := []struct {
63+
name string
64+
i string
65+
exp []string
66+
}{
67+
{
68+
"all",
69+
"",
70+
// the peering generated has random IDs,
71+
// we can't assert on the full response,
72+
// we can assert on the peering names though.
73+
[]string{
74+
"bar",
75+
"foo",
76+
},
77+
},
78+
}
79+
80+
for i, tc := range cases {
81+
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
82+
p, err := NewListPeeringQuery(tc.i)
83+
if err != nil {
84+
t.Fatal(err)
85+
}
86+
87+
res, _, err := p.Fetch(testClients, nil)
88+
if err != nil {
89+
t.Fatal(err)
90+
}
91+
92+
if res == nil {
93+
t.Fatalf("expected non-nil result")
94+
}
95+
96+
peerNames := make([]string, 0)
97+
for _, peering := range res.([]*Peering) {
98+
peerNames = append(peerNames, peering.Name)
99+
}
100+
101+
assert.Equal(t, tc.exp, peerNames)
102+
})
103+
}
104+
}
105+
106+
func TestListPeeringsQuery_String(t *testing.T) {
107+
cases := []struct {
108+
name string
109+
i string
110+
exp string
111+
}{
112+
{
113+
"empty",
114+
"",
115+
"list.peerings",
116+
},
117+
{
118+
"partition",
119+
"?partition=foo",
120+
"list.peerings?partition=foo",
121+
},
122+
}
123+
124+
for i, tc := range cases {
125+
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
126+
d, err := NewListPeeringQuery(tc.i)
127+
if err != nil {
128+
t.Fatal(err)
129+
}
130+
str := d.String()
131+
assert.Equal(t, tc.exp, str)
132+
})
133+
}
134+
}

dependency/dependency.go

+5
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ const (
4242
TypeNomad
4343
)
4444

45+
const (
46+
// DefaultContextTimeout context wait timeout for blocking queries.
47+
DefaultContextTimeout = 60 * time.Second
48+
)
49+
4550
// Dependency is an interface for a dependency that Consul Template is capable
4651
// of watching.
4752
type Dependency interface {

dependency/dependency_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package dependency
55

66
import (
7+
"context"
78
"encoding/json"
89
"fmt"
910
"io"
@@ -124,6 +125,11 @@ func TestMain(m *testing.M) {
124125
Fatalf("%v", err)
125126
}
126127

128+
err := createConsulPeerings(clients)
129+
if err != nil {
130+
Fatalf("%v", err)
131+
}
132+
127133
// Wait for Nomad initialization to finish
128134
if err := <-nomadFuture; err != nil {
129135
testConsul.Stop()
@@ -158,6 +164,22 @@ func TestMain(m *testing.M) {
158164
os.Exit(exit)
159165
}
160166

167+
func createConsulPeerings(clients *ClientSet) error {
168+
generateReq := api.PeeringGenerateTokenRequest{PeerName: "foo"}
169+
_, _, err := clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
170+
if err != nil {
171+
return err
172+
}
173+
174+
generateReq = api.PeeringGenerateTokenRequest{PeerName: "bar"}
175+
_, _, err = clients.consul.client.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
176+
if err != nil {
177+
return err
178+
}
179+
180+
return nil
181+
}
182+
161183
func runTestConsul(tb testutil.TestingTB) {
162184
consul, err := testutil.NewTestServerConfigT(tb,
163185
func(c *testutil.TestServerConfig) {

0 commit comments

Comments
 (0)