Skip to content

Commit

Permalink
Merge pull request Azure#12 from Azure/m1-refactor
Browse files Browse the repository at this point in the history
M1 refactor
  • Loading branch information
devigned authored Jun 1, 2018
2 parents b0660bb + 6f4c251 commit 089e807
Show file tree
Hide file tree
Showing 15 changed files with 453 additions and 330 deletions.
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(conn
handleErr(err)

queueName := "helloworld"
// Create the queue if it doesn't exist
qm := ns.NewQueueManager()
_, err := qm.Put(context.Background(), queueName)
q, err := ns.NewQueue(context.Background(), queueName)
handleErr(err)
q := ns.NewQueue(queueName)

// Send message to queue
err := q.Send(context.Background(), servicebus.NewEventFromString("Hello World!"))
Expand Down
22 changes: 12 additions & 10 deletions _examples/helloworld/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ func main() {
}

queueName := "helloworld"
// Create the queue if it doesn't exist
err = ensureQueue(ns, queueName)
q := ns.NewQueue(queueName)
q, err := getQueue(ns, queueName)
if err != nil {
fmt.Printf("failed to build a new queue named %q\n", queueName)
os.Exit(1)
}

// Start listening to events on the queue
exit := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
exit := make(chan struct{})
listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Event) error {

listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Message) error {
text := string(event.Data)
if text == "exit\n" {
fmt.Println("Oh snap!! Someone told me to exit!")
Expand All @@ -38,6 +40,7 @@ func main() {
return nil
})
defer listenHandle.Close(context.Background())

if err != nil {
fmt.Println(err)
os.Exit(1)
Expand All @@ -56,12 +59,11 @@ func main() {
}
}

func ensureQueue(ns *servicebus.Namespace, queueName string) error {
qm := ns.NewQueueManager()
func getQueue(ns *servicebus.Namespace, queueName string) (*servicebus.Queue, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := qm.Put(ctx, queueName)
return err
q, err := ns.NewQueue(ctx, queueName)
return q, err
}

func mustGetenv(key string) string {
Expand Down
18 changes: 10 additions & 8 deletions _examples/helloworld/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,30 @@ func main() {
}

queueName := "helloworld"
// Create the queue if it doesn't exist
err = ensureQueue(ns, queueName)
q := ns.NewQueue(queueName)
q, err := getQueue(ns, queueName)
if err != nil {
fmt.Printf("failed to build a new queue named %q\n", queueName)
os.Exit(1)
}

reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("Enter text: ")
text, _ := reader.ReadString('\n')
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
q.Send(ctx, servicebus.NewEventFromString(text))
q.Send(ctx, servicebus.NewMessageFromString(text))
if text == "exit\n" {
break
}
cancel()
}
}

func ensureQueue(ns *servicebus.Namespace, queueName string) error {
qm := ns.NewQueueManager()
func getQueue(ns *servicebus.Namespace, queueName string) (*servicebus.Queue, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := qm.Put(ctx, queueName)
return err
q, err := ns.NewQueue(ctx, queueName)
return q, err
}

func mustGetenv(key string) string {
Expand Down
42 changes: 21 additions & 21 deletions event.go → message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
)

type (
// Event is an Service Bus message to be sent or received
Event struct {
// Message is an Service Bus message to be sent or received
Message struct {
Data []byte
Properties map[string]interface{}
ID string
Expand All @@ -38,28 +38,28 @@ type (
}
)

// NewEventFromString builds an Event from a string message
func NewEventFromString(message string) *Event {
return NewEvent([]byte(message))
// NewMessageFromString builds an Message from a string message
func NewMessageFromString(message string) *Message {
return NewMessage([]byte(message))
}

// NewEvent builds an Event from a slice of data
func NewEvent(data []byte) *Event {
return &Event{
// NewMessage builds an Message from a slice of data
func NewMessage(data []byte) *Message {
return &Message{
Data: data,
}
}

// Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker
func (e *Event) Set(key, value string) {
func (e *Message) Set(key, value string) {
if e.Properties == nil {
e.Properties = make(map[string]interface{})
}
e.Properties[key] = value
}

// ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker
func (e *Event) ForeachKey(handler func(key, val string) error) error {
func (e *Message) ForeachKey(handler func(key, val string) error) error {
for key, value := range e.Properties {
err := handler(key, value.(string))
if err != nil {
Expand All @@ -69,7 +69,7 @@ func (e *Event) ForeachKey(handler func(key, val string) error) error {
return nil
}

func (e *Event) toMsg() *amqp.Message {
func (e *Message) toMsg() *amqp.Message {
msg := e.message
if msg == nil {
msg = amqp.NewMessage(e.Data)
Expand All @@ -94,29 +94,29 @@ func (e *Event) toMsg() *amqp.Message {
return msg
}

func eventFromMsg(msg *amqp.Message) *Event {
return newEvent(msg.Data[0], msg)
func messageFromAMQPMessage(msg *amqp.Message) *Message {
return newMessage(msg.Data[0], msg)
}

func newEvent(data []byte, msg *amqp.Message) *Event {
event := &Event{
func newMessage(data []byte, msg *amqp.Message) *Message {
message := &Message{
Data: data,
message: msg,
}

if msg.Properties != nil {
if id, ok := msg.Properties.MessageID.(string); ok {
event.ID = id
message.ID = id
}
event.GroupID = &msg.Properties.GroupID
event.GroupSequence = &msg.Properties.GroupSequence
message.GroupID = &msg.Properties.GroupID
message.GroupSequence = &msg.Properties.GroupSequence
}

if msg != nil {
event.Properties = make(map[string]interface{})
message.Properties = make(map[string]interface{})
for key, value := range msg.ApplicationProperties {
event.Properties[key] = value
message.Properties[key] = value
}
}
return event
return message
}
6 changes: 6 additions & 0 deletions mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ func (em *EntityManager) Execute(ctx context.Context, method string, entityPath
return res, err
}

func isEmptyFeed(b []byte) bool {
var emptyFeed queueFeed
feedErr := xml.Unmarshal(b, &emptyFeed)
return feedErr == nil && emptyFeed.Title == "Publicly Listed Services"
}

func (em *EntityManager) addAuthorization(req *http.Request) (*http.Request, error) {
signature, err := em.TokenProvider.GetToken(req.URL.String())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type (
}

// Handler is the function signature for any receiver of AMQP messages
Handler func(context.Context, *Event) error
Handler func(context.Context, *Message) error

// NamespaceOption provides structure for configuring a new Service Bus namespace
NamespaceOption func(h *Namespace) error
Expand Down
16 changes: 8 additions & 8 deletions namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func (suite *serviceBusSuite) deleteAllTaggedQueues(ctx context.Context) {
ns := suite.getNewSasInstance()
qm := ns.NewQueueManager()

feed, err := qm.List(ctx)
qs, err := qm.List(ctx)
if err != nil {
suite.T().Fatal(err)
}

for _, entry := range feed.Entries {
if strings.HasSuffix(entry.Title, suite.TagID) {
err := qm.Delete(ctx, entry.Title)
for _, q := range qs {
if strings.HasSuffix(q.Name, suite.TagID) {
err := qm.Delete(ctx, q.Name)
if err != nil {
suite.T().Fatal(err)
}
Expand All @@ -83,14 +83,14 @@ func (suite *serviceBusSuite) deleteAllTaggedTopics(ctx context.Context) {
ns := suite.getNewSasInstance()
tm := ns.NewTopicManager()

feed, err := tm.List(ctx)
topics, err := tm.List(ctx)
if err != nil {
suite.T().Fatal(err)
}

for _, entry := range feed.Entries {
if strings.HasSuffix(entry.Title, suite.TagID) {
err := tm.Delete(ctx, entry.Title)
for _, topic := range topics {
if strings.HasSuffix(topic.Name, suite.TagID) {
err := tm.Delete(ctx, topic.Name)
if err != nil {
suite.T().Fatal(err)
}
Expand Down
Loading

0 comments on commit 089e807

Please sign in to comment.