Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move persistent datastores to plugins #5695

Merged
merged 2 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/ipfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) {
if err != nil {
return nil, err
}
if ok {
if _, err := loader.LoadPlugins(pluginpath); err != nil {
log.Error("error loading plugins: ", err)
}
if !ok {
pluginpath = ""
}
if _, err := loader.LoadPlugins(pluginpath); err != nil {
log.Error("error loading plugins: ", err)
}

exctr = cmds.NewExecutor(req.Root)
Expand Down
28 changes: 15 additions & 13 deletions plugin/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,30 @@ func LoadPlugins(pluginDir string) ([]plugin.Plugin, error) {
plMap[v.Name()] = v
}

newPls, err := loadDynamicPlugins(pluginDir)
if err != nil {
return nil, err
}
if pluginDir != "" {
newPls, err := loadDynamicPlugins(pluginDir)
if err != nil {
return nil, err
}

for _, pl := range newPls {
if ppl, ok := plMap[pl.Name()]; ok {
// plugin is already preloaded
return nil, fmt.Errorf(
"plugin: %s, is duplicated in version: %s, "+
"while trying to load dynamically: %s",
ppl.Name(), ppl.Version(), pl.Version())
for _, pl := range newPls {
if ppl, ok := plMap[pl.Name()]; ok {
// plugin is already preloaded
return nil, fmt.Errorf(
"plugin: %s, is duplicated in version: %s, "+
"while trying to load dynamically: %s",
ppl.Name(), ppl.Version(), pl.Version())
}
plMap[pl.Name()] = pl
}
plMap[pl.Name()] = pl
}

pls := make([]plugin.Plugin, 0, len(plMap))
for _, v := range plMap {
pls = append(pls, v)
}

err = initialize(pls)
err := initialize(pls)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions plugin/loader/preload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package loader

import (
"github.com/ipfs/go-ipfs/plugin"
pluginbadgerds "github.com/ipfs/go-ipfs/plugin/plugins/badgerds"
pluginflatfs "github.com/ipfs/go-ipfs/plugin/plugins/flatfs"
pluginipldgit "github.com/ipfs/go-ipfs/plugin/plugins/git"
pluginlevelds "github.com/ipfs/go-ipfs/plugin/plugins/levelds"
)

// DO NOT EDIT THIS FILE
Expand All @@ -11,4 +14,7 @@ import (

var preloadPlugins = []plugin.Plugin{
pluginipldgit.Plugins[0],
pluginbadgerds.Plugins[0],
pluginflatfs.Plugins[0],
pluginlevelds.Plugins[0],
}
4 changes: 4 additions & 0 deletions plugin/loader/preload_list
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@
# name go-path number of the sub-plugin

ipldgit github.com/ipfs/go-ipfs/plugin/plugins/git 0

badgerds github.com/ipfs/go-ipfs/plugin/plugins/badgerds 0
flatfs github.com/ipfs/go-ipfs/plugin/plugins/flatfs 0
levelds github.com/ipfs/go-ipfs/plugin/plugins/levelds 0
2 changes: 1 addition & 1 deletion plugin/plugins/Rules.mk
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
include mk/header.mk

$(d)_plugins:=$(d)/git
$(d)_plugins:=$(d)/git $(d)/badgerds $(d)/flatfs $(d)/levelds
$(d)_plugins_so:=$(addsuffix .so,$($(d)_plugins))
$(d)_plugins_main:=$(addsuffix /main/main.go,$($(d)_plugins))

Expand Down
114 changes: 114 additions & 0 deletions plugin/plugins/badgerds/badgerds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package badgerds

import (
"fmt"
"os"
"path/filepath"

"github.com/ipfs/go-ipfs/plugin"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/fsrepo"

humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
badgerds "gx/ipfs/QmaixNkKwtinV3umL5VD1VDD5CQjnZhXY31awM2YHTzbui/go-ds-badger"
)

// Plugins is exported list of plugins that will be loaded
var Plugins = []plugin.Plugin{
&badgerdsPlugin{},
}

type badgerdsPlugin struct{}

var _ plugin.PluginDatastore = (*badgerdsPlugin)(nil)

func (*badgerdsPlugin) Name() string {
return "ds-badgerds"
}

func (*badgerdsPlugin) Version() string {
return "0.1.0"
}

func (*badgerdsPlugin) Init() error {
return nil
}

func (*badgerdsPlugin) DatastoreTypeName() string {
return "badgerds"
}

type datastoreConfig struct {
path string
syncWrites bool

vlogFileSize int64
}

// BadgerdsDatastoreConfig returns a configuration stub for a badger datastore
// from the given parameters
func (*badgerdsPlugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
return func(params map[string]interface{}) (fsrepo.DatastoreConfig, error) {
var c datastoreConfig
var ok bool

c.path, ok = params["path"].(string)
if !ok {
return nil, fmt.Errorf("'path' field is missing or not string")
}

sw, ok := params["syncWrites"]
if !ok {
c.syncWrites = true
} else {
if swb, ok := sw.(bool); ok {
c.syncWrites = swb
} else {
return nil, fmt.Errorf("'syncWrites' field was not a boolean")
}
}

vls, ok := params["vlogFileSize"]
if !ok {
// default to 1GiB
c.vlogFileSize = badgerds.DefaultOptions.ValueLogFileSize
} else {
if vlogSize, ok := vls.(string); ok {
s, err := humanize.ParseBytes(vlogSize)
if err != nil {
return nil, err
}
c.vlogFileSize = int64(s)
} else {
return nil, fmt.Errorf("'vlogFileSize' field was not a string")
}
}

return &c, nil
}
}

func (c *datastoreConfig) DiskSpec() fsrepo.DiskSpec {
return map[string]interface{}{
"type": "badgerds",
"path": c.path,
}
}

func (c *datastoreConfig) Create(path string) (repo.Datastore, error) {
p := c.path
if !filepath.IsAbs(p) {
p = filepath.Join(path, p)
}

err := os.MkdirAll(p, 0755)
if err != nil {
return nil, err
}

defopts := badgerds.DefaultOptions
defopts.SyncWrites = c.syncWrites
defopts.ValueLogFileSize = c.vlogFileSize

return badgerds.NewDatastore(p, &defopts)
}
90 changes: 90 additions & 0 deletions plugin/plugins/flatfs/flatfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package flatfs

import (
"fmt"
"path/filepath"

"github.com/ipfs/go-ipfs/plugin"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/fsrepo"

flatfs "gx/ipfs/QmVFboKxbVJZMJAoFdvX6q4hzvXFkbWCE8DejnqrQV4ZtN/go-ds-flatfs"
)

// Plugins is exported list of plugins that will be loaded
var Plugins = []plugin.Plugin{
&flatfsPlugin{},
}

type flatfsPlugin struct{}

var _ plugin.PluginDatastore = (*flatfsPlugin)(nil)

func (*flatfsPlugin) Name() string {
return "ds-flatfs"
}

func (*flatfsPlugin) Version() string {
return "0.1.0"
}

func (*flatfsPlugin) Init() error {
return nil
}

func (*flatfsPlugin) DatastoreTypeName() string {
return "flatfs"
}

type datastoreConfig struct {
path string
shardFun *flatfs.ShardIdV1
syncField bool
}

// BadgerdsDatastoreConfig returns a configuration stub for a badger datastore
// from the given parameters
func (*flatfsPlugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
return func(params map[string]interface{}) (fsrepo.DatastoreConfig, error) {
var c datastoreConfig
var ok bool
var err error

c.path, ok = params["path"].(string)
if !ok {
return nil, fmt.Errorf("'path' field is missing or not boolean")
}

sshardFun, ok := params["shardFunc"].(string)
if !ok {
return nil, fmt.Errorf("'shardFunc' field is missing or not a string")
}
c.shardFun, err = flatfs.ParseShardFunc(sshardFun)
if err != nil {
return nil, err
}

c.syncField, ok = params["sync"].(bool)
if !ok {
return nil, fmt.Errorf("'sync' field is missing or not boolean")
}
return &c, nil
}
}

func (c *datastoreConfig) DiskSpec() fsrepo.DiskSpec {
return map[string]interface{}{
"type": "flatfs",
"path": c.path,
"shardFunc": c.shardFun.String(),
}
}

func (c *datastoreConfig) Create(path string) (repo.Datastore, error) {
p := c.path
if !filepath.IsAbs(p) {
p = filepath.Join(path, p)
}

return flatfs.CreateOrOpen(p, c.shardFun, c.syncField)
}
88 changes: 88 additions & 0 deletions plugin/plugins/levelds/levelds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package levelds

import (
"fmt"
"path/filepath"

"github.com/ipfs/go-ipfs/plugin"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/fsrepo"

ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
levelds "gx/ipfs/QmccqjKZUTqp4ikWNyAbjBuP5HEdqSqRuAr9mcEhYab54a/go-ds-leveldb"
)

// Plugins is exported list of plugins that will be loaded
var Plugins = []plugin.Plugin{
&leveldsPlugin{},
}

type leveldsPlugin struct{}

var _ plugin.PluginDatastore = (*leveldsPlugin)(nil)

func (*leveldsPlugin) Name() string {
return "ds-level"
}

func (*leveldsPlugin) Version() string {
return "0.1.0"
}

func (*leveldsPlugin) Init() error {
return nil
}

func (*leveldsPlugin) DatastoreTypeName() string {
return "levelds"
}

type datastoreConfig struct {
path string
compression ldbopts.Compression
}

// BadgerdsDatastoreConfig returns a configuration stub for a badger datastore
// from the given parameters
func (*leveldsPlugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
return func(params map[string]interface{}) (fsrepo.DatastoreConfig, error) {
var c datastoreConfig
var ok bool

c.path, ok = params["path"].(string)
if !ok {
return nil, fmt.Errorf("'path' field is missing or not string")
}

switch cm := params["compression"].(string); cm {
case "none":
c.compression = ldbopts.NoCompression
case "snappy":
c.compression = ldbopts.SnappyCompression
case "":
c.compression = ldbopts.DefaultCompression
default:
return nil, fmt.Errorf("unrecognized value for compression: %s", cm)
}

return &c, nil
}
}

func (c *datastoreConfig) DiskSpec() fsrepo.DiskSpec {
return map[string]interface{}{
"type": "levelds",
"path": c.path,
}
}

func (c *datastoreConfig) Create(path string) (repo.Datastore, error) {
p := c.path
if !filepath.IsAbs(p) {
p = filepath.Join(path, p)
}

return levelds.NewDatastore(p, &levelds.Options{
Compression: c.compression,
})
}
Loading