Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hybrid logstore #463

Merged
merged 25 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
45fbb2c
Refactoring headbook test suite
dgtony Oct 8, 2020
ee254a7
Impl headbook export test
dgtony Oct 8, 2020
e6bdb5e
Impl exportable for ds-backed headbook
dgtony Oct 8, 2020
11ae55d
Impl exportable for in-memory headbook
dgtony Oct 8, 2020
37508ba
Cleanup go modules
dgtony Oct 8, 2020
6ad3c72
Specialize dump/restore signatures in components
dgtony Oct 9, 2020
05e3f91
Impl export for address books (ds + mem)
dgtony Oct 9, 2020
d87a1dd
Protect against empty dumps
dgtony Oct 9, 2020
2e7c913
Cover address book export with tests
dgtony Oct 9, 2020
f70ee73
Move log/thread ID parsing into dedicated functions
dgtony Oct 12, 2020
a8f4841
Use named error for empty dump restore
dgtony Oct 12, 2020
0d9c076
Impl export for key books (ds + mem)
dgtony Oct 12, 2020
64507a8
Fix key suffix comparison
dgtony Oct 12, 2020
33b656f
Cover key book export with tests
dgtony Oct 12, 2020
05240e1
Fix resource leak on early return from key iteration
dgtony Oct 13, 2020
011921f
Impl export for metadata books (ds + mem)
dgtony Oct 13, 2020
99d59ef
Fix value decoding on dumps
dgtony Oct 13, 2020
97bd7d7
Cover metadata book export with tests
dgtony Oct 13, 2020
ce4d902
Control restore behaviour with flag
dgtony Oct 13, 2020
e831d4e
Impl hybrid logstore
dgtony Oct 13, 2020
4eda75a
Add logstore suite to datastore tests
dgtony Oct 13, 2020
20899d3
Run standard test suites on a hybrid store
dgtony Oct 14, 2020
12d816d
Impl resource finalizer
dgtony Oct 30, 2020
5c7ff62
Logstore kind option + refactoring
dgtony Oct 30, 2020
d27ca62
Rename: LogstoreKind -> LogstoreType
dgtony Nov 5, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 103 additions & 82 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@ package common

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

ipfslite "github.com/hsanjuan/ipfs-lite"
datastore "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
cconnmgr "github.com/libp2p/go-libp2p-core/connmgr"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
ma "github.com/multiformats/go-multiaddr"
"github.com/textileio/go-threads/core/app"
core "github.com/textileio/go-threads/core/logstore"
"github.com/textileio/go-threads/logstore/lstoreds"
"github.com/textileio/go-threads/logstore/lstorehybrid"
"github.com/textileio/go-threads/logstore/lstoremem"
"github.com/textileio/go-threads/net"
util "github.com/textileio/go-threads/util"
"github.com/textileio/go-threads/util"
"google.golang.org/grpc"
)

Expand All @@ -37,41 +37,41 @@ type NetBoostrapper interface {
}

func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error) {
config := &NetConfig{}
for _, opt := range opts {
if err := opt(config); err != nil {
return nil, err
}
}
var (
config NetConfig
fin = util.NewFinalizer()
)

if config.HostAddr == nil {
addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
if err != nil {
for _, opt := range opts {
if err := opt(&config); err != nil {
return nil, err
}
config.HostAddr = addr
}

if config.ConnManager == nil {
config.ConnManager = connmgr.NewConnManager(100, 400, time.Second*20)
if err := setDefaults(&config); err != nil {
return nil, err
}

ipfsLitePath := filepath.Join(repoPath, defaultIpfsLitePath)
if err := os.MkdirAll(ipfsLitePath, os.ModePerm); err != nil {
return nil, err
}

litestore, err := ipfslite.BadgerDatastore(ipfsLitePath)
if err != nil {
return nil, err
}
fin.Add(litestore)

ctx, cancel := context.WithCancel(context.Background())
fin.Add(util.NewContextCloser(cancel))

pstore, err := pstoreds.NewPeerstore(ctx, litestore, pstoreds.DefaultOpts())
if err != nil {
litestore.Close()
cancel()
return nil, err
return nil, fin.Cleanup(err)
}
fin.Add(pstore)

priv := util.LoadKey(filepath.Join(ipfsLitePath, "key"))
h, d, err := ipfslite.SetupLibp2p(
ctx,
Expand All @@ -84,37 +84,17 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error)
libp2p.DisableRelay(),
)
if err != nil {
cancel()
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}

lite, err := ipfslite.New(ctx, litestore, h, d, nil)
if err != nil {
cancel()
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}

// Build a logstore
logstorePath := filepath.Join(repoPath, defaultLogstorePath)
if err := os.MkdirAll(logstorePath, os.ModePerm); err != nil {
cancel()
return nil, err
}
logstore, err := ipfslite.BadgerDatastore(logstorePath)
tstore, err := buildLogstore(ctx, config.LogstoreKind, repoPath, fin)
if err != nil {
cancel()
litestore.Close()
return nil, err
}
tstore, err := lstoreds.NewLogstore(ctx, logstore, lstoreds.DefaultOpts())
if err != nil {
cancel()
if err := logstore.Close(); err != nil {
return nil, err
}
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}

// Build a network
Expand All @@ -123,33 +103,89 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error)
PubSub: config.PubSub,
}, config.GRPCServerOptions, config.GRPCDialOptions)
if err != nil {
cancel()
if err := logstore.Close(); err != nil {
return nil, err
}
litestore.Close()
return nil, err
return nil, fin.Cleanup(err)
}
fin.Add(h, d, api)

return &netBoostrapper{
cancel: cancel,
Net: api,
litepeer: lite,
pstore: pstore,
logstore: logstore,
litestore: litestore,
host: h,
dht: d,
finalizer: fin,
}, nil
}

func buildLogstore(ctx context.Context, kind LogstoreKind, repoPath string, fin *util.Finalizer) (core.Logstore, error) {
switch kind {
case LogstoreInMemory:
return lstoremem.NewLogstore(), nil

case LogstoreHybrid:
pls, err := persistentLogstore(ctx, repoPath, fin)
if err != nil {
return nil, err
}
mls := lstoremem.NewLogstore()
return lstorehybrid.NewLogstore(pls, mls)

case LogstorePersistent:
return persistentLogstore(ctx, repoPath, fin)

default:
return nil, fmt.Errorf("unsupported kind of logstore: %s", kind)
}
}

func persistentLogstore(ctx context.Context, repoPath string, fin *util.Finalizer) (core.Logstore, error) {
logstorePath := filepath.Join(repoPath, defaultLogstorePath)
if err := os.MkdirAll(logstorePath, os.ModePerm); err != nil {
return nil, err
}

dstore, err := ipfslite.BadgerDatastore(logstorePath)
if err != nil {
return nil, err
}
fin.Add(dstore)

return lstoreds.NewLogstore(ctx, dstore, lstoreds.DefaultOpts())
}

func setDefaults(config *NetConfig) error {
if config.HostAddr == nil {
addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
if err != nil {
return err
}
config.HostAddr = addr
}

if config.ConnManager == nil {
config.ConnManager = connmgr.NewConnManager(100, 400, time.Second*20)
}

if len(config.LogstoreKind) == 0 {
config.LogstoreKind = LogstorePersistent
}

return nil
}

type LogstoreKind string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about LogstoreType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed


const (
LogstoreInMemory LogstoreKind = "in-memory"
LogstorePersistent LogstoreKind = "persistent"
LogstoreHybrid LogstoreKind = "hybrid"
)

type NetConfig struct {
HostAddr ma.Multiaddr
ConnManager cconnmgr.ConnManager
Debug bool
GRPCServerOptions []grpc.ServerOption
GRPCDialOptions []grpc.DialOption
LogstoreKind LogstoreKind
PubSub bool
Debug bool
}

type NetOption func(c *NetConfig) error
Expand Down Expand Up @@ -196,15 +232,17 @@ func WithNetPubSub(enabled bool) NetOption {
}
}

func WithNetLogstore(lk LogstoreKind) NetOption {
return func(c *NetConfig) error {
c.LogstoreKind = lk
return nil
}
}

type netBoostrapper struct {
cancel context.CancelFunc
app.Net
litepeer *ipfslite.Peer
pstore peerstore.Peerstore
logstore datastore.Datastore
litestore datastore.Datastore
host host.Host
dht *dual.DHT
finalizer *util.Finalizer
}

var _ NetBoostrapper = (*netBoostrapper)(nil)
Expand All @@ -218,22 +256,5 @@ func (tsb *netBoostrapper) GetIpfsLite() *ipfslite.Peer {
}

func (tsb *netBoostrapper) Close() error {
if err := tsb.Net.Close(); err != nil {
return err
}
tsb.cancel()
if err := tsb.dht.Close(); err != nil {
return err
}
if err := tsb.host.Close(); err != nil {
return err
}
if err := tsb.pstore.Close(); err != nil {
return err
}
if err := tsb.litestore.Close(); err != nil {
return err
}
return tsb.logstore.Close()
// Logstore closed by network
return tsb.finalizer.Cleanup(nil)
}
66 changes: 66 additions & 0 deletions core/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logstore

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -25,6 +26,9 @@ var ErrLogNotFound = fmt.Errorf("log not found")
// ErrLogExists indicates a requested log already exists.
var ErrLogExists = fmt.Errorf("log already exists")

// ErrEmptyDump indicates an attempt to restore from empty dump.
var ErrEmptyDump = errors.New("empty dump")

// Logstore stores log keys, addresses, heads and thread meta data.
type Logstore interface {
Close() error
Expand Down Expand Up @@ -87,6 +91,12 @@ type ThreadMetadata interface {

// ClearMetadata clears all metadata under a thread.
ClearMetadata(t thread.ID) error

// DumpMeta packs all the stored metadata.
DumpMeta() (DumpMetadata, error)

// RestoreMeta restores metadata from the dump.
RestoreMeta(book DumpMetadata) error
}

// KeyBook stores log keys.
Expand Down Expand Up @@ -126,6 +136,12 @@ type KeyBook interface {

// ThreadsFromKeys returns a list of threads referenced in the book.
ThreadsFromKeys() (thread.IDSlice, error)

// DumpKeys packs all stored keys.
DumpKeys() (DumpKeyBook, error)

// RestoreKeys restores keys from the dump.
RestoreKeys(book DumpKeyBook) error
}

// AddrBook stores log addresses.
Expand Down Expand Up @@ -159,6 +175,12 @@ type AddrBook interface {

// ThreadsFromAddrs returns a list of threads referenced in the book.
ThreadsFromAddrs() (thread.IDSlice, error)

// DumpHeads packs all stored addresses.
DumpAddrs() (DumpAddrBook, error)

// RestoreHeads restores addresses from the dump.
RestoreAddrs(book DumpAddrBook) error
}

// HeadBook stores log heads.
Expand All @@ -180,4 +202,48 @@ type HeadBook interface {

// ClearHeads deletes the head entry for a log.
ClearHeads(thread.ID, peer.ID) error

// DumpHeads packs entire headbook into the tree.
DumpHeads() (DumpHeadBook, error)

// RestoreHeads restores headbook from the dump.
RestoreHeads(DumpHeadBook) error
}

type (
DumpHeadBook struct {
Data map[thread.ID]map[peer.ID][]cid.Cid
}

ExpiredAddress struct {
Addr ma.Multiaddr
Expires time.Time
}

DumpAddrBook struct {
Data map[thread.ID]map[peer.ID][]ExpiredAddress
}

DumpKeyBook struct {
Data struct {
Public map[thread.ID]map[peer.ID]crypto.PubKey
Private map[thread.ID]map[peer.ID]crypto.PrivKey
Read map[thread.ID][]byte
Service map[thread.ID][]byte
}
}

MetadataKey struct {
T thread.ID
K string
}

DumpMetadata struct {
Data struct {
Int64 map[MetadataKey]int64
Bool map[MetadataKey]bool
String map[MetadataKey]string
Bytes map[MetadataKey][]byte
}
}
)
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ require (
github.com/libp2p/go-libp2p v0.10.3
github.com/libp2p/go-libp2p-connmgr v0.2.4
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-gostream v0.2.0
github.com/libp2p/go-libp2p-kad-dht v0.8.3
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-pubsub v0.2.4
github.com/libp2p/go-libp2p-swarm v0.2.8
Expand Down
Loading