Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The size encoded in a entry header contains the header it self. #266

Merged
merged 5 commits into from
Nov 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions queue/bytes_queue.go
Original file line number Diff line number Diff line change
@@ -38,19 +38,23 @@ type queueError struct {
message string
}

// getUvarintSize returns the number of bytes to encode x in uvarint format
func getUvarintSize(x uint32) int {
if x < 128 {
return 1
} else if x < 16384 {
return 2
} else if x < 2097152 {
return 3
} else if x < 268435456 {
return 4
} else {
return 5
// getNeededSize returns the number of bytes an entry of length need in the queue
func getNeededSize(length int) int {
var header int
switch {
case length < 127: // 1<<7-1
header = 1
case length < 16382: // 1<<14-2
header = 2
case length < 2097149: // 1<<21 -3
header = 3
case length < 268435452: // 1<<28 -4
header = 4
default:
header = 5
}

return length + header
}

// NewBytesQueue initialize new bytes queue.
@@ -82,22 +86,21 @@ func (q *BytesQueue) Reset() {
// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
// Returns index for pushed data or error if maximum size queue limit is reached.
func (q *BytesQueue) Push(data []byte) (int, error) {
dataLen := len(data)
headerEntrySize := getUvarintSize(uint32(dataLen))
neededSize := getNeededSize(len(data))

if !q.canInsertAfterTail(dataLen + headerEntrySize) {
if q.canInsertBeforeHead(dataLen + headerEntrySize) {
if !q.canInsertAfterTail(neededSize) {
if q.canInsertBeforeHead(neededSize) {
q.tail = leftMarginIndex
} else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 {
} else if q.capacity+neededSize >= q.maxCapacity && q.maxCapacity > 0 {
return -1, &queueError{"Full queue. Maximum size limit reached."}
} else {
q.allocateAdditionalMemory(dataLen + headerEntrySize)
q.allocateAdditionalMemory(neededSize)
}
}

index := q.tail

q.push(data, dataLen)
q.push(data, neededSize)

return index, nil
}
@@ -120,9 +123,8 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {

if q.tail <= q.head {
if q.tail != q.head {
headerEntrySize := getUvarintSize(uint32(q.head - q.tail))
emptyBlobLen := q.head - q.tail - headerEntrySize
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
// created slice is slightly larger then need but this is fine after only the needed bytes are copied
q.push(make([]byte, q.head-q.tail), q.head-q.tail)
}

q.head = leftMarginIndex
@@ -141,7 +143,7 @@ func (q *BytesQueue) push(data []byte, len int) {
headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len))
q.copy(q.headerBuffer, headerEntrySize)

q.copy(data, len)
q.copy(data, len-headerEntrySize)

if q.tail > q.head {
q.rightMargin = q.tail
@@ -159,13 +161,12 @@ func (q *BytesQueue) copy(data []byte, len int) {

// Pop reads the oldest entry from queue and moves head pointer to the next one
func (q *BytesQueue) Pop() ([]byte, error) {
data, headerEntrySize, err := q.peek(q.head)
data, blockSize, err := q.peek(q.head)
if err != nil {
return nil, err
}
size := len(data)

q.head += headerEntrySize + size
q.head += blockSize
q.count--

if q.head == q.rightMargin {
@@ -238,7 +239,7 @@ func (q *BytesQueue) peek(index int) ([]byte, int, error) {
}

blockSize, n := binary.Uvarint(q.array[index:])
return q.array[index+n : index+n+int(blockSize)], n, nil
return q.array[index+n : index+int(blockSize)], int(blockSize), nil
}

// canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue
25 changes: 25 additions & 0 deletions queue/bytes_queue_test.go
Original file line number Diff line number Diff line change
@@ -224,6 +224,31 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBef
assertEqual(t, blob('d', 40), pop(queue))
}

func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBeforeHead128(t *testing.T) {
t.Parallel()

// given
queue := NewBytesQueue(200, 0, false)

// when
queue.Push(blob('a', 30)) // header + entry + left margin = 32 bytes
queue.Push(blob('b', 1)) // 32 + 128 + 1 = 161 bytes
queue.Push(blob('b', 125)) // 32 + 128 + 1 = 161 bytes
queue.Push(blob('c', 20)) // 160 + 20 + 1 = 182
queue.Pop() // space freed at the beginning
queue.Pop() // free 2 bytes
queue.Pop() // free 126
queue.Push(blob('d', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer, now free space is 128 bytes
queue.Push(blob('e', 160)) // invoke allocateAdditionalMemory but fill 127 bytes free space (It should be 128 bytes, but 127 are filled, leaving one byte unfilled)

// then
assertEqual(t, 400, queue.Capacity())
assertEqual(t, blob('d', 30), pop(queue))
assertEqual(t, blob(0, 126), pop(queue)) //126 bytes data with 2bytes header only possible as empty entry
assertEqual(t, blob('c', 20), pop(queue)) //The data is not expected
assertEqual(t, blob('e', 160), pop(queue))
}

func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereTailIsBeforeHead(t *testing.T) {
t.Parallel()