diff --git a/broker/adapter/natsstream/output.go b/broker/adapter/natsstream/output.go index 30c7a2430..0c89ad42c 100644 --- a/broker/adapter/natsstream/output.go +++ b/broker/adapter/natsstream/output.go @@ -14,9 +14,9 @@ import ( "github.com/choria-io/go-choria/broker/adapter/stats" "github.com/choria-io/go-choria/choria" "github.com/choria-io/go-choria/srvcache" + uuid "github.com/gofrs/uuid" stan "github.com/nats-io/go-nats-streaming" "github.com/prometheus/client_golang/prometheus" - uuid "github.com/satori/go.uuid" log "github.com/sirupsen/logrus" ) @@ -62,7 +62,7 @@ func newStream(name string, work chan adaptable, logger *log.Entry) ([]*stream, clusterID := cfg.Option(prefix+"clusterid", "") if clusterID == "" { - return nil, fmt.Errorf("No ClusterID configured, please set %s", prefix+"clusterid'") + return nil, fmt.Errorf("no ClusterID configured, please set %s", prefix+"clusterid'") } workers := []*stream{} @@ -70,7 +70,12 @@ func newStream(name string, work chan adaptable, logger *log.Entry) ([]*stream, for i := 0; i < instances; i++ { logger.Infof("Creating NATS Streaming Adapter %s NATS Streaming instance %d / %d publishing to %s on cluster %s", name, i, instances, topic, clusterID) - iname := fmt.Sprintf("%s_%d-%s", name, i, strings.Replace(uuid.NewV4().String(), "-", "", -1)) + wid, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("could not start output worker %d: %s", i, err) + } + + iname := fmt.Sprintf("%s_%d-%s", name, i, strings.Replace(wid.String(), "-", "", -1)) st := &stream{ clusterID: clusterID, diff --git a/broker/federation/choria_nats_egest_test.go b/broker/federation/choria_nats_egest_test.go index afce36b6e..c76216e9c 100644 --- a/broker/federation/choria_nats_egest_test.go +++ b/broker/federation/choria_nats_egest_test.go @@ -19,7 +19,6 @@ var _ = Describe("Choria NATS Egest", func() { connector *pooledWorker manager *stubConnectionManager in chainmessage - err error logtxt *bufio.Writer logbuf *bytes.Buffer logger *log.Entry @@ -31,7 +30,10 @@ var _ = Describe("Choria NATS Egest", func() { ctx, cancel = context.WithCancel(context.Background()) logger, logtxt, logbuf = newDiscardLogger() - request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, c.NewRequestID(), "mcollective") + rid, err := c.NewRequestID() + Expect(err).ToNot(HaveOccurred()) + + request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, rid, "mcollective") Expect(err).ToNot(HaveOccurred()) request.SetMessage(`{"hello":"world"}`) diff --git a/broker/federation/choria_nats_ingest_test.go b/broker/federation/choria_nats_ingest_test.go index 43197e2c4..9349e79ca 100644 --- a/broker/federation/choria_nats_ingest_test.go +++ b/broker/federation/choria_nats_ingest_test.go @@ -20,7 +20,6 @@ var _ = Describe("Choria NATS Ingest", func() { connector *pooledWorker manager *stubConnectionManager in *choria.ConnectorMessage - err error logtxt *bufio.Writer logbuf *bytes.Buffer logger *log.Entry @@ -33,7 +32,10 @@ var _ = Describe("Choria NATS Ingest", func() { ctx, cancel = context.WithCancel(context.Background()) logger, logtxt, logbuf = newDiscardLogger() - request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, c.NewRequestID(), "mcollective") + rid, err := c.NewRequestID() + Expect(err).ToNot(HaveOccurred()) + + request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, rid, "mcollective") Expect(err).ToNot(HaveOccurred()) request.SetMessage(`{"hello":"world"}`) diff --git a/broker/federation/federation_test.go b/broker/federation/federation_test.go index 0d5c7a2b2..6325e1e97 100644 --- a/broker/federation/federation_test.go +++ b/broker/federation/federation_test.go @@ -141,8 +141,8 @@ func (s *stubConnection) Close() { return } -func (s *stubConnection) ReplyTarget(msg *choria.Message) string { - return "" +func (s *stubConnection) ReplyTarget(msg *choria.Message) (string, error) { + return "stubreplytarget", nil } func (s *stubConnection) Nats() *nats.Conn { diff --git a/broker/federation/reply_transformer_test.go b/broker/federation/reply_transformer_test.go index c584dce34..658351ccb 100644 --- a/broker/federation/reply_transformer_test.go +++ b/broker/federation/reply_transformer_test.go @@ -37,7 +37,10 @@ var _ = Describe("Reply Transformer", func() { c, err = choria.New("testdata/federation.cfg") Expect(err).ToNot(HaveOccurred()) - request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, c.NewRequestID(), "mcollective") + rid, err := c.NewRequestID() + Expect(err).ToNot(HaveOccurred()) + + request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, rid, "mcollective") Expect(err).ToNot(HaveOccurred()) request.SetMessage(`{"hello":"world"}`) diff --git a/broker/federation/request_transformer_test.go b/broker/federation/request_transformer_test.go index b63f2a941..8b0f1ac7d 100644 --- a/broker/federation/request_transformer_test.go +++ b/broker/federation/request_transformer_test.go @@ -35,7 +35,10 @@ var _ = Describe("RequestTransformer", func() { c, err = choria.New("testdata/federation.cfg") Expect(err).ToNot(HaveOccurred()) - request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, c.NewRequestID(), "mcollective") + rid, err := c.NewRequestID() + Expect(err).ToNot(HaveOccurred()) + + request, err = c.NewRequest(protocol.RequestV1, "test", "tester", "choria=tester", 60, rid, "mcollective") Expect(err).ToNot(HaveOccurred()) request.SetMessage(`{"hello":"world"}`) diff --git a/choria/connection.go b/choria/connection.go index 6641339e4..302689ec4 100644 --- a/choria/connection.go +++ b/choria/connection.go @@ -69,7 +69,7 @@ type InstanceConnector interface { type Connector interface { InstanceConnector - ReplyTarget(msg *Message) string + ReplyTarget(msg *Message) (string, error) ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error) Connect(ctx context.Context) (err error) Nats() *nats.Conn @@ -486,8 +486,13 @@ func ReplyTarget(msg *Message, requestid string) string { return fmt.Sprintf("%s.reply.%s.%s", msg.Collective(), msg.SenderID, requestid) } -func (conn *Connection) ReplyTarget(msg *Message) string { - return ReplyTarget(msg, conn.choria.NewRequestID()) +func (conn *Connection) ReplyTarget(msg *Message) (string, error) { + id, err := conn.choria.NewRequestID() + if err != nil { + return "", err + } + + return ReplyTarget(msg, id), nil } func (conn *Connection) federationTarget(federation string, side string) string { diff --git a/choria/framework.go b/choria/framework.go index 72e49219c..cce6d4eb4 100644 --- a/choria/framework.go +++ b/choria/framework.go @@ -471,7 +471,7 @@ func (fw *Framework) PuppetAIOCmd(command string, def string) string { } // NewRequestID Creates a new RequestID -func (fw *Framework) NewRequestID() string { +func (fw *Framework) NewRequestID() (string, error) { return NewRequestID() } diff --git a/choria/message.go b/choria/message.go index 5c705dcbd..d6b1cbd46 100644 --- a/choria/message.go +++ b/choria/message.go @@ -65,9 +65,14 @@ func NewMessageFromRequest(req protocol.Request, replyto string, choria *Framewo // NewMessage constructs a basic Message instance func NewMessage(payload string, agent string, collective string, msgType string, request *Message, choria *Framework) (msg *Message, err error) { + id, err := choria.NewRequestID() + if err != nil { + return + } + msg = &Message{ Payload: payload, - RequestID: choria.NewRequestID(), + RequestID: id, TTL: choria.Config.TTL, DiscoveredHosts: []string{}, SenderID: choria.Config.Identity, diff --git a/choria/message_test.go b/choria/message_test.go index 6c04c5fa8..05b14de8c 100644 --- a/choria/message_test.go +++ b/choria/message_test.go @@ -129,7 +129,10 @@ var _ = Describe("Choria/Message", func() { }) It("Should support reply", func() { - req, err := fw.NewRequest(protocol.RequestV1, "test_agent", "sender.example.net", "test=sender", 60, fw.NewRequestID(), "test_collective") + rid, err := fw.NewRequestID() + Expect(err).ToNot(HaveOccurred()) + + req, err := fw.NewRequest(protocol.RequestV1, "test_agent", "sender.example.net", "test=sender", 60, rid, "test_collective") Expect(err).ToNot(HaveOccurred()) req.SetMessage("hello world") @@ -194,7 +197,10 @@ var _ = Describe("Choria/Message", func() { }) It("Should set up the transport", func() { - req, err := fw.NewRequest(protocol.RequestV1, "test_agent", "sender.example.net", "test=sender", 60, fw.NewRequestID(), "test_collective") + rid, err := fw.NewRequestID() + Expect(err).ToNot(HaveOccurred()) + + req, err := fw.NewRequest(protocol.RequestV1, "test_agent", "sender.example.net", "test=sender", 60, rid, "test_collective") Expect(err).ToNot(HaveOccurred()) req.SetMessage("hello world") diff --git a/choria/util.go b/choria/util.go index 75eca87ab..30fe7e34b 100644 --- a/choria/util.go +++ b/choria/util.go @@ -10,7 +10,7 @@ import ( "strings" "github.com/choria-io/go-choria/puppet" - uuid "github.com/satori/go.uuid" + uuid "github.com/gofrs/uuid" ) // UserConfig determines what is the active config file for a user @@ -162,6 +162,11 @@ func MatchAnyRegex(str []byte, regex []string) bool { } // NewRequestID Creates a new RequestID -func NewRequestID() string { - return strings.Replace(uuid.NewV4().String(), "-", "", -1) +func NewRequestID() (string, error) { + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + return strings.Replace(id.String(), "-", "", -1), nil } diff --git a/glide.lock b/glide.lock index 98e4edb52..8e22264d7 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 590994ca4c75252ec481758dfa453c35b3fd2499bdb11592a5f9c0170dbfd512 -updated: 2018-09-18T12:32:51.958791+02:00 +hash: cc97fd762a899ff194a1deb0ae546912b17aa280b7314454dc21f7f698c12f35 +updated: 2018-10-27T22:16:36.730309+01:00 imports: - name: github.com/alecthomas/template version: a0175ee3bccc567396460bf5acd36800cb10c49c @@ -64,6 +64,8 @@ imports: subpackages: - encoders/builtin - util +- name: github.com/gofrs/uuid + version: 370558f003bfe29580cd0f698d8640daccdcc45c - name: github.com/gogo/protobuf version: 99cb9b23110011cc45571c901ecae6f6f5e65cd3 subpackages: @@ -79,6 +81,8 @@ imports: version: 1643683e1b54a9e88ad26d98f81400c8c9d9f4f9 subpackages: - proto +- name: github.com/konsorten/go-windows-terminal-sequences + version: b729f2633dfe35f4d1d8a32385f6685610ce1cb5 - name: github.com/matttproud/golang_protobuf_extensions version: c12348ce28de40eed0136aa2b644d0ee0650e56c subpackages: @@ -155,10 +159,8 @@ imports: version: b15cd069a83443be3154b719d0cc9fe8117f09fb subpackages: - xfs -- name: github.com/satori/go.uuid - version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 - name: github.com/sirupsen/logrus - version: 3e01752db0189b9157070a0e1668a620f9a85da2 + version: ad15b42461921f1fb3529b058c6786c6a45d5162 - name: github.com/tidwall/gjson version: 1e3f6aeaa5bad08d777ea7807b279a07885dd8b2 - name: github.com/tidwall/match @@ -186,6 +188,7 @@ imports: - html/charset - name: golang.org/x/sys version: f7928cfef4d09d1b080aa2b6fd3ca9ba1567c733 + repo: https://go.googlesource.com/sys subpackages: - unix - windows diff --git a/glide.yaml b/glide.yaml index 7a2dbdfdb..08979cf8a 100644 --- a/glide.yaml +++ b/glide.yaml @@ -11,8 +11,6 @@ import: version: ^1 - package: github.com/nats-io/go-nats-streaming version: ^0.4.0 -- package: github.com/satori/go.uuid - version: ^1.2.0 - package: github.com/tidwall/gjson - package: github.com/tidwall/match - package: github.com/xeipuuv/gojsonschema @@ -58,3 +56,5 @@ import: - package: github.com/choria-io/go-mcoshim - package: github.com/choria-io/go-lifecycle version: ^0.2.0 +- package: github.com/gofrs/uuid + version: ^3.1.1 diff --git a/server/discovery/discovery_test.go b/server/discovery/discovery_test.go index 879993671..21076e3a6 100644 --- a/server/discovery/discovery_test.go +++ b/server/discovery/discovery_test.go @@ -44,7 +44,10 @@ var _ = Describe("Server/Discovery", func() { BeforeEach(func() { mgr = New(fw, log) - req, err = fw.NewRequest(protocol.RequestV1, "test", "testid", "callerid", 60, fw.NewRequestID(), "mcollective") + rid, err := fw.NewRequestID() + Expect(err).ToNot(HaveOccurred()) + + req, err = fw.NewRequest(protocol.RequestV1, "test", "testid", "callerid", 60, rid, "mcollective") Expect(err).ToNot(HaveOccurred()) filter = req.NewFilter()