Fix delete updates #6194

Merged
8 changes: 4 additions & 4 deletions docs/sources/configuration/_index.md
@@ -379,10 +379,6 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.downstream-url
[downstream_url: <string> | default = ""]

# Address, including port, where the compactor api is served
# CLI flag: -frontend.compactor-address
[compactor_address: <string> | default = ""]

# Log queries that are slower than the specified duration. Set to 0 to disable.
# Set to < 0 to enable on all queries.
# CLI flag: -frontend.log-queries-longer-than
@@ -2208,6 +2204,10 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -store.max-query-length
[max_query_length: <duration> | default = 721h]

# Address, including port, where the compactor api is served
# CLI flag: -store.compactor-address
[compactor_address: <string> | default = ""]

# Maximum number of queries that will be scheduled in parallel by the frontend.
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 32]
10 changes: 5 additions & 5 deletions pkg/loki/delete_store_listener.go
@@ -6,12 +6,12 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
)

func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener {
func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener {
return &listener{d}
}

type listener struct {
deleteRequestsStore deletion.DeleteRequestsStore
deleteRequestsClient deletion.DeleteRequestsClient
}

// Starting is called when the service transitions from NEW to STARTING.
@@ -26,7 +26,7 @@ func (l *listener) Stopping(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Terminated is called when the service transitions to the TERMINATED state.
@@ -35,7 +35,7 @@ func (l *listener) Terminated(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Failed is called when the service transitions to the FAILED state.
@@ -44,5 +44,5 @@ func (l *listener) Failed(from services.State, failure error) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}
40 changes: 24 additions & 16 deletions pkg/loki/modules.go
@@ -241,7 +241,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
@@ -575,17 +575,25 @@ func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress := t.Cfg.Frontend.CompactorAddress
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func (t *Loki) compactorAddress() (string, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort)
return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil
}

if compactorAddress == "" {
return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured")
if t.Cfg.StorageConfig.CompactorAddress == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
return t.Cfg.StorageConfig.CompactorAddress, nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
@@ -742,7 +750,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
@@ -949,7 +957,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
@@ -960,16 +968,16 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
return nil, err
}

deleteStore := deletion.NewNoOpDeleteRequestsStore()
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled {
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient)
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
return deleteStore, nil

return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
2 changes: 0 additions & 2 deletions pkg/lokifrontend/config.go
@@ -15,7 +15,6 @@ type Config struct {

CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
CompactorAddress string `yaml:"compactor_address"`

TailProxyURL string `yaml:"tail_proxy_url"`
}
@@ -28,6 +27,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}
3 changes: 3 additions & 0 deletions pkg/storage/factory.go
@@ -69,6 +69,8 @@ type Config struct {
BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"`
TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper"`

CompactorAddress string `yaml:"compactor_address"`

// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
// It is required for getting chunk ids of recently flushed chunks from the ingesters.
EnableAsyncStore bool `yaml:"-"`
@@ -95,6 +97,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.")
cfg.BoltDBShipperConfig.RegisterFlags(f)
f.IntVar(&cfg.MaxChunkBatchSize, "store.max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.")
f.StringVar(&cfg.CompactorAddress, "store.compactor-address", "", "address of the compactor in the form 'http://host:port'")
cfg.TSDBShipperConfig.RegisterFlagsWithPrefix("tsdb.", f)
}

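For reference, a minimal configuration sketch of the relocated setting, under the assumption that it sits in the storage_config block as the yaml:"compactor_address" tag added above suggests; the compactor host and port are placeholders, not values from this PR:

# Point queriers at the compactor API so delete requests can be fetched
# for query-time filtering. Equivalent CLI flag: -store.compactor-address
storage_config:
  compactor_address: http://loki-compactor:3100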
@@ -0,0 +1,174 @@
package deletion

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync"
"time"

"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/util/log"
)

const (
orgHeaderKey = "X-Scope-OrgID"
getDeletePath = "/loki/api/v1/delete"
)

type DeleteRequestsClient interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
Stop()
}

type deleteRequestsClient struct {
url string
httpClient httpClient
mu sync.RWMutex

cache map[string][]DeleteRequest
cacheDuration time.Duration

stopChan chan struct{}
}

type httpClient interface {
Do(*http.Request) (*http.Response, error)
}

type DeleteRequestsStoreOption func(c *deleteRequestsClient)

func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption {
return func(c *deleteRequestsClient) {
c.cacheDuration = d
}
}

func NewDeleteRequestsClient(addr string, c httpClient, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) {
u, err := url.Parse(addr)
if err != nil {
level.Error(log.Logger).Log("msg", "error parsing url", "err", err)
return nil, err
}
u.Path = getDeletePath

client := &deleteRequestsClient{
url: u.String(),
httpClient: c,
cacheDuration: 5 * time.Minute,
cache: make(map[string][]DeleteRequest),
stopChan: make(chan struct{}),
}

for _, o := range opts {
o(client)
}

go client.updateLoop()
return client, nil
}

func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
if cachedRequests, ok := c.getCachedRequests(userID); ok {
return cachedRequests, nil
}

requests, err := c.getRequestsFromServer(ctx, userID)
if err != nil {
return nil, err
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache[userID] = requests

return requests, nil
}

func (c *deleteRequestsClient) getCachedRequests(userID string) ([]DeleteRequest, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

res, ok := c.cache[userID]
return res, ok
}

func (c *deleteRequestsClient) Stop() {
close(c.stopChan)
}

func (c *deleteRequestsClient) updateLoop() {
t := time.NewTicker(c.cacheDuration)
for {
select {
case <-t.C:
c.updateCache()
case <-c.stopChan:
return
}
}
}

func (c *deleteRequestsClient) updateCache() {
userIDs := c.currentUserIDs()

newCache := make(map[string][]DeleteRequest)
for _, userID := range userIDs {
deleteReq, err := c.getRequestsFromServer(context.Background(), userID)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
continue
}
newCache[userID] = deleteReq
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache = newCache
}

func (c *deleteRequestsClient) currentUserIDs() []string {
c.mu.RLock()
defer c.mu.RUnlock()

userIDs := make([]string, 0, len(c.cache))
for userID := range c.cache {
userIDs = append(userIDs, userID)
}

return userIDs
}

func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

req.Header.Set(orgHeaderKey, userID)

resp, err := c.httpClient.Do(req)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

var deleteRequests []DeleteRequest
if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil {
level.Error(log.Logger).Log("msg", "error marshalling response", "err", err)
return nil, err
}

return deleteRequests, nil
}
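As a usage illustration only, here is a small sketch of how a caller might wire up this client, mirroring the call added in pkg/loki/modules.go above; the compactor address and tenant ID are placeholders:

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
)

func main() {
	// Placeholder address; in Loki this value comes from compactorAddress() in modules.go.
	client, err := deletion.NewDeleteRequestsClient(
		"http://loki-compactor:3100",
		&http.Client{Timeout: 5 * time.Second},
		// Optional: refresh the per-tenant cache every minute instead of the 5-minute default.
		deletion.WithRequestClientCacheDuration(time.Minute),
	)
	if err != nil {
		panic(err)
	}
	defer client.Stop()

	// The first call issues GET /loki/api/v1/delete with the X-Scope-OrgID header set to
	// the tenant; later calls for the same tenant are served from the in-memory cache.
	reqs, err := client.GetAllDeleteRequestsForUser(context.Background(), "tenant-1")
	if err != nil {
		panic(err)
	}
	fmt.Println("delete requests:", len(reqs))
}

Stop() closes the client's stop channel, which ends the background updateLoop goroutine that periodically refreshes the cached delete requests.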