From 9d4a333c2af2cdb10747dd64b92b63ac9d278c7e Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Mon, 25 Jul 2022 17:25:34 -0400 Subject: [PATCH] test: Add integration testing for P2P. (#655) Relevant issue(s) Resolves #564 Description This PR adds P2P integration testing functionality. It's limited to passive syncing of nodes on a given document. The test suite starts two Defra nodes, waits for them to connect, and then updates the commonly shared document. We then ensure that changes made on one node are reflected on the other. --- db/collection.go | 3 +- merkle/clock/heads.go | 2 +- merkle/crdt/merklecrdt.go | 5 +- net/peer.go | 20 ++ net/server.go | 35 ++++ node/node.go | 141 ++++++++++++- tests/integration/net/tcp_test.go | 131 ++++++++++++ tests/integration/net/utils.go | 325 ++++++++++++++++++++++++++++++ tests/integration/utils.go | 4 +- 9 files changed, 652 insertions(+), 14 deletions(-) create mode 100644 tests/integration/net/tcp_test.go create mode 100644 tests/integration/net/utils.go diff --git a/db/collection.go b/db/collection.go index 6ef611353e..03aa8e884e 100644 --- a/db/collection.go +++ b/db/collection.go @@ -325,7 +325,6 @@ func (c *collection) getAllDocKeysChan( default: // noop, just continue on the with the for loop } - if res.Error != nil { resCh <- client.DocKeysResult{ Err: res.Error, @@ -334,7 +333,7 @@ func (c *collection) getAllDocKeysChan( } // now we have a doc key - rawDocKey := ds.NewKey(res.Key).Type() + rawDocKey := ds.NewKey(res.Key).BaseNamespace() key, err := client.NewDocKeyFromString(rawDocKey) if err != nil { resCh <- client.DocKeysResult{ diff --git a/merkle/clock/heads.go b/merkle/clock/heads.go index b75103e4c0..b90ccf076e 100644 --- a/merkle/clock/heads.go +++ b/merkle/clock/heads.go @@ -132,7 +132,7 @@ func (hh *heads) Replace(ctx context.Context, h, c cid.Cid, height uint64) error } func (hh *heads) Add(ctx context.Context, c cid.Cid, height uint64) error { - log.Info(ctx, "Adding new DAG head", + log.Debug(ctx, "Adding new DAG head", logging.NewKV("CID", c), logging.NewKV("Height", height)) return hh.write(ctx, hh.store, c, height) diff --git a/merkle/crdt/merklecrdt.go b/merkle/crdt/merklecrdt.go index 0cf6e86d05..6494dbd917 100644 --- a/merkle/crdt/merklecrdt.go +++ b/merkle/crdt/merklecrdt.go @@ -13,6 +13,7 @@ package crdt import ( "context" "fmt" + "time" "github.com/sourcenetwork/defradb/core" corenet "github.com/sourcenetwork/defradb/core/net" @@ -26,6 +27,8 @@ var ( log = logging.MustNewLogger("defra.merklecrdt") ) +const broadcasterTimeout = time.Second + // MerkleCRDT is the implementation of a Merkle Clock along with a // CRDT payload. It implements the ReplicatedData interface // so it can be merged with any given semantics. 
@@ -109,7 +112,7 @@ func (base *baseMerkleCRDT) Broadcast(ctx context.Context, nd ipld.Node, delta c Block: nd, Priority: netdelta.GetPriority(), } - if err := base.broadcaster.Send(lg); err != nil { + if err := base.broadcaster.SendWithTimeout(lg, broadcasterTimeout); err != nil { log.ErrorE( ctx, "Failed to broadcast MerkleCRDT update", diff --git a/net/peer.go b/net/peer.go index 18e2e13543..e81cf97e7f 100644 --- a/net/peer.go +++ b/net/peer.go @@ -159,6 +159,18 @@ func (p *Peer) Close() error { stopGRPCServer(p.ctx, p.p2pRPC) // stopGRPCServer(p.tcpRPC) + // close event emitters + if p.server.pubSubEmitter != nil { + if err := p.server.pubSubEmitter.Close(); err != nil { + log.Info(p.ctx, "Could not close pubsub event emitter", logging.NewKV("Error", err)) + } + } + if p.server.pushLogEmitter != nil { + if err := p.server.pushLogEmitter.Close(); err != nil { + log.Info(p.ctx, "Could not close push log event emitter", logging.NewKV("Error", err)) + } + } + p.bus.Discard() p.cancel() return nil @@ -463,3 +475,11 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) { timer.Stop() } } + +type EvtReceivedPushLog struct { + Peer peer.ID +} + +type EvtPubSub struct { + Peer peer.ID +} diff --git a/net/server.go b/net/server.go index 1f9e65629b..cbc23862d8 100644 --- a/net/server.go +++ b/net/server.go @@ -19,6 +19,7 @@ import ( "github.com/gogo/protobuf/proto" format "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-libp2p-core/event" libpeer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/textileio/go-libp2p-pubsub-rpc" "google.golang.org/grpc" @@ -45,6 +46,9 @@ type server struct { mu sync.Mutex conns map[libpeer.ID]*grpc.ClientConn + + pubSubEmitter event.Emitter + pushLogEmitter event.Emitter } // newServer creates a new network server that handle/directs RPC requests to the @@ -93,6 +97,16 @@ func newServer(p *Peer, db client.DB, opts ...grpc.DialOption) (*server, error) log.Debug(p.ctx, "Finished registering all DocKey pubsub topics", logging.NewKV("Count", i)) } + var err error + s.pubSubEmitter, err = s.peer.host.EventBus().Emitter(new(EvtPubSub)) + if err != nil { + log.Info(s.peer.ctx, "could not create event emitter", logging.NewKV("Error", err)) + } + s.pushLogEmitter, err = s.peer.host.EventBus().Emitter(new(EvtReceivedPushLog)) + if err != nil { + log.Info(s.peer.ctx, "could not create event emitter", logging.NewKV("Error", err)) + } + return s, nil } @@ -177,6 +191,17 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL log.Debug(ctx, "No more children to process for log", logging.NewKV("CID", cid)) } + if s.pushLogEmitter != nil { + err = s.pushLogEmitter.Emit(EvtReceivedPushLog{ + Peer: pid, + }) + if err != nil { + // logging instead of returning an error because the event bus should + // not break the PushLog execution. 
+ log.Info(ctx, "could not emit push log event", logging.NewKV("Error", err)) + } + } + return &pb.PushLogReply{}, nil } @@ -303,7 +328,17 @@ func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) { "Received new pubsub event", logging.NewKV("SenderId", from), logging.NewKV("Topic", topic), + logging.NewKV("Message", string(msg)), ) + + if s.pubSubEmitter != nil { + err := s.pubSubEmitter.Emit(EvtPubSub{ + Peer: from, + }) + if err != nil { + log.Info(s.peer.ctx, "could not emit pubsub event", logging.NewKV("Error", err)) + } + } } func (s *server) listAllDocKeys() (<-chan client.DocKeysResult, error) { diff --git a/node/node.go b/node/node.go index f7f75b16d8..9e200476de 100644 --- a/node/node.go +++ b/node/node.go @@ -18,15 +18,19 @@ package node import ( "context" + "fmt" "os" "path/filepath" + "time" ipfslite "github.com/hsanjuan/ipfs-lite" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-peerstore/pstoreds" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -42,6 +46,8 @@ var ( log = logging.MustNewLogger("defra.node") ) +const evtWaitTimeout = 10 * time.Second + type Node struct { // embed the DB interface into the node client.DB @@ -52,6 +58,15 @@ type Node struct { pubsub *pubsub.PubSub litepeer *ipfslite.Peer + // receives an event when the status of a peer connection changes. + peerEvent chan event.EvtPeerConnectednessChanged + + // receives an event when a pubsub topic is added. + pubSubEvent chan net.EvtPubSub + + // receives an event when a pushLog request has been processed. + pushLogEvent chan net.EvtReceivedPushLog + ctx context.Context } @@ -144,20 +159,130 @@ func NewNode( return nil, fin.Cleanup(err) } - return &Node{ - Peer: peer, - host: h, - pubsub: ps, - DB: db, - litepeer: lite, - ctx: ctx, - }, nil + n := &Node{ + pubSubEvent: make(chan net.EvtPubSub), + pushLogEvent: make(chan net.EvtReceivedPushLog), + peerEvent: make(chan event.EvtPeerConnectednessChanged), + Peer: peer, + host: h, + pubsub: ps, + DB: db, + litepeer: lite, + ctx: ctx, + } + + n.subscribeToPeerConnectionEvents() + n.subscribeToPubSubEvents() + n.subscribeToPushLogEvents() + + return n, nil } func (n *Node) Boostrap(addrs []peer.AddrInfo) { n.litepeer.Bootstrap(addrs) } +// PeerID returns the node's peer ID. +func (n *Node) PeerID() peer.ID { + return n.host.ID() +} + +// subscribeToPeerConnectionEvents subscribes the node to the event bus for a peer connection change. +func (n *Node) subscribeToPeerConnectionEvents() { + sub, err := n.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + log.Info( + n.ctx, + fmt.Sprintf("failed to subscribe to peer connectedness changed event: %v", err), + ) + } + go func() { + for e := range sub.Out() { + n.peerEvent <- e.(event.EvtPeerConnectednessChanged) + } + }() +} + +// subscribeToPubSubEvents subscribes the node to the event bus for a pubsub. 
+func (n *Node) subscribeToPubSubEvents() { + sub, err := n.host.EventBus().Subscribe(new(net.EvtPubSub)) + if err != nil { + log.Info( + n.ctx, + fmt.Sprintf("failed to subscribe to pubsub event: %v", err), + ) + } + go func() { + for e := range sub.Out() { + n.pubSubEvent <- e.(net.EvtPubSub) + } + }() +} + +// subscribeToPushLogEvents subscribes the node to the event bus for a push log request completion. +func (n *Node) subscribeToPushLogEvents() { + sub, err := n.host.EventBus().Subscribe(new(net.EvtReceivedPushLog)) + if err != nil { + log.Info( + n.ctx, + fmt.Sprintf("failed to subscribe to push log event: %v", err), + ) + } + go func() { + for e := range sub.Out() { + n.pushLogEvent <- e.(net.EvtReceivedPushLog) + } + }() +} + +// WaitForPeerConnectionEvent listens to the event channel for a connection event from a given peer. +func (n *Node) WaitForPeerConnectionEvent(id peer.ID) error { + if n.host.Network().Connectedness(id) == network.Connected { + return nil + } + for { + select { + case evt := <-n.peerEvent: + if evt.Peer != id { + continue + } + return nil + case <-time.After(evtWaitTimeout): + return fmt.Errorf("waiting for peer connection timed out") + } + } +} + +// WaitForPubSubEvent listens to the event channel for pub sub event from a given peer. +func (n *Node) WaitForPubSubEvent(id peer.ID) error { + for { + select { + case evt := <-n.pubSubEvent: + if evt.Peer != id { + continue + } + return nil + case <-time.After(evtWaitTimeout): + return fmt.Errorf("waiting for pubsub timed out") + } + } +} + +// WaitForPushLogEvent listens to the event channel for a push log event from a given peer. +func (n *Node) WaitForPushLogEvent(id peer.ID) error { + for { + select { + case evt := <-n.pushLogEvent: + if evt.Peer != id { + continue + } + return nil + case <-time.After(evtWaitTimeout): + return fmt.Errorf("waiting for pushlog timed out") + } + } +} + // replace with proper keystore func getHostKey(keypath string) (crypto.PrivKey, error) { // If a local datastore is used, the key is written to a file diff --git a/tests/integration/net/tcp_test.go b/tests/integration/net/tcp_test.go new file mode 100644 index 0000000000..cb3bfb3400 --- /dev/null +++ b/tests/integration/net/tcp_test.go @@ -0,0 +1,131 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package net + +import ( + "testing" + + "github.com/sourcenetwork/defradb/config" +) + +// TestP2PWithSingleDocumentUpdatePerNode tests document syncing between two nodes with a single update per node +func TestP2PWithSingleDocumentUpdatePerNode(t *testing.T) { + test := P2PTestCase{ + NodeConfig: []*config.Config{ + randomNetworkingConfig(), + randomNetworkingConfig(), + }, + NodePeers: map[int][]int{ + 1: { + 0, + }, + }, + SeedDocuments: []string{ + `{ + "Name": "John", + "Age": 21 + }`, + }, + Updates: map[int]map[int][]string{ + 1: { + 0: { + `{ + "Age": 45 + }`, + }, + }, + 0: { + 0: { + `{ + "Age": 60 + }`, + }, + }, + }, + Results: map[int]map[int]map[string]interface{}{ + 0: { + 0: { + "Age": uint64(45), + }, + }, + 1: { + 0: { + "Age": uint64(60), + }, + }, + }, + } + + executeTestCase(t, test) +} + +// TestP2PWithMultipleDocumentUpdatesPerNode tests document syncing between two nodes with multiple updates per node. +func TestP2PWithMultipleDocumentUpdatesPerNode(t *testing.T) { + test := P2PTestCase{ + NodeConfig: []*config.Config{ + randomNetworkingConfig(), + randomNetworkingConfig(), + }, + NodePeers: map[int][]int{ + 1: { + 0, + }, + }, + SeedDocuments: []string{ + `{ + "Name": "John", + "Age": 21 + }`, + }, + Updates: map[int]map[int][]string{ + 0: { + 0: { + `{ + "Age": 60 + }`, + `{ + "Age": 61 + }`, + `{ + "Age": 62 + }`, + }, + }, + 1: { + 0: { + `{ + "Age": 45 + }`, + `{ + "Age": 46 + }`, + `{ + "Age": 47 + }`, + }, + }, + }, + Results: map[int]map[int]map[string]interface{}{ + 0: { + 0: { + "Age": uint64(47), + }, + }, + 1: { + 0: { + "Age": uint64(62), + }, + }, + }, + } + + executeTestCase(t, test) +} diff --git a/tests/integration/net/utils.go b/tests/integration/net/utils.go new file mode 100644 index 0000000000..ef1370e6cc --- /dev/null +++ b/tests/integration/net/utils.go @@ -0,0 +1,325 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package net + +import ( + "context" + "fmt" + "math/rand" + "strings" + "sync" + "testing" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/config" + coreDB "github.com/sourcenetwork/defradb/db" + "github.com/sourcenetwork/defradb/logging" + netutils "github.com/sourcenetwork/defradb/net/utils" + "github.com/sourcenetwork/defradb/node" + testutils "github.com/sourcenetwork/defradb/tests/integration" + "github.com/stretchr/testify/assert" + "github.com/textileio/go-threads/broadcast" +) + +var ( + log = logging.MustNewLogger("defra.test.net") + + usedPorts = make(map[int]bool) + portSyncLock sync.Mutex +) + +const ( + busBufferSize = 100 + + userCollectionGQLSchema = ` + type users { + Name: String + Email: String + Age: Int + HeightM: Float + Verified: Boolean + } + ` + + userCollection = "users" +) + +type P2PTestCase struct { + // Configuration parameters for each peer + NodeConfig []*config.Config + + // List of peers for each node. + // Only peers with lower index than the node can be used in the list of peers. 
+ NodePeers map[int][]int + + SeedDocuments []string + + // node/dockey/values + Updates map[int]map[int][]string + Results map[int]map[int]map[string]interface{} +} + +func setupDefraNode(t *testing.T, cfg *config.Config, seeds []string) (*node.Node, []client.DocKey, error) { + ctx := context.Background() + var err error + + log.Info(ctx, "Building new memory store") + bs := broadcast.NewBroadcaster(busBufferSize) + dbi, err := testutils.NewBadgerMemoryDB(ctx, coreDB.WithBroadcaster(bs)) + if err != nil { + return nil, nil, err + } + + db := dbi.DB() + + if err := seedSchema(ctx, db); err != nil { + return nil, nil, err + } + + // seed the database with a set of documents + dockeys := []client.DocKey{} + for _, document := range seeds { + dockey, err := seedDocument(ctx, db, document) + if err != nil { + t.Fatal(err) + } + dockeys = append(dockeys, dockey) + } + + // init the p2p node + var n *node.Node + log.Info(ctx, "Starting P2P node", logging.NewKV("P2P address", cfg.Net.P2PAddress)) + n, err = node.NewNode( + ctx, + db, + bs, + cfg.NodeConfig(), + ) + if err != nil { + return nil, nil, fmt.Errorf("failed to start P2P node: %w", err) + } + + // parse peers and bootstrap + if len(cfg.Net.Peers) != 0 { + log.Info(ctx, "Parsing bootstrap peers", logging.NewKV("Peers", cfg.Net.Peers)) + addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ",")) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse bootstrap peers %v: %w", cfg.Net.Peers, err) + } + log.Info(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs)) + n.Boostrap(addrs) + } + + if err := n.Start(); err != nil { + closeErr := n.Close() + if closeErr != nil { + return nil, nil, fmt.Errorf("unable to start P2P listeners: %v: problem closing node: %w", err, closeErr) + } + return nil, nil, fmt.Errorf("unable to start P2P listeners: %w", err) + } + + return n, dockeys, nil +} + +func seedSchema(ctx context.Context, db client.DB) error { + return db.AddSchema(ctx, userCollectionGQLSchema) +} + +func seedDocument(ctx context.Context, db client.DB, document string) (client.DocKey, error) { + col, err := db.GetCollectionByName(ctx, userCollection) + if err != nil { + return client.DocKey{}, err + } + + doc, err := client.NewDocFromJSON([]byte(document)) + if err != nil { + return client.DocKey{}, err + } + + err = col.Save(ctx, doc) + if err != nil { + return client.DocKey{}, err + } + + return doc.Key(), nil +} + +func updateDocument(ctx context.Context, db client.DB, dockey client.DocKey, update string) error { + col, err := db.GetCollectionByName(ctx, userCollection) + if err != nil { + return err + } + + doc, err := getDocument(ctx, db, dockey) + if err != nil { + return err + } + + if err := doc.SetWithJSON([]byte(update)); err != nil { + return err + } + + return col.Save(ctx, doc) +} + +func getDocument(ctx context.Context, db client.DB, dockey client.DocKey) (*client.Document, error) { + col, err := db.GetCollectionByName(ctx, userCollection) + if err != nil { + return nil, err + } + + doc, err := col.Get(ctx, dockey) + if err != nil { + return nil, err + } + return doc, err +} + +func executeTestCase(t *testing.T, test P2PTestCase) { + ctx := context.Background() + + dockeys := []client.DocKey{} + nodes := []*node.Node{} + + for i, cfg := range test.NodeConfig { + log.Info(ctx, fmt.Sprintf("Setting up node %d", i)) + cfg.Datastore.Badger.Path = t.TempDir() + if peers, ok := test.NodePeers[i]; ok { + peerAddresses := []string{} + for _, p := range peers { + if p >= len(nodes) { + log.Info(ctx, 
"cannot set a peer that hasn't been started. Skipping to next peer") + continue + } + peerAddresses = append( + peerAddresses, + fmt.Sprintf("%s/p2p/%s", test.NodeConfig[p].Net.P2PAddress, nodes[p].PeerID()), + ) + } + cfg.Net.Peers = strings.Join(peerAddresses, ",") + } + n, d, err := setupDefraNode(t, cfg, test.SeedDocuments) + if err != nil { + t.Fatal(err) + } + + if i == 0 { + dockeys = append(dockeys, d...) + } + nodes = append(nodes, n) + } + + // wait for peers to connect to each other + for i, n := range nodes { + for j, p := range nodes { + if i == j { + continue + } + log.Info(ctx, fmt.Sprintf("Waiting for node %d to connect with peer %d", i, j)) + err := n.WaitForPubSubEvent(p.PeerID()) + if err != nil { + t.Fatal(err) + } + log.Info(ctx, fmt.Sprintf("Node %d connected to peer %d", i, j)) + } + } + + // update and sync peers + for n, updateMap := range test.Updates { + if n >= len(nodes) { + log.Info(ctx, "cannot update a node that hasn't been started. Skipping to next node") + continue + } + + for d, updates := range updateMap { + for _, update := range updates { + log.Info(ctx, fmt.Sprintf("Updating node %d with update %d", n, d)) + if err := updateDocument(ctx, nodes[n].DB, dockeys[d], update); err != nil { + t.Fatal(err) + } + + // wait for peers to sync + for n2, p := range nodes { + if n2 == n { + continue + } + log.Info(ctx, fmt.Sprintf("Waiting for node %d to sync with peer %d", n2, n)) + err := p.WaitForPushLogEvent(nodes[n].PeerID()) + if err != nil { + t.Fatal(err) + } + log.Info(ctx, fmt.Sprintf("Node %d synced", n2)) + } + } + } + + // check that peers actually received the update + for n2, resultsMap := range test.Results { + if n2 == n { + continue + } + if n2 >= len(nodes) { + log.Info(ctx, "cannot check results of a node that hasn't been started. Skipping to next node") + continue + } + + for d, results := range resultsMap { + for field, result := range results { + doc, err := getDocument(ctx, nodes[n2].DB, dockeys[d]) + if err != nil { + t.Fatal(err) + } + + val, err := doc.Get(field) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, result, val) + } + } + } + } + + // clean up + for _, n := range nodes { + n.DB.Close(ctx) + if err := n.Close(); err != nil { + log.Info(ctx, "node not closing as expected", logging.NewKV("Error", err)) + } + } +} + +func randomNetworkingConfig() *config.Config { + p2pPort := newPort() + tcpPort := newPort() + cfg := config.DefaultConfig() + cfg.Net.P2PAddress = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", p2pPort) + cfg.Net.RPCAddress = fmt.Sprintf("0.0.0.0:%d", tcpPort) + cfg.Net.TCPAddress = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", tcpPort) + return cfg +} + +// newPort returns a port number between 9000 and 9999 and ensures +// it hasn't already been used by the test suite. 
+func newPort() int { + portSyncLock.Lock() + defer portSyncLock.Unlock() + + p := rand.Intn(1000) + 9000 + if usedPorts[p] { + return newPort() + } + + usedPorts[p] = true + + return p +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 9d4d7df415..c18f23a25b 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -180,14 +180,14 @@ func IsDetectingDbChanges() bool { return detectDbChanges } -func NewBadgerMemoryDB(ctx context.Context) (databaseInfo, error) { +func NewBadgerMemoryDB(ctx context.Context, dbopts ...db.Option) (databaseInfo, error) { opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} rootstore, err := badgerds.NewDatastore("", &opts) if err != nil { return databaseInfo{}, err } - db, err := db.NewDB(ctx, rootstore) + db, err := db.NewDB(ctx, rootstore, dbopts...) if err != nil { return databaseInfo{}, err }
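
For reviewers, here is a minimal sketch of how a new test is written against this harness. It mirrors the structure of the cases added in tests/integration/net/tcp_test.go; the test name and document values below are illustrative only, while the types and helpers (P2PTestCase, randomNetworkingConfig, executeTestCase) are the ones added in tests/integration/net/utils.go.

package net

import (
	"testing"

	"github.com/sourcenetwork/defradb/config"
)

// TestP2PSketch is an illustrative sketch, not part of this PR: two nodes are
// started, node 1 bootstraps against node 0, node 1 updates the seeded
// document, and the harness asserts that node 0 converges on the new value.
func TestP2PSketch(t *testing.T) {
	test := P2PTestCase{
		NodeConfig: []*config.Config{
			randomNetworkingConfig(),
			randomNetworkingConfig(),
		},
		// Node 1 lists node 0 as a bootstrap peer; only lower-indexed
		// (already started) nodes may be referenced.
		NodePeers: map[int][]int{
			1: {0},
		},
		// Every node seeds the same documents, so document indexes are shared.
		SeedDocuments: []string{
			`{"Name": "John", "Age": 21}`,
		},
		// node index -> document index -> updates applied on that node.
		Updates: map[int]map[int][]string{
			1: {0: {`{"Age": 45}`}},
		},
		// node index -> document index -> expected field values after syncing.
		Results: map[int]map[int]map[string]interface{}{
			0: {0: {"Age": uint64(45)}},
		},
	}

	executeTestCase(t, test)
}

Each update blocks on WaitForPushLogEvent from every other node before the expected results are checked, so the assertions only run once the update has propagated.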