From 0ddf0fb8ff343adc9481af1ebd1c78cfd267e820 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sun, 23 Apr 2023 10:17:27 +0800 Subject: [PATCH] test: refactor worker/operation and support Delete operation Signed-off-by: Benjamin Wang --- concurrent_test.go | 407 +++++++++++++++++++++------------------------ 1 file changed, 188 insertions(+), 219 deletions(-) diff --git a/concurrent_test.go b/concurrent_test.go index 3615d4fdc..edb0b0c8c 100644 --- a/concurrent_test.go +++ b/concurrent_test.go @@ -34,10 +34,16 @@ type bytesRange struct { max int } +type operationChance struct { + operation OperationType + chance int +} + type concurrentConfig struct { - readTime duration - writeTime duration - writeBytes bytesRange + workInterval duration + operationRatio []operationChance + readInterval duration // only used by readOpeartion + writeBytes bytesRange // only used by writeOperation } /* @@ -54,94 +60,60 @@ func TestConcurrentReadAndWrite(t *testing.T) { } bucket := []byte("data") keys := []string{"key0", "key1", "key2", "key3", "key4", "key5", "key6", "key7", "key8", "key9"} + conf := concurrentConfig{ + workInterval: duration{ + min: 5 * time.Millisecond, + max: 100 * time.Millisecond, + }, + operationRatio: []operationChance{ + {operation: Read, chance: 60}, + {operation: Write, chance: 20}, + {operation: Delete, chance: 20}, + }, + readInterval: duration{ + min: 50 * time.Millisecond, + max: 100 * time.Millisecond, + }, + writeBytes: bytesRange{ + min: 200, + max: 16000, + }, + } testCases := []struct { name string - readerCount int - writerCount int + workerCount int conf concurrentConfig testDuration time.Duration }{ { - name: "1 reader", - readerCount: 1, - writerCount: 1, - conf: concurrentConfig{ - readTime: duration{ - min: 50 * time.Millisecond, - max: 100 * time.Millisecond, - }, - writeTime: duration{ - min: 10 * time.Millisecond, - max: 20 * time.Millisecond, - }, - writeBytes: bytesRange{ - min: 200, - max: 8000, - }, - }, + name: "1 worker", + workerCount: 1, + conf: conf, testDuration: 30 * time.Second, }, { - name: "10 readers", - readerCount: 10, - writerCount: 2, - conf: concurrentConfig{ - readTime: duration{ - min: 50 * time.Millisecond, - max: 100 * time.Millisecond, - }, - writeTime: duration{ - min: 10 * time.Millisecond, - max: 20 * time.Millisecond, - }, - writeBytes: bytesRange{ - min: 200, - max: 8000, - }, - }, + name: "10 workers", + workerCount: 10, + conf: conf, testDuration: 30 * time.Second, }, { - name: "50 readers", - readerCount: 50, - writerCount: 10, - conf: concurrentConfig{ - readTime: duration{ - min: 50 * time.Millisecond, - max: 100 * time.Millisecond, - }, - writeTime: duration{ - min: 10 * time.Millisecond, - max: 20 * time.Millisecond, - }, - writeBytes: bytesRange{ - min: 500, - max: 8000, - }, - }, - + name: "50 workers", + workerCount: 50, + conf: conf, testDuration: 30 * time.Second, }, { - name: "100 readers", - readerCount: 100, - writerCount: 20, - conf: concurrentConfig{ - readTime: duration{ - min: 50 * time.Millisecond, - max: 100 * time.Millisecond, - }, - writeTime: duration{ - min: 10 * time.Millisecond, - max: 20 * time.Millisecond, - }, - writeBytes: bytesRange{ - min: 500, - max: 8000, - }, - }, - + name: "100 workers", + workerCount: 100, + conf: conf, + testDuration: 30 * time.Second, + }, + { + name: "200 workers", + workerCount: 200, + conf: conf, testDuration: 30 * time.Second, }, } @@ -152,8 +124,7 @@ func TestConcurrentReadAndWrite(t *testing.T) { concurrentReadAndWrite(t, bucket, keys, - tc.readerCount, - tc.writerCount, + tc.workerCount, tc.conf, tc.testDuration) }) @@ -163,8 +134,7 @@ func TestConcurrentReadAndWrite(t *testing.T) { func concurrentReadAndWrite(t *testing.T, bucket []byte, keys []string, - readerCount int, - writerCount int, + workerCount int, conf concurrentConfig, testDuration time.Duration) { @@ -179,8 +149,7 @@ func concurrentReadAndWrite(t *testing.T, t.Log("Starting workers.") records := runWorkers(t, db, bucket, keys, - readerCount, - writerCount, + workerCount, conf, testDuration) @@ -198,80 +167,55 @@ func concurrentReadAndWrite(t *testing.T, /* ********************************************************* -Data structures and functions/methods for running -concurrent workers, including reading and writing workers +Data structures and functions/methods for running concurrent +workers, which execute different operations, including `Read`, +`Write` and `Delete`. ********************************************************* */ func runWorkers(t *testing.T, db *btesting.DB, bucket []byte, keys []string, - readerCount int, - writerCount int, + workerCount int, conf concurrentConfig, testDuration time.Duration) historyRecords { stopCh := make(chan struct{}, 1) - errCh := make(chan error, readerCount+1) + errCh := make(chan error, workerCount) var mu sync.Mutex var rs historyRecords - runFunc := func(w worker) error { - wrs, err := runWorker(t, w, errCh) - mu.Lock() - rs = append(rs, wrs...) - mu.Unlock() - return err - } - - // start write transactions g := new(errgroup.Group) - for i := 0; i < writerCount; i++ { - writer := &writeWorker{ - id: i, - db: db, - bucket: bucket, - keys: keys, - - writeBytes: conf.writeBytes, - writeTime: conf.writeTime, - - errCh: errCh, - stopCh: stopCh, - t: t, - } - g.Go(func() error { - return runFunc(writer) - }) - } - - // start readonly transactions - for i := 0; i < readerCount; i++ { - reader := &readWorker{ + for i := 0; i < workerCount; i++ { + w := &worker{ id: i, db: db, bucket: bucket, keys: keys, - readTime: conf.readTime, + conf: conf, errCh: errCh, stopCh: stopCh, t: t, } g.Go(func() error { - return runFunc(reader) + wrs, err := runWorker(t, w, errCh) + mu.Lock() + rs = append(rs, wrs...) + mu.Unlock() + return err }) } - t.Logf("Keep reading and writing transactions running for about %s.", testDuration) + t.Logf("Keep all workers running for about %s.", testDuration) select { case <-time.After(testDuration): case <-errCh: } close(stopCh) - t.Log("Waiting for all transactions to finish.") + t.Log("Waiting for all workers to finish.") if err := g.Wait(); err != nil { t.Errorf("Received error: %v", err) } @@ -279,7 +223,7 @@ func runWorkers(t *testing.T, return rs } -func runWorker(t *testing.T, w worker, errCh chan error) (historyRecords, error) { +func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) { rs, err := w.run() if len(rs) > 0 && err == nil { if terr := validateIncrementalTxid(rs); terr != nil { @@ -292,19 +236,14 @@ func runWorker(t *testing.T, w worker, errCh chan error) (historyRecords, error) return rs, err } -type worker interface { - name() string - run() (historyRecords, error) -} - -type readWorker struct { +type worker struct { id int db *btesting.DB bucket []byte keys []string - readTime duration + conf concurrentConfig errCh chan error stopCh chan struct{} @@ -312,118 +251,144 @@ type readWorker struct { t *testing.T } -func (r *readWorker) name() string { - return fmt.Sprintf("readWorker-%d", r.id) +func (w *worker) name() string { + return fmt.Sprintf("worker-%d", w.id) } -func (r *readWorker) run() (historyRecords, error) { +func (w *worker) run() (historyRecords, error) { var rs historyRecords for { select { - case <-r.stopCh: - r.t.Log("Reading transaction finished.") + case <-w.stopCh: + w.t.Logf("%q finished.", w.name()) return rs, nil default: } - err := r.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(r.bucket) + op := w.pickOperation() + rec, err := runOperation(op, w.db, w.bucket, w.keys, w.conf) + if err != nil { + readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err) + w.t.Error(readErr) + w.errCh <- readErr + return rs, readErr + } - selectedKey := r.keys[mrand.Intn(len(r.keys))] - initialVal := b.Get([]byte(selectedKey)) - time.Sleep(randomDurationInRange(r.readTime.min, r.readTime.max)) - val := b.Get([]byte(selectedKey)) + rs = append(rs, rec) + time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max)) + } +} - if !reflect.DeepEqual(initialVal, val) { - return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", - selectedKey, formatBytes(initialVal), formatBytes(val)) - } +func (w *worker) pickOperation() OperationType { + sum := 0 + for _, op := range w.conf.operationRatio { + sum += op.chance + } + roll := mrand.Int() % sum + for _, op := range w.conf.operationRatio { + if roll < op.chance { + return op.operation + } + roll -= op.chance + } + panic("unexpected") +} - clonedVal := make([]byte, len(val)) - copy(clonedVal, val) +func runOperation(op OperationType, db *btesting.DB, bucket []byte, keys []string, conf concurrentConfig) (historyRecord, error) { + switch op { + case Read: + return executeRead(db, bucket, keys, conf.readInterval) + case Write: + return executeWrite(db, bucket, keys, conf.writeBytes) + case Delete: + return executeDelete(db, bucket, keys) + default: + panic(fmt.Sprintf("unexpected operation type: %s", op)) + } +} - rs = append(rs, historyRecord{ - OperationType: Read, - Key: selectedKey, - Value: clonedVal, - Txid: tx.ID(), - }) +func executeRead(db *btesting.DB, bucket []byte, keys []string, readInterval duration) (historyRecord, error) { + var rec historyRecord + err := db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) - return nil - }) + selectedKey := keys[mrand.Intn(len(keys))] + initialVal := b.Get([]byte(selectedKey)) + time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) + val := b.Get([]byte(selectedKey)) - if err != nil { - readErr := fmt.Errorf("[reader error]: %w", err) - r.t.Error(readErr) - r.errCh <- readErr - return rs, readErr + if !reflect.DeepEqual(initialVal, val) { + return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", + selectedKey, formatBytes(initialVal), formatBytes(val)) } - } -} - -type writeWorker struct { - id int - db *btesting.DB - bucket []byte - keys []string + clonedVal := make([]byte, len(val)) + copy(clonedVal, val) - writeBytes bytesRange - writeTime duration + rec = historyRecord{ + OperationType: Read, + Key: selectedKey, + Value: clonedVal, + Txid: tx.ID(), + } - errCh chan error - stopCh chan struct{} + return nil + }) - t *testing.T + return rec, err } -func (w *writeWorker) name() string { - return fmt.Sprintf("writeWorker-%d", w.id) -} +func executeWrite(db *btesting.DB, bucket []byte, keys []string, writeBytes bytesRange) (historyRecord, error) { + var rec historyRecord -func (w *writeWorker) run() (historyRecords, error) { - var rs historyRecords - for { - select { - case <-w.stopCh: - w.t.Log("Writing transaction finished.") - return rs, nil - default: - } + err := db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) - err := w.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(w.bucket) + selectedKey := keys[mrand.Intn(len(keys))] - selectedKey := w.keys[mrand.Intn(len(w.keys))] + valueBytes := randomIntInRange(writeBytes.min, writeBytes.max) + v := make([]byte, valueBytes) + if _, cErr := crand.Read(v); cErr != nil { + return cErr + } - valueBytes := randomIntInRange(w.writeBytes.min, w.writeBytes.max) - v := make([]byte, valueBytes) - if _, cErr := crand.Read(v); cErr != nil { - return cErr + putErr := b.Put([]byte(selectedKey), v) + if putErr == nil { + rec = historyRecord{ + OperationType: Write, + Key: selectedKey, + Value: v, + Txid: tx.ID(), } + } - putErr := b.Put([]byte(selectedKey), v) - if putErr == nil { - rs = append(rs, historyRecord{ - OperationType: Write, - Key: selectedKey, - Value: v, - Txid: tx.ID(), - }) - } + return putErr + }) - return putErr - }) + return rec, err +} - if err != nil { - writeErr := fmt.Errorf("[writer error]: %w", err) - w.t.Error(writeErr) - w.errCh <- writeErr - return rs, writeErr +func executeDelete(db *btesting.DB, bucket []byte, keys []string) (historyRecord, error) { + var rec historyRecord + + err := db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + + selectedKey := keys[mrand.Intn(len(keys))] + + deleteErr := b.Delete([]byte(selectedKey)) + if deleteErr == nil { + rec = historyRecord{ + OperationType: Delete, + Key: selectedKey, + Txid: tx.ID(), + } } - time.Sleep(randomDurationInRange(w.writeTime.min, w.writeTime.max)) - } + return deleteErr + }) + + return rec, err } func randomDurationInRange(min, max time.Duration) time.Duration { @@ -512,8 +477,9 @@ Data structures and functions for analyzing history records type OperationType string const ( - Read OperationType = "read" - Write OperationType = "write" + Read OperationType = "read" + Write OperationType = "write" + Delete OperationType = "delete" ) type historyRecord struct { @@ -541,12 +507,13 @@ func (rs historyRecords) Less(i, j int) bool { return rs[i].Txid < rs[j].Txid } - // Sorted by workerType: put writer before reader if they have the same txid. - if rs[i].OperationType == Write { - return true + // Sorted by operation type: put `Read` after other operation types + // if they operate on the same key and have the same txid. + if rs[i].OperationType == Read { + return false } - return false + return true } func (rs historyRecords) Swap(i, j int) { @@ -557,7 +524,7 @@ func validateIncrementalTxid(rs historyRecords) error { lastTxid := rs[0].Txid for i := 1; i < len(rs); i++ { - if (rs[i].OperationType == Write && rs[i].Txid <= lastTxid) || (rs[i].OperationType == Read && rs[i].Txid < lastTxid) { + if (rs[i].OperationType == Read && rs[i].Txid < lastTxid) || (rs[i].OperationType != Read && rs[i].Txid <= lastTxid) { return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType) } lastTxid = rs[i].Txid @@ -576,9 +543,11 @@ func validateSerializable(rs historyRecords) error { if rec.OperationType == Write { v.Value = rec.Value v.Txid = rec.Txid + } else if rec.OperationType == Delete { + delete(lastWriteKeyValueMap, rec.Key) } else { if !reflect.DeepEqual(v.Value, rec.Value) { - return fmt.Errorf("reader[txid: %d, key: %s] read %x, \nbut writer[txid: %d, key: %s] wrote %x", + return fmt.Errorf("readOperation[txid: %d, key: %s] read %x, \nbut writer[txid: %d, key: %s] wrote %x", rec.Txid, rec.Key, rec.Value, v.Txid, v.Key, v.Value) } @@ -591,9 +560,9 @@ func validateSerializable(rs historyRecords) error { Value: rec.Value, Txid: rec.Txid, } - } else { + } else if rec.OperationType == Read { if len(rec.Value) != 0 { - return fmt.Errorf("expected the first reader[txid: %d, key: %s] read nil, \nbut got %x", + return fmt.Errorf("expected the first readOperation[txid: %d, key: %s] read nil, \nbut got %x", rec.Txid, rec.Key, rec.Value) } }