Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add synchronization primitive for server shutdown #114

Merged
merged 4 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 30 additions & 33 deletions pkg/kine/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,35 @@ type Broadcaster struct {
subs map[chan interface{}]struct{}
}

func (b *Broadcaster) Subscribe(ctx context.Context, connect ConnectFunc) (<-chan interface{}, error) {
func (b *Broadcaster) Subscribe(ctx context.Context) (<-chan interface{}, error) {
b.Lock()
defer b.Unlock()

if !b.running {
if err := b.start(connect); err != nil {
return nil, err
}
}

sub := make(chan interface{}, 100)
if b.subs == nil {
b.subs = map[chan interface{}]struct{}{}
}
b.subs[sub] = struct{}{}
go func() {
<-ctx.Done()
b.unsub(sub, true)
}()
context.AfterFunc(ctx, func() {
b.Lock()
defer b.Unlock()
b.unsub(sub)
})

return sub, nil
}

func (b *Broadcaster) unsub(sub chan interface{}, lock bool) {
if lock {
b.Lock()
}
func (b *Broadcaster) unsub(sub chan interface{}) {
if _, ok := b.subs[sub]; ok {
close(sub)
delete(b.subs, sub)
}
if lock {
b.Unlock()
}
}

func (b *Broadcaster) start(connect ConnectFunc) error {
func (b *Broadcaster) Start(connect ConnectFunc) error {
b.Lock()
defer b.Unlock()

c, err := connect()
if err != nil {
return err
Expand All @@ -60,24 +52,29 @@ func (b *Broadcaster) start(connect ConnectFunc) error {
return nil
}

func (b *Broadcaster) stream(input chan interface{}) {
for item := range input {
b.Lock()
for sub := range b.subs {
select {
case sub <- item:
default:
// Slow consumer, drop
go b.unsub(sub, true)
}
}
b.Unlock()
func (b *Broadcaster) stream(ch chan interface{}) {
for item := range ch {
b.publish(item)
}

b.Lock()
defer b.Unlock()
for sub := range b.subs {
b.unsub(sub, false)
b.unsub(sub)
}
b.running = false
b.Unlock()
}

func (b *Broadcaster) publish(item interface{}) {
b.Lock()
defer b.Unlock()

for sub := range b.subs {
select {
case sub <- item:
default:
// Slow consumer, drop
b.unsub(sub)
}
}
}
17 changes: 17 additions & 0 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,23 @@ func (d *Generic) Prepare() error {
return nil
}

func (d *Generic) Close() {
d.getRevisionSQLPrepared.Close()
d.countCurrentSQLPrepared.Close()
d.countRevisionSQLPrepared.Close()
d.afterSQLPrefixPrepared.Close()
d.deleteSQLPrepared.Close()
d.updateCompactSQLPrepared.Close()
if d.LastInsertID {
d.insertLastInsertIDSQLPrepared.Close()
} else {
d.insertSQLPrepared.Close()
}
d.fillSQLPrepared.Close()
d.getSizeSQLPrepared.Close()
d.DB.Close()
}

func getPrefixRange(prefix string) (start, end string) {
start = prefix
if strings.HasSuffix(prefix, "/") {
Expand Down
24 changes: 18 additions & 6 deletions pkg/kine/drivers/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ import (
type makeBackendFunc func(ctx context.Context, tb testing.TB) (server.Backend, *generic.Generic, error)

func testCompaction(t *testing.T, makeBackend makeBackendFunc) {
ctx := context.Background()

t.Run("SmallDatabaseDeleteEntry", func(t *testing.T) {
g := NewWithT(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend, dialect, err := makeBackend(ctx, t)
if err != nil {
t.Fatal(err)
}
defer dialect.DB.Close()
t.Cleanup(func() {
backend.Wait()
dialect.Close()
})

addEntries(ctx, dialect, 2)
deleteEntries(ctx, dialect, 1)
Expand All @@ -42,11 +45,16 @@ func testCompaction(t *testing.T, makeBackend makeBackendFunc) {

t.Run("LargeDatabaseDeleteFivePercent", func(t *testing.T) {
g := NewWithT(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend, dialect, err := makeBackend(ctx, t)
if err != nil {
t.Fatal(err)
}
defer dialect.DB.Close()
t.Cleanup(func() {
backend.Wait()
dialect.Close()
})

addEntries(ctx, dialect, 10_000)
deleteEntries(ctx, dialect, 500)
Expand All @@ -67,13 +75,17 @@ func testCompaction(t *testing.T, makeBackend makeBackendFunc) {

func benchmarkCompaction(b *testing.B, makeBackend makeBackendFunc) {
b.StopTimer()
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

backend, dialect, err := makeBackend(ctx, b)
if err != nil {
b.Fatal(err)
}
defer dialect.DB.Close()
b.Cleanup(func() {
backend.Wait()
dialect.Close()
})

// Make sure there's enough rows deleted to have
// b.N rows to compact.
Expand Down
6 changes: 2 additions & 4 deletions pkg/kine/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) {
if err := grpcServer.Serve(listener); err != nil {
logrus.Errorf("Kine server shutdown: %v", err)
}
<-ctx.Done()
grpcServer.Stop()
listener.Close()
grpcServer.Stop()
}()

return ETCDConfig{
Expand Down Expand Up @@ -143,9 +142,8 @@ func ListenAndReturnBackend(ctx context.Context, config Config) (ETCDConfig, ser
if err := grpcServer.Serve(listener); err != nil {
logrus.Errorf("Kine server shutdown: %v", err)
}
<-ctx.Done()
grpcServer.Stop()
listener.Close()
grpcServer.Stop()
}()

return ETCDConfig{
Expand Down
42 changes: 31 additions & 11 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logstructured
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/server"
Expand All @@ -11,6 +12,7 @@ import (

type Log interface {
Start(ctx context.Context) error
Wait()
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Expand All @@ -23,6 +25,7 @@ type Log interface {

type LogStructured struct {
log Log
wg sync.WaitGroup
}

func New(log Log) *LogStructured {
Expand All @@ -40,10 +43,19 @@ func (l *LogStructured) Start(ctx context.Context) error {
return err
}
l.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0)
go l.ttl(ctx)

l.wg.Add(1)
go func() {
defer l.wg.Done()
l.ttl(ctx)
}()
return nil
}

func (l *LogStructured) Wait() {
l.wg.Wait()
}

func (l *LogStructured) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) {
defer func() {
l.adjustRevision(ctx, &revRet)
Expand Down Expand Up @@ -258,20 +270,16 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re

func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
result := make(chan *server.Event)
wg := sync.WaitGroup{}
wg.Add(2)
var shouldClose atomic.Bool

l.wg.Add(2)
go func() {
wg.Wait()
close(result)
}()
defer l.wg.Done()

go func() {
defer wg.Done()
rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false)
for len(events) > 0 {
if err != nil {
logrus.Errorf("failed to read old events for ttl")
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

Expand All @@ -283,24 +291,33 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {

_, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false)
}

if !shouldClose.CompareAndSwap(false, true) {
close(result)
}
}()

go func() {
defer wg.Done()
defer l.wg.Done()

for events := range l.log.Watch(ctx, "/") {
for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}
}
}

if !shouldClose.CompareAndSwap(false, true) {
close(result)
}
}()

return result
}

func (l *LogStructured) ttl(ctx context.Context) {
// vary naive TTL support
// very naive TTL support
mutex := &sync.Mutex{}
for event := range l.ttlEvents(ctx) {
go func(event *server.Event) {
Expand Down Expand Up @@ -338,7 +355,10 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64

logrus.Debugf("WATCH LIST key=%s rev=%d => rev=%d kvs=%d", prefix, revision, rev, len(kvs))

l.wg.Add(1)
go func() {
defer l.wg.Done()

lastRevision := revision
if len(kvs) > 0 {
lastRevision = rev
Expand Down
Loading
Loading