Skip to content

Commit

Permalink
fix ack of events (#57)
Browse files Browse the repository at this point in the history
Signed-off-by: Aneesh Puttur <aneeshputtur@gmail.com>

Signed-off-by: Aneesh Puttur <aneeshputtur@gmail.com>
  • Loading branch information
aneeshkp authored Oct 26, 2022
1 parent 232ab2c commit e226a93
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions pkg/protocol/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
OnReceiveOverrideFn: d.OnReceiveOverrideFn,
ProcessEventFn: d.ProcessEventFn,
}
h.SendTo(wg, d.ClientID, *url, data.Data, d.Type)
h.SendTo(wg, d.ClientID, *url, d.Address, data.Data, d.Type)
log.Infof("status ping: queued event status for client %s for resource %s", d.ClientID.String(), d.Address)
} else {
log.Errorf("status ping: failed to find subscription for client %s", d.ClientID.String())
Expand Down Expand Up @@ -444,7 +444,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
OnReceiveOverrideFn: d.OnReceiveOverrideFn,
ProcessEventFn: d.ProcessEventFn,
}
h.SendTo(wg, clientID, endPointURI.String(), data.Data, d.Type)
h.SendTo(wg, clientID, endPointURI.String(), d.Address, data.Data, d.Type)
}
}
}
Expand Down Expand Up @@ -512,15 +512,15 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
}

// SendTo sends events to the address specified
func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress string, e *cloudevents.Event, eventType channel.Type) {
func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, resourceAddress string, e *cloudevents.Event, eventType channel.Type) {
if sender, ok := h.Sender[clientID]; ok {
if len(sender) == 0 {
log.Errorf("event not publsihed to empty subscribers, clients need to register %s", clientAddress)
log.Infof("event genrated %s", e.String())
return
}
wg.Add(1)
go func(h *Server, clientAddress string, eventType channel.Type, e *cloudevents.Event, wg *sync.WaitGroup, sender *Protocol) {
go func(h *Server, clientAddress, resourceAddress string, eventType channel.Type, e *cloudevents.Event, wg *sync.WaitGroup, sender *Protocol) {
defer wg.Done()
if sender == nil {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
Expand All @@ -531,7 +531,7 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
}
h.DataOut <- &channel.DataChan{
Address: clientAddress,
Address: resourceAddress,
Data: e,
Status: channel.FAILED,
Type: eventType,
Expand All @@ -549,15 +549,14 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
log.Errorf("connection lost addressing %s", clientAddress)
} else {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.SUCCESS, 1)

h.DataOut <- &channel.DataChan{
Address: clientAddress,
Address: resourceAddress,
Data: e,
Status: channel.SUCCESS,
Type: eventType,
}
}
}(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol) *Protocol {
}(h, clientAddress, resourceAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol) *Protocol {
if s, ok := sender[EVENT]; ok {
return s
}
Expand Down

0 comments on commit e226a93

Please sign in to comment.