Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 53 additions & 12 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The HTTP API includes the following endpoints:
- [`POST /loki/api/v1/push`](#post-lokiapiv1push)
- [`GET /api/prom/tail`](#get-apipromtail)
- [`GET /api/prom/query`](#get-apipromquery)
- [`POST /api/prom/push`](#post-apiprompush)
- [`GET /ready`](#get-ready)
- [`POST /flush`](#post-flush)
- [`GET /metrics`](#get-metrics)
Expand Down Expand Up @@ -445,8 +446,6 @@ Response (streamed):

## `POST /loki/api/v1/push`

Alias (DEPRECATED): `POST /api/prom/push`

`/loki/api/v1/push` is the endpoint used to send log entries to Loki. The default
behavior is for the POST body to be a snappy-compressed protobuf messsage:

Expand All @@ -460,12 +459,12 @@ JSON post body can be sent in the following format:
{
"streams": [
{
"labels": "<LogQL label key-value pairs>",
"entries": [
{
"ts": "<RFC3339Nano string>",
"line": "<log line>"
}
"stream": {
"label": "value"
},
"values": [
[ "<unix epoch in nanoseconds>", "<log line>" ],
[ "<unix epoch in nanoseconds>", "<log line>" ]
]
}
]
Expand All @@ -482,8 +481,8 @@ In microservices mode, `/loki/api/v1/push` is exposed by the distributor.
### Examples

```bash
$ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/loki/api/v1/push" --data-raw \
'{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}'
$ curl -v -H "Content-Type: application/json" -XPOST -s "http://localhost:3100/loki/api/v1/push" --data-raw \
'{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}'
```

## `GET /api/prom/tail`
Expand Down Expand Up @@ -535,8 +534,6 @@ and `Labels` instead of `labels` and `ts` like in the entries for the stream.
As the response is streamed, the object defined by the response format above
will be sent over the WebSocket multiple times.



## `GET /api/prom/query`

> **WARNING**: `/api/prom/query` is DEPRECATED; use `/loki/api/v1/query_range`
Expand Down Expand Up @@ -607,6 +604,50 @@ $ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/lok
'{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}'
```

## `POST /api/prom/push`

> **WARNING**: `/api/prom/push` is DEPRECATED; use `/loki/api/v1/push`
> instead.

`/api/prom/push` is the endpoint used to send log entries to Loki. The default
behavior is for the POST body to be a snappy-compressed protobuf messsage:

- [Protobuf definition](/pkg/logproto/logproto.proto)
- [Go client library](/pkg/promtail/client/client.go)

Alternatively, if the `Content-Type` header is set to `application/json`, a
JSON post body can be sent in the following format:

```
{
"streams": [
{
"labels": "<LogQL label key-value pairs>",
"entries": [
{
"ts": "<RFC3339Nano string>",
"line": "<log line>"
}
]
}
]
}
```

> **NOTE**: logs sent to Loki for every stream must be in timestamp-ascending
> order, meaning each log line must be more recent than the one last received.
> If logs do not follow this order, Loki will reject the log with an out of
> order error.

In microservices mode, `/api/prom/push` is exposed by the distributor.

### Examples

```bash
$ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/api/prom/push" --data-raw \
'{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}'
```

## `GET /ready`

`/ready` returns HTTP 200 when the Loki ingester is ready to accept traffic. If
Expand Down
2 changes: 1 addition & 1 deletion fluentd/fluent-plugin-grafana-loki/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ services:
## Configuration

### url
The url of the Loki server to send logs to. When sending data the publish path (`/loki/api/v1/push`) will automatically be appended.
The url of the Loki server to send logs to. When sending data the publish path (`/api/prom/push`) will automatically be appended.
By default the url is set to `https://logs-us-west1.grafana.net`, the url of the Grafana Labs preview (hosted Loki)[https://grafana.com/loki] service.

#### Proxy Support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)

Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.1.0'
spec.version = '1.0.2'
spec.authors = %w[woodsaj briangann]
spec.email = ['awoods@grafana.com', 'brian@grafana.com']

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def write(chunk)
body = { 'streams': payload }

# add ingest path to loki url
uri = URI.parse(url + '/loki/api/v1/push')
uri = URI.parse(url + '/api/prom/push')

req = Net::HTTP::Post.new(
uri.request_uri
Expand Down
14 changes: 12 additions & 2 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package distributor

import (
"encoding/json"
"net/http"

"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
)

var contentType = http.CanonicalHeaderKey("Content-Type")
Expand All @@ -21,7 +23,15 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

switch r.Header.Get(contentType) {
case applicationJSON:
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
var err error

if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(r.Body, &req)
} else {
err = unmarshal_legacy.DecodePushRequest(r.Body, &req)
}

if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type QueryResponse struct {
Data QueryResponseData `json:"data"`
}

// PushRequest models a log stream push
type PushRequest struct {
Streams []*Stream `json:"streams"`
}

// ResultType holds the type of the result
type ResultType string

Expand Down
13 changes: 13 additions & 0 deletions pkg/logql/unmarshal/legacy/unmarshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package unmarshal

import (
"encoding/json"
"io"

"github.com/grafana/loki/pkg/logproto"
)

// DecodePushRequest directly decodes json to a logproto.PushRequest
func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
return json.NewDecoder(b).Decode(r)
}
67 changes: 67 additions & 0 deletions pkg/logql/unmarshal/legacy/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package unmarshal

import (
"io/ioutil"
"log"
"strings"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

// covers requests to /api/prom/push
var pushTests = []struct {
expected []*logproto.Stream
actual string
}{
{
[]*logproto.Stream{
{
Entries: []logproto.Entry{
{
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:22.380001319Z"),
Line: "super line",
},
},
Labels: `{test="test"}`,
},
},
`{
"streams":[
{
"labels":"{test=\"test\"}",
"entries":[
{
"ts": "2019-09-13T18:32:22.380001319Z",
"line": "super line"
}
]
}
]
}`,
},
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))

err := DecodePushRequest(closer, &actual)
require.NoError(t, err)

require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}

func mustParse(l string, t string) time.Time {
ret, err := time.Parse(l, t)
if err != nil {
log.Fatalf("Failed to parse %s", t)
}

return ret
}
60 changes: 60 additions & 0 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package unmarshal

import (
"encoding/json"
"io"

"github.com/grafana/loki/pkg/loghttp"

"github.com/grafana/loki/pkg/logproto"
)

// DecodePushRequest directly decodes json to a logproto.PushRequest
func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
var request loghttp.PushRequest

err := json.NewDecoder(b).Decode(&request)

if err != nil {
return err
}

*r = NewPushRequest(request)

return nil
}

// NewPushRequest constructs a logproto.PushRequest from a PushRequest
func NewPushRequest(r loghttp.PushRequest) logproto.PushRequest {
ret := logproto.PushRequest{
Streams: make([]*logproto.Stream, len(r.Streams)),
}

for i, s := range r.Streams {
ret.Streams[i] = NewStream(s)
}

return ret
}

// NewStream constructs a logproto.Stream from a Stream
func NewStream(s *loghttp.Stream) *logproto.Stream {
ret := &logproto.Stream{
Entries: make([]logproto.Entry, len(s.Entries)),
Labels: s.Labels.String(),
}

for i, e := range s.Entries {
ret.Entries[i] = NewEntry(e)
}

return ret
}

// NewEntry constructs a logproto.Entry from a Entry
func NewEntry(e loghttp.Entry) logproto.Entry {
return logproto.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
}
}
56 changes: 56 additions & 0 deletions pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package unmarshal

import (
"io/ioutil"
"strings"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

// covers requests to /loki/api/v1/push
var pushTests = []struct {
expected []*logproto.Stream
actual string
}{
{
[]*logproto.Stream{
{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
},
Labels: `{test="test"}`,
},
},
`{
"streams": [
{
"stream": {
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
]
}
]
}`,
},
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))

err := DecodePushRequest(closer, &actual)
require.NoError(t, err)

require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}