-
Notifications
You must be signed in to change notification settings - Fork 3
/
producer_test.go
79 lines (65 loc) · 2.09 KB
/
producer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package feedx_test
import (
"context"
"sync/atomic"
"time"
"github.com/bsm/bfs"
"github.com/bsm/feedx"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
)
// Specs for feedx.Producer, exercised against an in-memory bfs object.
var _ = Describe("Producer", func() {
	var (
		subject *feedx.Producer
		obj     *bfs.Object
		numRuns uint32 // how often the produce callback ran; only accessed via sync/atomic
	)
	ctx := context.Background()

	// setup creates the producer under test for obj with the given options
	// (nil is passed through unchanged). The produce callback increments
	// numRuns and encodes ten seed() records.
	setup := func(o *feedx.ProducerOptions) {
		var err error
		subject, err = feedx.NewProducerForRemote(ctx, obj, o, func(w *feedx.Writer) error {
			atomic.AddUint32(&numRuns, 1)
			for i := 0; i < 10; i++ {
				if err := w.Encode(seed()); err != nil {
					return err
				}
			}
			return nil
		})
		Expect(err).NotTo(HaveOccurred())
	}

	BeforeEach(func() {
		// Forget the producer of the previous spec so that AfterEach never
		// closes a stale instance when a spec does not call setup.
		subject = nil
		atomic.StoreUint32(&numRuns, 0)
		obj = bfs.NewInMemObject("path/to/file.jsonz")
	})

	AfterEach(func() {
		if subject != nil {
			Expect(subject.Close()).To(Succeed())
		}
	})

	It("produces", func() {
		setup(nil)

		Expect(subject.LastPush()).To(BeTemporally("~", time.Now(), time.Second))
		Expect(subject.LastModified()).To(BeTemporally("~", time.Now(), time.Second))
		Expect(subject.NumWritten()).To(Equal(10))

		// Close before inspecting the object. AfterEach calls Close once more,
		// so this spec relies on a repeated Close succeeding.
		Expect(subject.Close()).To(Succeed())

		info, err := obj.Head(ctx)
		Expect(err).NotTo(HaveOccurred())
		Expect(info.Size).To(BeNumerically("~", 75, 10))
	})

	It("produces with custom last-mod check", func() {
		setup(&feedx.ProducerOptions{
			Interval:     50 * time.Millisecond,
			LastModCheck: func(_ context.Context) (time.Time, error) { return time.Unix(1515151515, 987654321), nil },
		})

		firstPush := subject.LastPush()
		Expect(firstPush).To(BeTemporally("~", time.Now(), time.Second))

		// The expected value carries only the millisecond part (987ms) of the
		// timestamp returned by LastModCheck. Instants are compared with
		// BeTemporally instead of Equal, because Equal is a deep comparison
		// that also depends on the Location of both values.
		Expect(subject.LastModified()).To(BeTemporally("==", time.Unix(1515151515, 987000000)))
		Expect(subject.NumWritten()).To(Equal(10))
		Expect(atomic.LoadUint32(&numRuns)).To(Equal(uint32(1)))

		info, err := obj.Head(ctx)
		Expect(err).NotTo(HaveOccurred())
		Expect(info.Size).To(BeNumerically("~", 75, 10))
		Expect(info.Metadata).To(HaveKeyWithValue("X-Feedx-Last-Modified", "1515151515987"))

		// Wait until LastPush moves past the first push, then verify that the
		// produce callback was still invoked only once.
		// NOTE(review): presumably skipped because LastModCheck keeps
		// returning the same timestamp - confirm against Producer.
		Eventually(func() bool { return subject.LastPush().After(firstPush) }).Should(BeTrue())
		Expect(atomic.LoadUint32(&numRuns)).To(Equal(uint32(1)))
	})
})