Skip to content

Commit

Permalink
Revendor Cortex (#2755)
Browse files Browse the repository at this point in the history
* replaces prometheus & revendors cortex

* compat

* compat

* sd refactor

* updates docker

* removes prom replace

* adds delimiters back to object addrs in boltdb
  • Loading branch information
owen-d authored Oct 14, 2020
1 parent 690a387 commit 54c0c5f
Show file tree
Hide file tree
Showing 809 changed files with 22,714 additions and 466,607 deletions.
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ require (
github.com/cespare/xxhash/v2 v2.1.1
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.3.1-0.20200923132904-22f2efdc1339
github.com/cortexproject/cortex v1.4.1-0.20201012150016-9e8beee8cacb
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v17.12.0-ce-rc1.0.20200706150819-a40b877fbb9e+incompatible
github.com/docker/docker v17.12.0-ce-rc1.0.20201009160326-9c15e82f19b0+incompatible
github.com/docker/go-metrics v0.0.0-20181218153428-b84716841b82 // indirect
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8
github.com/dustin/go-humanize v1.0.0
Expand All @@ -37,15 +37,16 @@ require (
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.9.5
github.com/mitchellh/mapstructure v1.2.2
github.com/moby/term v0.0.0-20200915141129-7f0af18e79f2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/opentracing/opentracing-go v1.2.0
// github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pierrec/lz4/v4 v4.0.2-0.20200813132121-22f5d580d5c4
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.11.1
github.com/prometheus/prometheus v1.8.2-0.20200819132913-cb830b0a9c78
github.com/prometheus/common v0.14.0
github.com/prometheus/prometheus v1.8.2-0.20200923143134-7e2db3d092f3
github.com/segmentio/fasthash v1.0.2
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20200627165143-92b8a710ab6c
Expand All @@ -57,7 +58,7 @@ require (
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
go.uber.org/atomic v1.6.0
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/net v0.0.0-20200822124328-c89045814202
google.golang.org/grpc v1.30.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
Expand Down
87 changes: 71 additions & 16 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/logcli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *DefaultClient) doRequest(path, query string, quiet bool, out interface{
TLSConfig: c.TLSConfig,
}

client, err := config.NewClientFromConfig(clientConfig, "logcli", false)
client, err := config.NewClientFromConfig(clientConfig, "logcli", false, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func New(cfg Config, logger log.Logger) (Client, error) {
return nil, err
}

c.client, err = config.NewClientFromConfig(cfg.Client, "promtail", false)
c.client, err = config.NewClientFromConfig(cfg.Client, "promtail", false, false)
if err != nil {
return nil, err
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
Expand Down Expand Up @@ -594,18 +594,15 @@ func buildTestConfig(t *testing.T, positionsFileName string, logDirName string)
Source: "",
}

serviceConfig := sd_config.ServiceDiscoveryConfig{
StaticConfigs: []*targetgroup.Group{
scrapeConfig := scrapeconfig.Config{
JobName: "",
PipelineStages: pipeline,
RelabelConfigs: nil,
Config: discovery.StaticConfig{
&targetGroup,
},
}

scrapeConfig := scrapeconfig.Config{
JobName: "",
PipelineStages: pipeline,
RelabelConfigs: nil,
ServiceDiscoveryConfig: serviceConfig,
}
cfg.ScrapeConfig = append(cfg.ScrapeConfig, scrapeConfig)

// Make sure the SyncPeriod is fast for test purposes, but not faster than the poll interval (250ms)
Expand Down
19 changes: 9 additions & 10 deletions pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@ package scrapeconfig

import (
"fmt"
"reflect"
"time"

"github.com/prometheus/common/model"
"github.com/weaveworks/common/server"

sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/pkg/relabel"

"github.com/grafana/loki/pkg/logentry/stages"
)

// Config describes a job to scrape.
type Config struct {
JobName string `yaml:"job_name,omitempty"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
JobName string `yaml:"job_name,omitempty"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
discovery.Config
}

// JournalTargetConfig describes systemd journal records to scrape.
Expand Down Expand Up @@ -90,7 +89,7 @@ var DefaultScrapeConfig = Config{
// HasServiceDiscoveryConfig checks to see if the service discovery used for
// file targets is non-zero.
func (c *Config) HasServiceDiscoveryConfig() bool {
return !reflect.DeepEqual(c.ServiceDiscoveryConfig, sd_config.ServiceDiscoveryConfig{})
return c.Config != nil
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand Down
21 changes: 11 additions & 10 deletions pkg/promtail/targets/file/filetargetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -75,7 +74,7 @@ func NewFileTargetManager(
return nil, err
}

config := map[string]sd_config.ServiceDiscoveryConfig{}
configs := map[string]discovery.Configs{}
for _, cfg := range scrapeConfigs {
if !cfg.HasServiceDiscoveryConfig() {
continue
Expand All @@ -91,19 +90,21 @@ func NewFileTargetManager(
// within scrape pool. Also, default target label to localhost if target is not
// defined in promtail config.
// Just to make sure prometheus target group sync works fine.
for i, tg := range cfg.ServiceDiscoveryConfig.StaticConfigs {
tg.Source = fmt.Sprintf("%d", i)
if len(tg.Targets) == 0 {
tg.Targets = []model.LabelSet{
{model.AddressLabel: "localhost"},
if tgs, ok := cfg.Config.(discovery.StaticConfig); ok {
for i, tg := range tgs {
tg.Source = fmt.Sprintf("%d", i)
if len(tg.Targets) == 0 {
tg.Targets = []model.LabelSet{
{model.AddressLabel: "localhost"},
}
}
}
}

// Add an additional api-level node filtering, so we only fetch pod metadata for
// all the pods from the current node. Without this filtering we will have to
// download metadata for all pods running on a cluster, which may be a long operation.
for _, kube := range cfg.ServiceDiscoveryConfig.KubernetesSDConfigs {
if kube, ok := cfg.Config.(*kubernetes.SDConfig); ok {
if kube.Role == kubernetes.RolePod {
selector := fmt.Sprintf("%s=%s", kubernetesPodNodeField, hostname)
kube.Selectors = []kubernetes.SelectorConfig{
Expand All @@ -123,13 +124,13 @@ func NewFileTargetManager(
targetConfig: targetConfig,
}
tm.syncers[cfg.JobName] = s
config[cfg.JobName] = cfg.ServiceDiscoveryConfig
configs[cfg.JobName] = discovery.Configs{cfg.Config}
}

go tm.run()
go helpers.LogError("running target manager", tm.manager.Run)

return tm, tm.manager.ApplyConfig(config)
return tm, tm.manager.ApplyConfig(configs)
}

func (tm *FileTargetManager) run() {
Expand Down
20 changes: 10 additions & 10 deletions pkg/promtail/targets/stdin/stdin_target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/discovery"

"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
Expand All @@ -38,11 +37,9 @@ var (
// defaultStdInCfg is the default config for stdin target if none provided.
defaultStdInCfg = scrapeconfig.Config{
JobName: "stdin",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*targetgroup.Group{
{Labels: model.LabelSet{"job": "stdin"}},
{Labels: model.LabelSet{"hostname": model.LabelValue(hostName)}},
},
Config: discovery.StaticConfig{
{Labels: model.LabelSet{"job": "stdin"}},
{Labels: model.LabelSet{"hostname": model.LabelValue(hostName)}},
},
}
)
Expand Down Expand Up @@ -108,11 +105,14 @@ func newReaderTarget(logger log.Logger, in io.Reader, client api.EntryHandler, c
return nil, err
}
lbs := model.LabelSet{}
for _, static := range cfg.ServiceDiscoveryConfig.StaticConfigs {
if static != nil && static.Labels != nil {
lbs = lbs.Merge(static.Labels)
if tgs, ok := cfg.Config.(discovery.StaticConfig); ok {
for _, static := range tgs {
if static != nil && static.Labels != nil {
lbs = lbs.Merge(static.Labels)
}
}
}

ctx, cancel := context.WithCancel(context.Background())
t := &readerTarget{
in: bufio.NewReaderSize(in, bufferSize),
Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/manager/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func engineQueryFunc(engine *logql.Engine, delay time.Duration) rules.QueryFunc
}

// MultiTenantManagerAdapter will wrap a MultiTenantManager which validates loki rules
func MultiTenantManagerAdapter(mgr ruler.MultiTenantManager) *MultiTenantManager {
func MultiTenantManagerAdapter(mgr ruler.MultiTenantManager) ruler.MultiTenantManager {
return &MultiTenantManager{mgr}
}

Expand All @@ -84,13 +84,13 @@ func MemstoreTenantManager(
) ruler.ManagerFactory {
var metrics *Metrics

return func(
return ruler.ManagerFactory(func(
ctx context.Context,
userID string,
notifier *notifier.Manager,
logger log.Logger,
reg prometheus.Registerer,
) *rules.Manager {
) ruler.RulesManager {

// We'll ignore the passed registere and use the default registerer to avoid prefix issues and other weirdness.
// This closure prevents re-registering.
Expand Down Expand Up @@ -120,7 +120,7 @@ func MemstoreTenantManager(
memStore.Start(mgr)

return mgr
}
})
}

type GroupLoader struct{}
Expand Down
20 changes: 20 additions & 0 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
package ruler

import (
"time"

"github.com/cortexproject/cortex/pkg/ruler"
cRules "github.com/cortexproject/cortex/pkg/ruler/rules"
"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/ruler/manager"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

type Config struct {
ruler.Config `yaml:",inline"`
}

// Override the embedded cortex variant which expects a cortex limits struct. Instead copy the relevant bits over.
func (cfg *Config) Validate() error {
if err := cfg.StoreConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid storage config")
}
return nil
}

// Loki does not yet support shuffle sharding or per tenant evaluation delays, so implement what cortex expects.
type passthroughLimits struct{ Config }

func (cfg passthroughLimits) EvaluationDelay(_ string) time.Duration {
return cfg.Config.EvaluationDelay
}
func (passthroughLimits) RulerTenantShardSize(_ string) int { return 0 }

func NewRuler(cfg Config, engine *logql.Engine, reg prometheus.Registerer, logger log.Logger, ruleStore cRules.RuleStore) (*ruler.Ruler, error) {

mgr, err := ruler.NewDefaultMultiTenantManager(
Expand All @@ -34,6 +53,7 @@ func NewRuler(cfg Config, engine *logql.Engine, reg prometheus.Registerer, logge
reg,
logger,
ruleStore,
passthroughLimits{cfg},
)

}
6 changes: 4 additions & 2 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/grafana/loki/pkg/storage/stores/util"
)

const delimiter = "/"

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
Expand Down Expand Up @@ -96,15 +98,15 @@ func (c *Compactor) Run(ctx context.Context) error {
}
}()

_, dirs, err := c.objectClient.List(ctx, "")
_, dirs, err := c.objectClient.List(ctx, "", delimiter)
if err != nil {
status = statusFailure
return err
}

tables := make([]string, len(dirs))
for i, dir := range dirs {
tables[i] = strings.TrimSuffix(string(dir), "/")
tables[i] = strings.TrimSuffix(string(dir), delimiter)
}

for _, tableName := range tables {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/stores/shipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O
}

func (t *table) compact() error {
objects, _, err := t.storageClient.List(t.ctx, t.name+"/")
// The forward slash here needs to stay because we are trying to list contents of a directory without it we will get the name of the same directory back with hosted object stores.
// This is due to the object stores not having a concept of directories.
objects, _, err := t.storageClient.List(t.ctx, t.name+delimiter, delimiter)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 54c0c5f

Please sign in to comment.