Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix delete updates #6194

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,6 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.downstream-url
[downstream_url: <string> | default = ""]

# Address, including port, where the compactor api is served
# CLI flag: -frontend.compactor-address
[compactor_address: <string> | default = ""]

# Log queries that are slower than the specified duration. Set to 0 to disable.
# Set to < 0 to enable on all queries.
# CLI flag: -frontend.log-queries-longer-than
Expand Down Expand Up @@ -2604,6 +2600,10 @@ This way, one doesn't have to replicate configuration in multiple places.
# to be used by the distributor's ring, but only if the distributor's ring itself
# doesn't have a `heartbeat_period` set.
[ring: <ring>]

# Address, including port, where the compactor api is served
# CLI flag: -common.compactor-address
[compactor_address: <string> | default = ""]
```

## analytics
Expand Down
7 changes: 6 additions & 1 deletion pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ type Config struct {
// You can check this during Loki execution under ring status pages (ex: `/ring` will output the address of the different ingester
// instances).
InstanceAddr string `yaml:"instance_addr"`

// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`
}

func (c *Config) RegisterFlags(_ *flag.FlagSet) {
func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError)
throwaway.IntVar(&c.ReplicationFactor, "common.replication-factor", 3, "How many ingesters incoming data should be replicated to.")
c.Storage.RegisterFlagsWithPrefix("common.storage", throwaway)
Expand All @@ -52,6 +55,8 @@ func (c *Config) RegisterFlags(_ *flag.FlagSet) {
c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")

f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
}

type Storage struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/loki/delete_store_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
)

func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener {
func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener {
return &listener{d}
}

type listener struct {
deleteRequestsStore deletion.DeleteRequestsStore
deleteRequestsClient deletion.DeleteRequestsClient
}

// Starting is called when the service transitions from NEW to STARTING.
Expand All @@ -26,7 +26,7 @@ func (l *listener) Stopping(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Terminated is called when the service transitions to the TERMINATED state.
Expand All @@ -35,7 +35,7 @@ func (l *listener) Terminated(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Failed is called when the service transitions to the FAILED state.
Expand All @@ -44,5 +44,5 @@ func (l *listener) Failed(from services.State, failure error) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}
40 changes: 24 additions & 16 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -575,17 +575,25 @@ func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress := t.Cfg.Frontend.CompactorAddress
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func (t *Loki) compactorAddress() (string, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort)
return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil
}

if compactorAddress == "" {
return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured")
if t.Cfg.Common.CompactorAddress == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
return t.Cfg.Common.CompactorAddress, nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
Expand Down Expand Up @@ -742,7 +750,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -949,7 +957,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
Expand All @@ -960,16 +968,16 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
return nil, err
}

deleteStore := deletion.NewNoOpDeleteRequestsStore()
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled {
sandeepsukhani marked this conversation as resolved.
Show resolved Hide resolved
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient)
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
return deleteStore, nil

return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
Expand Down
2 changes: 0 additions & 2 deletions pkg/lokifrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type Config struct {

CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
CompactorAddress string `yaml:"compactor_address"`

TailProxyURL string `yaml:"tail_proxy_url"`
}
Expand All @@ -28,6 +27,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package deletion

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync"
"time"

"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/util/log"
)

const (
orgHeaderKey = "X-Scope-OrgID"
getDeletePath = "/loki/api/v1/delete"
)

type DeleteRequestsClient interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
Stop()
}

type deleteRequestsClient struct {
url string
httpClient httpClient
mu sync.RWMutex

cache map[string][]DeleteRequest
cacheDuration time.Duration

stopChan chan struct{}
}

type httpClient interface {
Do(*http.Request) (*http.Response, error)
}

type DeleteRequestsStoreOption func(c *deleteRequestsClient)

func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption {
return func(c *deleteRequestsClient) {
c.cacheDuration = d
}
}

func NewDeleteRequestsClient(addr string, c httpClient, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) {
u, err := url.Parse(addr)
if err != nil {
level.Error(log.Logger).Log("msg", "error parsing url", "err", err)
return nil, err
}
u.Path = getDeletePath

client := &deleteRequestsClient{
url: u.String(),
httpClient: c,
cacheDuration: 5 * time.Minute,
cache: make(map[string][]DeleteRequest),
stopChan: make(chan struct{}),
}

for _, o := range opts {
o(client)
}

go client.updateLoop()
return client, nil
}

func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
if cachedRequests, ok := c.getCachedRequests(userID); ok {
return cachedRequests, nil
}

requests, err := c.getRequestsFromServer(ctx, userID)
if err != nil {
return nil, err
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache[userID] = requests

return requests, nil
}

func (c *deleteRequestsClient) getCachedRequests(userID string) ([]DeleteRequest, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

res, ok := c.cache[userID]
return res, ok
}

func (c *deleteRequestsClient) Stop() {
close(c.stopChan)
}

func (c *deleteRequestsClient) updateLoop() {
t := time.NewTicker(c.cacheDuration)
for {
select {
case <-t.C:
c.updateCache()
case <-c.stopChan:
return
}
}
}

func (c *deleteRequestsClient) updateCache() {
userIDs := c.currentUserIDs()

newCache := make(map[string][]DeleteRequest)
for _, userID := range userIDs {
deleteReq, err := c.getRequestsFromServer(context.Background(), userID)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
continue
}
newCache[userID] = deleteReq
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache = newCache
}

func (c *deleteRequestsClient) currentUserIDs() []string {
c.mu.RLock()
defer c.mu.RUnlock()

userIDs := make([]string, 0, len(c.cache))
for userID := range c.cache {
userIDs = append(userIDs, userID)
}

return userIDs
}

func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

req.Header.Set(orgHeaderKey, userID)

resp, err := c.httpClient.Do(req)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

var deleteRequests []DeleteRequest
if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil {
level.Error(log.Logger).Log("msg", "error marshalling response", "err", err)
return nil, err
}

return deleteRequests, nil
}
Loading