From 3c90da3ca0e4da86ab3b0d0feb5abc9148c28101 Mon Sep 17 00:00:00 2001 From: Aneesh Puttur Date: Mon, 29 Aug 2022 11:32:01 -0400 Subject: [PATCH] fix clientID to make it unique Created UUID using serviecnao make it unique per pod even on restarts unless sericename are not changed Signed-off-by: Aneesh Puttur --- pkg/protocol/http/http.go | 18 ++++++++----- pkg/protocol/http/http_test.go | 47 +++++++++++++++------------------- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/pkg/protocol/http/http.go b/pkg/protocol/http/http.go index 73327d2..ea3f1b9 100644 --- a/pkg/protocol/http/http.go +++ b/pkg/protocol/http/http.go @@ -65,7 +65,7 @@ type Server struct { subscriberAPI *subscriberApi.API //close on true CloseCh <-chan struct{} - ClientID uuid.UUID + clientID uuid.UUID httpServer *http.Server statusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error processEventFn func(e interface{}) error @@ -88,7 +88,11 @@ func InitServer(serviceName string, port int, storePath string, dataIn <-chan *c subscriberAPI: subscriberApi.GetAPIInstance(storePath), statusReceiveOverrideFn: onStatusReceiveOverrideFn, processEventFn: processEventFn, - ClientID: uuid.New(), //TODO: Persists this UUID to save so when restarts uses same UUID + clientID: func(serviceName string) uuid.UUID { + var namespace = uuid.NameSpaceURL + var url = []byte(serviceName) + return uuid.NewMD5(namespace, url) + }(serviceName), } return &server, nil } @@ -257,9 +261,9 @@ func (h *Server) Shutdown() { h.httpServer.Close() } -// SetClientID ... -func (h *Server) SetClientID(clientID uuid.UUID) { - h.ClientID = clientID +// ClientID ... +func (h *Server) ClientID() uuid.UUID { + return h.clientID } // RegisterPublishers this will register publisher @@ -346,7 +350,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { case d := <-h.DataIn: //skips publisher object processing if d.Type == channel.SUBSCRIBER { // Listener means subscriber aka sender // Post it to the address that has been specified : to target URL - subs := subscriber.New(h.ClientID) + subs := subscriber.New(h.clientID) //Self URL _ = subs.SetEndPointURI(h.ServiceName) obj := pubsub.PubSub{ // all we need is ID and resource address @@ -415,7 +419,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { //TODO: change to Get status for all events // current implementation expects to have a resource address // Post it to the address that has been specified : to target URL - subs := subscriber.New(h.ClientID) + subs := subscriber.New(h.clientID) //Self URL _ = subs.SetEndPointURI(h.ServiceName) obj := pubsub.PubSub{} diff --git a/pkg/protocol/http/http_test.go b/pkg/protocol/http/http_test.go index 3539308..aab7cae 100644 --- a/pkg/protocol/http/http_test.go +++ b/pkg/protocol/http/http_test.go @@ -27,14 +27,24 @@ import ( func strptr(s string) *string { return &s } var ( - storePath = "." - serverClientID, _ = uuid.Parse("d73555b4-b01e-4802-89f1-f058f215a1f8") - clientClientID, _ = uuid.Parse("5202d2c4-f652-4974-b24d-1934f0d819e3") + storePath = "." + subscriptionOneID = "123e4567-e89b-12d3-a456-426614174001" serverAddress = types.ParseURI("http://localhost:8089") clientAddress = types.ParseURI("http://localhost:8087") hostPort = 8089 clientPort = 8087 + serverClientID = func(serviceName string) uuid.UUID { + var namespace = uuid.NameSpaceURL + var url = []byte(serviceName) + return uuid.NewMD5(namespace, url) + }(serverAddress.String()) + + clientClientID = func(serviceName string) uuid.UUID { + var namespace = uuid.NameSpaceURL + var url = []byte(serviceName) + return uuid.NewMD5(namespace, url) + }(clientAddress.String()) subscriptionOne = &pubsub.PubSub{ ID: subscriptionOneID, @@ -109,10 +119,7 @@ func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, w var err error assert.Nil(t, clientS) clientS, err = ceHttp.InitServer(clientAddress.String(), clientPort, storePath, in, clientOutChannel, closeCh, nil, nil) - if err != nil { - log.Infof("error creating client ") - } - clientS.ClientID = clientClientID + assert.Nil(t, err) clientS.RegisterPublishers(serverAddress) wg := sync.WaitGroup{} time.Sleep(250 * time.Millisecond) @@ -145,10 +152,7 @@ func TestSubscribeCreated(t *testing.T) { closeCh := make(chan struct{}) eventChannel := make(chan *channel.DataChan, 10) server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) - if err != nil { - t.Skipf("http failed(%#v): %v", server, err) - } - server.ClientID = serverClientID + assert.Nil(t, err) wg := sync.WaitGroup{} // Start the server and channel proceesor @@ -176,9 +180,7 @@ func TestSendEvent(t *testing.T) { clientOutChannel := make(chan *channel.DataChan) closeCh := make(chan struct{}) server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) - if err != nil { - t.Skipf("http failed(%#v): %v", server, err) - } + assert.Nil(t, err) wg := sync.WaitGroup{} // Start the server and channel proceesor err = server.Start(&wg) @@ -241,9 +243,7 @@ func TestSendSuccessStatus(t *testing.T) { } return nil }, nil) - if err != nil { - t.Skipf("http failed(%#v): %v", server, err) - } + assert.Nil(t, err) wg := sync.WaitGroup{} // Start the server and channel proceesor err = server.Start(&wg) @@ -284,9 +284,7 @@ func TestHealth(t *testing.T) { var status int var urlErr error server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) - if err != nil { - t.Skipf("http failed(%#v): %v", server, err) - } + assert.Nil(t, err) wg := sync.WaitGroup{} // Start the server and channel proceesor @@ -306,10 +304,7 @@ func TestSender(t *testing.T) { closeCh := make(chan struct{}) server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) - if err != nil { - t.Skipf("http failed(%#v): %v", server, err) - } - + assert.Nil(t, err) wg := sync.WaitGroup{} // Start the server and channel processor err = server.Start(&wg) @@ -332,9 +327,7 @@ func TestPing(t *testing.T) { closeCh := make(chan struct{}) server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) - if err != nil { - t.Skipf("http failed(%#v): %v", server, err) - } + assert.Nil(t, err) wg := sync.WaitGroup{} // Start the server and channel processor