From b1356a1999f32be83f74bb0595c93ca897afbfa9 Mon Sep 17 00:00:00 2001 From: Jack Ding Date: Thu, 4 Jul 2024 13:48:55 -0400 Subject: [PATCH] O-RAN V3 Rest Api: Status Notification Signed-off-by: Jack Ding --- go.mod | 4 +- go.sum | 2 - server_test.go | 2 +- v2/routes.go | 67 +++++++++---------- v2/server.go | 4 +- v2/server_test.go | 11 +-- .../redhat-cne/sdk-go/pkg/channel/data.go | 5 +- .../redhat-cne/sdk-go/pkg/event/event_ce.go | 8 +-- .../sdk-go/pkg/event/ptp/resource.go | 11 ++- .../sdk-go/pkg/event/ptp/syncstate.go | 12 ++++ .../redhat-cne/sdk-go/pkg/event/ptp/types.go | 13 +++- .../redhat-cne/sdk-go/pkg/pubsub/pubsub.go | 2 +- .../sdk-go/pkg/subscriber/subscriber.go | 6 +- .../pkg/subscriber/subscriber_reader.go | 7 +- .../pkg/subscriber/subscriber_writer.go | 8 +-- .../redhat-cne/sdk-go/v1/event/event.go | 8 +-- .../sdk-go/v1/subscriber/subscriber.go | 37 +++++++--- vendor/modules.txt | 2 +- 18 files changed, 123 insertions(+), 86 deletions(-) diff --git a/go.mod b/go.mod index 4db048b..ea58dff 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,14 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.0 github.com/prometheus/client_golang v1.14.0 - github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 + github.com/redhat-cne/sdk-go v1.0.1-unpublished github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.0 golang.org/x/net v0.7.0 ) +replace github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go + require ( github.com/BurntSushi/toml v0.3.1 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 7ff684e..210e666 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 h1:qDOGSHOtHRszd8FnM0GZVUvbIvHhZrw5GeccXYPwT04= -github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= diff --git a/server_test.go b/server_test.go index f72c7ee..a620c00 100644 --- a/server_test.go +++ b/server_test.go @@ -100,7 +100,7 @@ func TestMain(m *testing.M) { Subject: func(s string) *string { return &s }("topic"), }.AsV1(), } - _ = e.SetData(cloudevents.ApplicationJSON, cneEvent) + _ = e.SetData("", cneEvent) func() { defer func() { if err := recover(); err != nil { diff --git a/v2/routes.go b/v2/routes.go index 668dba5..141db6f 100644 --- a/v2/routes.go +++ b/v2/routes.go @@ -56,6 +56,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { return } sub := pubsub.PubSub{} + sub.SetVersion(API_VERSION) if err = json.Unmarshal(bodyBytes, &sub); err != nil { respondWithStatusCode(w, http.StatusBadRequest, fmt.Sprintf("marshalling error %v", err)) localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) @@ -67,26 +68,18 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) return } - if subExists, ok := s.pubSubAPI.HasSubscription(sub.GetResource()); ok { + clientIDs := s.subscriberAPI.GetClientIDByResource(sub.GetResource()) + if len(clientIDs) != 0 { respondWithStatusCode(w, http.StatusConflict, - fmt.Sprintf("subscription (id: %s) with same resource already exists, skipping creation", - subExists.GetID())) + fmt.Sprintf("subscription (clientID: %s) with same resource already exists, skipping creation", + clientIDs[0])) return } id := uuid.New().String() sub.SetID(id) - sub.SetVersion(API_VERSION) - sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck - - // TODO: cleanup: local pubsub is no longer needed since we are using configMap - newSub, err := s.pubSubAPI.CreateSubscription(sub) - if err != nil { - respondWithStatusCode(w, http.StatusNotFound, fmt.Sprintf("error creating subscription %v", err)) - localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) - return - } - addr := newSub.GetResource() + sub.SetURILocation(fmt.Sprintf("http://%s:%d%s%s/%s", s.apiHost, s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck + addr := sub.GetResource() // this is placeholder not sending back to report out := channel.DataChan{ @@ -119,7 +112,8 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { } restClient := restclient.New() - out.Data.SetID(newSub.ID) // set ID to the subscriptionID + // make sure event ID is unique + out.Data.SetID(uuid.New().String()) status, err := restClient.PostCloudEvent(sub.EndPointURI, *out.Data) if err != nil { respondWithStatusCode(w, http.StatusBadRequest, @@ -139,7 +133,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { subs := subscriber.New(s.getClientIDFromURI(endPointURI)) _ = subs.SetEndPointURI(endPointURI) - subs.AddSubscription(newSub) + subs.AddSubscription(sub) subs.Action = channel.NEW cevent, _ := subs.CreateCloudEvents() cevent.SetSource(addr) @@ -162,10 +156,10 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) { localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1) } else { out.Status = channel.SUCCESS - _ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj) + _ = out.Data.SetData("", updatedObj) log.Infof("subscription created successfully.") localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1) - respondWithJSON(w, http.StatusCreated, newSub) + respondWithJSON(w, http.StatusCreated, sub) } s.dataOut <- &out @@ -256,12 +250,15 @@ func (s *Server) getSubscriptionByID(w http.ResponseWriter, r *http.Request) { respondWithStatusCode(w, http.StatusNotFound, "") return } - sub, err := s.pubSubAPI.GetSubscription(subscriptionID) - if err != nil { - respondWithStatusCode(w, http.StatusNotFound, "") - return + + for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) { + sub, err := s.subscriberAPI.GetSubscription(c, subscriptionID) + if err == nil { + respondWithJSON(w, http.StatusOK, sub) + return + } } - respondWithJSON(w, http.StatusOK, sub) + respondWithStatusCode(w, http.StatusNotFound, "") } func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) { @@ -278,9 +275,11 @@ func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) { } respondWithJSON(w, http.StatusOK, pub) } + func (s *Server) getSubscriptions(w http.ResponseWriter, _ *http.Request) { - b, err := s.pubSubAPI.GetSubscriptionsFromFile() + b, err := s.subscriberAPI.GetSubscriptions() if err != nil { + log.Errorf("error loading subscriber data %v", err) respondWithError(w, "error loading subscriber data") return } @@ -322,14 +321,13 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) { return } - if err := s.pubSubAPI.DeleteSubscription(subscriptionID); err != nil { - localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1) - respondWithStatusCode(w, http.StatusNotFound, err.Error()) + clientIDs := s.subscriberAPI.GetClientIDBySubID(subscriptionID) + if len(clientIDs) == 0 { + respondWithStatusCode(w, http.StatusNotFound, "") return } - // update configMap - for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) { + for _, c := range clientIDs { if err := s.subscriberAPI.DeleteSubscription(c, subscriptionID); err != nil { localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1) respondWithStatusCode(w, http.StatusNotFound, err.Error()) @@ -337,6 +335,7 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) { } } + // update configMap for _, subs := range s.subscriberAPI.SubscriberStore.Store { cevent, _ := subs.CreateCloudEvents() out := channel.DataChan{ @@ -441,12 +440,12 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) { } } } else { - respondWithError(w, "subscription not found") + respondWithStatusCode(w, http.StatusNotFound, "subscription not found") return } if sub == nil { - respondWithError(w, "subscription not found") + respondWithStatusCode(w, http.StatusNotFound, "subscription not found") return } @@ -471,14 +470,14 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) { // statusReceiveOverrideFn must return value for if s.statusReceiveOverrideFn != nil { if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil { - respondWithError(w, statusErr.Error()) + respondWithStatusCode(w, http.StatusNotFound, statusErr.Error()) } else if out.Data != nil { respondWithJSON(w, http.StatusOK, *out.Data) } else { - respondWithError(w, "event not found") + respondWithStatusCode(w, http.StatusNotFound, "event not found") } } else { - respondWithError(w, "onReceive function not defined") + respondWithStatusCode(w, http.StatusNotFound, "onReceive function not defined") } } diff --git a/v2/server.go b/v2/server.go index eff5cc1..37678f9 100644 --- a/v2/server.go +++ b/v2/server.go @@ -81,6 +81,7 @@ const ( // Server defines rest routes server object type Server struct { port int + apiHost string apiPath string //use dataOut chanel to write to configMap dataOut chan<- *channel.DataChan @@ -138,12 +139,13 @@ type swaggReqAccepted struct { //nolint:deadcode,unused } // InitServer is used to supply configurations for rest routes server -func InitServer(port int, apiPath, storePath string, +func InitServer(port int, apiHost, apiPath, storePath string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}, onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server { once.Do(func() { ServerInstance = &Server{ port: port, + apiHost: apiHost, apiPath: apiPath, dataOut: dataOut, closeCh: closeCh, diff --git a/v2/server_test.go b/v2/server_test.go index d805442..955a93f 100644 --- a/v2/server_test.go +++ b/v2/server_test.go @@ -30,7 +30,7 @@ import ( types2 "github.com/cloudevents/sdk-go/v2/types" "github.com/google/uuid" - restapi "github.com/redhat-cne/rest-api" + restapi "github.com/redhat-cne/rest-api/v2" "github.com/redhat-cne/sdk-go/pkg/channel" "github.com/redhat-cne/sdk-go/pkg/event" "github.com/redhat-cne/sdk-go/pkg/event/ptp" @@ -50,6 +50,7 @@ var ( closeCh chan struct{} wg sync.WaitGroup port = 8989 + apHost = "localhost" apPath = "/routes/cne/v1/" resource = "test/test" resourceNoneSubscribed = "test/nonesubscribed" @@ -64,7 +65,7 @@ func init() { } func TestMain(m *testing.M) { - server = restapi.InitServer(port, apPath, storePath, eventOutCh, closeCh) + server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, nil) //start http server server.Start() @@ -100,7 +101,7 @@ func TestMain(m *testing.M) { Subject: func(s string) *string { return &s }("topic"), }.AsV1(), } - _ = e.SetData(cloudevents.ApplicationJSON, cneEvent) + _ = e.SetData("", cneEvent) func() { defer func() { if err := recover(); err != nil { @@ -362,7 +363,7 @@ func TestServer_GetCurrentState_withoutSubscription(t *testing.T) { s, err2 := io.ReadAll(resp.Body) assert.Nil(t, err2) log.Infof("tedt %s ", string(s)) - assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) } func TestServer_TestPingStatusStatusCode(t *testing.T) { @@ -424,7 +425,7 @@ func TestServer_GetNonExistingSubscription(t *testing.T) { resp, err := server.HTTPClient.Do(req) assert.Nil(t, err) defer resp.Body.Close() - assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) } func TestServer_TestDummyStatusCode(t *testing.T) { diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go b/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go index 7c20be5..45dc812 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/channel/data.go @@ -51,9 +51,8 @@ type StatusChan struct { // CreateCloudEvents ... func (d *DataChan) CreateCloudEvents(dataType string) (*cloudevents.Event, error) { - ce := cloudevents.NewEvent(cloudevents.VersionV03) - ce.SetDataContentType(cloudevents.ApplicationJSON) - ce.SetSpecVersion(cloudevents.VersionV03) + ce := cloudevents.NewEvent(cloudevents.VersionV1) + ce.SetSpecVersion(cloudevents.VersionV1) ce.SetType(dataType) ce.SetSource(d.Address) ce.SetID(d.ClientID.String()) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go index 5fe66d4..afd5aa2 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go @@ -25,15 +25,13 @@ import ( // NewCloudEvent create new cloud event from cloud native events and pubsub func (e *Event) NewCloudEvent(ps *pubsub.PubSub) (*cloudevent.Event, error) { - ce := cloudevent.NewEvent(cloudevent.VersionV03) + ce := cloudevent.NewEvent(cloudevent.VersionV1) ce.SetTime(e.GetTime()) ce.SetType(e.Type) - ce.SetDataContentType(cloudevent.ApplicationJSON) - ce.SetSubject(e.Source) // subject is set to source of the event object ce.SetSource(ps.Resource) // bus address - ce.SetSpecVersion(cloudevent.VersionV03) + ce.SetSpecVersion(cloudevent.VersionV1) ce.SetID(uuid.New().String()) - if err := ce.SetData(cloudevent.ApplicationJSON, e.GetData()); err != nil { + if err := ce.SetData("", e.GetData()); err != nil { return nil, err } return &ce, nil diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go index 10cd127..7d21726 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go @@ -18,27 +18,34 @@ package ptp type EventResource string const ( + // O-RAN 7.2.3.6 // GnssSyncStatus notification is signalled from equipment at state change GnssSyncStatus EventResource = "/sync/gnss-status/gnss-sync-status" + // O-RAN 7.2.3.8 // OsClockSyncState State of node OS clock synchronization is notified at state change OsClockSyncState EventResource = "/sync/sync-status/os-clock-sync-state" + // O-RAN 7.2.3.10 // PtpClockClass notification is generated when the clock-class changes. - PtpClockClass EventResource = "/sync/ptp-status/ptp-clock-class-change" + PtpClockClass EventResource = "/sync/ptp-status/clock-class" + // O-RAN 7.2.3.3 // PtpLockState notification is signalled from equipment at state change PtpLockState EventResource = "/sync/ptp-status/lock-state" + // O-RAN 7.2.3.11 // SynceClockQuality notification is generated when the clock-quality changes. SynceClockQuality EventResource = "/sync/synce-status/clock-quality" + // O-RAN 7.2.3.9 // SynceLockState Notification used to inform about synce synchronization state change SynceLockState EventResource = "/sync/synce-status/lock-state" // SynceLockStateExtended notification is signalled from equipment at state change, enhanced information SynceLockStateExtended EventResource = "/sync/synce-status/lock-state-extended" - // SyncStatusState State of equipment synchronization is notified at state change + // O-RAN 7.2.3.1 + // SyncStatusState is the overall synchronization health of the node, including the OS System Clock SyncStatusState EventResource = "/sync/sync-status/sync-state" ) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go index 741e337..4b2ee93 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go @@ -30,6 +30,18 @@ const ( // BOOTING ... BOOTING SyncState = "BOOTING" + // FAILURE_MULTIPATH is GNSS Sync Failure - Multipath condition detected + FAILURE_MULTIPATH SyncState = "FAILURE-MULTIPATH" + + // FAILURE_NOFIX is GNSS Sync Failure - Unknown + FAILURE_NOFIX SyncState = "FAILURE-NOFIX" + + // FAILURE_LOW_SNR is GNSS Sync Failure - Low SNR condition detected + FAILURE_LOW_SNR SyncState = "FAILURE-LOW-SNR" + + // FAILURE_PLL is GNSS Sync Failure - PLL is not functioning + FAILURE_PLL SyncState = "FAILURE-PLL" + // FREERUN ... FREERUN SyncState = "FREERUN" diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go index 9ff2865..ff6e5ea 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/types.go @@ -18,27 +18,34 @@ package ptp type EventType string const ( + // O-RAN 7.2.3.6 // GnssStateChange is Notification used to inform about gnss synchronization state change GnssStateChange EventType = "event.sync.gnss-status.gnss-state-change" + // O-RAN 7.2.3.8 // OsClockSyncStateChange is the object contains information related to a notification OsClockSyncStateChange EventType = "event.sync.sync-status.os-clock-sync-state-change" + // O-RAN 7.2.3.10 // PtpClockClassChange is Notification used to inform about ptp clock class changes. PtpClockClassChange EventType = "event.sync.ptp-status.ptp-clock-class-change" + // O-RAN 7.2.3.3 // PtpStateChange is Notification used to inform about ptp synchronization state change PtpStateChange EventType = "event.sync.ptp-status.ptp-state-change" + // O-RAN 7.2.3.11 // SynceClockQualityChange is Notification used to inform about changes in the clock quality of the primary SyncE signal advertised in ESMC packets - SynceClockQualityChange EventType = "event.sync.synce-status.sync-clock-quality-change" + SynceClockQualityChange EventType = "event.sync.synce-status.synce-clock-quality-change" + // O-RAN 7.2.3.9 // SynceStateChange is Notification used to inform about synce synchronization state change - SynceStateChange EventType = "event.sync.sync-status.synce-state-change" + SynceStateChange EventType = "event.sync.synce-status.synce-state-change" // SynceStateChangeExtended is Notification used to inform about synce synchronization state change, enhanced state information SynceStateChangeExtended EventType = "event.sync.synce-status.synce-state-change-extended" - // SyncStateChange is Notification used to inform about synchronization state change + // O-RAN 7.2.3.1 + // SyncStateChange is Notification used to inform about the overall synchronization state change SyncStateChange EventType = "event.sync.sync-status.synchronization-state-change" ) diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go index 428c9bf..be7afd0 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/pubsub/pubsub.go @@ -52,7 +52,7 @@ func (ps *PubSub) String() string { b := strings.Builder{} b.WriteString(" EndpointUri: " + ps.GetEndpointURI() + "\n") b.WriteString(" UriLocation: " + ps.GetURILocation() + "\n") - b.WriteString(" ID: " + ps.GetID() + "\n") + b.WriteString(" SubscriptionId: " + ps.GetID() + "\n") b.WriteString(" Resource: " + ps.GetResource() + "\n") return b.String() } diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go index 3c8f3fd..ce8e82c 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber.go @@ -49,7 +49,7 @@ type Subscriber struct { SubStore *store.PubSubStore `json:"subStore" omit:"empty"` // EndPointURI - A URI describing the subscriber link . // +required - EndPointURI *types.URI `json:"endPointURI" omit:"empty"` + EndPointURI *types.URI `json:"EndpointUri" omit:"empty"` // Status ... Status Status `json:"status" omit:"empty"` // Action ... @@ -84,8 +84,8 @@ func (s *Subscriber) FailedCount() int { // String returns a pretty-printed representation of the Event. func (s *Subscriber) String() string { b := strings.Builder{} - b.WriteString(" EndPointURI: " + s.GetEndPointURI() + "\n") - b.WriteString(" ID: " + s.GetClientID().String() + "\n") + b.WriteString(" EndpointUri: " + s.GetEndPointURI() + "\n") + b.WriteString(" clientID: " + s.GetClientID().String() + "\n") b.WriteString(" sub :{") if s.SubStore != nil { for _, v := range s.SubStore.Store { diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go index 98bcf43..48fcbb6 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_reader.go @@ -31,13 +31,12 @@ func (s *Subscriber) GetSubStore() *store.PubSubStore { // CreateCloudEvents ... func (s *Subscriber) CreateCloudEvents() (*cloudevents.Event, error) { - ce := cloudevents.NewEvent(cloudevents.VersionV03) - ce.SetDataContentType(cloudevents.ApplicationJSON) - ce.SetSpecVersion(cloudevents.VersionV03) + ce := cloudevents.NewEvent(cloudevents.VersionV1) + ce.SetSpecVersion(cloudevents.VersionV1) ce.SetType(channel.SUBSCRIBER.String()) ce.SetSource("subscription-request") ce.SetID(uuid.New().String()) - if err := ce.SetData(cloudevents.ApplicationJSON, s); err != nil { + if err := ce.SetData("", s); err != nil { return nil, err } return &ce, nil diff --git a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go index 5b9bad9..9d3d3b8 100644 --- a/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go +++ b/vendor/github.com/redhat-cne/sdk-go/pkg/subscriber/subscriber_writer.go @@ -42,12 +42,6 @@ func (s *Subscriber) SetStatus(status Status) { // AddSubscription ... func (s *Subscriber) AddSubscription(subs ...pubsub.PubSub) { for _, ss := range subs { - newS := &pubsub.PubSub{ - ID: ss.GetID(), - EndPointURI: nil, - URILocation: nil, - Resource: ss.Resource, - } - s.SubStore.Store[ss.GetID()] = newS + s.SubStore.Store[ss.GetID()] = &ss } } diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go b/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go index 93786f4..c9b9d96 100644 --- a/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go +++ b/vendor/github.com/redhat-cne/sdk-go/v1/event/event.go @@ -85,15 +85,13 @@ func SendCloudEventsToDataChannel(inChan chan<- *channel.DataChan, status channe // CreateCloudEvents create new cloud event from cloud native events and pubsub func CreateCloudEvents(e event.Event, ps pubsub.PubSub) (*cloudevents.Event, error) { - ce := cloudevents.NewEvent(cloudevents.VersionV03) + ce := cloudevents.NewEvent(cloudevents.VersionV1) ce.SetTime(e.GetTime()) ce.SetType(e.Type) - ce.SetDataContentType(cloudevents.ApplicationJSON) - ce.SetSubject(e.Source) // subject is set to source of the event object ce.SetSource(ps.Resource) // bus address - ce.SetSpecVersion(cloudevents.VersionV03) + ce.SetSpecVersion(cloudevents.VersionV1) ce.SetID(uuid.New().String()) - if err := ce.SetData(cloudevents.ApplicationJSON, e.GetData()); err != nil { + if err := ce.SetData("", e.GetData()); err != nil { return nil, err } return &ce, nil diff --git a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go index 6fb59c8..b755ef1 100644 --- a/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go +++ b/vendor/github.com/redhat-cne/sdk-go/v1/subscriber/subscriber.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "strings" "sync" "github.com/google/uuid" @@ -73,12 +74,19 @@ func (p *API) ReloadStore() { log.Infof("reloading subscribers from the store %s", p.storeFilePath) if files, err := loadFileNamesFromDir(p.storeFilePath); err == nil { for _, f := range files { - if b, err1 := loadFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, f)); err1 == nil { + // valid subscription filename is .json + if uuid.Validate(strings.Split(f, ".")[0]) != nil { + continue + } else if b, err1 := loadFromFile(fmt.Sprintf("%s/%s", p.storeFilePath, f)); err1 == nil { if len(b) > 0 { var sub subscriber.Subscriber var err2 error if err2 = json.Unmarshal(b, &sub); err2 == nil { - p.SubscriberStore.Set(sub.ClientID, sub) + if sub.ClientID != uuid.Nil { + p.SubscriberStore.Set(sub.ClientID, sub) + } else { + log.Errorf("subscriber data from file %s is not valid", f) + } } else { log.Errorf("error parsing subscriber %s \n %s", string(b), err2.Error()) } @@ -201,8 +209,8 @@ func (p *API) GetSubscriptionsFromFile(clientID uuid.UUID) ([]byte, error) { return b, err } -// GetSubscriptions get all subscriptionOne inforamtions -func (p *API) GetSubscriptions(clientID uuid.UUID) (sub map[string]*pubsub.PubSub) { +// GetSubscriptionsFromClient get all subs from the client +func (p *API) GetSubscriptionsFromClientID(clientID uuid.UUID) (sub map[string]*pubsub.PubSub) { if subs, ok := p.SubscriberStore.Get(clientID); ok { sub = subs.SubStore.Store } @@ -210,12 +218,25 @@ func (p *API) GetSubscriptions(clientID uuid.UUID) (sub map[string]*pubsub.PubSu return } -// GetSubscription get subscriptionOne inforamtions -func (p *API) GetSubscription(clientID uuid.UUID, subID string) pubsub.PubSub { +// GetSubscriptions get all subs +func (p *API) GetSubscriptions() ([]byte, error) { + p.SubscriberStore.RLock() + defer p.SubscriberStore.RUnlock() + var allSubs []pubsub.PubSub + for _, s := range p.SubscriberStore.Store { + for _, sub := range s.SubStore.Store { + allSubs = append(allSubs, *sub) + } + } + return json.MarshalIndent(&allSubs, "", " ") +} + +// GetSubscription get sub info from clientID and subID +func (p *API) GetSubscription(clientID uuid.UUID, subID string) (pubsub.PubSub, error) { if subs, ok := p.SubscriberStore.Get(clientID); ok { - return subs.Get(subID) + return subs.Get(subID), nil } - return pubsub.PubSub{} + return pubsub.PubSub{}, fmt.Errorf("subscription data was not found for id %s", subID) } // GetSubscriberURLByResourceAndClientID get subscription information by client id/resource diff --git a/vendor/modules.txt b/vendor/modules.txt index ac0647f..6090f78 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -71,7 +71,7 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 +# github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go ## explicit; go 1.22 github.com/redhat-cne/sdk-go/pkg/channel github.com/redhat-cne/sdk-go/pkg/common