From faadf203ce06e0a63481225eaadf50409d0faef0 Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Tue, 2 Jul 2024 11:52:06 +0200 Subject: [PATCH 1/4] Add synchronization primitive for server shutdown --- pkg/kine/broadcaster/broadcaster.go | 63 ++++++++++++------------- pkg/kine/endpoint/endpoint.go | 6 +-- pkg/kine/logstructured/logstructured.go | 42 ++++++++++++----- pkg/kine/logstructured/sqllog/sql.go | 29 ++++++++++-- pkg/kine/server/types.go | 1 + pkg/server/server.go | 9 +++- 6 files changed, 96 insertions(+), 54 deletions(-) diff --git a/pkg/kine/broadcaster/broadcaster.go b/pkg/kine/broadcaster/broadcaster.go index 7b248100..c7ecd014 100644 --- a/pkg/kine/broadcaster/broadcaster.go +++ b/pkg/kine/broadcaster/broadcaster.go @@ -13,43 +13,35 @@ type Broadcaster struct { subs map[chan interface{}]struct{} } -func (b *Broadcaster) Subscribe(ctx context.Context, connect ConnectFunc) (<-chan interface{}, error) { +func (b *Broadcaster) Subscribe(ctx context.Context) (<-chan interface{}, error) { b.Lock() defer b.Unlock() - if !b.running { - if err := b.start(connect); err != nil { - return nil, err - } - } - sub := make(chan interface{}, 100) if b.subs == nil { b.subs = map[chan interface{}]struct{}{} } b.subs[sub] = struct{}{} - go func() { - <-ctx.Done() - b.unsub(sub, true) - }() + context.AfterFunc(ctx, func() { + b.Lock() + defer b.Unlock() + b.unsub(sub) + }) return sub, nil } -func (b *Broadcaster) unsub(sub chan interface{}, lock bool) { - if lock { - b.Lock() - } +func (b *Broadcaster) unsub(sub chan interface{}) { if _, ok := b.subs[sub]; ok { close(sub) delete(b.subs, sub) } - if lock { - b.Unlock() - } } -func (b *Broadcaster) start(connect ConnectFunc) error { +func (b *Broadcaster) Start(connect ConnectFunc) error { + b.Lock() + defer b.Unlock() + c, err := connect() if err != nil { return err @@ -60,24 +52,29 @@ func (b *Broadcaster) start(connect ConnectFunc) error { return nil } -func (b *Broadcaster) stream(input chan interface{}) { - for item := range input { - b.Lock() - for sub := range b.subs { - select { - case sub <- item: - default: - // Slow consumer, drop - go b.unsub(sub, true) - } - } - b.Unlock() +func (b *Broadcaster) stream(ch chan interface{}) { + for item := range ch { + b.publish(item) } b.Lock() + defer b.Unlock() for sub := range b.subs { - b.unsub(sub, false) + b.unsub(sub) } b.running = false - b.Unlock() +} + +func (b *Broadcaster) publish(item interface{}) { + b.Lock() + defer b.Unlock() + + for sub := range b.subs { + select { + case sub <- item: + default: + // Slow consumer, drop + b.unsub(sub) + } + } } diff --git a/pkg/kine/endpoint/endpoint.go b/pkg/kine/endpoint/endpoint.go index 0d90586e..4f024dc0 100644 --- a/pkg/kine/endpoint/endpoint.go +++ b/pkg/kine/endpoint/endpoint.go @@ -76,9 +76,8 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { if err := grpcServer.Serve(listener); err != nil { logrus.Errorf("Kine server shutdown: %v", err) } - <-ctx.Done() - grpcServer.Stop() listener.Close() + grpcServer.Stop() }() return ETCDConfig{ @@ -143,9 +142,8 @@ func ListenAndReturnBackend(ctx context.Context, config Config) (ETCDConfig, ser if err := grpcServer.Serve(listener); err != nil { logrus.Errorf("Kine server shutdown: %v", err) } - <-ctx.Done() - grpcServer.Stop() listener.Close() + grpcServer.Stop() }() return ETCDConfig{ diff --git a/pkg/kine/logstructured/logstructured.go b/pkg/kine/logstructured/logstructured.go index 933e5f79..c3f59a92 100644 --- a/pkg/kine/logstructured/logstructured.go +++ b/pkg/kine/logstructured/logstructured.go @@ -3,6 +3,7 @@ package logstructured import ( "context" "sync" + "sync/atomic" "time" "github.com/canonical/k8s-dqlite/pkg/kine/server" @@ -11,6 +12,7 @@ import ( type Log interface { Start(ctx context.Context) error + Wait() CurrentRevision(ctx context.Context) (int64, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) @@ -23,6 +25,7 @@ type Log interface { type LogStructured struct { log Log + wg sync.WaitGroup } func New(log Log) *LogStructured { @@ -40,10 +43,19 @@ func (l *LogStructured) Start(ctx context.Context) error { return err } l.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0) - go l.ttl(ctx) + + l.wg.Add(1) + go func() { + defer l.wg.Done() + l.ttl(ctx) + }() return nil } +func (l *LogStructured) Wait() { + l.wg.Wait() +} + func (l *LogStructured) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) { defer func() { l.adjustRevision(ctx, &revRet) @@ -258,20 +270,16 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { result := make(chan *server.Event) - wg := sync.WaitGroup{} - wg.Add(2) + var shouldClose atomic.Bool + l.wg.Add(2) go func() { - wg.Wait() - close(result) - }() + defer l.wg.Done() - go func() { - defer wg.Done() rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false) for len(events) > 0 { if err != nil { - logrus.Errorf("failed to read old events for ttl") + logrus.Errorf("failed to read old events for ttl: %v", err) return } @@ -283,10 +291,15 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { _, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false) } + + if !shouldClose.CompareAndSwap(false, true) { + close(result) + } }() go func() { - defer wg.Done() + defer l.wg.Done() + for events := range l.log.Watch(ctx, "/") { for _, event := range events { if event.KV.Lease > 0 { @@ -294,13 +307,17 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { } } } + + if !shouldClose.CompareAndSwap(false, true) { + close(result) + } }() return result } func (l *LogStructured) ttl(ctx context.Context) { - // vary naive TTL support + // very naive TTL support mutex := &sync.Mutex{} for event := range l.ttlEvents(ctx) { go func(event *server.Event) { @@ -338,7 +355,10 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 logrus.Debugf("WATCH LIST key=%s rev=%d => rev=%d kvs=%d", prefix, revision, rev, len(kvs)) + l.wg.Add(1) go func() { + defer l.wg.Done() + lastRevision := revision if len(kvs) > 0 { lastRevision = rev diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index a7417622..19844a9e 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "strings" + "sync" "time" "github.com/canonical/k8s-dqlite/pkg/kine/broadcaster" @@ -19,6 +20,7 @@ type SQLLog struct { broadcaster broadcaster.Broadcaster ctx context.Context notify chan int64 + wg sync.WaitGroup } func New(d Dialect) *SQLLog { @@ -51,7 +53,11 @@ type Dialect interface { func (s *SQLLog) Start(ctx context.Context) (err error) { s.ctx = ctx - return + return s.broadcaster.Start(s.startWatch) +} + +func (s *SQLLog) Wait() { + s.wg.Wait() } func (s *SQLLog) compactStart(ctx context.Context) error { @@ -315,15 +321,18 @@ func RowsToEvents(rows *sql.Rows) ([]*server.Event, error) { func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event { res := make(chan []*server.Event, 100) - values, err := s.broadcaster.Subscribe(ctx, s.startWatch) + values, err := s.broadcaster.Subscribe(ctx) if err != nil { return nil } checkPrefix := strings.HasSuffix(prefix, "/") + s.wg.Add(1) go func() { + defer s.wg.Done() defer close(res) + for i := range values { events, ok := filter(i, checkPrefix, prefix) if ok { @@ -361,8 +370,18 @@ func (s *SQLLog) startWatch() (chan interface{}, error) { c := make(chan interface{}) // start compaction and polling at the same time to watch starts // at the oldest revision, but compaction doesn't create gaps - go s.compact() - go s.poll(c, pollStart) + s.wg.Add(2) + + go func() { + defer s.wg.Done() + s.compact() + }() + + go func() { + defer s.wg.Done() + s.poll(c, pollStart) + }() + return c, nil } @@ -474,7 +493,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { } func canSkipRevision(rev, skip int64, skipTime time.Time) bool { - return rev == skip && time.Now().Sub(skipTime) > time.Second + return rev == skip && time.Since(skipTime) > time.Second } func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) { diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index d8751ceb..642587a3 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -13,6 +13,7 @@ var ( type Backend interface { Start(ctx context.Context) error + Wait() Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *KeyValue, error) Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error) diff --git a/pkg/server/server.go b/pkg/server/server.go index 564d3e84..6d3397de 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -16,6 +16,7 @@ import ( "github.com/canonical/go-dqlite/app" "github.com/canonical/go-dqlite/client" "github.com/canonical/k8s-dqlite/pkg/kine/endpoint" + "github.com/canonical/k8s-dqlite/pkg/kine/server" kine_tls "github.com/canonical/k8s-dqlite/pkg/kine/tls" "github.com/sirupsen/logrus" ) @@ -25,6 +26,8 @@ type Server struct { // app is the dqlite application driving the server. app *app.App + backend server.Backend + // kineConfig is the configuration to use for starting kine against the dqlite application. kineConfig endpoint.Config @@ -336,11 +339,14 @@ func (s *Server) Start(ctx context.Context) error { logrus.WithFields(logrus.Fields{"id": s.app.ID(), "address": s.app.Address()}).Print("Started dqlite") logrus.WithField("config", s.kineConfig).Debug("Starting kine") - if _, err := endpoint.Listen(ctx, s.kineConfig); err != nil { + _, backend, err := endpoint.ListenAndReturnBackend(ctx, s.kineConfig) + if err != nil { return fmt.Errorf("failed to start kine: %w", err) } logrus.WithFields(logrus.Fields{"address": s.kineConfig.Listener, "database": s.kineConfig.Endpoint}).Print("Started kine") + s.backend = backend + go s.watchAvailableStorageSize(ctx) return nil @@ -357,6 +363,7 @@ func (s *Server) Shutdown(ctx context.Context) error { return fmt.Errorf("failed to close dqlite app: %w", err) } close(s.mustStopCh) + s.backend.Wait() return nil } From 44ecb87752d9b4ee6f15b735a5b4d21914d6976a Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Tue, 2 Jul 2024 15:15:24 +0200 Subject: [PATCH 2/4] Wait for backend stop after tests --- test/util_test.go | 9 ++++++--- test/util_test_dqlite.go | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/test/util_test.go b/test/util_test.go index 359116e9..b094b578 100644 --- a/test/util_test.go +++ b/test/util_test.go @@ -45,11 +45,14 @@ func newKine(ctx context.Context, tb testing.TB, qs ...string) (*clientv3.Client } config, backend, err := endpoint.ListenAndReturnBackend(ctx, endpointConfig) if err != nil { - panic(err) + tb.Fatal(err) } + tb.Cleanup(func() { + backend.Wait() + }) tlsConfig, err := config.TLSConfig.ClientConfig() if err != nil { - panic(err) + tb.Fatal(err) } client, err := clientv3.New(clientv3.Config{ Endpoints: []string{endpointConfig.Listener}, @@ -57,7 +60,7 @@ func newKine(ctx context.Context, tb testing.TB, qs ...string) (*clientv3.Client TLS: tlsConfig, }) if err != nil { - panic(err) + tb.Fatal(err) } return client, backend } diff --git a/test/util_test_dqlite.go b/test/util_test_dqlite.go index 5b915e23..0828289f 100644 --- a/test/util_test_dqlite.go +++ b/test/util_test_dqlite.go @@ -21,10 +21,10 @@ func makeEndpointConfig(ctx context.Context, tb testing.TB) endpoint.Config { app, err := app.New(dir, app.WithAddress(fmt.Sprintf("127.0.0.1:%d", 59090+nextIdx))) if err != nil { - panic(fmt.Errorf("failed to create dqlite app: %w", err)) + tb.Fatal(fmt.Errorf("failed to create dqlite app: %w", err)) } if err := app.Ready(ctx); err != nil { - panic(fmt.Errorf("failed to initialize dqlite: %w", err)) + tb.Fatal(fmt.Errorf("failed to initialize dqlite: %w", err)) } tb.Cleanup(func() { app.Close() From ee9d88c50390f87317f2c20c2b5a102b7ed470ff Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Tue, 2 Jul 2024 16:01:48 +0200 Subject: [PATCH 3/4] Cancelling context in tests --- pkg/kine/drivers/generic_test.go | 11 ++- test/admission_test.go | 139 +++++++++++++++---------------- test/create_test.go | 48 +++++------ test/delete_test.go | 11 ++- test/get_test.go | 133 ++++++++--------------------- test/lease_test.go | 19 +++-- test/list_test.go | 38 +++------ test/update_test.go | 72 ++++------------ test/watch_test.go | 14 +--- 9 files changed, 187 insertions(+), 298 deletions(-) diff --git a/pkg/kine/drivers/generic_test.go b/pkg/kine/drivers/generic_test.go index c182aa84..b41f9527 100644 --- a/pkg/kine/drivers/generic_test.go +++ b/pkg/kine/drivers/generic_test.go @@ -14,14 +14,16 @@ import ( type makeBackendFunc func(ctx context.Context, tb testing.TB) (server.Backend, *generic.Generic, error) func testCompaction(t *testing.T, makeBackend makeBackendFunc) { - ctx := context.Background() t.Run("SmallDatabaseDeleteEntry", func(t *testing.T) { g := NewWithT(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() backend, dialect, err := makeBackend(ctx, t) if err != nil { t.Fatal(err) } + defer backend.Wait() defer dialect.DB.Close() addEntries(ctx, dialect, 2) @@ -42,10 +44,13 @@ func testCompaction(t *testing.T, makeBackend makeBackendFunc) { t.Run("LargeDatabaseDeleteFivePercent", func(t *testing.T) { g := NewWithT(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() backend, dialect, err := makeBackend(ctx, t) if err != nil { t.Fatal(err) } + defer backend.Wait() defer dialect.DB.Close() addEntries(ctx, dialect, 10_000) @@ -67,12 +72,14 @@ func testCompaction(t *testing.T, makeBackend makeBackendFunc) { func benchmarkCompaction(b *testing.B, makeBackend makeBackendFunc) { b.StopTimer() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() backend, dialect, err := makeBackend(ctx, b) if err != nil { b.Fatal(err) } + defer backend.Wait() defer dialect.DB.Close() // Make sure there's enough rows deleted to have diff --git a/test/admission_test.go b/test/admission_test.go index 9452ea4a..a2483253 100644 --- a/test/admission_test.go +++ b/test/admission_test.go @@ -16,94 +16,87 @@ import ( // TestAdmissionControl puts heavy load on kine and expects that some requests are denied // by the admission control. func TestAdmissionControl(t *testing.T) { - ctx := context.Background() - client, _ := newKine(ctx, t, "admission-control-policy=limit", "admission-control-policy-limit-max-concurrent-txn=600", "admission-control-only-write-queries=true") g := NewWithT(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, _ := newKine(ctx, t, "admission-control-policy=limit", "admission-control-policy-limit-max-concurrent-txn=600", "admission-control-only-write-queries=true") + // create a key space of 1000 items - { - for i := 0; i < 1000; i++ { + for i := 0; i < 1000; i++ { + key := fmt.Sprintf("Key-%d", i) + value := fmt.Sprintf("Value-%d", i) + createKey(ctx, g, client, key, value) + } + + var wg sync.WaitGroup + + var numSuccessfulWriterTxn = atomic.Uint64{} + var numSuccessfulReaderTxn = atomic.Uint64{} + + reader := func(first int, last int) { + defer wg.Done() + for i := first; i < last; i++ { key := fmt.Sprintf("Key-%d", i) - value := fmt.Sprintf("Value-%d", i) - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, value)). - Commit() - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) + _, err := client.Get(ctx, key, clientv3.WithRange("")) + if err == nil { + numSuccessfulReaderTxn.Add(1) + } } } - t.Run("LatestRevision", func(t *testing.T) { - g := NewWithT(t) - var wg sync.WaitGroup - - var numSuccessfulWriterTxn = atomic.Uint64{} - var numSuccessfulReaderTxn = atomic.Uint64{} - - reader := func(first int, last int) { - defer wg.Done() - for i := first; i < last; i++ { - key := fmt.Sprintf("Key-%d", i) - _, err := client.Get(ctx, key, clientv3.WithRange("")) - if err == nil { - numSuccessfulReaderTxn.Add(1) - } + writer := func(first int, last int) { + defer wg.Done() + for i := first; i < last; i++ { + key := fmt.Sprintf("Key-%d", i) + new_value := fmt.Sprintf("New-Value-%d", i) + resp, err := client.Get(ctx, key, clientv3.WithRange("")) + if err != nil || len(resp.Kvs) == 0 { + t.Logf("Could not get %s\n", key) + continue } - } + lastModRev := resp.Kvs[0].ModRevision + put_resp, err := client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision(key), "=", lastModRev)). + Then(clientv3.OpPut(key, new_value)). + Else(clientv3.OpGet(key, clientv3.WithRange(""))). + Commit() - writer := func(first int, last int) { - defer wg.Done() - for i := first; i < last; i++ { - key := fmt.Sprintf("Key-%d", i) - new_value := fmt.Sprintf("New-Value-%d", i) - resp, err := client.Get(ctx, key, clientv3.WithRange("")) - if err != nil || len(resp.Kvs) == 0 { - t.Logf("Could not get %s\n", key) - continue - } - lastModRev := resp.Kvs[0].ModRevision - put_resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", lastModRev)). - Then(clientv3.OpPut(key, new_value)). - Else(clientv3.OpGet(key, clientv3.WithRange(""))). - Commit() - - if err == nil && put_resp.Succeeded == true { - numSuccessfulWriterTxn.Add(1) - break - } + if err == nil && put_resp.Succeeded == true { + numSuccessfulWriterTxn.Add(1) + break } } + } - readers := 50 - readers_replication := 3 - read_entries := 1000 / readers - writers := 500 - writers_replication := 10 - write_entries := 1000 / writers - wg.Add(readers*readers_replication + writers*writers_replication) - - start := time.Now() - for i := 0; i < readers; i++ { - for j := 0; j < readers_replication; j++ { - go reader(i*read_entries, (i+1)*read_entries) - } + readers := 50 + readers_replication := 3 + read_entries := 1000 / readers + writers := 500 + writers_replication := 10 + write_entries := 1000 / writers + wg.Add(readers*readers_replication + writers*writers_replication) + + start := time.Now() + for i := 0; i < readers; i++ { + for j := 0; j < readers_replication; j++ { + go reader(i*read_entries, (i+1)*read_entries) } - for i := 0; i < writers; i++ { - for j := 0; j < writers_replication; j++ { - go writer(i*write_entries, (i+1)*write_entries) - } + } + for i := 0; i < writers; i++ { + for j := 0; j < writers_replication; j++ { + go writer(i*write_entries, (i+1)*write_entries) } + } - wg.Wait() - duration := time.Since(start) + wg.Wait() + duration := time.Since(start) - t.Logf("Executed 1000 queries in %.2f seconds\n", duration.Seconds()) - // It is expected that some queries are denied by the admission control due to the load. - g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*writers_replication*write_entries)) + t.Logf("Executed 1000 queries in %.2f seconds\n", duration.Seconds()) + // It is expected that some queries are denied by the admission control due to the load. + g.Expect(numSuccessfulWriterTxn.Load()).To(BeNumerically("<", writers*writers_replication*write_entries)) - // read queries should be ignored by the admission control - g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*readers_replication*read_entries)) - }) + // read queries should be ignored by the admission control + g.Expect(numSuccessfulReaderTxn.Load()).To(BeNumerically("==", readers*readers_replication*read_entries)) } diff --git a/test/create_test.go b/test/create_test.go index 381908d9..4497e6a0 100644 --- a/test/create_test.go +++ b/test/create_test.go @@ -11,40 +11,35 @@ import ( // TestCreate is unit testing for the create operation. func TestCreate(t *testing.T) { - ctx := context.Background() + g := NewWithT(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, t) - t.Run("CreateOne", func(t *testing.T) { - g := NewWithT(t) - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision("testKey"), "=", 0)). - Then(clientv3.OpPut("testKey", "testValue")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - }) - - t.Run("CreateExistingFails", func(t *testing.T) { - g := NewWithT(t) - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision("testKey"), "=", 0)). - Then(clientv3.OpPut("testKey", "testValue2")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeFalse()) - }) + createKey(ctx, g, client, "testKey", "testValue") + + resp, err := client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision("testKey"), "=", 0)). + Then(clientv3.OpPut("testKey", "testValue2")). + Commit() + + g.Expect(err).To(BeNil()) + g.Expect(resp.Succeeded).To(BeFalse()) } // BenchmarkCreate is a benchmark for the Create operation. func BenchmarkCreate(b *testing.B) { b.StopTimer() - ctx := context.Background() + g := NewWithT(b) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, b) b.StartTimer() - g := NewWithT(b) for i := 0; i < b.N; i++ { key := fmt.Sprintf("key-%d", i) value := fmt.Sprintf("value-%d", i) @@ -52,7 +47,7 @@ func BenchmarkCreate(b *testing.B) { } } -func createKey(ctx context.Context, g Gomega, client *clientv3.Client, key string, value string) { +func createKey(ctx context.Context, g Gomega, client *clientv3.Client, key string, value string) int64 { resp, err := client.Txn(ctx). If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). Then(clientv3.OpPut(key, value)). @@ -60,4 +55,7 @@ func createKey(ctx context.Context, g Gomega, client *clientv3.Client, key strin g.Expect(err).To(BeNil()) g.Expect(resp.Succeeded).To(BeTrue()) + g.Expect(resp.Responses).To(HaveLen(1)) + g.Expect(resp.Responses[0].GetResponsePut()).NotTo(BeNil()) + return resp.Responses[0].GetResponsePut().Header.Revision } diff --git a/test/delete_test.go b/test/delete_test.go index 2e609e3c..32fa57f4 100644 --- a/test/delete_test.go +++ b/test/delete_test.go @@ -11,7 +11,9 @@ import ( // TestDelete is unit testing for the delete operation. func TestDelete(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, t) // Calling the delete method outside a transaction should fail in kine @@ -49,10 +51,13 @@ func TestDelete(t *testing.T) { // BenchmarkDelete is a benchmark for the delete operation. func BenchmarkDelete(b *testing.B) { b.StopTimer() - ctx := context.Background() + g := NewWithT(b) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, b) - g := NewWithT(b) for i := 0; i < b.N; i++ { key := fmt.Sprintf("key-%d", i) value := fmt.Sprintf("value-%d", i) diff --git a/test/get_test.go b/test/get_test.go index 0f6b73a3..1956ba2f 100644 --- a/test/get_test.go +++ b/test/get_test.go @@ -10,7 +10,9 @@ import ( // TestGet is unit testing for the Get operation. func TestGet(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, t) t.Run("FailNotFound", func(t *testing.T) { @@ -46,100 +48,47 @@ func TestGet(t *testing.T) { g := NewWithT(t) key := "testKeySuccess" - // Create a key - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, "testValue")). - Commit() - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - } + createKey(ctx, g, client, key, "testValue") - // Get key - { - resp, err := client.Get(ctx, key, clientv3.WithRange("")) - g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(HaveLen(1)) - g.Expect(resp.Kvs[0].Key).To(Equal([]byte(key))) - g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue"))) - } + resp, err := client.Get(ctx, key, clientv3.WithRange("")) + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen(1)) + g.Expect(resp.Kvs[0].Key).To(Equal([]byte(key))) + g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue"))) }) t.Run("KeyRevision", func(t *testing.T) { g := NewWithT(t) key := "testKeyRevision" - var lastModRev int64 - - // Create a key with a known value - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, "testValue")). - Commit() - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - lastModRev = resp.Responses[0].GetResponsePut().Header.Revision - } + lastModRev := createKey(ctx, g, client, key, "testValue") // Get the key's version - { - resp, err := client.Get(ctx, key, clientv3.WithCountOnly()) - g.Expect(err).To(BeNil()) - g.Expect(resp.Count).To(Equal(int64(0))) - } - - // Update the key - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", lastModRev)). - Then(clientv3.OpPut(key, "testValue2")). - Else(clientv3.OpGet(key, clientv3.WithRange(""))). - Commit() - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - } + resp, err := client.Get(ctx, key, clientv3.WithCountOnly()) + g.Expect(err).To(BeNil()) + g.Expect(resp.Count).To(Equal(int64(0))) + + updateRevision(ctx, g, client, key, lastModRev, "testValue2") // Get the updated key - { - resp, err := client.Get(ctx, key, clientv3.WithCountOnly()) - g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue2"))) - g.Expect(resp.Kvs[0].ModRevision).To(BeNumerically(">", resp.Kvs[0].CreateRevision)) - } + resp, err = client.Get(ctx, key, clientv3.WithCountOnly()) + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue2"))) + g.Expect(resp.Kvs[0].ModRevision).To(BeNumerically(">", resp.Kvs[0].CreateRevision)) }) t.Run("SuccessWithPrefix", func(t *testing.T) { g := NewWithT(t) // Create keys with prefix - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision("prefix/testKey1"), "=", 0)). - Then(clientv3.OpPut("prefix/testKey1", "testValue1")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - - resp, err = client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision("prefix/testKey2"), "=", 0)). - Then(clientv3.OpPut("prefix/testKey2", "testValue2")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - } - - // Get keys with prefix - { - resp, err := client.Get(ctx, "prefix", clientv3.WithPrefix()) - - g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(HaveLen(2)) - g.Expect(resp.Kvs[0].Key).To(Equal([]byte("prefix/testKey1"))) - g.Expect(resp.Kvs[1].Key).To(Equal([]byte("prefix/testKey2"))) - } + createKey(ctx, g, client, "prefix/testKey1", "testValue1") + createKey(ctx, g, client, "prefix/testKey2", "testValue2") + + resp, err := client.Get(ctx, "prefix", clientv3.WithPrefix()) + + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen(2)) + g.Expect(resp.Kvs[0].Key).To(Equal([]byte("prefix/testKey1"))) + g.Expect(resp.Kvs[1].Key).To(Equal([]byte("prefix/testKey2"))) }) t.Run("FailNotFound", func(t *testing.T) { @@ -147,33 +96,25 @@ func TestGet(t *testing.T) { key := "testKeyFailNotFound" // Delete key - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpDelete(key)). - Else(clientv3.OpGet(key)). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - } + deleteKey(ctx, g, client, key) // Get key - { - resp, err := client.Get(ctx, key, clientv3.WithRange("")) - g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(BeEmpty()) - } + resp, err := client.Get(ctx, key, clientv3.WithRange("")) + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(BeEmpty()) }) } // BenchmarkGet is a benchmark for the Get operation. func BenchmarkGet(b *testing.B) { b.StopTimer() - ctx := context.Background() - client, _ := newKine(ctx, b) g := NewWithT(b) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, _ := newKine(ctx, b) + // create a kv createKey(ctx, g, client, "testKey", "testValue") diff --git a/test/lease_test.go b/test/lease_test.go index 881013bb..1ee3d7a5 100644 --- a/test/lease_test.go +++ b/test/lease_test.go @@ -12,12 +12,14 @@ import ( // TestLease is unit testing for the lease operation. func TestLease(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, t) t.Run("LeaseGrant", func(t *testing.T) { g := NewWithT(t) - var ttl int64 = 300 + ttl := int64(300) resp, err := client.Lease.Grant(ctx, ttl) g.Expect(err).To(BeNil()) @@ -26,12 +28,12 @@ func TestLease(t *testing.T) { }) t.Run("UseLease", func(t *testing.T) { - var ttl int64 = 1 + ttl := int64(1) t.Run("CreateWithLease", func(t *testing.T) { g := NewWithT(t) + { resp, err := client.Lease.Grant(ctx, ttl) - g.Expect(err).To(BeNil()) g.Expect(resp.ID).To(Equal(clientv3.LeaseID(ttl))) g.Expect(resp.TTL).To(Equal(ttl)) @@ -42,7 +44,6 @@ func TestLease(t *testing.T) { If(clientv3.Compare(clientv3.ModRevision("/leaseTestKey"), "=", 0)). Then(clientv3.OpPut("/leaseTestKey", "testValue", clientv3.WithLease(clientv3.LeaseID(ttl)))). Commit() - g.Expect(err).To(BeNil()) g.Expect(resp.Succeeded).To(BeTrue()) } @@ -66,17 +67,19 @@ func TestLease(t *testing.T) { return resp.Kvs }, time.Duration(ttl*2)*time.Second, testExpirePollPeriod, ctx).Should(BeEmpty()) }) - }) } // BenchmarkLease is a benchmark for the lease operation. func BenchmarkLease(b *testing.B) { b.StopTimer() - ctx := context.Background() + g := NewWithT(b) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, b) - g := NewWithT(b) b.StartTimer() for i := 0; i < b.N; i++ { var ttl int64 = int64(i + 1) diff --git a/test/list_test.go b/test/list_test.go index 2fdeb4ae..eff25543 100644 --- a/test/list_test.go +++ b/test/list_test.go @@ -12,7 +12,9 @@ import ( // TestList is the unit test for List operation. func TestList(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, t) t.Run("ListSuccess", func(t *testing.T) { @@ -21,14 +23,7 @@ func TestList(t *testing.T) { // Create some keys keys := shuffleList([]string{"/key/1", "/key/2", "/key/3", "/key/4", "/key/5"}) for _, key := range keys { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, "value")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - g.Expect(resp.Header.Revision).ToNot(BeZero()) + createKey(ctx, g, client, key, "value") } t.Run("ListAll", func(t *testing.T) { @@ -102,14 +97,7 @@ func TestList(t *testing.T) { // Create some keys keys := []string{"key/sub/2", "key/sub/1", "key/other/1"} for _, key := range keys { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, "value")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - g.Expect(resp.Header.Revision).ToNot(BeZero()) + createKey(ctx, g, client, key, "value") } // Get a list of all the keys sice they have '/key' prefix @@ -161,14 +149,7 @@ func TestList(t *testing.T) { // Create some keys keys := []string{"/revkey/1"} for _, key := range keys { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, "value")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - g.Expect(resp.Header.Revision).ToNot(BeZero()) + createKey(ctx, g, client, key, "value") } }) @@ -226,10 +207,13 @@ func TestList(t *testing.T) { // BenchmarkList is a benchmark for the Get operation. func BenchmarkList(b *testing.B) { b.StopTimer() - ctx := context.Background() - client, _ := newKine(ctx, b) g := NewWithT(b) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, _ := newKine(ctx, b) + for i := 0; i < b.N; i++ { key := fmt.Sprintf("key/%d", i) createKey(ctx, g, client, key, "benchValue") diff --git a/test/update_test.go b/test/update_test.go index 116dca87..eae3494d 100644 --- a/test/update_test.go +++ b/test/update_test.go @@ -12,51 +12,29 @@ import ( // TestUpdate is unit testing for the update operation. func TestUpdate(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, t) // Testing that update can create a new key if ModRevision is 0 t.Run("UpdateNewKey", func(t *testing.T) { g := NewWithT(t) - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision("updateNewKey"), "=", 0)). - Then(clientv3.OpPut("updateNewKey", "testValue")). - Else(clientv3.OpGet("updateNewKey", clientv3.WithRange(""))). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - } + createKey(ctx, g, client, "updateNewKey", "testValue") - { - resp, err := client.Get(ctx, "updateNewKey", clientv3.WithRange("")) - g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(HaveLen(1)) - g.Expect(resp.Kvs[0].Key).To(Equal([]byte("updateNewKey"))) - g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue"))) - g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(resp.Kvs[0].CreateRevision))) - } + resp, err := client.Get(ctx, "updateNewKey", clientv3.WithRange("")) + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen(1)) + g.Expect(resp.Kvs[0].Key).To(Equal([]byte("updateNewKey"))) + g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue"))) + g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(resp.Kvs[0].CreateRevision))) }) t.Run("UpdateExisting", func(t *testing.T) { g := NewWithT(t) - var lastModRev int64 - - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision("updateExistingKey"), "=", 0)). - Then(clientv3.OpPut("updateExistingKey", "testValue1")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - g.Expect(resp.Responses).To(HaveLen(1)) - g.Expect(resp.Responses[0].GetResponsePut()).NotTo(BeNil()) - lastModRev = resp.Responses[0].GetResponsePut().Header.Revision - } + lastModRev := createKey(ctx, g, client, "updateExistingKey", "testValue1") { resp, err := client.Txn(ctx). @@ -101,20 +79,7 @@ func TestUpdate(t *testing.T) { t.Run("UpdateOldRevisionFails", func(t *testing.T) { g := NewWithT(t) - var lastModRev int64 - - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision("updateOldRevKey"), "=", 0)). - Then(clientv3.OpPut("updateOldRevKey", "testValue1")). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - g.Expect(resp.Responses).To(HaveLen(1)) - g.Expect(resp.Responses[0].GetResponsePut()).NotTo(BeNil()) - lastModRev = resp.Responses[0].GetResponsePut().Header.Revision - } + lastModRev := createKey(ctx, g, client, "updateOldRevKey", "testValue1") { resp, err := client.Txn(ctx). @@ -139,9 +104,7 @@ func TestUpdate(t *testing.T) { g.Expect(resp.Responses).To(HaveLen(1)) g.Expect(resp.Responses[0].GetResponseRange()).ToNot(BeNil()) } - }) - } func addSameEntries(ctx context.Context, g Gomega, client *clientv3.Client, numEntries int, create_first bool) { @@ -192,14 +155,15 @@ func addEntry(ctx context.Context, g Gomega, client *clientv3.Client, key string // BenchmarkUpdate is a benchmark for the Update operation. func BenchmarkUpdate(b *testing.B) { b.StopTimer() - ctx := context.Background() - client, _ := newKine(ctx, b) - g := NewWithT(b) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, _ := newKine(ctx, b) + b.StartTimer() - var lastModRev int64 = 0 - for i := 0; i < b.N; i++ { + for i, lastModRev := 0, int64(0); i < b.N; i++ { value := fmt.Sprintf("value-%d", i) lastModRev = updateRevision(ctx, g, client, "benchKey", lastModRev, value) } diff --git a/test/watch_test.go b/test/watch_test.go index 706ee051..41dad0a9 100644 --- a/test/watch_test.go +++ b/test/watch_test.go @@ -10,7 +10,9 @@ import ( // TestWatch is unit testing for the Watch operation. func TestWatch(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, _ := newKine(ctx, t) var ( @@ -35,15 +37,7 @@ func TestWatch(t *testing.T) { g := NewWithT(t) // create a key - { - resp, err := client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, value)). - Commit() - - g.Expect(err).To(BeNil()) - g.Expect(resp.Succeeded).To(BeTrue()) - } + createKey(ctx, g, client, key, value) // receive event t.Run("Receive", func(t *testing.T) { From 2d572df5f72469bf9e8163a2104b6ac887cb2358 Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Tue, 2 Jul 2024 16:55:37 +0200 Subject: [PATCH 4/4] Close dialect DB --- pkg/kine/drivers/generic/generic.go | 17 +++++++++++++++++ pkg/kine/drivers/generic_test.go | 19 ++++++++++++------- pkg/kine/logstructured/sqllog/sql.go | 4 ++++ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index 429d20c4..309b33bb 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -306,6 +306,23 @@ func (d *Generic) Prepare() error { return nil } +func (d *Generic) Close() { + d.getRevisionSQLPrepared.Close() + d.countCurrentSQLPrepared.Close() + d.countRevisionSQLPrepared.Close() + d.afterSQLPrefixPrepared.Close() + d.deleteSQLPrepared.Close() + d.updateCompactSQLPrepared.Close() + if d.LastInsertID { + d.insertLastInsertIDSQLPrepared.Close() + } else { + d.insertSQLPrepared.Close() + } + d.fillSQLPrepared.Close() + d.getSizeSQLPrepared.Close() + d.DB.Close() +} + func getPrefixRange(prefix string) (start, end string) { start = prefix if strings.HasSuffix(prefix, "/") { diff --git a/pkg/kine/drivers/generic_test.go b/pkg/kine/drivers/generic_test.go index b41f9527..3b7be180 100644 --- a/pkg/kine/drivers/generic_test.go +++ b/pkg/kine/drivers/generic_test.go @@ -14,7 +14,6 @@ import ( type makeBackendFunc func(ctx context.Context, tb testing.TB) (server.Backend, *generic.Generic, error) func testCompaction(t *testing.T, makeBackend makeBackendFunc) { - t.Run("SmallDatabaseDeleteEntry", func(t *testing.T) { g := NewWithT(t) ctx, cancel := context.WithCancel(context.Background()) @@ -23,8 +22,10 @@ func testCompaction(t *testing.T, makeBackend makeBackendFunc) { if err != nil { t.Fatal(err) } - defer backend.Wait() - defer dialect.DB.Close() + t.Cleanup(func() { + backend.Wait() + dialect.Close() + }) addEntries(ctx, dialect, 2) deleteEntries(ctx, dialect, 1) @@ -50,8 +51,10 @@ func testCompaction(t *testing.T, makeBackend makeBackendFunc) { if err != nil { t.Fatal(err) } - defer backend.Wait() - defer dialect.DB.Close() + t.Cleanup(func() { + backend.Wait() + dialect.Close() + }) addEntries(ctx, dialect, 10_000) deleteEntries(ctx, dialect, 500) @@ -79,8 +82,10 @@ func benchmarkCompaction(b *testing.B, makeBackend makeBackendFunc) { if err != nil { b.Fatal(err) } - defer backend.Wait() - defer dialect.DB.Close() + b.Cleanup(func() { + backend.Wait() + dialect.Close() + }) // Make sure there's enough rows deleted to have // b.N rows to compact. diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index 19844a9e..c688a594 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -49,10 +49,14 @@ type Dialect interface { GetSize(ctx context.Context) (int64, error) GetCompactInterval() time.Duration GetPollInterval() time.Duration + Close() } func (s *SQLLog) Start(ctx context.Context) (err error) { s.ctx = ctx + context.AfterFunc(ctx, func() { + s.d.Close() + }) return s.broadcaster.Start(s.startWatch) }