Skip to content

Commit

Permalink
refactor(pubsub): make pub command read from a file
Browse files Browse the repository at this point in the history
We want to send payload in the body as multipart so users can use
existing tools like curl for publishing arbitrary bytes to a topic.

StringArg was created for "one message per line" use case, and if data
has `\n` or `\r\n` byte sequences, it will cause payload to be split. It
is not possible to undo this, because mentioned sequences are lost, so
we are not able to tell if it was `\n` or `\r\n`

We already avoid this problem in `block put` and `dht put` by reading
payload via FileArg which does not mangle binary data and send it as-is.
It feel like `pubsub pub` should be using it in the first place anyway,
so this commit replaces StringArg with FileArg.

This also closes ipfs#8454
and makes rpc in go-ipfs easier to code against.
  • Loading branch information
lidel committed Oct 25, 2021
1 parent 5bd282d commit e426088
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 31 deletions.
50 changes: 22 additions & 28 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"io/ioutil"
"net/http"
"os"
"sort"

cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
Expand Down Expand Up @@ -146,41 +145,32 @@ TOPIC AND DATA ENCODING

var PubsubPubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Publish a message to a given pubsub topic.",
Tagline: "Publish data to a given pubsub topic.",
ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.
It reads binary data from stdin or a file.
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
TOPIC AND DATA ENCODING
HTTP RPC ENCODING
The data to be published is sent in HTTP request body as multipart/form-data.
Topic names are a binary data too. To ensure all bytes are transferred
correctly RPC client and server will use multibase encoding behind
the scenes.
correctly via URL params, the RPC client and server will use multibase
encoding behind the scenes.
You can inspect the format by passing --enc=json. ipfs multibase commands
can be used for encoding/decoding multibase strings in the userland.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("topic", true, false, "Topic to publish to."),
cmds.StringArg("data", false, true, "Payload of message to publish."),
},
PreRun: func(req *cmds.Request, env cmds.Environment) error {
// when there are no string args with data, read from stdin.
if len(req.Arguments) == 1 {
buf, err := ioutil.ReadAll(os.Stdin)
if err != nil {
return err
}
req.Arguments = append(req.Arguments, string(buf))
}
return urlArgsEncoder(req, env)
cmds.FileArg("data", true, false, "The data to be published.").EnableStdin(),
},
PreRun: urlArgsEncoder,
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
if err != nil {
Expand All @@ -191,13 +181,20 @@ TOPIC AND DATA ENCODING
}

topic := req.Arguments[0]
for _, data := range req.Arguments[1:] {
if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
return err
}

// read data passed as a file
file, err := cmdenv.GetFileArg(req.Files.Entries())
if err != nil {
return err
}
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil {
return err
}

return nil
// publish
return api.PubSub().Publish(req.Context, topic, data)
},
}

Expand Down Expand Up @@ -333,6 +330,7 @@ TOPIC AND DATA ENCODING
},
}

// TODO: move to cmdenv?
// Encode binary data to be passed as multibase string in URL arguments.
// (avoiding issues described in https://github.com/ipfs/go-ipfs/issues/7939)
func urlArgsEncoder(req *cmds.Request, env cmds.Environment) error {
Expand All @@ -346,10 +344,6 @@ func urlArgsEncoder(req *cmds.Request, env cmds.Environment) error {
// Decode binary data passed as multibase string in URL arguments.
// (avoiding issues described in https://github.com/ipfs/go-ipfs/issues/7939)
func urlArgsDecoder(req *cmds.Request, env cmds.Environment) error {
err := req.ParseBodyArgs()
if err != nil {
return err
}
for n, arg := range req.Arguments {
encoding, data, err := mbase.Decode(arg)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions test/sharness/t0180-pubsub.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ run_pubsub_tests() {
test_cmp peers_exp peers_out
'

test_expect_success "publish something" '
ipfsi 1 pubsub pub testTopic "testOK" &> pubErr
test_expect_success "publish something from file" '
echo -n "testOK" > payload-file &&
ipfsi 1 pubsub pub testTopic payload-file &> pubErr
'

test_expect_success "wait until echo > wait executed" '
Expand Down Expand Up @@ -79,7 +80,7 @@ run_pubsub_tests() {
go-sleep 500ms
'

test_expect_success "publish something" '
test_expect_success "publish something from stdin" '
echo -n "testOK2" | ipfsi 3 pubsub pub testTopic &> pubErr
'

Expand Down

0 comments on commit e426088

Please sign in to comment.