-
Notifications
You must be signed in to change notification settings - Fork 314
/
common.go
222 lines (193 loc) · 5.88 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package common
import (
stdjson "encoding/json"
"net/http"
"strings"
"sync"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
"github.com/rudderlabs/rudder-go-kit/config"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type AsyncUploadAndTransformManager interface {
Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput
Transform(job *jobsdb.JobT) (string, error)
}
type AsyncDestinationManager interface {
AsyncUploadAndTransformManager
Poll(pollInput AsyncPoll) PollStatusResponse
GetUploadStats(UploadStatsInput GetUploadStatsInput) GetUploadStatsResponse
}
type SimpleAsyncDestinationManager struct {
UploaderAndTransformer AsyncUploadAndTransformManager
}
func (m SimpleAsyncDestinationManager) Upload(asyncDestStruct *AsyncDestinationStruct) AsyncUploadOutput {
return m.UploaderAndTransformer.Upload(asyncDestStruct)
}
func (m SimpleAsyncDestinationManager) Poll(AsyncPoll) PollStatusResponse {
return PollStatusResponse{
StatusCode: http.StatusOK,
Complete: true,
}
}
func (m SimpleAsyncDestinationManager) GetUploadStats(GetUploadStatsInput) GetUploadStatsResponse {
return GetUploadStatsResponse{
StatusCode: http.StatusOK,
}
}
func (m SimpleAsyncDestinationManager) Transform(job *jobsdb.JobT) (string, error) {
return m.UploaderAndTransformer.Transform(job)
}
type PollStatusResponse struct {
Complete bool
InProgress bool
StatusCode int
HasFailed bool
HasWarning bool
FailedJobURLs string
WarningJobURLs string
Error string `json:"error"`
}
type AsyncUploadOutput struct {
ImportingJobIDs []int64
ImportingParameters stdjson.RawMessage
FailedJobIDs []int64
SucceededJobIDs []int64
SuccessResponse string
FailedReason string
AbortJobIDs []int64
AbortReason string
ImportingCount int
FailedCount int
AbortCount int
DestinationID string
}
type AsyncPoll struct {
ImportId string `json:"importId"`
}
type ErrorResponse struct {
Error string
}
type Connection struct {
Source backendconfig.SourceT
Destination backendconfig.DestinationT
}
type BatchedJobs struct {
Jobs []*jobsdb.JobT
Connection *Connection
TimeWindow time.Time
}
type AsyncJob struct {
Message map[string]interface{} `json:"message"`
Metadata map[string]interface{} `json:"metadata"`
}
type AsyncUploadT struct {
Config map[string]interface{} `json:"config"`
Input []AsyncJob `json:"input"`
DestType string `json:"destType"`
}
type UploadStruct struct {
ImportId string `json:"importId"`
Metadata map[string]interface{} `json:"metadata"`
}
type MetaDataT struct {
CSVHeaders string `json:"csvHeader"`
}
type ImportParameters struct {
ImportId string `json:"importId"`
MetaData MetaDataT `json:"metadata"`
}
type AsyncDestinationStruct struct {
ImportingJobIDs []int64
FailedJobIDs []int64
Exists bool
Size int
CreatedAt time.Time
FileName string
Count int
CanUpload bool
UploadInProgress bool
UploadMutex sync.RWMutex
DestinationUploadURL string
Destination *backendconfig.DestinationT
Manager AsyncDestinationManager
AttemptNums map[int64]int
FirstAttemptedAts map[int64]time.Time
OriginalJobParameters map[int64]stdjson.RawMessage
PartFileNumber int
SourceJobRunID string
}
type AsyncFailedPayload struct {
Config map[string]interface{} `json:"config"`
Input []map[string]interface{} `json:"input"`
DestType string `json:"destType"`
ImportId string `json:"importId"`
MetaData MetaDataT `json:"metadata"`
}
type GetUploadStatsInput struct {
FailedJobURLs string
Parameters stdjson.RawMessage
ImportingList []*jobsdb.JobT
PollResultFileURLs string
WarningJobURLs string
}
type EventStatMeta struct {
FailedKeys []int64
WarningKeys []int64
SucceededKeys []int64
FailedReasons map[int64]string
WarningReasons map[int64]string
}
type GetUploadStatsResponse struct {
StatusCode int `json:"statusCode"`
Metadata EventStatMeta `json:"metadata"`
Error string `json:"error"`
}
func GetTransformedData(payload stdjson.RawMessage) string {
return gjson.GetBytes(payload, "body.JSON").String()
}
func GetMarshalledData(payload string, jobID int64) (string, error) {
var asyncJob AsyncJob
err := json.Unmarshal([]byte(payload), &asyncJob.Message)
if err != nil {
return "", err
}
asyncJob.Metadata = make(map[string]interface{})
asyncJob.Metadata["job_id"] = jobID
responsePayload, err := json.Marshal(asyncJob)
if err != nil {
return "", err
}
return string(responsePayload), nil
}
func GetBatchRouterConfigInt64(key, destType string, defaultValue int64) int64 {
destOverrideFound := config.IsSet("BatchRouter." + destType + "." + key)
if destOverrideFound {
return config.GetInt64("BatchRouter."+destType+"."+key, defaultValue)
}
return config.GetInt64("BatchRouter."+key, defaultValue)
}
func GetBatchRouterConfigStringMap(key, destType string, defaultValue []string) []string {
destOverrideFound := config.IsSet("BatchRouter." + destType + "." + key)
if destOverrideFound {
return config.GetStringSlice("BatchRouter."+destType+"."+key, defaultValue)
}
return config.GetStringSlice("BatchRouter."+key, defaultValue)
}
/*
Generates array of strings for comma separated string
Also removes "" elements from the array of strings if any.
*/
func GenerateArrayOfStrings(value string) []string {
result := []string{}
requestIdsArray := strings.Split(value, ",")
for _, requestId := range requestIdsArray {
if requestId != "" {
result = append(result, requestId)
}
}
return result
}