Revert "Simplify code for write path in distributor (#5248)" (#5337)
This reverts commit 10b93ce.

We discovered that these changes can lead to log deduplication issues.

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
Danny Kopping authored Feb 7, 2022
1 parent 2db3092 commit 20041b7
Showing 3 changed files with 205 additions and 159 deletions.
pkg/distributor/distributor.go: 115 changes (53 additions, 62 deletions)
@@ -200,23 +200,19 @@ func (d *Distributor) stopping(_ error) error {
 
 // TODO taken from Cortex, see if we can refactor out an usable interface.
 type streamTracker struct {
-    stream *logproto.Stream
-    // successBucket the number of minimum required successful pushes
-    // each successful push decrements the counter
-    successBucket atomic.Int32
-    // failureBucket holds the number of maximum allowed errors
-    // each erroneous push decrements the counter
-    failureBucket atomic.Int32
+    stream      logproto.Stream
+    minSuccess  int
+    maxFailures int
+    succeeded   atomic.Int32
+    failed      atomic.Int32
 }
 
 // TODO taken from Cortex, see if we can refactor out an usable interface.
 type pushTracker struct {
-    done chan struct{}
-    err  chan error
-    // pending holds the total amount of streams of a push request
-    pending atomic.Int32
-    // failed keeps track of whether the push already failed
-    failed atomic.Bool
+    samplesPending atomic.Int32
+    samplesFailed  atomic.Int32
+    done           chan struct{}
+    err            chan error
 }
 
 // Push a set of streams.
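
The structs above carry the write-quorum bookkeeping that this revert restores: minSuccess is the number of ingesters that must acknowledge a stream (the replica count minus the tolerated errors), and the atomic succeeded/failed counters are bumped by the per-ingester goroutines. The standalone sketch below only illustrates that arithmetic; it assumes the go.uber.org/atomic package (which provides the Inc/Dec/Store methods seen in the diff) and uses made-up numbers, so it is not Loki's actual code.

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// quorumTracker mirrors the restored bookkeeping: with a replication factor
// of 3 and one tolerated error, a stream needs minSuccess = 3 - 1 = 2
// acknowledgements before it counts as written. Illustrative only.
type quorumTracker struct {
	minSuccess  int
	maxFailures int
	succeeded   atomic.Int32
	failed      atomic.Int32
}

func main() {
	t := quorumTracker{minSuccess: 2, maxFailures: 1}

	// Two ingesters ack, one fails: the stream still reaches its quorum.
	t.succeeded.Inc()
	t.failed.Inc()
	done := t.succeeded.Inc() == int32(t.minSuccess)

	fmt.Printf("succeeded=%d failed=%d quorum reached=%v\n",
		t.succeeded.Load(), t.failed.Load(), done)
}
```
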
@@ -277,7 +273,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
         stream.Entries = stream.Entries[:n]
 
         keys = append(keys, util.TokenFor(userID, stream.Labels))
-        streams = append(streams, streamTracker{stream: &stream})
+        streams = append(streams, streamTracker{stream: stream})
     }
 
     // Return early if none of the streams contained entries
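
The single change in this hunk goes back to storing the stream by value rather than by pointer. Whether or not this is the exact deduplication issue referenced in the commit message, storing a pointer taken from a range variable is a classic Go hazard: before Go 1.22 the loop variable is one variable reused on every iteration, so every stored pointer ends up aliasing the same stream. The self-contained sketch below (hypothetical types, not Loki code) shows the difference.

```go
package main

import "fmt"

type stream struct{ labels string }

func main() {
	input := []stream{{"a"}, {"b"}, {"c"}}

	// Before Go 1.22 the range variable s is a single variable reused on
	// every iteration, so all three pointers alias it and see its last value.
	byPointer := make([]*stream, 0, len(input))
	for _, s := range input {
		byPointer = append(byPointer, &s)
	}

	// Copying by value, as the restored code does, keeps each element distinct.
	byValue := make([]stream, 0, len(input))
	for _, s := range input {
		byValue = append(byValue, s)
	}

	fmt.Println(byPointer[0].labels, byPointer[1].labels, byPointer[2].labels) // "c c c" before Go 1.22
	fmt.Println(byValue[0].labels, byValue[1].labels, byValue[2].labels)       // "a b c"
}
```
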
@@ -293,44 +289,42 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
         return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, userID, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
     }
 
-    streamsByIngester := map[string][]*streamTracker{}
-    descByIngester := map[string]ring.InstanceDesc{}
+    const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
+    var descs [maxExpectedReplicationSet]ring.InstanceDesc
 
-    bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
+    samplesByIngester := map[string][]*streamTracker{}
+    ingesterDescs := map[string]ring.InstanceDesc{}
     for i, key := range keys {
-        replicationSet, err := d.ingestersRing.Get(key, ring.Write, bufDescs, bufHosts, bufZones)
+        replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil)
         if err != nil {
             return nil, err
         }
 
-        streams[i].successBucket.Store(int32(len(replicationSet.Instances) - replicationSet.MaxErrors))
-        streams[i].failureBucket.Store(int32(replicationSet.MaxErrors))
-
+        streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
+        streams[i].maxFailures = replicationSet.MaxErrors
         for _, ingester := range replicationSet.Instances {
-            streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streams[i])
-            descByIngester[ingester.Addr] = ingester
+            samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i])
+            ingesterDescs[ingester.Addr] = ingester
         }
     }
 
     tracker := pushTracker{
         done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
         err:  make(chan error, 1),
     }
-    tracker.pending.Store(int32(len(streams)))
-
-    for addr := range streamsByIngester {
-        go func(ingester ring.InstanceDesc, streams []*streamTracker) {
+    tracker.samplesPending.Store(int32(len(streams)))
+    for ingester, samples := range samplesByIngester {
+        go func(ingester ring.InstanceDesc, samples []*streamTracker) {
             // Use a background context to make sure all ingesters get samples even if we return early
             localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
             defer cancel()
             localCtx = user.InjectOrgID(localCtx, userID)
             if sp := opentracing.SpanFromContext(ctx); sp != nil {
                 localCtx = opentracing.ContextWithSpan(localCtx, sp)
             }
-            d.sendToIngester(localCtx, ingester, streams, &tracker)
-        }(descByIngester[addr], streamsByIngester[addr])
+            d.sendSamples(localCtx, ingester, samples, &tracker)
+        }(ingesterDescs[ingester], samples)
     }
 
     select {
     case err := <-tracker.err:
         return nil, err
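
The goroutine launched per ingester deliberately derives its context from context.Background() with the client's RemoteTimeout rather than from the request context: the calling RPC may return as soon as a quorum is reached or the error budget is exhausted, and pushes still in flight should not be cancelled with it. Only the values the ingester needs (the org ID via user.InjectOrgID, the tracing span) are copied onto the detached context. Below is a rough standard-library sketch of that detached-context pattern; the key names, timeout, and addresses are illustrative, not Loki's.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type ctxKey string

const orgIDKey ctxKey = "orgID"

// pushToIngester stands in for the per-ingester send; it only sees the
// values that were explicitly copied onto the detached context.
func pushToIngester(ctx context.Context, addr string) {
	fmt.Printf("push to %s for org %v\n", addr, ctx.Value(orgIDKey))
}

func main() {
	// reqCtx is the caller's context; it may be cancelled as soon as the
	// caller has its answer (quorum reached or error budget exhausted).
	reqCtx, cancelReq := context.WithCancel(context.WithValue(context.Background(), orgIDKey, "tenant-1"))

	var wg sync.WaitGroup
	for _, addr := range []string{"ingester-1:9095", "ingester-2:9095", "ingester-3:9095"} {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			// Detach from reqCtx so an early return by the caller does not
			// cancel in-flight pushes; keep only a timeout plus the values
			// the ingester needs (org ID, trace span, ...).
			localCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			localCtx = context.WithValue(localCtx, orgIDKey, reqCtx.Value(orgIDKey))
			pushToIngester(localCtx, addr)
		}(addr)
	}

	cancelReq() // the caller returning early does not affect the detached pushes
	wg.Wait()
}
```
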
@@ -361,54 +355,51 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto
 }
 
 // TODO taken from Cortex, see if we can refactor out an usable interface.
-func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker, pushTracker *pushTracker) {
-
-    payload := make([]logproto.Stream, len(streams))
-    for i, s := range streams {
-        payload[i] = *s.stream
-    }
-
-    err := d.executePushRequest(ctx, ingester, payload)
-    if err == nil {
-        // If the push rpc call to the ingester succeeds, we decrement the
-        // successBucket counter for each stream sent.
-        // If we reach the amount of required successful pushes for a stream
-        // (successBucket is empty), we decrement the counter of pending streams.
-        // If there are no pending streams, we are done and can notify the tracker.
-        for i := range streams {
-            if streams[i].successBucket.Dec() > 0 {
+func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
+    err := d.sendSamplesErr(ctx, ingester, streamTrackers)
+
+    // If we succeed, decrement each sample's pending count by one. If we reach
+    // the required number of successful puts on this sample, then decrement the
+    // number of pending samples by one. If we successfully push all samples to
+    // min success ingesters, wake up the waiting rpc so it can return early.
+    // Similarly, track the number of errors, and if it exceeds maxFailures
+    // shortcut the waiting rpc.
+    //
+    // The use of atomic increments here guarantees only a single sendSamples
+    // goroutine will write to either channel.
+    for i := range streamTrackers {
+        if err != nil {
+            if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
                 continue
             }
-            if pushTracker.pending.Dec() == 0 {
-                pushTracker.done <- struct{}{}
+            if pushTracker.samplesFailed.Inc() == 1 {
+                pushTracker.err <- err
             }
         }
-    } else {
-        // If the push rpc call to the ingester fails, we decrement the
-        // failureBucket counter for each stream sent.
-        // If we reach the amount of maximum allowed erroneous pushes for a stream
-        // (errorBucket is empty), we swap the tracker state to failed.
-        // If the state isn't already failed, we can return early and notify the
-        // tracker.
-        for i := range streams {
-            if streams[i].failureBucket.Dec() > 0 {
+        } else {
+            if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
                 continue
             }
-            if !pushTracker.failed.Swap(true) {
-                pushTracker.err <- err
+            if pushTracker.samplesPending.Dec() == 0 {
+                pushTracker.done <- struct{}{}
             }
         }
     }
 }
 
 // TODO taken from Cortex, see if we can refactor out an usable interface.
-func (d *Distributor) executePushRequest(ctx context.Context, ingester ring.InstanceDesc, streams []logproto.Stream) error {
+func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
     c, err := d.pool.GetClientFor(ingester.Addr)
     if err != nil {
         return err
     }
 
-    req := &logproto.PushRequest{Streams: streams}
+    req := &logproto.PushRequest{
+        Streams: make([]logproto.Stream, len(streams)),
+    }
+    for i, s := range streams {
+        req.Streams[i] = s.stream
+    }
 
     _, err = c.(logproto.PusherClient).Push(ctx, req)
     d.ingesterAppends.WithLabelValues(ingester.Addr).Inc()
     if err != nil {
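
The comment block restored in sendSamples leans on one property of the atomic counters: because Inc and Dec return the new value, exactly one goroutine observes the counter crossing its threshold, so at most one send happens on each channel even though many goroutines finish concurrently. The minimal sketch below demonstrates that single-signal behaviour for the pending/done pair, again assuming go.uber.org/atomic-style counters; the writer count is arbitrary.

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	const writers = 5 // illustrative; stands in for the per-ingester goroutines

	var (
		pending atomic.Int32 // counts outstanding units of work
		done    = make(chan struct{}, 1)
		wg      sync.WaitGroup
	)
	pending.Store(writers)

	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Dec returns the new value atomically, so exactly one goroutine
			// observes the transition to zero and signals completion.
			if pending.Dec() == 0 {
				done <- struct{}{}
			}
		}()
	}

	<-done
	wg.Wait()
	fmt.Println("all writers finished; done was signalled exactly once")
}
```

The samplesFailed.Inc() == 1 check plays the same role for the err channel: only the goroutine that records the first failure past a stream's maxFailures budget ever writes to it.
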