Skip to content

Commit

Permalink
Fixing issue with a lease existing on start (#277)
Browse files Browse the repository at this point in the history
Storage failures don't return a Response; instead we must use errors.As() and inspect the returned error. This change checks for the two status codes that can come back when the blob already exists (409 Conflict) or when the blob exists _and_ has an active storage lease (412 Precondition Failed).

Also, fixed a race condition in the LeaseReceiver that was causing Storage/TestMultiple() to fail.

Fixes #276
  • Loading branch information
richardpark-msft authored Nov 8, 2022
1 parent adc9788 commit d9a8850
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 13 deletions.
44 changes: 32 additions & 12 deletions eph/leasedReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"errors"
"fmt"
"math/rand"
"sync/atomic"
"time"

"github.com/devigned/tab"
Expand All @@ -38,24 +39,29 @@ type (
leasedReceiver struct {
handle *eventhub.ListenerHandle
processor *EventProcessorHost
lease LeaseMarker
lease *atomic.Value // LeaseMarker
done func()
}
)

// newLeasedReceiver constructs a leasedReceiver for the given processor,
// seeding the atomically accessed lease slot with the initial lease.
func newLeasedReceiver(processor *EventProcessorHost, lease LeaseMarker) *leasedReceiver {
	// Store the initial lease in an atomic.Value so later renewals
	// (lease.Store in tryRenew) do not race concurrent readers.
	var leaseValue atomic.Value
	leaseValue.Store(lease)

	return &leasedReceiver{
		processor: processor,
		lease:     &leaseValue,
	}
}

func (lr *leasedReceiver) Run(ctx context.Context) error {
span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.Run")
defer span.End()

partitionID := lr.lease.GetPartitionID()
epoch := lr.lease.GetEpoch()
lease := lr.getLease()

partitionID := lease.GetPartitionID()
epoch := lease.GetEpoch()
lr.dlog(ctx, "running...")

renewLeaseCtx, cancelRenewLease := context.WithCancel(context.Background())
Expand Down Expand Up @@ -99,7 +105,9 @@ func (lr *leasedReceiver) listenForClose() {
defer cancel()
span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.listenForClose")
defer span.End()
err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)

lease := lr.getLease()
err := lr.processor.scheduler.stopReceiver(ctx, lease)
if err != nil {
tab.For(ctx).Error(err)
}
Expand All @@ -120,7 +128,8 @@ func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
err := lr.tryRenew(ctx)
if err != nil {
tab.For(ctx).Error(err)
_ = lr.processor.scheduler.stopReceiver(ctx, lr.lease)
lease := lr.getLease()
_ = lr.processor.scheduler.stopReceiver(ctx, lease)
}
}
}
Expand All @@ -130,7 +139,8 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.tryRenew")
defer span.End()

lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease.GetPartitionID())
oldLease := lr.getLease()
lease, ok, err := lr.processor.leaser.RenewLease(ctx, oldLease.GetPartitionID())
if err != nil {
tab.For(ctx).Error(err)
return err
Expand All @@ -141,23 +151,33 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
return err
}
lr.dlog(ctx, "lease renewed")
lr.lease = lease

lr.lease.Store(lease)
return nil
}

// dlog emits a debug log line prefixed with the processor name and the
// partition ID and epoch taken from the current lease snapshot.
func (lr *leasedReceiver) dlog(ctx context.Context, msg string) {
	name := lr.processor.name
	lease := lr.getLease()

	partitionID := lease.GetPartitionID()
	epoch := lease.GetEpoch()
	tab.For(ctx).Debug(fmt.Sprintf("eph %q, partition %q, epoch %d: "+msg, name, partitionID, epoch))
}

// startConsumerSpanFromContext starts a consumer span for operationName and
// annotates it with the EPH id plus the partition ID and epoch read from the
// current lease snapshot.
func (lr *leasedReceiver) startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
	span, ctx := startConsumerSpanFromContext(ctx, operationName)

	// Take one snapshot so both attributes describe the same lease.
	lease := lr.getLease()

	span.AddAttributes(
		tab.StringAttribute("eph.id", lr.processor.name),
		tab.StringAttribute(partitionIDTag, lease.GetPartitionID()),
		tab.Int64Attribute(epochTag, lease.GetEpoch()),
	)
	return span, ctx
}

// getLease returns the current lease snapshot: either the lease supplied at
// construction or the most recent one stored by tryRenew.
func (lr *leasedReceiver) getLease() LeaseMarker {
	current := lr.lease.Load()
	return current.(LeaseMarker)
}
3 changes: 2 additions & 1 deletion eph/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ func (s *scheduler) Stop(ctx context.Context) error {
if err := lr.Close(ctx); err != nil {
lastErr = err
}
_, _ = s.processor.leaser.ReleaseLease(ctx, lr.lease.GetPartitionID())

_, _ = s.processor.leaser.ReleaseLease(ctx, lr.getLease().GetPartitionID())
}

return lastErr
Expand Down
6 changes: 6 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,12 @@ func (sl *LeaserCheckpointer) createOrGetLease(ctx context.Context, partitionID
})

if err != nil {
if storageErr := azblobvendor.StorageError(nil); errors.As(err, &storageErr) &&
(storageErr.Response().StatusCode == http.StatusConflict || // blob exists
storageErr.Response().StatusCode == http.StatusPreconditionFailed) { // blob exists AND an Azure storage lease is active
return sl.getLease(ctx, partitionID)
}

return nil, err
}

Expand Down
4 changes: 4 additions & 0 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func (ts *testSuite) TestLeaserLeaseEnsure() {
lease, err := leaser.EnsureLease(ctx, partitionID)
ts.NoError(err)
ts.Equal(partitionID, lease.GetPartitionID())

lease, err = leaser.EnsureLease(ctx, partitionID)
ts.NoError(err)
ts.Equal(partitionID, lease.GetPartitionID())
}
}

Expand Down

0 comments on commit d9a8850

Please sign in to comment.