Skip to content

Commit

Permalink
fix clientID to make it unique
Browse files Browse the repository at this point in the history
Created UUID using serviecnao make it unique per pod even on restarts unless sericename are not changed

Signed-off-by: Aneesh Puttur <aneeshputtur@gmail.com>
  • Loading branch information
aneeshkp committed Aug 29, 2022
1 parent fa4606e commit 3c90da3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 34 deletions.
18 changes: 11 additions & 7 deletions pkg/protocol/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Server struct {
subscriberAPI *subscriberApi.API
//close on true
CloseCh <-chan struct{}
ClientID uuid.UUID
clientID uuid.UUID
httpServer *http.Server
statusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error
processEventFn func(e interface{}) error
Expand All @@ -88,7 +88,11 @@ func InitServer(serviceName string, port int, storePath string, dataIn <-chan *c
subscriberAPI: subscriberApi.GetAPIInstance(storePath),
statusReceiveOverrideFn: onStatusReceiveOverrideFn,
processEventFn: processEventFn,
ClientID: uuid.New(), //TODO: Persists this UUID to save so when restarts uses same UUID
clientID: func(serviceName string) uuid.UUID {
var namespace = uuid.NameSpaceURL
var url = []byte(serviceName)
return uuid.NewMD5(namespace, url)
}(serviceName),
}
return &server, nil
}
Expand Down Expand Up @@ -257,9 +261,9 @@ func (h *Server) Shutdown() {
h.httpServer.Close()
}

// SetClientID ...
func (h *Server) SetClientID(clientID uuid.UUID) {
h.ClientID = clientID
// ClientID ...
func (h *Server) ClientID() uuid.UUID {
return h.clientID
}

// RegisterPublishers this will register publisher
Expand Down Expand Up @@ -346,7 +350,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
case d := <-h.DataIn: //skips publisher object processing
if d.Type == channel.SUBSCRIBER { // Listener means subscriber aka sender
// Post it to the address that has been specified : to target URL
subs := subscriber.New(h.ClientID)
subs := subscriber.New(h.clientID)
//Self URL
_ = subs.SetEndPointURI(h.ServiceName)
obj := pubsub.PubSub{ // all we need is ID and resource address
Expand Down Expand Up @@ -415,7 +419,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
//TODO: change to Get status for all events
// current implementation expects to have a resource address
// Post it to the address that has been specified : to target URL
subs := subscriber.New(h.ClientID)
subs := subscriber.New(h.clientID)
//Self URL
_ = subs.SetEndPointURI(h.ServiceName)
obj := pubsub.PubSub{}
Expand Down
47 changes: 20 additions & 27 deletions pkg/protocol/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@ import (
func strptr(s string) *string { return &s }

var (
storePath = "."
serverClientID, _ = uuid.Parse("d73555b4-b01e-4802-89f1-f058f215a1f8")
clientClientID, _ = uuid.Parse("5202d2c4-f652-4974-b24d-1934f0d819e3")
storePath = "."

subscriptionOneID = "123e4567-e89b-12d3-a456-426614174001"
serverAddress = types.ParseURI("http://localhost:8089")
clientAddress = types.ParseURI("http://localhost:8087")
hostPort = 8089
clientPort = 8087
serverClientID = func(serviceName string) uuid.UUID {
var namespace = uuid.NameSpaceURL
var url = []byte(serviceName)
return uuid.NewMD5(namespace, url)
}(serverAddress.String())

clientClientID = func(serviceName string) uuid.UUID {
var namespace = uuid.NameSpaceURL
var url = []byte(serviceName)
return uuid.NewMD5(namespace, url)
}(clientAddress.String())

subscriptionOne = &pubsub.PubSub{
ID: subscriptionOneID,
Expand Down Expand Up @@ -109,10 +119,7 @@ func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, w
var err error
assert.Nil(t, clientS)
clientS, err = ceHttp.InitServer(clientAddress.String(), clientPort, storePath, in, clientOutChannel, closeCh, nil, nil)
if err != nil {
log.Infof("error creating client ")
}
clientS.ClientID = clientClientID
assert.Nil(t, err)
clientS.RegisterPublishers(serverAddress)
wg := sync.WaitGroup{}
time.Sleep(250 * time.Millisecond)
Expand Down Expand Up @@ -145,10 +152,7 @@ func TestSubscribeCreated(t *testing.T) {
closeCh := make(chan struct{})
eventChannel := make(chan *channel.DataChan, 10)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
if err != nil {
t.Skipf("http failed(%#v): %v", server, err)
}
server.ClientID = serverClientID
assert.Nil(t, err)

wg := sync.WaitGroup{}
// Start the server and channel proceesor
Expand Down Expand Up @@ -176,9 +180,7 @@ func TestSendEvent(t *testing.T) {
clientOutChannel := make(chan *channel.DataChan)
closeCh := make(chan struct{})
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
if err != nil {
t.Skipf("http failed(%#v): %v", server, err)
}
assert.Nil(t, err)
wg := sync.WaitGroup{}
// Start the server and channel proceesor
err = server.Start(&wg)
Expand Down Expand Up @@ -241,9 +243,7 @@ func TestSendSuccessStatus(t *testing.T) {
}
return nil
}, nil)
if err != nil {
t.Skipf("http failed(%#v): %v", server, err)
}
assert.Nil(t, err)
wg := sync.WaitGroup{}
// Start the server and channel proceesor
err = server.Start(&wg)
Expand Down Expand Up @@ -284,9 +284,7 @@ func TestHealth(t *testing.T) {
var status int
var urlErr error
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
if err != nil {
t.Skipf("http failed(%#v): %v", server, err)
}
assert.Nil(t, err)

wg := sync.WaitGroup{}
// Start the server and channel proceesor
Expand All @@ -306,10 +304,7 @@ func TestSender(t *testing.T) {
closeCh := make(chan struct{})

server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
if err != nil {
t.Skipf("http failed(%#v): %v", server, err)
}

assert.Nil(t, err)
wg := sync.WaitGroup{}
// Start the server and channel processor
err = server.Start(&wg)
Expand All @@ -332,9 +327,7 @@ func TestPing(t *testing.T) {
closeCh := make(chan struct{})

server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
if err != nil {
t.Skipf("http failed(%#v): %v", server, err)
}
assert.Nil(t, err)

wg := sync.WaitGroup{}
// Start the server and channel processor
Expand Down

0 comments on commit 3c90da3

Please sign in to comment.