From e4b3dd77dacf21ba3f2f3f6ef17f6b4c5d91dc72 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Sat, 14 Oct 2023 23:54:53 -0700 Subject: [PATCH 01/11] Feat(WIP): prioritized generic queue --- go.mod | 7 ++- go.sum | 4 ++ queue/errors.go | 5 ++ queue/queue.go | 122 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 queue/errors.go create mode 100644 queue/queue.go diff --git a/go.mod b/go.mod index 596c76b..f383d4a 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,9 @@ module github.com/yunginnanet/Rate5 go 1.18 -require github.com/patrickmn/go-cache v2.1.0+incompatible +require ( + github.com/patrickmn/go-cache v2.1.0+incompatible + golang.org/x/exp v0.0.0-20231006140011-7918f672742d +) + +require git.tcp.direct/kayos/common v0.9.2 // indirect diff --git a/go.sum b/go.sum index 33c1ec4..89ca189 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,6 @@ +git.tcp.direct/kayos/common v0.9.2 h1:T7XzvmtPTmhXMTIb8v5W7a4DAV8ekusRkCe1WJ/qGBY= +git.tcp.direct/kayos/common v0.9.2/go.mod h1:rMExTem3JjB5XrwL+i+/QBv3agRFB8mdsoQ97wzBa+8= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= diff --git a/queue/errors.go b/queue/errors.go new file mode 100644 index 0000000..d612f79 --- /dev/null +++ b/queue/errors.go @@ -0,0 +1,5 @@ +package queue + +import "errors" + +var ErrQueueFull = errors.New("bounded queue is full") diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..8c6bcd4 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,122 @@ +package queue + +import ( + "fmt" + "sync" + "sync/atomic" + + "git.tcp.direct/kayos/common/list" + "golang.org/x/exp/constraints" +) + +type Item[P constraints.Ordered, V any] struct { + value *V + priority P + index int +} + +func NewItem[P constraints.Ordered, V any](v *V, p P) *Item[P, V] { + return &Item[P, V]{ + value: v, + priority: p, + index: 0, + } +} + +func (i *Item[P, V]) Less(j any) bool { + return i.priority < j.(Item[P, V]).priority +} + +func (i *Item[P, V]) Value() *V { + return i.value +} + +func (i *Item[P, V]) Priority() P { + return i.priority +} + +func (i *Item[P, V]) Set(v *V) { + i.value = v +} + +func (i *Item[P, V]) SetPriority(p P) { + i.priority = p +} + +type boundedQueue[P constraints.Ordered, V any] struct { + items []*Item[P, V] + qcap int64 + qlen *atomic.Int64 + mu *sync.RWMutex + errs *list.LockingList +} + +type BoundedQueue[P constraints.Ordered, V any] struct { + // making sure container/heap can access boundedQueue without the normal API being exposed to the user + inner *boundedQueue[P, V] +} + +func NewBoundedQueue[P constraints.Ordered, V any](cap int64) *BoundedQueue[P, V] { + bq := &BoundedQueue[P, V]{} + bq.inner = &boundedQueue[P, V]{ + items: make([]*Item[P, V], 0, cap), + qcap: cap, + qlen: &atomic.Int64{}, + mu: &sync.RWMutex{}, + errs: list.New(), + } + bq.inner.qlen.Store(0) + return bq +} + +func (b *boundedQueue[P, V]) err(err error) { + b.errs.PushBack(err) +} + +func (b *BoundedQueue[P, V]) Err() error { + errLen := b.inner.errs.Len() + switch errLen { + case 0: + return nil + case 1: + return b.inner.errs.Front().Value().(error) + default: + return fmt.Errorf("%w | (%d more errors 
in queue)", b.inner.errs.Front().Value().(error), errLen-1) + } +} + +func (b *BoundedQueue[P, V]) Len() int { + return int(b.inner.qlen.Load()) +} + +func (b *boundedQueue[P, V]) Less(i, j int) bool { + b.mu.RLock() + less := b.items[P, V][i].priority < b.items[P, V][j].priority + b.mu.RUnlock() + return less +} + +func (b *boundedQueue[P, V]) Swap(i, j int) { + b.mu.Lock() + b.items[P, V][i], b.items[P, V][j] = b.items[P, V][j], b.items[P, V][i] + b.items[P, V][i].index = i + b.items[P, V][j].index = j + b.mu.Unlock() +} + +func (b *boundedQueue[P, V]) Push(x any) { + if b.qlen.Load() >= b.qcap { + b.err(fmt.Errorf("%w: %v dropped", ErrQueueFull, x)) + return + } + defer b.qlen.Add(1) + b.mu.Lock() + b.items[P, V] = append(b.items[P, V], x.(*Item[P, V])) + b.qlen.Add(1) + b.mu.Unlock() +} + +func (b *boundedQueue[P, V]) Pop() any { + // todo + return nil +} From ce01a166263981489791c7586c678143e2e4c646 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Fri, 20 Oct 2023 01:59:56 -0700 Subject: [PATCH 02/11] Feat: Speedometer - io.Writer limiter and capper --- speedometer/speedometer.go | 244 +++++++++++++++++ speedometer/speedometer_test.go | 452 ++++++++++++++++++++++++++++++++ 2 files changed, 696 insertions(+) create mode 100644 speedometer/speedometer.go create mode 100644 speedometer/speedometer_test.go diff --git a/speedometer/speedometer.go b/speedometer/speedometer.go new file mode 100644 index 0000000..0e820af --- /dev/null +++ b/speedometer/speedometer.go @@ -0,0 +1,244 @@ +package util + +import ( + "errors" + "fmt" + "io" + "sync" + "sync/atomic" + "time" +) + +var ErrLimitReached = errors.New("limit reached") + +// Speedometer is an io.Writer wrapper that will limit the rate at which data is written to the underlying target. +// +// It is safe for concurrent use, but writers will block when slowed down. +// +// Optionally, it can be given; +// +// - a capacity, which will cause it to return an error if the capacity is exceeded. +// +// - a speed limit, causing slow downs of data written to the underlying writer if the speed limit is exceeded. +type Speedometer struct { + ceiling int64 + speedLimit *SpeedLimit + internal atomics + w io.Writer +} + +type atomics struct { + count *atomic.Int64 + closed *atomic.Bool + start *sync.Once + stop *sync.Once + birth *atomic.Pointer[time.Time] + duration *atomic.Pointer[time.Duration] + slow *atomic.Bool +} + +func newAtomics() atomics { + manhattan := atomics{ + count: new(atomic.Int64), + closed: new(atomic.Bool), + start: new(sync.Once), + stop: new(sync.Once), + birth: new(atomic.Pointer[time.Time]), + duration: new(atomic.Pointer[time.Duration]), + slow: new(atomic.Bool), + } + manhattan.birth.Store(&time.Time{}) + manhattan.closed.Store(false) + manhattan.count.Store(0) + return manhattan +} + +// SpeedLimit is used to limit the rate at which data is written to the underlying writer. +type SpeedLimit struct { + // Burst is the number of bytes that can be written to the underlying writer per Frame. + Burst int64 + // Frame is the duration of the frame in which Burst can be written to the underlying writer. + Frame time.Duration + // CheckEveryBytes is the number of bytes written before checking if the speed limit has been exceeded. + CheckEveryBytes int64 + // Delay is the duration to delay writing if the speed limit has been exceeded during a Write call. 
(blocking) + Delay time.Duration +} + +func NewBytesPerSecondLimit(bytes int64) *SpeedLimit { + return &SpeedLimit{ + Burst: bytes, + Frame: time.Second, + CheckEveryBytes: 1, + Delay: 100 * time.Millisecond, + } +} + +const fallbackDelay = 100 + +func regulateSpeedLimit(speedLimit *SpeedLimit) (*SpeedLimit, error) { + if speedLimit.Burst <= 0 || speedLimit.Frame <= 0 { + return nil, errors.New("invalid speed limit") + } + if speedLimit.CheckEveryBytes <= 0 { + speedLimit.CheckEveryBytes = speedLimit.Burst + } + if speedLimit.Delay <= 0 { + speedLimit.Delay = fallbackDelay * time.Millisecond + } + return speedLimit, nil +} + +func newSpeedometer(w io.Writer, speedLimit *SpeedLimit, ceiling int64) (*Speedometer, error) { + if w == nil { + return nil, errors.New("writer cannot be nil") + } + var err error + if speedLimit != nil { + if speedLimit, err = regulateSpeedLimit(speedLimit); err != nil { + return nil, err + } + } + + return &Speedometer{ + w: w, + ceiling: ceiling, + speedLimit: speedLimit, + internal: newAtomics(), + }, nil +} + +// NewSpeedometer creates a new Speedometer that wraps the given io.Writer. +// It will not limit the rate at which data is written to the underlying writer, it only measures it. +func NewSpeedometer(w io.Writer) (*Speedometer, error) { + return newSpeedometer(w, nil, -1) +} + +// NewLimitedSpeedometer creates a new Speedometer that wraps the given io.Writer. +// If the speed limit is exceeded, writes to the underlying writer will be limited. +// See SpeedLimit for more information. +func NewLimitedSpeedometer(w io.Writer, speedLimit *SpeedLimit) (*Speedometer, error) { + return newSpeedometer(w, speedLimit, -1) +} + +// NewCappedSpeedometer creates a new Speedometer that wraps the given io.Writer. +// If len(written) bytes exceeds cap, writes to the underlying writer will be ceased permanently for the Speedometer. +func NewCappedSpeedometer(w io.Writer, capacity int64) (*Speedometer, error) { + return newSpeedometer(w, nil, capacity) +} + +// NewCappedLimitedSpeedometer creates a new Speedometer that wraps the given io.Writer. +// It is a combination of NewLimitedSpeedometer and NewCappedSpeedometer. +func NewCappedLimitedSpeedometer(w io.Writer, speedLimit *SpeedLimit, capacity int64) (*Speedometer, error) { + return newSpeedometer(w, speedLimit, capacity) +} + +func (s *Speedometer) increment(inc int64) (int, error) { + if s.internal.closed.Load() { + return 0, io.ErrClosedPipe + } + var err error + if s.ceiling > 0 && s.Total()+inc > s.ceiling { + _ = s.Close() + err = ErrLimitReached + inc = s.ceiling - s.Total() + } + s.internal.count.Add(inc) + return int(inc), err +} + +// Running returns true if the Speedometer is still running. +func (s *Speedometer) Running() bool { + return !s.internal.closed.Load() +} + +// Total returns the total number of bytes written to the underlying writer. +func (s *Speedometer) Total() int64 { + return s.internal.count.Load() +} + +// Close stops the Speedometer. No additional writes will be accepted. +func (s *Speedometer) Close() error { + if s.internal.closed.Load() { + return io.ErrClosedPipe + } + s.internal.stop.Do(func() { + s.internal.closed.Store(true) + stopped := time.Now() + birth := s.internal.birth.Load() + duration := stopped.Sub(*birth) + s.internal.duration.Store(&duration) + }) + return nil +} + +/*func (s *Speedometer) IsSlow() bool { + return s.internal.slow.Load() +}*/ + +// Rate returns the rate at which data is being written to the underlying writer per second. 
+func (s *Speedometer) Rate() float64 { + if s.internal.closed.Load() { + return float64(s.Total()) / s.internal.duration.Load().Seconds() + } + return float64(s.Total()) / time.Since(*s.internal.birth.Load()).Seconds() +} + +func (s *Speedometer) slowDown() error { + switch { + case s.speedLimit == nil: + return nil + case s.speedLimit.Burst <= 0 || s.speedLimit.Frame <= 0, + s.speedLimit.CheckEveryBytes <= 0, s.speedLimit.Delay <= 0: + return errors.New("invalid speed limit") + default: + // + } + if s.Total()%int64(s.speedLimit.CheckEveryBytes) != 0 { + return nil + } + s.internal.slow.Store(true) + for s.Rate() > float64(s.speedLimit.Burst)/s.speedLimit.Frame.Seconds() { + time.Sleep(s.speedLimit.Delay) + } + s.internal.slow.Store(false) + return nil +} + +// Write writes p to the underlying writer, following all defined speed limits. +func (s *Speedometer) Write(p []byte) (n int, err error) { + if s.internal.closed.Load() { + return 0, io.ErrClosedPipe + } + s.internal.start.Do(func() { + now := time.Now() + s.internal.birth.Store(&now) + }) + + // if no speed limit, just write and record + if s.speedLimit == nil { + n, err = s.w.Write(p) + if err != nil { + return n, fmt.Errorf("error writing to underlying writer: %w", err) + } + return s.increment(int64(len(p))) + } + + var ( + wErr error + accepted int + ) + accepted, wErr = s.increment(int64(len(p))) + + if wErr != nil { + return 0, fmt.Errorf("error incrementing: %w", wErr) + } + + _ = s.slowDown() + + var iErr error + if n, iErr = s.w.Write(p[:accepted]); iErr != nil { + return n, fmt.Errorf("error writing to underlying writer: %w", iErr) + } + return +} diff --git a/speedometer/speedometer_test.go b/speedometer/speedometer_test.go new file mode 100644 index 0000000..cc5e4d3 --- /dev/null +++ b/speedometer/speedometer_test.go @@ -0,0 +1,452 @@ +package util + +import ( + "bytes" + "errors" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "testing" + "time" +) + +type testWriter struct { + t *testing.T + total int64 +} + +func (w *testWriter) Write(p []byte) (n int, err error) { + atomic.AddInt64(&w.total, int64(len(p))) + return len(p), nil +} + +func writeStuff(t *testing.T, target io.Writer, count int) error { + t.Helper() + write := func() error { + _, err := target.Write([]byte("a")) + if err != nil { + return fmt.Errorf("error writing: %w", err) + } + return nil + } + + if count < 0 { + var err error + for err = write(); err == nil; err = write() { + time.Sleep(5 * time.Millisecond) + } + return err + } + for i := 0; i < count; i++ { + if err := write(); err != nil { + return err + } + } + return nil +} + +//nolint:funlen +func Test_Speedometer(t *testing.T) { + t.Parallel() + type results struct { + total int64 + written int + rate float64 + err error + } + + isIt := func(want, have results) { + t.Helper() + if have.total != want.total { + t.Errorf("total: want %d, have %d", want.total, have.total) + } + if have.written != want.written { + t.Errorf("written: want %d, have %d", want.written, have.written) + } + if have.rate != want.rate { + t.Errorf("rate: want %f, have %f", want.rate, have.rate) + } + if !errors.Is(have.err, want.err) { + t.Errorf("wantErr: want %v, have %v", want.err, have.err) + } + } + + var ( + errChan = make(chan error, 10) + ) + + t.Run("EarlyClose", func(t *testing.T) { + var ( + err error + cnt int + ) + t.Parallel() + sp, nerr := NewSpeedometer(&testWriter{t: t}) + if nerr != nil { + t.Errorf("unexpected error: %v", nerr) + } + go func() { + errChan <- writeStuff(t, sp, -1) + }() + time.Sleep(1 * 
time.Second) + if closeErr := sp.Close(); closeErr != nil { + t.Errorf("wantErr: want %v, have %v", nil, closeErr) + } + err = <-errChan + if !errors.Is(err, io.ErrClosedPipe) { + t.Errorf("wantErr: want %v, have %v", io.ErrClosedPipe, err) + } + cnt, err = sp.Write([]byte("a")) + isIt(results{err: io.ErrClosedPipe, written: 0}, results{err: err, written: cnt}) + }) + + t.Run("Basic", func(t *testing.T) { + var ( + err error + cnt int + ) + t.Parallel() + sp, nerr := NewSpeedometer(&testWriter{t: t}) + if nerr != nil { + t.Errorf("unexpected error: %v", nerr) + } + cnt, err = sp.Write([]byte("a")) + isIt(results{err: nil, written: 1, total: 1}, results{err: err, written: cnt, total: sp.Total()}) + cnt, err = sp.Write([]byte("aa")) + isIt(results{err: nil, written: 2, total: 3}, results{err: err, written: cnt, total: sp.Total()}) + cnt, err = sp.Write([]byte("a")) + isIt(results{err: nil, written: 1, total: 4}, results{err: err, written: cnt, total: sp.Total()}) + cnt, err = sp.Write([]byte("a")) + isIt(results{err: nil, written: 1, total: 5}, results{err: err, written: cnt, total: sp.Total()}) + }) + + t.Run("ConcurrentWrites", func(t *testing.T) { + var ( + err error + ) + + count := int64(0) + sp, nerr := NewSpeedometer(&testWriter{t: t}) + if nerr != nil { + t.Errorf("unexpected error: %v", nerr) + } + wg := &sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + var counted int + var gerr error + counted, gerr = sp.Write([]byte("a")) + if gerr != nil { + t.Errorf("unexpected error: %v", err) + } + atomic.AddInt64(&count, int64(counted)) + wg.Done() + }() + } + wg.Wait() + isIt(results{err: nil, written: 100, total: 100}, + results{err: err, written: int(atomic.LoadInt64(&count)), total: sp.Total()}) + }) + + t.Run("GottaGoFast", func(t *testing.T) { + t.Parallel() + var ( + err error + ) + sp, nerr := NewSpeedometer(&testWriter{t: t}) + if nerr != nil { + t.Errorf("unexpected error: %v", nerr) + } + go func() { + errChan <- writeStuff(t, sp, -1) + }() + var count = 0 + for sp.Running() { + select { + case err = <-errChan: + if !errors.Is(err, io.ErrClosedPipe) { + t.Errorf("unexpected error: %v", err) + } else { + if count < 5 { + t.Errorf("too few iterations: %d", count) + } + t.Logf("final rate: %v per second", sp.Rate()) + } + default: + if count > 5 { + _ = sp.Close() + } + time.Sleep(100 * time.Millisecond) + t.Logf("rate: %v per second", sp.Rate()) + count++ + } + } + }) + + // test limiter with speedlimit + t.Run("CantGoFast", func(t *testing.T) { + t.Parallel() + t.Run("10BytesASecond", func(t *testing.T) { + t.Parallel() + var ( + err error + ) + sp, nerr := NewLimitedSpeedometer(&testWriter{t: t}, &SpeedLimit{ + Burst: 10, + Frame: time.Second, + CheckEveryBytes: 1, + Delay: 100 * time.Millisecond, + }) + if nerr != nil { + t.Errorf("unexpected error: %v", nerr) + } + for i := 0; i < 15; i++ { + if _, err = sp.Write([]byte("a")); err != nil { + t.Errorf("unexpected error: %v", err) + } + /*if sp.IsSlow() { + t.Errorf("unexpected slow state") + }*/ + t.Logf("rate: %v per second", sp.Rate()) + if sp.Rate() > 10 { + t.Errorf("speeding in a school zone (expected under %d): %v", sp.speedLimit.Burst, sp.Rate()) + } + } + }) + + t.Run("1000BytesPer5SecondsMeasuredEvery5000Bytes", func(t *testing.T) { + t.Parallel() + var ( + err error + ) + sp, nerr := NewLimitedSpeedometer(&testWriter{t: t}, &SpeedLimit{ + Burst: 1000, + Frame: 2 * time.Second, + CheckEveryBytes: 5000, + Delay: 500 * time.Millisecond, + }) + + if nerr != nil { + t.Errorf("unexpected error: %v", 
nerr) + } + + for i := 0; i < 4999; i++ { + if _, err = sp.Write([]byte("a")); err != nil { + t.Errorf("unexpected error: %v", err) + } + if i%1000 == 0 { + t.Logf("rate: %v per second", sp.Rate()) + } + if sp.Rate() < 1000 { + t.Errorf("shouldn't have slowed down yet (expected over %d): %v", sp.speedLimit.Burst, sp.Rate()) + } + } + if _, err = sp.Write([]byte("a")); err != nil { + t.Errorf("unexpected error: %v", err) + } + for i := 0; i < 10; i++ { + if _, err = sp.Write([]byte("a")); err != nil { + t.Errorf("unexpected error: %v", err) + } + t.Logf("rate: %v per second", sp.Rate()) + if sp.Rate() > 1000 { + t.Errorf("speeding in a school zone (expected under %d): %v", sp.speedLimit.Burst, sp.Rate()) + } + } + }) + }) + + // test capped speedometer + t.Run("OnlyALittle", func(t *testing.T) { + t.Parallel() + var ( + err error + ) + sp, nerr := NewCappedSpeedometer(&testWriter{t: t}, 1024) + if nerr != nil { + t.Errorf("unexpected error: %v", nerr) + } + for i := 0; i < 1024; i++ { + if _, err = sp.Write([]byte("a")); err != nil { + t.Errorf("unexpected error: %v", err) + } + if sp.Total() > 1024 { + t.Errorf("shouldn't have written more than 1024 bytes") + } + } + if _, err = sp.Write([]byte("a")); err == nil { + t.Errorf("expected error when writing over capacity") + } + }) + + t.Run("SynSynAckAck", func(t *testing.T) { + t.Parallel() + var ( + server net.Listener + err error + ) + //goland:noinspection GoCommentLeadingSpace + if server, err = net.Listen("tcp", ":8080"); err != nil { // #nosec:G102 - this is a unit test. + t.Fatalf("Failed to start server: %v", err) + } + defer func(server net.Listener) { + if cErr := server.Close(); cErr != nil { + t.Errorf("Failed to close server: %v", err) + } + }(server) + + go func() { + var ( + conn net.Conn + aErr error + ) + if conn, aErr = server.Accept(); aErr != nil { + t.Errorf("Failed to accept connection: %v", err) + } + + t.Logf("Accepted connection from %s", conn.RemoteAddr().String()) + + defer func(conn net.Conn) { + if cErr := conn.Close(); cErr != nil { + t.Errorf("Failed to close connection: %v", err) + } + }(conn) + + speedLimit := &SpeedLimit{ + Burst: 512, + Frame: time.Second, + CheckEveryBytes: 1, + Delay: 10 * time.Millisecond, + } + + var ( + speedometer *Speedometer + sErr error + ) + if speedometer, sErr = NewCappedLimitedSpeedometer(conn, speedLimit, 4096); sErr != nil { + t.Errorf("Failed to create speedometer: %v", sErr) + } + + buf := make([]byte, 1024) + for i := range buf { + targ := byte('E') + if i%2 == 0 { + targ = byte('e') + } + buf[i] = targ + } + for { + n, wErr := speedometer.Write(buf) + switch { + case errors.Is(wErr, io.EOF), errors.Is(wErr, ErrLimitReached): + return + case wErr != nil: + t.Errorf("Failed to write: %v", wErr) + case n != len(buf): + t.Errorf("Failed to write all bytes: %d", n) + default: + t.Logf("Wrote %d bytes", n) + } + } + }() + + var ( + client net.Conn + aErr error + ) + + if client, aErr = net.Dial("tcp", "localhost:8080"); aErr != nil { + t.Fatalf("Failed to connect to server: %v", err) + } + + defer func(client net.Conn) { + if clErr := client.Close(); clErr != nil { + t.Errorf("Failed to close client: %v", err) + } + }(client) + + buf := &bytes.Buffer{} + startTime := time.Now() + n, cpErr := io.Copy(buf, client) + if cpErr != nil { + t.Errorf("Failed to copy: %v", cpErr) + } + + duration := time.Since(startTime) + if buf.Len() == 0 || n == 0 { + t.Fatalf("No data received") + } + + rate := measureRate(t, n, duration) + + if rate > 512.0 { + t.Fatalf("Rate exceeded: got %f, 
expected <= 512.0", rate)
+		}
+	})
+}
+
+type badWrites struct {
+}
+
+func (bw badWrites) Write(_ []byte) (int, error) {
+	return 0, io.EOF
+}
+
+func TestImprobableEdgeCasesForCoverage(t *testing.T) {
+	t.Parallel()
+	sp, _ := NewSpeedometer(io.Discard)
+	sp.speedLimit = nil
+	if sp.slowDown() != nil {
+		t.Fatal("should have received no error when running slowdown with nil speedlimit")
+	}
+	sp, _ = NewLimitedSpeedometer(io.Discard, NewBytesPerSecondLimit(5))
+	sp.speedLimit.Burst = 0
+	if sp.slowDown() == nil {
+		t.Fatal("should have received error when running slowdown with invalid speedlimit")
+	}
+	sp, _ = NewLimitedSpeedometer(badWrites{}, NewBytesPerSecondLimit(5))
+	if _, e := sp.Write([]byte("yeet")); !errors.Is(e, io.EOF) {
+		t.Errorf("wrong error from underlying writer err passdown: %v", e)
+	}
+	sp.speedLimit = nil
+	if _, e := sp.Write([]byte("yeet")); !errors.Is(e, io.EOF) {
+		t.Errorf("wrong error from underlying writer err passdown: %v", e)
+	}
+	if e := sp.Close(); e != nil {
+		t.Fatal("close err not nil")
+	}
+	if e := sp.Close(); !errors.Is(e, io.ErrClosedPipe) {
+		t.Errorf("wrong error from already closed speedo: %v", e)
+	}
+	if _, e := sp.increment(1); !errors.Is(e, io.ErrClosedPipe) {
+		t.Errorf("wrong error from already closed speedo: %v", e)
+	}
+	if _, err := NewLimitedSpeedometer(nil, nil); err == nil {
+		t.Fatal("should have received error when creating invalid speedo")
+	}
+	if _, err := NewLimitedSpeedometer(io.Discard, &SpeedLimit{}); err == nil {
+		t.Fatal("should have received error when creating invalid speedo")
+	}
+	sl := NewBytesPerSecondLimit(5)
+	sl.CheckEveryBytes = 0
+	sl.Delay = 0
+	var err error
+	if sp, err = NewLimitedSpeedometer(io.Discard, sl); err != nil {
+		t.Fatal("should have received no error when creating iffy speedo")
+	}
+	if sp.speedLimit.CheckEveryBytes != 5 {
+		t.Fatal("speed limit regularization failed")
+	}
+	if sp.speedLimit.Delay != time.Duration(100)*time.Millisecond {
+		t.Fatal("speed limit regularization failed")
+	}
+
+}
+
+func measureRate(t *testing.T, received int64, duration time.Duration) float64 {
+	t.Helper()
+	return float64(received) / duration.Seconds()
+}

From b8210b691c8f89393dd40d17eb4ff0858955bb2c Mon Sep 17 00:00:00 2001
From: "kayos@tcp.direct"
Date: Sat, 21 Oct 2023 13:50:23 -0700
Subject: [PATCH 03/11] Fix[CI]: fix go version in workflow

---
 .github/workflows/go.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index c33d8b9..eca9b8c 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -11,7 +11,7 @@ jobs:
         fetch-depth: 2
     - uses: actions/setup-go@v2
       with:
-        go-version: '1.17'
+        go-version: '1.21'
    - name: Run coverage
      run: go test -v -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Upload coverage to Codecov From d30d1734623bd6d58c80f705361ce4eb4cd98eb3 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Sat, 21 Oct 2023 13:52:27 -0700 Subject: [PATCH 04/11] Remove queue branch code from speedometer branch tree --- queue/errors.go | 5 -- queue/queue.go | 122 ------------------------------------------------ 2 files changed, 127 deletions(-) delete mode 100644 queue/errors.go delete mode 100644 queue/queue.go diff --git a/queue/errors.go b/queue/errors.go deleted file mode 100644 index d612f79..0000000 --- a/queue/errors.go +++ /dev/null @@ -1,5 +0,0 @@ -package queue - -import "errors" - -var ErrQueueFull = errors.New("bounded queue is full") diff --git a/queue/queue.go b/queue/queue.go deleted file mode 100644 index 8c6bcd4..0000000 --- a/queue/queue.go +++ /dev/null @@ -1,122 +0,0 @@ -package queue - -import ( - "fmt" - "sync" - "sync/atomic" - - "git.tcp.direct/kayos/common/list" - "golang.org/x/exp/constraints" -) - -type Item[P constraints.Ordered, V any] struct { - value *V - priority P - index int -} - -func NewItem[P constraints.Ordered, V any](v *V, p P) *Item[P, V] { - return &Item[P, V]{ - value: v, - priority: p, - index: 0, - } -} - -func (i *Item[P, V]) Less(j any) bool { - return i.priority < j.(Item[P, V]).priority -} - -func (i *Item[P, V]) Value() *V { - return i.value -} - -func (i *Item[P, V]) Priority() P { - return i.priority -} - -func (i *Item[P, V]) Set(v *V) { - i.value = v -} - -func (i *Item[P, V]) SetPriority(p P) { - i.priority = p -} - -type boundedQueue[P constraints.Ordered, V any] struct { - items []*Item[P, V] - qcap int64 - qlen *atomic.Int64 - mu *sync.RWMutex - errs *list.LockingList -} - -type BoundedQueue[P constraints.Ordered, V any] struct { - // making sure container/heap can access boundedQueue without the normal API being exposed to the user - inner *boundedQueue[P, V] -} - -func NewBoundedQueue[P constraints.Ordered, V any](cap int64) *BoundedQueue[P, V] { - bq := &BoundedQueue[P, V]{} - bq.inner = &boundedQueue[P, V]{ - items: make([]*Item[P, V], 0, cap), - qcap: cap, - qlen: &atomic.Int64{}, - mu: &sync.RWMutex{}, - errs: list.New(), - } - bq.inner.qlen.Store(0) - return bq -} - -func (b *boundedQueue[P, V]) err(err error) { - b.errs.PushBack(err) -} - -func (b *BoundedQueue[P, V]) Err() error { - errLen := b.inner.errs.Len() - switch errLen { - case 0: - return nil - case 1: - return b.inner.errs.Front().Value().(error) - default: - return fmt.Errorf("%w | (%d more errors in queue)", b.inner.errs.Front().Value().(error), errLen-1) - } -} - -func (b *BoundedQueue[P, V]) Len() int { - return int(b.inner.qlen.Load()) -} - -func (b *boundedQueue[P, V]) Less(i, j int) bool { - b.mu.RLock() - less := b.items[P, V][i].priority < b.items[P, V][j].priority - b.mu.RUnlock() - return less -} - -func (b *boundedQueue[P, V]) Swap(i, j int) { - b.mu.Lock() - b.items[P, V][i], b.items[P, V][j] = b.items[P, V][j], b.items[P, V][i] - b.items[P, V][i].index = i - b.items[P, V][j].index = j - b.mu.Unlock() -} - -func (b *boundedQueue[P, V]) Push(x any) { - if b.qlen.Load() >= b.qcap { - b.err(fmt.Errorf("%w: %v dropped", ErrQueueFull, x)) - return - } - defer b.qlen.Add(1) - b.mu.Lock() - b.items[P, V] = append(b.items[P, V], x.(*Item[P, V])) - b.qlen.Add(1) - b.mu.Unlock() -} - -func (b *boundedQueue[P, V]) Pop() any { - // todo - return nil -} From 99403c33cc5dad49cb16a7622633006d10c95c46 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Mon, 8 Jan 2024 00:23:41 -0800 Subject: [PATCH 05/11] Style[tests]: adjust test 
log output --- speedometer/speedometer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/speedometer/speedometer_test.go b/speedometer/speedometer_test.go index cc5e4d3..2c0ee2d 100644 --- a/speedometer/speedometer_test.go +++ b/speedometer/speedometer_test.go @@ -282,7 +282,7 @@ func Test_Speedometer(t *testing.T) { } }) - t.Run("SynSynAckAck", func(t *testing.T) { + t.Run("limited TCP bandwidth", func(t *testing.T) { t.Parallel() var ( server net.Listener @@ -348,7 +348,7 @@ func Test_Speedometer(t *testing.T) { case n != len(buf): t.Errorf("Failed to write all bytes: %d", n) default: - t.Logf("Wrote %d bytes", n) + t.Logf("Wrote %d bytes (rate: %v/bps)", n, speedometer.Rate()) } } }() From 9c5d78eea457b7b5e0b894a5c1882ea530e4fc79 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Mon, 8 Jan 2024 00:35:54 -0800 Subject: [PATCH 06/11] Chore[deps]: go mod tidy --- go.mod | 7 +------ go.sum | 4 ---- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/go.mod b/go.mod index f383d4a..596c76b 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,4 @@ module github.com/yunginnanet/Rate5 go 1.18 -require ( - github.com/patrickmn/go-cache v2.1.0+incompatible - golang.org/x/exp v0.0.0-20231006140011-7918f672742d -) - -require git.tcp.direct/kayos/common v0.9.2 // indirect +require github.com/patrickmn/go-cache v2.1.0+incompatible diff --git a/go.sum b/go.sum index 89ca189..33c1ec4 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,2 @@ -git.tcp.direct/kayos/common v0.9.2 h1:T7XzvmtPTmhXMTIb8v5W7a4DAV8ekusRkCe1WJ/qGBY= -git.tcp.direct/kayos/common v0.9.2/go.mod h1:rMExTem3JjB5XrwL+i+/QBv3agRFB8mdsoQ97wzBa+8= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= -golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= -golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= From 6fda3907504eb711cd0ab1c8c0c9f398d9008b42 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Sun, 14 Jan 2024 21:55:56 -0800 Subject: [PATCH 07/11] Refactor[speedometer]: change module name --- speedometer/speedometer.go | 4 ++-- speedometer/speedometer_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/speedometer/speedometer.go b/speedometer/speedometer.go index 0e820af..7419b67 100644 --- a/speedometer/speedometer.go +++ b/speedometer/speedometer.go @@ -1,4 +1,4 @@ -package util +package speedometer import ( "errors" @@ -176,7 +176,7 @@ func (s *Speedometer) Close() error { return s.internal.slow.Load() }*/ -// Rate returns the rate at which data is being written to the underlying writer per second. +// Rate returns the bytes per second rate at which data is being written to the underlying writer. 
func (s *Speedometer) Rate() float64 { if s.internal.closed.Load() { return float64(s.Total()) / s.internal.duration.Load().Seconds() diff --git a/speedometer/speedometer_test.go b/speedometer/speedometer_test.go index 2c0ee2d..d0aeadf 100644 --- a/speedometer/speedometer_test.go +++ b/speedometer/speedometer_test.go @@ -1,4 +1,4 @@ -package util +package speedometer import ( "bytes" From 0ed82d8936e6281bc287382324e7c7ccf122a6ae Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Sat, 20 Jan 2024 04:46:06 -0800 Subject: [PATCH 08/11] Chore: tidy up --- speedometer/speedometer.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/speedometer/speedometer.go b/speedometer/speedometer.go index 7419b67..9b94c7d 100644 --- a/speedometer/speedometer.go +++ b/speedometer/speedometer.go @@ -172,10 +172,6 @@ func (s *Speedometer) Close() error { return nil } -/*func (s *Speedometer) IsSlow() bool { - return s.internal.slow.Load() -}*/ - // Rate returns the bytes per second rate at which data is being written to the underlying writer. func (s *Speedometer) Rate() float64 { if s.internal.closed.Load() { From cbac6f9581b4c0120be38a1d55bec762cdbce86e Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Sat, 20 Jan 2024 04:58:43 -0800 Subject: [PATCH 09/11] Chore: tidy up + document --- speedometer/speedometer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/speedometer/speedometer.go b/speedometer/speedometer.go index 9b94c7d..984b821 100644 --- a/speedometer/speedometer.go +++ b/speedometer/speedometer.go @@ -183,16 +183,21 @@ func (s *Speedometer) Rate() float64 { func (s *Speedometer) slowDown() error { switch { case s.speedLimit == nil: + // welcome to the autobahn, motherfucker. return nil case s.speedLimit.Burst <= 0 || s.speedLimit.Frame <= 0, s.speedLimit.CheckEveryBytes <= 0, s.speedLimit.Delay <= 0: + // invalid speedLimit return errors.New("invalid speed limit") + case s.Total()%int64(s.speedLimit.CheckEveryBytes) != 0: + // if (total written [modulus] checkeverybytes is not 0) then our total byte count + // is not a multiple of our configured check frequency. 
+		// bypass check and write at normal speed
+		return nil
 	default:
 		//
 	}
-	if s.Total()%int64(s.speedLimit.CheckEveryBytes) != 0 {
-		return nil
-	}
+
 	s.internal.slow.Store(true)
 	for s.Rate() > float64(s.speedLimit.Burst)/s.speedLimit.Frame.Seconds() {
 		time.Sleep(s.speedLimit.Delay)
 	}

From fa40018d6b871b5416a2f7ea403cfa68e120150a Mon Sep 17 00:00:00 2001
From: "kayos@tcp.direct"
Date: Sat, 20 Jan 2024 04:59:28 -0800
Subject: [PATCH 10/11] Chore[CI]: Remove defunct commit summarizer

---
 .github/workflows/gpt-commit-summarizer.yml | 15 ---------------
 1 file changed, 15 deletions(-)
 delete mode 100644 .github/workflows/gpt-commit-summarizer.yml

diff --git a/.github/workflows/gpt-commit-summarizer.yml b/.github/workflows/gpt-commit-summarizer.yml
deleted file mode 100644
index d3b6fe4..0000000
--- a/.github/workflows/gpt-commit-summarizer.yml
+++ /dev/null
@@ -1,15 +0,0 @@
-name: GPT Commits summarizer
-
-on:
-  pull_request_target:
-    types: [ opened, synchronize ]
-
-jobs:
-  summarize:
-    runs-on: ubuntu-latest
-    permissions: write-all
-    steps:
-      - uses: KanHarI/gpt-commit-summarizer@master
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

From c1ed13396b0c7ee7f5adee09bec5d32797716db9 Mon Sep 17 00:00:00 2001
From: "kayos@tcp.direct"
Date: Sat, 20 Jan 2024 05:03:29 -0800
Subject: [PATCH 11/11] Chore: remove unused atomic.Bool + doc

---
 speedometer/speedometer.go | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/speedometer/speedometer.go b/speedometer/speedometer.go
index 984b821..06b4f1d 100644
--- a/speedometer/speedometer.go
+++ b/speedometer/speedometer.go
@@ -34,7 +34,6 @@ type atomics struct {
 	stop     *sync.Once
 	birth    *atomic.Pointer[time.Time]
 	duration *atomic.Pointer[time.Duration]
-	slow     *atomic.Bool
 }
 
 func newAtomics() atomics {
@@ -45,7 +44,6 @@ func newAtomics() atomics {
 		stop:     new(sync.Once),
 		birth:    new(atomic.Pointer[time.Time]),
 		duration: new(atomic.Pointer[time.Duration]),
-		slow:     new(atomic.Bool),
 	}
 	manhattan.birth.Store(&time.Time{})
 	manhattan.closed.Store(false)
@@ -198,11 +196,12 @@ func (s *Speedometer) slowDown() error {
 		//
 	}
 
-	s.internal.slow.Store(true)
+	// the slowing will continue until morale improves
+	// (sleep until our overall rate re-enters the acceptable threshold)
 	for s.Rate() > float64(s.speedLimit.Burst)/s.speedLimit.Frame.Seconds() {
 		time.Sleep(s.speedLimit.Delay)
 	}
-	s.internal.slow.Store(false)
+
 	return nil
 }
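
A minimal usage sketch of the limiting and capping behavior described in the SpeedLimit doc comments (PATCH 02) and the slowDown notes (PATCH 09/11): Write sleeps in Delay-sized steps whenever the running rate exceeds Burst per Frame, and once the byte cap is hit the Speedometer closes itself and Write returns an error wrapping ErrLimitReached. The import path github.com/yunginnanet/Rate5/speedometer is an assumption (module path plus the new package directory); the constructors, fields, and methods used are taken directly from speedometer.go as it stands after PATCH 11.

package main

import (
	"errors"
	"fmt"
	"io"
	"time"

	// assumed import path: Rate5 module path + the speedometer package added in this series
	"github.com/yunginnanet/Rate5/speedometer"
)

func main() {
	// Throttle to roughly 4 KiB per second, re-checking the rate every 512 bytes
	// and sleeping 50ms at a time while over the limit.
	limit := &speedometer.SpeedLimit{
		Burst:           4096,
		Frame:           time.Second,
		CheckEveryBytes: 512,
		Delay:           50 * time.Millisecond,
	}

	// Additionally refuse to pass more than 8 KiB in total to the underlying writer.
	sp, err := speedometer.NewCappedLimitedSpeedometer(io.Discard, limit, 8*1024)
	if err != nil {
		panic(err)
	}

	chunk := make([]byte, 1024)
	for {
		// Write blocks the calling goroutine while the measured rate exceeds Burst/Frame.
		if _, werr := sp.Write(chunk); werr != nil {
			// Once the cap is hit the Speedometer closes itself and the returned
			// error wraps ErrLimitReached.
			if errors.Is(werr, speedometer.ErrLimitReached) {
				break
			}
			panic(werr)
		}
	}

	fmt.Printf("capped after %d bytes, average rate ~%.0f bytes/sec\n", sp.Total(), sp.Rate())
}

The throttle is cooperative: Write simply blocks the caller, which matches the "writers will block when slowed down" caveat in the Speedometer doc comment.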
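
A minimal usage sketch of the measurement-only mode described in the Speedometer doc comments (PATCH 02): NewSpeedometer wraps any io.Writer and only counts bytes and elapsed time, while the SpeedLimit fields (Burst per Frame, CheckEveryBytes, Delay) are only consulted by the limited constructors. The import path github.com/yunginnanet/Rate5/speedometer is an assumption (module path plus the new package directory); the functions and methods used are taken directly from speedometer.go as it stands after PATCH 11.

package main

import (
	"fmt"
	"io"

	// assumed import path: Rate5 module path + the speedometer package added in this series
	"github.com/yunginnanet/Rate5/speedometer"
)

func main() {
	// Wrap any io.Writer; io.Discard stands in for a real file or socket here,
	// so the reported rate will be very large.
	sp, err := speedometer.NewSpeedometer(io.Discard)
	if err != nil {
		panic(err)
	}

	// Writes pass straight through; the Speedometer only measures them.
	for i := 0; i < 1024; i++ {
		if _, err = sp.Write(make([]byte, 1024)); err != nil {
			panic(err)
		}
	}

	// Close freezes the clock, so Rate() keeps reporting the final average.
	_ = sp.Close()
	fmt.Printf("wrote %d bytes at ~%.0f bytes/sec\n", sp.Total(), sp.Rate())
}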