Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: Modifies TableManager to use IndexGateway ring #5972

Merged
merged 49 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
0f90a99
Add two metrics to the IndexGateway.
DylanGuedes Apr 18, 2022
63321f9
Remove `usersToBeQueryReadyForTotal`.
DylanGuedes Apr 20, 2022
cbbd4b6
Rename metric help text to not mislead people.
DylanGuedes Apr 20, 2022
f9c520a
Log queryReadiness duration.
DylanGuedes Apr 20, 2022
b4737dc
Fix where log message and duration and triggered.
DylanGuedes Apr 25, 2022
dca92fe
Add two metrics to the IndexGateway.
DylanGuedes Apr 18, 2022
02da788
Remove `usersToBeQueryReadyForTotal`.
DylanGuedes Apr 20, 2022
0be5dcd
Rename metric help text to not mislead people.
DylanGuedes Apr 20, 2022
860f855
Log queryReadiness duration.
DylanGuedes Apr 20, 2022
7bac294
Fix where log message and duration and triggered.
DylanGuedes Apr 25, 2022
6632319
Use boundaries to skip users in TableManager.
DylanGuedes Apr 20, 2022
c1a83ac
Add IndexGateway configuration docs.
DylanGuedes Apr 22, 2022
db60fdb
Add tests for pkg/util/ring TokenFor() and IsAssignedKey()
JordanRushing Apr 22, 2022
e9f9b28
Small whitespace fix in pkg/util/ring_test.go
JordanRushing Apr 22, 2022
44e249c
Apply suggestions from code review
DylanGuedes Apr 25, 2022
ddd4380
Join users list in a single string.
DylanGuedes Apr 26, 2022
e90a816
Tweak queryReadiness log messages.
DylanGuedes Apr 27, 2022
0a4d2f3
Merge branch 'index-gateway-metrics' into table-manager-using-ring
DylanGuedes Apr 27, 2022
57e7c2a
Merge branch 'main' of github.com:grafana/loki into table-manager-usi…
DylanGuedes Apr 28, 2022
6156021
Implement new RingManager.
DylanGuedes Apr 29, 2022
11fbb18
Return non-empty ringmanager for all modes.
DylanGuedes Apr 29, 2022
0432b2f
Merge branch 'main' of github.com:grafana/loki into table-manager-usi…
DylanGuedes Apr 29, 2022
2ba9f88
Fix lint regarding error not being check.
DylanGuedes Apr 29, 2022
9d58270
Fix lint due to wrong import order.
DylanGuedes Apr 29, 2022
d6a23aa
Implement support for client and server mode for the ring manager.
DylanGuedes May 4, 2022
392298a
Fix ring manager services registration.
DylanGuedes May 4, 2022
0222c3d
Add option to configure whether or not to log gateway requests.
DylanGuedes May 4, 2022
74300c8
Check if IndexGateway is enabled instead of active.
DylanGuedes May 5, 2022
b64e4d7
Tune RingManager behavior.
DylanGuedes May 5, 2022
ec26bf8
Rewrite `TenantInBoundaries` as a func instead of interface.
DylanGuedes May 5, 2022
ca50ae4
Merge branch 'main' of github.com:grafana/loki into table-manager-usi…
DylanGuedes May 5, 2022
627e5c6
Fix lint.
DylanGuedes May 5, 2022
309386d
Use distributor instead of querier in test.
DylanGuedes May 5, 2022
0e71657
Merge branch 'main' of github.com:grafana/loki into table-manager-usi…
DylanGuedes May 5, 2022
8e0f68a
Add ring mode guard clause.
DylanGuedes May 5, 2022
d199bc4
Return code erased by mistake.
DylanGuedes May 6, 2022
8e8acb1
Rename TenantInBoundaries to IndexGatewayOwnsTenant.
DylanGuedes May 6, 2022
f747d82
Log len of users instead of all of them to avoid log msgs.
DylanGuedes May 6, 2022
1ec30f0
Merge branch 'main' of github.com:grafana/loki into table-manager-usi…
DylanGuedes May 17, 2022
7d90edd
Add docstrings to new public functions.
DylanGuedes May 18, 2022
3500c5a
Apply suggestions from code review
DylanGuedes May 19, 2022
a55bdc9
Modify IsAssignedKey to expect address directly instead of lifecycler.
DylanGuedes May 19, 2022
a6a2b47
Log error message when IsInReplicationSet fails.
DylanGuedes May 19, 2022
da4c7ee
Modify IsAssignedKey to return true by default.
DylanGuedes May 19, 2022
d4207a7
Remove wrong assigning of Service for clientMode.
DylanGuedes May 19, 2022
3d32ed2
Log gateway requests before error checks.
DylanGuedes May 19, 2022
103c977
Remove unnecessary endpoint registration.
DylanGuedes May 19, 2022
69f19d1
Fix lint.
DylanGuedes May 19, 2022
bbbcd5e
Update pkg/util/ring.go
DylanGuedes May 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# key value store.
[ingester: <ingester>]

# Configures the index gateway server.
[index_gateway: <index_gateway>]

# Configures where Loki will store data.
[storage_config: <storage_config>]

Expand Down Expand Up @@ -1757,6 +1760,10 @@ boltdb_shipper:
# The CLI flags prefix for this block config is: boltdb.shipper.index-gateway-client
[grpc_client_config: <grpc_client_config>]

# Configures if gateway requests should be logged or not.
# CLI flag: -boltdb.shipper.index-gateway-client.log-gateway-requests
[log_gateway_requests: <bool> | default = false]

# Cache validity for active index entries. Should be no higher than
# the chunk_idle_period in the ingester settings.
# CLI flag: -store.index-cache-validity
Expand Down Expand Up @@ -2397,6 +2404,26 @@ backoff_config:
[max_retries: <int> | default = 10]
```

## index_gateway

The `index_gateway` block configures the Loki index gateway server, responsible for serving index queries
without the need to constantly interact with the object store.

```yaml
# Defines in which mode the index gateway server will operate (default to 'simple').
# It supports two modes:
# 'simple': an index gateway server instance is responsible for handling,
# storing and returning requests for all indices for all tenants.
# 'ring': an index gateway server instance is responsible for a subset of tenants instead
# of all tenants.
[mode: <string> | default = simple]

# Defines the ring to be used by the index gateway servers and clients in case the servers
# are configured to run in 'ring' mode. In case this isn't configured, this block supports
# inheriting configuration from the common ring section.
[ring: <ring>]
```

## table_manager

The `table_manager` block configures the Loki table-manager.
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ type Loki struct {
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler
usageReport *usagestats.Reporter
indexGatewayRing *ring.Ring
indexGatewayRingManager *indexgateway.RingManager

clientMetrics storage.ClientMetrics

Expand Down Expand Up @@ -512,7 +512,7 @@ func (t *Loki) setupModuleManager() error {
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
TableManager: {Server, UsageReport},
Compactor: {Server, Overrides, MemberlistKV, UsageReport},
IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV},
IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV, IndexGatewayRing},
IngesterQuerier: {Ring},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Expand Down
37 changes: 14 additions & 23 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
Expand Down Expand Up @@ -386,7 +385,9 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// Always set these configs
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing
if t.Cfg.IndexGateway.Mode == indexgateway.RingMode {
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRingManager.Ring
}

// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
Expand Down Expand Up @@ -855,7 +856,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer)
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand All @@ -864,8 +865,6 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
return nil, err
}

t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(gateway)

indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway)
return gateway, nil
}
Expand All @@ -878,29 +877,21 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
ringCfg := t.Cfg.IndexGateway.Ring.ToRingConfig(t.Cfg.IndexGateway.Ring.ReplicationFactor)
reg := prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)

logger := util_log.Logger
ringStore, err := kv.NewClient(
ringCfg.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", reg), "index-gateway"),
logger,
)
if err != nil {
return nil, gerrors.Wrap(err, "kv new client")
managerMode := indexgateway.ClientMode
if t.Cfg.isModuleEnabled(IndexGateway) {
managerMode = indexgateway.ServerMode
}
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer)

t.indexGatewayRing, err = ring.NewWithStoreClientAndStrategy(
ringCfg, indexgateway.RingIdentifier, indexgateway.RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", reg), logger,
)
if err != nil {
return nil, gerrors.Wrap(err, "new with store client and strategy")
return nil, gerrors.Wrap(err, "new index gateway ring manager")
}

t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(t.indexGatewayRing)
return t.indexGatewayRing, nil
t.indexGatewayRingManager = rm

t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(t.indexGatewayRingManager)
return t.indexGatewayRingManager, nil
}

func (t *Loki) initQueryScheduler() (services.Service, error) {
Expand Down Expand Up @@ -962,7 +953,7 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {

deleteStore := deletion.NewNoOpDeleteRequestsStore()
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled {
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer)
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (cfg *Config) Validate() error {
}

// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, registerer prometheus.Registerer) (index.Client, error) {
func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) (index.Client, error) {
switch name {
case config.StorageTypeInMemory:
store := testutils.NewMockStorage()
Expand Down Expand Up @@ -171,7 +171,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi
return nil, err
}

boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, registerer)
boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, ownsTenantFn, registerer)

return boltDBIndexClientWithShipper, err
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
}, nil
}

idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, indexClientReg)
idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error creating index client")
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/storage/stores/shipper/downloads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -32,6 +31,11 @@ type Limits interface {
DefaultLimits() *validation.Limits
}

// IndexGatewayOwnsTenant is invoked by an IndexGateway instance and answers whether if the given tenant is assigned to this instance or not.
//
// It is only relevant by an IndexGateway in the ring mode and if it returns false for a given tenant, that tenant will be ignored by this IndexGateway during query readiness.
type IndexGatewayOwnsTenant func(tenant string) bool

type Config struct {
CacheDir string
SyncInterval time.Duration
Expand All @@ -52,9 +56,11 @@ type TableManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

ownsTenant IndexGatewayOwnsTenant
}

func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorageClient storage.Client, registerer prometheus.Registerer) (*TableManager, error) {
func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorageClient storage.Client, ownsTenantFn IndexGatewayOwnsTenant, registerer prometheus.Registerer) (*TableManager, error) {
if err := chunk_util.EnsureDirectory(cfg.CacheDir); err != nil {
return nil, err
}
Expand All @@ -64,6 +70,7 @@ func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorage
cfg: cfg,
boltIndexClient: boltIndexClient,
indexStorageClient: indexStorageClient,
ownsTenant: ownsTenantFn,
tables: make(map[string]Table),
metrics: newMetrics(registerer),
ctx: ctx,
Expand Down Expand Up @@ -319,8 +326,7 @@ func (tm *TableManager) ensureQueryReadiness(ctx context.Context) error {
if err := table.EnsureQueryReadiness(ctx, usersToBeQueryReadyFor); err != nil {
return err
}
joinedUsers := strings.Join(usersToBeQueryReadyFor, ",")
level.Info(util_log.Logger).Log("msg", "index pre-download for query readiness completed", "users", joinedUsers, "duration", time.Since(perTableStart), "table", tableName)
level.Info(util_log.Logger).Log("msg", "index pre-download for query readiness completed", "users_len", len(usersToBeQueryReadyFor), "duration", time.Since(perTableStart), "table", tableName)
}

return nil
Expand All @@ -345,6 +351,10 @@ func (tm *TableManager) findUsersInTableForQueryReadiness(tableNumber int64, use
continue
}

if tm.ownsTenant != nil && !tm.ownsTenant(userID) {
continue
}

if activeTableNumber-tableNumber <= int64(queryReadyNumDays) {
usersToBeQueryReadyFor = append(usersToBeQueryReadyFor, userID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc)
CacheTTL: time.Hour,
Limits: &mockLimits{},
}
tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil)
tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil, nil)
require.NoError(t, err)

return tableManager, func() {
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/stores/shipper/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type IndexGatewayClientConfig struct {
// Forcefully disable the use of the index gateway client for the storage.
// This is mainly useful for the index-gateway component which should always use the storage.
Disabled bool `yaml:"-"`

// LogGatewayRequests configures if requests sent to the gateway should be logged or not.
// The log messages are of type debug and contain the address of the gateway and the relevant tenant.
LogGatewayRequests bool `yaml:"log_gateway_requests"`
}

// RegisterFlagsWithPrefix register client-specific flags with the given prefix.
Expand All @@ -75,6 +79,7 @@ type IndexGatewayClientConfig struct {
func (i *IndexGatewayClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)
f.StringVar(&i.Address, prefix+".server-address", "", "Hostname or IP of the Index Gateway gRPC server running in simple mode.")
f.BoolVar(&i.LogGatewayRequests, prefix+".log-gateway-requests", false, "Whether requests sent to the gateway should be logged or not.")
}

func (i *IndexGatewayClientConfig) RegisterFlags(f *flag.FlagSet) {
Expand Down Expand Up @@ -297,6 +302,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client ind
})
var lastErr error
for _, addr := range addrs {
if s.cfg.LogGatewayRequests {
level.Debug(util_log.Logger).Log("msg", "sending request to gateway", "gateway", addr, "tenant", userID)
}

genericClient, err := s.pool.GetClientFor(addr)
if err != nil {
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to get client for instance %s", addr), "err", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/gateway_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func benchmarkIndexQueries(b *testing.B, queries []index.Query) {
CacheTTL: 15 * time.Minute,
QueryReadyNumDays: 30,
Limits: mockLimits{},
}, bclient, storage.NewIndexStorageClient(fs, "index/"), nil)
}, bclient, storage.NewIndexStorageClient(fs, "index/"), nil, nil)
require.NoError(b, err)

// initialize the index gateway server
Expand Down
Loading