From f692d552568293c3b6ee265d4fe6151a03eb45c8 Mon Sep 17 00:00:00 2001 From: b5 Date: Mon, 23 Apr 2018 16:18:33 -0400 Subject: [PATCH] fix(fsrepo.Profiles): add file lock for peers.json we've been getting all sorts of bad json from not guarding peers.json from concurrent reads & writes. This makes some initial inroads on the problem, but isn't to be trusted until comprehensive tests are written. makes more inroads on #357 --- Makefile | 2 +- p2p/profile.go | 15 +--------- repo/fs/profile_store.go | 60 +++++++++++++++++++++++++++------------- 3 files changed, 43 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index 4a8e08245..f702f9669 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ GOFILES = $(shell find . -name '*.go' -not -path './vendor/*') -GOPACKAGES = github.com/briandowns/spinner github.com/datatogether/api/apiutil github.com/fatih/color github.com/ipfs/go-datastore github.com/olekukonko/tablewriter github.com/qri-io/analytics github.com/qri-io/bleve github.com/qri-io/dataset github.com/qri-io/doggos github.com/qri-io/dsdiff github.com/qri-io/varName github.com/qri-io/registry/regclient github.com/sirupsen/logrus github.com/spf13/cobra github.com/spf13/cobra/doc github.com/ugorji/go/codec +GOPACKAGES = github.com/briandowns/spinner github.com/datatogether/api/apiutil github.com/fatih/color github.com/ipfs/go-datastore github.com/olekukonko/tablewriter github.com/qri-io/analytics github.com/qri-io/bleve github.com/qri-io/dataset github.com/qri-io/doggos github.com/qri-io/dsdiff github.com/qri-io/varName github.com/qri-io/registry/regclient github.com/sirupsen/logrus github.com/spf13/cobra github.com/spf13/cobra/doc github.com/ugorji/go/codec github.com/theckman/go-flock default: build diff --git a/p2p/profile.go b/p2p/profile.go index c18d41935..2e48df2bd 100644 --- a/p2p/profile.go +++ b/p2p/profile.go @@ -37,7 +37,7 @@ func (n *QriNode) RequestProfile(pid peer.ID) (*profile.Profile, error) { } res := <-replies - log.Debug(res) + log.Debugf("profile response for message: %s", res.ID) cp := &profile.CodingProfile{} if err := json.Unmarshal(res.Body, cp); err != nil { @@ -68,20 +68,7 @@ func (n *QriNode) handleProfile(ws *WrappedStream, msg Message) (hangup bool) { return } - // pids, err := pro.PeerIDs() - // if err != nil { - // log.Debug(err.Error()) - // return - // } - pro.Updated = time.Now() - n.Repo.Profiles().PutProfile(pro) - - // log.Debugf("adding peer: %s", pid.Pretty()) - // if err := n.Repo.Profiles().PutPeer(pid, pro); err != nil { - // log.Debug(err.Error()) - // return - // } data, err := n.profileBytes() if err != nil { diff --git a/repo/fs/profile_store.go b/repo/fs/profile_store.go index 5c139914c..63b0bb552 100644 --- a/repo/fs/profile_store.go +++ b/repo/fs/profile_store.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/qri-io/qri/repo/profile" + "github.com/theckman/go-flock" "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" ) @@ -21,17 +22,20 @@ var ErrNotFound = fmt.Errorf("Not Found") type ProfileStore struct { sync.Mutex basepath + flock *flock.Flock } // NewProfileStore allocates a ProfileStore func NewProfileStore(bp basepath) ProfileStore { return ProfileStore{ basepath: bp, + flock: flock.NewFlock(bp.filepath(FilePeers) + ".lock"), } } // PutProfile adds a peer to the store func (r ProfileStore) PutProfile(p *profile.Profile) error { + log.Debugf("put profile: %s", p.ID.String()) if p.ID.String() == "" { return fmt.Errorf("profile ID is required") } @@ -48,7 +52,7 @@ func (r ProfileStore) PutProfile(p *profile.Profile) error { if err != nil { return err } - ps[p.ID] = enc + ps[p.ID.String()] = enc return r.saveFile(ps, FilePeers) } @@ -62,8 +66,10 @@ func (r ProfileStore) PeerIDs(id profile.ID) ([]peer.ID, error) { return nil, err } + ids := id.String() + for proid, cp := range ps { - if id == proid { + if ids == proid { pro := &profile.Profile{} if err := pro.Decode(cp); err != nil { return nil, err @@ -88,12 +94,12 @@ func (r ProfileStore) List() (map[profile.ID]*profile.Profile, error) { } profiles := map[profile.ID]*profile.Profile{} - for id, cp := range ps { + for _, cp := range ps { pro := &profile.Profile{} if err := pro.Decode(cp); err != nil { return nil, err } - profiles[id] = pro + profiles[pro.ID] = pro } return profiles, nil @@ -111,7 +117,7 @@ func (r ProfileStore) PeernameID(peername string) (profile.ID, error) { for id, cp := range ps { if cp.Peername == peername { - return id, nil + return profile.IDB58Decode(id) } } return "", datastore.ErrNotFound @@ -119,6 +125,8 @@ func (r ProfileStore) PeernameID(peername string) (profile.ID, error) { // GetProfile fetches a profile from the store func (r ProfileStore) GetProfile(id profile.ID) (*profile.Profile, error) { + log.Debugf("get profile: %s", id.String()) + r.Lock() defer r.Unlock() @@ -127,8 +135,10 @@ func (r ProfileStore) GetProfile(id profile.ID) (*profile.Profile, error) { return nil, err } + ids := id.String() + for proid, p := range ps { - if id == proid { + if ids == proid { pro := &profile.Profile{} err := pro.Decode(p) return pro, err @@ -140,6 +150,8 @@ func (r ProfileStore) GetProfile(id profile.ID) (*profile.Profile, error) { // PeerProfile gives the profile that corresponds with a given peer.ID func (r ProfileStore) PeerProfile(id peer.ID) (*profile.Profile, error) { + log.Debugf("peerProfile: %s", id.String()) + r.Lock() defer r.Unlock() @@ -149,7 +161,7 @@ func (r ProfileStore) PeerProfile(id peer.ID) (*profile.Profile, error) { } for _, p := range ps { - if _, ok := p.Addresses[id.Pretty()]; ok { + if _, ok := p.Addresses[id.String()]; ok { pro := &profile.Profile{} err := pro.Decode(p) return pro, err @@ -168,28 +180,41 @@ func (r ProfileStore) DeleteProfile(id profile.ID) error { if err != nil { return err } - delete(ps, id) + delete(ps, id.String()) return r.saveFile(ps, FilePeers) } -func (r ProfileStore) saveFile(ps map[profile.ID]*profile.CodingProfile, f File) error { - log.Debugf("writing profiles: %s", r.filepath(f)) - // pss := map[string]*profile.Profile{} - // for _, p := range ps { - // pss[p.ID.String()] = p - // } +func (r ProfileStore) saveFile(ps map[string]*profile.CodingProfile, f File) error { data, err := json.Marshal(ps) if err != nil { log.Debug(err.Error()) return err } + + log.Debugf("writing profiles: %s", r.filepath(f)) + if err := r.flock.Lock(); err != nil { + return err + } + defer func() { + r.flock.Unlock() + log.Debugf("profiles written") + }() return ioutil.WriteFile(r.filepath(f), data, os.ModePerm) } -func (r *ProfileStore) profiles() (map[profile.ID]*profile.CodingProfile, error) { +func (r *ProfileStore) profiles() (map[string]*profile.CodingProfile, error) { log.Debug("reading profiles") - ps := map[profile.ID]*profile.CodingProfile{} + + if err := r.flock.Lock(); err != nil { + return nil, err + } + defer func() { + log.Debug("profiles read") + r.flock.Unlock() + }() + + ps := map[string]*profile.CodingProfile{} data, err := ioutil.ReadFile(r.filepath(FilePeers)) if err != nil { if os.IsNotExist(err) { @@ -206,8 +231,5 @@ func (r *ProfileStore) profiles() (map[profile.ID]*profile.CodingProfile, error) return ps, nil // return ps, fmt.Errorf("error unmarshaling peers: %s", err.Error()) } - // for _, p := range pss { - // ps[p.ID] = p - // } return ps, nil }