diff --git a/pkg/batch/executor.go b/pkg/batch/executor.go
new file mode 100644
index 00000000000..fa9767a26b8
--- /dev/null
+++ b/pkg/batch/executor.go
@@ -0,0 +1,112 @@
+package batch
+
+import (
+	"context"
+	"time"
+
+	"github.com/treeverse/lakefs/pkg/logging"
+)
+
+// RequestBufferSize is the number of requests that can be dispatched but not yet processed before
+// dispatching new ones starts to block.
+const RequestBufferSize = 1 << 17
+
+type BatchFn func() (interface{}, error)
+
+type DelayFn func(dur time.Duration)
+
+type Batcher interface {
+	BatchFor(key string, dur time.Duration, fn BatchFn) (interface{}, error)
+}
+
+type nonBatchingExecutor struct {
+}
+
+func (n *nonBatchingExecutor) BatchFor(key string, dur time.Duration, fn BatchFn) (interface{}, error) {
+	return fn()
+}
+
+type response struct {
+	v   interface{}
+	err error
+}
+
+type request struct {
+	key        string
+	timeout    time.Duration
+	fn         BatchFn
+	onResponse chan *response
+}
+
+type Executor struct {
+	// requests is the channel accepting inbound requests
+	requests chan *request
+	// execs is the internal channel used to dispatch the callback functions.
+	// Several requests with the same key within a given duration trigger a single write to execs for that key.
+	execs        chan string
+	waitingOnKey map[string][]*request
+	Logger       logging.Logger
+	Delay        DelayFn
+}
+
+func NopExecutor() *nonBatchingExecutor {
+	return &nonBatchingExecutor{}
+}
+
+func NewExecutor(logger logging.Logger) *Executor {
+	return &Executor{
+		requests:     make(chan *request, RequestBufferSize),
+		execs:        make(chan string, RequestBufferSize),
+		waitingOnKey: make(map[string][]*request),
+		Logger:       logger,
+		Delay:        time.Sleep,
+	}
+}
+
+func (e *Executor) BatchFor(key string, timeout time.Duration, fn BatchFn) (interface{}, error) {
+	cb := make(chan *response)
+	e.requests <- &request{
+		key:        key,
+		timeout:    timeout,
+		fn:         fn,
+		onResponse: cb,
+	}
+	response := <-cb
+	return response.v, response.err
+}
+
+func (e *Executor) Run(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case req := <-e.requests:
+			// see if we have it scheduled already
+			if _, exists := e.waitingOnKey[req.key]; !exists {
+				// this is a new key, let's fire a timer for it
+				go func(req *request) {
+					e.Delay(req.timeout)
+					e.execs <- req.key
+				}(req)
+			}
+			e.waitingOnKey[req.key] = append(e.waitingOnKey[req.key], req)
+		case execKey := <-e.execs:
+			// let's take all callbacks
+			waiters := e.waitingOnKey[execKey]
+			delete(e.waitingOnKey, execKey)
+			go func(key string) {
+				// execute and call all mapped callbacks
+				v, err := waiters[0].fn()
+				if e.Logger.IsTracing() {
+					e.Logger.WithFields(logging.Fields{
+						"waiters": len(waiters),
+						"key":     key,
+					}).Trace("dispatched BatchFn")
+				}
+				for _, waiter := range waiters {
+					waiter.onResponse <- &response{v, err}
+				}
+			}(execKey)
+		}
+	}
+}
diff --git a/pkg/batch/executor_test.go b/pkg/batch/executor_test.go
new file mode 100644
index 00000000000..16447b5ccdb
--- /dev/null
+++ b/pkg/batch/executor_test.go
@@ -0,0 +1,98 @@
+package batch_test
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/treeverse/lakefs/pkg/batch"
+	"github.com/treeverse/lakefs/pkg/logging"
+)
+
+func testReadAfterWrite(t *testing.T) {
+	// Setup executor
+	exec := batch.NewExecutor(logging.Default())
+	go exec.Run(context.Background())
+	// Prove the executor does not violate read-after-write consistency.
+	// First, let's define read-after-write consistency:
+	// Any read that started after a successful write has returned, must return the updated value.
+	// To test this, let's simulate the following scenario:
+	// 1. reader (r1) starts (Current version: v0)
+	// 2. writer (w1) writes v1
+	// 3. writer (w1) returns (Current version: v1)
+	// 4. reader (r2) starts
+	// 5. both readers (r1,r2) return with v1 as their response.
+	var db = sync.Map{}
+	db.Store("v", "v0")
+
+	read1Done := make(chan bool)
+	write1Done := make(chan bool)
+	read2Done := make(chan bool)
+
+	// we pass a custom delay func that ensures we make the write only after
+	// reader1 started
+	waitWrite := make(chan bool)
+	delays := int32(0)
+	delayFn := func(dur time.Duration) {
+		delaysDone := atomic.AddInt32(&delays, 1)
+		if delaysDone == 1 {
+			close(waitWrite)
+		}
+		time.Sleep(dur)
+	}
+	exec.Delay = delayFn
+
+	// reader1 starts
+	go func() {
+		r1, _ := exec.BatchFor("k", time.Millisecond*50, func() (interface{}, error) {
+			version, _ := db.Load("v")
+			return version, nil
+		})
+		r1v := r1.(string)
+		if r1v != "v1" {
+			// reader1, while it could have returned either v0 or v1 without violating read-after-write consistency,
+			// is expected to return v1 with this batching logic
+			t.Fatalf("expected r1 to get v1, got %s instead", r1v)
+		}
+		close(read1Done)
+	}()
+
+	// Writer1 writes
+	go func() {
+		<-waitWrite
+		db.Store("v", "v1")
+		close(write1Done)
+	}()
+
+	// following that write, another reader starts, and must read the updated value
+	go func() {
+		<-write1Done // ensure we start AFTER write1 has completed
+		r2, _ := exec.BatchFor("k", time.Millisecond*50, func() (interface{}, error) {
+			t.Error("this should not be called, only r1's")
+			version, _ := db.Load("v")
+			return version, nil
+		})
+		r2v := r2.(string)
+		if r2v != "v1" {
+			t.Fatalf("expected r2 to get v1, got %s instead", r2v)
+		}
+		close(read2Done)
+	}()
+
+	<-read1Done
+	<-read2Done
+}
+
+func TestExecutor_BatchFor(t *testing.T) {
+	var wg sync.WaitGroup
+	wg.Add(50)
+	for i := 0; i < 50; i++ {
+		go func() {
+			defer wg.Done()
+			testReadAfterWrite(t)
+		}()
+	}
+	wg.Wait()
+}
diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go
index 1642cbf6fef..526b56325d9 100644
--- a/pkg/catalog/catalog.go
+++ b/pkg/catalog/catalog.go
@@ -9,6 +9,8 @@ import (
 	"io"
 	"strings"
 
+	"github.com/treeverse/lakefs/pkg/batch"
+
 	"github.com/cockroachdb/pebble"
 	"github.com/hashicorp/go-multierror"
 	"github.com/treeverse/lakefs/pkg/block"
@@ -115,13 +117,25 @@ const (
 
 var ErrUnknownDiffType = errors.New("unknown graveler difference type")
 
+type ctxCloser struct {
+	close context.CancelFunc
+}
+
+func (c *ctxCloser) Close() error {
+	go c.close()
+	return nil
+}
+
 func New(ctx context.Context, cfg Config) (*Catalog, error) {
 	if cfg.LockDB == nil {
 		cfg.LockDB = cfg.DB
 	}
 
+	ctx, cancelFn := context.WithCancel(ctx)
+
 	tierFSParams, err := cfg.Config.GetCommittedTierFSParams(ctx)
 	if err != nil {
+		cancelFn()
 		return nil, fmt.Errorf("configure tiered FS for committed: %w", err)
 	}
 	metaRangeFS, err := pyramid.NewFS(&params.InstanceParams{
@@ -130,6 +144,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 		DiskAllocProportion: tierFSParams.MetaRangeAllocationProportion,
 	})
 	if err != nil {
+		cancelFn()
 		return nil, fmt.Errorf("create tiered FS for committed metaranges: %w", err)
 	}
 
@@ -139,6 +154,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 		DiskAllocProportion: tierFSParams.RangeAllocationProportion,
 	})
 	if err != nil {
+		cancelFn()
 		return nil, fmt.Errorf("create tiered FS for committed ranges: %w", err)
 	}
 
@@ -154,12 +170,17 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 		sstableManager,
 	)
 	if err != nil {
+		cancelFn()
 		return nil, fmt.Errorf("create SSTable-based metarange manager: %w", err)
 	}
 
 	committedManager := committed.NewCommittedManager(sstableMetaRangeManager)
 	stagingManager := staging.NewManager(cfg.DB)
-	refManager := ref.NewPGRefManager(cfg.DB, ident.NewHexAddressProvider())
+
+	executor := batch.NewExecutor(logging.Default())
+	go executor.Run(ctx)
+
+	refManager := ref.NewPGRefManager(executor, cfg.DB, ident.NewHexAddressProvider())
 	branchLocker := ref.NewBranchLocker(cfg.LockDB)
 	store := graveler.NewGraveler(branchLocker, committedManager, stagingManager, refManager)
 
@@ -167,7 +188,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 		BlockAdapter: tierFSParams.Adapter,
 		Store:        store,
 		log:          logging.Default().WithField("service_name", "entry_catalog"),
-		managers:     []io.Closer{sstableManager, sstableMetaManager},
+		managers:     []io.Closer{sstableManager, sstableMetaManager, &ctxCloser{cancelFn}},
 	}, nil
 }
 
diff --git a/pkg/graveler/ref/main_test.go b/pkg/graveler/ref/main_test.go
index 52b2124c450..ef9d8bd96a3 100644
--- a/pkg/graveler/ref/main_test.go
+++ b/pkg/graveler/ref/main_test.go
@@ -6,6 +6,8 @@ import (
 	"os"
 	"testing"
 
+	"github.com/treeverse/lakefs/pkg/batch"
+
 	"github.com/treeverse/lakefs/pkg/ident"
 
 	"github.com/ory/dockertest/v3"
@@ -23,19 +25,19 @@ var (
 func testRefManager(t testing.TB) *ref.Manager {
 	t.Helper()
 	conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true))
-	return ref.NewPGRefManager(conn, ident.NewHexAddressProvider())
+	return ref.NewPGRefManager(batch.NopExecutor(), conn, ident.NewHexAddressProvider())
 }
 
 func testRefManagerWithDB(t testing.TB) (*ref.Manager, db.Database) {
 	t.Helper()
 	conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true))
-	return ref.NewPGRefManager(conn, ident.NewHexAddressProvider()), conn
+	return ref.NewPGRefManager(batch.NopExecutor(), conn, ident.NewHexAddressProvider()), conn
 }
 
 func testRefManagerWithAddressProvider(t testing.TB, addressProvider ident.AddressProvider) *ref.Manager {
 	t.Helper()
 	conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true))
-	return ref.NewPGRefManager(conn, addressProvider)
+	return ref.NewPGRefManager(batch.NopExecutor(), conn, addressProvider)
 }
 
 func TestMain(m *testing.M) {
diff --git a/pkg/graveler/ref/manager.go b/pkg/graveler/ref/manager.go
index 50a1a7b6416..c1c187df728 100644
--- a/pkg/graveler/ref/manager.go
+++ b/pkg/graveler/ref/manager.go
@@ -3,8 +3,10 @@ package ref
 import (
 	"context"
 	"errors"
+	"fmt"
 	"time"
 
+	"github.com/treeverse/lakefs/pkg/batch"
 	"github.com/treeverse/lakefs/pkg/db"
 	"github.com/treeverse/lakefs/pkg/graveler"
 	"github.com/treeverse/lakefs/pkg/ident"
@@ -13,26 +15,41 @@ import (
 // IteratorPrefetchSize is the amount of records to maybeFetch from PG
 const IteratorPrefetchSize = 1000
 
+// 3ms was chosen as a max delay time for critical path queries.
+// It trades off the number of queries per second (and thus the effectiveness of the batching mechanism) against added latency.
+// Since reducing the number of expensive operations is only beneficial when there are a lot of concurrent requests,
+// the sweet spot is probably between 1-5 milliseconds (representing 200-1000 requests/second to the data store).
+// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
+const MaxBatchDelay = time.Millisecond * 3
+
 type Manager struct {
 	db              db.Database
 	addressProvider ident.AddressProvider
+	batchExecutor   batch.Batcher
 }
 
-func NewPGRefManager(db db.Database, addressProvider ident.AddressProvider) *Manager {
-	return &Manager{db: db, addressProvider: addressProvider}
+func NewPGRefManager(executor batch.Batcher, db db.Database, addressProvider ident.AddressProvider) *Manager {
+	return &Manager{
+		db:              db,
+		addressProvider: addressProvider,
+		batchExecutor:   executor,
+	}
 }
 
 func (m *Manager) GetRepository(ctx context.Context, repositoryID graveler.RepositoryID) (*graveler.Repository, error) {
-	repository, err := m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
-		repository := &graveler.Repository{}
-		err := tx.Get(repository,
-			`SELECT storage_namespace, creation_date, default_branch FROM graveler_repositories WHERE id = $1`,
-			repositoryID)
-		if err != nil {
-			return nil, err
-		}
-		return repository, nil
-	}, db.ReadOnly())
+	key := fmt.Sprintf("GetRepository:%s", repositoryID)
+	repository, err := m.batchExecutor.BatchFor(key, MaxBatchDelay, func() (interface{}, error) {
+		return m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
+			repository := &graveler.Repository{}
+			err := tx.Get(repository,
+				`SELECT storage_namespace, creation_date, default_branch FROM graveler_repositories WHERE id = $1`,
+				repositoryID)
+			if err != nil {
+				return nil, err
+			}
+			return repository, nil
+		}, db.ReadOnly())
+	})
 	if errors.Is(err, db.ErrNotFound) {
 		return nil, graveler.ErrRepositoryNotFound
 	}
@@ -127,18 +144,21 @@ func (m *Manager) RevParse(ctx context.Context, repositoryID graveler.Repository
 }
 
 func (m *Manager) GetBranch(ctx context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID) (*graveler.Branch, error) {
-	branch, err := m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
-		var rec branchRecord
-		err := tx.Get(&rec, `SELECT commit_id, staging_token FROM graveler_branches WHERE repository_id = $1 AND id = $2`,
-			repositoryID, branchID)
-		if err != nil {
-			return nil, err
-		}
-		return &graveler.Branch{
-			CommitID:     rec.CommitID,
-			StagingToken: rec.StagingToken,
-		}, nil
-	}, db.ReadOnly())
+	key := fmt.Sprintf("GetBranch:%s:%s", repositoryID, branchID)
+	branch, err := m.batchExecutor.BatchFor(key, MaxBatchDelay, func() (interface{}, error) {
+		return m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
+			var rec branchRecord
+			err := tx.Get(&rec, `SELECT commit_id, staging_token FROM graveler_branches WHERE repository_id = $1 AND id = $2`,
+				repositoryID, branchID)
+			if err != nil {
+				return nil, err
+			}
+			return &graveler.Branch{
+				CommitID:     rec.CommitID,
+				StagingToken: rec.StagingToken,
+			}, nil
+		}, db.ReadOnly())
+	})
 	if errors.Is(err, db.ErrNotFound) {
 		return nil, graveler.ErrBranchNotFound
 	}
@@ -185,15 +205,18 @@ func (m *Manager) ListBranches(ctx context.Context, repositoryID graveler.Reposi
 }
 
 func (m *Manager) GetTag(ctx context.Context, repositoryID graveler.RepositoryID, tagID graveler.TagID) (*graveler.CommitID, error) {
-	commitID, err := m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
-		var commitID graveler.CommitID
-		err := tx.Get(&commitID, `SELECT commit_id FROM graveler_tags WHERE repository_id = $1 AND id = $2`,
-			repositoryID, tagID)
-		if err != nil {
-			return nil, err
-		}
-		return &commitID, nil
-	}, db.ReadOnly())
+	key := fmt.Sprintf("GetTag:%s:%s", repositoryID, tagID)
+	commitID, err := m.batchExecutor.BatchFor(key, MaxBatchDelay, func() (interface{}, error) {
+		return m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
+			var commitID graveler.CommitID
+			err := tx.Get(&commitID, `SELECT commit_id FROM graveler_tags WHERE repository_id = $1 AND id = $2`,
+				repositoryID, tagID)
+			if err != nil {
+				return nil, err
+			}
+			return &commitID, nil
+		}, db.ReadOnly())
+	})
 	if errors.Is(err, db.ErrNotFound) {
 		return nil, graveler.ErrTagNotFound
 	}
@@ -243,30 +266,34 @@ func (m *Manager) ListTags(ctx context.Context, repositoryID graveler.Repository
 }
 
 func (m *Manager) GetCommitByPrefix(ctx context.Context, repositoryID graveler.RepositoryID, prefix graveler.CommitID) (*graveler.Commit, error) {
-	commit, err := m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
-		records := make([]*commitRecord, 0)
-		// LIMIT 2 is used to test if a truncated commit ID resolves to *one* commit.
-		// if we get 2 results that start with the truncated ID, that's enough to determine this prefix is not unique
-		err := tx.Select(&records, `
+	key := fmt.Sprintf("GetCommitByPrefix:%s:%s", repositoryID, prefix)
+
+	commit, err := m.batchExecutor.BatchFor(key, MaxBatchDelay, func() (interface{}, error) {
+		return m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
+			records := make([]*commitRecord, 0)
+			// LIMIT 2 is used to test if a truncated commit ID resolves to *one* commit.
+			// if we get 2 results that start with the truncated ID, that's enough to determine this prefix is not unique
+			err := tx.Select(&records, `
 			SELECT id, committer, message, creation_date, parents, meta_range_id, metadata
 			FROM graveler_commits
 			WHERE repository_id = $1 AND id LIKE $2 || '%'
 			LIMIT 2`,
-			repositoryID, prefix)
-		if errors.Is(err, db.ErrNotFound) {
-			return nil, graveler.ErrNotFound
-		}
-		if err != nil {
-			return nil, err
-		}
-		if len(records) == 0 {
-			return "", graveler.ErrNotFound
-		}
-		if len(records) > 1 {
-			return "", graveler.ErrRefAmbiguous // more than 1 commit starts with the ID prefix
-		}
-		return records[0].toGravelerCommit(), nil
-	}, db.ReadOnly())
+				repositoryID, prefix)
+			if errors.Is(err, db.ErrNotFound) {
+				return nil, graveler.ErrNotFound
+			}
+			if err != nil {
+				return nil, err
+			}
+			if len(records) == 0 {
+				return "", graveler.ErrNotFound
+			}
+			if len(records) > 1 {
+				return "", graveler.ErrRefAmbiguous // more than 1 commit starts with the ID prefix
+			}
+			return records[0].toGravelerCommit(), nil
+		}, db.ReadOnly())
+	})
 	if errors.Is(err, db.ErrNotFound) {
 		return nil, graveler.ErrCommitNotFound
 	}
@@ -277,17 +304,20 @@ func (m *Manager) GetCommitByPrefix(ctx context.Context, repositoryID graveler.R
 }
 
 func (m *Manager) GetCommit(ctx context.Context, repositoryID graveler.RepositoryID, commitID graveler.CommitID) (*graveler.Commit, error) {
-	commit, err := m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
-		var rec commitRecord
-		err := tx.Get(&rec, `
+	key := fmt.Sprintf("GetCommit:%s:%s", repositoryID, commitID)
+	commit, err := m.batchExecutor.BatchFor(key, MaxBatchDelay, func() (interface{}, error) {
+		return m.db.Transact(ctx, func(tx db.Tx) (interface{}, error) {
+			var rec commitRecord
+			err := tx.Get(&rec, `
 			SELECT committer, message, creation_date, parents, meta_range_id, metadata
 			FROM graveler_commits WHERE repository_id = $1 AND id = $2`,
-			repositoryID, commitID)
-		if err != nil {
-			return nil, err
-		}
-		return rec.toGravelerCommit(), nil
-	}, db.ReadOnly())
+				repositoryID, commitID)
+			if err != nil {
+				return nil, err
+			}
+			return rec.toGravelerCommit(), nil
+		}, db.ReadOnly())
+	})
 	if errors.Is(err, db.ErrNotFound) {
 		return nil, graveler.ErrCommitNotFound
 	}
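
Reviewer note (not part of the patch): a minimal usage sketch of the batching API introduced above. `NewExecutor`, `Run`, and `BatchFor` are taken from `pkg/batch/executor.go` in this diff; the standalone `main` package, the key string, and the fake "expensive read" are made up for illustration. Callers that arrive for the same key within the delay window should be served by a single execution of the callback, with the result fanned out to every waiter:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/treeverse/lakefs/pkg/batch"
	"github.com/treeverse/lakefs/pkg/logging"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	exec := batch.NewExecutor(logging.Default())
	go exec.Run(ctx)

	var dbReads int32 // how many times the expensive callback actually ran
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All callers share one key, so calls landing inside the same
			// delay window are collapsed into a single callback execution.
			v, err := exec.BatchFor("GetRepository:example-repo", 3*time.Millisecond, func() (interface{}, error) {
				atomic.AddInt32(&dbReads, 1)
				return "repository-record", nil // stand-in for the real DB read
			})
			if err == nil {
				_ = v.(string)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("100 concurrent callers -> callback executed %d time(s)\n", atomic.LoadInt32(&dbReads))
}
```

In a typical run the callback executes once (occasionally a couple of times if callers straddle two delay windows), which is the effect the ref.Manager changes above rely on for hot repository/branch/commit lookups.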
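A second reviewer aside: a quick sanity check of the arithmetic behind the MaxBatchDelay comment in pkg/graveler/ref/manager.go. This throwaway snippet is illustrative only and not part of the change; it just shows that the chosen delays bound how often a single hot key can reach the data store:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// With at most one batched execution per delay window, a single hot key
	// can hit the data store at most (1 second / delay) times per second.
	for _, delay := range []time.Duration{1 * time.Millisecond, 3 * time.Millisecond, 5 * time.Millisecond} {
		perSecond := int(time.Second / delay)
		fmt.Printf("delay=%v -> at most %d queries/second per key\n", delay, perSecond)
	}
	// Prints 1000, 333, and 200 respectively, matching the 200-1000
	// requests/second range and the ~300/second figure cited in the comment.
}
```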