batch hot paths for a very short duration (#1618)
* batch hot paths for a very short duration

* added tests and a controlled runner that can be stopped

* removed blank lines

* added a small channel buffer and document max batch time

* better parameter naming, run consistency test multiple times in parallel
ozkatz authored Mar 14, 2021
1 parent eadb5ba commit c68123e
Showing 5 changed files with 330 additions and 67 deletions.
112 changes: 112 additions & 0 deletions pkg/batch/executor.go
@@ -0,0 +1,112 @@
package batch

import (
    "context"
    "time"

    "github.com/treeverse/lakefs/pkg/logging"
)

// RequestBufferSize is the number of requests users can dispatch that have not yet been processed
// before dispatching new ones starts to block.
const RequestBufferSize = 1 << 17

type BatchFn func() (interface{}, error)

type DelayFn func(dur time.Duration)

type Batcher interface {
    BatchFor(key string, dur time.Duration, fn BatchFn) (interface{}, error)
}

type nonBatchingExecutor struct {
}

func (n *nonBatchingExecutor) BatchFor(key string, dur time.Duration, fn BatchFn) (interface{}, error) {
    return fn()
}

type response struct {
    v   interface{}
    err error
}

type request struct {
    key        string
    timeout    time.Duration
    fn         BatchFn
    onResponse chan *response
}

type Executor struct {
    // requests is the channel accepting inbound requests
    requests chan *request
    // execs is the internal channel used to dispatch the callback functions.
    // Several requests with the same key within a given duration trigger a single write to this channel for that key.
    execs        chan string
    waitingOnKey map[string][]*request
    Logger       logging.Logger
    Delay        DelayFn
}

func NopExecutor() *nonBatchingExecutor {
    return &nonBatchingExecutor{}
}

func NewExecutor(logger logging.Logger) *Executor {
    return &Executor{
        requests:     make(chan *request, RequestBufferSize),
        execs:        make(chan string, RequestBufferSize),
        waitingOnKey: make(map[string][]*request),
        Logger:       logger,
        Delay:        time.Sleep,
    }
}

func (e *Executor) BatchFor(key string, timeout time.Duration, fn BatchFn) (interface{}, error) {
    cb := make(chan *response)
    e.requests <- &request{
        key:        key,
        timeout:    timeout,
        fn:         fn,
        onResponse: cb,
    }
    response := <-cb
    return response.v, response.err
}

func (e *Executor) Run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case req := <-e.requests:
            // see if we have it scheduled already
            if _, exists := e.waitingOnKey[req.key]; !exists {
                // this is a new key, let's fire a timer for it
                go func(req *request) {
                    e.Delay(req.timeout)
                    e.execs <- req.key
                }(req)
            }
            e.waitingOnKey[req.key] = append(e.waitingOnKey[req.key], req)
        case execKey := <-e.execs:
            // let's take all callbacks waiting on this key
            waiters := e.waitingOnKey[execKey]
            delete(e.waitingOnKey, execKey)
            go func(key string) {
                // execute once and call all mapped callbacks
                v, err := waiters[0].fn()
                if e.Logger.IsTracing() {
                    e.Logger.WithFields(logging.Fields{
                        "waiters": len(waiters),
                        "key":     key,
                    }).Trace("dispatched BatchFn")
                }
                for _, waiter := range waiters {
                    waiter.onResponse <- &response{v, err}
                }
            }(execKey)
        }
    }
}
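For context, here is a minimal usage sketch (not part of the commit) of how a caller might coalesce identical hot-path lookups with this executor. The loadFromDB helper, the "repo:main" key, and the 10ms window are illustrative assumptions; only NewExecutor, Run, and BatchFor come from the code above.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/treeverse/lakefs/pkg/batch"
    "github.com/treeverse/lakefs/pkg/logging"
)

// loadFromDB stands in for an expensive lookup we want to batch (hypothetical).
func loadFromDB(key string) (interface{}, error) {
    return "value-for-" + key, nil
}

func main() {
    exec := batch.NewExecutor(logging.Default())
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancelling the context stops the Run loop
    go exec.Run(ctx)

    // Concurrent callers asking for the same key within the 10ms window
    // share a single execution of the callback.
    for i := 0; i < 3; i++ {
        go func() {
            v, err := exec.BatchFor("repo:main", 10*time.Millisecond, func() (interface{}, error) {
                return loadFromDB("repo:main")
            })
            fmt.Println(v, err)
        }()
    }
    time.Sleep(100 * time.Millisecond)
}

All three goroutines receive the same response because their requests share a key and arrive within the batching window, so loadFromDB runs only once.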
98 changes: 98 additions & 0 deletions pkg/batch/executor_test.go
@@ -0,0 +1,98 @@
package batch_test

import (
    "context"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/treeverse/lakefs/pkg/batch"
    "github.com/treeverse/lakefs/pkg/logging"
)

func testReadAfterWrite(t *testing.T) {
    // Setup executor
    exec := batch.NewExecutor(logging.Default())
    go exec.Run(context.Background())
    // Prove the executor does not violate read-after-write consistency.
    // First, let's define read-after-write consistency:
    // Any read that starts after a successful write has returned must return the updated value.
    // To test this, let's simulate the following scenario:
    // 1. reader (r1) starts (Current version: v0)
    // 2. writer (w1) writes v1
    // 3. writer (w1) returns (Current version: v1)
    // 4. reader (r2) starts
    // 5. both readers (r1, r2) return with v1 as their response.
    var db = sync.Map{}
    db.Store("v", "v0")

    read1Done := make(chan bool)
    write1Done := make(chan bool)
    read2Done := make(chan bool)

    // we pass a custom delay func that ensures the write happens only after
    // reader1 has started
    waitWrite := make(chan bool)
    delays := int32(0)
    delayFn := func(dur time.Duration) {
        delaysDone := atomic.AddInt32(&delays, 1)
        if delaysDone == 1 {
            close(waitWrite)
        }
        time.Sleep(dur)
    }
    exec.Delay = delayFn

    // reader1 starts
    go func() {
        r1, _ := exec.BatchFor("k", time.Millisecond*50, func() (interface{}, error) {
            version, _ := db.Load("v")
            return version, nil
        })
        r1v := r1.(string)
        if r1v != "v1" {
            // reader1, while it could have returned either v0 or v1 without violating read-after-write consistency,
            // is expected to return v1 with this batching logic
            t.Fatalf("expected r1 to get v1, got %s instead", r1v)
        }
        close(read1Done)
    }()

    // Writer1 writes
    go func() {
        <-waitWrite
        db.Store("v", "v1")
        close(write1Done)
    }()

    // following that write, another reader starts, and must read the updated value
    go func() {
        <-write1Done // ensure we start AFTER write1 has completed
        r2, _ := exec.BatchFor("k", time.Millisecond*50, func() (interface{}, error) {
            t.Error("this should not be called, only r1's")
            version, _ := db.Load("v")
            return version, nil
        })
        r2v := r2.(string)
        if r2v != "v1" {
            t.Fatalf("expected r2 to get v1, got %s instead", r2v)
        }
        close(read2Done)
    }()

    <-read1Done
    <-read2Done
}

func TestExecutor_BatchFor(t *testing.T) {
    var wg sync.WaitGroup
    wg.Add(50)
    for i := 0; i < 50; i++ {
        go func() {
            defer wg.Done()
            testReadAfterWrite(t)
        }()
    }
    wg.Wait()
}
25 changes: 23 additions & 2 deletions pkg/catalog/catalog.go
@@ -9,6 +9,8 @@ import (
    "io"
    "strings"

    "github.com/treeverse/lakefs/pkg/batch"

    "github.com/cockroachdb/pebble"
    "github.com/hashicorp/go-multierror"
    "github.com/treeverse/lakefs/pkg/block"
@@ -115,13 +117,25 @@

var ErrUnknownDiffType = errors.New("unknown graveler difference type")

type ctxCloser struct {
    close context.CancelFunc
}

func (c *ctxCloser) Close() error {
    go c.close()
    return nil
}

func New(ctx context.Context, cfg Config) (*Catalog, error) {
    if cfg.LockDB == nil {
        cfg.LockDB = cfg.DB
    }

    ctx, cancelFn := context.WithCancel(ctx)

    tierFSParams, err := cfg.Config.GetCommittedTierFSParams(ctx)
    if err != nil {
        cancelFn()
        return nil, fmt.Errorf("configure tiered FS for committed: %w", err)
    }
    metaRangeFS, err := pyramid.NewFS(&params.InstanceParams{
@@ -130,6 +144,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
        DiskAllocProportion: tierFSParams.MetaRangeAllocationProportion,
    })
    if err != nil {
        cancelFn()
        return nil, fmt.Errorf("create tiered FS for committed metaranges: %w", err)
    }

@@ -139,6 +154,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
        DiskAllocProportion: tierFSParams.RangeAllocationProportion,
    })
    if err != nil {
        cancelFn()
        return nil, fmt.Errorf("create tiered FS for committed ranges: %w", err)
    }

@@ -154,20 +170,25 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
        sstableManager,
    )
    if err != nil {
        cancelFn()
        return nil, fmt.Errorf("create SSTable-based metarange manager: %w", err)
    }
    committedManager := committed.NewCommittedManager(sstableMetaRangeManager)

    stagingManager := staging.NewManager(cfg.DB)
    refManager := ref.NewPGRefManager(cfg.DB, ident.NewHexAddressProvider())

    executor := batch.NewExecutor(logging.Default())
    go executor.Run(ctx)

    refManager := ref.NewPGRefManager(executor, cfg.DB, ident.NewHexAddressProvider())
    branchLocker := ref.NewBranchLocker(cfg.LockDB)
    store := graveler.NewGraveler(branchLocker, committedManager, stagingManager, refManager)

    return &Catalog{
        BlockAdapter: tierFSParams.Adapter,
        Store:        store,
        log:          logging.Default().WithField("service_name", "entry_catalog"),
        managers:     []io.Closer{sstableManager, sstableMetaManager},
        managers:     []io.Closer{sstableManager, sstableMetaManager, &ctxCloser{cancelFn}},
    }, nil
}

8 changes: 5 additions & 3 deletions pkg/graveler/ref/main_test.go
@@ -6,6 +6,8 @@ import (
    "os"
    "testing"

    "github.com/treeverse/lakefs/pkg/batch"

    "github.com/treeverse/lakefs/pkg/ident"

    "github.com/ory/dockertest/v3"
@@ -23,19 +25,19 @@
func testRefManager(t testing.TB) *ref.Manager {
    t.Helper()
    conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true))
    return ref.NewPGRefManager(conn, ident.NewHexAddressProvider())
    return ref.NewPGRefManager(batch.NopExecutor(), conn, ident.NewHexAddressProvider())
}

func testRefManagerWithDB(t testing.TB) (*ref.Manager, db.Database) {
    t.Helper()
    conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true))
    return ref.NewPGRefManager(conn, ident.NewHexAddressProvider()), conn
    return ref.NewPGRefManager(batch.NopExecutor(), conn, ident.NewHexAddressProvider()), conn
}

func testRefManagerWithAddressProvider(t testing.TB, addressProvider ident.AddressProvider) *ref.Manager {
    t.Helper()
    conn, _ := testutil.GetDB(t, databaseURI, testutil.WithGetDBApplyDDL(true))
    return ref.NewPGRefManager(conn, addressProvider)
    return ref.NewPGRefManager(batch.NopExecutor(), conn, addressProvider)
}

func TestMain(m *testing.M) {
