Skip to content

Commit

Permalink
Revert "Backport 7804 into 2.7.1 (grafana#7896)"
Browse files Browse the repository at this point in the history
This reverts commit e0af1cc.
  • Loading branch information
trevorwhitney committed Dec 9, 2022
1 parent e0af1cc commit 06b92b6
Show file tree
Hide file tree
Showing 29 changed files with 326 additions and 2,341 deletions.
6 changes: 0 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,10 @@ Check the history of the branch FIXME.

#### Loki

##### Enhancements

* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion.

##### Fixes

* [7453](https://github.com/grafana/loki/pull/7453) **periklis**: Add single compactor http client for delete and gennumber clients

##### Changes

* [7877](https://github.com/grafana/loki/pull/7877)A **trevorwhitney**: Due to a known bug with experimental new delete mode feature, the default delete mode has been changed to `filter-only`.

## 2.7.0
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/cloudflare/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
allFields = append(extendedFields, []string{
"BotScore", "BotScoreSrc", "ClientRequestBytes", "ClientSrcPort", "ClientXRequestedWith", "CacheTieredFill", "EdgeResponseCompressionRatio", "EdgeServerIP", "FirewallMatchesSources",
"FirewallMatchesActions", "FirewallMatchesRuleIDs", "OriginResponseBytes", "OriginResponseTime", "ClientDeviceType", "WAFFlags", "WAFMatchedVar", "EdgeColoID",
"RequestHeaders", "ResponseHeaders",
"RequestHeaders", "ResponseHeaders",
}...)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func Test_validateJobName(t *testing.T) {
},
},
},
wantErr: false,
wantErr: false,
expectedJob: "job_1_2_3_4_job",
},
}
Expand Down
4 changes: 0 additions & 4 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2911,10 +2911,6 @@ This way, one doesn't have to replicate configuration in multiple places.
# CLI flag: -common.compactor-address
[compactor_address: <string> | default = ""]

# Address and port number where the compactor grpc requests are being served.
# CLI flag: -common.compactor-grpc-address
[compactor_grpc_address: <string> | default = ""]

## analytics

The `analytics` block configures the reporting of Loki analytics to grafana.com.
Expand Down
3 changes: 1 addition & 2 deletions pkg/logqlmodel/logqlmodel.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package logqlmodel

import (
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
Expand Down
3 changes: 1 addition & 2 deletions pkg/logqlmodel/metadata/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
"github.com/stretchr/testify/require"
)

func TestHeaders(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ type Config struct {

// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`

// CompactorAddress is the grpc address of the compactor in the form host:port
CompactorGRPCAddress string `yaml:"compactor_grpc_address"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -60,7 +57,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")

f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
f.StringVar(&c.CompactorGRPCAddress, "common.compactor-grpc-address", "", "the grpc address of the compactor in the form host:port")
}

type Storage struct {
Expand Down
53 changes: 25 additions & 28 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
Expand All @@ -70,31 +69,30 @@ type Config struct {
UseBufferedLogger bool `yaml:"use_buffered_logger"`
UseSyncLogger bool `yaml:"use_sync_logger"`

Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
CompactorHTTPClient compactor_client.HTTPConfig `yaml:"compactor_client,omitempty"`
CompactorGRPCClient compactor_client.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageReport usagestats.Config `yaml:"analytics"`
Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
CompactorClient compactor.ClientConfig `yaml:"delete_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageReport usagestats.Config `yaml:"analytics"`
}

// RegisterFlags registers flag.
Expand All @@ -117,8 +115,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Common.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
c.Querier.RegisterFlags(f)
c.CompactorHTTPClient.RegisterFlags(f)
c.CompactorGRPCClient.RegisterFlags(f)
c.CompactorClient.RegisterFlags(f)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f)
c.StorageConfig.RegisterFlags(f)
Expand Down
67 changes: 22 additions & 45 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strings"
"time"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -41,7 +43,6 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/ruler"
base_ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/runtime"
Expand All @@ -53,8 +54,6 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber"
"github.com/grafana/loki/pkg/storage/stores/series/index"
Expand Down Expand Up @@ -677,53 +676,41 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) {
var client generationnumber.CacheGenClient
if t.supportIndexDeleteRequest() {
compactorAddress, isGRPCAddress, err := t.compactorAddress()
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "cache_gen", "client_type": t.Cfg.Target.String()}, prometheus.DefaultRegisterer)
if isGRPCAddress {
client, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
if err != nil {
return nil, err
}
} else {
client, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
if err != nil {
return nil, err
}
httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
}

client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient)
if err != nil {
return nil, err
}
}

t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer)
return services.NewIdleService(nil, func(failureCase error) error {
t.cacheGenerationLoader.Stop()
return nil
}), nil
return services.NewIdleService(nil, nil), nil
}

func (t *Loki) supportIndexDeleteRequest() bool {
return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs)
}

// compactorAddress returns the configured address of the compactor.
// It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false
func (t *Loki) compactorAddress() (string, bool, error) {
func (t *Loki) compactorAddress() (string, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil
return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil
}

if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" {
return "", false, errors.New("query filtering for deletes requires 'compactor_grpc_address' or 'compactor_address' to be configured")
if t.Cfg.Common.CompactorAddress == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}

if t.Cfg.Common.CompactorGRPCAddress != "" {
return t.Cfg.Common.CompactorGRPCAddress, true, nil
}

return t.Cfg.Common.CompactorAddress, false, nil
return t.Cfg.Common.CompactorAddress, nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
Expand Down Expand Up @@ -1020,7 +1007,6 @@ func (t *Loki) initCompactor() (services.Service, error) {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler))
grpc.RegisterCompactorServer(t.Server.GRPC, t.compactor.DeleteRequestsGRPCHandler)
}

return t.compactor, nil
Expand Down Expand Up @@ -1137,26 +1123,17 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress, isGRPCAddress, err := t.compactorAddress()
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "delete_requests", "client_type": clientType}, prometheus.DefaultRegisterer)
var compactorClient deletion.CompactorClient
if isGRPCAddress {
compactorClient, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
if err != nil {
return nil, err
}
} else {
compactorClient, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
if err != nil {
return nil, err
}
httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
}

client, err := deletion.NewDeleteRequestsClient(compactorClient, t.deleteClientMetrics, clientType)
client, err := deletion.NewDeleteRequestsClient(compactorAddress, httpClient, t.deleteClientMetrics, clientType)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/querier/queryrange/queryrangebase/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,3 @@ type fakeGenNumberLoader struct {
func (l *fakeGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
return l.genNumber
}

func (l *fakeGenNumberLoader) Stop() {}
1 change: 0 additions & 1 deletion pkg/querier/queryrange/queryrangebase/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func NewResultsCacheMetrics(registerer prometheus.Registerer) *ResultsCacheMetri

type CacheGenNumberLoader interface {
GetResultsCacheGenNumber(tenantIDs []string) string
Stop()
}

// ResultsCacheConfig is the config for the results cache.
Expand Down
2 changes: 0 additions & 2 deletions pkg/querier/queryrange/queryrangebase/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,5 +1054,3 @@ func newMockCacheGenNumberLoader() CacheGenNumberLoader {
func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
return ""
}

func (l mockCacheGenNumberLoader) Stop() {}
Loading

0 comments on commit 06b92b6

Please sign in to comment.