Skip to content

Commit

Permalink
Fix delete updates (#6194)
Browse files Browse the repository at this point in the history
* Fix delete updates

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go

Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>

* Add compactor address to the storage config

* review comments

* Review feedback

Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>
  • Loading branch information
MasslessParticle and MichelHollands authored May 30, 2022
1 parent 3b3fcf6 commit 9b2786b
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 29 deletions.
8 changes: 4 additions & 4 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,6 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.downstream-url
[downstream_url: <string> | default = ""]
# Address, including port, where the compactor api is served
# CLI flag: -frontend.compactor-address
[compactor_address: <string> | default = ""]
# Log queries that are slower than the specified duration. Set to 0 to disable.
# Set to < 0 to enable on all queries.
# CLI flag: -frontend.log-queries-longer-than
Expand Down Expand Up @@ -2613,6 +2609,10 @@ This way, one doesn't have to replicate configuration in multiple places.
# to be used by the distributor's ring, but only if the distributor's ring itself
# doesn't have a `heartbeat_period` set.
[ring: <ring>]

# Address, including port, where the compactor api is served
# CLI flag: -common.compactor-address
[compactor_address: <string> | default = ""]
```
## analytics
Expand Down
7 changes: 6 additions & 1 deletion pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ type Config struct {
// You can check this during Loki execution under ring status pages (ex: `/ring` will output the address of the different ingester
// instances).
InstanceAddr string `yaml:"instance_addr"`

// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`
}

func (c *Config) RegisterFlags(_ *flag.FlagSet) {
func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError)
throwaway.IntVar(&c.ReplicationFactor, "common.replication-factor", 3, "How many ingesters incoming data should be replicated to.")
c.Storage.RegisterFlagsWithPrefix("common.storage", throwaway)
Expand All @@ -52,6 +55,8 @@ func (c *Config) RegisterFlags(_ *flag.FlagSet) {
c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")

f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
}

type Storage struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/loki/delete_store_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
)

func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener {
func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener {
return &listener{d}
}

type listener struct {
deleteRequestsStore deletion.DeleteRequestsStore
deleteRequestsClient deletion.DeleteRequestsClient
}

// Starting is called when the service transitions from NEW to STARTING.
Expand All @@ -26,7 +26,7 @@ func (l *listener) Stopping(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Terminated is called when the service transitions to the TERMINATED state.
Expand All @@ -35,7 +35,7 @@ func (l *listener) Terminated(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

// Failed is called when the service transitions to the FAILED state.
Expand All @@ -44,5 +44,5 @@ func (l *listener) Failed(from services.State, failure error) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}
40 changes: 24 additions & 16 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -575,17 +575,25 @@ func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress := t.Cfg.Frontend.CompactorAddress
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func (t *Loki) compactorAddress() (string, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort)
return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil
}

if compactorAddress == "" {
return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured")
if t.Cfg.Common.CompactorAddress == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}

return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
return t.Cfg.Common.CompactorAddress, nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
Expand Down Expand Up @@ -742,7 +750,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -949,7 +957,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
Expand All @@ -960,16 +968,16 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
return nil, err
}

deleteStore := deletion.NewNoOpDeleteRequestsStore()
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled {
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient)
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
return deleteStore, nil

return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}

func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
Expand Down
2 changes: 0 additions & 2 deletions pkg/lokifrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type Config struct {

CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
CompactorAddress string `yaml:"compactor_address"`

TailProxyURL string `yaml:"tail_proxy_url"`
}
Expand All @@ -28,6 +27,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package deletion

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync"
"time"

"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/util/log"
)

const (
orgHeaderKey = "X-Scope-OrgID"
getDeletePath = "/loki/api/v1/delete"
)

type DeleteRequestsClient interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
Stop()
}

type deleteRequestsClient struct {
url string
httpClient httpClient
mu sync.RWMutex

cache map[string][]DeleteRequest
cacheDuration time.Duration

stopChan chan struct{}
}

type httpClient interface {
Do(*http.Request) (*http.Response, error)
}

type DeleteRequestsStoreOption func(c *deleteRequestsClient)

func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption {
return func(c *deleteRequestsClient) {
c.cacheDuration = d
}
}

func NewDeleteRequestsClient(addr string, c httpClient, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) {
u, err := url.Parse(addr)
if err != nil {
level.Error(log.Logger).Log("msg", "error parsing url", "err", err)
return nil, err
}
u.Path = getDeletePath

client := &deleteRequestsClient{
url: u.String(),
httpClient: c,
cacheDuration: 5 * time.Minute,
cache: make(map[string][]DeleteRequest),
stopChan: make(chan struct{}),
}

for _, o := range opts {
o(client)
}

go client.updateLoop()
return client, nil
}

func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
if cachedRequests, ok := c.getCachedRequests(userID); ok {
return cachedRequests, nil
}

requests, err := c.getRequestsFromServer(ctx, userID)
if err != nil {
return nil, err
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache[userID] = requests

return requests, nil
}

func (c *deleteRequestsClient) getCachedRequests(userID string) ([]DeleteRequest, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

res, ok := c.cache[userID]
return res, ok
}

func (c *deleteRequestsClient) Stop() {
close(c.stopChan)
}

func (c *deleteRequestsClient) updateLoop() {
t := time.NewTicker(c.cacheDuration)
for {
select {
case <-t.C:
c.updateCache()
case <-c.stopChan:
return
}
}
}

func (c *deleteRequestsClient) updateCache() {
userIDs := c.currentUserIDs()

newCache := make(map[string][]DeleteRequest)
for _, userID := range userIDs {
deleteReq, err := c.getRequestsFromServer(context.Background(), userID)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
continue
}
newCache[userID] = deleteReq
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache = newCache
}

func (c *deleteRequestsClient) currentUserIDs() []string {
c.mu.RLock()
defer c.mu.RUnlock()

userIDs := make([]string, 0, len(c.cache))
for userID := range c.cache {
userIDs = append(userIDs, userID)
}

return userIDs
}

func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

req.Header.Set(orgHeaderKey, userID)

resp, err := c.httpClient.Do(req)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}

var deleteRequests []DeleteRequest
if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil {
level.Error(log.Logger).Log("msg", "error marshalling response", "err", err)
return nil, err
}

return deleteRequests, nil
}
Loading

0 comments on commit 9b2786b

Please sign in to comment.