Skip to content

Commit

Permalink
O-RAN V3 Rest Api: Event Subscription
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Ding <jackding@gmail.com>
  • Loading branch information
jzding committed Jun 22, 2024
1 parent 2b9bd58 commit 18e822c
Show file tree
Hide file tree
Showing 28 changed files with 1,872 additions and 73 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/prometheus/client_golang v1.14.0
github.com/redhat-cne/sdk-go v1.0.1-0.20240614182056-bfc7566a02ac
github.com/redhat-cne/sdk-go v1.0.1-unpublished
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
)

replace github.com/redhat-cne/sdk-go v1.0.1-unpublished => ../sdk-go

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand All @@ -23,6 +25,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/redhat-cne/sdk-go v1.0.1-0.20240614182056-bfc7566a02ac h1:I9dfgug3GIViEQxgKrLdWB8Y3mqVe0alismyTOdTz6w=
github.com/redhat-cne/sdk-go v1.0.1-0.20240614182056-bfc7566a02ac/go.mod h1:q9LxxPbK1tGpDbQm/KIPujqdP0bK1hhuHrIXV3vuUrM=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down
121 changes: 82 additions & 39 deletions v2/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
ce "github.com/cloudevents/sdk-go/v2/event"
"github.com/redhat-cne/sdk-go/pkg/subscriber"

cne "github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/pubsub"
Expand Down Expand Up @@ -64,7 +65,8 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
localmetrics.UpdateSubscriptionCount(localmetrics.FAILCREATE, 1)
return
}
if sub.GetEndpointURI() != "" {
endPointURI := sub.GetEndpointURI()
if endPointURI != "" {
response, err = s.HTTPClient.Post(sub.GetEndpointURI(), cloudevents.ApplicationJSON, nil)
if err != nil {
log.Printf("there was error validating endpointurl %v, subscription wont be created", err)
Expand All @@ -81,7 +83,8 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
}
}
// check sub.EndpointURI by get
sub.SetID(uuid.New().String())
id := uuid.New().String()
sub.SetID(id)
_ = sub.SetURILocation(fmt.Sprintf("http://localhost:%d%s%s/%s", s.port, s.apiPath, "subscriptions", sub.ID)) //nolint:errcheck

newSub, err := s.pubSubAPI.CreateSubscription(sub)
Expand All @@ -92,10 +95,47 @@ func (s *Server) createSubscription(w http.ResponseWriter, r *http.Request) {
return
}
log.Printf("subscription created successfully.")
// go ahead and create QDR to this address
s.sendOut(channel.SUBSCRIBER, &newSub)
localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1)
respondWithJSON(w, http.StatusCreated, newSub)

addr := newSub.GetResource()
// create unique clientId for each subscription based on endPointURI
subs := subscriber.New(s.getClientIDFromURI(endPointURI))

_ = subs.SetEndPointURI(endPointURI)

// create a subscriber model
// subs.AddSubscription(newSub)
subs.AddSubscription(newSub)
subs.Action = channel.NEW

cevent, _ := subs.CreateCloudEvents()
cevent.SetSubject(channel.NEW.String())
cevent.SetSource(addr)

out := channel.DataChan{
Address: addr,
Data: cevent,
Status: channel.NEW,
Type: channel.SUBSCRIBER,
}

var updatedObj *subscriber.Subscriber
// writes a file <clientID>.json that has the same content as configMap.
// configMap was created later as a way to persist the data.
if updatedObj, err = s.subscriberAPI.CreateSubscription(subs.ClientID, *subs); err != nil {
log.Printf("failed creating subscription for %s", subs.ClientID.String())
out.Status = channel.FAILED
} else {
out.Status = channel.SUCCESS
_ = out.Data.SetData(cloudevents.ApplicationJSON, updatedObj)
// TODO: this function is in sdk-go
// localmetrics.UpdateSenderCreatedCount(obj.ClientID.String(), localmetrics.ACTIVE, 1)

log.Printf("subscription created successfully.")
localmetrics.UpdateSubscriptionCount(localmetrics.ACTIVE, 1)
respondWithJSON(w, http.StatusCreated, newSub)
}

s.dataOut <- &out
}

// createPublisher create publisher and send it to a channel that is shared by middleware to process
Expand Down Expand Up @@ -332,19 +372,23 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
return
}

log.Printf("DZK num of subscriptions=%d, num of publishers=%d", len(s.pubSubAPI.GetSubscriptions()), len(s.pubSubAPI.GetPublishers()))

//identify publisher or subscriber is asking for status
var sub *pubsub.PubSub
if len(s.pubSubAPI.GetSubscriptions()) > 0 {
for _, subscriptions := range s.pubSubAPI.GetSubscriptions() {
if strings.Contains(subscriptions.GetResource(), resourceAddress) {
sub = subscriptions
log.Printf("DZK found subscription %v", sub)
break
}
}
} else if len(s.pubSubAPI.GetPublishers()) > 0 {
for _, publishers := range s.pubSubAPI.GetPublishers() {
if strings.Contains(publishers.GetResource(), resourceAddress) {
sub = publishers
log.Printf("DZK found publisher %v", sub)
break
}
}
Expand All @@ -357,41 +401,36 @@ func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
respondWithError(w, "subscription not found")
return
}
cneEvent := event.CloudNativeEvent()
cneEvent.SetID(sub.ID)
cneEvent.Type = channel.STATUS.String()
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(cloudevents.ApplicationJSON)
cneEvent.SetData(cne.Data{
Version: "v1",
})
ceEvent, err := cneEvent.NewCloudEvent(sub)

if err != nil {
respondWithError(w, err.Error())
} else {
// for http you send to the protocol address
statusChannel := make(chan *channel.StatusChan, 1)
s.dataOut <- &channel.DataChan{
Type: channel.STATUS,
Data: ceEvent,
Address: sub.GetResource(),
StatusChan: statusChannel,
}
select {
case d := <-statusChannel:
if d.Data == nil || d.StatusCode != http.StatusOK {
if string(d.Message) == "" {
d.Message = []byte("event not found")
}
respondWithError(w, string(d.Message))
} else {
respondWithJSON(w, d.StatusCode, *d.Data)
}
case <-time.After(5 * time.Second):
close(statusChannel)
respondWithError(w, "timeout waiting for status")
if resourceAddress == "" {
_ = json.NewEncoder(w).Encode(map[string]string{"message": "validation failed, resource is empty"})
}

if !strings.HasPrefix(resourceAddress, "/") {
resourceAddress = fmt.Sprintf("/%s", resourceAddress)
}
// this is placeholder not sending back to report
out := channel.DataChan{
Address: resourceAddress,
// ClientID is not used
ClientID: uuid.New(),
Status: channel.NEW,
Type: channel.STATUS, // could be new event of new subscriber (sender)
}

e, _ := out.CreateCloudEvents(CURRENTSTATE)
e.SetSource(resourceAddress)
// statusReceiveOverrideFn must return value for
if s.statusReceiveOverrideFn != nil {
if statusErr := s.statusReceiveOverrideFn(*e, &out); statusErr != nil {
respondWithError(w, statusErr.Error())
} else if out.Data != nil {
respondWithJSON(w, http.StatusOK, *out.Data)
} else {
respondWithError(w, "event not found")
}
} else {
respondWithError(w, "onReceive function not defined")
}
}

Expand Down Expand Up @@ -449,6 +488,10 @@ func (s *Server) logEvent(w http.ResponseWriter, r *http.Request) {
respondWithMessage(w, http.StatusAccepted, "Event published to log")
}

func (s *Server) getClientIDFromURI(uri string) uuid.UUID {
return uuid.NewMD5(uuid.NameSpaceURL, []byte(uri))
}

func dummy(w http.ResponseWriter, _ *http.Request) {
respondWithMessage(w, http.StatusNoContent, "dummy test")
}
Expand Down
51 changes: 38 additions & 13 deletions v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
// Terms Of Service:
//
// Schemes: http, https
// Host: localhost:8080
// Host: localhost:8089
// basePath: /api/ocloudNotifications/v1
// Version: 1.0.0
// Contact: Aneesh Puttur<aputtur@redhat.com>
//
Expand All @@ -25,12 +26,14 @@ import (

"sync"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/gorilla/mux"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/pubsub"
"github.com/redhat-cne/sdk-go/pkg/types"
pubsubv1 "github.com/redhat-cne/sdk-go/v1/pubsub"
subscriberApi "github.com/redhat-cne/sdk-go/v1/subscriber"

"io"
"net/http"
Expand All @@ -55,19 +58,22 @@ const (
started
notReady
failed
CURRENTSTATE = "CurrentState"
)

// Server defines rest routes server object
type Server struct {
port int
apiPath string
//data out is transport in channel
dataOut chan<- *channel.DataChan
closeCh <-chan struct{}
HTTPClient *http.Client
httpServer *http.Server
pubSubAPI *pubsubv1.API
status serverStatus
//use dataOut chanel to write to configMap
dataOut chan<- *channel.DataChan
closeCh <-chan struct{}
HTTPClient *http.Client
httpServer *http.Server
pubSubAPI *pubsubv1.API
subscriberAPI *subscriberApi.API
status serverStatus
statusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error
}

// publisher/subscription data model
Expand Down Expand Up @@ -115,7 +121,9 @@ type swaggReqAccepted struct { //nolint:deadcode,unused
}

// InitServer is used to supply configurations for rest routes server
func InitServer(port int, apiPath, storePath string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) *Server {
func InitServer(port int, apiPath, storePath string,
dataOut chan<- *channel.DataChan, closeCh <-chan struct{},
onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error) *Server {
once.Do(func() {
ServerInstance = &Server{
port: port,
Expand All @@ -129,7 +137,9 @@ func InitServer(port int, apiPath, storePath string, dataOut chan<- *channel.Dat
},
Timeout: 10 * time.Second,
},
pubSubAPI: pubsubv1.GetAPIInstance(storePath),
pubSubAPI: pubsubv1.GetAPIInstance(storePath),
subscriberAPI: subscriberApi.GetAPIInstance(storePath),
statusReceiveOverrideFn: onStatusReceiveOverrideFn,
}
})
// singleton
Expand Down Expand Up @@ -196,7 +206,7 @@ func (s *Server) Start() {
api := r.PathPrefix(s.apiPath).Subrouter()

// createSubscription create subscription and send it to a channel that is shared by middleware to process
// swagger:operation POST /subscriptions/ subscription createSubscription
// swagger:operation POST /subscriptions subscription createSubscription
// ---
// summary: Creates a new subscription.
// description: If subscription creation is success(or if already exists), subscription will be returned with Created (201).
Expand Down Expand Up @@ -237,6 +247,17 @@ func (s *Server) Start() {
404 Subscription resources are not available (not created).
*/
api.HandleFunc("/subscriptions", s.getSubscriptions).Methods(http.MethodGet)
//publishers create publisher and send it to a channel that is shared by middleware to process
// swagger:operation GET /publishers/ publishers getPublishers
// ---
// summary: Get publishers.
// description: If publisher creation is success(or if already exists), publisher will be returned with Created (201).
// parameters:
// responses:
// "200":
// "$ref": "#/responses/publishers"
// "404":
// "$ref": "#/responses/notFound"
api.HandleFunc("/publishers", s.getPublishers).Methods(http.MethodGet)
// 200 and 404
api.HandleFunc("/subscriptions/{subscriptionid}", s.getSubscriptionByID).Methods(http.MethodGet)
Expand Down Expand Up @@ -322,8 +343,7 @@ func (s *Server) Start() {
fmt.Fprintln(w, r)
})

log.Info("starting rest api server")
log.Infof("endpoint %s", s.apiPath)
log.Infof("starting v2 rest api server at port %d, endpoint %s", s.port, s.apiPath)
go wait.Until(func() {
s.status = started
s.httpServer = &http.Server{
Expand All @@ -344,3 +364,8 @@ func (s *Server) Shutdown() {
log.Warnf("trying to shutdown rest api sever, please use close channel to shutdown ")
s.httpServer.Close()
}

// SetOnStatusReceiveOverrideFn ... sets receiver function
func (s *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error) {
s.statusReceiveOverrideFn = fn
}
Loading

0 comments on commit 18e822c

Please sign in to comment.