Skip to content

Commit

Permalink
ingester: Add test for chunk transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Jul 23, 2019
1 parent 2eb666b commit f8155a1
Showing 1 changed file with 210 additions and 0 deletions.
210 changes: 210 additions & 0 deletions pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package ingester

import (
"fmt"
"io"
"io/ioutil"
"sort"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"golang.org/x/net/context"
)

func TestTransferOut(t *testing.T) {
f := newTestIngesterFactory(t)

ing := f.getIngester(time.Duration(0))

// Push some data into our original ingester
ctx := user.InjectOrgID(context.Background(), "test")
_, err := ing.Push(ctx, &logproto.PushRequest{
Streams: []*logproto.Stream{
{
Entries: []logproto.Entry{
{Line: "line 0", Timestamp: time.Unix(0, 0)},
{Line: "line 1", Timestamp: time.Unix(1, 0)},
},
Labels: `{foo="bar",bar="baz1"}`,
},
{
Entries: []logproto.Entry{
{Line: "line 2", Timestamp: time.Unix(2, 0)},
{Line: "line 3", Timestamp: time.Unix(3, 0)},
},
Labels: `{foo="bar",bar="baz2"}`,
},
},
})
require.NoError(t, err)

assert.Len(t, ing.instances, 1)
if assert.Contains(t, ing.instances, "test") {
assert.Len(t, ing.instances["test"].streams, 2)
}

// Create a new ingester and trasfer data to it
ing2 := f.getIngester(time.Second * 60)
ing.Shutdown()

assert.Len(t, ing2.instances, 1)
if assert.Contains(t, ing2.instances, "test") {
assert.Len(t, ing2.instances["test"].streams, 2)

lines := []string{}

// Get all the lines back and make sure the blocks transferred successfully
for _, stream := range ing2.instances["test"].streams {
it, err := stream.Iterator(
time.Unix(0, 0),
time.Unix(10, 0),
logproto.FORWARD,
func([]byte) bool { return true },
)
if !assert.NoError(t, err) {
continue
}

for it.Next() {
entry := it.Entry()
lines = append(lines, entry.Line)
}
}

sort.Strings(lines)

assert.Equal(
t,
[]string{"line 0", "line 1", "line 2", "line 3"},
lines,
)
}
}

type testIngesterFactory struct {
t *testing.T
store ring.KVClient
n int
ingesters map[string]*Ingester
}

func newTestIngesterFactory(t *testing.T) *testIngesterFactory {
return &testIngesterFactory{
t: t,
store: ring.NewInMemoryKVClient(ring.ProtoCodec{Factory: ring.ProtoDescFactory}),
ingesters: make(map[string]*Ingester),
}
}

func (f *testIngesterFactory) getIngester(joinAfter time.Duration) *Ingester {
f.n++

cfg := defaultIngesterTestConfig()
cfg.MaxTransferRetries = 1
cfg.LifecyclerConfig.ClaimOnRollout = true
cfg.LifecyclerConfig.ID = fmt.Sprintf("localhost-%d", f.n)
cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store
cfg.LifecyclerConfig.JoinAfter = joinAfter
cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID

cfg.ingesterClientFactory = func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) {
ingester, ok := f.ingesters[addr]
if !ok {
return nil, fmt.Errorf("no ingester %s", addr)
}

return struct {
logproto.PusherClient
logproto.QuerierClient
logproto.IngesterClient
grpc_health_v1.HealthClient
io.Closer
}{
PusherClient: nil,
QuerierClient: nil,
IngesterClient: &testIngesterClient{t: f.t, i: ingester},
Closer: ioutil.NopCloser(nil),
}, nil
}

_, ing := newTestStore(f.t, cfg)
f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing

// NB there's some kind of race condition with the in-memory KV client when
// we don't give the ingester a little bit of time to initialize. a 100ms
// wait time seems effective.
time.Sleep(time.Millisecond * 100)
return ing
}

type testIngesterClient struct {
t *testing.T
i *Ingester
}

func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
chunkCh := make(chan *logproto.TimeSeriesChunk)
respCh := make(chan *logproto.TransferChunksResponse)

client := &testTransferChunksClient{ch: chunkCh, resp: respCh}
go func() {
server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
err := c.i.TransferChunks(server)
require.NoError(c.t, err)
}()

return client, nil
}

type testTransferChunksClient struct {
ch chan *logproto.TimeSeriesChunk
resp chan *logproto.TransferChunksResponse

grpc.ClientStream
}

func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
c.ch <- chunk
return nil
}

func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
close(c.ch)
resp := <-c.resp
close(c.resp)
return resp, nil
}

type testTransferChunksServer struct {
ch chan *logproto.TimeSeriesChunk
resp chan *logproto.TransferChunksResponse

grpc.ServerStream
}

func (s *testTransferChunksServer) Context() context.Context {
return context.Background()
}

func (s *testTransferChunksServer) SendAndClose(resp *logproto.TransferChunksResponse) error {
s.resp <- resp
return nil
}

func (s *testTransferChunksServer) Recv() (*logproto.TimeSeriesChunk, error) {
chunk, ok := <-s.ch
if !ok {
return nil, io.EOF
}
return chunk, nil
}

0 comments on commit f8155a1

Please sign in to comment.