Skip to content

Commit

Permalink
feat(p2p, history): add handling history/log requests for peer's dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox committed Feb 7, 2018
1 parent 00334d1 commit a5b5f9b
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 5 deletions.
32 changes: 32 additions & 0 deletions p2p/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,35 @@ func (n *QriNode) RequestDatasetInfo(ref *repo.DatasetRef) (*repo.DatasetRef, er

return resref, err
}

// DatasetLogParams encapsulates options for requesting datasets

// RequestDatasetLog gets the log information of Peer's dataset
func (n *QriNode) RequestDatasetLog(ref *repo.DatasetRef) (*[]*repo.DatasetRef, error) {
id, err := n.Repo.Peers().IPFSPeerID(ref.Peername)
if err != nil {
return nil, fmt.Errorf("error getting peer IPFS id: %s", err.Error())
}
res, err := n.SendMessage(id, &Message{
Type: MtDatasetLog,
Phase: MpRequest,
Payload: ref,
})
if err != nil {
fmt.Println("send dataset log message error:", err.Error())
return nil, err
}

data, err := json.Marshal(res.Payload)
if err != nil {
return nil, err
}

resref := []*repo.DatasetRef{}
err = json.Unmarshal(data, &resref)
if len(resref) == 0 && err != nil {
err = fmt.Errorf("no log found")
}

return &resref, err
}
77 changes: 72 additions & 5 deletions p2p/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package p2p
import (
"encoding/json"
"fmt"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
"time"

pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
Expand Down Expand Up @@ -338,9 +338,9 @@ func (n *QriNode) handleDatasetInfoRequest(r *Message) *Message {
if err = json.Unmarshal(data, ref); err != nil {
n.log.Infof(err.Error())
return &Message{
Phase: MpError,
Type: MtDatasetInfo,
Payload: ref,
Phase: MpError,
Payload: err,
}
}

Expand All @@ -364,12 +364,79 @@ func (n *QriNode) handleDatasetInfoRequest(r *Message) *Message {
ref.Dataset = ds

return &Message{
Phase: MpResponse,
Type: MtDatasetInfo,
Phase: MpResponse,
Payload: ref,
}
}

func (n *QriNode) handleDatasetInfoResponse(m *Message) error {
return fmt.Errorf("not yet finished")
}

func (n *QriNode) handleDatasetLogRequest(r *Message) *Message {
data, err := json.Marshal(r.Payload)

if err != nil {
n.log.Info(err.Error())
return nil
}

ref := &repo.DatasetRef{}
if err = json.Unmarshal(data, ref); err != nil {
n.log.Infof(err.Error())
return &Message{
Type: MtDatasetLog,
Phase: MpError,
Payload: err,
}
}

path, err := n.Repo.GetPath(ref.Name)
if err != nil {
return &Message{
Type: MtDatasetLog,
Phase: MpError,
Payload: err,
}
}
// TODO: probably shouldn't write over ref.Path if ref.Path is set, but
// until we make the changes to the way we use hashes to make them
// more consistent, this feels safer.
ref.Path = path.String()

log := []*repo.DatasetRef{}
limit := 50

for {
ref.Dataset, err = n.Repo.GetDataset(datastore.NewKey(ref.Path))
if err != nil {
return &Message{
Type: MtDatasetLog,
Phase: MpError,
Payload: err,
}
}
log = append(log, ref)

limit--
if limit == 0 || ref.Dataset.PreviousPath == "" {
break
}

ref, err = repo.ParseDatasetRef(ref.Dataset.PreviousPath)

if err != nil {
return &Message{
Type: MtDatasetLog,
Phase: MpError,
Payload: err,
}
}
}
return &Message{
Type: MtDatasetLog,
Phase: MpResponse,
Payload: &log,
}
}
4 changes: 4 additions & 0 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
MtNodes = MsgType("NODES")
// MtDatasetInfo gets info on a dataset
MtDatasetInfo = MsgType("DATASET_INFO")
// MtDatasetLog gets log of a dataset
MtDatasetLog = MsgType("DATASET_LOG")
)

func (mt MsgType) String() string {
Expand Down Expand Up @@ -239,6 +241,8 @@ func (n *QriNode) handleStream(ws *WrappedStream) {
res = n.handleNodesRequest(r)
case MtDatasetInfo:
res = n.handleDatasetInfoRequest(r)
case MtDatasetLog:
res = n.handleDatasetLogRequest(r)
}
}

Expand Down

0 comments on commit a5b5f9b

Please sign in to comment.