s3.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"context"
"net/url"
"sync"
"time"
"github.com/gofrs/uuid"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/statestore"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/go-concert/timed"
)
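// commitWriteState holds the latest commit timestamp persisted to the
// registry for a bucket.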
type commitWriteState struct {
time.Time
}
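// s3ObjectInfo describes a single S3 object from a listing page: the bucket
// name, object key, ETag, last-modified time, and the ID of the listing that
// returned it.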
type s3ObjectInfo struct {
name string
key string
etag string
lastModified time.Time
listingID string
}
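// s3ObjectPayload bundles the handler that will process an S3 object with
// the object's metadata and the synthetic S3 event built for it.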
type s3ObjectPayload struct {
s3ObjectHandler s3ObjectHandler
s3ObjectInfo s3ObjectInfo
s3ObjectEvent s3EventV2
}
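// s3Poller lists the configured bucket on a fixed interval and fans the
// discovered objects out to a pool of workers, tracking per-listing state so
// that fully ACKed listings can be purged from the registry.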
type s3Poller struct {
numberOfWorkers int
bucket string
listPrefix string
region string
provider string
bucketPollInterval time.Duration
workerSem *awscommon.Sem
s3 s3API
log *logp.Logger
metrics *inputMetrics
s3ObjectHandler s3ObjectHandlerFactory
states *states
store *statestore.Store
workersListingMap *sync.Map
workersProcessingMap *sync.Map
}
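// newS3Poller builds an s3Poller from its dependencies and configuration.
// If metrics is nil, a new metrics registry is created.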
func newS3Poller(log *logp.Logger,
metrics *inputMetrics,
s3 s3API,
s3ObjectHandler s3ObjectHandlerFactory,
states *states,
store *statestore.Store,
bucket string,
listPrefix string,
awsRegion string,
provider string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
}
return &s3Poller{
numberOfWorkers: numberOfWorkers,
bucket: bucket,
listPrefix: listPrefix,
region: awsRegion,
provider: provider,
bucketPollInterval: bucketPollInterval,
workerSem: awscommon.NewSem(numberOfWorkers),
s3: s3,
log: log,
metrics: metrics,
s3ObjectHandler: s3ObjectHandler,
states: states,
store: store,
workersListingMap: new(sync.Map),
workersProcessingMap: new(sync.Map),
}
}
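// handlePurgingLock marks the object's previous state as stored or failed
// and, once every object of the listing has been ACKed, unlocks the listing
// so that Purge can process it.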
func (p *s3Poller) handlePurgingLock(info s3ObjectInfo, isStored bool) {
id := info.name + info.key
previousState := p.states.FindPreviousByID(id)
if !previousState.IsEmpty() {
if isStored {
previousState.MarkAsStored()
} else {
previousState.MarkAsError()
}
p.states.Update(previousState, info.listingID)
}
// Manage locks for purging.
if p.states.IsListingFullyStored(info.listingID) {
// Locked while processing; unlock once all the objects in the listing have been ACKed.
lock, _ := p.workersListingMap.Load(info.listingID)
lock.(*sync.Mutex).Unlock()
}
}
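// ProcessObject drains s3ObjectPayloadChan, processing each S3 object and
// waiting for its events to be ACKed before updating the purge lock. It
// returns the combined errors from all failed objects.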
func (p *s3Poller) ProcessObject(s3ObjectPayloadChan <-chan *s3ObjectPayload) error {
var errs []error
for s3ObjectPayload := range s3ObjectPayloadChan {
// Process S3 object (download, parse, create events).
err := s3ObjectPayload.s3ObjectHandler.ProcessS3Object()
// Wait for all events to be ACKed before proceeding.
s3ObjectPayload.s3ObjectHandler.Wait()
info := s3ObjectPayload.s3ObjectInfo
if err != nil {
event := s3ObjectPayload.s3ObjectEvent
errs = append(errs, errors.Wrapf(err,
"failed processing S3 event for object key %q in bucket %q",
event.S3.Object.Key, event.S3.Bucket.Name))
p.handlePurgingLock(info, false)
continue
}
p.handlePurgingLock(info, true)
// Metrics
p.metrics.s3ObjectsAckedTotal.Inc()
}
return multierr.Combine(errs...)
}
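// GetS3Objects paginates the bucket listing and, for every object that the
// states registry does not mark as already processed, creates a processing
// payload and publishes it on s3ObjectPayloadChan. The channel is closed
// when pagination finishes.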
func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- *s3ObjectPayload) {
defer close(s3ObjectPayloadChan)
bucketName := getBucketNameFromARN(p.bucket)
paginator := p.s3.ListObjectsPaginator(bucketName, p.listPrefix)
for paginator.Next(ctx) {
listingID, err := uuid.NewV4()
if err != nil {
p.log.Warnw("Error generating UUID for listing page.", "error", err)
continue
}
// Create a lock for the listing page and register it in workersListingMap.
// The map is shared with handlePurgingLock, which unlocks the page once all of its objects have been ACKed.
lock := new(sync.Mutex)
lock.Lock()
p.workersListingMap.Store(listingID.String(), lock)
page := paginator.CurrentPage()
totProcessableObjects := 0
totListedObjects := len(page.Contents)
s3ObjectPayloadChanByPage := make(chan *s3ObjectPayload, totListedObjects)
// Metrics
p.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects))
for _, object := range page.Contents {
// Unescape s3 key name. For example, convert "%3D" back to "=".
filename, err := url.QueryUnescape(*object.Key)
if err != nil {
p.log.Errorw("Error when unescaping object key, skipping.", "error", err, "s3_object", *object.Key)
continue
}
state := newState(bucketName, filename, *object.ETag, *object.LastModified)
if p.states.MustSkip(state, p.store) {
p.log.Debugw("skipping state.", "state", state)
continue
}
p.states.Update(state, "")
event := s3EventV2{}
event.AWSRegion = p.region
event.Provider = p.provider
event.S3.Bucket.Name = bucketName
event.S3.Bucket.ARN = p.bucket
event.S3.Object.Key = filename
acker := awscommon.NewEventACKTracker(ctx)
s3Processor := p.s3ObjectHandler.Create(ctx, p.log, acker, event)
if s3Processor == nil {
continue
}
totProcessableObjects++
s3ObjectPayloadChanByPage <- &s3ObjectPayload{
s3ObjectHandler: s3Processor,
s3ObjectInfo: s3ObjectInfo{
name: bucketName,
key: filename,
etag: *object.ETag,
lastModified: *object.LastModified,
listingID: listingID.String(),
},
s3ObjectEvent: event,
}
}
if totProcessableObjects == 0 {
// nothing to be ACKed, unlock here
p.states.DeleteListing(listingID.String())
lock.Unlock()
} else {
listingInfo := &listingInfo{totObjects: totProcessableObjects}
p.states.AddListing(listingID.String(), listingInfo)
// Metrics
p.metrics.s3ObjectsProcessedTotal.Add(uint64(totProcessableObjects))
}
close(s3ObjectPayloadChanByPage)
for s3ObjectPayload := range s3ObjectPayloadChanByPage {
s3ObjectPayloadChan <- s3ObjectPayload
}
}
if err := paginator.Err(); err != nil {
p.log.Warnw("Error when paginating listing.", "error", err)
}
return
}
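// Purge waits for each known listing to be fully ACKed, deletes its object
// states, persists the remaining states and the latest commit timestamp per
// bucket to the registry, and then releases and removes the listing lock.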
func (p *s3Poller) Purge() {
listingIDs := p.states.GetListingIDs()
for _, listingID := range listingIDs {
// Lock here so the purge runs only after the full listing page
// has been ACKed by all the workers.
lock, loaded := p.workersListingMap.Load(listingID)
if !loaded {
// Purge calls can overlap: GetListingIDs can return an outdated
// snapshot in which the listing has already been purged.
p.states.DeleteListing(listingID)
continue
}
lock.(*sync.Mutex).Lock()
keys := map[string]struct{}{}
latestStoredTimeByBucket := make(map[string]time.Time, 0)
for _, state := range p.states.GetStatesByListingID(listingID) {
// The state is not stored yet; keep it.
if !state.Stored {
continue
}
var latestStoredTime time.Time
keys[state.ID] = struct{}{}
latestStoredTime, ok := latestStoredTimeByBucket[state.Bucket]
if !ok {
var commitWriteState commitWriteState
err := p.store.Get(awsS3WriteCommitPrefix+state.Bucket, &commitWriteState)
if err != nil {
// No entry in the map and no entry in the store:
// start from the zero time.
latestStoredTime = time.Time{}
} else {
latestStoredTime = commitWriteState.Time
}
}
if state.LastModified.After(latestStoredTime) {
latestStoredTimeByBucket[state.Bucket] = state.LastModified
}
}
for key := range keys {
p.states.Delete(key)
}
if err := p.states.writeStates(p.store); err != nil {
p.log.Errorw("Failed to write states to the registry", "error", err)
}
for bucket, latestStoredTime := range latestStoredTimeByBucket {
if err := p.store.Set(awsS3WriteCommitPrefix+bucket, commitWriteState{latestStoredTime}); err != nil {
p.log.Errorw("Failed to write commit time to the registry", "error", err)
}
}
// purge is done, we can unlock and clean
lock.(*sync.Mutex).Unlock()
p.workersListingMap.Delete(listingID)
p.states.DeleteListing(listingID)
}
return
}
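// Poll runs the listing/processing cycle until ctx is cancelled. Each
// iteration acquires up to numberOfWorkers worker slots, starts a goroutine
// that lists the bucket and purges completed listings, starts one worker
// goroutine per acquired slot, and then waits bucketPollInterval before the
// next iteration. A cancelled context is treated as a normal shutdown.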
func (p *s3Poller) Poll(ctx context.Context) error {
// This loop tries to keep the workers as busy as possible while honoring
// the number of workers from the config, as opposed to a simpler loop that
// does one listing, sequentially processes every object, and then lists again.
workerWg := new(sync.WaitGroup)
for ctx.Err() == nil {
// Determine how many S3 workers are available.
workers, err := p.workerSem.AcquireContext(p.numberOfWorkers, ctx)
if err != nil {
break
}
if workers == 0 {
continue
}
s3ObjectPayloadChan := make(chan *s3ObjectPayload)
workerWg.Add(1)
go func() {
defer func() {
workerWg.Done()
}()
p.GetS3Objects(ctx, s3ObjectPayloadChan)
p.Purge()
}()
workerWg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer func() {
workerWg.Done()
p.workerSem.Release(1)
}()
if err := p.ProcessObject(s3ObjectPayloadChan); err != nil {
p.log.Warnw("Failed processing S3 listing.", "error", err)
}
}()
}
timed.Wait(ctx, p.bucketPollInterval)
}
// Wait for all workers to finish.
workerWg.Wait()
if errors.Is(ctx.Err(), context.Canceled) {
// A canceled context is a normal shutdown.
return nil
}
return ctx.Err()
}