Skip to content

Commit

Permalink
Implement DiskId()
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
  • Loading branch information
kevina authored and whyrusleeping committed Sep 3, 2017
1 parent 1f97170 commit 5027b4b
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 21 deletions.
60 changes: 52 additions & 8 deletions repo/fsrepo/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,28 @@ func TestDefaultDatastoreConfig(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(dir) // clean up
repo := FSRepo{path: dir}

config := new(config.Datastore)
err = json.Unmarshal(defaultConfig, config)
if err != nil {
t.Fatal(err)
}
ds, err := repo.constructDatastore(config.Spec)

dsc, err := AnyDatastoreConfig(config.Spec)
if err != nil {
t.Fatal(err)
}

expected := "/blocks:{flatfs;blocks;/repo/flatfs/shard/v1/next-to-last/2};/:{levelds;datastore};"
if dsc.DiskId() != expected {
t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId())
}

ds, err := dsc.Create(dir)
if err != nil {
t.Fatal(err)
}

if typ := reflect.TypeOf(ds).String(); typ != "*syncmount.Datastore" {
t.Errorf("expected '*syncmount.Datastore' got '%s'", typ)
}
Expand All @@ -102,17 +113,28 @@ func TestLevelDbConfig(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(dir) // clean up
repo := FSRepo{path: dir}

spec := make(map[string]interface{})
err = json.Unmarshal(leveldbConfig, &spec)
if err != nil {
t.Fatal(err)
}
ds, err := repo.constructDatastore(spec)

dsc, err := AnyDatastoreConfig(spec)
if err != nil {
t.Fatal(err)
}

expected := "levelds;datastore"
if dsc.DiskId() != expected {
t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId())
}

ds, err := dsc.Create(dir)
if err != nil {
t.Fatal(err)
}

if typ := reflect.TypeOf(ds).String(); typ != "*leveldb.datastore" {
t.Errorf("expected '*leveldb.datastore' got '%s'", typ)
}
Expand All @@ -129,17 +151,28 @@ func TestFlatfsConfig(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(dir) // clean up
repo := FSRepo{path: dir}

spec := make(map[string]interface{})
err = json.Unmarshal(flatfsConfig, &spec)
if err != nil {
t.Fatal(err)
}
ds, err := repo.constructDatastore(spec)

dsc, err := AnyDatastoreConfig(spec)
if err != nil {
t.Fatal(err)
}

expected := "flatfs;blocks;/repo/flatfs/shard/v1/next-to-last/2"
if dsc.DiskId() != expected {
t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId())
}

ds, err := dsc.Create(dir)
if err != nil {
t.Fatal(err)
}

if typ := reflect.TypeOf(ds).String(); typ != "*flatfs.Datastore" {
t.Errorf("expected '*flatfs.Datastore' got '%s'", typ)
}
Expand All @@ -156,17 +189,28 @@ func TestMeasureConfig(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(dir) // clean up
repo := FSRepo{path: dir}

spec := make(map[string]interface{})
err = json.Unmarshal(measureConfig, &spec)
if err != nil {
t.Fatal(err)
}
ds, err := repo.constructDatastore(spec)

dsc, err := AnyDatastoreConfig(spec)
if err != nil {
t.Fatal(err)
}

expected := "flatfs;blocks;/repo/flatfs/shard/v1/next-to-last/2"
if dsc.DiskId() != expected {
t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId())
}

ds, err := dsc.Create(dir)
if err != nil {
t.Fatal(err)
}

if typ := reflect.TypeOf(ds).String(); typ != "*measure.measure" {
t.Errorf("expected '*measure.measure' got '%s'", typ)
}
Expand Down
37 changes: 34 additions & 3 deletions repo/fsrepo/datastores.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fsrepo

import (
"bytes"
"fmt"
"path/filepath"

Expand All @@ -18,9 +19,11 @@ import (
type ConfigFromMap func(map[string]interface{}) (DatastoreConfig, error)

type DatastoreConfig interface {
// DiskId is a unique id representing the Datastore config as stored on disk, runtime config values are not
// part of this Id. No length limit.
//DiskId() string
// DiskId is a unique id representing the Datastore config as
// stored on disk, runtime config values are not part of this Id.
// Returns an empty string if the datastore does not have an on
// disk representation. No length limit.
DiskId() string

// Create instantiate a new datastore from this config
Create(path string) (repo.Datastore, error)
Expand Down Expand Up @@ -91,6 +94,14 @@ func MountDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error
return &res, nil
}

func (c *mountDatastoreConfig) DiskId() string {
buf := new(bytes.Buffer)
for _, m := range c.mounts {
fmt.Fprintf(buf, "%s:{%s};", m.prefix.String(), m.ds.DiskId())
}
return buf.String()
}

func (c *mountDatastoreConfig) Create(path string) (repo.Datastore, error) {
mounts := make([]mount.Mount, len(c.mounts))
for i, m := range c.mounts {
Expand Down Expand Up @@ -136,6 +147,10 @@ func FlatfsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, erro
return &c, nil
}

func (c *flatfsDatastoreConfig) DiskId() string {
return fmt.Sprintf("flatfs;%s;%s", c.path, c.shardFun.String())
}

func (c *flatfsDatastoreConfig) Create(path string) (repo.Datastore, error) {
p := c.path
if !filepath.IsAbs(p) {
Expand Down Expand Up @@ -173,6 +188,10 @@ func LeveldsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, err
return &c, nil
}

func (c *leveldsDatastoreConfig) DiskId() string {
return fmt.Sprintf("levelds;%s", c.path)
}

func (c *leveldsDatastoreConfig) Create(path string) (repo.Datastore, error) {
p := c.path
if !filepath.IsAbs(p) {
Expand All @@ -192,6 +211,10 @@ func MemDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error)
return &memDatastoreConfig{params}, nil
}

func (c *memDatastoreConfig) DiskId() string {
return ""
}

func (c *memDatastoreConfig) Create(string) (repo.Datastore, error) {
return ds.NewMapDatastore(), nil
}
Expand Down Expand Up @@ -226,6 +249,10 @@ func (c *logDatastoreConfig) Create(path string) (repo.Datastore, error) {
return ds.NewLogDatastore(child, c.name), nil
}

func (c *logDatastoreConfig) DiskId() string {
return c.child.DiskId()
}

type measureDatastoreConfig struct {
child DatastoreConfig
prefix string
Expand All @@ -247,6 +274,10 @@ func MeasureDatastoreConfig(params map[string]interface{}) (DatastoreConfig, err
return &measureDatastoreConfig{child, prefix}, nil
}

func (c *measureDatastoreConfig) DiskId() string {
return c.child.DiskId()
}

func (c measureDatastoreConfig) Create(path string) (repo.Datastore, error) {
child, err := c.child.Create(path)
if err != nil {
Expand Down
61 changes: 51 additions & 10 deletions repo/fsrepo/fsrepo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fsrepo

import (
"bytes"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -357,31 +358,71 @@ func (r *FSRepo) openKeystore() error {

// openDatastore returns an error if the config file is not present.
func (r *FSRepo) openDatastore() error {
if r.config.Datastore.Spec != nil {
d, err := r.constructDatastore(r.config.Datastore.Spec)
if r.config.Datastore.Type != "" || r.config.Datastore.Path != "" {
return fmt.Errorf("old style datatstore config detected")
} else if r.config.Datastore.Spec == nil {
return fmt.Errorf("required Datastore.Spec entry missing form config file")
}

dsc, err := AnyDatastoreConfig(r.config.Datastore.Spec)
if err != nil {
return err
}
diskId := dsc.DiskId()

oldDiskId, err := r.readDiskId()
if err == nil {
if oldDiskId != diskId {
return fmt.Errorf("Datastore configuration of '%s' does not match what is on disk '%s'",
oldDiskId, diskId)
}
} else if os.IsNotExist(err) {
err := r.writeDiskId(diskId)
if err != nil {
return err
}
r.ds = d
} else if r.config.Datastore.Type != "" || r.config.Datastore.Path != "" {
return fmt.Errorf("old style datatstore config detected")
} else {
return fmt.Errorf("required Datastore.Spec entry missing form config file")
return err
}

d, err := dsc.Create(r.path)
if err != nil {
return err
}
r.ds = d

// Wrap it with metrics gathering
prefix := "ipfs.fsrepo.datastore"
r.ds = measure.New(prefix, r.ds)

return nil
}

func (r *FSRepo) constructDatastore(params map[string]interface{}) (repo.Datastore, error) {
cfg, err := AnyDatastoreConfig(params)
var DiskIdFn = "dsid"

func (r *FSRepo) readDiskId() (string, error) {
fn, err := config.Path(r.path, DiskIdFn)
if err != nil {
return nil, err
return "", err
}
b, err := ioutil.ReadFile(fn)
if err != nil {
return "", err
}
b = bytes.TrimSpace(b)
return string(b), nil
}

func (r *FSRepo) writeDiskId(newId string) error {
fn, err := config.Path(r.path, DiskIdFn)
if err != nil {
return err
}
return cfg.Create(r.path)
err = ioutil.WriteFile(fn, []byte(newId), 0666)
if err != nil {
return err
}
return nil
}

// Close closes the FSRepo, releasing held resources.
Expand Down

0 comments on commit 5027b4b

Please sign in to comment.