Skip to content

Commit 3fd7dd3

Browse files
authored
Merge branch 'master' into binding-gcp-bucket-update
2 parents 0409530 + e41acc4 commit 3fd7dd3

File tree

15 files changed

+611
-10
lines changed

15 files changed

+611
-10
lines changed

bindings/azure/blobstorage/blobstorage.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,12 @@ func (a *AzureBlobStorage) Init(metadata bindings.Metadata) error {
119119
if err != nil {
120120
return fmt.Errorf("invalid credentials with error: %w", err)
121121
}
122-
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
122+
123+
userAgent := "dapr-" + logger.DaprVersion
124+
options := azblob.PipelineOptions{
125+
Telemetry: azblob.TelemetryOptions{Value: userAgent},
126+
}
127+
p := azblob.NewPipeline(credential, options)
123128

124129
containerName := a.metadata.Container
125130
URL, _ := url.Parse(

bindings/azure/eventgrid/eventgrid.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ import (
2323

2424
// AzureEventGrid allows sending/receiving Azure Event Grid events.
2525
type AzureEventGrid struct {
26-
metadata *azureEventGridMetadata
27-
logger logger.Logger
26+
metadata *azureEventGridMetadata
27+
logger logger.Logger
28+
userAgent string
2829
}
2930

3031
type azureEventGridMetadata struct {
@@ -55,6 +56,7 @@ func NewAzureEventGrid(logger logger.Logger) *AzureEventGrid {
5556

5657
// Init performs metadata init.
5758
func (a *AzureEventGrid) Init(metadata bindings.Metadata) error {
59+
a.userAgent = "dapr-" + logger.DaprVersion
5860
m, err := a.parseMetadata(metadata)
5961
if err != nil {
6062
return err
@@ -126,6 +128,7 @@ func (a *AzureEventGrid) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeRe
126128
request.Header.SetMethod(fasthttp.MethodPost)
127129
request.Header.Set("Content-Type", "application/cloudevents+json")
128130
request.Header.Set("aeg-sas-key", a.metadata.AccessKey)
131+
request.Header.Set("User-Agent", a.userAgent)
129132
request.SetRequestURI(a.metadata.TopicEndpoint)
130133
request.SetBody(req.Data)
131134

@@ -222,6 +225,7 @@ func (a *AzureEventGrid) createSubscription() error {
222225
clientCredentialsConfig := auth.NewClientCredentialsConfig(a.metadata.ClientID, a.metadata.ClientSecret, a.metadata.TenantID)
223226

224227
subscriptionClient := eventgrid.NewEventSubscriptionsClient(a.metadata.SubscriptionID)
228+
subscriptionClient.AddToUserAgent(a.userAgent)
225229
authorizer, err := clientCredentialsConfig.Authorizer()
226230
if err != nil {
227231
return err

bindings/azure/eventhubs/eventhubs.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,18 @@ func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
132132
if err != nil {
133133
return err
134134
}
135+
userAgent := "dapr-" + logger.DaprVersion
135136
a.metadata = m
136-
hub, err := eventhub.NewHubFromConnectionString(a.metadata.connectionString)
137+
hub, err := eventhub.NewHubFromConnectionString(a.metadata.connectionString,
138+
eventhub.HubWithUserAgent(userAgent),
139+
)
137140

138141
// Create partitioned sender if the partitionID is configured
139142
if a.metadata.partitioned() {
140143
hub, err = eventhub.NewHubFromConnectionString(a.metadata.connectionString,
141-
eventhub.HubWithPartitionedSender(a.metadata.partitionID))
144+
eventhub.HubWithPartitionedSender(a.metadata.partitionID),
145+
eventhub.HubWithUserAgent(userAgent),
146+
)
142147
}
143148

144149
if err != nil {

bindings/azure/servicebusqueues/servicebusqueues.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error {
5151
if err != nil {
5252
return err
5353
}
54+
userAgent := "dapr-" + logger.DaprVersion
5455
a.metadata = meta
5556

56-
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(a.metadata.ConnectionString))
57+
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(a.metadata.ConnectionString),
58+
servicebus.NamespaceWithUserAgent(userAgent))
5759
if err != nil {
5860
return err
5961
}

bindings/azure/signalr/signalr.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type SignalR struct {
5050
accessKey string
5151
version string
5252
hub string
53+
userAgent string
5354
tokens map[string]signalrCachedToken
5455
httpClient *http.Client
5556

@@ -58,6 +59,8 @@ type SignalR struct {
5859

5960
// Init is responsible for initializing the SignalR output based on the metadata.
6061
func (s *SignalR) Init(metadata bindings.Metadata) error {
62+
s.userAgent = "dapr-" + logger.DaprVersion
63+
6164
connectionString, ok := metadata.Properties[connectionStringKey]
6265
if !ok || connectionString == "" {
6366
return fmt.Errorf("missing connection string")
@@ -128,6 +131,7 @@ func (s *SignalR) sendMessageToSignalR(url string, token string, data []byte) er
128131

129132
httpReq.Header.Set("Authorization", "Bearer "+token)
130133
httpReq.Header.Set("Content-Type", "application/json")
134+
httpReq.Header.Set("User-Agent", s.userAgent)
131135

132136
resp, err := s.httpClient.Do(httpReq)
133137
if err != nil {

bindings/azure/storagequeues/storagequeues.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ func (d *AzureQueueHelper) Init(accountName string, accountKey string, queueName
5858
d.credential = credential
5959
d.decodeBase64 = decodeBase64
6060
u, _ := url.Parse(fmt.Sprintf(d.reqURI, accountName, queueName))
61-
d.queueURL = azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
61+
userAgent := "dapr-" + logger.DaprVersion
62+
pipelineOptions := azqueue.PipelineOptions{
63+
Telemetry: azqueue.TelemetryOptions{
64+
Value: userAgent,
65+
},
66+
}
67+
d.queueURL = azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, pipelineOptions))
6268
ctx := context.TODO()
6369
_, err = d.queueURL.Create(ctx, azqueue.Metadata{})
6470
if err != nil {

pubsub/azure/eventhubs/eventhubs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,10 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error {
157157
if err != nil {
158158
return err
159159
}
160+
userAgent := "dapr-" + logger.DaprVersion
160161
aeh.metadata = m
161-
hub, err := eventhub.NewHubFromConnectionString(aeh.metadata.connectionString)
162+
hub, err := eventhub.NewHubFromConnectionString(aeh.metadata.connectionString,
163+
eventhub.HubWithUserAgent(userAgent))
162164
if err != nil {
163165
return fmt.Errorf("unable to connect to azure event hubs: %v", err)
164166
}

pubsub/azure/servicebus/servicebus.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,12 @@ func (a *azureServiceBus) Init(metadata pubsub.Metadata) error {
257257
return err
258258
}
259259

260+
userAgent := "dapr-" + logger.DaprVersion
260261
a.metadata = m
261-
a.namespace, err = azservicebus.NewNamespace(azservicebus.NamespaceWithConnectionString(a.metadata.ConnectionString))
262+
a.namespace, err = azservicebus.NewNamespace(
263+
azservicebus.NamespaceWithConnectionString(a.metadata.ConnectionString),
264+
azservicebus.NamespaceWithUserAgent(userAgent))
265+
262266
if err != nil {
263267
return err
264268
}

secretstores/azure/keyvault/keyvault.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func (k *keyvaultSecretStore) Init(metadata secretstores.Metadata) error {
7777
authorizer, err := settings.GetAuthorizer()
7878
if err == nil {
7979
k.vaultClient.Authorizer = authorizer
80+
k.vaultClient.UserAgent = "dapr-" + logger.DaprVersion
8081
}
8182

8283
k.vaultName = settings.Values[componentVaultName]
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package tablestore
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
7+
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
8+
)
9+
10+
type mockClient struct {
11+
tablestore.TableStoreClient
12+
13+
data map[string][]byte
14+
}
15+
16+
func (m *mockClient) DeleteRow(request *tablestore.DeleteRowRequest) (*tablestore.DeleteRowResponse, error) {
17+
var key string
18+
for _, col := range request.DeleteRowChange.PrimaryKey.PrimaryKeys {
19+
if col.ColumnName == stateKey {
20+
key = col.Value.(string)
21+
22+
break
23+
}
24+
}
25+
26+
delete(m.data, key)
27+
28+
return nil, nil
29+
}
30+
31+
func (m *mockClient) GetRow(request *tablestore.GetRowRequest) (*tablestore.GetRowResponse, error) {
32+
var key string
33+
for _, col := range request.SingleRowQueryCriteria.PrimaryKey.PrimaryKeys {
34+
if col.ColumnName == stateKey {
35+
key = col.Value.(string)
36+
37+
break
38+
}
39+
}
40+
41+
val := m.data[key]
42+
43+
resp := &tablestore.GetRowResponse{
44+
Columns: []*tablestore.AttributeColumn{{
45+
ColumnName: stateValue,
46+
Value: val,
47+
}},
48+
}
49+
50+
return resp, nil
51+
}
52+
53+
func (m *mockClient) UpdateRow(req *tablestore.UpdateRowRequest) (*tablestore.UpdateRowResponse, error) {
54+
change := req.UpdateRowChange
55+
56+
var val []byte
57+
var key string
58+
59+
for _, col := range change.PrimaryKey.PrimaryKeys {
60+
if col.ColumnName == stateKey {
61+
key = col.Value.(string)
62+
63+
break
64+
}
65+
}
66+
67+
for _, col := range change.Columns {
68+
if col.ColumnName == stateValue {
69+
buf := &bytes.Buffer{}
70+
binary.Write(buf, binary.BigEndian, col.Value)
71+
val = buf.Bytes()
72+
73+
break
74+
}
75+
}
76+
77+
m.data[key] = val
78+
79+
return nil, nil
80+
}
81+
82+
func (m *mockClient) BatchGetRow(request *tablestore.BatchGetRowRequest) (*tablestore.BatchGetRowResponse, error) {
83+
resp := &tablestore.BatchGetRowResponse{
84+
TableToRowsResult: map[string][]tablestore.RowResult{},
85+
}
86+
87+
for _, criteria := range request.MultiRowQueryCriteria {
88+
tableRes := resp.TableToRowsResult[criteria.TableName]
89+
if tableRes == nil {
90+
tableRes = []tablestore.RowResult{}
91+
}
92+
for _, keys := range criteria.PrimaryKey {
93+
for _, key := range keys.PrimaryKeys {
94+
if key.ColumnName == stateKey {
95+
pk := key.Value.(string)
96+
97+
if m.data[pk] == nil {
98+
continue
99+
}
100+
101+
value := m.data[key.Value.(string)]
102+
tableRes = append(tableRes, tablestore.RowResult{
103+
TableName: criteria.TableName,
104+
Columns: []*tablestore.AttributeColumn{
105+
{
106+
ColumnName: stateValue,
107+
Value: value,
108+
},
109+
},
110+
PrimaryKey: tablestore.PrimaryKey{
111+
PrimaryKeys: []*tablestore.PrimaryKeyColumn{
112+
{
113+
ColumnName: stateKey,
114+
Value: key.Value,
115+
},
116+
},
117+
},
118+
})
119+
resp.TableToRowsResult[criteria.TableName] = tableRes
120+
121+
break
122+
}
123+
}
124+
}
125+
}
126+
127+
return resp, nil
128+
}
129+
130+
func (m *mockClient) BatchWriteRow(request *tablestore.BatchWriteRowRequest) (*tablestore.BatchWriteRowResponse, error) {
131+
resp := &tablestore.BatchWriteRowResponse{}
132+
for _, changes := range request.RowChangesGroupByTable {
133+
for _, change := range changes {
134+
switch inst := change.(type) {
135+
case *tablestore.UpdateRowChange:
136+
var pk string
137+
for _, col := range inst.PrimaryKey.PrimaryKeys {
138+
if col.ColumnName == stateKey {
139+
pk = col.Value.(string)
140+
141+
break
142+
}
143+
}
144+
145+
for _, col := range inst.Columns {
146+
if col.ColumnName == stateValue {
147+
buf := &bytes.Buffer{}
148+
binary.Write(buf, binary.BigEndian, col.Value)
149+
m.data[pk] = buf.Bytes()
150+
}
151+
}
152+
153+
case *tablestore.DeleteRowChange:
154+
for _, col := range inst.PrimaryKey.PrimaryKeys {
155+
if col.ColumnName == stateKey {
156+
delete(m.data, col.Value.(string))
157+
158+
break
159+
}
160+
}
161+
}
162+
}
163+
}
164+
165+
return resp, nil
166+
}

0 commit comments

Comments
 (0)