diff --git a/Makefile b/Makefile index 71dba0b..a6376b9 100644 --- a/Makefile +++ b/Makefile @@ -24,4 +24,11 @@ test: gha: go test ./... --tags=unittests -coverprofile=cover.out +fmt: ## Go fmt your code + hack/gofmt.sh +fmt-code: ## Run go fmt against code. + go fmt ./... + +vet: ## Run go vet against code. + go vet ./... \ No newline at end of file diff --git a/docs/metrics.md b/docs/metrics.md index ad7b8a7..26c2cb6 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -28,75 +28,75 @@ These metrics describe the status of the cloud native events, publisher and subs All these metrics are prefixed with `cne_` -| Name | Description | Type | -|-------------------------------------------------------|----------------------------------------------------------|---------| -| cne_amqp_events_received | Metric to get number of events received by the transport. | Gauge | -| cne_amqp_events_published | Metric to get number of events published by the transport. | Gauge | -| cne_amqp_connection_reset | Metric to get number of connection resets. | Gauge | -| cne_amqp_sender | Metric to get number of sender created. | Gauge | -| cne_amqp_receiver | Metric to get number of receiver created. | Gauge | -| cne_amqp_status_check_published | Metric to get number of status check published by the transport | Gauge | +| Name | Description | Type | +|-----------------------------|----------------------------------------------------------|---------| +| cne_transport_events_received | Metric to get number of events received by the transport. | Gauge | +| cne_transport_events_published | Metric to get number of events published by the transport. | Gauge | +| cne_transport_connection_reset | Metric to get number of connection resets. | Gauge | +| cne_transport_sender | Metric to get number of sender created. | Gauge | +| cne_transport_receiver | Metric to get number of receiver created. | Gauge | +| cne_transport_status_check_published | Metric to get number of status check published by the transport | Gauge | -`cne_amqp_events_received` - The number of events received by the amqp protocol, and their status by address. +`cne_transport_events_received` - The number of events received by the transport protocol, and their status by address. Example -```json -# HELP cne_amqp_events_received Metric to get number of events received by the transport -# TYPE cne_amqp_events_received gauge -cne_amqp_events_received{address="/news-service/finance",status="success"} 8 -cne_amqp_events_received{address="/news-service/sports",status="success"} 8 +``` +# HELP cne_transport_events_received Metric to get number of events received by the transport +# TYPE cne_transport_events_received gauge +cne_transport_events_received{address="/news-service/finance",status="success"} 8 +cne_transport_events_received{address="/news-service/sports",status="success"} 8 ``` -`cne_amqp_events_published` - This metrics indicates number of events that were published via amqp , grouped by address and status. +`cne_transport_events_published` - This metrics indicates number of events that were published via transport , grouped by address and status. Example -```json -# HELP cne_amqp_events_published Metric to get number of events published by the transport -# TYPE cne_amqp_events_published gauge -cne_amqp_events_published{address="/news-service/finance",status="connection reset"} 1 -cne_amqp_events_published{address="/news-service/finance",status="success"} 8 -cne_amqp_events_published{address="/news-service/sports",status="connection reset"} 1 -cne_amqp_events_published{address="/news-service/sports",status="success"} 8 +``` +# HELP cne_transport_events_published Metric to get number of events published by the transport +# TYPE cne_transport_events_published gauge +cne_transport_events_published{address="/news-service/finance",status="connection reset"} 1 +cne_transport_events_published{address="/news-service/finance",status="success"} 8 +cne_transport_events_published{address="/news-service/sports",status="connection reset"} 1 +cne_transport_events_published{address="/news-service/sports",status="success"} 8 ``` -`cne_amqp_connection_reset` - This metrics indicates number of types amqp connection was reset +`cne_transport_connection_reset` - This metrics indicates number of types transport connection was reset Example -```json -# HELP cne_amqp_connection_reset Metric to get number of connection resets -# TYPE cne_amqp_connection_reset gauge -cne_amqp_connection_reset 1 +``` +# HELP cne_transport_connection_reset Metric to get number of connection resets +# TYPE cne_transport_connection_reset gauge +cne_transport_connection_reset 1 ``` -`cne_amqp_sender` - This metrics indicates number of amqp sender objects were created , grouped by address and status. +`cne_transport_sender` - This metrics indicates number of transport sender objects were created , grouped by address and status. Example -```json -# HELP cne_amqp_sender Metric to get number of sender active -# TYPE cne_amqp_sender gauge -cne_amqp_sender{address="/news-service/finance",status="active"} 1 -cne_amqp_sender{address="/news-service/sports",status="active"} 1 +``` +# HELP cne_transport_sender Metric to get number of sender active +# TYPE cne_transport_sender gauge +cne_transport_sender{address="/news-service/finance",status="active"} 1 +cne_transport_sender{address="/news-service/sports",status="active"} 1 ``` -`cne_amqp_receiver` - This metrics indicates number of amqp receiver objects were created, grouped by address and status. +`cne_transport_receiver` - This metrics indicates number of transport receiver objects were created, grouped by address and status. Example -```json -# HELP cne_amqp_receiver Metric to get number of receiver active -# TYPE cne_amqp_receiver gauge -cne_amqp_receiver{address="/news-service/finance",status="active"} 1 -cne_amqp_receiver{address="/news-service/sports",status="active"} 1 +``` +# HELP cne_transport_receiver Metric to get number of receiver active +# TYPE cne_transport_receiver gauge +cne_transport_receiver{address="/news-service/finance",status="active"} 1 +cne_transport_receiver{address="/news-service/sports",status="active"} 1 ``` -`cne_amqp_status_check_published` - This metrics indicates status check that were published via amqp , grouped by address and status. +`cne_transport_status_check_published` - This metrics indicates status check that were published via transport , grouped by address and status. Example -```json -# HELP cne_amqp_status_check_published Metric to get number of status check published by the transport -# TYPE cne_amqp_status_check_published gauge -cne_amqp_status_check_published{address="/news-service/finance/status",status="failed"} 1 -cne_amqp_status_check_published{address="/news-service/sports/status",status="connection reset"} 1 -cne_amqp_status_check_published{address="/news-service/sports/status",status="success"} 1 +``` +# HELP cne_transport_status_check_published Metric to get number of status check published by the transport +# TYPE cne_transport_status_check_published gauge +cne_transport_status_check_published{address="/news-service/finance/status",status="failed"} 1 +cne_transport_status_check_published{address="/news-service/sports/status",status="connection reset"} 1 +cne_transport_status_check_published{address="/news-service/sports/status",status="success"} 1 ``` diff --git a/pkg/localmetrics/localmetrics.go b/pkg/localmetrics/localmetrics.go index 74bbb18..61e5395 100644 --- a/pkg/localmetrics/localmetrics.go +++ b/pkg/localmetrics/localmetrics.go @@ -34,89 +34,89 @@ const ( var ( - //amqpEventReceivedCount ... Total no of events received by the transport - amqpEventReceivedCount = prometheus.NewGaugeVec( + //transportEventReceivedCount ... Total no of events received by the transport + transportEventReceivedCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cne_amqp_events_received", - Help: "Metric to get number of events received by the transport", + Name: "cne_transport_events_received", + Help: "Metric to get number of events received by the transport", }, []string{"address", "status"}) - //amqpEventPublishedCount ... Total no of events published by the transport - amqpEventPublishedCount = prometheus.NewGaugeVec( + //transportEventPublishedCount ... Total no of events published by the transport + transportEventPublishedCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cne_amqp_events_published", + Name: "cne_transport_events_published", Help: "Metric to get number of events published by the transport", }, []string{"address", "status"}) - //amqpConnectionResetCount ... Total no of connection resets - amqpConnectionResetCount = prometheus.NewGaugeVec( + //transportConnectionResetCount ... Total no of connection resets + transportConnectionResetCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cne_amqp_connection_reset", + Name: "cne_transport_connection_reset", Help: "Metric to get number of connection resets", }, []string{}) - //amqpSenderCount ... Total no of events published by the transport - amqpSenderCount = prometheus.NewGaugeVec( + //transportSenderCount ... Total no of events published by the transport + transportSenderCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cne_amqp_sender", + Name: "cne_transport_sender", Help: "Metric to get number of sender created", }, []string{"address", "status"}) - //amqpReceiverCount ... Total no of events published by the transport - amqpReceiverCount = prometheus.NewGaugeVec( + //transportReceiverCount ... Total no of events published by the transport + transportReceiverCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cne_amqp_receiver", + Name: "cne_transport_receiver", Help: "Metric to get number of receiver created", }, []string{"address", "status"}) - //amqpStatusCheckCount ... Total no of status check received by the transport - amqpStatusCheckCount = prometheus.NewGaugeVec( + //transportStatusCheckCount ... Total no of status check received by the transport + transportStatusCheckCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cne_amqp_status_check_published", + Name: "cne_transport_status_check_published", Help: "Metric to get number of status check published by the transport", }, []string{"address", "status"}) ) // RegisterMetrics ... func RegisterMetrics() { - prometheus.MustRegister(amqpEventReceivedCount) - prometheus.MustRegister(amqpEventPublishedCount) - prometheus.MustRegister(amqpConnectionResetCount) - prometheus.MustRegister(amqpSenderCount) - prometheus.MustRegister(amqpReceiverCount) - prometheus.MustRegister(amqpStatusCheckCount) + prometheus.MustRegister(transportEventReceivedCount) + prometheus.MustRegister(transportEventPublishedCount) + prometheus.MustRegister(transportConnectionResetCount) + prometheus.MustRegister(transportSenderCount) + prometheus.MustRegister(transportReceiverCount) + prometheus.MustRegister(transportStatusCheckCount) } // UpdateTransportConnectionResetCount ... func UpdateTransportConnectionResetCount(val int) { - amqpConnectionResetCount.With(prometheus.Labels{}).Add(float64(val)) + transportConnectionResetCount.With(prometheus.Labels{}).Add(float64(val)) } // UpdateEventReceivedCount ... func UpdateEventReceivedCount(address string, status MetricStatus, val int) { - amqpEventReceivedCount.With( + transportEventReceivedCount.With( prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val)) } // UpdateEventCreatedCount ... func UpdateEventCreatedCount(address string, status MetricStatus, val int) { - amqpEventPublishedCount.With( + transportEventPublishedCount.With( prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val)) } // UpdateStatusCheckCount ... func UpdateStatusCheckCount(address string, status MetricStatus, val int) { - amqpEventPublishedCount.With( + transportEventPublishedCount.With( prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val)) } // UpdateSenderCreatedCount ... func UpdateSenderCreatedCount(address string, status MetricStatus, val int) { - amqpSenderCount.With( + transportSenderCount.With( prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val)) } // UpdateReceiverCreatedCount ... func UpdateReceiverCreatedCount(address string, status MetricStatus, val int) { - amqpReceiverCount.With( + transportReceiverCount.With( prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val)) } diff --git a/pkg/protocol/http/http.go b/pkg/protocol/http/http.go index 4be9e39..e51c389 100644 --- a/pkg/protocol/http/http.go +++ b/pkg/protocol/http/http.go @@ -3,11 +3,13 @@ package http import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" "strings" "sync" + "syscall" "time" "github.com/redhat-cne/sdk-go/pkg/errorhandler" @@ -31,7 +33,7 @@ import ( ) var ( - cancelTimeout = 500 * time.Millisecond + cancelTimeout = 2000 * time.Millisecond retryTimeout = 500 * time.Millisecond RequestReadHeaderTimeout = 2 * time.Second ) @@ -153,6 +155,7 @@ func (h *Server) Start(wg *sync.WaitGroup) error { log.Infof("deleting subscribers") _ = h.subscriberAPI.DeleteClient(obj.ClientID) h.DeleteSender(obj.ClientID) + out.Status = channel.SUCCESS localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, -1) } } @@ -369,6 +372,16 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { select { case d := <-h.DataIn: //skips publisher object processing if d.Type == channel.SUBSCRIBER { // Listener means subscriber aka sender + if d.Status == channel.DELETE { + log.Infof("Deleting client %s", d.ClientID) + if dErr := h.subscriberAPI.DeleteClient(d.ClientID); dErr == nil { + h.DeleteSender(d.ClientID) + localmetrics.UpdateSenderCreatedCount(d.Address, localmetrics.ACTIVE, -1) + } else { + log.Errorf("Failed to delete subscriber %s", d.Address) + } + continue + } // Post it to the address that has been specified : to target URL subs := subscriber.New(h.clientID) //Self URL @@ -520,9 +533,14 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, r log.Infof("event genrated %s", e.String()) return } + wg.Add(1) go func(h *Server, clientAddress, resourceAddress string, eventType channel.Type, e *cloudevents.Event, wg *sync.WaitGroup, sender *Protocol) { defer wg.Done() + if h.subscriberAPI.SubscriberMarkedForDelete(clientID) { + log.Infof("not posting event, subscriber %s is marked for delete due to inactivity ", clientAddress) + return + } if sender == nil { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1) return @@ -532,22 +550,25 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, r if eventType == channel.EVENT { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1) } + // has subscriber failed to connect for n times delete the subscribers + if h.subscriberAPI.IncFailCountToFail(clientID) { + h.DataOut <- &channel.DataChan{ + ClientID: clientID, + Address: clientAddress, + Data: e, + Status: channel.DELETE, + Type: channel.SUBSCRIBER, + } + } else { + log.Errorf("client %s not responding, waiting %d times before marking to delete subscriber", + clientAddress, h.subscriberAPI.FailCountThreshold()-h.subscriberAPI.GetFailCount(clientID)) + } h.DataOut <- &channel.DataChan{ Address: resourceAddress, Data: e, Status: channel.FAILED, Type: eventType, } - // has subscriber failed to connect for n times delete the subscribers - if h.subscriberAPI.IncFailCountToFail(clientID) { - log.Errorf("client %s not responding, deleting subscription ", clientAddress) - h.DataOut <- &channel.DataChan{ - Address: clientAddress, - Data: e, - Status: channel.DELETE, - Type: channel.SUBSCRIBER, - } - } log.Errorf("connection lost addressing %s", clientAddress) } else { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.SUCCESS, 1) @@ -671,7 +692,7 @@ func (c *Protocol) Send(e cloudevents.Event) error { e.SetDataContentType(cloudevents.ApplicationJSON) ctx := cloudevents.ContextWithTarget(sendCtx, c.Protocol.Target.String()) result := c.Client.Send(ctx, e) - if cloudevents.IsUndelivered(result) { + if cloudevents.IsUndelivered(result) || errors.Is(result, syscall.ECONNREFUSED) { log.Errorf("failed to send to address %s with %s", c.Protocol.Target.String(), result) return fmt.Errorf("failed to send to address %s with error %s", c.Protocol.Target.String(), result.Error()) } else if !cloudevents.IsACK(result) { @@ -688,8 +709,7 @@ func (c *Protocol) Send(e cloudevents.Event) error { log.Infof("Sent with status code %d, result: %v", httpResult.StatusCode, result) return fmt.Errorf(httpResult.Format, httpResult.Args...) } - log.Printf("Send did not return an HTTP response: %s", result) - return fmt.Errorf("send did not return an HTTP response: %s", result) + return nil } // Get ... getter method @@ -748,23 +768,10 @@ func Post(address string, e cloudevents.Event) error { e.SetDataContentType(cloudevents.ApplicationJSON) ctx := cloudevents.ContextWithTarget(sendCtx, address) result := c.Send(ctx, e) - if cloudevents.IsUndelivered(result) { + // With current implementation of cloudevents we cannot get ack on delivered of not + if cloudevents.IsUndelivered(result) || errors.Is(result, syscall.ECONNREFUSED) { log.Errorf("failed to send to address %s with %s", address, result) return result - } else if !cloudevents.IsACK(result) { - log.Printf("sent: not accepted : %t", cloudevents.IsACK(result)) - return result - } - var httpResult *cehttp.Result - - if cloudevents.ResultAs(result, &httpResult) { - if httpResult.StatusCode == http.StatusOK { - log.Infof("sent with status code %d::%v", httpResult.StatusCode, result) - return nil - } - log.Printf("Sent with status code %d, result: %v", httpResult.StatusCode, result) - return fmt.Errorf(httpResult.Format, httpResult.Args...) } - log.Printf("Send did not return an HTTP response: %s", result) - return fmt.Errorf("send did not return an HTTP response: %s", result) + return nil } diff --git a/pkg/protocol/http/http_test.go b/pkg/protocol/http/http_test.go index 3a95479..5feafd5 100644 --- a/pkg/protocol/http/http_test.go +++ b/pkg/protocol/http/http_test.go @@ -20,6 +20,7 @@ import ( "github.com/redhat-cne/sdk-go/pkg/event/ptp" ceHttp "github.com/redhat-cne/sdk-go/pkg/protocol/http" "github.com/redhat-cne/sdk-go/pkg/pubsub" + "github.com/redhat-cne/sdk-go/pkg/subscriber" "github.com/redhat-cne/sdk-go/pkg/types" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -330,12 +331,147 @@ func TestStatus(t *testing.T) { if e, ok := err.(*json.SyntaxError); ok { log.Infof("syntax error at byte offset %d", e.Offset) } + assert.Nil(t, err) + close(closeCh) +} +func TestPostSubscription(t *testing.T) { + in := make(chan *channel.DataChan) + out := make(chan *channel.DataChan) + closeCh := make(chan struct{}) + server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) assert.Nil(t, err) + wg := sync.WaitGroup{} + // Start the server and channel proceesor + err = server.Start(&wg) + assert.Nil(t, err) + server.HTTPProcessor(&wg) + time.Sleep(2 * time.Second) + subs := subscriber.New(clientClientID) + //Self URL + _ = subs.SetEndPointURI(clientAddress.String()) + obj := pubsub.PubSub{ // all we need is ID and resource address + ID: subscriptionOneID, + Resource: subscriptionOne.Resource, + } + go func() { + select { + case <-closeCh: + return + case <-in: + return + } + }() + // create a subscriber model + subs.AddSubscription(obj) + subs.Action = channel.NEW + ce, _ := subs.CreateCloudEvents() + ce.SetSubject(channel.NEW.String()) + ce.SetSource(subscriptionOne.Resource) + + err = ceHttp.Post(fmt.Sprintf("%s/subscription", serverAddress.String()), *ce) + assert.Nil(t, err) + + subs.Action = channel.DELETE + ce, _ = subs.CreateCloudEvents() + ce.SetSubject(channel.DELETE.String()) + log.Infof("`deleting` subscriber for clientID %s", clientAddress) + err = ceHttp.Post(fmt.Sprintf("%s/subscription", serverAddress.String()), *ce) + if err != nil { + log.Infof("error %s", err.Error()) + } + + assert.Nil(t, err) close(closeCh) } +func TestStaleSubscribers(t *testing.T) { + type DeleteAssert struct { + Type channel.Type + Status channel.Status + } + in := make(chan *channel.DataChan) + out := make(chan *channel.DataChan) + closeCh := make(chan struct{}) + done := make(chan DeleteAssert, 2) + server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) + assert.Nil(t, err) + + wg := sync.WaitGroup{} + // Start the server and channel proceesor + err = server.Start(&wg) + assert.Nil(t, err) + server.HTTPProcessor(&wg) + time.Sleep(2 * time.Second) + subs := subscriber.New(clientClientID) + //Self URL + _ = subs.SetEndPointURI(clientAddress.String()) + obj := pubsub.PubSub{ // all we need is ID and resource address + ID: subscriptionOneID, + Resource: subscriptionOne.Resource, + } + go func() { + for { + select { + case <-closeCh: + break + case o := <-out: + if o.Status == channel.DELETE && o.Type == channel.SUBSCRIBER { + done <- DeleteAssert{ + Type: o.Type, + Status: o.Status, + } + break + } + case <-time.After(3 * time.Second): + done <- DeleteAssert{ + Type: channel.SUBSCRIBER, + Status: channel.NEW, + } + break + } + } + }() + // create a subscriber model + subs.AddSubscription(obj) + subs.Action = channel.NEW + ce, _ := subs.CreateCloudEvents() + ce.SetSubject(channel.NEW.String()) + ce.SetSource(subscriptionOne.Resource) + + // create subscription with server + err = ceHttp.Post(fmt.Sprintf("%s/subscription", serverAddress.String()), *ce) + assert.Nil(t, err) + // now send 10 events to check dead client + e := CloudEvents() + // Generate events + for i := 0; i < 20; i++ { + in <- &channel.DataChan{ + Address: subscriptionOne.Resource, + Data: &e, + Status: channel.NEW, + Type: channel.EVENT, + } + time.Sleep(500 * time.Millisecond) + } + d := <-done + assert.Equal(t, channel.SUBSCRIBER, d.Type) + assert.Equal(t, channel.DELETE, d.Status) + // send for main thread to delete subscriber + assert.Equal(t, 1, len(server.Sender)) + server.HTTPProcessor(&wg) + in <- &channel.DataChan{ + ClientID: clientClientID, + Data: &e, + Status: d.Status, + Type: d.Type, + } + assert.Equal(t, 1, len(server.Sender)) + time.Sleep(5 * time.Second) + assert.Equal(t, 0, len(server.Sender)) + close(closeCh) +} func TestTeardown(t *testing.T) { _ = os.Remove(fmt.Sprintf("./%s.json", clientClientID.String())) } diff --git a/pkg/store/store.go b/pkg/store/store.go index bf0328b..8a61472 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -27,6 +27,16 @@ type PubSubStore struct { Store map[string]*pubsub.PubSub } +// Get ... +func (ps *PubSubStore) Get(subID string) pubsub.PubSub { + ps.Lock() + defer ps.Unlock() + if s, ok := ps.Store[subID]; ok { + return *s + } + return pubsub.PubSub{} +} + // Set is a wrapper for setting the value of a key in the underlying map func (ps *PubSubStore) Set(key string, val pubsub.PubSub) { ps.Lock() diff --git a/pkg/store/subscriber/subscriber.go b/pkg/store/subscriber/subscriber.go index 2de5d91..6584cc0 100644 --- a/pkg/store/subscriber/subscriber.go +++ b/pkg/store/subscriber/subscriber.go @@ -22,6 +22,16 @@ func (ss *Store) Set(clientID uuid.UUID, val subscriber.Subscriber) { ss.Store[clientID] = &val } +// Get is a wrapper for Getting the value of a key in the underlying map +func (ss *Store) Get(clientID uuid.UUID) (subscriber.Subscriber, bool) { + ss.Lock() + defer ss.Unlock() + if s, ok := ss.Store[clientID]; ok { + return *s, true + } + return subscriber.Subscriber{}, false +} + // Delete ... delete from store func (ss *Store) Delete(clientID uuid.UUID) { ss.Lock() diff --git a/pkg/subscriber/subscriber.go b/pkg/subscriber/subscriber.go index 6739bad..b2383eb 100644 --- a/pkg/subscriber/subscriber.go +++ b/pkg/subscriber/subscriber.go @@ -58,6 +58,11 @@ type Subscriber struct { failedCount int } +// Get ... get pubsub +func (s *Subscriber) Get(subID string) pubsub.PubSub { + return s.SubStore.Get(subID) +} + // IncFailCount ... func (s *Subscriber) IncFailCount() { s.failedCount++ diff --git a/v1/subscriber/subscriber.go b/v1/subscriber/subscriber.go index dbd1641..c6e4489 100644 --- a/v1/subscriber/subscriber.go +++ b/v1/subscriber/subscriber.go @@ -137,8 +137,8 @@ func (p *API) HasSubscription(clientID uuid.UUID, address string) (pubsub.PubSub // HasClient check if client is already exists in the store/cache func (p *API) HasClient(clientID uuid.UUID) (*subscriber.Subscriber, bool) { - if subscriber, ok := p.SubscriberStore.Store[clientID]; ok { - return subscriber, true + if subscriber, ok := p.SubscriberStore.Get(clientID); ok { + return &subscriber, true } return nil, false } @@ -182,8 +182,8 @@ func (p *API) CreateSubscription(clientID uuid.UUID, sub subscriber.Subscriber) // GetSubscriptionClient get a clientID by id func (p *API) GetSubscriptionClient(clientID uuid.UUID) (subscriber.Subscriber, error) { - if sub, ok := p.SubscriberStore.Store[clientID]; ok { - return *sub, nil + if subscriber, ok := p.SubscriberStore.Get(clientID); ok { + return subscriber, nil } return subscriber.Subscriber{}, fmt.Errorf("subscriber data was not found for id %s", clientID) } @@ -196,24 +196,25 @@ func (p *API) GetSubscriptionsFromFile(clientID uuid.UUID) ([]byte, error) { // GetSubscriptions get all subscriptionOne inforamtions func (p *API) GetSubscriptions(clientID uuid.UUID) (sub map[string]*pubsub.PubSub) { - if sub, ok := p.SubscriberStore.Store[clientID]; ok { - return sub.SubStore.Store + if subscriber, ok := p.SubscriberStore.Get(clientID); ok { + sub = subscriber.SubStore.Store } + return } // GetSubscription get subscriptionOne inforamtions -func (p *API) GetSubscription(clientID uuid.UUID, subID string) (sub pubsub.PubSub) { - if subscriber, ok := p.SubscriberStore.Store[clientID]; ok { - if subscription, ok2 := subscriber.SubStore.Store[subID]; ok2 { - return *subscription - } +func (p *API) GetSubscription(clientID uuid.UUID, subID string) pubsub.PubSub { + if subscriber, ok := p.SubscriberStore.Get(clientID); ok { + return subscriber.Get(subID) } - return + return pubsub.PubSub{} } // GetSubscriberURLByResourceAndClientID get subscription information by client id/resource func (p *API) GetSubscriberURLByResourceAndClientID(clientID uuid.UUID, resource string) (url *string) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() for _, subscriber := range p.SubscriberStore.Store { if subscriber.ClientID == clientID { for _, sub := range subscriber.SubStore.Store { @@ -230,6 +231,8 @@ func (p *API) GetSubscriberURLByResourceAndClientID(clientID uuid.UUID, resource // GetSubscriberURLByResource get subscriptionOne information func (p *API) GetSubscriberURLByResource(resource string) (urls []string) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() for _, subscriber := range p.SubscriberStore.Store { for _, sub := range subscriber.SubStore.Store { if sub.GetResource() == resource { @@ -242,6 +245,8 @@ func (p *API) GetSubscriberURLByResource(resource string) (urls []string) { // GetClientIDByResource get subscriptionOne information func (p *API) GetClientIDByResource(resource string) (clientIds []uuid.UUID) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() for _, subscriber := range p.SubscriberStore.Store { for _, sub := range subscriber.SubStore.Store { if sub.GetResource() == resource { @@ -255,6 +260,8 @@ func (p *API) GetClientIDByResource(resource string) (clientIds []uuid.UUID) { // GetClientIDAddressByResource get subscriptionOne information func (p *API) GetClientIDAddressByResource(resource string) map[uuid.UUID]*types.URI { clients := map[uuid.UUID]*types.URI{} + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() for _, subscriber := range p.SubscriberStore.Store { for _, sub := range subscriber.SubStore.Store { if sub.GetResource() == resource { @@ -267,10 +274,11 @@ func (p *API) GetClientIDAddressByResource(resource string) map[uuid.UUID]*types // DeleteSubscription delete a subscriptionOne by id func (p *API) DeleteSubscription(clientID uuid.UUID, subscriptionID string) error { - if subStore, ok := p.SubscriberStore.Store[clientID]; ok { // client found + if subStore, ok := p.SubscriberStore.Get(clientID); ok { // client found if sub, ok2 := subStore.SubStore.Store[subscriptionID]; ok2 { err := deleteFromFile(*sub, fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) - p.SubscriberStore.Store[clientID].SubStore.Delete(subscriptionID) + subStore.SubStore.Delete(subscriptionID) + p.SubscriberStore.Set(clientID, subStore) return err } } @@ -279,7 +287,7 @@ func (p *API) DeleteSubscription(clientID uuid.UUID, subscriptionID string) erro // DeleteAllSubscriptions delete all subscriptionOne information func (p *API) DeleteAllSubscriptions(clientID uuid.UUID) error { - if subStore, ok := p.SubscriberStore.Store[clientID]; ok { // client found + if subStore, ok := p.SubscriberStore.Get(clientID); ok { if err := deleteAllFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))); err != nil { return err } @@ -287,30 +295,31 @@ func (p *API) DeleteAllSubscriptions(clientID uuid.UUID) error { RWMutex: sync.RWMutex{}, Store: map[string]*pubsub.PubSub{}, } + p.SubscriberStore.Set(clientID, subStore) } return nil } // DeleteClient delete all subscriptionOne information func (p *API) DeleteClient(clientID uuid.UUID) error { - log.Info("deleting client") - if subStore, ok := p.SubscriberStore.Store[clientID]; ok { // client found + if _, ok := p.SubscriberStore.Get(clientID); ok { // client found + log.Infof("delete from file %s", + fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))) if err := deleteAllFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, fmt.Sprintf("%s.json", clientID))); err != nil { return err } - subStore.SubStore = &store.PubSubStore{ - RWMutex: sync.RWMutex{}, - Store: map[string]*pubsub.PubSub{}, - } - delete(p.SubscriberStore.Store, clientID) + p.SubscriberStore.Delete(clientID) + } else { + log.Infof("subscription for client id %s not found", clientID) } return nil } // UpdateStatus .. update status func (p *API) UpdateStatus(clientID uuid.UUID, status subscriber.Status) error { - if subStore, ok := p.SubscriberStore.Store[clientID]; ok { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { subStore.SetStatus(status) + p.SubscriberStore.Set(clientID, subStore) // do not write to file , if restarts it will consider all client are active } else { return errors.New("failed to update subscriber status") @@ -320,8 +329,31 @@ func (p *API) UpdateStatus(clientID uuid.UUID, status subscriber.Status) error { // IncFailCountToFail .. update fail count func (p *API) IncFailCountToFail(clientID uuid.UUID) bool { - if subStore, ok := p.SubscriberStore.Store[clientID]; ok { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { subStore.IncFailCount() + p.SubscriberStore.Set(clientID, subStore) + if subStore.Action == channel.DELETE { + return true + } + } + return false +} + +// FailCountThreshold .. get threshold +func (p *API) FailCountThreshold() int { + return subscriber.SetConnectionToFailAfter +} + +func (p *API) GetFailCount(clientID uuid.UUID) int { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { + return subStore.FailedCount() + } + return 0 +} + +// SubscriberMarkedForDelete ... +func (p *API) SubscriberMarkedForDelete(clientID uuid.UUID) bool { + if subStore, ok := p.SubscriberStore.Get(clientID); ok { if subStore.Action == channel.DELETE { return true } diff --git a/v1/subscriber/subscriber_test.go b/v1/subscriber/subscriber_test.go index 5f77eab..583ca5d 100644 --- a/v1/subscriber/subscriber_test.go +++ b/v1/subscriber/subscriber_test.go @@ -156,6 +156,64 @@ func TestAPI_HasSubscription(t *testing.T) { assert.Equal(t, *s.SubStore.Store[fs.ID], fs) } +func Test_Concurrency(t *testing.T) { + defer clean() + s, e := globalInstance.CreateSubscription(clientID, subscriberWithOneEventCheck) + assert.Nil(t, e) + assert.NotEmpty(t, s.ClientID) + assert.NotNil(t, s.SubStore.Store) + assert.Equal(t, 1, len(s.SubStore.Store)) + s, e = globalInstance.CreateSubscription(clientID, subscriberWithManyEventCheck) + assert.Nil(t, e) + assert.NotEmpty(t, s.ClientID) + assert.NotNil(t, s.SubStore.Store) + assert.Equal(t, 2, len(s.SubStore.Store)) + assert.Equal(t, s.SubStore.Store[subscriptionOne.ID].URILocation, subscriptionOne.URILocation) + assert.Equal(t, s.SubStore.Store[subscriptionOne.ID].Resource, subscriptionOne.Resource) + assert.Equal(t, s.SubStore.Store[subscriptionOne.ID].EndPointURI, subscriptionOne.EndPointURI) + b, e := globalInstance.GetSubscriptionsFromFile(clientID) + assert.Nil(t, e) + assert.NotNil(t, b) + assert.NotEmpty(t, b) + var subscriptionClient subscriber.Subscriber + e = json.Unmarshal(b, &subscriptionClient) + assert.NotNil(t, subscriptionClient) + assert.Nil(t, e) + assert.NotEmpty(t, s, subscriptionClient) + assert.Equal(t, *s, subscriptionClient) + assert.NotNil(t, subscriptionClient.SubStore) + assert.Equal(t, len(s.SubStore.Store), len(subscriptionClient.SubStore.Store)) + //assert.NotEmpty(t, subscriber[0].SubStore.) + assert.Equal(t, subscriptionOne, subscriptionClient.SubStore.Store[subscriptionOne.ID]) + + var wg sync.WaitGroup + for i := 0; i <= 10; i++ { + wg.Add(1) + go func(w *sync.WaitGroup) { + defer w.Done() + cID := uuid.New() + s, e = globalInstance.CreateSubscription(cID, subscriberWithManyEventCheck) + assert.Nil(t, e) + assert.NotEmpty(t, s.ClientID) + assert.NotNil(t, globalInstance.GetSubscriptions(cID)) + store, ok := globalInstance.SubscriberStore.Get(cID) + assert.True(t, ok) + assert.Equal(t, 2, len(store.SubStore.Store)) + + assert.Equal(t, store.SubStore.Store[subscriptionOne.ID].URILocation, subscriptionOne.URILocation) + assert.Equal(t, store.SubStore.Store[subscriptionOne.ID].Resource, subscriptionOne.Resource) + assert.Equal(t, store.SubStore.Store[subscriptionOne.ID].EndPointURI, subscriptionOne.EndPointURI) + _, T := globalInstance.HasClient(subscriptionClient.ClientID) + assert.True(t, true, T) + _ = globalInstance.UpdateStatus(subscriptionClient.ClientID, subscriber.InActive) + if s, ok := globalInstance.SubscriberStore.Get(subscriptionClient.ClientID); ok { + assert.NotEmpty(t, s.EndPointURI) + } + }(&wg) + } + wg.Wait() +} + func clean() { _ = globalInstance.DeleteAllSubscriptions(clientID) }