Skip to content

Commit

Permalink
O-RAN V3 Rest Api: Status Notification (redhat-cne#71)
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Ding <jackding@gmail.com>
  • Loading branch information
jzding committed Aug 29, 2024
1 parent 08b7772 commit 232ff31
Show file tree
Hide file tree
Showing 18 changed files with 135 additions and 77 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9 h1:qDOGSHOtHRszd8FnM0GZVUvbIvHhZrw5GeccXYPwT04=
github.com/redhat-cne/sdk-go v1.0.1-0.20240702163442-605f629084b9/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612 h1:TnnP33wqdtZ4GCp8WYHVFVywWxrcGonc0ijGCpfqTdU=
github.com/redhat-cne/sdk-go v1.0.1-0.20240715150244-f435c154a612/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestMain(m *testing.M) {
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
_ = e.SetData("", cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
Expand Down
72 changes: 36 additions & 36 deletions v2/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
return
}
sub := pubsub.PubSub{}
sub.SetVersion(API_VERSION)
if err = json.Unmarshal(bodyBytes, &sub); err != nil {
respondWithStatusCode(w, http.StatusBadRequest, fmt.Sprintf("marshalling error %v", err))
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
Expand All @@ -67,26 +68,18 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
return
}
if subExists, ok := s.pubSubAPI.HasSubscription(sub.GetResource()); ok {
clientIDs := s.subscriberAPI.GetClientIDByResource(sub.GetResource())
if len(clientIDs) != 0 {
respondWithStatusCode(w, http.StatusConflict,
fmt.Sprintf("subscription (id: %s) with same resource already exists, skipping creation",
subExists.GetID()))
fmt.Sprintf("subscription (clientID: %s) with same resource already exists, skipping creation",
clientIDs[0]))
return
}

id := uuid.New().String()
sub.SetID(id)
sub.SetVersion(API_VERSION)
sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck

// TODO: cleanup: local pubsub is no longer needed since we are using configMap
newSub, err := s.pubSubAPI.CreateSubscription(sub)
if err != nil {
respondWithStatusCode(w, http.StatusNotFound, fmt.Sprintf("error creating subscription %v", err))
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
return
}
addr := newSub.GetResource()
sub.SetURILocation(fmt.Sprintf("http://%s:%d%s%s/%s", s.apiHost, s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck
addr := sub.GetResource()

// this is placeholder not sending back to report
out := channel.DataChan{
Expand Down Expand Up @@ -119,7 +112,8 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
}

restClient := restclient.New()
out.Data.SetID(newSub.ID) // set ID to the subscriptionID
// make sure event ID is unique
out.Data.SetID(uuid.New().String())
status, err := restClient.PostCloudEvent(sub.EndPointURI, *out.Data)
if err != nil {
respondWithStatusCode(w, http.StatusBadRequest,
Expand All @@ -139,7 +133,7 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
subs := subscriber.New(s.getClientIDFromURI(endPointURI))
_ = subs.SetEndPointURI(endPointURI)

subs.AddSubscription(newSub)
subs.AddSubscription(sub)
subs.Action = channel.NEW
cevent, _ := subs.CreateCloudEvents()
cevent.SetSource(addr)
Expand All @@ -162,10 +156,10 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
} else {
out.Status = channel.SUCCESS
_ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj)
_ = out.Data.SetData("", updatedObj)
log.Infof("subscription created successfully.")
localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1)
respondWithJSON(w, http.StatusCreated, newSub)
respondWithJSON(w, http.StatusCreated, sub)
}

s.dataOut <- &out
Expand All @@ -188,6 +182,7 @@ func (s *Server) createPublisher(w http.ResponseWriter, r *http.Request) {
return
}
pub := pubsub.PubSub{}
pub.SetVersion(API_VERSION)
if err = json.Unmarshal(bodyBytes, &pub); err != nil {
localmetrics.UpdatePublisherCount(localmetrics.FAILCREATE, 1)
respondWithError(w, "marshalling error")
Expand Down Expand Up @@ -256,12 +251,15 @@ func (s *Server) getSubscriptionByID(w http.ResponseWriter, r *http.Request) {
respondWithStatusCode(w, http.StatusNotFound, "")
return
}
sub, err := s.pubSubAPI.GetSubscription(subscriptionID)
if err != nil {
respondWithStatusCode(w, http.StatusNotFound, "")
return

for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) {
sub, err := s.subscriberAPI.GetSubscription(c, subscriptionID)
if err == nil {
respondWithJSON(w, http.StatusOK, sub)
return
}
}
respondWithJSON(w, http.StatusOK, sub)
respondWithStatusCode(w, http.StatusNotFound, "")
}

func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) {
Expand All @@ -278,9 +276,11 @@ func (s *Server) getPublisherByID(w http.ResponseWriter, r *http.Request) {
}
respondWithJSON(w, http.StatusOK, pub)
}

func (s *Server) getSubscriptions(w http.ResponseWriter, _ *http.Request) {
b, err := s.pubSubAPI.GetSubscriptionsFromFile()
b, err := s.subscriberAPI.GetSubscriptions()
if err != nil {
log.Errorf("error loading subscriber data %v", err)
respondWithError(w, "error loading subscriber data")
return
}
Expand Down Expand Up @@ -322,21 +322,21 @@ func (s *Server) deleteSubscription(w http.ResponseWriter, r *http.Request) {
return
}

if err := s.pubSubAPI.DeleteSubscription(subscriptionID); err != nil {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1)
respondWithStatusCode(w, http.StatusNotFound, err.Error())
clientIDs := s.subscriberAPI.GetClientIDBySubID(subscriptionID)
if len(clientIDs) == 0 {
respondWithStatusCode(w, http.StatusNotFound, "")
return
}

// update configMap
for _, c := range s.subscriberAPI.GetClientIDBySubID(subscriptionID) {
for _, c := range clientIDs {
if err := s.subscriberAPI.DeleteSubscription(c, subscriptionID); err != nil {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILDELETE, 1)
respondWithStatusCode(w, http.StatusNotFound, err.Error())
return
}
}

// update configMap
for _, subs := range s.subscriberAPI.SubscriberStore.Store {
cevent, _ := subs.CreateCloudEvents()
out := channel.DataChan{
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) {
respondWithError(w, fmt.Sprintf("no publisher data for id %s found to publish event for", cneEvent.ID))
return
}
ceEvent, err := cneEvent.NewCloudEvent(&pub)
ceEvent, err := cneEvent.NewCloudEventV2(&pub)
if err != nil {
localmetrics.UpdateEventPublishedCount(pub.Resource, localmetrics.FAIL, 1)
respondWithError(w, err.Error())
Expand Down Expand Up @@ -441,12 +441,12 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
}
}
} else {
respondWithError(w, "subscription not found")
respondWithStatusCode(w, http.StatusNotFound, "subscription not found")
return
}

if sub == nil {
respondWithError(w, "subscription not found")
respondWithStatusCode(w, http.StatusNotFound, "subscription not found")
return
}

Expand All @@ -471,14 +471,14 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
// statusReceiveOverrideFn must return value for
if s.statusReceiveOverrideFn != nil {
if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil {
respondWithError(w, statusErr.Error())
respondWithStatusCode(w, http.StatusNotFound, statusErr.Error())
} else if out.Data != nil {
respondWithJSON(w, http.StatusOK, *out.Data)
} else {
respondWithError(w, "event not found")
respondWithStatusCode(w, http.StatusNotFound, "event not found")
}
} else {
respondWithError(w, "onReceive function not defined")
respondWithStatusCode(w, http.StatusNotFound, "onReceive function not defined")
}
}

Expand All @@ -504,7 +504,7 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req

Version: "v1",
})
ceEvent, err := cneEvent.NewCloudEvent(&sub)
ceEvent, err := cneEvent.NewCloudEventV2(&sub)

if err != nil {
respondWithError(w, err.Error())
Expand Down
4 changes: 3 additions & 1 deletion v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
// Server defines rest routes server object
type Server struct {
port int
apiHost string
apiPath string
//use dataOut chanel to write to configMap
dataOut chan<- *channel.DataChan
Expand Down Expand Up @@ -138,12 +139,13 @@ type swaggReqAccepted struct { //nolint:deadcode,unused
}

// InitServer is used to supply configurations for rest routes server
func InitServer(port int, apiPath, storePath string,
func InitServer(port int, apiHost, apiPath, storePath string,
dataOut chan<- *channel.DataChan, closeCh <-chan struct{},
onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server {
once.Do(func() {
ServerInstance = &Server{
port: port,
apiHost: apiHost,
apiPath: apiPath,
dataOut: dataOut,
closeCh: closeCh,
Expand Down
11 changes: 6 additions & 5 deletions v2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
types2 "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"

restapi "github.com/redhat-cne/rest-api"
restapi "github.com/redhat-cne/rest-api/v2"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/event/ptp"
Expand All @@ -50,6 +50,7 @@ var (
closeCh chan struct{}
wg sync.WaitGroup
port = 8989
apHost = "localhost"
apPath = "/routes/cne/v1/"
resource = "test/test"
resourceNoneSubscribed = "test/nonesubscribed"
Expand All @@ -64,7 +65,7 @@ func init() {
}

func TestMain(m *testing.M) {
server = restapi.InitServer(port, apPath, storePath, eventOutCh, closeCh)
server = restapi.InitServer(port, apHost, apPath, storePath, eventOutCh, closeCh, nil)
//start http server
server.Start()

Expand Down Expand Up @@ -100,7 +101,7 @@ func TestMain(m *testing.M) {
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
_ = e.SetData("", cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -362,7 +363,7 @@ func TestServer_GetCurrentState_withoutSubscription(t *testing.T) {
s, err2 := io.ReadAll(resp.Body)
assert.Nil(t, err2)
log.Infof("tedt %s ", string(s))
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestServer_TestPingStatusStatusCode(t *testing.T) {
Expand Down Expand Up @@ -424,7 +425,7 @@ func TestServer_GetNonExistingSubscription(t *testing.T) {
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestServer_TestDummyStatusCode(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions vendor/github.com/redhat-cne/sdk-go/pkg/common/common.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/event.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/event_ce.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/resource.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions vendor/github.com/redhat-cne/sdk-go/pkg/event/ptp/syncstate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 232ff31

Please sign in to comment.