Make branch weak ownership configurable

This shows results, even on a local setup!

When I run lakeFS with weak ownership OFF (the default), I get 6.6% errors
at concurrency 50.  The rate is <50/s, and the long tail is _extremely_ long.

When I switch weak ownership ON with the default parameters, I get **0**
errors at concurrency 50.  The rate is about the same, except that the
tail (when load drops) is _short_.

See the difference [here][merge-abuse-speed-chart]: it's faster _and_
returns 0 errors.  The distribution of successful merge times is somewhat
slower - possibly because of the time to lock, or possibly because in the
unlocked case the slowest merges fail and drop out of the distribution.

Finally, note that because we do not queue, some merges take a *long* time
under sustained load.  We could improve weak ownership to hold an actual
queue of work.  That would make merges _fair_: they would occur roughly in
order of request arrival.
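
For illustration only (nothing like this is in the commit), here is a
minimal single-process sketch of such a fair queue; a real implementation
would have to coordinate through the KV store, as weak ownership does:

```go
package main

import "sync"

// branchQueue serializes work per branch in FIFO order by chaining each
// job onto its predecessor's completion channel.
type branchQueue struct {
	mu    sync.Mutex
	tails map[string]chan struct{} // completion signal of the last queued job
}

func newBranchQueue() *branchQueue {
	return &branchQueue{tails: make(map[string]chan struct{})}
}

// Do runs fn after all previously queued work on branch has finished, so
// jobs run roughly in order of arrival.
func (q *branchQueue) Do(branch string, fn func()) {
	done := make(chan struct{})
	q.mu.Lock()
	prev := q.tails[branch] // nil for the first job on this branch
	q.tails[branch] = done
	q.mu.Unlock()

	if prev != nil {
		<-prev // fairness: wait for our predecessor to finish
	}
	fn()
	close(done)
}
```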

==== Weak ownership OFF ====

```sh
❯ go run ./cmd/lakectl abuse merge --amount 1000 --parallelism 50 lakefs://abuse/main
Source branch: lakefs://abuse/main
merge - completed: 34, errors: 0, current rate: 33.81 done/second
merge - completed: 80, errors: 0, current rate: 45.98 done/second
merge - completed: 128, errors: 0, current rate: 48.02 done/second
merge - completed: 177, errors: 0, current rate: 49.03 done/second
merge - completed: 222, errors: 0, current rate: 44.97 done/second
merge - completed: 265, errors: 3, current rate: 43.03 done/second
merge - completed: 308, errors: 9, current rate: 42.97 done/second
merge - completed: 357, errors: 15, current rate: 49.01 done/second
merge - completed: 406, errors: 21, current rate: 49.03 done/second
merge - completed: 451, errors: 22, current rate: 44.97 done/second
merge - completed: 499, errors: 29, current rate: 48.01 done/second
merge - completed: 545, errors: 30, current rate: 46.01 done/second
merge - completed: 585, errors: 31, current rate: 39.97 done/second
merge - completed: 632, errors: 33, current rate: 47.04 done/second
merge - completed: 679, errors: 37, current rate: 47.00 done/second
merge - completed: 728, errors: 46, current rate: 48.96 done/second
merge - completed: 768, errors: 49, current rate: 40.04 done/second
merge - completed: 808, errors: 53, current rate: 39.98 done/second
merge - completed: 854, errors: 57, current rate: 45.99 done/second
merge - completed: 891, errors: 58, current rate: 37.00 done/second
merge - completed: 935, errors: 64, current rate: 44.00 done/second
merge - completed: 972, errors: 66, current rate: 36.98 done/second
merge - completed: 990, errors: 66, current rate: 18.00 done/second
merge - completed: 995, errors: 66, current rate: 5.00 done/second
merge - completed: 996, errors: 66, current rate: 1.00 done/second
merge - completed: 998, errors: 66, current rate: 2.00 done/second
merge - completed: 999, errors: 66, current rate: 1.00 done/second
merge - completed: 999, errors: 66, current rate: 0.00 done/second
merge - completed: 999, errors: 66, current rate: 0.00 done/second
completed: 1000, errors: 66, current rate: 5.27 done/second

Histogram (ms):
1       0
2       0
5       0
7       0
10      0
15      0
25      0
50      0
75      601
100     671
250     672
350     672
500     696
750     740
1000    765
5000    896
min     54
max     12022
total   934
```

==== Weak ownership ON ====

```sh
❯ go run ./cmd/lakectl abuse merge --amount 1000 --parallelism 50 lakefs://abuse/main
Source branch: lakefs://abuse/main
merge - completed: 36, errors: 0, current rate: 35.23 done/second
merge - completed: 86, errors: 0, current rate: 49.98 done/second
merge - completed: 136, errors: 0, current rate: 50.03 done/second
merge - completed: 185, errors: 0, current rate: 48.99 done/second
merge - completed: 236, errors: 0, current rate: 51.02 done/second
merge - completed: 286, errors: 0, current rate: 49.99 done/second
merge - completed: 337, errors: 0, current rate: 50.97 done/second
merge - completed: 390, errors: 0, current rate: 53.03 done/second
merge - completed: 438, errors: 0, current rate: 48.01 done/second
merge - completed: 487, errors: 0, current rate: 49.00 done/second
merge - completed: 534, errors: 0, current rate: 46.98 done/second
merge - completed: 581, errors: 0, current rate: 46.99 done/second
merge - completed: 632, errors: 0, current rate: 51.00 done/second
merge - completed: 680, errors: 0, current rate: 48.04 done/second
merge - completed: 725, errors: 0, current rate: 44.98 done/second
merge - completed: 771, errors: 0, current rate: 45.99 done/second
merge - completed: 815, errors: 0, current rate: 44.02 done/second
merge - completed: 861, errors: 0, current rate: 46.01 done/second
merge - completed: 905, errors: 0, current rate: 43.98 done/second
merge - completed: 947, errors: 0, current rate: 42.00 done/second
merge - completed: 977, errors: 0, current rate: 30.01 done/second
merge - completed: 997, errors: 0, current rate: 19.99 done/second
completed: 1000, errors: 0, current rate: 4.92 done/second

Histogram (ms):
1       0
2       0
5       0
7       0
10      0
15      0
25      0
50      0
75      457
100     464
250     468
350     468
500     642
750     647
1000    729
5000    952
min     54
max     13744
total   1000
```
arielshaqed committed Oct 7, 2024
1 parent f2420bf commit b0cfca3
Showing 6 changed files with 131 additions and 34 deletions.
26 changes: 19 additions & 7 deletions pkg/catalog/catalog.go

```diff
@@ -294,6 +294,17 @@ func (c *ctxCloser) Close() error {
 	return nil
 }
 
+func makeWeakBranchOwnershipParams(cfg config.WeakOwnership) ref.WeakBranchOwnershipParams {
+	if !cfg.Enabled {
+		// zero Durations => no branch ownership
+		return ref.WeakBranchOwnershipParams{}
+	}
+	return ref.WeakBranchOwnershipParams{
+		AcquireInterval: cfg.Acquire,
+		RefreshInterval: cfg.Refresh,
+	}
+}
+
 func New(ctx context.Context, cfg Config) (*Catalog, error) {
 	ctx, cancelFn := context.WithCancel(ctx)
 	adapter, err := factory.BuildBlockAdapter(ctx, nil, cfg.Config)
@@ -364,13 +375,14 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 	addressProvider := ident.NewHexAddressProvider()
 	refManager := ref.NewRefManager(
 		ref.ManagerConfig{
-			Executor:              executor,
-			KVStore:               cfg.KVStore,
-			KVStoreLimited:        storeLimiter,
-			AddressProvider:       addressProvider,
-			RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
-			CommitCacheConfig:     ref.CacheConfig(cfg.Config.Graveler.CommitCache),
-			MaxBatchDelay:         cfg.Config.Graveler.MaxBatchDelay,
+			Executor:                  executor,
+			KVStore:                   cfg.KVStore,
+			KVStoreLimited:            storeLimiter,
+			AddressProvider:           addressProvider,
+			RepositoryCacheConfig:     ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
+			CommitCacheConfig:         ref.CacheConfig(cfg.Config.Graveler.CommitCache),
+			MaxBatchDelay:             cfg.Config.Graveler.MaxBatchDelay,
+			WeakBranchOwnershipParams: makeWeakBranchOwnershipParams(cfg.Config.Graveler.BranchOwnership),
 		})
 	gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, cfg.Config.Committed.BlockStoragePrefix)
 	settingManager := settings.NewManager(refManager, cfg.KVStore)
```
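As a quick illustration (not part of the commit), the mapping above behaves
like this; the package path is the usual lakeFS module path, and the
interval values are the defaults set in pkg/config/defaults.go below:

```go
package catalog

import (
	"time"

	"github.com/treeverse/lakefs/pkg/config"
)

func exampleOwnershipParams() {
	// Disabled: the zero value, which NewRefManager (below) reads as
	// "no branch ownership" because RefreshInterval is zero.
	off := makeWeakBranchOwnershipParams(config.WeakOwnership{Enabled: false})
	_ = off

	// Enabled: the configured intervals pass through unchanged.
	on := makeWeakBranchOwnershipParams(config.WeakOwnership{
		Enabled: true,
		Acquire: 150 * time.Millisecond, // default acquire interval
		Refresh: 400 * time.Millisecond, // default refresh interval
	})
	_ = on
}
```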
14 changes: 14 additions & 0 deletions pkg/config/config.go

```diff
@@ -142,6 +142,13 @@ type Database struct {
 	} `mapstructure:"cosmosdb"`
 }
 
+// WeakOwnership configures an approximate (weak) ownership.
+type WeakOwnership struct {
+	Enabled bool          `mapstructure:"enabled"`
+	Refresh time.Duration `mapstructure:"refresh"`
+	Acquire time.Duration `mapstructure:"acquire"`
+}
+
 // Config - Output struct of configuration, used to validate. If you read a key using a viper accessor
 // rather than accessing a field of this struct, that key will *not* be validated. So don't
 // do that.
@@ -330,6 +337,13 @@ type Config struct {
 			RateLimit int `mapstructure:"rate_limit"`
 		} `mapstructure:"background"`
 		MaxBatchDelay time.Duration `mapstructure:"max_batch_delay"`
+		// Parameters for tuning performance of concurrent branch
+		// update operations. These do not affect correctness or
+		// liveness. Internally this is "*weak* branch ownership"
+		// because this ownership may safely fail. This distinction
+		// is unimportant during configuration, so use a shorter
+		// name.
+		BranchOwnership WeakOwnership `mapstructure:"branch_ownership"`
 	} `mapstructure:"graveler"`
 	Gateways struct {
 		S3 struct {
```
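The mapstructure tags above determine the configuration keys. A hedged
sketch of enabling the feature programmatically (the same keys should also
work under graveler.branch_ownership in a lakeFS YAML configuration file):

```go
package main

import (
	"time"

	"github.com/spf13/viper"
)

func enableBranchOwnership() {
	// Keys follow the mapstructure tags above; values mirror the
	// defaults added in pkg/config/defaults.go below.
	viper.Set("graveler.branch_ownership.enabled", true)
	viper.Set("graveler.branch_ownership.acquire", 150*time.Millisecond)
	viper.Set("graveler.branch_ownership.refresh", 400*time.Millisecond)
}
```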
5 changes: 5 additions & 0 deletions pkg/config/defaults.go

```diff
@@ -147,6 +147,11 @@ func setDefaults(cfgType string) {
 	// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
 	viper.SetDefault("graveler.max_batch_delay", 3*time.Millisecond)
 
+	viper.SetDefault("graveler.branch_ownership.enabled", false)
+	// ... but if branch ownership is enabled, set up some useful defaults!
+	viper.SetDefault("graveler.branch_ownership.refresh", 400*time.Millisecond)
+	viper.SetDefault("graveler.branch_ownership.acquire", 150*time.Millisecond)
+
 	viper.SetDefault("ugc.prepare_interval", time.Minute)
 	viper.SetDefault("ugc.prepare_max_file_size", 20*1024*1024)
 
```
56 changes: 46 additions & 10 deletions pkg/graveler/ref/manager.go

```diff
@@ -69,17 +69,54 @@ func protoFromBranch(branchID graveler.BranchID, b *graveler.Branch) *graveler.B
 	return branch
 }
 
+// WeakBranchOwnershipParams configures weak ownership of branches. Branch
+// correctness is safe _regardless_ of the values of these parameters. They
+// exist solely to reduce expensive operations when multiple concurrent
+// updates race on the same branch. Only one update can win a race, and
+// ownership prevents others from interfering by consuming resources on the
+// instance.
+type WeakBranchOwnershipParams struct {
+	// AcquireInterval is the interval at which to attempt to acquire
+	// ownership of a branch. It is a bound on the latency of the time
+	// for one worker to acquire a branch when multiple operations race
+	// on that branch. Reducing it increases read load on the branch
+	// ownership record when concurrent operations occur.
+	AcquireInterval time.Duration
+	// RefreshInterval is the interval for which to assert ownership of
+	// a branch. It is a bound on the time to perform an operation on a
+	// branch IF a previous worker crashed while owning that branch. It
+	// has no effect when there are no crashes. Reducing it increases
+	// write load on the branch ownership record when concurrent
+	// operations occur.
+	//
+	// If zero or negative, ownership will not be asserted and branch
+	// operations will race. This is safe but can be slow.
+	RefreshInterval time.Duration
+}
+
 type ManagerConfig struct {
-	Executor              batch.Batcher
-	KVStore               kv.Store
-	KVStoreLimited        kv.Store
-	AddressProvider       ident.AddressProvider
-	RepositoryCacheConfig CacheConfig
-	CommitCacheConfig     CacheConfig
-	MaxBatchDelay         time.Duration
+	Executor                  batch.Batcher
+	KVStore                   kv.Store
+	KVStoreLimited            kv.Store
+	AddressProvider           ident.AddressProvider
+	RepositoryCacheConfig     CacheConfig
+	CommitCacheConfig         CacheConfig
+	MaxBatchDelay             time.Duration
+	WeakBranchOwnershipParams WeakBranchOwnershipParams
 }
 
 func NewRefManager(cfg ManagerConfig) *Manager {
+	var branchOwnership *util.WeakOwner
+	if cfg.WeakBranchOwnershipParams.RefreshInterval > 0 {
+		branchOwnership = util.NewWeakOwner(
+			logging.ContextUnavailable().WithField("component", "RefManager weak branch ownership"),
+			cfg.KVStore,
+			"run-refs/weak-branch-owner",
+			cfg.WeakBranchOwnershipParams.AcquireInterval,
+			cfg.WeakBranchOwnershipParams.RefreshInterval,
+		)
+	}
+
 	return &Manager{
 		kvStore:        cfg.KVStore,
 		kvStoreLimited: cfg.KVStoreLimited,
@@ -88,8 +125,7 @@ func NewRefManager(cfg ManagerConfig) *Manager {
 		repoCache:      newCache(cfg.RepositoryCacheConfig),
 		commitCache:    newCache(cfg.CommitCacheConfig),
 		maxBatchDelay:  cfg.MaxBatchDelay,
-		// TODO(ariels): Configure all of these, especially whether or not this is enabled!
-		branchOwnership: util.NewWeakOwner(logging.ContextUnavailable().WithField("component", "RefManager weak branch ownership"), cfg.KVStore, "run-refs/weak-branch-owner"),
+		branchOwnership: branchOwnership,
 	}
 }
 
@@ -407,7 +443,7 @@ func (m *Manager) BranchUpdate(ctx context.Context, repository *graveler.Reposit
 		requestID = *requestIDPtr
 	}
 	if m.branchOwnership != nil {
-		own, err := m.branchOwnership.Own(ctx, requestID, string(branchID), 500*time.Millisecond, 100*time.Millisecond)
+		own, err := m.branchOwnership.Own(ctx, requestID, string(branchID))
 		if err != nil {
 			logging.FromContext(ctx).
 				WithFields(logging.Fields{}).
```
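The BranchUpdate hunk above is cut off by the diff view. Acquisition is
best-effort, so the surrounding logic is presumably along these lines (a
sketch; the error-path details are assumptions, not the commit's exact
code):

```go
if m.branchOwnership != nil {
	own, err := m.branchOwnership.Own(ctx, requestID, string(branchID))
	if err != nil {
		// Assumed continuation: ownership only reduces contention,
		// so on failure log and fall through to the normal (racy
		// but still safe) update path.
		logging.FromContext(ctx).
			WithFields(logging.Fields{"branch": branchID}).
			WithError(err).
			Warn("failed to own branch; continuing without ownership")
	} else {
		defer own() // release ownership when the update finishes
	}
}
```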
52 changes: 41 additions & 11 deletions pkg/kv/util/weak_owner.go

```diff
@@ -37,6 +37,23 @@ var finished = errors.New("finished")
 // So it *cannot* guarantee correctness. However it usually works, and if
 // it does work, the owning goroutine wins all races by default.
 type WeakOwner struct {
+	// acquireInterval is the polling interval for acquiring ownership.
+	// Reducing it reduces some additional time to recover if an
+	// instance crashes while holding ownership. Reducing it too much
+	// may cause another instance falsely to grab ownership when the
+	// owner is merely slow. Reducing it also increases read load on
+	// the KV store when there is contention on the branch.
+	acquireInterval time.Duration
+	// refreshInterval is the time for which to assert ownership. It
+	// should be rather bigger than acquireInterval, probably at least
+	// 3*. Reducing it reduces the time to recover if an instance
+	// crashes while holding ownership; because it is greater than
+	// acquireInterval, it has a greater effect on this recovery time.
+	// Reducing it too much may cause another instance falsely to grab
+	// ownership when the owner is merely slow. Reducing it also
+	// increases write load on the KV store, always.
+	refreshInterval time.Duration
+
 	// Log is used for logs. Everything is at a fine granularity,
 	// usually TRACE.
 	Log logging.Logger
@@ -47,12 +64,20 @@ type WeakOwner struct {
 	Prefix string
 }
 
-func NewWeakOwner(log logging.Logger, store kv.Store, prefix string) *WeakOwner {
-	return &WeakOwner{Log: log, Store: store, Prefix: prefix}
+func NewWeakOwner(log logging.Logger, store kv.Store, prefix string, acquireInterval, refreshInterval time.Duration) *WeakOwner {
+	return &WeakOwner{
+		acquireInterval: acquireInterval,
+		refreshInterval: refreshInterval,
+		Log:             log,
+		Store:           store,
+		Prefix:          prefix,
+	}
 }
 
 // refreshKey refreshes key for owner at interval until ctx is cancelled.
-func (w *WeakOwner) refreshKey(ctx context.Context, owner string, prefixedKey []byte, interval time.Duration) {
+func (w *WeakOwner) refreshKey(ctx context.Context, owner string, prefixedKey []byte) {
+	// Always refresh before ownership expires
+	interval := w.refreshInterval / 2
 	log := w.Log.WithContext(ctx).WithFields(logging.Fields{
 		"interval": interval,
 		"owner":    owner,
@@ -144,12 +169,12 @@ func checkOwnership(expires, now time.Time, getErr error) keyOwnership {
 //
 // TODO(ariels): Be fair, at least in the same process. Chaining requests
 // and spinning only on the first would do this as well.
-func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKey []byte, acquireInterval, refreshInterval time.Duration) error {
+func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKey []byte) error {
 	log := w.Log.WithContext(ctx).WithFields(logging.Fields{
 		"owner":            owner,
 		"prefixed_key":     string(prefixedKey),
-		"acquire_interval": acquireInterval,
-		"refresh_interval": refreshInterval,
+		"acquire_interval": w.acquireInterval,
+		"refresh_interval": w.refreshInterval,
 	})
 	for {
 		ownership := WeakOwnership{}
@@ -161,7 +186,7 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe
 		now := time.Now()
 		free := checkOwnership(ownership.Expires.AsTime(), now, err)
 		if free != keyOwned {
-			expiryTime := now.Add(refreshInterval)
+			expiryTime := now.Add(w.refreshInterval)
 			ownership = WeakOwnership{
 				Owner:   owner,
 				Expires: timestamppb.New(expiryTime),
@@ -188,7 +213,12 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe
 		}
 		sleep := ownership.Expires.AsTime().Sub(now) - 5*time.Millisecond
 		if sleep < 0 {
-			sleep = acquireInterval
+			// Multiple instances will race the same regardless
+			// of jitter. Jitter here will _not_ help with a
+			// perfect KV. However if using a KV that enforces
+			// rate limits at a very small resolution, jitter
+			// may help.
+			sleep = w.acquireInterval
 		}
 		log.WithField("sleep", sleep).Trace("Still owned; try again soon")
 		err = sleepFor(ctx, sleep)
@@ -201,14 +231,14 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe
 // Own blocks until it gets weak ownership of key for owner. Ownership
 // will be refreshed at resolution interval. It returns a function to stop
 // owning key.
-func (w *WeakOwner) Own(ctx context.Context, owner, key string, acquireInterval, refreshInterval time.Duration) (func(), error) {
+func (w *WeakOwner) Own(ctx context.Context, owner, key string) (func(), error) {
 	prefixedKey := []byte(fmt.Sprintf("%s/%s", w.Prefix, key))
-	err := w.startOwningKey(context.Background(), owner, prefixedKey, acquireInterval, refreshInterval)
+	err := w.startOwningKey(context.Background(), owner, prefixedKey)
 	if err != nil {
 		return nil, fmt.Errorf("start owning %s for %s: %w", owner, key, err)
 	}
 	refreshCtx, refreshCancel := context.WithCancel(ctx)
-	go w.refreshKey(refreshCtx, owner, prefixedKey, refreshInterval/2)
+	go w.refreshKey(refreshCtx, owner, prefixedKey)
 	stopOwning := func() {
 		defer refreshCancel()
 		// Use the original context - in case cancelled twice.
```
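Putting the changed API together (a usage sketch; names and interval
values are illustrative, and the intervals now live on the WeakOwner
rather than on each Own call):

```go
package main

import (
	"context"
	"time"

	"github.com/treeverse/lakefs/pkg/kv"
	"github.com/treeverse/lakefs/pkg/kv/util"
	"github.com/treeverse/lakefs/pkg/logging"
)

func updateWithOwnership(ctx context.Context, store kv.Store) error {
	w := util.NewWeakOwner(
		logging.ContextUnavailable().WithField("component", "example"),
		store,
		"run-refs/weak-branch-owner",
		150*time.Millisecond, // acquireInterval: poll rate while contended
		400*time.Millisecond, // refreshInterval: lifetime of each assertion
	)
	release, err := w.Own(ctx, "request-1234", "main")
	if err != nil {
		return err
	}
	defer release() // stop refreshing and give up ownership
	// ... update the branch while (weakly) owning it ...
	return nil
}
```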
12 changes: 6 additions & 6 deletions pkg/kv/util/weak_owner_test.go

```diff
@@ -36,15 +36,15 @@ func TestWeakOwnerSingleThreaded(t *testing.T) {
 	}
 	log := logging.FromContext(ctx).WithField("test", t.Name())
 
-	w := util.NewWeakOwner(log, store, "p")
+	w := util.NewWeakOwner(log, store, "p", 5*time.Millisecond, 10*time.Millisecond)
 
-	releaseAbc, err := w.Own(ctx, "me", "abc", 5*time.Millisecond, 10*time.Millisecond)
+	releaseAbc, err := w.Own(ctx, "me", "abc")
 	if err != nil {
 		t.Fatalf("Failed to own \"abc\": %s", err)
 	}
 	defer releaseAbc()
 
-	releaseXyz, err := w.Own(ctx, "me", "xyz", 5*time.Millisecond, 10*time.Millisecond)
+	releaseXyz, err := w.Own(ctx, "me", "xyz")
 	if err != nil {
 		t.Fatalf("Failed to own \"xyz\": %s", err)
 	}
@@ -100,10 +100,10 @@ func TestWeakOwnerConsecutiveReleased(t *testing.T) {
 	}
 	log := logging.FromContext(ctx).WithField("test", t.Name())
 
-	w := util.NewWeakOwner(log, store, "p")
+	w := util.NewWeakOwner(log, store, "p", 5*time.Millisecond, 40*time.Millisecond)
 	events := Ordering[string]{}
 
-	releaseA, err := w.Own(ctx, "me", "xyz", 5*time.Millisecond, 40*time.Millisecond)
+	releaseA, err := w.Own(ctx, "me", "xyz")
 	if err != nil {
 		t.Fatalf("Own main me: %s", err)
 	}
@@ -115,7 +115,7 @@ func TestWeakOwnerConsecutiveReleased(t *testing.T) {
 	go func() {
 		log.Info("Goroutine start")
 		defer wg.Done()
-		releaseB, err := w.Own(ctx, "us", "xyz", 5*time.Millisecond, 40*time.Millisecond)
+		releaseB, err := w.Own(ctx, "us", "xyz")
 		if err != nil {
 			t.Fatalf("Own goroutine us: %s", err)
 		}
```
