Skip to content

Commit

Permalink
connect: expose an API endpoint to compile the discovery chain
Browse files Browse the repository at this point in the history
  • Loading branch information
rboyer committed Aug 1, 2019
1 parent f66c962 commit 247756e
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 111 deletions.
2 changes: 1 addition & 1 deletion agent/cache-types/discovery_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Reques

// Fetch
var reply structs.DiscoveryChainResponse
if err := c.RPC.RPC("ConfigEntry.ReadDiscoveryChain", reqReal, &reply); err != nil {
if err := c.RPC.RPC("DiscoveryChain.Get", reqReal, &reply); err != nil {
return result, err
}

Expand Down
4 changes: 2 additions & 2 deletions agent/cache-types/discovery_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ func TestCompiledDiscoveryChain(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.DiscoveryChainResponse
rpc.On("RPC", "ConfigEntry.ReadDiscoveryChain", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", "DiscoveryChain.Get", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DiscoveryChainRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)

reply := args.Get(2).(*structs.DiscoveryChainResponse)
reply.ConfigEntries = entries
reply.Chain = chain
reply.Entries = entries.Flatten()
reply.QueryMeta.Index = 48
resp = reply
})
Expand Down
62 changes: 0 additions & 62 deletions agent/consul/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
Expand Down Expand Up @@ -313,64 +312,3 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
return nil
})
}

func (c *ConfigEntry) ReadDiscoveryChain(args *structs.DiscoveryChainRequest, reply *structs.DiscoveryChainResponse) error {
if done, err := c.srv.forward("ConfigEntry.ReadDiscoveryChain", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "read_discovery_chain"}, time.Now())

// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceRead(args.Name) {
return acl.ErrPermissionDenied
}

if args.Name == "" {
return fmt.Errorf("Must provide service name")
}

evalDC := args.EvaluateInDatacenter
if evalDC == "" {
evalDC = c.srv.config.Datacenter
}

evalNS := args.EvaluateInNamespace
if evalNS == "" {
// TODO(namespaces) pull from something else?
evalNS = "default"
}

return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name)
if err != nil {
return err
}

// Then we compile it into something useful.
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
ServiceName: args.Name,
CurrentNamespace: evalNS,
CurrentDatacenter: evalDC,
OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout,
Entries: entries,
})
if err != nil {
return err
}

reply.Index = index
reply.ConfigEntries = entries
reply.Chain = chain

return nil
})
}
78 changes: 78 additions & 0 deletions agent/consul/discovery_chain_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package consul

import (
"fmt"
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)

type DiscoveryChain struct {
srv *Server
}

func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs.DiscoveryChainResponse) error {
if done, err := c.srv.forward("DiscoveryChain.Get", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"discoverychain", "get"}, time.Now())

// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceRead(args.Name) {
return acl.ErrPermissionDenied
}

if args.Name == "" {
return fmt.Errorf("Must provide service name")
}

evalDC := args.EvaluateInDatacenter
if evalDC == "" {
evalDC = c.srv.config.Datacenter
}

evalNS := args.EvaluateInNamespace
if evalNS == "" {
// TODO(namespaces) pull from something else?
evalNS = "default"
}

return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name)
if err != nil {
return err
}

// Then we compile it into something useful.
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
ServiceName: args.Name,
CurrentNamespace: evalNS,
CurrentDatacenter: evalDC,
OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout,
Entries: entries,
})
if err != nil {
return err
}

reply.Index = index
reply.Entries = entries.Flatten()
reply.Chain = chain

return nil
})
}
1 change: 1 addition & 0 deletions agent/consul/server_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ func init() {
registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) })
registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} })
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} })
registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} })
registerEndpoint(func(s *Server) interface{} { return &Health{s} })
registerEndpoint(func(s *Server) interface{} { return &Intention{s} })
registerEndpoint(func(s *Server) interface{} { return &Internal{s} })
Expand Down
70 changes: 70 additions & 0 deletions agent/discovery_chain_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package agent

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/hashicorp/consul/agent/structs"
)

func (s *HTTPServer) ConnectDiscoveryChainGet(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.DiscoveryChainRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

args.Name = strings.TrimPrefix(req.URL.Path, "/v1/connect/discovery-chain/")
if args.Name == "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing chain name")
return nil, nil
}

args.EvaluateInDatacenter = req.URL.Query().Get("eval_dc")
// TODO(namespaces): args.EvaluateInNamespace = req.URL.Query().Get("eval_namespace")

overrideMeshGatewayMode := req.URL.Query().Get("override_mesh_gateway_mode")
if overrideMeshGatewayMode != "" {
mode, err := structs.ValidateMeshGatewayMode(overrideMeshGatewayMode)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Invalid override_mesh_gateway_mode parameter")
return nil, nil
}
args.OverrideMeshGateway.Mode = mode
}

args.OverrideProtocol = req.URL.Query().Get("override_protocol")
overrideTimeoutString := req.URL.Query().Get("override_connect_timeout")
if overrideTimeoutString != "" {
d, err := time.ParseDuration(overrideTimeoutString)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Invalid override_connect_timeout parameter")
return nil, nil
}
args.OverrideConnectTimeout = d
}

// Make the RPC request
var out structs.DiscoveryChainResponse
defer setMeta(resp, &out.QueryMeta)

if err := s.agent.RPC("DiscoveryChain.Get", &args, &out); err != nil {
return nil, err
}

apiOut := apiDiscoveryChainResponse{
Chain: out.Chain,
Entries: out.Entries,
}

return apiOut, nil
}

type apiDiscoveryChainResponse struct {
Chain *structs.CompiledDiscoveryChain
Entries []structs.ConfigEntry
}
2 changes: 1 addition & 1 deletion agent/http_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func init() {
registerEndpoint("/v1/connect/intentions/match", []string{"GET"}, (*HTTPServer).IntentionMatch)
registerEndpoint("/v1/connect/intentions/check", []string{"GET"}, (*HTTPServer).IntentionCheck)
registerEndpoint("/v1/connect/intentions/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).IntentionSpecific)
registerEndpoint("/v1/connect/discovery-chain/", []string{"GET"}, (*HTTPServer).ConnectDiscoveryChainGet)
registerEndpoint("/v1/coordinate/datacenters", []string{"GET"}, (*HTTPServer).CoordinateDatacenters)
registerEndpoint("/v1/coordinate/nodes", []string{"GET"}, (*HTTPServer).CoordinateNodes)
registerEndpoint("/v1/coordinate/node/", []string{"GET"}, (*HTTPServer).CoordinateNode)
Expand All @@ -88,7 +89,6 @@ func init() {
registerEndpoint("/v1/health/state/", []string{"GET"}, (*HTTPServer).HealthChecksInState)
registerEndpoint("/v1/health/service/", []string{"GET"}, (*HTTPServer).HealthServiceNodes)
registerEndpoint("/v1/health/connect/", []string{"GET"}, (*HTTPServer).HealthConnectServiceNodes)
registerEndpoint("/v1/internal/discovery-chain/", []string{"GET"}, (*HTTPServer).InternalDiscoveryChain)
registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPServer).UINodes)
registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPServer).UINodeInfo)
registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPServer).UIServices)
Expand Down
40 changes: 0 additions & 40 deletions agent/internal_endpoint.go

This file was deleted.

31 changes: 28 additions & 3 deletions agent/structs/config_entry_discoverychain.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,32 @@ func NewDiscoveryChainConfigEntries() *DiscoveryChainConfigEntries {
}
}

func (e *DiscoveryChainConfigEntries) Flatten() []ConfigEntry {
n := len(e.Routers) + len(e.Splitters) + len(e.Resolvers) + len(e.Services)
if e.GlobalProxy != nil {
n++
}
out := make([]ConfigEntry, 0, n)

for _, v := range e.Routers {
out = append(out, v)
}
for _, v := range e.Splitters {
out = append(out, v)
}
for _, v := range e.Resolvers {
out = append(out, v)
}
for _, v := range e.Services {
out = append(out, v)
}
if e.GlobalProxy != nil {
out = append(out, e.GlobalProxy)
}

return out
}

func (e *DiscoveryChainConfigEntries) GetRouter(name string) *ServiceRouterConfigEntry {
if e.Routers != nil {
return e.Routers[name]
Expand Down Expand Up @@ -1051,10 +1077,9 @@ func (r *DiscoveryChainRequest) CacheInfo() cache.RequestInfo {
return info
}

// TODO(rb): either fix the compiled results, or take the derived data and stash it here in a json/msgpack-friendly way?
type DiscoveryChainResponse struct {
ConfigEntries *DiscoveryChainConfigEntries `json:",omitempty"` // TODO(rb): remove these?
Chain *CompiledDiscoveryChain `json:",omitempty"`
Chain *CompiledDiscoveryChain
Entries []ConfigEntry
QueryMeta
}

Expand Down
4 changes: 2 additions & 2 deletions agent/structs/discovery_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ type CompiledDiscoveryChain struct {
// If set, this value should be used to prefix/suffix any generated load
// balancer data plane objects to avoid sharing customized and
// non-customized versions.
CustomizationHash string
CustomizationHash string `json:",omitempty"`

// Protocol is the overall protocol shared by everything in the chain.
Protocol string
Protocol string `json:",omitempty"`

// StartNode is the first key into the Nodes map that should be followed
// when walking the discovery chain.
Expand Down

0 comments on commit 247756e

Please sign in to comment.