-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
149 lines (143 loc) · 4.81 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package pubsub
import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"google.golang.org/api/option"
"google.golang.org/api/transport"
"log"
"os"
"reflect"
"strconv"
"time"
)
// NewPubSubClientWithRetries creates a Pub/Sub client from JSON credentials,
// retrying client construction when the first attempt fails.
//
// The project ID is taken from options[0] when it is present and non-empty;
// otherwise it is read from the resolved credentials. When credentials is
// empty the client is created without explicit credentials and no retry is
// performed.
//
// If the first pubsub.NewClient call fails, further attempts are driven by
// Retry using the retries schedule. On failure the returned client is nil and
// the error from Retry is returned.
func NewPubSubClientWithRetries(ctx context.Context, credentials []byte, retries []time.Duration, options ...string) (*pubsub.Client, error) {
	var projectId string
	if len(options) > 0 {
		projectId = options[0]
	}

	// len of a nil slice is 0, so a separate nil check is unnecessary.
	if len(credentials) == 0 {
		log.Println("empty credentials")
		return pubsub.NewClient(ctx, projectId)
	}

	opts := option.WithCredentialsJSON(credentials)
	creds, err := transport.Creds(ctx, opts)
	if err != nil {
		return nil, err
	}
	// Fall back to the project recorded in the credentials only when the
	// caller did not supply one.
	if len(projectId) == 0 && creds != nil {
		projectId = creds.ProjectID
	}

	client, err := pubsub.NewClient(ctx, projectId, opts)
	if err == nil {
		return client, nil
	}

	err = Retry(retries, func() error {
		c, e := pubsub.NewClient(ctx, projectId, opts)
		if e == nil {
			client = c
		}
		return e
	})
	if err != nil {
		log.Printf("Failed to new pubsub client: %s.", err.Error())
		return nil, err
	}
	return client, nil
}
// NewPubSubClientWithFile creates a Pub/Sub client for projectId.
//
// When keyFilename is non-empty and existFile reports that it exists, the
// client is created with that credentials file. Otherwise the client is
// created without an explicit credentials option.
func NewPubSubClientWithFile(ctx context.Context, projectId string, keyFilename string) (*pubsub.Client, error) {
	// existFile is only consulted for a non-empty name.
	usable := keyFilename != "" && existFile(keyFilename)
	if !usable {
		log.Println("key file doesn't exists")
		return pubsub.NewClient(ctx, projectId)
	}

	log.Println("key file exists")
	withKey := option.WithCredentialsFile(keyFilename)
	return pubsub.NewClient(ctx, projectId, withKey)
}
// NewPubSubClient creates a Pub/Sub client from JSON credentials.
//
// The project ID is taken from options[0] when it is present and non-empty;
// otherwise it is read from the credentials resolved by transport.Creds.
// When credentials is empty the client is created without an explicit
// credentials option.
//
// A failure to resolve credentials is returned as an error (wrapping the
// underlying cause) rather than raised as a panic.
func NewPubSubClient(ctx context.Context, credentials []byte, options ...string) (*pubsub.Client, error) {
	opts := option.WithCredentialsJSON(credentials)

	var projectId string
	if len(options) > 0 {
		projectId = options[0]
	}

	if len(projectId) == 0 {
		// NOTE(review): with empty credentials this lookup presumably falls
		// back to application default credentials — confirm.
		creds, err := transport.Creds(ctx, opts)
		// Check the error before touching creds: on failure creds may be nil
		// and dereferencing it would crash with a nil-pointer panic.
		if err != nil {
			return nil, fmt.Errorf("credentials error: %w", err)
		}
		if creds == nil {
			return nil, fmt.Errorf("credentials error: creds is nil")
		}
		projectId = creds.ProjectID
	}

	// len of a nil slice is 0, so a separate nil check is unnecessary.
	if len(credentials) == 0 {
		log.Println("empty credentials")
		return pubsub.NewClient(ctx, projectId)
	}
	return pubsub.NewClient(ctx, projectId, opts)
}
// existFile reports whether os.Stat succeeds for filename.
//
// A "not exist" error yields false silently; any other Stat error is logged
// and also yields false.
func existFile(filename string) bool {
	_, statErr := os.Stat(filename)
	switch {
	case statErr == nil:
		return true
	case os.IsNotExist(statErr):
		return false
	default:
		// Unexpected failure (not a plain "missing file"): record it.
		log.Println(statErr.Error())
		return false
	}
}
// MakeDurations converts a list of second counts into time.Duration values,
// preserving order. Each element v becomes v * time.Second.
//
// The result is always a non-nil slice, even when vs is nil or empty.
func MakeDurations(vs []int64) []time.Duration {
	// The final length is known up front, so allocate once.
	durations := make([]time.Duration, 0, len(vs))
	for _, v := range vs {
		durations = append(durations, time.Duration(v)*time.Second)
	}
	return durations
}
// MakeArray collects consecutive positive int64 fields from a struct.
//
// v is a struct or a pointer to a struct. Fields are looked up by name as
// prefix+"1", prefix+"2", ... up to prefix+strconv.Itoa(max). Collection
// stops at the first field that is missing, is not of type int64, or holds a
// value <= 0; the values gathered so far are returned.
//
// The result is nil when nothing is collected, including when v is nil, a
// nil pointer, or not a struct.
func MakeArray(v interface{}, prefix string, max int) []int64 {
	var ar []int64

	s := reflect.Indirect(reflect.ValueOf(v))
	// FieldByName panics on anything that is not a struct (including the
	// zero Value produced by a nil input), so bail out early.
	if s.Kind() != reflect.Struct {
		return ar
	}

	int64Type := reflect.TypeOf(int64(0))
	for i := 1; i <= max; i++ {
		f := s.FieldByName(prefix + strconv.Itoa(i))
		// A missing field yields the zero Value; treat it, or a field of
		// another type, as the end of the sequence instead of panicking.
		if !f.IsValid() || f.Type() != int64Type {
			break
		}
		n := f.Int()
		if n <= 0 {
			break
		}
		ar = append(ar, n)
	}
	return ar
}
func DurationsFromValue(v interface{}, prefix string, max int) []time.Duration {
arr := MakeArray(v, prefix, max)
return MakeDurations(arr)
}
// RetryConfig holds up to nine numbered int64 retry values, Retry1 through
// Retry9. Each field carries mapstructure, json, gorm, bson, dynamodbav and
// firestore tags so the struct can be loaded from or stored in those formats.
//
// NOTE(review): presumably each value is a delay in seconds and the struct is
// meant to be read in order with DurationsFromValue(cfg, "Retry", 9), where
// the first non-positive value ends the list — confirm against callers.
type RetryConfig struct {
Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
}
// Retry calls f until it succeeds or the attempt budget is exhausted.
//
// The number of attempts equals len(sleeps), with a minimum of one: f is
// always called at least once, even when sleeps is empty. After the i-th
// failed attempt (0-based) Retry sleeps for sleeps[i] before trying again;
// the last element of sleeps is never slept on because no attempt follows it.
//
// Retry returns nil as soon as f returns nil. Otherwise it returns an error
// that wraps the last error from f, so callers can inspect the cause with
// errors.Is or errors.As.
func Retry(sleeps []time.Duration, f func() error) (err error) {
	// f runs once even for an empty schedule, so report at least one attempt.
	attempts := len(sleeps)
	if attempts == 0 {
		attempts = 1
	}

	for i := 0; i < attempts; i++ {
		log.Printf("Retrying %d of %d ", i+1, attempts)
		if err = f(); err == nil {
			return nil
		}
		// No sleep after the final attempt; this also keeps sleeps[i] in
		// range when sleeps is empty.
		if i == attempts-1 {
			break
		}
		time.Sleep(sleeps[i])
		log.Printf("Retrying %d of %d after error: %s", i+1, attempts, err.Error())
	}
	return fmt.Errorf("after %d attempts, last error: %w", attempts, err)
}