Skip to content

Commit b47a3e4

Browse files
dustmopb5
authored andcommitted
feat(remotes): Complete dsync by writing to ds_refs and pinning
1 parent 165abce commit b47a3e4

File tree

4 files changed

+145
-31
lines changed

4 files changed

+145
-31
lines changed

api/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,9 @@ func NewServerRoutes(s *Server) *http.ServeMux {
243243
}
244244

245245
remh := NewRemoteHandlers(s.qriNode, receivers)
246-
m.Handle("/dataset", s.middleware(remh.ReceiveHandler))
246+
m.Handle("/dsync/push", s.middleware(remh.ReceiveHandler))
247247
m.Handle("/dsync", s.middleware(receivers.HTTPHandler()))
248+
m.Handle("/dsync/complete", s.middleware(remh.CompleteHandler))
248249
}
249250

250251
m.Handle("/list", s.middleware(dsh.ListHandler))

api/remote.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package api
22

33
import (
4+
"encoding/json"
45
"fmt"
5-
"io/ioutil"
66
"net/http"
77

88
util "github.com/datatogether/api/apiutil"
@@ -34,16 +34,26 @@ func (h *RemoteHandlers) ReceiveHandler(w http.ResponseWriter, r *http.Request)
3434
}
3535
}
3636

37+
// CompleteHandler is the endpoint for remotes when they complete the dsync process
38+
func (h *RemoteHandlers) CompleteHandler(w http.ResponseWriter, r *http.Request) {
39+
switch r.Method {
40+
// no "OPTIONS" method here, because browsers should never hit this endpoint
41+
case "POST":
42+
h.completeDataset(w, r)
43+
default:
44+
util.NotFoundHandler(w, r)
45+
}
46+
}
47+
3748
func (h *RemoteHandlers) receiveDataset(w http.ResponseWriter, r *http.Request) {
38-
content, err := ioutil.ReadAll(r.Body)
39-
if err != nil {
49+
var params lib.ReceiveParams
50+
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
4051
util.WriteErrResponse(w, http.StatusInternalServerError, err)
4152
return
4253
}
43-
params := lib.ReceiveParams{Body: string(content)}
4454

4555
var result lib.ReceiveResult
46-
err = h.Receive(&params, &result)
56+
err := h.Receive(&params, &result)
4757
if err != nil {
4858
util.WriteErrResponse(w, http.StatusInternalServerError, err)
4959
return
@@ -55,3 +65,20 @@ func (h *RemoteHandlers) receiveDataset(w http.ResponseWriter, r *http.Request)
5565

5666
util.WriteErrResponse(w, http.StatusForbidden, fmt.Errorf("%s", result.RejectReason))
5767
}
68+
69+
func (h *RemoteHandlers) completeDataset(w http.ResponseWriter, r *http.Request) {
70+
var params lib.CompleteParams
71+
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
72+
util.WriteErrResponse(w, http.StatusInternalServerError, err)
73+
return
74+
}
75+
76+
var result bool
77+
err := h.Complete(&params, &result)
78+
if err != nil {
79+
util.WriteErrResponse(w, http.StatusInternalServerError, err)
80+
return
81+
}
82+
83+
util.WriteResponse(w, "Success")
84+
}

lib/params.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ type PushParams struct {
7676

7777
// ReceiveParams hold parameters for receiving daginfo's when running as a remote
7878
type ReceiveParams struct {
79-
Body string
79+
Peername string
80+
Name string
81+
ProfileID profile.ID
82+
Dinfo *dag.Info
8083
}
8184

8285
// ReceiveResult is the result of receiving a posted dataset when running as a remote
@@ -86,3 +89,8 @@ type ReceiveResult struct {
8689
SessionID string
8790
Diff *dag.Manifest
8891
}
92+
93+
// CompleteParams holds parameters to send when completing a dsync sent to a remote
94+
type CompleteParams struct {
95+
SessionID string
96+
}

lib/remote.go

Lines changed: 102 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/qri-io/dag"
1414
"github.com/qri-io/dag/dsync"
1515
"github.com/qri-io/qri/actions"
16+
"github.com/qri-io/qri/base"
1617
"github.com/qri-io/qri/p2p"
1718
"github.com/qri-io/qri/repo"
1819

@@ -24,10 +25,10 @@ const allowedDagInfoSize uint64 = 10 * 1024 * 1024
2425

2526
// RemoteRequests encapsulates business logic of remote operation
2627
type RemoteRequests struct {
27-
cli *rpc.Client
28-
node *p2p.QriNode
29-
Receivers *dsync.Receivers
30-
SessionIDs map[string]*dag.Info
28+
cli *rpc.Client
29+
node *p2p.QriNode
30+
Receivers *dsync.Receivers
31+
Sessions map[string]*ReceiveParams
3132
}
3233

3334
// NewRemoteRequests creates a RemoteRequests pointer from either a node or an rpc.Client
@@ -36,15 +37,20 @@ func NewRemoteRequests(node *p2p.QriNode, cli *rpc.Client) *RemoteRequests {
3637
panic(fmt.Errorf("both repo and client supplied to NewRemoteRequests"))
3738
}
3839
return &RemoteRequests{
39-
cli: cli,
40-
node: node,
41-
SessionIDs: make(map[string]*dag.Info),
40+
cli: cli,
41+
node: node,
42+
Sessions: make(map[string]*ReceiveParams),
4243
}
4344
}
4445

4546
// CoreRequestsName implements the Requests interface
4647
func (RemoteRequests) CoreRequestsName() string { return "remote" }
4748

49+
// TODO(dlong): Split this function into smaller steps, move them to actions/ or base/ as
50+
// appropriate
51+
52+
// TODO(dlong): Add tests
53+
4854
// PushToRemote posts a dagInfo to a remote
4955
func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error {
5056
if r.cli != nil {
@@ -69,12 +75,23 @@ func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error {
6975
return fmt.Errorf("remote name \"%s\" not found", p.RemoteName)
7076
}
7177

72-
data, err := json.Marshal(dinfo)
78+
// Post the dataset's dag.Info to the remote.
79+
fmt.Printf("Posting to /dsync/push...\n")
80+
81+
params := ReceiveParams{
82+
Peername: ref.Peername,
83+
Name: ref.Name,
84+
ProfileID: ref.ProfileID,
85+
Dinfo: dinfo,
86+
}
87+
88+
data, err := json.Marshal(params)
7389
if err != nil {
7490
return err
7591
}
7692

77-
req, err := http.NewRequest("POST", fmt.Sprintf("%s/dataset", location), bytes.NewReader(data))
93+
dsyncPushURL := fmt.Sprintf("%s/dsync/push", location)
94+
req, err := http.NewRequest("POST", dsyncPushURL, bytes.NewReader(data))
7895
if err != nil {
7996
return err
8097
}
@@ -90,13 +107,14 @@ func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error {
90107
return fmt.Errorf("error code %d: %v", res.StatusCode, rejectionReason(res.Body))
91108
}
92109

93-
env := struct{Data ReceiveResult}{}
110+
env := struct{ Data ReceiveResult }{}
94111
if err := json.NewDecoder(res.Body).Decode(&env); err != nil {
95112
return err
96113
}
97114
res.Body.Close()
98115

99-
ctx := context.Background()
116+
// Run dsync to transfer all of the blocks of the dataset.
117+
fmt.Printf("Running dsync...\n")
100118

101119
ng, err := newNodeGetter(r.node)
102120
if err != nil {
@@ -107,6 +125,7 @@ func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error {
107125
URL: fmt.Sprintf("%s/dsync", location),
108126
}
109127

128+
ctx := context.Background()
110129
send, err := dsync.NewSend(ctx, ng, dinfo.Manifest, remote)
111130
if err != nil {
112131
return err
@@ -117,7 +136,36 @@ func (r *RemoteRequests) PushToRemote(p *PushParams, out *bool) error {
117136
return err
118137
}
119138

120-
// TODO(dlong): Pin the data.
139+
// Finish the send, pin the dataset in IPFS
140+
fmt.Printf("Writing dsref and pinning...\n")
141+
142+
completeParams := CompleteParams{
143+
SessionID: env.Data.SessionID,
144+
}
145+
146+
data, err = json.Marshal(completeParams)
147+
if err != nil {
148+
return err
149+
}
150+
151+
dsyncCompleteURL := fmt.Sprintf("%s/dsync/complete", location)
152+
req, err = http.NewRequest("POST", dsyncCompleteURL, bytes.NewReader(data))
153+
if err != nil {
154+
return err
155+
}
156+
req.Header.Set("Content-Type", "application/json")
157+
158+
res, err = httpClient.Do(req)
159+
if err != nil {
160+
return err
161+
}
162+
163+
if res.StatusCode != http.StatusOK {
164+
return fmt.Errorf("error code %d: %v", res.StatusCode, rejectionReason(res.Body))
165+
}
166+
167+
// Success!
168+
fmt.Printf("Success!\n")
121169

122170
*out = true
123171
return nil
@@ -142,12 +190,6 @@ func (r *RemoteRequests) Receive(p *ReceiveParams, res *ReceiveResult) (err erro
142190

143191
res.Success = false
144192

145-
dinfo := dag.Info{}
146-
err = json.Unmarshal([]byte(p.Body), &dinfo)
147-
if err != nil {
148-
return err
149-
}
150-
151193
// TODO(dlong): Customization for how to decide to accept the dataset.
152194
if Config.API.RemoteAcceptSizeMax == 0 {
153195
res.RejectReason = "not accepting any datasets"
@@ -157,7 +199,7 @@ func (r *RemoteRequests) Receive(p *ReceiveParams, res *ReceiveResult) (err erro
157199
// If size is -1, accept any size of dataset. Otherwise, check if the size is allowed.
158200
if Config.API.RemoteAcceptSizeMax != -1 {
159201
var totalSize uint64
160-
for _, s := range dinfo.Sizes {
202+
for _, s := range p.Dinfo.Sizes {
161203
totalSize += s
162204
}
163205

@@ -167,7 +209,7 @@ func (r *RemoteRequests) Receive(p *ReceiveParams, res *ReceiveResult) (err erro
167209
}
168210
}
169211

170-
if dinfo.Manifest == nil {
212+
if p.Dinfo.Manifest == nil {
171213
res.RejectReason = "manifest is nil"
172214
return nil
173215
}
@@ -177,21 +219,57 @@ func (r *RemoteRequests) Receive(p *ReceiveParams, res *ReceiveResult) (err erro
177219
return nil
178220
}
179221

180-
sid, diff, err := r.Receivers.ReqSend(dinfo.Manifest)
222+
sid, diff, err := r.Receivers.ReqSend(p.Dinfo.Manifest)
181223
if err != nil {
182224
res.RejectReason = fmt.Sprintf("could not begin send: %s", err)
183225
return nil
184226
}
185227

186-
// TODO: Timeout for sessionIDs. Add a callback to dsync.Receivers when dsync finishes,
187-
// then create a version of the dataset for ds_refs.
188-
r.SessionIDs[sid] = &dinfo
228+
// TODO: Timeout for sessions. Remove sessions when they complete or timeout
229+
r.Sessions[sid] = p
189230
res.Success = true
190231
res.SessionID = sid
191232
res.Diff = diff
192233
return nil
193234
}
194235

236+
// Complete is used to complete a dataset that has been pushed to this remote
237+
func (r *RemoteRequests) Complete(p *CompleteParams, res *bool) (err error) {
238+
sid := p.SessionID
239+
session, ok := r.Sessions[sid]
240+
if !ok {
241+
return fmt.Errorf("session %s not found", sid)
242+
}
243+
244+
if session.Dinfo.Manifest == nil || len(session.Dinfo.Manifest.Nodes) == 0 {
245+
return fmt.Errorf("dataset manifest is invalid")
246+
}
247+
248+
path := fmt.Sprintf("/ipfs/%s", session.Dinfo.Manifest.Nodes[0])
249+
250+
ref := repo.DatasetRef{
251+
Peername: session.Peername,
252+
ProfileID: session.ProfileID,
253+
Name: session.Name,
254+
Path: path,
255+
Published: true,
256+
}
257+
258+
// Save ref to ds_refs.json
259+
err = r.node.Repo.PutRef(ref)
260+
if err != nil {
261+
return err
262+
}
263+
264+
// Pin the dataset in IPFS
265+
err = base.PinDataset(r.node.Repo, ref)
266+
if err != nil {
267+
return err
268+
}
269+
270+
return nil
271+
}
272+
195273
func rejectionReason(r io.Reader) string {
196274
text, err := ioutil.ReadAll(r)
197275
if err != nil {

0 commit comments

Comments
 (0)