Skip to content

Commit

Permalink
clients/promtail: Add ndjson and plaintext formats to loki_push (#4336)
Browse files Browse the repository at this point in the history
* feat(clients/promtail): add ndjson,plaintext formats to loki_push

* feat(clients/promtail): add ndjson,plaintext formats to loki_push

* feat(clients/promtail): documentation

* Address review comments

* fmt

* fix missing port
  • Loading branch information
ldb authored Sep 29, 2021
1 parent 05bb3ce commit 0a66b89
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 14 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ $ ./loki -config.file=./cmd/loki/loki-local-config.yaml
To build Promtail on non-Linux platforms, use the following command:

```bash
$ go build ./cmd/promtail
$ go build ./clients/cmd/promtail
```

On Linux, Promtail requires the systemd headers to be installed for
Expand All @@ -120,21 +120,21 @@ With Journal support on Ubuntu, run with the following commands:

```bash
$ sudo apt install -y libsystemd-dev
$ go build ./cmd/promtail
$ go build ./clients/cmd/promtail
```

With Journal support on CentOS, run with the following commands:

```bash
$ sudo yum install -y systemd-devel
$ go build ./cmd/promtail
$ go build ./clients/cmd/promtail
```

Otherwise, to build Promtail without Journal support, run `go build`
with CGO disabled:

```bash
$ CGO_ENABLED=0 go build ./cmd/promtail
$ CGO_ENABLED=0 go build ./clients/cmd/promtail
```

## License
Expand Down
41 changes: 39 additions & 2 deletions clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package lokipush

import (
"bufio"
"flag"
"io"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -92,7 +94,8 @@ func (t *PushTarget) run() error {
}

t.server = srv
t.server.HTTP.Handle("/loki/api/v1/push", http.HandlerFunc(t.handle))
t.server.HTTP.Handle("/loki/api/v1/push", http.HandlerFunc(t.handleLoki))
t.server.HTTP.Handle("/promtail/api/v1/raw", http.HandlerFunc(t.handlePlaintext))

go func() {
err := srv.Run()
Expand All @@ -104,7 +107,7 @@ func (t *PushTarget) run() error {
return nil
}

func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil)
Expand Down Expand Up @@ -170,6 +173,40 @@ func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// handlePlaintext handles newline delimited input such as plaintext or NDJSON.
func (t *PushTarget) handlePlaintext(w http.ResponseWriter, r *http.Request) {
entries := t.handler.Chan()
defer r.Body.Close()
body := bufio.NewReader(r.Body)
for {
line, err := body.ReadString('\n')
if err != nil && err != io.EOF {
level.Warn(t.logger).Log("msg", "failed to read incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
line = strings.TrimRight(line, "\r\n")
if line == "" {
if err == io.EOF {
break
}
continue
}
entries <- api.Entry{
Labels: t.Labels().Clone(),
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: line,
},
}
if err == io.EOF {
break
}
}

w.WriteHeader(http.StatusNoContent)
}

// Type returns PushTargetType.
func (t *PushTarget) Type() target.TargetType {
return target.PushTargetType
Expand Down
87 changes: 82 additions & 5 deletions clients/pkg/promtail/targets/lokipush/pushtarget_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package lokipush

import (
"bytes"
"flag"
"fmt"
"net"
"net/http"
"os"
"strconv"
"testing"
Expand All @@ -24,7 +27,9 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

func TestPushTarget(t *testing.T) {
const localhost = "127.0.0.1"

func TestLokiPushTarget(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

Expand All @@ -33,7 +38,7 @@ func TestPushTarget(t *testing.T) {
defer eh.Stop()

// Get a randomly available port by open and closing a TCP socket
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
addr, err := net.ResolveTCPAddr("tcp", localhost+":0")
require.NoError(t, err)
l, err := net.ListenTCP("tcp", addr)
require.NoError(t, err)
Expand All @@ -44,9 +49,9 @@ func TestPushTarget(t *testing.T) {
// Adjust some of the defaults
defaults := server.Config{}
defaults.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError))
defaults.HTTPListenAddress = "127.0.0.1"
defaults.HTTPListenAddress = localhost
defaults.HTTPListenPort = port
defaults.GRPCListenAddress = "127.0.0.1"
defaults.GRPCListenAddress = localhost
defaults.GRPCListenPort = 0 // Not testing GRPC, a random port will be assigned

config := &scrapeconfig.PushTargetConfig{
Expand All @@ -70,7 +75,7 @@ func TestPushTarget(t *testing.T) {

// Build a client to send logs
serverURL := flagext.URLValue{}
err = serverURL.Set("http://127.0.0.1:" + strconv.Itoa(port) + "/loki/api/v1/push")
err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/loki/api/v1/push")
require.NoError(t, err)

ccfg := client.Config{
Expand Down Expand Up @@ -122,3 +127,75 @@ func TestPushTarget(t *testing.T) {
_ = pt.Stop()

}

func TestPlaintextPushTarget(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

//Create PushTarget
eh := fake.New(func() {})
defer eh.Stop()

// Get a randomly available port by open and closing a TCP socket
addr, err := net.ResolveTCPAddr("tcp", localhost+":0")
require.NoError(t, err)
l, err := net.ListenTCP("tcp", addr)
require.NoError(t, err)
port := l.Addr().(*net.TCPAddr).Port
err = l.Close()
require.NoError(t, err)

// Adjust some of the defaults
defaults := server.Config{}
defaults.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError))
defaults.HTTPListenAddress = localhost
defaults.HTTPListenPort = port
defaults.GRPCListenAddress = localhost
defaults.GRPCListenPort = 0 // Not testing GRPC, a random port will be assigned

config := &scrapeconfig.PushTargetConfig{
Server: defaults,
Labels: model.LabelSet{
"pushserver": "pushserver2",
"keepme": "label",
},
KeepTimestamp: true,
}

pt, err := NewPushTarget(logger, eh, []*relabel.Config{}, "job2", config)
require.NoError(t, err)

// Send some logs
ts := time.Now()
body := new(bytes.Buffer)
for i := 0; i < 100; i++ {
body.WriteString("line" + strconv.Itoa(i))
_, err := http.Post(fmt.Sprintf("http://%s:%d/promtail/api/v1/raw", localhost, port), "text/json", body)
require.NoError(t, err)
body.Reset()
}

// Wait for them to appear in the test handler
countdown := 10000
for len(eh.Received()) != 100 && countdown > 0 {
time.Sleep(1 * time.Millisecond)
countdown--
}

// Make sure we didn't timeout
require.Equal(t, 100, len(eh.Received()))

// Verify labels
expectedLabels := model.LabelSet{
"pushserver": "pushserver2",
"keepme": "label",
}
// Spot check the first value in the result to make sure relabel rules were applied properly
require.Equal(t, expectedLabels, eh.Received()[0].Labels)

// Timestamp is always set in the handler, we expect received timestamps to be slightly higher than the timestamp when we started sending logs.
require.GreaterOrEqual(t, ts.Unix(), eh.Received()[99].Timestamp.Unix())

_ = pt.Stop()

}
8 changes: 5 additions & 3 deletions docs/sources/clients/promtail/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,10 @@ The `loki_push_api` block configures Promtail to expose a [Loki push API](../../

Each job configured with a `loki_push_api` will expose this API and will require a separate port.

Note the `server` configuration is the same as [server](#server)

Note the `server` configuration is the same as [server](#server).

Promtail also exposes a second endpoint on `/promtail/api/v1/raw` which expects newline-delimited log lines.
This can be used to send NDJSON or plaintext logs.

```yaml
# The push server configuration options
Expand All @@ -804,7 +805,8 @@ labels:
[ <labelname>: <labelvalue> ... ]
# If Promtail should pass on the timestamp from the incoming log or not.
# When false Promtail will assign the current timestamp to the log when it was processed
# When false Promtail will assign the current timestamp to the log when it was processed.
# Does not apply to the plaintext endpoint on `/promtail/api/v1/raw`.
[use_incoming_timestamp: <bool> | default = false]
```
Expand Down

0 comments on commit 0a66b89

Please sign in to comment.