-
-
Notifications
You must be signed in to change notification settings - Fork 3k
/
pubsub.go
370 lines (315 loc) · 10.4 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
package commands
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"sort"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
mbase "github.com/multiformats/go-multibase"
"github.com/pkg/errors"
cmds "github.com/ipfs/go-ipfs-cmds"
options "github.com/ipfs/interface-go-ipfs-core/options"
)
// PubsubCmd is the root of the experimental "ipfs pubsub" command tree.
// It defines no Run function of its own here; it only carries the help text
// and groups the pub, sub, ls and peers subcommands.
var PubsubCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "An experimental publish-subscribe system on ipfs.",
ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
`,
},
// Maps the subcommand name typed by the user to its implementation.
Subcommands: map[string]*cmds.Command{
"pub": PubsubPubCmd,
"sub": PubsubSubCmd,
"ls": PubsubLsCmd,
"peers": PubsubPeersCmd,
},
}
// pubsubMessage is the value emitted by PubsubSubCmd for every message
// received on a subscription. All fields are plain strings; empty fields are
// omitted from the JSON output.
type pubsubMessage struct {
// From is the sender's peer ID in its text form (msg.From().Pretty()).
From string `json:"from,omitempty"`
// Data is the message payload as a base64url multibase string.
Data string `json:"data,omitempty"`
// Seqno is the message sequence number as a base64url multibase string.
Seqno string `json:"seqno,omitempty"`
// TopicIDs holds the message topics, each as a base64url multibase string.
TopicIDs []string `json:"topicIDs,omitempty"`
}
// PubsubSubCmd implements "ipfs pubsub sub". It subscribes to the topic given
// as the first argument and emits one pubsubMessage per received message until
// the subscription reports io.EOF or context.Canceled (both end the command
// without error) or any other error occurs (returned to the caller).
//
// The topic argument is multibase-encoded by PreRun (urlArgsEncoder) and
// decoded again at the start of Run (urlArgsDecoder).
var PubsubSubCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
PEER ENCODING
Peer IDs in From fields are encoded using the default text representation
from go-libp2p. This ensures the same string values as in 'ipfs pubsub peers'.
TOPIC AND DATA ENCODING
Topics, Data and Seqno are binary data. To ensure all bytes are transferred
correctly the RPC client and server will use multibase encoding behind
the scenes.
You can inspect the format by passing --enc=json. The ipfs multibase commands
can be used for encoding/decoding multibase strings in the userland.
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", true, false, "Name of topic to subscribe to."),
	},
	PreRun: urlArgsEncoder,
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		api, err := cmdenv.GetApi(env, req)
		if err != nil {
			return err
		}
		if err := urlArgsDecoder(req, env); err != nil {
			return err
		}

		// Binary fields are emitted as base64url multibase strings. The
		// encoder does not depend on the message, so it is resolved once
		// here instead of once per received message, and a lookup failure
		// is reported instead of continuing with a zero-value encoder.
		encoder, err := mbase.EncoderByName("base64url")
		if err != nil {
			return err
		}

		topic := req.Arguments[0]
		sub, err := api.PubSub().Subscribe(req.Context, topic)
		if err != nil {
			return err
		}
		defer sub.Close()

		// Flush right after subscribing when the emitter supports it
		// (presumably so an HTTP client sees the response start before the
		// first message arrives -- confirm against the cmds HTTP emitter).
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

		for {
			msg, err := sub.Next(req.Context)
			if err == io.EOF || err == context.Canceled {
				return nil
			} else if err != nil {
				return err
			}

			// turn bytes into strings
			psm := pubsubMessage{
				Data:  encoder.Encode(msg.Data()),
				From:  msg.From().Pretty(),
				Seqno: encoder.Encode(msg.Seq()),
			}
			for _, t := range msg.Topics() {
				psm.TopicIDs = append(psm.TopicIDs, encoder.Encode([]byte(t)))
			}
			if err := res.Emit(&psm); err != nil {
				return err
			}
		}
	},
	Encoders: cmds.EncoderMap{
		// Text output is the raw (multibase-decoded) payload of each
		// message, written without any separator.
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			_, dec, err := mbase.Decode(psm.Data)
			if err != nil {
				return err
			}
			_, err = w.Write(dec)
			return err
		}),
		// DEPRECATED, undocumented format we used in tests, but not anymore
		// <message.payload>\n<message.payload>\n
		"ndpayload": cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			return errors.New("--enc=ndpayload was removed, use --enc=json instead")
		}),
		// DEPRECATED, undocumented format we used in tests, but not anymore
		// <varint-len><message.payload><varint-len><message.payload>
		"lenpayload": cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			return errors.New("--enc=lenpayload was removed, use --enc=json instead")
		}),
	},
	Type: pubsubMessage{},
}
// PubsubPubCmd implements "ipfs pubsub pub". It reads the whole "data" file
// argument (stdin is enabled for it) into memory and publishes it to the topic
// given as the first argument.
//
// The topic argument is multibase-encoded by PreRun (urlArgsEncoder) and
// decoded again at the start of Run (urlArgsDecoder).
var PubsubPubCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Publish data to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.
It reads binary data from stdin or a file.
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
HTTP RPC ENCODING
The data to be published is sent in HTTP request body as multipart/form-data.
Topic names are binary data too. To ensure all bytes are transferred
correctly via URL params, the RPC client and server will use multibase
encoding behind the scenes.
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", true, false, "Topic to publish to."),
		cmds.FileArg("data", true, false, "The data to be published.").EnableStdin(),
	},
	PreRun: urlArgsEncoder,
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		api, err := cmdenv.GetApi(env, req)
		if err != nil {
			return err
		}

		// Restore the raw argument bytes that PreRun wrapped in multibase.
		if err = urlArgsDecoder(req, env); err != nil {
			return err
		}
		target := req.Arguments[0]

		// The message body is delivered as the file argument.
		payloadFile, err := cmdenv.GetFileArg(req.Files.Entries())
		if err != nil {
			return err
		}
		defer payloadFile.Close()

		payload, err := ioutil.ReadAll(payloadFile)
		if err != nil {
			return err
		}

		return api.PubSub().Publish(req.Context, target, payload)
	},
}
// PubsubLsCmd implements "ipfs pubsub ls". It fetches the list of subscribed
// topics from the PubSub API and emits it as a stringList in which every topic
// name is a base64url multibase string. The text encoder decodes the names
// again and prints one escaped name per line.
var PubsubLsCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
TOPIC ENCODING
Topic names are a binary data. To ensure all bytes are transferred
correctly RPC client and server will use multibase encoding behind
the scenes.
You can inspect the format by passing --enc=json. ipfs multibase commands
can be used for encoding/decoding multibase strings in the userland.
`,
	},
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		api, err := cmdenv.GetApi(env, req)
		if err != nil {
			return err
		}

		l, err := api.PubSub().Ls(req.Context)
		if err != nil {
			return err
		}

		// emit topics encoded in multibase; a failed encoder lookup is
		// reported instead of continuing with a zero-value encoder
		encoder, err := mbase.EncoderByName("base64url")
		if err != nil {
			return err
		}
		// the slice returned by Ls is rewritten in place
		for n, topic := range l {
			l[n] = encoder.Encode([]byte(topic))
		}

		return cmds.EmitOnce(res, stringList{l})
	},
	Type: stringList{},
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(multibaseDecodedStringListEncoder),
	},
}
// multibaseDecodedStringListEncoder is a text encoder for a stringList whose
// entries are multibase strings. Each entry is decoded and stored back into
// the list (the list is modified in place), then the result is written with
// safeTextListEncoder. The first entry that fails to decode aborts the
// encoding and its error is returned.
func multibaseDecodedStringListEncoder(req *cmds.Request, w io.Writer, list *stringList) error {
	for i := range list.Strings {
		_, raw, err := mbase.Decode(list.Strings[i])
		if err != nil {
			return err
		}
		list.Strings[i] = string(raw)
	}
	return safeTextListEncoder(req, w, list)
}
// safeTextListEncoder writes every string of the list to w on a line of its
// own, after passing it through cmdenv.EscNonPrint so that non-printable or
// unsafe characters are escaped
// (this protects terminal output from being mangled by non-ascii topic names).
// It stops at the first write error and returns it.
func safeTextListEncoder(req *cmds.Request, w io.Writer, list *stringList) error {
	for i := range list.Strings {
		escaped := cmdenv.EscNonPrint(list.Strings[i])
		if _, err := fmt.Fprintln(w, escaped); err != nil {
			return err
		}
	}
	return nil
}
// PubsubPeersCmd implements "ipfs pubsub peers". It asks the PubSub API for
// the peers of the optional topic argument (an empty topic is passed when the
// argument is absent) and emits their text-form peer IDs as a sorted
// stringList.
//
// The topic argument is multibase-encoded by PreRun (urlArgsEncoder) and
// decoded again at the start of Run (urlArgsDecoder).
var PubsubPeersCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "List peers we are currently pubsubbing with.",
		ShortDescription: `
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected peers who are
subscribed to the named topic.
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
TOPIC AND DATA ENCODING
Topic names are a binary data. To ensure all bytes are transferred
correctly RPC client and server will use multibase encoding behind
the scenes.
You can inspect the format by passing --enc=json. ipfs multibase commands
can be used for encoding/decoding multibase strings in the userland.
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", false, false, "Topic to list connected peers of."),
	},
	PreRun: urlArgsEncoder,
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		api, err := cmdenv.GetApi(env, req)
		if err != nil {
			return err
		}
		if err = urlArgsDecoder(req, env); err != nil {
			return err
		}

		// The topic is optional; it stays empty unless exactly one
		// argument was supplied.
		filter := ""
		if len(req.Arguments) == 1 {
			filter = req.Arguments[0]
		}

		found, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(filter))
		if err != nil {
			return err
		}

		// Render every peer ID as text and sort for stable output.
		ids := make([]string, len(found))
		for i, p := range found {
			ids[i] = p.Pretty()
		}
		sort.Strings(ids)

		return cmds.EmitOnce(res, &stringList{ids})
	},
	Type: stringList{},
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(safeTextListEncoder),
	},
}
// TODO: move to cmdenv?
// urlArgsEncoder encodes binary data to be passed as multibase string in URL
// arguments
// (avoiding issues described in https://github.com/ipfs/go-ipfs/issues/7939).
//
// Every entry of req.Arguments is replaced in place by its base64url multibase
// encoding. env is unused. An error is returned only when the base64url
// encoder cannot be resolved, in which case no argument is modified.
func urlArgsEncoder(req *cmds.Request, env cmds.Environment) error {
	// Report a failed lookup instead of continuing with a zero-value encoder.
	encoder, err := mbase.EncoderByName("base64url")
	if err != nil {
		return err
	}
	for n, arg := range req.Arguments {
		req.Arguments[n] = encoder.Encode([]byte(arg))
	}
	return nil
}
// urlArgsDecoder decodes binary data passed as multibase string in URL
// arguments
// (avoiding issues described in https://github.com/ipfs/go-ipfs/issues/7939).
//
// Every entry of req.Arguments is replaced in place by its decoded bytes.
// Decoding stops at the first argument that is not valid multibase or is not
// base64url; arguments before it have already been replaced. env is unused.
func urlArgsDecoder(req *cmds.Request, env cmds.Environment) error {
	for i := range req.Arguments {
		base, raw, err := mbase.Decode(req.Arguments[i])
		switch {
		case err != nil:
			return errors.Wrap(err, "URL arg must be multibase encoded")
		case base != mbase.Base64url:
			// Enforce URL-safe encoding is used for data passed via URL arguments
			// - without this we get data corruption similar to https://github.com/ipfs/go-ipfs/issues/7939
			// - we can't just deny base64, because there may be other bases that
			//   are not URL-safe – better to force base64url which is known to be
			//   safe in URL context
			return errors.New("URL arg must be base64url encoded")
		}
		req.Arguments[i] = string(raw)
	}
	return nil
}