Skip to content

Commit

Permalink
Add tenant resolver (#3486)
Browse files Browse the repository at this point in the history
* Add tenant resolver package

This implements the multi tenant resolver as described by the [proposal]
for multi tenant query-federation.

By default it behaves like before, but it's implementation can be
swapped out.

[proposal]: cortexproject/cortex#3364

Signed-off-by: Christian Simon <simon@swine.de>

* Replace usages of `ExtractOrgID`

Use TenantID or UserID depending on which of the methods are meant to be
used.

Signed-off-by: Christian Simon <simon@swine.de>

* Replace usages of `ExtractOrgIDFromHTTPRequest`

This is replaced by ExtractTenantIDFromHTTPRequest, which makes sure
that exactly one tenant ID is set.

Signed-off-by: Christian Simon <simon@swine.de>

* Add methods to `tenant` package to use resolver directly

Signed-off-by: Christian Simon <simon@swine.de>

* Remove UserID method from Resolver interface

We need a better definition for what we are trying to achieve with
UserID before we can add it to the interface

Signed-off-by: Christian Simon <simon@swine.de>

* Update comment on the TenantID/TenantIDs

Signed-off-by: Christian Simon <simon@swine.de>

* Improve performance of NormalizeTenantIDs

- reduce allocations by reusing the input slice during de-duplication

Signed-off-by: Christian Simon <simon@swine.de>
Former-commit-id: b7a062f
  • Loading branch information
simonswine authored Nov 26, 2020
1 parent 72473a4 commit 95e78ac
Show file tree
Hide file tree
Showing 32 changed files with 451 additions and 71 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ lint:
# Ensure no blacklisted package is imported.
GOFLAGS="-tags=requires_docker" faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,\
golang.org/x/net/context=context,\
sync/atomic=go.uber.org/atomic" ./pkg/... ./cmd/... ./tools/... ./integration/...
sync/atomic=go.uber.org/atomic,\
github.com/weaveworks/common/user.{ExtractOrgID}=github.com/cortexproject/cortex/pkg/tenant.{TenantID},\
github.com/weaveworks/common/user.{ExtractOrgIDFromHTTPRequest}=github.com/cortexproject/cortex/pkg/tenant.{ExtractTenantIDFromHTTPRequest}" ./pkg/... ./cmd/... ./tools/... ./integration/...

# Ensure clean pkg structure.
faillint -paths "\
Expand Down
8 changes: 4 additions & 4 deletions pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"path/filepath"

"github.com/cortexproject/cortex/pkg/alertmanager/alerts"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/template"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"
)

Expand All @@ -36,7 +36,7 @@ type UserConfig struct {
func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util.WithContext(r.Context(), am.logger)

userID, err := user.ExtractOrgID(r.Context())
userID, err := tenant.TenantID(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
Expand Down Expand Up @@ -73,7 +73,7 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.

func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util.WithContext(r.Context(), am.logger)
userID, err := user.ExtractOrgID(r.Context())
userID, err := tenant.TenantID(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
Expand Down Expand Up @@ -114,7 +114,7 @@ func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.

func (am *MultitenantAlertmanager) DeleteUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util.WithContext(r.Context(), am.logger)
userID, err := user.ExtractOrgID(r.Context())
userID, err := tenant.TenantID(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
Expand Down
4 changes: 2 additions & 2 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
amconfig "github.com/prometheus/alertmanager/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/alertmanager/alerts"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -462,7 +462,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco

// ServeHTTP serves the Alertmanager's web UI and API.
func (am *MultitenantAlertmanager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
userID, err := user.ExtractOrgID(req.Context())
userID, err := tenant.TenantID(req.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import (
"net/http"

"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk/purger"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
)

// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation
func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader *purger.TombstonesLoader) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userID, err := user.ExtractOrgID(r.Context())
userID, err := tenant.TenantID(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/chunk/purger/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
)

Expand Down Expand Up @@ -54,7 +54,7 @@ func NewDeleteRequestHandler(deleteStore *DeleteStore, deleteRequestCancelPeriod
// AddDeleteRequestHandler handles addition of new delete request
func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -119,7 +119,7 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand All @@ -141,7 +141,7 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunk/storage/caching_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -72,7 +72,7 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
// We cache the entire row, so filter client side.
callback = chunk_util.QueryFilter(callback)

userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/configs/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/gorilla/mux"
amconfig "github.com/prometheus/alertmanager/config"
amtemplate "github.com/prometheus/alertmanager/template"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/configs/db"
"github.com/cortexproject/cortex/pkg/configs/userconfig"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
)

Expand Down Expand Up @@ -108,7 +108,7 @@ func (a *API) RegisterRoutes(r *mux.Router) {

// getConfig returns the request configuration.
func (a *API) getConfig(w http.ResponseWriter, r *http.Request) {
userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
Expand Down Expand Up @@ -146,7 +146,7 @@ func (a *API) getConfig(w http.ResponseWriter, r *http.Request) {
}

func (a *API) setConfig(w http.ResponseWriter, r *http.Request) {
userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
Expand Down Expand Up @@ -296,7 +296,7 @@ func (a *API) getConfigs(w http.ResponseWriter, r *http.Request) {
}

func (a *API) deactivateConfig(w http.ResponseWriter, r *http.Request) {
userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
Expand All @@ -318,7 +318,7 @@ func (a *API) deactivateConfig(w http.ResponseWriter, r *http.Request) {
}

func (a *API) restoreConfig(w http.ResponseWriter, r *http.Request) {
userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/limiter"
Expand Down Expand Up @@ -384,7 +385,7 @@ func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, user

// Push implements client.IngesterServer
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -427,7 +428,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
d.HATracker = r
}

userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
assert.NoError(t, err)
err = d.HATracker.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica)
assert.NoError(t, err)
Expand Down Expand Up @@ -1314,7 +1315,7 @@ func (i *mockIngester) Push(ctx context.Context, req *client.WriteRequest, opts
i.metadata = map[uint32]map[client.MetricMetadata]struct{}{}
}

orgid, err := user.ExtractOrgID(ctx)
orgid, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/ingester/client"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
grpc_util "github.com/cortexproject/cortex/pkg/util/grpc"
Expand Down Expand Up @@ -76,7 +76,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc
// GetIngestersForQuery returns a replication set including all ingesters that should be queried
// to fetch series matching input label matchers.
func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return ring.ReplicationSet{}, err
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
// GetIngestersForMetadata returns a replication set including all ingesters that should be queried
// to fetch metadata (eg. label names/values or series).
func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return ring.ReplicationSet{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
)

Expand Down Expand Up @@ -250,7 +250,7 @@ func getQuerierID(server frontendv1pb.Frontend_ProcessServer) (string, error) {
}

func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/grpcutil"
Expand Down Expand Up @@ -151,7 +151,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
return nil, fmt.Errorf("frontend not running: %v", s)
}

userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -233,7 +233,7 @@ enqueueAgain:
}

func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"

cortex_chunk "github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
Expand Down Expand Up @@ -440,7 +440,7 @@ func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.
// retain anything from `req` past the call to ReuseSlice
defer client.ReuseSlice(req.Timeseries)

userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, fmt.Errorf("no user id")
}
Expand Down Expand Up @@ -683,7 +683,7 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
return i.v2Query(ctx, req)
}

userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -930,7 +930,7 @@ func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetad
}
i.userStatesMtx.RUnlock()

userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, fmt.Errorf("no user id")
}
Expand Down
Loading

0 comments on commit 95e78ac

Please sign in to comment.