Skip to content

Commit

Permalink
fix(fsrepo.Profiles): add file lock for peers.json
Browse files Browse the repository at this point in the history
we've been getting all sorts of bad json from not guarding peers.json
from concurrent reads & writes. This makes some initial inroads on the
problem, but isn't to be trusted until comprehensive tests are written.

makes more inroads on #357
  • Loading branch information
b5 committed Apr 23, 2018
1 parent 0942b9f commit f692d55
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
GOFILES = $(shell find . -name '*.go' -not -path './vendor/*')
GOPACKAGES = github.com/briandowns/spinner github.com/datatogether/api/apiutil github.com/fatih/color github.com/ipfs/go-datastore github.com/olekukonko/tablewriter github.com/qri-io/analytics github.com/qri-io/bleve github.com/qri-io/dataset github.com/qri-io/doggos github.com/qri-io/dsdiff github.com/qri-io/varName github.com/qri-io/registry/regclient github.com/sirupsen/logrus github.com/spf13/cobra github.com/spf13/cobra/doc github.com/ugorji/go/codec
GOPACKAGES = github.com/briandowns/spinner github.com/datatogether/api/apiutil github.com/fatih/color github.com/ipfs/go-datastore github.com/olekukonko/tablewriter github.com/qri-io/analytics github.com/qri-io/bleve github.com/qri-io/dataset github.com/qri-io/doggos github.com/qri-io/dsdiff github.com/qri-io/varName github.com/qri-io/registry/regclient github.com/sirupsen/logrus github.com/spf13/cobra github.com/spf13/cobra/doc github.com/ugorji/go/codec github.com/theckman/go-flock

default: build

Expand Down
15 changes: 1 addition & 14 deletions p2p/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (n *QriNode) RequestProfile(pid peer.ID) (*profile.Profile, error) {
}

res := <-replies
log.Debug(res)
log.Debugf("profile response for message: %s", res.ID)

cp := &profile.CodingProfile{}
if err := json.Unmarshal(res.Body, cp); err != nil {
Expand Down Expand Up @@ -68,20 +68,7 @@ func (n *QriNode) handleProfile(ws *WrappedStream, msg Message) (hangup bool) {
return
}

// pids, err := pro.PeerIDs()
// if err != nil {
// log.Debug(err.Error())
// return
// }

pro.Updated = time.Now()
n.Repo.Profiles().PutProfile(pro)

// log.Debugf("adding peer: %s", pid.Pretty())
// if err := n.Repo.Profiles().PutPeer(pid, pro); err != nil {
// log.Debug(err.Error())
// return
// }

data, err := n.profileBytes()
if err != nil {
Expand Down
60 changes: 41 additions & 19 deletions repo/fs/profile_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-datastore"
"github.com/qri-io/qri/repo/profile"
"github.com/theckman/go-flock"

"gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
)
Expand All @@ -21,17 +22,20 @@ var ErrNotFound = fmt.Errorf("Not Found")
type ProfileStore struct {
sync.Mutex
basepath
flock *flock.Flock
}

// NewProfileStore allocates a ProfileStore
func NewProfileStore(bp basepath) ProfileStore {
return ProfileStore{
basepath: bp,
flock: flock.NewFlock(bp.filepath(FilePeers) + ".lock"),
}
}

// PutProfile adds a peer to the store
func (r ProfileStore) PutProfile(p *profile.Profile) error {
log.Debugf("put profile: %s", p.ID.String())
if p.ID.String() == "" {
return fmt.Errorf("profile ID is required")
}
Expand All @@ -48,7 +52,7 @@ func (r ProfileStore) PutProfile(p *profile.Profile) error {
if err != nil {
return err
}
ps[p.ID] = enc
ps[p.ID.String()] = enc
return r.saveFile(ps, FilePeers)
}

Expand All @@ -62,8 +66,10 @@ func (r ProfileStore) PeerIDs(id profile.ID) ([]peer.ID, error) {
return nil, err
}

ids := id.String()

for proid, cp := range ps {
if id == proid {
if ids == proid {
pro := &profile.Profile{}
if err := pro.Decode(cp); err != nil {
return nil, err
Expand All @@ -88,12 +94,12 @@ func (r ProfileStore) List() (map[profile.ID]*profile.Profile, error) {
}

profiles := map[profile.ID]*profile.Profile{}
for id, cp := range ps {
for _, cp := range ps {
pro := &profile.Profile{}
if err := pro.Decode(cp); err != nil {
return nil, err
}
profiles[id] = pro
profiles[pro.ID] = pro
}

return profiles, nil
Expand All @@ -111,14 +117,16 @@ func (r ProfileStore) PeernameID(peername string) (profile.ID, error) {

for id, cp := range ps {
if cp.Peername == peername {
return id, nil
return profile.IDB58Decode(id)
}
}
return "", datastore.ErrNotFound
}

// GetProfile fetches a profile from the store
func (r ProfileStore) GetProfile(id profile.ID) (*profile.Profile, error) {
log.Debugf("get profile: %s", id.String())

r.Lock()
defer r.Unlock()

Expand All @@ -127,8 +135,10 @@ func (r ProfileStore) GetProfile(id profile.ID) (*profile.Profile, error) {
return nil, err
}

ids := id.String()

for proid, p := range ps {
if id == proid {
if ids == proid {
pro := &profile.Profile{}
err := pro.Decode(p)
return pro, err
Expand All @@ -140,6 +150,8 @@ func (r ProfileStore) GetProfile(id profile.ID) (*profile.Profile, error) {

// PeerProfile gives the profile that corresponds with a given peer.ID
func (r ProfileStore) PeerProfile(id peer.ID) (*profile.Profile, error) {
log.Debugf("peerProfile: %s", id.String())

r.Lock()
defer r.Unlock()

Expand All @@ -149,7 +161,7 @@ func (r ProfileStore) PeerProfile(id peer.ID) (*profile.Profile, error) {
}

for _, p := range ps {
if _, ok := p.Addresses[id.Pretty()]; ok {
if _, ok := p.Addresses[id.String()]; ok {
pro := &profile.Profile{}
err := pro.Decode(p)
return pro, err
Expand All @@ -168,28 +180,41 @@ func (r ProfileStore) DeleteProfile(id profile.ID) error {
if err != nil {
return err
}
delete(ps, id)
delete(ps, id.String())
return r.saveFile(ps, FilePeers)
}

func (r ProfileStore) saveFile(ps map[profile.ID]*profile.CodingProfile, f File) error {
log.Debugf("writing profiles: %s", r.filepath(f))
// pss := map[string]*profile.Profile{}
// for _, p := range ps {
// pss[p.ID.String()] = p
// }
func (r ProfileStore) saveFile(ps map[string]*profile.CodingProfile, f File) error {

data, err := json.Marshal(ps)
if err != nil {
log.Debug(err.Error())
return err
}

log.Debugf("writing profiles: %s", r.filepath(f))
if err := r.flock.Lock(); err != nil {
return err
}
defer func() {
r.flock.Unlock()
log.Debugf("profiles written")
}()
return ioutil.WriteFile(r.filepath(f), data, os.ModePerm)
}

func (r *ProfileStore) profiles() (map[profile.ID]*profile.CodingProfile, error) {
func (r *ProfileStore) profiles() (map[string]*profile.CodingProfile, error) {
log.Debug("reading profiles")
ps := map[profile.ID]*profile.CodingProfile{}

if err := r.flock.Lock(); err != nil {
return nil, err
}
defer func() {
log.Debug("profiles read")
r.flock.Unlock()
}()

ps := map[string]*profile.CodingProfile{}
data, err := ioutil.ReadFile(r.filepath(FilePeers))
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -206,8 +231,5 @@ func (r *ProfileStore) profiles() (map[profile.ID]*profile.CodingProfile, error)
return ps, nil
// return ps, fmt.Errorf("error unmarshaling peers: %s", err.Error())
}
// for _, p := range pss {
// ps[p.ID] = p
// }
return ps, nil
}

0 comments on commit f692d55

Please sign in to comment.