Skip to content

Commit

Permalink
Use iterators for in-process Series calls
Browse files Browse the repository at this point in the history
The TSDBStore has two implementations of Series: one uses a goroutine,
and the other buffers series in memory. They serve different use cases
and trade off CPU against memory accordingly.

To reconcile these two approaches, we can use an iterator built on
coroutines, which have much lower overhead than goroutines.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Oct 5, 2024
1 parent f265c3b commit 3dc7b18
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 296 deletions.
4 changes: 2 additions & 2 deletions Dockerfile.e2e-tests
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Taking a non-alpine image for e2e tests so that cgo can be enabled for the race detector.
FROM golang:1.22 as builder
FROM golang:1.23 as builder

WORKDIR $GOPATH/src/github.com/thanos-io/thanos

Expand All @@ -8,7 +8,7 @@ COPY . $GOPATH/src/github.com/thanos-io/thanos
RUN CGO_ENABLED=1 go build -o $GOBIN/thanos -race ./cmd/thanos
# -----------------------------------------------------------------------------

FROM golang:1.22
FROM golang:1.23
LABEL maintainer="The Thanos Authors"

COPY --from=builder $GOBIN/thanos /bin/thanos
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/thanos-io/thanos

go 1.22.0

toolchain go1.22.5
go 1.23.0

require (
cloud.google.com/go/storage v1.40.0 // indirect
Expand Down
63 changes: 6 additions & 57 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package receive
import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand All @@ -21,8 +20,6 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
gmetadata "google.golang.org/grpc/metadata"

"google.golang.org/grpc"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -127,61 +124,19 @@ func NewMultiTSDB(

type localClient struct {
store *store.TSDBStore
}

type seriesClientMapper struct {
ctx context.Context
series []*storepb.Series

initiated bool

store *store.TSDBStore
req storepb.SeriesRequest
client storepb.StoreClient
}

func (m *seriesClientMapper) Recv() (*storepb.SeriesResponse, error) {
if !m.initiated {
series, err := m.store.SeriesLocal(m.ctx, &m.req)
if err != nil {
return nil, err
}
m.series = series
m.initiated = true
}
if len(m.series) == 0 {
return nil, io.EOF
func newLocalClient(store *store.TSDBStore) *localClient {
return &localClient{
store: store,
client: storepb.ServerAsClient(store),
}
s := m.series[0]
m.series = m.series[1:]
return storepb.NewSeriesResponse(s), nil
}

// Header always returns nil metadata and a nil error.
func (m *seriesClientMapper) Header() (gmetadata.MD, error) {
return nil, nil
}

// Trailer always returns nil metadata.
func (m *seriesClientMapper) Trailer() gmetadata.MD {
return nil
}

// CloseSend is a no-op that always returns nil.
func (m *seriesClientMapper) CloseSend() error {
return nil
}

// Context returns the context stored on the mapper.
func (m *seriesClientMapper) Context() context.Context {
return m.ctx
}

// RecvMsg ignores its argument and always returns nil.
func (m *seriesClientMapper) RecvMsg(_ interface{}) error {
return nil
}

// SendMsg ignores its argument and always returns nil.
func (m *seriesClientMapper) SendMsg(_ interface{}) error {
return nil
}

func (l *localClient) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (storepb.Store_SeriesClient, error) {
return &seriesClientMapper{ctx: ctx, store: l.store, req: *in}, nil
return l.client.Series(ctx, in, opts...)
}

func (l *localClient) LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error) {
Expand All @@ -192,12 +147,6 @@ func (l *localClient) LabelValues(ctx context.Context, in *storepb.LabelValuesRe
return l.store.LabelValues(ctx, in)
}

// newLocalClient returns a localClient whose store field points at the given
// TSDBStore.
func newLocalClient(store *store.TSDBStore) *localClient {
	lc := new(localClient)
	lc.store = store
	return lc
}

// Matches forwards the matchers to the wrapped store's Matches method and
// returns its result.
func (l *localClient) Matches(matchers []*labels.Matcher) bool {
return l.store.Matches(matchers)
}
Expand Down
22 changes: 15 additions & 7 deletions pkg/store/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package store

import (
"context"
"io"
"testing"
"time"

Expand Down Expand Up @@ -96,13 +97,20 @@ func TestRateLimitedServer(t *testing.T) {
defer cancel()

store := NewLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits)
seriesServer := storepb.NewInProcessStream(ctx, 10)
err := store.Series(&storepb.SeriesRequest{}, seriesServer)
if test.err == "" {
testutil.Ok(t, err)
} else {
testutil.NotOk(t, err)
testutil.Assert(t, test.err == err.Error(), "want %s, got %s", test.err, err.Error())
client := storepb.ServerAsClient(store)
seriesClient, err := client.Series(ctx, &storepb.SeriesRequest{})
testutil.Ok(t, err)
for {
_, err = seriesClient.Recv()
if err == io.EOF {
break
}
if test.err == "" {
testutil.Ok(t, err)
} else {
testutil.NotOk(t, err)
testutil.Assert(t, test.err == err.Error(), "want %s, got %s", test.err, err.Error())
}
}
})
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/store/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package store

import (
"context"
"io"
"testing"

"github.com/efficientgo/core/testutil"
Expand All @@ -19,9 +20,17 @@ func TestRecoverableServer(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv := storepb.NewInProcessStream(ctx, 1)

testutil.Ok(t, store.Series(&storepb.SeriesRequest{}, srv))
client := storepb.ServerAsClient(store)
seriesClient, err := client.Series(ctx, &storepb.SeriesRequest{})
testutil.Ok(t, err)

for {
_, err := seriesClient.Recv()
if err == io.EOF {
break
}
testutil.Ok(t, err)
}
}

type panicStoreServer struct {
Expand Down
122 changes: 61 additions & 61 deletions pkg/store/storepb/inprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,95 +6,95 @@ package storepb
import (
"context"
"io"
"iter"

"google.golang.org/grpc"
)

func ServerAsClient(srv StoreServer) StoreClient {
return &serverAsClient{srv: srv}
// inProcessServer is the server-side stream passed to StoreServer.Series when
// the call is served in-process. Responses sent on it are handed to yield
// instead of being written to a network stream.
type inProcessServer struct {
// Embedded so the type satisfies the full stream interface.
// newInProcessServer does not assign it, so only the methods defined
// directly on inProcessServer are backed by an implementation.
Store_SeriesServer
ctx context.Context
// yield receives each response; its boolean result reports whether the
// consumer still wants more values.
yield func(response *SeriesResponse, err error) bool
}

// serverAsClient allows servers to be used as clients.
// NOTE: Passing CallOptions does not work - it would need to be implemented in grpc itself (before, after are private).
type serverAsClient struct {
srv StoreServer
// newInProcessServer builds an inProcessServer bound to ctx that forwards
// every sent response to yield. The embedded Store_SeriesServer is left nil.
func newInProcessServer(ctx context.Context, yield func(*SeriesResponse, error) bool) *inProcessServer {
	srv := new(inProcessServer)
	srv.ctx = ctx
	srv.yield = yield
	return srv
}

func (s serverAsClient) LabelNames(ctx context.Context, in *LabelNamesRequest, _ ...grpc.CallOption) (*LabelNamesResponse, error) {
return s.srv.LabelNames(ctx, in)
// Send hands resp to the consumer of the in-process stream via yield.
//
// It returns context.Canceled when yield reports that the consumer has
// stopped iterating (for example after the client called CloseSend), and nil
// otherwise. Reporting the stop lets the Series handler quit producing
// responses that nobody will read; a push iterator must not keep yielding
// once yield has returned false.
func (s *inProcessServer) Send(resp *SeriesResponse) error {
	if !s.yield(resp, nil) {
		return context.Canceled
	}
	return nil
}

func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest, _ ...grpc.CallOption) (*LabelValuesResponse, error) {
return s.srv.LabelValues(ctx, in)
// Context returns the context the in-process server stream was created with.
func (s *inProcessServer) Context() context.Context {
return s.ctx
}

func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) {
inSrv := &inProcessStream{recv: make(chan *SeriesResponse), err: make(chan error)}
inSrv.ctx, inSrv.cancel = context.WithCancel(ctx)
go func() {
if err := s.srv.Series(in, inSrv); err != nil {
inSrv.err <- err
}
close(inSrv.err)
close(inSrv.recv)
}()
return &inProcessClientStream{srv: inSrv}, nil
// inProcessClient is the client-side stream returned for in-process Series
// calls. It pulls responses one at a time through next.
type inProcessClient struct {
// Embedded so the type satisfies the full client stream interface.
// newInProcessClient does not assign it, so only the methods defined
// directly on inProcessClient are backed by an implementation.
Store_SeriesClient
ctx context.Context
// next returns the next response/error pair; the final bool is false once
// the sequence is over, which Recv reports as io.EOF.
next func() (*SeriesResponse, error, bool)
// stop ends the iteration early; it is invoked by CloseSend.
stop func()
// NOTE(review): sync is not set by newInProcessClient and is not read by
// any method visible here - confirm whether it is still needed.
sync chan struct{}
}

// TODO(bwplotka): Add streaming attributes, metadata etc. Currently those are disconnected. Follow up on https://github.com/grpc/grpc-go/issues/906.
// TODO(bwplotka): Use this in proxy.go and receiver multi tenant proxy.
type inProcessStream struct {
grpc.ServerStream

ctx context.Context
cancel context.CancelFunc
recv chan *SeriesResponse
err chan error
// newInProcessClient builds an inProcessClient bound to ctx that reads
// responses through next and ends the iteration through stop. The embedded
// Store_SeriesClient and the sync channel are left at their zero values.
func newInProcessClient(ctx context.Context, next func() (*SeriesResponse, error, bool), stop func()) *inProcessClient {
	client := new(inProcessClient)
	client.ctx = ctx
	client.next = next
	client.stop = stop
	return client
}

func NewInProcessStream(ctx context.Context, bufferSize int) *inProcessStream {
return &inProcessStream{
ctx: ctx,
recv: make(chan *SeriesResponse, bufferSize),
err: make(chan error),
// Recv pulls the next element from the underlying iterator. While the
// iterator still produces values, the response and error are returned as-is;
// once it is exhausted, Recv returns a nil response and io.EOF.
func (c *inProcessClient) Recv() (*SeriesResponse, error) {
	if resp, err, ok := c.next(); ok {
		return resp, err
	}
	return nil, io.EOF
}

func (s *inProcessStream) Context() context.Context { return s.ctx }
// Context returns the context the in-process client stream was created with.
//
// The receiver is named c to match the other inProcessClient methods.
func (c *inProcessClient) Context() context.Context {
	return c.ctx
}

func (s *inProcessStream) Send(r *SeriesResponse) error {
select {
case <-s.ctx.Done():
return s.ctx.Err()
case s.recv <- r:
return nil
}
// CloseSend ends the iteration by calling the stop function the client was
// created with. It always returns nil.
//
// The receiver is named c to match the other inProcessClient methods.
func (c *inProcessClient) CloseSend() error {
	c.stop()
	return nil
}

type inProcessClientStream struct {
grpc.ClientStream
// ServerAsClient wraps srv so that it can be used through the StoreClient
// interface without going over the network.
func ServerAsClient(srv StoreServer) StoreClient {
return &serverAsClient{srv: srv}
}

srv *inProcessStream
// serverAsClient allows servers to be used as clients.
// NOTE: Passing CallOptions does not work - it would need to be implemented in grpc itself (before, after are private).
type serverAsClient struct {
// srv is the wrapped server that every client call is forwarded to.
srv StoreServer
// NOTE(review): ctx is not set by ServerAsClient and is not read by the
// methods visible here, which all take a per-call ctx - confirm whether
// this field is needed.
ctx context.Context
}

func (s *inProcessClientStream) Context() context.Context { return s.srv.ctx }
// LabelNames forwards the request directly to the wrapped server. Call
// options are ignored.
func (s serverAsClient) LabelNames(ctx context.Context, in *LabelNamesRequest, _ ...grpc.CallOption) (*LabelNamesResponse, error) {
return s.srv.LabelNames(ctx, in)
}

func (s *inProcessClientStream) CloseSend() error {
s.srv.cancel()
return nil
// LabelValues forwards the request directly to the wrapped server. Call
// options are ignored.
func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest, _ ...grpc.CallOption) (*LabelValuesResponse, error) {
return s.srv.LabelValues(ctx, in)
}

func (s *inProcessClientStream) Recv() (*SeriesResponse, error) {
select {
case r, ok := <-s.srv.recv:
if !ok {
return nil, io.EOF
}
return r, nil
case err, ok := <-s.srv.err:
if !ok {
return nil, io.EOF
// Series runs the wrapped server's Series handler in-process and exposes its
// output as a Store_SeriesClient. The handler is wrapped in a push iterator,
// which iter.Pull2 converts into the next/stop pair used by the returned
// client. Call options are ignored.
func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) {
var srvIter iter.Seq2[*SeriesResponse, error] = func(yield func(*SeriesResponse, error) bool) {
srv := newInProcessServer(ctx, yield)
err := s.srv.Series(in, srv)
if err != nil {
// Deliver the handler's error to the consumer as the last element.
yield(nil, err)
return
}
return nil, err
}

// Convert the push iterator into pull form so Recv can request one
// response at a time.
clientIter, stop := iter.Pull2(srvIter)
return newInProcessClient(ctx, clientIter, stop), nil
}
Loading

0 comments on commit 3dc7b18

Please sign in to comment.