diff --git a/go.mod b/go.mod index d10e3ce..041c3dd 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( ) require ( + github.com/sushydev/ring_buffer_go v0.1.2 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d // indirect golang.org/x/net v0.30.0 // indirect diff --git a/go.sum b/go.sum index 02ead36..33f548c 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/sushydev/ring_buffer_go v0.1.2 h1:6NpdtDSiSD0EaiH2ZCHU5JuWBkCjzVA40L5goREPNHg= +github.com/sushydev/ring_buffer_go v0.1.2/go.mod h1:BEce5YqSc/+vdIR0sLXp2pcGSpwUvOrSjG96VormGZg= github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c h1:u6SKchux2yDvFQnDHS3lPnIRmfVJ5Sxy3ao2SIdysLQ= github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/stream/buffer/main.go b/stream/buffer/main.go deleted file mode 100644 index bc841d5..0000000 --- a/stream/buffer/main.go +++ /dev/null @@ -1,262 +0,0 @@ -package buffer - -import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" -) - -type Buffer struct { - data []byte - startPosition atomic.Uint64 // The logical start position of the buffer - - readPosition atomic.Uint64 // The position where the next read will happen - writePosition atomic.Uint64 // The position where the next write will happen - - count atomic.Uint64 // The number of bytes currently in the buffer - - readPage atomic.Uint64 - writePage atomic.Uint64 - - mu sync.RWMutex - - closed bool -} - -func NewBuffer(size uint64, startPosition uint64) *Buffer { - buffer := &Buffer{ - data: make([]byte, size), - } - - buffer.SetStartPosition(startPosition) - - return buffer -} - -func (buffer *Buffer) Cap() uint64 { - return uint64(cap(buffer.data)) -} - -func (buffer *Buffer) ReadAt(p []byte, position uint64) (int, error) { - if buffer.closed { - return 0, errors.New("buffer is closed") - } - - buffer.mu.RLock() - defer buffer.mu.RUnlock() - - bufferCount := buffer.count.Load() - if bufferCount <= 0 { - return 0, errors.New("buffer is empty") - } - - if !buffer.IsPositionInBuffer(position) { - return 0, errors.New(fmt.Sprintf("position %d is not in buffer", position)) - } - - bufferCap := buffer.Cap() - relativePos := buffer.GetRelativePosition(position) - bufferPosition := relativePos % bufferCap - - readPosition := buffer.readPosition.Load() - - writePosition := buffer.writePosition.Load() - - requestedSize := uint64(len(p)) - - var readSize uint64 - if bufferCount == bufferCap && readPosition == writePosition { - readSize = min(requestedSize, bufferCap) - } else if writePosition >= bufferPosition { - readSize = min(requestedSize, writePosition-bufferPosition) - } else { - readSize = min(requestedSize, bufferCap-bufferPosition+writePosition) - } - - if bufferPosition+readSize <= bufferCap { - copy(p, buffer.data[bufferPosition:bufferPosition+readSize]) - } else { - firstPart := bufferCap - bufferPosition - copy(p, buffer.data[bufferPosition:bufferCap]) - copy(p[firstPart:], buffer.data[0:readSize-firstPart]) - } - - newReadPosition := (bufferPosition + readSize) % bufferCap - if newReadPosition <= readPosition { - buffer.readPage.Add(1) - } - - buffer.readPosition.Store(newReadPosition) - - buffer.count.Store(bufferCount - readSize) - - return int(readSize), nil -} - -func (buffer *Buffer) Write(p []byte) (int, error) { - if buffer.closed { - return 0, errors.New("buffer is closed") - } - - buffer.mu.Lock() - defer buffer.mu.Unlock() - - bufferCap := buffer.Cap() - requestedSize := uint64(len(p)) - - if requestedSize > bufferCap { - return 0, fmt.Errorf("write data exceeds buffer size: %d", requestedSize) - } - - availableSpace := buffer.GetBytesToOverwrite() - if requestedSize > availableSpace { - return 0, fmt.Errorf("not enough space in buffer: %d/%d", requestedSize, availableSpace) - } - - bufferCount := buffer.count.Load() - writePosition := buffer.writePosition.Load() - - if writePosition+requestedSize <= bufferCap { - copy(buffer.data[writePosition:], p) - } else { - firstPart := bufferCap - writePosition - copy(buffer.data[writePosition:], p[:firstPart]) - copy(buffer.data[0:], p[firstPart:]) - } - - newWritePosition := (writePosition + requestedSize) % bufferCap - if newWritePosition <= writePosition { - buffer.writePage.Add(1) - } - - buffer.writePosition.Store(newWritePosition) - - buffer.count.Store(bufferCount + requestedSize) - - return int(requestedSize), nil -} - -func (buffer *Buffer) SetStartPosition(position uint64) { - buffer.startPosition.Store(position) -} - -func (buffer *Buffer) GetStartPosition() uint64 { - return buffer.startPosition.Load() -} - -func (buffer *Buffer) GetRelativePosition(position uint64) uint64 { - return position - buffer.startPosition.Load() -} - -func (buffer *Buffer) IsPositionInBufferSync(position uint64) bool { - buffer.mu.RLock() - defer buffer.mu.RUnlock() - - return buffer.IsPositionInBuffer(position) -} - -// Checks if the given logical position is within the readPos and writePos. -func (buffer *Buffer) IsPositionInBuffer(position uint64) bool { - relativePosition := buffer.GetRelativePosition(position) - if relativePosition < 0 { - return false - } - - bufferCap := buffer.Cap() - - if bufferCap == 0 { - return false - } - - bufferPosition := relativePosition % bufferCap - bufferPositionPage := relativePosition / bufferCap - - readPage := buffer.readPage.Load() - readPosition := buffer.readPosition.Load() - writePage := buffer.writePage.Load() - writePosition := buffer.writePosition.Load() - - // Case 0: The buffer is empty. - if readPage == 0 && writePage == 0 && readPosition == 0 && writePosition == 0 { - return false - } - - if readPage == writePage { - // Case 1: Same page, position must be between readPosition and writePosition. - return bufferPosition >= readPosition && bufferPosition < writePosition - } - - if bufferPositionPage == readPage { - // Case 2: Position is on the read page. - return bufferPosition >= readPosition - } - - if bufferPositionPage == writePage { - // Case 3: Position is on the write page. - return bufferPosition < writePosition - } - - // Case 4: Position is in between readPage and writePage when they are not the same. - return readPage < writePage -} - -func (buffer *Buffer) WaitForPositionInBuffer(context context.Context, position uint64) { - for { - if buffer.closed { - return - } - - if buffer.IsPositionInBufferSync(position) { - return - } - - select { - case <-context.Done(): - return - case <-time.After(100 * time.Microsecond): - } - } -} - -func (buffer *Buffer) GetBytesToOverwriteSync() uint64 { - buffer.mu.RLock() - defer buffer.mu.RUnlock() - - return buffer.GetBytesToOverwrite() -} - -func (buffer *Buffer) GetBytesToOverwrite() uint64 { - if buffer.closed { - return 0 - } - - bufferCap := buffer.Cap() - bufferCount := buffer.count.Load() - - return bufferCap - bufferCount -} - -func (buffer *Buffer) Reset(position uint64) { - buffer.mu.Lock() - defer buffer.mu.Unlock() - - buffer.SetStartPosition(position) - buffer.writePage.Store(0) - buffer.writePosition.Store(0) - buffer.readPage.Store(0) - buffer.readPosition.Store(0) - buffer.count.Store(0) - buffer.data = make([]byte, buffer.Cap()) -} - -func (buffer *Buffer) Close() { - buffer.mu.Lock() - defer buffer.mu.Unlock() - - buffer.data = nil - - buffer.closed = true -} diff --git a/stream/buffer/main.go.bak b/stream/buffer/main.go.bak deleted file mode 100644 index dad8ebf..0000000 --- a/stream/buffer/main.go.bak +++ /dev/null @@ -1,235 +0,0 @@ -package buffer - -import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" -) - -type Buffer struct { - data []byte - startPosition atomic.Uint64 // The logical start position of the buffer - - readPosition atomic.Uint64 // The position where the next read will happen - writePosition atomic.Uint64 // The position where the next write will happen - full bool - - mu sync.RWMutex - - closed bool -} - -func NewBuffer(size uint64, startPosition uint64) *Buffer { - buffer := &Buffer{ - data: make([]byte, size), - } - - buffer.SetStartPosition(startPosition) - - return buffer -} - -func (buffer *Buffer) Cap() uint64 { - return uint64(cap(buffer.data)) -} - -func (buffer *Buffer) ReadAt(p []byte, position uint64) (int, error) { - if buffer.closed { - return 0, errors.New("buffer is closed") - } - - buffer.mu.RLock() - defer buffer.mu.RUnlock() - - availableSpace := buffer.GetBytesToOverwrite() - if availableSpace == 0 { - return 0, fmt.Errorf("not enough space in buffer: %d", availableSpace) - } - - requestedSize := uint64(len(p)) - if requestedSize > availableSpace { - p = p[:availableSpace] - } - - bufferCap := buffer.Cap() - relativePos := buffer.GetRelativePosition(position) - bufferPosition := relativePos % bufferCap - - // fmt.Println("ReadAt: bufferPos", bufferPos, "readPosition", readPosition, "writePosition", writePosition, "bufferCount", bufferCount, "requestedSize", requestedSize, "relativePos", relativePos) - - endSpace := bufferCap - bufferPosition - - var readSize int - if requestedSize <= endSpace { - copy(p, buffer.data[bufferPosition:bufferPosition+requestedSize]) - } else { - firstPart := bufferCap - bufferPosition - secondPart := requestedSize - firstPart - - copy(p, buffer.data[bufferPosition:]) - copy(p[firstPart:], buffer.data[:secondPart]) - } - - newReadPosition := (bufferPosition + uint64(readSize)) % bufferCap - buffer.readPosition.Store(newReadPosition) - - buffer.full = false - - // fmt.Printf("Read position %d, Read page %d, n %d\n", newReadPosition, readPage, n) - - return int(readSize), nil -} - -// Write writes data to the ring buffer from p. -func (buffer *Buffer) Write(p []byte) (int, error) { - if buffer.closed { - return 0, errors.New("buffer is closed") - } - - buffer.mu.Lock() - defer buffer.mu.Unlock() - - bufferCap := buffer.Cap() - - availableSpace := bufferCap - buffer.GetBytesToOverwrite() - 1 - - requestedSize := uint64(len(p)) - if requestedSize > availableSpace { - return 0, fmt.Errorf("not enough space in buffer: %d/%d", requestedSize, availableSpace) - } - - writePosition := buffer.writePosition.Load() - endSpace := bufferCap - writePosition - - // if buffer is not full yet we need to append rather than copy - if requestedSize <= endSpace { - // No wraparound needed. - copy(buffer.data[writePosition:], p) - } else { - copy(buffer.data[writePosition:], p[:endSpace]) - copy(buffer.data[:], p[endSpace:]) - } - - newWritePosition := (writePosition + requestedSize) % bufferCap - buffer.writePosition.Store(newWritePosition) - - readPosition := buffer.readPosition.Load() - - buffer.full = newWritePosition == readPosition - - // fmt.Printf("Write position %d, Write page %d\n", newWritePosition, writePage) - - return int(requestedSize), nil -} - -func (buffer *Buffer) SetStartPosition(position uint64) { - buffer.startPosition.Store(position) -} - -func (buffer *Buffer) GetStartPosition() uint64 { - return buffer.startPosition.Load() -} - -func (buffer *Buffer) GetRelativePosition(position uint64) uint64 { - return position - buffer.startPosition.Load() -} - -// Checks if the given logical position is within the readPos and writePos. -func (buffer *Buffer) IsPositionInBufferSync(position uint64) bool { - buffer.mu.RLock() - defer buffer.mu.RUnlock() - - return buffer.IsPositionInBuffer(position) -} - -func (buffer *Buffer) IsPositionInBuffer(position uint64) bool { - relativePosition := buffer.GetRelativePosition(position) - if relativePosition < 0 { - return false - } - - bufferCap := buffer.Cap() - bufferPosition := relativePosition % bufferCap - - readPosition := buffer.readPosition.Load() - writePosition := buffer.writePosition.Load() - - if buffer.full { - return bufferPosition >= 0 && bufferPosition < bufferCap - } - - if readPosition <= writePosition { - return bufferPosition >= readPosition && bufferPosition < writePosition - } - - return bufferPosition >= readPosition || bufferPosition < writePosition -} - -func (buffer *Buffer) WaitForPositionInBuffer(position uint64, context context.Context) { - for { - if buffer.closed { - return - } - - if buffer.IsPositionInBufferSync(position) { - return - } - - select { - case <-context.Done(): - return - case <-time.After(100 * time.Microsecond): - } - - } -} - -func (buffer *Buffer) GetBytesToOverwriteSync() uint64 { - buffer.mu.RLock() - defer buffer.mu.RUnlock() - - return buffer.GetBytesToOverwrite() -} - -func (buffer *Buffer) GetBytesToOverwrite() uint64 { - if buffer.closed { - return 0 - } - - bufferCap := buffer.Cap() - - writePosition := buffer.writePosition.Load() - readPosition := buffer.readPosition.Load() - - if buffer.full { - return bufferCap - } - if writePosition >= readPosition { - return writePosition - readPosition - } - return bufferCap - (readPosition - writePosition) -} - -func (buffer *Buffer) Reset(position uint64) { - buffer.mu.Lock() - defer buffer.mu.Unlock() - - buffer.SetStartPosition(position) - buffer.writePosition.Store(0) - buffer.readPosition.Store(0) - buffer.data = make([]byte, buffer.Cap()) - buffer.full = false -} - -func (buffer *Buffer) Close() { - buffer.mu.Lock() - defer buffer.mu.Unlock() - - buffer.data = nil - - buffer.closed = true -} - diff --git a/stream/buffer/main_test.go b/stream/buffer/main_test.go deleted file mode 100644 index a056b71..0000000 --- a/stream/buffer/main_test.go +++ /dev/null @@ -1,417 +0,0 @@ -package buffer - -import ( - "fmt" - "testing" -) - -const startPositionOffet = 349087234 - -func TestWrite(t *testing.T) { - buffer := NewBuffer(10, startPositionOffet) - buffer.Write([]byte{0, 1, 2, 3, 4}) - - for i, b := range buffer.data { - if i < 5 && b != byte(i) { - t.Errorf("Expected %d, got %d", i, b) - } else if i >= 5 && b != 0 { - t.Errorf("Expected 0, got %d", b) - } - } - - writePosition := buffer.writePosition.Load() - - if writePosition != 5 { - t.Errorf("Expected 5, got %d", writePosition) - } - - buffer.Write([]byte{5, 6, 7, 8, 9}) - - for i, b := range buffer.data { - if i < 10 && b != byte(i) { - t.Errorf("Expected %d, got %d", i, b) - } else if i >= 10 && b != byte(i-10) { - t.Errorf("Expected %d, got %d", i-10, b) - } - } - - if buffer.writePosition.Load() != 0 { - t.Errorf("Expected 0, got %d", buffer.writePosition.Load()) - } - - _, err := buffer.ReadAt(make([]byte, 5), startPositionOffet+0) - if err != nil { - t.Errorf("Expected nil, got %s", err) - } - - buffer.Write([]byte{10, 11, 12, 13, 14}) - - if buffer.writePosition.Load() != 5 { - t.Errorf("Expected 5, got %d", buffer.writePosition.Load()) - } - - bytesToOverflow := []byte{15, 16, 17, 18, 19, 20, 21, 22, 23, 24} - - _, err = buffer.Write(bytesToOverflow) - if err == nil { - t.Errorf("Expected error, got nil") - } - - requestedSize := len(bytesToOverflow) - - if err.Error() != fmt.Sprintf("not enough space in buffer: %d/%d", requestedSize, 0) { - t.Errorf("Expected 'not enough space in buffer: %d/%d, got %s", requestedSize, 0, err) - } - - newBuffer := NewBuffer(10, startPositionOffet) - - bytesToOverflow = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - - _, err = newBuffer.Write(bytesToOverflow) - if err == nil { - t.Errorf("Expected error, got nil") - } - - requestedSize = len(bytesToOverflow) - - if err.Error() != fmt.Sprintf("write data exceeds buffer size: %d", requestedSize) { - t.Errorf("Expected 'write data exceeds buffer size: %d', got %s", requestedSize, err) - } -} - -func TestRead(t *testing.T) { - buffer := NewBuffer(10, startPositionOffet) - - bytesToWrite := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - - buffer.Write(bytesToWrite) - - testBuffer := make([]byte, 5) - - _, err := buffer.ReadAt(testBuffer, startPositionOffet+0) - - for i, b := range testBuffer { - if b != bytesToWrite[i] { - t.Errorf("Expected %d, got %d", bytesToWrite[i], b) - } - } - - if buffer.readPosition.Load() != 5 { - t.Errorf("Expected 5, got %d", buffer.readPosition.Load()) - } - - _, err = buffer.ReadAt(testBuffer, startPositionOffet+5) - if err != nil { - t.Errorf("Expected nil, got %s", err) - } - - for i, b := range testBuffer { - i += 5 - - if b != bytesToWrite[i] { - t.Errorf("Expected %d, got %d", bytesToWrite[i], b) - } - } - - if buffer.readPosition.Load() != 0 { - t.Errorf("Expected 0, got %d", buffer.readPosition.Load()) - } - - buffer.Write([]byte{11, 12, 13, 14}) - - testBuffer = make([]byte, 5) - n, err := buffer.ReadAt(testBuffer, startPositionOffet+12) // = len(testBuffer) + 2 = 7 - if err != nil { - t.Errorf("Expected nil, got %s", err) - } - - if buffer.readPosition.Load() != int64(n+2) { - t.Errorf("Expected %d, got %d", n+2, buffer.readPosition.Load()) - } -} - -func TestWriteRead(t *testing.T) { - buffer := NewBuffer(10, startPositionOffet) - testBuffer := make([]byte, 5) - - bytes := []byte{0, 1, 2, 3, 4} - buffer.Write(bytes) - // t.Logf("Buffer: %v", buffer.data) - - _, err := buffer.ReadAt(testBuffer, startPositionOffet+0) - if err != nil { - t.Errorf("Expected nil, got %s", err) - } - - // t.Logf("Test buffer: %v", testBuffer) - - for i, b := range testBuffer { - if b != bytes[i] { - t.Errorf("Expected %d, got %d", bytes[i], b) - } - } - - t.Log("Segment 1 -- Complete") - - bytes = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - buffer.Write(bytes[5:]) - // t.Logf("Buffer: %v", buffer.data) - - _, err = buffer.ReadAt(testBuffer, startPositionOffet+5) - // t.Logf("Test buffer: %v", testBuffer) - - for i, b := range testBuffer { - i += 5 - - if b != bytes[i] { - t.Errorf("Expected %d, got %d", bytes[i], b) - } - } - - t.Log("Segment 2 -- Complete") - - bytes = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14} - buffer.Write(bytes[10:]) - // t.Logf("Buffer: %v", buffer.data) - - _, err = buffer.ReadAt(testBuffer, startPositionOffet+10) - // t.Logf("Test buffer: %v", testBuffer) - - for i, b := range testBuffer { - i += 10 - - if b != bytes[i] { - t.Errorf("Expected %d, got %d", bytes[i], b) - } - } - - t.Log("Segment 3 -- Complete") -} - -func TestReadWriteWrap(t *testing.T) { - buffer := NewBuffer(10, startPositionOffet) - var bytes []byte - var testBuffer []byte - var expectedWrittenData []byte - var expectedReadData []byte - - bytes = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - buffer.Write(bytes) - // t.Logf("Buffer: %v", buffer.data) - - testBuffer = make([]byte, 5) - _, err := buffer.ReadAt(testBuffer, startPositionOffet+0) - if err != nil { - t.Errorf("Expected nil, got %s", err) - } - - // t.Logf("Test buffer: %v", testBuffer) - - expectedReadData = []byte{0, 1, 2, 3, 4} - - for i, b := range testBuffer { - if b != expectedReadData[i] { - t.Errorf("Expected %d, got %d", expectedReadData[i], b) - } - } - - t.Log("Segment 1 -- Complete") - - bytes = []byte{10, 11, 12, 13, 14} - buffer.Write(bytes) - // t.Logf("Buffer: %v", buffer.data) - - expectedWrittenData = []byte{10, 11, 12, 13, 14, 5, 6, 7, 8, 9} - - for i, b := range buffer.data { - if b != expectedWrittenData[i] { - t.Errorf("Write: Expected %d, got %d", expectedWrittenData[i], b) - } - } - - testBuffer = make([]byte, 10) - _, err = buffer.ReadAt(testBuffer, startPositionOffet+5) - // t.Logf("Test buffer: %v", testBuffer) - - expectedReadData = []byte{5, 6, 7, 8, 9, 10, 11, 12, 13, 14} - - for i, b := range testBuffer { - if b != expectedReadData[i] { - t.Errorf("Read: Expected %d, got %d", expectedReadData[i], b) - } - } - - t.Log("Segment 2 -- Complete") -} - -// Helper to check if a position is in the buffer and log errors if expectations are not met. -func checkPosition(t *testing.T, buffer *Buffer, position int64, expected bool) { - actual := buffer.IsPositionInBufferSync(position) - if actual != expected { - t.Errorf("At position %d: expected %v, got %v", position, expected, actual) - } -} - -// Helper to write data into the buffer and verify positions. -func writeAndCheck(t *testing.T, buffer *Buffer, data []byte, expectedPositions map[int64]bool) { - buffer.Write(data) - for pos, expected := range expectedPositions { - checkPosition(t, buffer, pos, expected) - } -} - -// Main test function for buffer position checks. -func TestIsPositionInBuffer(t *testing.T) { - // Case 1: Test with buffer starting at position 0. - buffer := NewBuffer(10, 0) - fmt.Println("Testing with start position 0") - - // Check position 0 before writing. - checkPosition(t, buffer, 0, false) - - // Write one byte and verify positions. - writeAndCheck(t, buffer, []byte{1}, map[int64]bool{ - 0: true, // Position 0 should now be valid. - 1: false, - }) - - // Write more data and verify positions. - writeAndCheck(t, buffer, []byte{2, 3, 4, 5, 6, 7, 8, 9, 10}, map[int64]bool{ - 0: true, // Data at position 0 should still be present. - 1: true, // Data extends to position 1. - 10: false, // Position 10 should be outside the buffer. - 11: false, // Position 11 should also be outside. - }) - - // Read some data from the buffer and verify positions are updated. - _, err := buffer.ReadAt(make([]byte, 9), 0) - if err != nil { - t.Errorf("Expected nil error on read, got %s", err) - } - - // Write more data and verify positions. - writeAndCheck(t, buffer, []byte{11, 12, 13, 14, 15}, map[int64]bool{ - 11: true, - 0: false, // Data at position 0 has been read, so it should now be invalid. - 1: false, - 14: true, - 15: false, - }) - - // Read more and verify positions again. - _, err = buffer.ReadAt(make([]byte, 5), 11) - if err != nil { - t.Errorf("Expected nil error on read, got %s", err) - } - - // Position 11 should be invalid after reading. - checkPosition(t, buffer, 11, false) - - // Case 2: Test with buffer starting at a non-zero position. - startOffset := int64(34324) - buffer = NewBuffer(10, startOffset) - fmt.Println("Testing with start position", startOffset) - - // Check position 0 before writing. - checkPosition(t, buffer, startOffset+0, false) - - // Write one byte and verify positions. - writeAndCheck(t, buffer, []byte{1}, map[int64]bool{ - startOffset + 0: true, // Position 0 should now be valid. - startOffset + 1: false, - }) - - // Write more data and verify positions. - writeAndCheck(t, buffer, []byte{2, 3, 4, 5, 6, 7, 8, 9, 10}, map[int64]bool{ - startOffset + 0: true, // Data at position 0 should still be present. - startOffset + 1: true, // Data extends to position 1. - startOffset + 10: false, // Position 10 should be outside the buffer. - startOffset + 11: false, // Position 11 should also be outside. - }) - - // Read some data from the buffer and verify positions are updated. - _, err = buffer.ReadAt(make([]byte, 9), startOffset+0) - if err != nil { - t.Errorf("Expected nil error on read, got %s", err) - } - - // Write more data and verify positions. - writeAndCheck(t, buffer, []byte{11, 12, 13, 14, 15}, map[int64]bool{ - startOffset + 11: true, - startOffset + 0: false, // Data at position 0 has been read, so it should now be invalid. - startOffset + 1: false, - startOffset + 14: true, - startOffset + 15: false, - }) - - // Read more and verify positions again. - _, err = buffer.ReadAt(make([]byte, 5), startOffset+11) - if err != nil { - t.Errorf("Expected nil error on read, got %s", err) - } - - // Position 11 should be invalid after reading. - checkPosition(t, buffer, startOffset+11, false) -} - -// Not good yet -func TestGetBytesToOverwrite(t *testing.T) { - buffer := NewBuffer(10, startPositionOffet) - - // Initial state: empty buffer - bytesToOverwrite := buffer.GetBytesToOverwriteSync() - if bytesToOverwrite != 10 { - t.Errorf("Expected 10, got %d", bytesToOverwrite) - } - - // Single Write - fmt.Println() - buffer.Write([]byte{1, 2, 3, 4, 5}) - bytesToOverwrite = buffer.GetBytesToOverwriteSync() - if bytesToOverwrite != 5 { - t.Errorf("Expected 5 after writing 5 bytes, got %d", bytesToOverwrite) - } - - // Single Read - fmt.Println() - buffer.ReadAt(make([]byte, 2), startPositionOffet+0) // Reading 2 bytes - bytesToOverwrite = buffer.GetBytesToOverwriteSync() - if bytesToOverwrite != 7 { - t.Errorf("Expected 7 after reading 2 bytes, got %d", bytesToOverwrite) - } - - // Overwriting: Write 3 more bytes - fmt.Println() - buffer.Write([]byte{6, 7, 8}) - bytesToOverwrite = buffer.GetBytesToOverwriteSync() - if bytesToOverwrite != 4 { - t.Errorf("Expected 4 after writing 3 more bytes, got %d", bytesToOverwrite) - } - - // Write until the buffer is full - fmt.Println() - buffer.Write([]byte{9, 10, 1, 2}) // This will fill the buffer completely - bytesToOverwrite = buffer.GetBytesToOverwriteSync() - if bytesToOverwrite != 0 { - t.Errorf("Expected 0 after filling the buffer, got %d", bytesToOverwrite) - } - - // Read 5 bytes from the buffer w12 - r7 - fmt.Println() - buffer.ReadAt(make([]byte, 5), startPositionOffet+2) - bytesToOverwrite = buffer.GetBytesToOverwriteSync() - if bytesToOverwrite != 5 { - t.Errorf("Expected 5 after reading 5 bytes, got %d", bytesToOverwrite) - } - - fmt.Println() - _, err := buffer.ReadAt(make([]byte, 3), startPositionOffet+8) - if err != nil { - t.Errorf("Expected nil, got %s", err) - } - - bytesToOverwrite = buffer.GetBytesToOverwriteSync() - if bytesToOverwrite != 9 { - t.Errorf("Expected 9 after reading 3 bytes, got %d", bytesToOverwrite) - } -} diff --git a/stream/main.go b/stream/main.go index 16cfa75..1d9052e 100644 --- a/stream/main.go +++ b/stream/main.go @@ -10,7 +10,8 @@ import ( "time" "fuse_video_steamer/logger" - "fuse_video_steamer/stream/buffer" + + ring_buffer "github.com/sushydev/ring_buffer_go" ) type Stream struct { @@ -25,7 +26,7 @@ type Stream struct { cancel context.CancelFunc wg sync.WaitGroup - buffer *buffer.Buffer + buffer ring_buffer.LockingRingBufferInterface seekPosition atomic.Uint64 logger *logger.Logger @@ -43,8 +44,6 @@ func NewStream(url string, size uint64) *Stream { panic(err) } - buffer := buffer.NewBuffer(min(size, bufferCreateSize), 0) - client := &http.Client{ Transport: &http.Transport{ MaxIdleConns: 1, @@ -55,6 +54,10 @@ func NewStream(url string, size uint64) *Stream { Timeout: 6 * time.Hour, } + bufferContext, _ := context.WithCancel(context.Background()) + + buffer := ring_buffer.NewLockingRingBuffer(bufferContext, bufferCreateSize, 0) + return &Stream{ url: url, size: size, @@ -116,7 +119,7 @@ func (stream *Stream) startStream(seekPosition uint64) { case <-time.After(retryDelay): } - bytesToOverwrite := max(stream.buffer.GetBytesToOverwriteSync(), 0) + bytesToOverwrite := max(stream.buffer.GetBytesToOverwrite(), 0) chunkSizeToRead := min(uint64(len(chunk)), bytesToOverwrite) if chunkSizeToRead == 0 { @@ -194,7 +197,7 @@ func (stream *Stream) checkAndStartBufferIfNeeded(seekPosition uint64, requested return } - seekInBuffer := stream.buffer.IsPositionInBufferSync(seekPosition) + seekInBuffer := stream.buffer.IsPositionInBuffer(seekPosition) if !seekInBuffer { stream.stopStream() @@ -205,23 +208,23 @@ func (stream *Stream) checkAndStartBufferIfNeeded(seekPosition uint64, requested stream.wg.Add(1) - stream.buffer.Reset(seekPosition) + stream.buffer.ResetToPosition(seekPosition) go stream.startStream(seekPosition) waitForSize := min(seekPosition+requestedSize, stream.size) - stream.buffer.WaitForPositionInBuffer(stream.context, waitForSize) + stream.buffer.WaitForPositionInBuffer(waitForSize) return } - dataInBuffer := stream.buffer.IsPositionInBufferSync(seekPosition + requestedSize) + dataInBuffer := stream.buffer.IsPositionInBuffer(seekPosition + requestedSize) if !dataInBuffer { waitForSize := min(seekPosition+requestedSize, stream.size) - stream.buffer.WaitForPositionInBuffer(stream.context, waitForSize) + stream.buffer.WaitForPositionInBuffer(waitForSize) } } @@ -270,7 +273,7 @@ func (stream *Stream) Close() error { defer stream.mu.Unlock() stream.stopStream() - stream.buffer.Close() + // stream.buffer.Close() return nil }