From 4e3303d9e53f0213a8eeb7c34f5124b54c99165d Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Tue, 10 Oct 2023 12:16:41 -0500 Subject: [PATCH 1/5] server: when the v2 catalog experiment is enabled reject api and rpc requests that are for the v1 catalog --- agent/ae/ae.go | 27 ++++++++++++--- agent/agent.go | 6 ++-- agent/catalog_endpoint_test.go | 50 ++++++++++++++++++++++++++-- agent/consul/server.go | 42 +++++++++++++++++++---- agent/health_endpoint_test.go | 19 +++++++++++ agent/http.go | 50 +++++++++++++++++++++++++++- agent/rpc/middleware/interceptors.go | 22 +++++++++++- agent/structs/errors.go | 6 ++++ agent/testagent.go | 16 +++++++++ agent/ui_endpoint_test.go | 22 ++++++++++++ 10 files changed, 243 insertions(+), 17 deletions(-) diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 65b38e00e4b4..f8b9a331d100 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -81,8 +81,9 @@ type StateSyncer struct { SyncChanges *Trigger // paused stores whether sync runs are temporarily disabled. - pauseLock sync.Mutex - paused int + pauseLock sync.Mutex + paused int + hardDisabled bool // serverUpInterval is the max time after which a full sync is // performed when a server has been added to the cluster. @@ -151,9 +152,20 @@ const ( retryFullSyncState fsmState = "retryFullSync" ) +// HardDisableSync is like PauseSync but is one-way. It causes other +// Pause/Resume/Start operations to be completely ignored. +func (s *StateSyncer) HardDisableSync() { + s.pauseLock.Lock() + s.hardDisabled = true + s.pauseLock.Unlock() +} + // Run is the long running method to perform state synchronization // between local and remote servers. func (s *StateSyncer) Run() { + if s.Disabled() { + return + } if s.ClusterSize == nil { panic("ClusterSize not set") } @@ -329,7 +341,14 @@ func (s *StateSyncer) Pause() { func (s *StateSyncer) Paused() bool { s.pauseLock.Lock() defer s.pauseLock.Unlock() - return s.paused != 0 + return s.paused != 0 || s.hardDisabled +} + +// Disabled returns whether sync runs are permanently disabled. +func (s *StateSyncer) Disabled() bool { + s.pauseLock.Lock() + defer s.pauseLock.Unlock() + return s.hardDisabled } // Resume re-enables sync runs. It returns true if it was the last pause/resume @@ -340,7 +359,7 @@ func (s *StateSyncer) Resume() bool { if s.paused < 0 { panic("unbalanced pause/resume") } - trigger := s.paused == 0 + trigger := s.paused == 0 && !s.hardDisabled s.pauseLock.Unlock() if trigger { s.SyncChanges.Trigger() diff --git a/agent/agent.go b/agent/agent.go index 8354320ad670..192a120ed5b2 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -22,8 +22,6 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/consul/lib/stringslice" - "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-connlimit" @@ -73,6 +71,7 @@ import ( "github.com/hashicorp/consul/lib/file" "github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/lib/routine" + "github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/pboperator" @@ -623,6 +622,9 @@ func (a *Agent) Start(ctx context.Context) error { // create the state synchronization manager which performs // regular and on-demand state synchronizations (anti-entropy). a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger) + if a.useV2Resources() { + a.sync.HardDisableSync() + } err = validateFIPSConfig(a.config) if err != nil { diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index 1b92e29a84d2..38210f94d3bf 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -6,24 +6,68 @@ package agent import ( "context" "fmt" + "io" "net/http" "net/http/httptest" "net/url" + "strings" "testing" "time" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/serf/coordinate" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" ) +func TestCatalogEndpointsFailInV2(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t, `experiments = ["resource-apis"]`) + + checkRequest := func(method, url string) { + t.Run(method+" "+url, func(t *testing.T) { + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url) + }) + } + + checkRequest("PUT", "/v1/catalog/register") + checkRequest("GET", "/v1/catalog/connect/") + checkRequest("PUT", "/v1/catalog/deregister") + checkRequest("GET", "/v1/catalog/datacenters") + checkRequest("GET", "/v1/catalog/nodes") + checkRequest("GET", "/v1/catalog/services") + checkRequest("GET", "/v1/catalog/service/") + checkRequest("GET", "/v1/catalog/node/") + checkRequest("GET", "/v1/catalog/node-services/") + checkRequest("GET", "/v1/catalog/gateway-services/") +} + +func assertV1CatalogEndpointDoesNotWorkWithV2(t *testing.T, a *TestAgent, method, url string) { + var body io.Reader + switch method { + case http.MethodPost, http.MethodPut: + body = strings.NewReader("{}\n") + } + + req, err := http.NewRequest(method, url, body) + require.NoError(t, err) + + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusBadRequest, resp.Code) + + got, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + require.Contains(t, string(got), structs.ErrUsingV2CatalogExperiment.Error()) +} + func TestCatalogRegister_PeeringRegistration(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/server.go b/agent/consul/server.go index 697cca43f389..57fae517d3fc 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -19,12 +19,7 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/consul/internal/auth" - "github.com/hashicorp/consul/internal/mesh" - "github.com/hashicorp/consul/internal/resource" - "github.com/armon/go-metrics" - "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -41,6 +36,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/reflection" + "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/agent/blockingquery" @@ -74,9 +70,12 @@ import ( "github.com/hashicorp/consul/agent/rpc/peering" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/internal/auth" "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh" proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" + "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/demo" "github.com/hashicorp/consul/internal/resource/reaper" raftstorage "github.com/hashicorp/consul/internal/storage/raft" @@ -466,6 +465,8 @@ type Server struct { reportingManager *reporting.ReportingManager registry resource.Registry + + useV2Resources bool } func (s *Server) DecrementBlockingQueries() uint64 { @@ -588,8 +589,35 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, return nil, fmt.Errorf("cannot initialize server with a nil RPC request recorder") } + isV1CatalogRPC := func(name string) bool { + if strings.HasPrefix(name, "Catalog.") || strings.HasPrefix(name, "Health.") { + return true + } + + switch name { + case "Internal.EventFire", "Internal.KeyringOperation", "Internal.OIDCAuthMethods": + return false + default: + if strings.HasPrefix(name, "Internal.") { + return true + } + } + + return false + } + rpcServerOpts := []func(*rpc.Server){ - rpc.WithPreBodyInterceptor(middleware.GetNetRPCRateLimitingInterceptor(s.incomingRPCLimiter, middleware.NewPanicHandler(s.logger))), + rpc.WithPreBodyInterceptor( + middleware.ChainedRPCPreBodyInterceptor( + func(reqServiceMethod string, sourceAddr net.Addr) error { + if s.useV2Resources && isV1CatalogRPC(reqServiceMethod) { + return structs.ErrUsingV2CatalogExperiment + } + return nil + }, + middleware.GetNetRPCRateLimitingInterceptor(s.incomingRPCLimiter, middleware.NewPanicHandler(s.logger)), + ), + ), } if flat.GetNetRPCInterceptorFunc != nil { @@ -900,6 +928,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error { if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) { + s.useV2Resources = true + catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies()) defaultAllow, err := s.config.ACLResolverSettings.IsDefaultAllow() diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index aedef6043d8b..e8800eec75f0 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -28,6 +28,25 @@ import ( "github.com/hashicorp/consul/types" ) +func TestHealthEndpointsFailInV2(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t, `experiments = ["resource-apis"]`) + + checkRequest := func(method, url string) { + t.Run(method+" "+url, func(t *testing.T) { + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url) + }) + } + + checkRequest("GET", "/v1/health/node/web") + checkRequest("GET", "/v1/health/checks/web") + checkRequest("GET", "/v1/health/state/web") + checkRequest("GET", "/v1/health/service/web") + checkRequest("GET", "/v1/health/connect/web") + checkRequest("GET", "/v1/health/ingress/web") +} + func TestHealthChecksInState(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/http.go b/agent/http.go index 982e784c76b7..aba8e6f6e2b5 100644 --- a/agent/http.go +++ b/agent/http.go @@ -394,6 +394,23 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc } logURL = aclEndpointRE.ReplaceAllString(logURL, "$1$4") + rejectCatalogV1Endpoint := false + if s.agent.useV2Resources() { + switch { + case strings.HasPrefix(logURL, "/v1/catalog/"), + strings.HasPrefix(logURL, "/v1/health/"): + rejectCatalogV1Endpoint = true + case strings.HasPrefix(logURL, "/v1/internal/acl/authorize"), + strings.HasPrefix(logURL, "/v1/internal/service-virtual-ip"), + strings.HasPrefix(logURL, "/v1/internal/ui/oidc-auth-methods"), + strings.HasPrefix(logURL, "/v1/internal/ui/metrics-proxy/"): + rejectCatalogV1Endpoint = false + + case strings.HasPrefix(logURL, "/v1/internal/"): + rejectCatalogV1Endpoint = true + } + } + if s.denylist.Block(req.URL.Path) { errMsg := "Endpoint is blocked by agent configuration" httpLogger.Error("Request error", @@ -455,6 +472,14 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc return strings.Contains(err.Error(), rate.ErrRetryLater.Error()) } + isUsingV2CatalogExperiment := func(err error) bool { + if err == nil { + return false + } + + return structs.IsErrUsingV2CatalogExperiment(err) + } + isMethodNotAllowed := func(err error) bool { _, ok := err.(MethodNotAllowedError) return ok @@ -490,6 +515,10 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc msg = s.Message() } + if isUsingV2CatalogExperiment(err) && !isHTTPError(err) { + err = newRejectV1RequestWhenV2EnabledError() + } + switch { case isForbidden(err): resp.WriteHeader(http.StatusForbidden) @@ -566,7 +595,12 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc if err == nil { // Invoke the handler - obj, err = handler(resp, req) + if rejectCatalogV1Endpoint { + obj = nil + err = s.rejectV1RequestWhenV2Enabled() + } else { + obj, err = handler(resp, req) + } } } contentType := "application/json" @@ -1084,6 +1118,20 @@ func (s *HTTPHandlers) parseToken(req *http.Request, token *string) { s.parseTokenWithDefault(req, token) } +func (s *HTTPHandlers) rejectV1RequestWhenV2Enabled() error { + if s.agent.useV2Resources() { + return newRejectV1RequestWhenV2EnabledError() + } + return nil +} + +func newRejectV1RequestWhenV2EnabledError() error { + return HTTPError{ + StatusCode: http.StatusBadRequest, + Reason: structs.ErrUsingV2CatalogExperiment.Error(), + } +} + func sourceAddrFromRequest(req *http.Request) string { xff := req.Header.Get("X-Forwarded-For") forwardHosts := strings.Split(xff, ",") diff --git a/agent/rpc/middleware/interceptors.go b/agent/rpc/middleware/interceptors.go index 1e4a4e591fb2..e783254a980d 100644 --- a/agent/rpc/middleware/interceptors.go +++ b/agent/rpc/middleware/interceptors.go @@ -12,9 +12,10 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul-net-rpc/net/rpc" rpcRate "github.com/hashicorp/consul/agent/consul/rate" - "github.com/hashicorp/go-hclog" ) // RPCTypeInternal identifies the "RPC" request as coming from some internal @@ -25,9 +26,11 @@ import ( // Really what we are measuring here is a "cluster operation". The term we have // used for this historically is "RPC", so we continue to use that here. const RPCTypeInternal = "internal" + const RPCTypeNetRPC = "net/rpc" var metricRPCRequest = []string{"rpc", "server", "call"} + var requestLogName = strings.Join(metricRPCRequest, "_") var OneTwelveRPCSummary = []prometheus.SummaryDefinition{ @@ -186,3 +189,20 @@ func GetNetRPCRateLimitingInterceptor(requestLimitsHandler rpcRate.RequestLimits return requestLimitsHandler.Allow(op) } } + +func ChainedRPCPreBodyInterceptor(chain ...rpc.PreBodyInterceptor) rpc.PreBodyInterceptor { + if len(chain) == 0 { + panic("don't call this with zero interceptors") + } + if len(chain) == 1 { + return chain[0] + } + return func(reqServiceMethod string, sourceAddr net.Addr) error { + for _, interceptor := range chain { + if err := interceptor(reqServiceMethod, sourceAddr); err != nil { + return err + } + } + return nil + } +} diff --git a/agent/structs/errors.go b/agent/structs/errors.go index a7eceed2cda2..31a818bd62f2 100644 --- a/agent/structs/errors.go +++ b/agent/structs/errors.go @@ -23,6 +23,7 @@ const ( errRateLimited = "Rate limit reached, try again later" // Note: we depend on this error message in the gRPC ConnectCA.Sign endpoint (see: isRateLimitError). errNotPrimaryDatacenter = "not the primary datacenter" errStateReadOnly = "CA Provider State is read-only" + errUsingV2CatalogExperiment = "V1 catalog is disabled when V2 is enabled" ) var ( @@ -39,6 +40,7 @@ var ( ErrRateLimited = errors.New(errRateLimited) // Note: we depend on this error message in the gRPC ConnectCA.Sign endpoint (see: isRateLimitError). ErrNotPrimaryDatacenter = errors.New(errNotPrimaryDatacenter) ErrStateReadOnly = errors.New(errStateReadOnly) + ErrUsingV2CatalogExperiment = errors.New(errUsingV2CatalogExperiment) ) func IsErrNoDCPath(err error) bool { @@ -60,3 +62,7 @@ func IsErrRPCRateExceeded(err error) bool { func IsErrServiceNotFound(err error) bool { return err != nil && strings.Contains(err.Error(), errServiceNotFound) } + +func IsErrUsingV2CatalogExperiment(err error) bool { + return err != nil && strings.Contains(err.Error(), errUsingV2CatalogExperiment) +} diff --git a/agent/testagent.go b/agent/testagent.go index 84751ba0f767..a025f5077ecd 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -286,6 +286,22 @@ func (a *TestAgent) waitForUp() error { continue // fail, try again } if a.Config.Bootstrap && a.Config.ServerMode { + if a.useV2Resources() { + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var leader string + if err := a.RPC(context.Background(), "Status.Leader", args, &leader); err != nil { + retErr = fmt.Errorf("Status.Leader failed: %v", err) + continue // fail, try again + } + if leader == "" { + retErr = fmt.Errorf("No leader") + continue // fail, try again + } + return nil // success + } + // Ensure we have a leader and a node registration. args := &structs.DCSpecificRequest{ Datacenter: a.Config.Datacenter, diff --git a/agent/ui_endpoint_test.go b/agent/ui_endpoint_test.go index 1e0c391d0c95..b9c957464d93 100644 --- a/agent/ui_endpoint_test.go +++ b/agent/ui_endpoint_test.go @@ -32,6 +32,28 @@ import ( "github.com/hashicorp/consul/types" ) +func TestUIEndpointsFailInV2(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t, `experiments = ["resource-apis"]`) + + checkRequest := func(method, url string) { + t.Run(method+" "+url, func(t *testing.T) { + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url) + }) + } + + checkRequest("GET", "/v1/internal/ui/nodes") + checkRequest("GET", "/v1/internal/ui/node/web") + checkRequest("GET", "/v1/internal/ui/services") + checkRequest("GET", "/v1/internal/ui/exported-services") + checkRequest("GET", "/v1/internal/ui/catalog-overview") + checkRequest("GET", "/v1/internal/ui/gateway-services-nodes/web") + checkRequest("GET", "/v1/internal/ui/gateway-intentions/web") + checkRequest("GET", "/v1/internal/ui/service-topology/web") + checkRequest("PUT", "/v1/internal/service-virtual-ip") +} + func TestUIIndex(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") From eb8064c452384e03352b77cf07246aad80467c51 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Tue, 10 Oct 2023 15:55:42 -0500 Subject: [PATCH 2/5] add agent --- agent/agent_endpoint_test.go | 28 ++++++++++++++++++++++++++++ agent/catalog_endpoint_test.go | 6 +++--- agent/config_endpoint_test.go | 17 +++++++++++++++++ agent/consul/server.go | 5 ++++- agent/health_endpoint_test.go | 2 +- agent/http.go | 30 ++++++++++++++++++++++++++---- agent/ui_endpoint_test.go | 2 +- 7 files changed, 80 insertions(+), 10 deletions(-) diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index c56957558988..1a021d7b8e4c 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -79,6 +79,34 @@ func createACLTokenWithAgentReadPolicy(t *testing.T, srv *HTTPHandlers) string { return svcToken.SecretID } +func TestAgentEndpointsFailInV2(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t, `experiments = ["resource-apis"]`) + + checkRequest := func(method, url string) { + t.Run(method+" "+url, func(t *testing.T) { + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url, `{}`) + }) + } + + checkRequest("PUT", "/v1/agent/maintenance") + checkRequest("GET", "/v1/agent/services") + checkRequest("GET", "/v1/agent/service/web") + checkRequest("GET", "/v1/agent/checks") + checkRequest("GET", "/v1/agent/health/service/id/web") + checkRequest("GET", "/v1/agent/health/service/name/web") + checkRequest("PUT", "/v1/agent/check/register") + checkRequest("PUT", "/v1/agent/check/deregister/web") + checkRequest("PUT", "/v1/agent/check/pass/web") + checkRequest("PUT", "/v1/agent/check/warn/web") + checkRequest("PUT", "/v1/agent/check/fail/web") + checkRequest("PUT", "/v1/agent/check/update/web") + checkRequest("PUT", "/v1/agent/service/register") + checkRequest("PUT", "/v1/agent/service/deregister/web") + checkRequest("PUT", "/v1/agent/service/maintenance/web") +} + func TestAgent_Services(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index 38210f94d3bf..19d520220427 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -32,7 +32,7 @@ func TestCatalogEndpointsFailInV2(t *testing.T) { checkRequest := func(method, url string) { t.Run(method+" "+url, func(t *testing.T) { - assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url) + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url, "{}") }) } @@ -48,11 +48,11 @@ func TestCatalogEndpointsFailInV2(t *testing.T) { checkRequest("GET", "/v1/catalog/gateway-services/") } -func assertV1CatalogEndpointDoesNotWorkWithV2(t *testing.T, a *TestAgent, method, url string) { +func assertV1CatalogEndpointDoesNotWorkWithV2(t *testing.T, a *TestAgent, method, url string, requestBody string) { var body io.Reader switch method { case http.MethodPost, http.MethodPut: - body = strings.NewReader("{}\n") + body = strings.NewReader(requestBody + "\n") } req, err := http.NewRequest(method, url, body) diff --git a/agent/config_endpoint_test.go b/agent/config_endpoint_test.go index 141e1e8f4d0d..43a8ec206e9f 100644 --- a/agent/config_endpoint_test.go +++ b/agent/config_endpoint_test.go @@ -19,6 +19,23 @@ import ( "github.com/hashicorp/consul/testrpc" ) +func TestConfigEndpointsFailInV2(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t, `experiments = ["resource-apis"]`) + + checkRequest := func(method, url string) { + t.Run(method+" "+url, func(t *testing.T) { + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url, `{"kind":"service-defaults", "name":"web"}`) + }) + } + + checkRequest("GET", "/v1/config/service-defaults") + checkRequest("GET", "/v1/config/service-defaults/web") + checkRequest("DELETE", "/v1/config/service-defaults/web") + checkRequest("PUT", "/v1/config") +} + func TestConfig_Get(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/server.go b/agent/consul/server.go index 57fae517d3fc..5b4336c5d9d7 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -590,7 +590,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, } isV1CatalogRPC := func(name string) bool { - if strings.HasPrefix(name, "Catalog.") || strings.HasPrefix(name, "Health.") { + switch { + case strings.HasPrefix(name, "Catalog."), + strings.HasPrefix(name, "Health."), + strings.HasPrefix(name, "ConfigEntry."): return true } diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index e8800eec75f0..4c491e9cb276 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -35,7 +35,7 @@ func TestHealthEndpointsFailInV2(t *testing.T) { checkRequest := func(method, url string) { t.Run(method+" "+url, func(t *testing.T) { - assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url) + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url, "{}") }) } diff --git a/agent/http.go b/agent/http.go index aba8e6f6e2b5..5453bf975b1b 100644 --- a/agent/http.go +++ b/agent/http.go @@ -398,11 +398,33 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc if s.agent.useV2Resources() { switch { case strings.HasPrefix(logURL, "/v1/catalog/"), - strings.HasPrefix(logURL, "/v1/health/"): + strings.HasPrefix(logURL, "/v1/health/"), + strings.HasPrefix(logURL, "/v1/config/"): rejectCatalogV1Endpoint = true - case strings.HasPrefix(logURL, "/v1/internal/acl/authorize"), - strings.HasPrefix(logURL, "/v1/internal/service-virtual-ip"), - strings.HasPrefix(logURL, "/v1/internal/ui/oidc-auth-methods"), + + case strings.HasPrefix(logURL, "/v1/agent/token/"), + logURL == "/v1/agent/self", + logURL == "/v1/agent/host", + logURL == "/v1/agent/version", + logURL == "/v1/agent/reload", + logURL == "/v1/agent/monitor", + logURL == "/v1/agent/metrics", + logURL == "/v1/agent/metrics/stream", + logURL == "/v1/agent/members", + strings.HasPrefix(logURL, "/v1/agent/join/"), + logURL == "/v1/agent/leave", + strings.HasPrefix(logURL, "/v1/agent/force-leave/"), + logURL == "/v1/agent/connect/authorize", + logURL == "/v1/agent/connect/ca/roots", + strings.HasPrefix(logURL, "/v1/agent/connect/ca/leaf/"): + rejectCatalogV1Endpoint = false + + case strings.HasPrefix(logURL, "/v1/agent/"): + rejectCatalogV1Endpoint = true + + case logURL == "/v1/internal/acl/authorize", + logURL == "/v1/internal/service-virtual-ip", + logURL == "/v1/internal/ui/oidc-auth-methods", strings.HasPrefix(logURL, "/v1/internal/ui/metrics-proxy/"): rejectCatalogV1Endpoint = false diff --git a/agent/ui_endpoint_test.go b/agent/ui_endpoint_test.go index b9c957464d93..dd1c6d8134b2 100644 --- a/agent/ui_endpoint_test.go +++ b/agent/ui_endpoint_test.go @@ -39,7 +39,7 @@ func TestUIEndpointsFailInV2(t *testing.T) { checkRequest := func(method, url string) { t.Run(method+" "+url, func(t *testing.T) { - assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url) + assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url, "{}") }) } From 85148c2fc152db7d9143901aee2b11308e46b55b Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Tue, 10 Oct 2023 15:58:47 -0500 Subject: [PATCH 3/5] extract function --- agent/http.go | 76 +++++++++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/agent/http.go b/agent/http.go index 5453bf975b1b..e6ed0713f660 100644 --- a/agent/http.go +++ b/agent/http.go @@ -396,41 +396,7 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc rejectCatalogV1Endpoint := false if s.agent.useV2Resources() { - switch { - case strings.HasPrefix(logURL, "/v1/catalog/"), - strings.HasPrefix(logURL, "/v1/health/"), - strings.HasPrefix(logURL, "/v1/config/"): - rejectCatalogV1Endpoint = true - - case strings.HasPrefix(logURL, "/v1/agent/token/"), - logURL == "/v1/agent/self", - logURL == "/v1/agent/host", - logURL == "/v1/agent/version", - logURL == "/v1/agent/reload", - logURL == "/v1/agent/monitor", - logURL == "/v1/agent/metrics", - logURL == "/v1/agent/metrics/stream", - logURL == "/v1/agent/members", - strings.HasPrefix(logURL, "/v1/agent/join/"), - logURL == "/v1/agent/leave", - strings.HasPrefix(logURL, "/v1/agent/force-leave/"), - logURL == "/v1/agent/connect/authorize", - logURL == "/v1/agent/connect/ca/roots", - strings.HasPrefix(logURL, "/v1/agent/connect/ca/leaf/"): - rejectCatalogV1Endpoint = false - - case strings.HasPrefix(logURL, "/v1/agent/"): - rejectCatalogV1Endpoint = true - - case logURL == "/v1/internal/acl/authorize", - logURL == "/v1/internal/service-virtual-ip", - logURL == "/v1/internal/ui/oidc-auth-methods", - strings.HasPrefix(logURL, "/v1/internal/ui/metrics-proxy/"): - rejectCatalogV1Endpoint = false - - case strings.HasPrefix(logURL, "/v1/internal/"): - rejectCatalogV1Endpoint = true - } + rejectCatalogV1Endpoint = isV1CatalogRequest(logURL) } if s.denylist.Block(req.URL.Path) { @@ -664,6 +630,46 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc } } +func isV1CatalogRequest(logURL string) bool { + switch { + case strings.HasPrefix(logURL, "/v1/catalog/"), + strings.HasPrefix(logURL, "/v1/health/"), + strings.HasPrefix(logURL, "/v1/config/"): + return true + + case strings.HasPrefix(logURL, "/v1/agent/token/"), + logURL == "/v1/agent/self", + logURL == "/v1/agent/host", + logURL == "/v1/agent/version", + logURL == "/v1/agent/reload", + logURL == "/v1/agent/monitor", + logURL == "/v1/agent/metrics", + logURL == "/v1/agent/metrics/stream", + logURL == "/v1/agent/members", + strings.HasPrefix(logURL, "/v1/agent/join/"), + logURL == "/v1/agent/leave", + strings.HasPrefix(logURL, "/v1/agent/force-leave/"), + logURL == "/v1/agent/connect/authorize", + logURL == "/v1/agent/connect/ca/roots", + strings.HasPrefix(logURL, "/v1/agent/connect/ca/leaf/"): + return false + + case strings.HasPrefix(logURL, "/v1/agent/"): + return true + + case logURL == "/v1/internal/acl/authorize", + logURL == "/v1/internal/service-virtual-ip", + logURL == "/v1/internal/ui/oidc-auth-methods", + strings.HasPrefix(logURL, "/v1/internal/ui/metrics-proxy/"): + return false + + case strings.HasPrefix(logURL, "/v1/internal/"): + return true + default: + return false + } +} + // marshalJSON marshals the object into JSON, respecting the user's pretty-ness // configuration. func (s *HTTPHandlers) marshalJSON(req *http.Request, obj interface{}) ([]byte, error) { From 14263bd4794dc59f609ef60595b2b2c9f994a51c Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Tue, 10 Oct 2023 16:00:12 -0500 Subject: [PATCH 4/5] extract function --- agent/consul/server.go | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/agent/consul/server.go b/agent/consul/server.go index 5b4336c5d9d7..f0dd0e8c09b1 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -589,31 +589,11 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, return nil, fmt.Errorf("cannot initialize server with a nil RPC request recorder") } - isV1CatalogRPC := func(name string) bool { - switch { - case strings.HasPrefix(name, "Catalog."), - strings.HasPrefix(name, "Health."), - strings.HasPrefix(name, "ConfigEntry."): - return true - } - - switch name { - case "Internal.EventFire", "Internal.KeyringOperation", "Internal.OIDCAuthMethods": - return false - default: - if strings.HasPrefix(name, "Internal.") { - return true - } - } - - return false - } - rpcServerOpts := []func(*rpc.Server){ rpc.WithPreBodyInterceptor( middleware.ChainedRPCPreBodyInterceptor( func(reqServiceMethod string, sourceAddr net.Addr) error { - if s.useV2Resources && isV1CatalogRPC(reqServiceMethod) { + if s.useV2Resources && isV1CatalogRequest(reqServiceMethod) { return structs.ErrUsingV2CatalogExperiment } return nil @@ -929,6 +909,25 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, return s, nil } +func isV1CatalogRequest(rpcName string) bool { + switch { + case strings.HasPrefix(rpcName, "Catalog."), + strings.HasPrefix(rpcName, "Health."), + strings.HasPrefix(rpcName, "ConfigEntry."): + return true + } + + switch rpcName { + case "Internal.EventFire", "Internal.KeyringOperation", "Internal.OIDCAuthMethods": + return false + default: + if strings.HasPrefix(rpcName, "Internal.") { + return true + } + return false + } +} + func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error { if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) { s.useV2Resources = true From 408e9a153d7a58205f2701c3b9729484c0f454e1 Mon Sep 17 00:00:00 2001 From: John Murret Date: Tue, 10 Oct 2023 16:35:19 -0600 Subject: [PATCH 5/5] centralize useV2Resources logic in deps and use that to configure server (#19132) --- agent/agent.go | 16 +++------------- agent/consul/options.go | 10 ++++++++++ agent/consul/server.go | 5 ++--- agent/http.go | 4 ++-- agent/testagent.go | 2 +- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 192a120ed5b2..06f538bf9695 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -71,7 +71,6 @@ import ( "github.com/hashicorp/consul/lib/file" "github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/lib/routine" - "github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/pboperator" @@ -622,7 +621,7 @@ func (a *Agent) Start(ctx context.Context) error { // create the state synchronization manager which performs // regular and on-demand state synchronizations (anti-entropy). a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger) - if a.useV2Resources() { + if a.baseDeps.UseV2Resources() { a.sync.HardDisableSync() } @@ -726,7 +725,7 @@ func (a *Agent) Start(ctx context.Context) error { ) var pt *proxytracker.ProxyTracker - if a.useV2Resources() { + if a.baseDeps.UseV2Resources() { pt = proxyWatcher.(*proxytracker.ProxyTracker) } server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt) @@ -913,20 +912,11 @@ func (a *Agent) Failed() <-chan struct{} { return a.apiServers.failed } -// useV2Resources returns true if "resource-apis" is present in the Experiments -// array of the agent config. -func (a *Agent) useV2Resources() bool { - if stringslice.Contains(a.baseDeps.Experiments, consul.CatalogResourceExperimentName) { - return true - } - return false -} - // getProxyWatcher returns the proper implementation of the ProxyWatcher interface. // It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise, // it will return a ConfigSource. func (a *Agent) getProxyWatcher() xds.ProxyWatcher { - if a.useV2Resources() { + if a.baseDeps.UseV2Resources() { a.logger.Trace("returning proxyTracker for getProxyWatcher") return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ Logger: a.logger.Named("proxy-tracker"), diff --git a/agent/consul/options.go b/agent/consul/options.go index 8c9fe05f4873..6dc754b3aef7 100644 --- a/agent/consul/options.go +++ b/agent/consul/options.go @@ -4,6 +4,7 @@ package consul import ( + "github.com/hashicorp/consul/lib/stringslice" "google.golang.org/grpc" "github.com/hashicorp/consul-net-rpc/net/rpc" @@ -48,6 +49,15 @@ type Deps struct { EnterpriseDeps } +// useV2Resources returns true if "resource-apis" is present in the Experiments +// array of the agent config. +func (d Deps) UseV2Resources() bool { + if stringslice.Contains(d.Experiments, CatalogResourceExperimentName) { + return true + } + return false +} + type GRPCClientConner interface { ClientConn(datacenter string) (*grpc.ClientConn, error) ClientConnLeader() (*grpc.ClientConn, error) diff --git a/agent/consul/server.go b/agent/consul/server.go index f0dd0e8c09b1..9540cbe1c0fd 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -555,6 +555,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incomingRPCLimiter: incomingRPCLimiter, routineManager: routine.NewManager(logger.Named(logging.ConsulServer)), registry: flat.Registry, + useV2Resources: flat.UseV2Resources(), } incomingRPCLimiter.Register(s) @@ -929,9 +930,7 @@ func isV1CatalogRequest(rpcName string) bool { } func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error { - if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) { - s.useV2Resources = true - + if s.useV2Resources { catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies()) defaultAllow, err := s.config.ACLResolverSettings.IsDefaultAllow() diff --git a/agent/http.go b/agent/http.go index e6ed0713f660..e95d36c914f8 100644 --- a/agent/http.go +++ b/agent/http.go @@ -395,7 +395,7 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc logURL = aclEndpointRE.ReplaceAllString(logURL, "$1$4") rejectCatalogV1Endpoint := false - if s.agent.useV2Resources() { + if s.agent.baseDeps.UseV2Resources() { rejectCatalogV1Endpoint = isV1CatalogRequest(logURL) } @@ -1147,7 +1147,7 @@ func (s *HTTPHandlers) parseToken(req *http.Request, token *string) { } func (s *HTTPHandlers) rejectV1RequestWhenV2Enabled() error { - if s.agent.useV2Resources() { + if s.agent.baseDeps.UseV2Resources() { return newRejectV1RequestWhenV2EnabledError() } return nil diff --git a/agent/testagent.go b/agent/testagent.go index a025f5077ecd..037bdc76da68 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -286,7 +286,7 @@ func (a *TestAgent) waitForUp() error { continue // fail, try again } if a.Config.Bootstrap && a.Config.ServerMode { - if a.useV2Resources() { + if a.baseDeps.UseV2Resources() { args := structs.DCSpecificRequest{ Datacenter: "dc1", }