Skip to content

Commit 78864f3

Browse files
committed
Pass lmdb.Env to Syncer, fix flaky sync tests
Explicitly pass the lmdb.Env to the syncer, so that we have more control over its lifetime and avoid opening and closing it multiple times, which can cause crashes in the LMDB C code. Rewrite the sync tests to wait for expected values, instead of relying on arbitrary sleeps that make the test flaky and slow. Do not close the lmdb.Env in the sync tests, because for some reason it causes random SEGFAULTs on Linux (not on macOS).
1 parent 85230b3 commit 78864f3

File tree

10 files changed

+147
-103
lines changed

10 files changed

+147
-103
lines changed

cmd/lightningstream/commands/sync.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,23 +63,33 @@ func runSync(receiveOnly bool) error {
6363

6464
eg, ctx := errgroup.WithContext(ctx)
6565
for name, lc := range conf.LMDBs {
66+
l := logrus.WithField("db", name)
67+
env, err := syncer.OpenEnv(l, lc)
68+
if err != nil {
69+
return err
70+
}
71+
6672
opt := syncer.Options{
6773
ReceiveOnly: receiveOnly,
6874
}
69-
s, err := syncer.New(name, st, conf, lc, opt)
75+
s, err := syncer.New(name, env, st, conf, lc, opt)
7076
if err != nil {
7177
return err
7278
}
7379

74-
name := name
7580
eg.Go(func() error {
81+
defer func() {
82+
if err := env.Close(); err != nil {
83+
l.WithError(err).Error("Env close failed")
84+
}
85+
}()
7686
err := s.Sync(ctx)
7787
if err != nil {
7888
if err == context.Canceled {
79-
logrus.WithField("db", name).Error("Sync cancelled")
89+
l.Error("Sync cancelled")
8090
return err
8191
}
82-
logrus.WithError(err).WithField("db", name).Error("Sync failed")
92+
l.WithError(err).Error("Sync failed")
8393
}
8494
return err
8595
})

syncer/env.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package syncer
2+
3+
import (
4+
"github.com/PowerDNS/lmdb-go/lmdb"
5+
"github.com/c2h5oh/datasize"
6+
"github.com/sirupsen/logrus"
7+
"powerdns.com/platform/lightningstream/config"
8+
"powerdns.com/platform/lightningstream/lmdbenv"
9+
)
10+
11+
// OpenEnv opens the LMDB env with the right options
12+
func OpenEnv(l logrus.FieldLogger, lc config.LMDB) (env *lmdb.Env, err error) {
13+
l.WithField("lmdbpath", lc.Path).Info("Opening LMDB")
14+
env, err = lmdbenv.NewWithOptions(lc.Path, lc.Options)
15+
if err != nil {
16+
return nil, err
17+
}
18+
19+
// Print some env info
20+
info, err := env.Info()
21+
if err != nil {
22+
return nil, err
23+
}
24+
l.WithFields(logrus.Fields{
25+
"MapSize": datasize.ByteSize(info.MapSize).HumanReadable(),
26+
"LastTxnID": info.LastTxnID,
27+
}).Info("Env info")
28+
29+
// TODO: Perhaps check data if SchemaTracksChanges is set. Check if
30+
// the timestamp is in a reasonable range or 0.
31+
32+
return env, nil
33+
}

syncer/send_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,27 +37,28 @@ func doBenchmarkSyncerSendOnce(b *testing.B, native, dupsort bool) {
3737
extraDBIFlags = lmdb.DupSort
3838
}
3939

40-
syncer, err := New("test", memory.New(), config.Config{}, config.LMDB{
41-
SchemaTracksChanges: native,
42-
DupSortHack: dupsort,
43-
}, Options{})
44-
require.NoError(t, err)
45-
4640
l, hook := test.NewNullLogger()
4741
_ = hook
48-
syncer.l = l
42+
lc := config.LMDB{
43+
SchemaTracksChanges: native,
44+
DupSortHack: dupsort,
45+
}
4946

5047
// Fixed value
5148
// We add a header, but we can also benchmark this as all app value
5249
val := make([]byte, header.MinHeaderSize, 50)
5350
header.PutBasic(val, header.TimestampFromTime(now), 42, header.NoFlags)
5451
val = append(val, "TESTING-123456789"...)
5552

56-
err = lmdbenv.TestEnv(func(env *lmdb.Env) error {
53+
err := lmdbenv.TestEnv(func(env *lmdb.Env) error {
5754
info, err := env.Info()
5855
require.NoError(t, err)
5956
t.Logf("env info: %+v", info)
6057

58+
syncer, err := New("test", env, memory.New(), config.Config{}, lc, Options{})
59+
require.NoError(t, err)
60+
syncer.l = l
61+
6162
// Fill some data to dump
6263
err = env.Update(func(txn *lmdb.Txn) error {
6364
// First insert the initial data into the main database

syncer/shadow_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ func TestSyncer_shadow(t *testing.T) {
3939
ts2 := testTS(2)
4040
ts3 := testTS(3)
4141

42-
s, err := New("test", nil, config.Config{}, config.LMDB{}, Options{})
43-
assert.NoError(t, err)
42+
err := lmdbenv.TestEnv(func(env *lmdb.Env) error {
43+
s, err := New("test", env, nil, config.Config{}, config.LMDB{}, Options{})
44+
assert.NoError(t, err)
4445

45-
err = lmdbenv.TestEnv(func(env *lmdb.Env) error {
4646
// Initial data
47-
err := env.Update(func(txn *lmdb.Txn) error {
47+
err = env.Update(func(txn *lmdb.Txn) error {
4848
// First insert the initial data into the main database
4949
dbi, err := txn.OpenDBI("foo", lmdb.Create)
5050
assert.NoError(t, err)

syncer/sync.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,7 @@ const (
2727

2828
// Sync opens the env and starts the two-way sync loop.
2929
func (s *Syncer) Sync(ctx context.Context) error {
30-
// Open the env
31-
env, err := s.openEnv()
32-
if err != nil {
33-
return err
34-
}
35-
defer s.closeEnv(env)
36-
30+
env := s.env
3731
status.AddLMDBEnv(s.name, env)
3832
defer status.RemoveLMDBEnv(s.name)
3933

syncer/sync_test.go

Lines changed: 75 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/PowerDNS/simpleblob"
1414
"github.com/PowerDNS/simpleblob/backends/memory"
1515
"github.com/sirupsen/logrus"
16-
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
1717
"powerdns.com/platform/lightningstream/config"
1818
"powerdns.com/platform/lightningstream/config/logger"
1919
"powerdns.com/platform/lightningstream/lmdbenv"
@@ -23,7 +23,7 @@ import (
2323

2424
const testLMDBName = "default"
2525
const testDBIName = "test"
26-
const tick = 100 * time.Millisecond
26+
const tick = 10 * time.Millisecond
2727

2828
func TestSyncer_Sync_startup(t *testing.T) {
2929
t.Run("with-timestamped-schema", func(t *testing.T) {
@@ -44,6 +44,13 @@ func doTest(t *testing.T, withHeader bool) {
4444
syncerA, envA := createInstance(t, "a", st, withHeader)
4545
syncerB, envB := createInstance(t, "b", st, withHeader)
4646

47+
// For some reason trying to close the envs in this test segfaults on Linux (not on macOS).
48+
// It appears like this is caused by the syncer still running after cancellation
49+
// (and thus after the env is closed), but I did not get to the bottom of this yet.
50+
// [signal SIGSEGV: segmentation violation code=0x1 addr=0x7fd015132090 pc=0x8dc942]
51+
//defer envA.Close()
52+
//defer envB.Close()
53+
4754
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
4855
defer cancel()
4956
ctxA, cancelA := context.WithCancel(ctx)
@@ -55,46 +62,36 @@ func doTest(t *testing.T, withHeader bool) {
5562
t.Log("Starting syncer A")
5663
go runSync(ctxA, syncerA)
5764

58-
t.Log("----------")
59-
time.Sleep(2 * tick)
6065
t.Log("----------")
6166

6267
// Expecting one snapshot on startup, because not empty
63-
logSnapshotList(t, st)
64-
entries := listInstanceSnapshots(st, "a")
65-
assert.Len(t, entries, 1, "A")
68+
requireSnapshotsLenWait(t, st, 1, "A")
6669

6770
// Start syncer B with an empty LMDB
71+
// Starting with an empty LMDB is a special case that will not trigger any
72+
// local snapshot.
6873
t.Log("Starting syncer B")
6974
go runSync(ctxB, syncerB)
7075

71-
t.Log("----------")
72-
time.Sleep(2 * tick)
7376
t.Log("----------")
7477

75-
// No snapshot, because empty
76-
logSnapshotList(t, st)
77-
entries = listInstanceSnapshots(st, "b")
78-
assert.Len(t, entries, 0, "B")
78+
// Wait until the data from A was synced to B
79+
assertKeyWait(t, envB, "foo", "v1", withHeader)
7980

80-
assertKey(t, envB, "foo", "v1", withHeader)
81+
// No snapshot made by B, because we started empty
82+
requireSnapshotsLenWait(t, st, 0, "B")
8183

8284
// Now set something in B
8385
setKey(t, envB, "foo", "v2", withHeader)
8486

85-
t.Log("----------")
86-
time.Sleep(3 * tick)
8787
t.Log("----------")
8888

8989
// New snapshot in B, no new one in A
90-
logSnapshotList(t, st)
91-
entries = listInstanceSnapshots(st, "b")
92-
assert.Len(t, entries, 1, "B")
93-
entries = listInstanceSnapshots(st, "a")
94-
assert.Len(t, entries, 1, "A")
90+
requireSnapshotsLenWait(t, st, 1, "B")
91+
requireSnapshotsLenWait(t, st, 1, "A")
9592

9693
// New value should be present in A
97-
assertKey(t, envB, "foo", "v2", withHeader)
94+
assertKeyWait(t, envB, "foo", "v2", withHeader)
9895

9996
// Restart syncer for A
10097
t.Log("Restarting syncer A")
@@ -103,16 +100,13 @@ func doTest(t *testing.T, withHeader bool) {
103100
t.Log("----------")
104101
go runSync(ctxA, syncerA)
105102

106-
t.Log("----------")
107-
time.Sleep(3 * tick)
108103
t.Log("----------")
109104

110105
// Check is the contents of A are still correct after restart
111-
assertKey(t, envA, "foo", "v2", withHeader)
112-
entries = listInstanceSnapshots(st, "a")
106+
assertKeyWait(t, envA, "foo", "v2", withHeader)
113107
// A new snapshot should always be created on startup, in case the LMDB
114108
// was modified while it was down.
115-
assert.Len(t, entries, 2, "A")
109+
requireSnapshotsLenWait(t, st, 2, "A")
116110

117111
// Stopping syncer for A
118112
t.Log("Stopping syncer A")
@@ -127,15 +121,12 @@ func doTest(t *testing.T, withHeader bool) {
127121
ctxA, cancelA = context.WithCancel(ctx)
128122
go runSync(ctxA, syncerA)
129123
t.Log("----------")
130-
time.Sleep(6 * tick)
131-
t.Log("----------")
132124

125+
// New value in A should get synced to B
126+
assertKeyWait(t, envB, "new", "hello", withHeader)
133127
// Check if the contents of A are still correct after restart
134-
assertKey(t, envA, "new", "hello", withHeader)
135-
// It should also be synced to B
136-
assertKey(t, envB, "new", "hello", withHeader)
137-
entries = listInstanceSnapshots(st, "a")
138-
assert.Len(t, entries, 3, "A")
128+
assertKeyWait(t, envA, "new", "hello", withHeader)
129+
requireSnapshotsLenWait(t, st, 3, "A")
139130

140131
cancelA()
141132
cancelB()
@@ -144,16 +135,16 @@ func doTest(t *testing.T, withHeader bool) {
144135

145136
func createInstance(t *testing.T, name string, st simpleblob.Interface, timestamped bool) (*Syncer, *lmdb.Env) {
146137
env, tmp, err := createLMDB(t)
147-
assert.NoError(t, err)
138+
require.NoError(t, err)
148139

149140
c := createConfig(name, tmp, timestamped)
150-
syncer, err := New("default", st, c, c.LMDBs[testLMDBName], Options{})
151-
assert.NoError(t, err)
141+
syncer, err := New("default", env, st, c, c.LMDBs[testLMDBName], Options{})
142+
require.NoError(t, err)
152143

153144
return syncer, env
154145
}
155146

156-
func logSnapshotList(t *testing.T, st simpleblob.Interface) {
147+
func LogSnapshotList(t *testing.T, st simpleblob.Interface) {
157148
ctx := context.Background()
158149
entries, _ := st.List(ctx, "")
159150
var lines []string
@@ -178,17 +169,57 @@ func listInstanceSnapshots(st simpleblob.Interface, instance string) simpleblob.
178169
return entries
179170
}
180171

172+
func requireSnapshotsLenWait(t *testing.T, st simpleblob.Interface, expLen int, instance string) {
173+
var list simpleblob.BlobList
174+
// Retry until it succeeds
175+
var i int
176+
const maxIter = 150
177+
const sleepTime = 10 * time.Millisecond
178+
defer func() {
179+
t.Logf("Waited %d/%d iterations for the expected snapshot length", i, maxIter)
180+
}()
181+
for i = 0; i < maxIter; i++ {
182+
list = listInstanceSnapshots(st, strings.ToLower(instance))
183+
l := len(list)
184+
if l == expLen {
185+
return
186+
}
187+
time.Sleep(sleepTime)
188+
}
189+
// This one is actually expected to fail, call it for the formatting
190+
t.Logf("Gave up on waiting for the expected snapshot length")
191+
require.Len(t, list, expLen, instance)
192+
}
193+
181194
func runSync(ctx context.Context, syncer *Syncer) {
182195
err := syncer.Sync(ctx)
183196
if err != nil && err != context.Canceled {
184197
logrus.WithError(err).WithField("syncer", syncer.name).Error("Syncer Sync error")
185198
}
186199
}
187200

188-
func assertKey(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) {
189-
kv, err := dumpData(env, withHeader)
190-
assert.NoError(t, err)
191-
assert.Equal(t, val, kv[key])
201+
func assertKeyWait(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) {
202+
var kv map[string]string
203+
var err error
204+
var i int
205+
const maxIter = 150
206+
const sleepTime = 10 * time.Millisecond
207+
defer func() {
208+
t.Logf("Waited %d/%d iterations for the expected key", i, maxIter)
209+
}()
210+
for i = 0; i < maxIter; i++ {
211+
kv, err = dumpData(env, withHeader)
212+
if err != nil && !lmdb.IsNotFound(err) {
213+
require.NoError(t, err)
214+
}
215+
if kv[key] == val {
216+
return
217+
}
218+
time.Sleep(sleepTime)
219+
}
220+
// Expected to fail now, called for formatting
221+
t.Logf("Gave up on waiting for the expected key")
222+
require.Equal(t, val, kv[key])
192223
}
193224

194225
func setKey(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) {
@@ -210,13 +241,13 @@ func setKey(t *testing.T, env *lmdb.Env, key, val string, withHeader bool) {
210241
err = txn.Put(dbi, []byte(key), valb, 0)
211242
return err
212243
})
213-
assert.NoError(t, err)
244+
require.NoError(t, err)
214245
}
215246

216247
func dumpData(env *lmdb.Env, withHeader bool) (map[string]string, error) {
217248
data := make(map[string]string)
218249
err := env.View(func(txn *lmdb.Txn) error {
219-
dbi, err := txn.OpenDBI(testDBIName, lmdb.Create)
250+
dbi, err := txn.OpenDBI(testDBIName, 0)
220251
if err != nil {
221252
return err
222253
}

0 commit comments

Comments
 (0)