forked from mantzas/patron
-
Notifications
You must be signed in to change notification settings - Fork 67
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Sotirios Mantziaris
authored
Feb 18, 2021
1 parent
9099ada
commit 7f52baf
Showing
539 changed files
with
62,675 additions
and
24,150 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
// Package v2 provides a client with included tracing capabilities. | ||
package v2 | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/beatlabs/patron/correlation" | ||
patronerrors "github.com/beatlabs/patron/errors" | ||
"github.com/beatlabs/patron/trace" | ||
opentracing "github.com/opentracing/opentracing-go" | ||
"github.com/opentracing/opentracing-go/ext" | ||
"github.com/streadway/amqp" | ||
) | ||
|
||
const ( | ||
publisherComponent = "amqp-publisher" | ||
) | ||
|
||
// Publisher defines a RabbitMQ publisher with tracing instrumentation. | ||
type Publisher struct { | ||
cfg *amqp.Config | ||
connection *amqp.Connection | ||
channel *amqp.Channel | ||
} | ||
|
||
// New constructor. | ||
func New(url string, oo ...OptionFunc) (*Publisher, error) { | ||
if url == "" { | ||
return nil, errors.New("url is required") | ||
} | ||
|
||
var err error | ||
pub := &Publisher{} | ||
|
||
for _, option := range oo { | ||
err = option(pub) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
var conn *amqp.Connection | ||
|
||
if pub.cfg == nil { | ||
conn, err = amqp.Dial(url) | ||
} else { | ||
conn, err = amqp.DialConfig(url, *pub.cfg) | ||
} | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to open connection: %w", err) | ||
} | ||
|
||
ch, err := conn.Channel() | ||
if err != nil { | ||
return nil, patronerrors.Aggregate(fmt.Errorf("failed to open channel: %w", err), conn.Close()) | ||
} | ||
|
||
pub.connection = conn | ||
pub.channel = ch | ||
return pub, nil | ||
} | ||
|
||
// Publish a message to a exchange. | ||
func (tc *Publisher) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { | ||
sp, _ := trace.ChildSpan(ctx, trace.ComponentOpName(publisherComponent, exchange), | ||
publisherComponent, ext.SpanKindProducer, opentracing.Tag{Key: "exchange", Value: exchange}) | ||
|
||
if msg.Headers == nil { | ||
msg.Headers = amqp.Table{} | ||
} | ||
|
||
c := amqpHeadersCarrier(msg.Headers) | ||
err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, c) | ||
if err != nil { | ||
return fmt.Errorf("failed to inject tracing headers: %w", err) | ||
} | ||
msg.Headers[correlation.HeaderID] = correlation.IDFromContext(ctx) | ||
|
||
err = tc.channel.Publish(exchange, key, mandatory, immediate, msg) | ||
trace.SpanComplete(sp, err) | ||
if err != nil { | ||
return fmt.Errorf("failed to publish message: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Close the channel and connection. | ||
func (tc *Publisher) Close() error { | ||
return patronerrors.Aggregate(tc.channel.Close(), tc.connection.Close()) | ||
} | ||
|
||
type amqpHeadersCarrier map[string]interface{} | ||
|
||
// Set implements Set() of opentracing.TextMapWriter. | ||
func (c amqpHeadersCarrier) Set(key, val string) { | ||
c[key] = val | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package v2 | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestNew(t *testing.T) { | ||
type args struct { | ||
url string | ||
} | ||
tests := map[string]struct { | ||
args args | ||
expectedErr string | ||
}{ | ||
"fail, missing url": {args: args{}, expectedErr: "url is required"}, | ||
} | ||
for name, tt := range tests { | ||
t.Run(name, func(t *testing.T) { | ||
got, err := New(tt.args.url) | ||
if tt.expectedErr != "" { | ||
assert.EqualError(t, err, tt.expectedErr) | ||
assert.Nil(t, got) | ||
} else { | ||
assert.NoError(t, err) | ||
assert.NotNil(t, got) | ||
} | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package v2 | ||
|
||
import ( | ||
"github.com/streadway/amqp" | ||
) | ||
|
||
// OptionFunc definition for configuring the publisher in a functional way. | ||
type OptionFunc func(*Publisher) error | ||
|
||
// Config option for providing dial configuration. | ||
func Config(cfg amqp.Config) OptionFunc { | ||
return func(p *Publisher) error { | ||
p.cfg = &cfg | ||
return nil | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package v2 | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/streadway/amqp" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestTimeout(t *testing.T) { | ||
cfg := amqp.Config{ | ||
Locale: "123", | ||
} | ||
|
||
p := Publisher{} | ||
assert.NoError(t, Config(cfg)(&p)) | ||
assert.Equal(t, cfg, *p.cfg) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.