Commit

flush boltdb to object store (#1837)
* flush boltdb to object store

files are stored in folder per periodic table and are named after ingester
flushed every 15 mins to make index available to other services
files are also flushed before ingester stops to avoid any data loss
new stores can be implemented easily
ingester to also query store when using boltdb

* persisting uploader name across restarts, detecting objectstore type from periodic config, other refactorings

* updated cli flag for active directory

* add tests for boltdb shipper and update vendor

* syncing boltdb files to disk during update

* sleep in tests to let mtime of boltdb file be changed

* add documentation for boltdb shipper and remove unwanted config
sandeepsukhani authored Apr 20, 2020
1 parent 8b924a5 commit fad3b61
Showing 20 changed files with 1,436 additions and 56 deletions.
1 change: 1 addition & 0 deletions docs/README.md
@@ -45,6 +45,7 @@ simplifies the operation and significantly lowers the cost of Loki.
4. [Storage](operations/storage/README.md)
1. [Table Manager](operations/storage/table-manager.md)
2. [Retention](operations/storage/retention.md)
3. [BoltDB Shipper](operations/storage/boltdb-shipper.md)
5. [Multi-tenancy](operations/multi-tenancy.md)
6. [Loki Canary](operations/loki-canary.md)
8. [HTTP API](api.md)
86 changes: 86 additions & 0 deletions docs/operations/storage/boltdb-shipper.md
@@ -0,0 +1,86 @@
# Loki with BoltDB Shipper

:warning: BoltDB Shipper is still an experimental feature. It is not recommended for production environments.

BoltDB Shipper lets you run Loki without any dependency on a NoSQL store for the index.
It stores the index locally in BoltDB files and keeps shipping those files to a shared object store, i.e. the same object store that is used for storing chunks.
It also keeps syncing BoltDB files from the shared object store to a configured local directory to get index entries created by other services of the same Loki cluster.
This lets you run Loki with one less dependency and also reduces storage costs, since object stores are likely to be much cheaper than a hosted NoSQL store or a self-hosted Cassandra instance.

## Example Configuration

Example configuration with GCS:

```yaml
schema_config:
  configs:
    - from: 2018-04-15
      store: boltdb-shipper
      object_store: gcs
      schema: v11
      index:
        prefix: loki_index_
        period: 168h

storage_config:
  gcs:
    bucket_name: GCS_BUCKET_NAME

  boltdb_shipper_config:
    active_index_directory: /loki/index
    cache_location: /loki/boltdb-cache
```
This would run Loki with BoltDB Shipper storing BoltDB files locally at `/loki/index` and chunks in the configured `GCS_BUCKET_NAME`.
It would also keep shipping BoltDB files periodically to the same configured bucket.
It would also keep downloading BoltDB files uploaded to the shared bucket by other ingesters to the local `/loki/boltdb-cache` folder.

## Operational Details

Loki can be configured to run as a single vertically scaled instance, as a cluster of horizontally scaled single-binary instances (each running all Loki services), or in microservices mode with each instance running just one of the services.
When it comes to reads and writes, Ingesters are the ones that write the index and chunks to stores, and Queriers are the ones that read the index and chunks from the store to serve requests.

Before we get into more details, it is important to understand how Loki manages the index in stores. Loki shards the index by a configured period, which defaults to 7 days, i.e. for table-based stores like Bigtable/Cassandra/DynamoDB there would be a separate table per week containing the index for that week.
In the case of BoltDB files there is no concept of tables, so Loki creates a BoltDB file per week. Files/tables created per week are identified by the configured `prefix_` + `<period-number-since-epoch>`.
Here `<period-number-since-epoch>` in the case of the default config would be the week number since epoch.
For example, if you have the prefix set to `loki_index_` and a write request comes in on 20th April 2020, it would be stored in a table/file named `loki_index_2624`, because `2624` full weeks have elapsed since the epoch, i.e. the Unix timestamp divided by the period length, rounded down, is `2624`.
Since sharding of the index creates multiple files when using BoltDB, BoltDB Shipper creates a folder per week, adds the files for that week to that folder, and names those files after the ingesters that created them.

To show what the BoltDB files in the shared object store would look like, consider 2 ingesters named `ingester-0` and `ingester-1` running in a Loki cluster,
both having shipped files for weeks `2623` and `2624` with the prefix `loki_index_`. Here is how the files would look:

```
└── index
    ├── loki_index_2623
    │   ├── ingester-0
    │   └── ingester-1
    └── loki_index_2624
        ├── ingester-0
        └── ingester-1
```
*NOTE: We also add a timestamp to the file names to randomize them, which avoids overwriting files when running Ingesters that share a name and have no persistent storage. Timestamps are not shown here for simplicity.*

Let us look in more depth at how Ingesters and Queriers work when running with BoltDB Shipper.

### Ingesters

Ingesters keep writing the index to BoltDB files in `active_index_directory`, and BoltDB Shipper checks that directory for new and updated files every 15 minutes and uploads them to the shared object store.
When running Loki in clustered mode there could be multiple ingesters serving write requests, each of them generating BoltDB files locally.

*NOTE: To avoid any loss of index when an Ingester crashes, it is recommended to run Ingesters as a StatefulSet (when using Kubernetes) with persistent storage for the index files.*

Another important detail is that when chunks are flushed they are available for reads in the object store instantly, while the index is not, since index files are only uploaded every 15 minutes with BoltDB Shipper.
To avoid missing logs whose index entries are in BoltDB files that have not been shipped yet, Ingesters, while serving queries for in-memory logs, would also do a store query from `now()` - (`max_chunk_age` + `30 Min`) to `<end-time-from-query-request>`.

### Queriers

Queriers lazily load BoltDB files from the shared object store to the configured `cache_location`.
When a querier receives a read request, the query range from the request is resolved to period numbers, and all the files for those period numbers are downloaded to `cache_location` if they are not already there.
Once the files for a period have been downloaded, the querier keeps looking for updates in the shared object store and downloads them every 15 minutes by default.
The frequency of checking for updates can be configured with the `resync_interval` config.

To avoid keeping downloaded index files forever, there is a TTL for them which defaults to 24 hours, which means that if the index files for a period are not used for 24 hours they are removed from the cache location.
The TTL can be configured using the `cache_ttl` config.



1 change: 1 addition & 0 deletions go.mod
@@ -51,6 +51,7 @@ require (
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200310113808-2708ba4e60a4
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 // indirect
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
google.golang.org/grpc v1.25.1
4 changes: 3 additions & 1 deletion pkg/ingester/flush_test.go
@@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/logql"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
@@ -223,7 +225,7 @@ func (s *testStore) IsLocal() bool {
return false
}

func (s *testStore) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
func (s *testStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) {
return nil, nil
}

61 changes: 59 additions & 2 deletions pkg/ingester/ingester.go
@@ -22,8 +22,12 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util/validation"
)

@@ -63,6 +67,9 @@ type Config struct {

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)

QueryStore bool `yaml:"-"`
QueryStoreMaxLookBackPeriod time.Duration `yaml:"-"`
}

// RegisterFlags registers the flags.
@@ -113,6 +120,7 @@ type Ingester struct {
// ChunkStore is the interface we need to store chunks.
type ChunkStore interface {
Put(ctx context.Context, chunks []chunk.Chunk) error
LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error)
}

// New makes a new Ingester.
@@ -241,13 +249,35 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {

// Query the ingesters for log streams matching a set of matchers.
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
instanceID, err := user.ExtractOrgID(queryServer.Context())
// initialize stats collection for ingester queries and set grpc trailer with stats.
ctx := stats.NewContext(queryServer.Context())
defer stats.SendAsTrailer(ctx, queryServer)

instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}

instance := i.getOrCreateInstance(instanceID)
return instance.Query(req, queryServer)
itrs, err := instance.Query(ctx, req)
if err != nil {
return err
}

if storeReq := buildStoreRequest(i.cfg, req); storeReq != nil {
storeItr, err := i.store.LazyQuery(ctx, logql.SelectParams{QueryRequest: storeReq})
if err != nil {
return err
}

itrs = append(itrs, storeItr)
}

heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction)

defer helpers.LogError("closing iterator", heapItr.Close)

return sendBatches(queryServer.Context(), heapItr, queryServer, req.Limit)
}

// Label returns the set of labels for the stream this ingester knows about.
@@ -356,3 +386,30 @@ func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRe

return &resp, nil
}

// buildStoreRequest returns a store request built from an ingester request. It returns nil if QueryStore is set to false in the configuration.
// The request may be truncated by QueryStoreMaxLookBackPeriod, which limits the range of the request so that
// we query just enough to not miss any data without adding too many duplicates by covering the whole time range of the query.
func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRequest {
	if !cfg.QueryStore {
		return nil
	}
	start := req.Start
	end := req.End
	if cfg.QueryStoreMaxLookBackPeriod != 0 {
		oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod)
		if oldestStartTime.After(req.Start) {
			start = oldestStartTime
		}
	}

	if start.After(end) {
		return nil
	}

	newRequest := *req
	newRequest.Start = start
	newRequest.End = end

	return &newRequest
}
81 changes: 81 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -7,6 +7,9 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logql"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
@@ -247,6 +250,10 @@ func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
return nil
}

func (s *mockStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) {
return nil, nil
}

type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
@@ -269,3 +276,77 @@ func defaultLimitsTestConfig() validation.Limits {
flagext.DefaultValues(&limits)
return limits
}

func TestIngester_buildStoreRequest(t *testing.T) {
	ingesterQueryRequest := logproto.QueryRequest{
		Selector: `{foo="bar"}`,
		Limit:    100,
	}

	now := time.Now()

	for _, tc := range []struct {
		name                      string
		queryStore                bool
		maxLookBackPeriod         time.Duration
		ingesterQueryRequest      *logproto.QueryRequest
		expectedStoreQueryRequest *logproto.QueryRequest
	}{
		{
			name:                      "do not query store",
			queryStore:                false,
			ingesterQueryRequest:      recreateRequestWithTime(ingesterQueryRequest, now.Add(-time.Minute), now),
			expectedStoreQueryRequest: nil,
		},
		{
			name:                      "query store with max look back covering whole request duration",
			queryStore:                true,
			maxLookBackPeriod:         time.Hour,
			ingesterQueryRequest:      recreateRequestWithTime(ingesterQueryRequest, now.Add(-10*time.Minute), now),
			expectedStoreQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-10*time.Minute), now),
		},
		{
			name:                      "query store with max look back covering partial request duration",
			queryStore:                true,
			maxLookBackPeriod:         time.Hour,
			ingesterQueryRequest:      recreateRequestWithTime(ingesterQueryRequest, now.Add(-2*time.Hour), now),
			expectedStoreQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-time.Hour), now),
		},
		{
			name:                      "query store with max look back not covering request duration at all",
			queryStore:                true,
			maxLookBackPeriod:         time.Hour,
			ingesterQueryRequest:      recreateRequestWithTime(ingesterQueryRequest, now.Add(-4*time.Hour), now.Add(-2*time.Hour)),
			expectedStoreQueryRequest: nil,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			ingesterConfig := defaultIngesterTestConfig(t)
			ingesterConfig.QueryStore = tc.queryStore
			ingesterConfig.QueryStoreMaxLookBackPeriod = tc.maxLookBackPeriod
			storeRequest := buildStoreRequest(ingesterConfig, tc.ingesterQueryRequest)
			if tc.expectedStoreQueryRequest == nil {
				require.Nil(t, storeRequest)
				return
			}

			// The start time of the store request may be derived from the time at which buildStoreRequest is called,
			// so the exact value cannot be predicted. Allow up to 1s difference between expected and actual start time.
			require.Equal(t, tc.expectedStoreQueryRequest.Selector, storeRequest.Selector)
			require.Equal(t, tc.expectedStoreQueryRequest.Limit, storeRequest.Limit)
			require.Equal(t, tc.expectedStoreQueryRequest.End, storeRequest.End)

			if diff := storeRequest.Start.Sub(tc.expectedStoreQueryRequest.Start); diff > time.Second {
				t.Fatalf("expected up to 1s difference between expected and actual store request start time but got %s", diff)
			}
		})
	}
}

func recreateRequestWithTime(req logproto.QueryRequest, start, end time.Time) *logproto.QueryRequest {
	newReq := req
	newReq.Start = start
	newReq.End = end

	return &newReq
}
17 changes: 5 additions & 12 deletions pkg/ingester/instance.go
@@ -186,18 +186,14 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
return s.labels
}

func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
// initialize stats collection for ingester queries and set grpc trailer with stats.
ctx := stats.NewContext(queryServer.Context())
defer stats.SendAsTrailer(ctx, queryServer)

func (i *instance) Query(ctx context.Context, req *logproto.QueryRequest) ([]iter.EntryIterator, error) {
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {
return err
return nil, err
}
filter, err := expr.Filter()
if err != nil {
return err
return nil, err
}

ingStats := stats.GetIngesterData(ctx)
@@ -215,13 +211,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
},
)
if err != nil {
return err
return nil, err
}

iter := iter.NewHeapIterator(ctx, iters, req.Direction)
defer helpers.LogError("closing iterator", iter.Close)

return sendBatches(ctx, iter, queryServer, req.Limit)
return iters, nil
}

func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
3 changes: 1 addition & 2 deletions pkg/logql/stats/context.go
@@ -138,9 +138,8 @@ func GetStoreData(ctx context.Context) *StoreData {

// Snapshot compute query statistics from a context using the total exec time.
func Snapshot(ctx context.Context, execTime time.Duration) Result {
var res Result
// ingester data is decoded from grpc trailers.
res.Ingester = decodeTrailers(ctx)
res := decodeTrailers(ctx)
// collect data from store.
s, ok := ctx.Value(storeKey).(*StoreData)
if ok {