diff --git a/pkg/protocol/http/http.go b/pkg/protocol/http/http.go index 74ce4a4..b8b8430 100644 --- a/pkg/protocol/http/http.go +++ b/pkg/protocol/http/http.go @@ -416,7 +416,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { OnReceiveOverrideFn: d.OnReceiveOverrideFn, ProcessEventFn: d.ProcessEventFn, } - h.SendTo(wg, d.ClientID, *url, data.Data, d.Type) + h.SendTo(wg, d.ClientID, *url, d.Address, data.Data, d.Type) log.Infof("status ping: queued event status for client %s for resource %s", d.ClientID.String(), d.Address) } else { log.Errorf("status ping: failed to find subscription for client %s", d.ClientID.String()) @@ -444,7 +444,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { OnReceiveOverrideFn: d.OnReceiveOverrideFn, ProcessEventFn: d.ProcessEventFn, } - h.SendTo(wg, clientID, endPointURI.String(), data.Data, d.Type) + h.SendTo(wg, clientID, endPointURI.String(), d.Address, data.Data, d.Type) } } } @@ -512,7 +512,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { } // SendTo sends events to the address specified -func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress string, e *cloudevents.Event, eventType channel.Type) { +func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, resourceAddress string, e *cloudevents.Event, eventType channel.Type) { if sender, ok := h.Sender[clientID]; ok { if len(sender) == 0 { log.Errorf("event not publsihed to empty subscribers, clients need to register %s", clientAddress) @@ -520,7 +520,7 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st return } wg.Add(1) - go func(h *Server, clientAddress string, eventType channel.Type, e *cloudevents.Event, wg *sync.WaitGroup, sender *Protocol) { + go func(h *Server, clientAddress, resourceAddress string, eventType channel.Type, e *cloudevents.Event, wg *sync.WaitGroup, sender *Protocol) { defer wg.Done() if sender == nil { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1) @@ -531,7 +531,7 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1) } h.DataOut <- &channel.DataChan{ - Address: clientAddress, + Address: resourceAddress, Data: e, Status: channel.FAILED, Type: eventType, @@ -549,15 +549,14 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st log.Errorf("connection lost addressing %s", clientAddress) } else { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.SUCCESS, 1) - h.DataOut <- &channel.DataChan{ - Address: clientAddress, + Address: resourceAddress, Data: e, Status: channel.SUCCESS, Type: eventType, } } - }(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol) *Protocol { + }(h, clientAddress, resourceAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol) *Protocol { if s, ok := sender[EVENT]; ok { return s }