-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher_options.go
135 lines (118 loc) · 2.96 KB
/
publisher_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package amqpx
import "context"
// PublisherOption is a functional option used to configure a publisher.
// Each option mutates the publisherOptions value it is given.
type PublisherOption func(*publisherOptions)
// publisherOptions collects the settings applied by PublisherOption values.
type publisherOptions struct {
// confirm is switched on by SetConfirmMode.
confirm bool
// publish holds the publisher-level publish settings written by
// UseRoutingKey, UseMandatory and UseImmediate.
publish publishOptions
// marshaler is set by SetMarshaler; validate rejects a nil marshaler.
marshaler Marshaler
// interceptor accumulates the interceptors passed to SetPublishInterceptor,
// in the order the options are applied.
interceptor []PublishInterceptor
}
// validate checks that the publisher options are usable.
//
// It returns errMarshalerNotFound when no marshaler has been configured and
// nil otherwise.
func (p *publisherOptions) validate() error {
	if p.marshaler != nil {
		return nil
	}
	return errMarshalerNotFound
}
// SetConfirmMode enables confirm mode for the publisher.
//
// Confirm mode puts the channel into a state where the client can ensure
// that all publishings have successfully been received by the server.
func SetConfirmMode() PublisherOption {
	return func(opts *publisherOptions) {
		opts.confirm = true
	}
}
// SetPublishInterceptor registers publish interceptors.
//
// The given interceptors are appended to any that were registered by earlier
// options, preserving the order in which they were passed.
func SetPublishInterceptor(i ...PublishInterceptor) PublisherOption {
	return func(opts *publisherOptions) {
		opts.interceptor = append(opts.interceptor, i...)
	}
}
// SetMarshaler sets the marshaler used by the publisher.
//
// A nil marshaler is ignored, so a previously configured marshaler is kept.
func SetMarshaler(m Marshaler) PublisherOption {
	return func(opts *publisherOptions) {
		if m == nil {
			// Nothing to apply; leave the current marshaler in place.
			return
		}
		opts.marshaler = m
	}
}
// UseRoutingKey sets the routing key in the publisher-level publish options.
//
// An empty string is ignored, so a previously configured key is kept.
func UseRoutingKey(s string) PublisherOption {
	return func(opts *publisherOptions) {
		if s == "" {
			// Nothing to apply; leave the current key in place.
			return
		}
		opts.publish.key = s
	}
}
// UseMandatory sets the mandatory flag in the publisher-level publish options.
// The default is false.
//
// A message can be undeliverable when the mandatory flag is true and no queue
// is bound that matches the routing key.
func UseMandatory(b bool) PublisherOption {
	return func(opts *publisherOptions) {
		opts.publish.mandatory = b
	}
}
// UseImmediate sets the immediate flag in the publisher-level publish options.
// The default is false.
//
// A message can be undeliverable when the immediate flag is true and no
// consumer on the matched queue is ready to accept the delivery.
func UseImmediate(b bool) PublisherOption {
	return func(opts *publisherOptions) {
		opts.publish.immediate = b
	}
}
// PublishOption is a functional option used to configure the publishing of a
// message. Each option mutates the publishOptions value it is given.
type PublishOption func(*publishOptions)
// publishOptions collects the settings applied by PublishOption values (and by
// the publisher-level UseRoutingKey, UseMandatory and UseImmediate options).
type publishOptions struct {
// key is the routing key; validate rejects an empty key.
key string
// exchange is not written by any option in this file.
// NOTE(review): presumably filled in by the publisher — confirm.
exchange string
// mandatory is set by SetMandatory / UseMandatory; the zero value is false.
mandatory bool
// immediate is set by SetImmediate / UseImmediate; the zero value is false.
immediate bool
// ctx is set by SetContext when a non-nil context is supplied.
ctx context.Context
}
// validate checks that the publish options are usable.
//
// It returns errRoutingKeyEmpty when the routing key is empty and nil
// otherwise.
func (p publishOptions) validate() error {
	if p.key != "" {
		return nil
	}
	return errRoutingKeyEmpty
}
// SetRoutingKey sets the routing key for the message being published.
//
// An empty string is ignored, so a previously configured key is kept.
func SetRoutingKey(s string) PublishOption {
	return func(opts *publishOptions) {
		if s == "" {
			// Nothing to apply; leave the current key in place.
			return
		}
		opts.key = s
	}
}
// SetMandatory sets the mandatory flag for the message being published.
// The default is false.
//
// A message can be undeliverable when the mandatory flag is true and no queue
// is bound that matches the routing key.
func SetMandatory(b bool) PublishOption {
	return func(opts *publishOptions) {
		opts.mandatory = b
	}
}
// SetImmediate sets the immediate flag for the message being published.
// The default is false.
//
// A message can be undeliverable when the immediate flag is true and no
// consumer on the matched queue is ready to accept the delivery.
func SetImmediate(b bool) PublishOption {
	return func(opts *publishOptions) {
		opts.immediate = b
	}
}
// SetContext sets the context used for the publish.
//
// A nil context is ignored, so a previously configured context is kept.
func SetContext(ctx context.Context) PublishOption {
	return func(opts *publishOptions) {
		if ctx == nil {
			// Nothing to apply; leave the current context in place.
			return
		}
		opts.ctx = ctx
	}
}