Skip to content

Commit

Permalink
refactor: Remove redundant txn param from fetcher start (#1635)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #1623

## Description

Removes the redundant txn param from fetcher start as it is always the
same value as fetcher.Init
  • Loading branch information
AndrewSisley authored Jul 17, 2023
1 parent f02e2cd commit 8197795
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 64 deletions.
2 changes: 1 addition & 1 deletion db/collection_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *collection) get(
// construct target key for DocKey
targetKey := base.MakeDocKey(*desc, key.DocKey)
// run the doc fetcher
err = df.Start(ctx, txn, core.NewSpans(core.NewSpan(targetKey, targetKey.PrefixEnd())))
err = df.Start(ctx, core.NewSpans(core.NewSpan(targetKey, targetKey.PrefixEnd())))
if err != nil {
_ = df.Close()
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (c *collection) iterateAllDocs(
start := base.MakeCollectionKey(c.desc)
spans := core.NewSpans(core.NewSpan(start, start.PrefixEnd()))

err = df.Start(ctx, txn, spans)
err = df.Start(ctx, spans)
if err != nil {
_ = df.Close()
return err
Expand Down
13 changes: 7 additions & 6 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Fetcher interface {
reverse bool,
showDeleted bool,
) error
Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error
Start(ctx context.Context, spans core.Spans) error
FetchNext(ctx context.Context) (EncodedDocument, error)
FetchNextDecoded(ctx context.Context) (*client.Document, error)
FetchNextDoc(ctx context.Context, mapping *core.DocumentMapping) ([]byte, core.Doc, error)
Expand Down Expand Up @@ -118,6 +118,7 @@ func (df *DocumentFetcher) Init(
reverse bool,
showDeleted bool,
) error {
df.txn = txn
if col.Schema.IsEmpty() {
return client.NewErrUninitializeProperty("DocumentFetcher", "Schema")
}
Expand All @@ -130,6 +131,7 @@ func (df *DocumentFetcher) Init(
if showDeleted {
if df.deletedDocFetcher == nil {
df.deletedDocFetcher = new(DocumentFetcher)
df.deletedDocFetcher.txn = txn
}
return df.deletedDocFetcher.init(col, fields, filter, docmapper, reverse)
}
Expand Down Expand Up @@ -200,21 +202,21 @@ func (df *DocumentFetcher) init(
return nil
}

func (df *DocumentFetcher) Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error {
err := df.start(ctx, txn, spans, false)
func (df *DocumentFetcher) Start(ctx context.Context, spans core.Spans) error {
err := df.start(ctx, spans, false)
if err != nil {
return err
}

if df.deletedDocFetcher != nil {
return df.deletedDocFetcher.start(ctx, txn, spans, true)
return df.deletedDocFetcher.start(ctx, spans, true)
}

return nil
}

// Start implements DocumentFetcher.
func (df *DocumentFetcher) start(ctx context.Context, txn datastore.Txn, spans core.Spans, withDeleted bool) error {
func (df *DocumentFetcher) start(ctx context.Context, spans core.Spans, withDeleted bool) error {
if df.col == nil {
return client.NewErrUninitializeProperty("DocumentFetcher", "CollectionDescription")
}
Expand Down Expand Up @@ -251,7 +253,6 @@ func (df *DocumentFetcher) start(ctx context.Context, txn datastore.Txn, spans c
}

df.curSpanIndex = -1
df.txn = txn

if df.reverse {
df.order = []dsq.Order{dsq.OrderByKeyDescending{}}
Expand Down
21 changes: 10 additions & 11 deletions db/fetcher/mocks/Fetcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion db/fetcher/mocks/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewStubbedFetcher(t *testing.T) *Fetcher {
mock.Anything,
mock.Anything,
).Maybe().Return(nil)
f.EXPECT().Start(mock.Anything, mock.Anything, mock.Anything).Maybe().Return(nil)
f.EXPECT().Start(mock.Anything, mock.Anything).Maybe().Return(nil)
f.EXPECT().FetchNext(mock.Anything).Maybe().Return(nil, nil)
f.EXPECT().FetchNextDoc(mock.Anything, mock.Anything).Maybe().
Return(NewEncodedDocument(t), core.Doc{}, nil)
Expand Down
37 changes: 19 additions & 18 deletions db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,29 @@ func (vf *VersionedFetcher) Init(
vf.col = col
vf.queuedCids = list.New()
vf.mCRDTs = make(map[uint32]crdt.MerkleCRDT)
vf.txn = txn

// create store
root := memory.NewDatastore(ctx)
vf.root = root

var err error
vf.store, err = datastore.NewTxnFrom(
ctx,
vf.root,
false,
) // were going to discard and nuke this later
if err != nil {
return err
}

// run the DF init, VersionedFetchers only supports the Primary (0) index
vf.DocumentFetcher = new(DocumentFetcher)
return vf.DocumentFetcher.Init(ctx, txn, col, fields, filter, docmapper, reverse, showDeleted)
return vf.DocumentFetcher.Init(ctx, vf.store, col, fields, filter, docmapper, reverse, showDeleted)
}

// Start serializes the correct state according to the Key and CID.
func (vf *VersionedFetcher) Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error {
func (vf *VersionedFetcher) Start(ctx context.Context, spans core.Spans) error {
if vf.col == nil {
return client.NewErrUninitializeProperty("VersionedFetcher", "CollectionDescription")
}
Expand All @@ -145,29 +160,15 @@ func (vf *VersionedFetcher) Start(ctx context.Context, txn datastore.Txn, spans
return NewErrFailedToDecodeCIDForVFetcher(err)
}

vf.txn = txn
vf.ctx = ctx
vf.key = dk
vf.version = c

// create store
root := memory.NewDatastore(ctx)
vf.root = root

vf.store, err = datastore.NewTxnFrom(
ctx,
vf.root,
false,
) // were going to discard and nuke this later
if err != nil {
return err
}

if err := vf.seekTo(vf.version); err != nil {
return NewErrFailedToSeek(c, err)
}

return vf.DocumentFetcher.Start(ctx, vf.store, core.Spans{})
return vf.DocumentFetcher.Start(ctx, core.Spans{})
}

// Rootstore returns the rootstore of the VersionedFetcher.
Expand Down Expand Up @@ -196,7 +197,7 @@ func (vf *VersionedFetcher) SeekTo(ctx context.Context, c cid.Cid) error {
return err
}

return vf.DocumentFetcher.Start(ctx, vf.store, core.Spans{})
return vf.DocumentFetcher.Start(ctx, core.Spans{})
}

// seekTo seeks to the given CID version by stepping through the CRDT state graph from the beginning
Expand Down
24 changes: 7 additions & 17 deletions db/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,14 @@ func TestFetcherStart(t *testing.T) {
df, err := newTestFetcher(ctx, txn)
assert.NoError(t, err)

err = df.Start(ctx, txn, core.Spans{})
err = df.Start(ctx, core.Spans{})
assert.NoError(t, err)
}

func TestFetcherStartWithoutInit(t *testing.T) {
ctx := context.Background()
db, err := newMemoryDB(ctx)
if err != nil {
t.Error(err)
return
}
txn, err := db.NewTxn(ctx, true)
if err != nil {
t.Error(err)
return
}
df := new(fetcher.DocumentFetcher)
err = df.Start(ctx, txn, core.Spans{})
err := df.Start(ctx, core.Spans{})
assert.Error(t, err)
}

Expand Down Expand Up @@ -138,7 +128,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocSingle(t *testing.T) {
err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false)
assert.NoError(t, err)

err = df.Start(ctx, txn, core.Spans{})
err = df.Start(ctx, core.Spans{})
assert.NoError(t, err)

encdoc, err := df.FetchNext(ctx)
Expand Down Expand Up @@ -183,7 +173,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocMultiple(t *testing.T) {
err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false)
assert.NoError(t, err)

err = df.Start(ctx, txn, core.Spans{})
err = df.Start(ctx, core.Spans{})
assert.NoError(t, err)

encdoc, err := df.FetchNext(ctx)
Expand Down Expand Up @@ -221,7 +211,7 @@ func TestFetcherGetAllPrimaryIndexDecodedSingle(t *testing.T) {
err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false)
assert.NoError(t, err)

err = df.Start(ctx, txn, core.Spans{})
err = df.Start(ctx, core.Spans{})
assert.NoError(t, err)

ddoc, err := df.FetchNextDecoded(ctx)
Expand Down Expand Up @@ -273,7 +263,7 @@ func TestFetcherGetAllPrimaryIndexDecodedMultiple(t *testing.T) {
err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false)
assert.NoError(t, err)

err = df.Start(ctx, txn, core.Spans{})
err = df.Start(ctx, core.Spans{})
assert.NoError(t, err)

ddoc, err := df.FetchNextDecoded(ctx)
Expand Down Expand Up @@ -336,7 +326,7 @@ func TestFetcherGetOnePrimaryIndexDecoded(t *testing.T) {
core.NewSpan(docKey, docKey.PrefixEnd()),
)

err = df.Start(ctx, txn, spans)
err = df.Start(ctx, spans)
assert.NoError(t, err)

ddoc, err := df.FetchNextDecoded(ctx)
Expand Down
8 changes: 4 additions & 4 deletions db/indexed_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ func TestNonUniqueCreate_IfUponIndexingExistingDocsFetcherFails_ReturnError(t *t
Name: "Fails to start",
PrepareFetcher: func() fetcher.Fetcher {
f := fetcherMocks.NewStubbedFetcher(t)
f.EXPECT().Start(mock.Anything, mock.Anything, mock.Anything).Unset()
f.EXPECT().Start(mock.Anything, mock.Anything, mock.Anything).Return(testError)
f.EXPECT().Start(mock.Anything, mock.Anything).Unset()
f.EXPECT().Start(mock.Anything, mock.Anything).Return(testError)
f.EXPECT().Close().Unset()
f.EXPECT().Close().Return(nil)
return f
Expand Down Expand Up @@ -843,8 +843,8 @@ func TestNonUniqueUpdate_IfFetcherFails_ReturnError(t *testing.T) {
Name: "Fails to start",
PrepareFetcher: func() fetcher.Fetcher {
f := fetcherMocks.NewStubbedFetcher(t)
f.EXPECT().Start(mock.Anything, mock.Anything, mock.Anything).Unset()
f.EXPECT().Start(mock.Anything, mock.Anything, mock.Anything).Return(testError)
f.EXPECT().Start(mock.Anything, mock.Anything).Unset()
f.EXPECT().Start(mock.Anything, mock.Anything).Return(testError)
f.EXPECT().Close().Unset()
f.EXPECT().Close().Return(nil)
return f
Expand Down
6 changes: 2 additions & 4 deletions lens/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ func (f *lensedFetcher) Init(
return f.source.Init(ctx, txn, col, innerFetcherFields, filter, docmapper, reverse, showDeleted)
}

func (f *lensedFetcher) Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error {
f.txn = txn

return f.source.Start(ctx, txn, spans)
func (f *lensedFetcher) Start(ctx context.Context, spans core.Spans) error {
return f.source.Start(ctx, spans)
}

func (f *lensedFetcher) FetchNext(ctx context.Context) (fetcher.EncodedDocument, error) {
Expand Down
2 changes: 1 addition & 1 deletion planner/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (n *scanNode) initScan() error {
n.spans = core.NewSpans(core.NewSpan(start, start.PrefixEnd()))
}

err := n.fetcher.Start(n.p.ctx, n.p.txn, n.spans)
err := n.fetcher.Start(n.p.ctx, n.spans)
if err != nil {
return err
}
Expand Down

0 comments on commit 8197795

Please sign in to comment.