Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE-636 | Add pipeline package #570

Merged
merged 12 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions domain/pipeline/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package pipeline

import (
"fmt"
"sync"
)

// Iterator interface defines methods for filtering, sorting, and chunked access
type Iterator[K, V any] interface {
Next() (V, error) // Retrieves the next element and a bool indicating if it's valid
HasNext() bool // Checks if there are more elements
SetOffset(offset int) // Sets the offset for starting point of iteration
Reset() // Resets the iterator to the start
}

// NewMapIterator creates an iterator over map data
func NewSyncMapIterator[K, V any](data *sync.Map, keys []K) *SyncMapIterator[K, V] {
return &SyncMapIterator[K, V]{
data: data,
keys: keys,
index: 0,
}
}

// SyncMapIterator is a sample iterator for a map data structure
type SyncMapIterator[K, V any] struct {
data *sync.Map
keys []K
index int
}
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved

// Next retrieves the next element that matches the filter (if set), advancing the index
// Error is returned when given key is not found, type assertion for value fails, or there are no more elements to iterate
func (it *SyncMapIterator[K, V]) Next() (V, error) {
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved
if it.HasNext() {
key := it.keys[it.index]
it.index++
mp, ok := it.data.Load(key)
if !ok {
return *new(V), fmt.Errorf("key %v not found", key)
}

value, ok := mp.(V)
if !ok {
return *new(V), fmt.Errorf("invalid type assertion for key %v", key)
}

return value, nil
}

return *new(V), fmt.Errorf("no more elements")
}

// SetOffset sets the offset for the iterator.
// This is useful when client requests a subset of the result set
// and wants to start from a specific index.
func (it *SyncMapIterator[K, V]) SetOffset(offset int) {
if offset < 0 {
offset = 0
}

if offset > len(it.keys) {
offset = len(it.keys)
}

it.index = offset
}

// HasNext checks if there are more elements in the iterator
func (it *SyncMapIterator[K, V]) HasNext() bool {
return it.index < len(it.keys)
}

// Reset resets the iterator to the start
func (it *SyncMapIterator[K, V]) Reset() {
it.index = 0
}
267 changes: 267 additions & 0 deletions domain/pipeline/iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package pipeline

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

type testdata struct {
key string
value int
}

// MockIterator is a simple implementation of Iterator for testing
type MockIterator struct {
items []int
index int
}

func (m *MockIterator) HasNext() bool {
return m.index < len(m.items)
}

func (m *MockIterator) SetOffset(offset int) {
m.index = offset
}

func (m *MockIterator) Next() (int, error) {
if m.HasNext() {
item := m.items[m.index]
m.index++
return item, nil
}
return 0, fmt.Errorf("no more elements")
}

func (m *MockIterator) Reset() {
m.index = 0
}

func TestSyncMapIteratorNext(t *testing.T) {
tests := []struct {
name string
data []testdata
keys []string
expected []testdata
}{
{
name: "Empty map",
},
{
name: "Single element",
data: []testdata{{key: "a", value: 1}},
expected: []testdata{{key: "a", value: 1}},
},
{
name: "Multiple elements",
data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}},
expected: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := sync.Map{}
var keys []string

for _, v := range tt.data {
m.Store(v.key, v)
keys = append(keys, v.key)
}

it := NewSyncMapIterator[string, testdata](&m, keys)

var result []testdata
for {
val, err := it.Next()
if err != nil {
break
}
result = append(result, val)
}

require.Equal(t, tt.expected, result, "Iteration result should match expected")

// Test that after full iteration, Next() returns an error
_, err := it.Next()
require.Error(t, err, "Expected Next() to return an error after full iteration")
})
}
}

func TestSyncMapIteratorSetOffset(t *testing.T) {
tests := []struct {
name string
data []testdata
keys []string
offset int
expected []testdata
}{
{
name: "Set offset to 0",
data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}},
keys: []string{"a", "b", "c"},
offset: 0,
expected: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}},
},
{
name: "Set offset to middle",
data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}},
keys: []string{"a", "b", "c"},
offset: 1,
expected: []testdata{{key: "b", value: 2}, {key: "c", value: 3}},
},
{
name: "Set offset to last element",
data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}},
keys: []string{"a", "b", "c"},
offset: 2,
expected: []testdata{{key: "c", value: 3}},
},
{
name: "Set offset beyond last element",
data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}},
keys: []string{"a", "b", "c"},
offset: 3,
},
{
name: "Set offset for empty map",
offset: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := sync.Map{}
for _, v := range tt.data {
m.Store(v.key, v)
}

it := NewSyncMapIterator[string, testdata](&m, tt.keys)
it.SetOffset(tt.offset)

var result []testdata
for {
val, err := it.Next()
if err != nil {
break
}
result = append(result, val)
}

require.Equalf(t, tt.expected, result, "Iteration result after SetOffset(%d) should match expected", tt.offset)

// Test that after full iteration, Next() returns an error
_, err := it.Next()
require.Error(t, err, "Expected Next() to return an error after full iteration")
})
}
}

func TestSyncMapIterator_HasNext(t *testing.T) {
tests := []struct {
name string
keys []string
index int
want bool
}{
{
name: "Empty iterator",
keys: []string{},
index: 0,
want: false,
},
{
name: "Iterator with elements, at start",
keys: []string{"a", "b", "c"},
index: 0,
want: true,
},
{
name: "Iterator with elements, in middle",
keys: []string{"a", "b", "c"},
index: 1,
want: true,
},
{
name: "Iterator with elements, at last element",
keys: []string{"a", "b", "c"},
index: 2,
want: true,
},
{
name: "Iterator with elements, past last element",
keys: []string{"a", "b", "c"},
index: 3,
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
iterator := &SyncMapIterator[string, int]{
keys: tt.keys,
index: tt.index,
}
got := iterator.HasNext()
require.Equal(t, tt.want, got, "SyncMapIterator.HasNext() should return expected value")
})
}
}

func TestSyncMapIterator_Reset(t *testing.T) {
deividaspetraitis marked this conversation as resolved.
Show resolved Hide resolved
tests := []struct {
name string
initialIndex int
keys []string
expectedIndex int
expectedHasNext bool
}{
{
name: "Reset from middle",
initialIndex: 2,
keys: []string{"a", "b", "c", "d"},
expectedIndex: 0,
expectedHasNext: true,
},
{
name: "Reset from end",
initialIndex: 4,
keys: []string{"a", "b", "c", "d"},
expectedIndex: 0,
expectedHasNext: true,
},
{
name: "Reset from start",
initialIndex: 0,
keys: []string{"a", "b", "c", "d"},
expectedIndex: 0,
expectedHasNext: true,
},
{
name: "Reset empty iterator",
initialIndex: 0,
keys: []string{},
expectedIndex: 0,
expectedHasNext: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
it := &SyncMapIterator[string, int]{
data: &sync.Map{},
keys: tt.keys,
index: tt.initialIndex,
}

it.Reset()

require.Equal(t, tt.expectedIndex, it.index, "After Reset(), index should match expected")
require.Equal(t, tt.expectedHasNext, it.HasNext(), "After Reset(), HasNext() should return expected value")
})
}
}
Loading
Loading