Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail pull cloudflare logs #4813

Merged
merged 20 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
* [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support.
* [4741](https://github.com/grafana/loki/pull/4741) **sandeepsukhani**: index cleanup fixes while applying retention
* [4813](https://github.com/grafana/loki/pull/4813) **cyriltovena**: Promtail: Adds the ability to pull logs from Cloudflare.

# 2.4.1 (2021/11/07)

Expand Down
17 changes: 17 additions & 0 deletions clients/cmd/promtail/promtail-cloudflare.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

positions:
filename: /tmp/positions.yaml

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: cloudflare
cloudflare:
api_token: REDACTED
zone_id: REDACTED
labels:
job: cloudflare
19 changes: 14 additions & 5 deletions clients/pkg/promtail/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ import (
yaml "gopkg.in/yaml.v2"
)

const positionFileMode = 0600
const (
positionFileMode = 0600
cursorKeyPrefix = "cursor-"
journalKeyPrefix = "journal-"
)

// Config describes where to get position information from.
type Config struct {
Expand Down Expand Up @@ -176,14 +180,20 @@ func (p *positions) save() {
}
}

// CursorKey returns a key that can be saved as a cursor that is never deleted.
func CursorKey(key string) string {
return fmt.Sprintf("%s%s", cursorKeyPrefix, key)
}

func (p *positions) cleanup() {
p.mtx.Lock()
defer p.mtx.Unlock()
toRemove := []string{}
for k := range p.positions {
// If the position file is prefixed with journal, it's a
// JournalTarget cursor and not a file on disk.
if strings.HasPrefix(k, "journal-") {
// If the position file is prefixed with cursor, it's a
// cursor and not a file on disk.
// We still have to support journal files, so we keep the previous check to avoid breaking change.
if strings.HasPrefix(k, cursorKeyPrefix) || strings.HasPrefix(k, journalKeyPrefix) {
continue
}

Expand All @@ -204,7 +214,6 @@ func (p *positions) cleanup() {
}

func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {

cleanfn := filepath.Clean(cfg.PositionsFile)
buf, err := ioutil.ReadFile(cleanfn)
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"`
KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"`
GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"`
CloudflareConfig *CloudflareConfig `yaml:"cloudflare,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"`
}
Expand Down Expand Up @@ -309,6 +310,27 @@ type GelfTargetConfig struct {
UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`
}

type CloudflareConfig struct {
// APIToken is the API key for the Cloudflare account.
APIToken string `yaml:"api_token"`
// ZoneID is the ID of the zone to use.
ZoneID string `yaml:"zone_id"`
// Labels optionally holds labels to associate with each record read from cloudflare logs.
Labels model.LabelSet `yaml:"labels"`
// The amount of workers to use for parsing cloudflare logs. Default to 3.
Workers int `yaml:"workers"`
// The timerange to fetch for each pull request that will be spread across workers. Default 1m.
PullRange model.Duration `yaml:"pull_range"`
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
// Fields to fetch from cloudflare logs.
// Default to default fields.
// Available fields type:
// - default
// - minimal
// - extended
// - all
FieldsType string `yaml:"fields_type"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
type GcplogTargetConfig struct {
// ProjectID is the Cloud project id
Expand Down
37 changes: 37 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cloudflare

import (
"context"
"time"

"github.com/cloudflare/cloudflare-go"
)

// Client is a wrapper around the Cloudflare API that allow for testing and being zone/fields aware.
type Client interface {
LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error)
}

type wrappedClient struct {
client *cloudflare.API
zoneID string
fields []string
}

func (w *wrappedClient) LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error) {
return w.client.LogpullReceived(ctx, w.zoneID, start, end, cloudflare.LogpullReceivedOption{
Fields: w.fields,
})
}

var getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
c, err := cloudflare.NewWithAPIToken(apiKey)
if err != nil {
return nil, err
}
return &wrappedClient{
client: c,
zoneID: zoneID,
fields: fields,
}, nil
}
50 changes: 50 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cloudflare

import (
"fmt"
)

type FieldsType string

const (
FieldsTypeDefault FieldsType = "default"
FieldsTypeMinimal FieldsType = "minimal"
FieldsTypeExtended FieldsType = "extended"
FieldsTypeAll FieldsType = "all"
)

var (
defaultFields = []string{
"ClientIP", "ClientRequestHost", "ClientRequestMethod", "ClientRequestURI", "EdgeEndTimestamp", "EdgeResponseBytes",
"EdgeRequestHost", "EdgeResponseStatus", "EdgeStartTimestamp", "RayID",
}
minimalFields = append(defaultFields, []string{
"ZoneID", "ClientSSLProtocol", "ClientRequestProtocol", "ClientRequestPath", "ClientRequestUserAgent", "ClientRequestReferer",
"EdgeColoCode", "ClientCountry", "CacheCacheStatus", "CacheResponseStatus", "EdgeResponseContentType", "SecurityLevel",
"WAFAction", "WAFProfile", "WAFRuleID", "WAFRuleMessage", "EdgeRateLimitID", "EdgeRateLimitAction",
}...)
extendedFields = append(minimalFields, []string{
"ClientSSLCipher", "ClientASN", "ClientIPClass", "CacheResponseBytes", "EdgePathingOp", "EdgePathingSrc", "EdgePathingStatus", "ParentRayID",
"WorkerCPUTime", "WorkerStatus", "WorkerSubrequest", "WorkerSubrequestCount", "OriginIP", "OriginResponseStatus", "OriginSSLProtocol",
"OriginResponseHTTPExpires", "OriginResponseHTTPLastModified",
}...)
allFields = append(extendedFields, []string{
"ClientRequestBytes", "ClientSrcPort", "ClientXRequestedWith", "CacheTieredFill", "EdgeResponseCompressionRatio", "EdgeServerIP", "FirewallMatchesSources",
"FirewallMatchesActions", "FirewallMatchesRuleIDs", "OriginResponseBytes", "OriginResponseTime", "ClientDeviceType", "WAFFlags", "WAFMatchedVar", "EdgeColoID",
}...)
)

func Fields(t FieldsType) ([]string, error) {
switch t {
case FieldsTypeDefault:
return defaultFields, nil
case FieldsTypeMinimal:
return minimalFields, nil
case FieldsTypeExtended:
return extendedFields, nil
case FieldsTypeAll:
return allFields, nil
default:
return nil, fmt.Errorf("unknown fields type: %s", t)
}
}
38 changes: 38 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cloudflare

import "github.com/prometheus/client_golang/prometheus"

// Metrics holds a set of cloudflare metrics.
type Metrics struct {
reg prometheus.Registerer

Entries prometheus.Counter
LastEnd prometheus.Gauge
}

// NewMetrics creates a new set of cloudflare metrics. If reg is non-nil, the
// metrics will be registered.
func NewMetrics(reg prometheus.Registerer) *Metrics {
var m Metrics
m.reg = reg

m.Entries = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "cloudflare_target_entries_total",
Help: "Total number of successful entries sent via the cloudflare target",
})
m.LastEnd = prometheus.NewGauge(prometheus.GaugeOpts{
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
Namespace: "promtail",
Name: "cloudflare_target_last_requested_end_timestamp",
Help: "The last cloudflare request end timestamp fetched. This allows to calculate how far the target is behind.",
})

if reg != nil {
reg.MustRegister(
m.Entries,
m.LastEnd,
)
}

return &m
}
Loading