From a4395a90f3409b3af023bb6f00b82fb9c02efcdb Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 13:45:13 +0200 Subject: [PATCH 01/12] BE-636 | Add pipeline package Adds pipeline package allowing to manipulate data in various ways including filtering, sorting, paginating and iterating. --- domain/pipeline/iterator.go | 65 +++++ domain/pipeline/iterator_test.go | 279 ++++++++++++++++++ domain/pipeline/paginator.go | 64 ++++ domain/pipeline/paginator_test.go | 183 ++++++++++++ domain/pipeline/transformer.go | 137 +++++++++ domain/pipeline/transformer_test.go | 438 ++++++++++++++++++++++++++++ 6 files changed, 1166 insertions(+) create mode 100644 domain/pipeline/iterator.go create mode 100644 domain/pipeline/iterator_test.go create mode 100644 domain/pipeline/paginator.go create mode 100644 domain/pipeline/paginator_test.go create mode 100644 domain/pipeline/transformer.go create mode 100644 domain/pipeline/transformer_test.go diff --git a/domain/pipeline/iterator.go b/domain/pipeline/iterator.go new file mode 100644 index 00000000..4774de38 --- /dev/null +++ b/domain/pipeline/iterator.go @@ -0,0 +1,65 @@ +package pipeline + +import "sync" + +// Iterator interface defines methods for filtering, sorting, and chunked access +type Iterator[K, V any] interface { + Next() (V, bool) // Retrieves the next element and a bool indicating if it's valid + HasNext() bool // Checks if there are more elements + SetOffset(offset int) // Sets the offset for starting point of iteration + Reset() // Resets the iterator to the start +} + +// NewMapIterator creates an iterator over map data +func NewSyncMapIterator[K, V any](data *sync.Map, keys []K) *SyncMapIterator[K, V] { + return &SyncMapIterator[K, V]{ + data: data, + keys: keys, + index: 0, + } +} + +// SyncMapIterator is a sample iterator for a map data structure +type SyncMapIterator[K, V any] struct { + data *sync.Map + keys []K + index int +} + +// Next retrieves the next element that matches the filter (if set), advancing the index +func (it *SyncMapIterator[K, V]) Next() (V, bool) { + if it.HasNext() { + key := it.keys[it.index] + it.index++ + mp, ok := it.data.Load(key) + if !ok { + return *new(V), false + } + + value, ok := mp.(V) + if !ok { + return *new(V), false + } + + return value, true + } + + return *new(V), false +} + +// SetOffset sets the offset for the iterator. +// This is useful when client requests a subset of the result set +// and wants to start from a specific index. +func (it *SyncMapIterator[K, V]) SetOffset(offset int) { + it.index = offset +} + +// HasNext checks if there are more elements in the iterator +func (it *SyncMapIterator[K, V]) HasNext() bool { + return it.index < len(it.keys) +} + +// Reset resets the iterator to the start +func (it *SyncMapIterator[K, V]) Reset() { + it.index = 0 +} diff --git a/domain/pipeline/iterator_test.go b/domain/pipeline/iterator_test.go new file mode 100644 index 00000000..0a9644b1 --- /dev/null +++ b/domain/pipeline/iterator_test.go @@ -0,0 +1,279 @@ +package pipeline + +import ( + "reflect" + "sync" + "testing" +) + +type testdata struct { + key string + value int +} + +// MockIterator is a simple implementation of Iterator for testing +type MockIterator struct { + items []int + index int +} + +func (m *MockIterator) HasNext() bool { + return m.index < len(m.items) +} + +func (m *MockIterator) SetOffset(offset int) { + m.index = offset +} + +func (m *MockIterator) Next() (int, bool) { + if m.HasNext() { + item := m.items[m.index] + m.index++ + return item, true + } + return 0, false +} + +func (m *MockIterator) Reset() { + m.index = 0 +} + +func TestSyncMapIteratorNext(t *testing.T) { + tests := []struct { + name string + data []testdata + keys []string + expected []testdata + }{ + { + name: "Empty map", + }, + { + name: "Single element", + data: []testdata{{key: "a", value: 1}}, + expected: []testdata{{key: "a", value: 1}}, + }, + { + name: "Multiple elements", + data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}}, + expected: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := sync.Map{} + var keys []string + + for _, v := range tt.data { + m.Store(v.key, v) + keys = append(keys, v.key) + } + + it := NewSyncMapIterator[string, testdata](&m, keys) + + var result []testdata + for { + val, ok := it.Next() + if !ok { + break + } + result = append(result, val) + } + + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("Iteration result = %v, want %v", result, tt.expected) + } + + // Test that after full iteration, Next() returns false + _, ok := it.Next() + if ok { + t.Errorf("Expected Next() to return false after full iteration") + } + }) + } +} + +func TestSyncMapIteratorSetOffset(t *testing.T) { + tests := []struct { + name string + data []testdata + keys []string + offset int + expected []testdata + }{ + { + name: "Set offset to 0", + data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}}, + keys: []string{"a", "b", "c"}, + offset: 0, + expected: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}}, + }, + { + name: "Set offset to middle", + data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}}, + keys: []string{"a", "b", "c"}, + offset: 1, + expected: []testdata{{key: "b", value: 2}, {key: "c", value: 3}}, + }, + { + name: "Set offset to last element", + data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}}, + keys: []string{"a", "b", "c"}, + offset: 2, + expected: []testdata{{key: "c", value: 3}}, + }, + { + name: "Set offset beyond last element", + data: []testdata{{key: "a", value: 1}, {key: "b", value: 2}, {key: "c", value: 3}}, + keys: []string{"a", "b", "c"}, + offset: 3, + }, + { + name: "Set offset for empty map", + offset: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := sync.Map{} + for _, v := range tt.data { + m.Store(v.key, v) + } + + it := NewSyncMapIterator[string, testdata](&m, tt.keys) + it.SetOffset(tt.offset) + + var result []testdata + for { + val, ok := it.Next() + if !ok { + break + } + result = append(result, val) + } + + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("Iteration result after SetOffset(%d) = %v, want %v", tt.offset, result, tt.expected) + } + + // Test that after full iteration, Next() returns false + _, ok := it.Next() + if ok { + t.Errorf("Expected Next() to return false after full iteration") + } + }) + } +} + +func TestSyncMapIterator_HasNext(t *testing.T) { + tests := []struct { + name string + keys []string + index int + want bool + }{ + { + name: "Empty iterator", + keys: []string{}, + index: 0, + want: false, + }, + { + name: "Iterator with elements, at start", + keys: []string{"a", "b", "c"}, + index: 0, + want: true, + }, + { + name: "Iterator with elements, in middle", + keys: []string{"a", "b", "c"}, + index: 1, + want: true, + }, + { + name: "Iterator with elements, at last element", + keys: []string{"a", "b", "c"}, + index: 2, + want: true, + }, + { + name: "Iterator with elements, past last element", + keys: []string{"a", "b", "c"}, + index: 3, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iterator := &SyncMapIterator[string, int]{ + keys: tt.keys, + index: tt.index, + } + if got := iterator.HasNext(); got != tt.want { + t.Errorf("SyncMapIterator.HasNext() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSyncMapIterator_Reset(t *testing.T) { + tests := []struct { + name string + initialIndex int + keys []string + expectedIndex int + expectedHasNext bool + }{ + { + name: "Reset from middle", + initialIndex: 2, + keys: []string{"a", "b", "c", "d"}, + expectedIndex: 0, + expectedHasNext: true, + }, + { + name: "Reset from end", + initialIndex: 4, + keys: []string{"a", "b", "c", "d"}, + expectedIndex: 0, + expectedHasNext: true, + }, + { + name: "Reset from start", + initialIndex: 0, + keys: []string{"a", "b", "c", "d"}, + expectedIndex: 0, + expectedHasNext: true, + }, + { + name: "Reset empty iterator", + initialIndex: 0, + keys: []string{}, + expectedIndex: 0, + expectedHasNext: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + it := &SyncMapIterator[string, int]{ + data: &sync.Map{}, + keys: tt.keys, + index: tt.initialIndex, + } + + it.Reset() + + if it.index != tt.expectedIndex { + t.Errorf("After Reset(), index = %v, want %v", it.index, tt.expectedIndex) + } + + if it.HasNext() != tt.expectedHasNext { + t.Errorf("After Reset(), HasNext() = %v, want %v", it.HasNext(), tt.expectedHasNext) + } + }) + } +} diff --git a/domain/pipeline/paginator.go b/domain/pipeline/paginator.go new file mode 100644 index 00000000..e69f85e9 --- /dev/null +++ b/domain/pipeline/paginator.go @@ -0,0 +1,64 @@ +package pipeline + +import v1beta1 "github.com/osmosis-labs/sqs/pkg/api/v1beta1" + +// NewPaginator initializes a Paginator with an Iterator +func NewPaginator[K, V any](iterator Iterator[K, V], p *v1beta1.PaginationRequest) *Paginator[K, V] { + return &Paginator[K, V]{ + iterator: iterator, + pagination: p, + } +} + +// Paginator relies on Iterator to fetch paginated data without knowing the data type +type Paginator[K, V any] struct { + iterator Iterator[K, V] + pagination *v1beta1.PaginationRequest +} + +// GetPage retrieves elements for the current page based on pagination strategy. +// Under the hood it calls either GetPageBasedPage or GetCursorBasedPage. +func (p *Paginator[K, V]) GetPage() []V { + if p.pagination.Strategy == v1beta1.PaginationStrategy_PAGE { + return p.FetchPageByPageNumber() + } + return p.FetchPageByCursor() +} + +// FetchPageByPageNumber retrieves elements for the current page based on page-based pagination strategy. +func (p *Paginator[K, V]) FetchPageByPageNumber() []V { + // Ensure we're starting fresh + p.iterator.Reset() + + // Set the offset based on the page number to avoid fetching data from the beginning + p.iterator.SetOffset(int(p.pagination.Page * p.pagination.Limit)) + + items := make([]V, 0, p.pagination.Limit) + for i := uint64(0); i < p.pagination.Limit && p.iterator.HasNext(); i++ { + elem, valid := p.iterator.Next() + if valid { + items = append(items, elem) + } + } + + return items +} + +// FetchPageByCursor retrieves elements for the current page based on cursor-based pagination strategy. +func (p *Paginator[K, V]) FetchPageByCursor() []V { + // Ensure we're starting fresh + p.iterator.Reset() + + // Set the offset based on the page number to avoid fetching data from the beginning + p.iterator.SetOffset(int(p.pagination.Cursor)) + + items := make([]V, 0, p.pagination.Limit) + for i := uint64(0); i < p.pagination.Limit && p.iterator.HasNext(); i++ { + elem, valid := p.iterator.Next() + if valid { + items = append(items, elem) + } + } + + return items +} diff --git a/domain/pipeline/paginator_test.go b/domain/pipeline/paginator_test.go new file mode 100644 index 00000000..aad18aeb --- /dev/null +++ b/domain/pipeline/paginator_test.go @@ -0,0 +1,183 @@ +package pipeline + +import ( + "reflect" + "testing" + + v1beta1 "github.com/osmosis-labs/sqs/pkg/api/v1beta1" +) + +func TestGetPage(t *testing.T) { + tests := []struct { + name string + items []int + pagination *v1beta1.PaginationRequest + want []int + }{ + { + name: "Page-based strategy with odd number of items", + items: []int{1, 2, 3, 4, 5, 6, 7}, + pagination: &v1beta1.PaginationRequest{ + Strategy: v1beta1.PaginationStrategy_PAGE, + Page: 1, + Limit: 3, + }, + want: []int{4, 5, 6}, + }, + { + name: "Cursor-based strategy with even number of items", + items: []int{10, 20, 30, 40, 50, 60}, + pagination: &v1beta1.PaginationRequest{ + Strategy: v1beta1.PaginationStrategy_CURSOR, + Cursor: 2, + Limit: 3, + }, + want: []int{30, 40, 50}, + }, + { + name: "Page-based strategy with limit exceeding remaining items", + items: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Strategy: v1beta1.PaginationStrategy_PAGE, + Page: 1, + Limit: 10, + }, + want: []int{}, + }, + { + name: "Cursor-based strategy with cursor at last item", + items: []int{100, 200, 300, 400}, + pagination: &v1beta1.PaginationRequest{ + Strategy: v1beta1.PaginationStrategy_CURSOR, + Cursor: 3, + Limit: 2, + }, + want: []int{400}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iterator := &MockIterator{items: tt.items} + paginator := NewPaginator[int, int](iterator, tt.pagination) + got := paginator.GetPage() + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetPage() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFetchPageByPageNumber(t *testing.T) { + tests := []struct { + name string + items []int + pagination *v1beta1.PaginationRequest + want []int + }{ + { + name: "First page of 3 items", + items: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Page: 0, + Limit: 3, + }, + want: []int{1, 2, 3}, + }, + { + name: "Second page of 2 items", + items: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Page: 1, + Limit: 2, + }, + want: []int{3, 4}, + }, + { + name: "Last page with fewer items than limit", + items: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Page: 1, + Limit: 3, + }, + want: []int{4, 5}, + }, + { + name: "Empty result for page beyond available items", + items: []int{1, 2, 3}, + pagination: &v1beta1.PaginationRequest{ + Page: 2, + Limit: 2, + }, + want: []int{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iterator := &MockIterator{items: tt.items} + paginator := NewPaginator[int, int](iterator, tt.pagination) + got := paginator.FetchPageByPageNumber() + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("FetchPageByPageNumber() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFetchPageByCursor(t *testing.T) { + tests := []struct { + name string + data []int + pagination *v1beta1.PaginationRequest + expected []int + }{ + { + name: "Fetch first page", + data: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Cursor: 0, + Limit: 3, + }, + expected: []int{1, 2, 3}, + }, + { + name: "Fetch second page", + data: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Cursor: 3, + Limit: 2, + }, + expected: []int{4, 5}, + }, + { + name: "Fetch beyond available data", + data: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Cursor: 5, + Limit: 2, + }, + expected: []int{}, + }, + { + name: "Fetch with limit greater than remaining items", + data: []int{1, 2, 3, 4, 5}, + pagination: &v1beta1.PaginationRequest{ + Cursor: 3, + Limit: 5, + }, + expected: []int{4, 5}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockIterator := &MockIterator{items: tt.data} + paginator := NewPaginator[int, int](mockIterator, tt.pagination) + got := paginator.FetchPageByCursor() + if !reflect.DeepEqual(got, tt.expected) { + t.Errorf("FetchPageByCursor() = %v, want %v", got, tt.expected) + } + }) + } +} diff --git a/domain/pipeline/transformer.go b/domain/pipeline/transformer.go new file mode 100644 index 00000000..ddbd9536 --- /dev/null +++ b/domain/pipeline/transformer.go @@ -0,0 +1,137 @@ +package pipeline + +import ( + "sort" + "sync" +) + +// Transformer defines a generic interface for filtering and sorting data. +type Transformer[K, V any] interface { + Filter(fn func(V) bool) *Transformer[K, V] // Filter applies a filter to the data + Sort(less ...func(V, V) bool) *Transformer[K, V] // Sort sorts the data + Keys() []string // Keys returns the list of transformed keys +} + +// SyncMapTransformer is a generic data transformer for map data +type SyncMapTransformer[K, V any] struct { + data *sync.Map + keys []K +} + +// NewDataTransformer initializes a transformer with raw data. +func NewSyncMapTransformer[K, V any](m *sync.Map) *SyncMapTransformer[K, V] { + var keys []K + m.Range(func(key, value any) bool { + k, ok := key.(K) + if ok { + keys = append(keys, k) + } + return true // keep iterating + }) + return &SyncMapTransformer[K, V]{data: m, keys: keys} +} + +// Count returns the number of elements in the transformer. +// Number of elements returned excludes any filtered elements. +func (dt *SyncMapTransformer[K, V]) Count() uint64 { + return uint64(len(dt.keys)) +} + +// Range calls f sequentially for each key and value present in the transformer. +// If f returns false, range stops the iteration. +func (dt *SyncMapTransformer[K, V]) Range(f func(key K, value V) bool) { + for _, k := range dt.keys { + v, ok := dt.load(k) + if !ok { + continue + } + + if !f(k, v) { + break + } + } +} + +// Filter implements the Transformer interface for map data. +func (dt *SyncMapTransformer[K, V]) Filter(fn ...func(V) bool) *SyncMapTransformer[K, V] { + var filteredKeys []K + + dt.Range(func(key K, value V) bool { + for _, f := range fn { + if f(value) { + filteredKeys = append(filteredKeys, key) + } + } + return true + }) + + dt.keys = filteredKeys + + return dt +} + +// Sort implements the Transformer interface for map data. +func (dt *SyncMapTransformer[K, V]) Sort(less ...func(V, V) bool) *SyncMapTransformer[K, V] { + if len(less) == 0 { + return dt // no sorting required + } + + sort.Slice(dt.keys, func(i, j int) bool { + for _, criterion := range less { + vi, ok := dt.load(dt.keys[i]) + if !ok { + continue + } + + vj, ok := dt.load(dt.keys[j]) + if !ok { + continue + } + + return criterion(vi, vj) + } + return false + }) + return dt +} + +// Keys implements the Transformer interface for map data. +func (dt *SyncMapTransformer[K, V]) Keys() []K { + return dt.keys +} + +// Data returns transformed underlying data. +func (dt *SyncMapTransformer[K, V]) Data() []V { + var data []V + for _, key := range dt.keys { + v, ok := dt.load(key) + if ok { + data = append(data, v) + } + } + return data +} + +// Clone returns a new transformer with the same underlying data at the current state. +func (dt *SyncMapTransformer[K, V]) Clone() *SyncMapTransformer[K, V] { + return &SyncMapTransformer[K, V]{ + data: dt.data, + keys: dt.keys, + } +} + +// load returns the value associated with the key. +// If the key is not found, it returns a zero value of the value type and false. +func (dt *SyncMapTransformer[K, V]) load(key K) (V, bool) { + mv, ok := dt.data.Load(key) + if !ok { + return *new(V), false + } + + v, ok := mv.(V) + if !ok { + return *new(V), false + } + + return v, true +} diff --git a/domain/pipeline/transformer_test.go b/domain/pipeline/transformer_test.go new file mode 100644 index 00000000..145c3bbd --- /dev/null +++ b/domain/pipeline/transformer_test.go @@ -0,0 +1,438 @@ +package pipeline + +import ( + "reflect" + "slices" + "sync" + "testing" +) + +func TestSyncMapTransformer_Count(t *testing.T) { + tests := []struct { + name string + data []int + expected uint64 + }{ + { + name: "Empty map", + data: nil, + expected: 0, + }, + { + name: "Map with one element", + data: []int{1}, + expected: 1, + }, + { + name: "Map with multiple elements", + data: []int{1, 2, 3}, + expected: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var m sync.Map + for k, v := range tt.data { + m.Store(k, v) + } + + transformer := NewSyncMapTransformer[int, int](&m) + got := transformer.Count() + + if got != tt.expected { + t.Errorf("Expected count %d, but got %d", tt.expected, got) + } + }) + } +} + +func TestSyncMapTransformerRange(t *testing.T) { + testCases := []struct { + name string + initialData []testdata + expectedKeys []string + expectedValues []int + stopAfter int + }{ + { + name: "Empty Map", + }, + { + name: "Single Element", + initialData: []testdata{ + { + key: "one", + value: 1, + }, + }, + expectedKeys: []string{"one"}, + expectedValues: []int{1}, + }, + { + name: "Multiple Elements", + initialData: []testdata{ + { + key: "one", + value: 1, + }, + { + key: "two", + value: 2, + }, + { + key: "three", + value: 3, + }, + }, + expectedKeys: []string{"one", "two", "three"}, + expectedValues: []int{1, 2, 3}, + }, + { + name: "Stop Iteration Early", + initialData: []testdata{ + { + key: "one", + value: 1, + }, + { + key: "two", + value: 2, + }, + { + key: "three", + value: 3, + }, + }, + expectedKeys: []string{"one", "two"}, + expectedValues: []int{1, 2}, + stopAfter: 2, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Construct sync.Map and populate with initial data + m := sync.Map{} + var keys []string + + for _, v := range tc.initialData { + m.Store(v.key, v.value) + keys = append(keys, v.key) + } + + // Create transformer, we are not using NewSyncMapTransformer because + // we want to keep the keys in a specific order + transformer := &SyncMapTransformer[string, int]{data: &m, keys: keys} + + // Collect keys and values during Range + var collectedKeys []string + var collectedValues []int + + // Define iteration function + iterFunc := func(key string, value int) bool { + collectedKeys = append(collectedKeys, key) + collectedValues = append(collectedValues, value) + + // Stop iteration if stopAfter is set and reached + if tc.stopAfter > 0 && len(collectedKeys) >= tc.stopAfter { + return false + } + return true + } + + // Perform Range + transformer.Range(iterFunc) + + // Validate collected keys + if len(collectedKeys) != len(tc.expectedKeys) { + t.Errorf("Collected %d keys, want %d", len(collectedKeys), len(tc.expectedKeys)) + } + + // Validate keys and values + if slices.Equal(tc.expectedKeys, collectedKeys) != true { + t.Errorf("Collected keys %v, want %v", collectedKeys, tc.expectedKeys) + } + + if slices.Equal(tc.expectedValues, collectedValues) != true { + t.Errorf("Collected values %v, want %v", collectedValues, tc.expectedValues) + } + }) + } +} + +func TestTransformerFilter(t *testing.T) { + tests := []struct { + name string + data []int + filter func(int) bool + want []int + }{ + { + name: "Filter even numbers", + data: []int{1, 2, 3, 4, 5}, + filter: func(v int) bool { + return v%2 == 0 + }, + want: []int{2, 4}, + }, + { + name: "Filter numbers greater than 3", + data: []int{1, 2, 3, 4, 5}, + filter: func(v int) bool { + return v > 3 + }, + want: []int{4, 5}, + }, + { + name: "Filter all", + data: []int{1, 2, 3}, + filter: func(v int) bool { + return true + }, + want: []int{1, 2, 3}, + }, + { + name: "Filter none", + data: []int{1, 2, 3}, + filter: func(v int) bool { + return false + }, + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var m sync.Map + for k, v := range tt.data { + m.Store(k, v) + } + + transformer := NewSyncMapTransformer[int, int](&m) + transformer.Sort(func(a, b int) bool { return a < b }) // Sort the data to ensure the order is consistent + transformer.Filter(tt.filter) + + // Check if the original data is unchanged + if !reflect.DeepEqual(transformer.Data(), tt.want) { + t.Errorf("Filter() modified original data. Got %v, want %v", transformer.Data(), tt.want) + } + }) + } +} + +func TestMapTransformerSort(t *testing.T) { + tests := []struct { + name string + data []int + less func(int, int) bool + expected []int + }{ + { + name: "Sort integers ascending", + data: []int{3, 1, 2}, + less: func(a, b int) bool { return a < b }, + expected: []int{1, 2, 3}, + }, + { + name: "Sort integers descending", + data: []int{3, 1, 2}, + less: func(a, b int) bool { return a > b }, + expected: []int{3, 2, 1}, + }, + { + name: "Sort with equal values", + data: []int{1, 2, 1}, + less: func(a, b int) bool { return a < b }, + expected: []int{1, 1, 2}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var m sync.Map + for k, v := range tt.data { + m.Store(k, v) + } + transformer := NewSyncMapTransformer[int, int](&m) + transformer.Sort(tt.less) + + got := transformer.Data() + if !reflect.DeepEqual(got, tt.expected) { + t.Errorf("Expected %v, but got %v", tt.expected, got) + } + }) + } +} + +func TestSyncMapTransformerKeys(t *testing.T) { + testCases := []struct { + name string + initialKeys []string + expectedKeys []string + }{ + { + name: "Empty Map", + }, + { + name: "Single Element", + initialKeys: []string{"one"}, + expectedKeys: []string{"one"}, + }, + { + name: "Multiple Elements", + initialKeys: []string{"one", "two", "three"}, + expectedKeys: []string{"one", "two", "three"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create transformer, we are not using NewSyncMapTransformer because + // we want to keep the keys in a specific order + transformer := &SyncMapTransformer[string, int]{data: nil, keys: tc.initialKeys} + + collectedKeys := transformer.Keys() + if slices.Equal(tc.expectedKeys, collectedKeys) != true { + t.Errorf("Collected keys %v, want %v", collectedKeys, tc.expectedKeys) + } + }) + } +} + +func TestSyncMapTransformerData(t *testing.T) { + testCases := []struct { + name string + initialData []testdata + expectedValues []int + stopAfter int + }{ + { + name: "Empty Map", + }, + { + name: "Single Element", + initialData: []testdata{ + { + key: "one", + value: 1, + }, + }, + expectedValues: []int{1}, + }, + { + name: "Multiple Elements", + initialData: []testdata{ + { + key: "one", + value: 1, + }, + { + key: "two", + value: 2, + }, + { + key: "three", + value: 3, + }, + }, + expectedValues: []int{1, 2, 3}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Construct sync.Map and populate with initial data + m := sync.Map{} + var keys []string + + for _, v := range tc.initialData { + m.Store(v.key, v.value) + keys = append(keys, v.key) + } + + // Create transformer, we are not using NewSyncMapTransformer because + // we want to keep the keys in a specific order + transformer := &SyncMapTransformer[string, int]{data: &m, keys: keys} + + // Collect values + collectedValues := transformer.Data() + + // Validate values + if slices.Equal(tc.expectedValues, collectedValues) != true { + t.Errorf("Collected values %v, want %v", collectedValues, tc.expectedValues) + } + }) + } +} + +func TestSyncMapTransformerClone(t *testing.T) { + testCases := []struct { + name string + initialData []testdata + expectedValues []int + stopAfter int + }{ + { + name: "Empty Map", + }, + { + name: "Single Element", + initialData: []testdata{ + { + key: "one", + value: 1, + }, + }, + expectedValues: []int{1}, + }, + { + name: "Multiple Elements", + initialData: []testdata{ + { + key: "one", + value: 1, + }, + { + key: "two", + value: 2, + }, + { + key: "three", + value: 3, + }, + }, + expectedValues: []int{1, 2, 3}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Construct sync.Map and populate with initial data + m := sync.Map{} + var keys []string + + for _, v := range tc.initialData { + m.Store(v.key, v.value) + keys = append(keys, v.key) + } + + // Create transformer, we are not using NewSyncMapTransformer because + // we want to keep the keys in a specific order + transformer := &SyncMapTransformer[string, int]{data: &m, keys: keys} + + // Clone the transformer + clonedTransformer := transformer.Clone() + + // Verify the clone has the same underlying map + if clonedTransformer.data != transformer.data { + t.Errorf("Clone should share the same underlying sync.Map") + } + + // Verify the clone has the same keys + if len(clonedTransformer.keys) != len(transformer.keys) { + t.Errorf("Clone should have the same number of keys") + } + }) + } +} From f0c4fa0002c5acaa422c149a92aec7a45e2cf86c Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 14:09:10 +0200 Subject: [PATCH 02/12] BE-636-pipeline | Add missing pagination files --- pkg/api/v1beta1/pagination.go | 148 +++++ pkg/api/v1beta1/pagination.pb.go | 660 +++++++++++++++++++++++ pkg/api/v1beta1/pagination_test.go | 210 ++++++++ proto/Dockerfile | 39 ++ proto/buf.gen.gogo.yaml | 8 + proto/buf.gen.swagger.yaml | 5 + proto/buf.lock | 23 + proto/buf.yaml | 35 ++ proto/sqs/query/v1beta1/pagination.proto | 54 ++ 9 files changed, 1182 insertions(+) create mode 100644 pkg/api/v1beta1/pagination.go create mode 100644 pkg/api/v1beta1/pagination.pb.go create mode 100644 pkg/api/v1beta1/pagination_test.go create mode 100644 proto/Dockerfile create mode 100644 proto/buf.gen.gogo.yaml create mode 100644 proto/buf.gen.swagger.yaml create mode 100644 proto/buf.lock create mode 100644 proto/buf.yaml create mode 100644 proto/sqs/query/v1beta1/pagination.proto diff --git a/pkg/api/v1beta1/pagination.go b/pkg/api/v1beta1/pagination.go new file mode 100644 index 00000000..aba0e1be --- /dev/null +++ b/pkg/api/v1beta1/pagination.go @@ -0,0 +1,148 @@ +package v1beta1 + +import ( + "fmt" + "strconv" + + "github.com/labstack/echo/v4" +) + +const ( + // MaxPage is the maximum allowed value for Page. + // This is used to prevent abuse and number was chosen arbitrarily. + MaxPage = 1000000 + + // MaxLimit is the maximum allowed value for Limit. + // This is used to prevent abuse and number was chosen arbitrarily. + MaxLimit = 1000 +) + +var ( + // ErrPageNotValid is the error returned when the page is not valid. + ErrPageNotValid = fmt.Errorf("page is not valid") + + // ErrLimitNotValid is the error returned when the limit is not valid. + ErrLimitNotValid = fmt.Errorf("limit is not valid") + + // ErrPageTooLarge is the error returned when the page is too large. + ErrPageTooLarge = fmt.Errorf("page is too large, maximum allowed is %d", MaxPage) + + // ErrLimitTooLarge is the error returned when the limit is too large. + ErrLimitTooLarge = fmt.Errorf("limit is too large, maximum allowed is %d", MaxLimit) + + // ErrPaginationStrategyNotSupported is the error returned when the pagination strategy is not supported. + ErrPaginationStrategyNotSupported = fmt.Errorf("pagination strategy is not supported") +) + +// Query parameters for pagination. +const ( + queryPageNumber = "page[number]" + queryPageSize = "page[size]" + queryPageCursor = "page[cursor]" +) + +// IsPresent checks if the pagination request is present in the HTTP request. +func (r *PaginationRequest) IsPresent(c echo.Context) bool { + return c.QueryParam(queryPageNumber) != "" || c.QueryParam(queryPageSize) != "" || c.QueryParam(queryPageCursor) != "" +} + +// UnmarshalHTTPRequest imlpements RequestUnmarshaler interface. +func (r *PaginationRequest) UnmarshalHTTPRequest(c echo.Context) error { + var err error + + // Fetch query parameters + pageParam := c.QueryParam(queryPageNumber) + limitParam := c.QueryParam(queryPageSize) + cursorParam := c.QueryParam(queryPageCursor) + + if pageParam != "" { + r.Page, err = strconv.ParseUint(pageParam, 10, 64) + if err != nil { + return err + } + } + + if limitParam != "" { + r.Limit, err = strconv.ParseUint(limitParam, 10, 64) + if err != nil { + return err + } + } + + if cursorParam != "" { + r.Cursor, err = strconv.ParseUint(cursorParam, 10, 64) + if err != nil { + return err + } + } + + // Determine strategy + if cursorParam != "" { + r.Strategy = PaginationStrategy_CURSOR + } else if pageParam != "" { + r.Strategy = PaginationStrategy_PAGE + } else { + r.Strategy = PaginationStrategy_UNKNOWN + } + + return nil +} + +// Validate validates the pagination request. +func (r *PaginationRequest) Validate() error { + if r.Page == 0 && r.Strategy == PaginationStrategy_PAGE { + return ErrPageNotValid + } + + if r.Page > MaxPage { + return ErrPageTooLarge + } + + if r.Limit == 0 { + return ErrLimitNotValid + } + + if r.Limit > MaxLimit { + return ErrLimitTooLarge + } + + if r.Strategy == PaginationStrategy_UNKNOWN { + return ErrPaginationStrategyNotSupported + } + + return nil +} + +// CalculateNextCursor calculates the next cursor based on the current cursor and limit. +func (r *PaginationRequest) CalculateNextCursor(totalItems uint64) (nextCursor int64) { + if r.Cursor >= totalItems { + return -1 // cursor is out of range + } + + endIndex := r.Cursor + r.Limit + if endIndex >= totalItems { + return -1 // end index is out of range + } + + nextCursor = int64(r.Cursor + r.Limit) + + return nextCursor +} + +// NewPaginationResponse creates a new pagination response. +// The response contains relevant fields filled based on the pagination strategy. +func NewPaginationResponse(req *PaginationRequest, total uint64) *PaginationResponse { + response := PaginationResponse{ + TotalItems: total, + } + + if req == nil { + return &response // return early if request is nil + } + + if req.Strategy == PaginationStrategy_CURSOR { + response.NextCursor = req.CalculateNextCursor(total) + } + + return &response +} diff --git a/pkg/api/v1beta1/pagination.pb.go b/pkg/api/v1beta1/pagination.pb.go new file mode 100644 index 00000000..0c504fdd --- /dev/null +++ b/pkg/api/v1beta1/pagination.pb.go @@ -0,0 +1,660 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: sqs/query/v1beta1/pagination.proto + +package v1beta1 + +import ( + fmt "fmt" + proto "github.com/cosmos/gogoproto/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// PaginationStrategy is an enum that defines the pagination strategy to be used +// in the PaginationRequest request. +type PaginationStrategy int32 + +const ( + // unknown strategy indicates that the pagination strategy is not known + // or not set. + PaginationStrategy_UNKNOWN PaginationStrategy = 0 + // page-based pagination is the most common pagination strategy. It is + // compatible with offset pagination. If you have a page of 2 and a limit of + // 20, you will get items 20-39. + PaginationStrategy_PAGE PaginationStrategy = 1 + // cursor-based pagination is compatible with and similar to offset + // pagination. If you have a cursor of 50 and a limit of 20, you will get + // items 50-69. Client should re-request all data if it changes, as the cursor + // is not a pointer to a page, but to an item index. + PaginationStrategy_CURSOR PaginationStrategy = 2 +) + +var PaginationStrategy_name = map[int32]string{ + 0: "UNKNOWN", + 1: "PAGE", + 2: "CURSOR", +} + +var PaginationStrategy_value = map[string]int32{ + "UNKNOWN": 0, + "PAGE": 1, + "CURSOR": 2, +} + +func (x PaginationStrategy) String() string { + return proto.EnumName(PaginationStrategy_name, int32(x)) +} + +func (PaginationStrategy) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_4b42ef85f330fbb5, []int{0} +} + +type PaginationRequest struct { + // page is the page number to query, starts at 0. If not provided, will + // default to first page. + Page uint64 `protobuf:"varint,1,opt,name=page,proto3" json:"page,omitempty"` + // limit is the total number of results to be returned in the result page. + // If left empty it will default to a value to be set by each app. + Limit uint64 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` + // cursor is the query offset to start from. If not provided, will default to + // the 0. + Cursor uint64 `protobuf:"varint,3,opt,name=cursor,proto3" json:"cursor,omitempty"` + // strategy is the pagination strategy to be used. If not provided, will + // default to PAGE. + Strategy PaginationStrategy `protobuf:"varint,4,opt,name=strategy,proto3,enum=sqs.query.v1beta1.PaginationStrategy" json:"strategy,omitempty"` +} + +func (m *PaginationRequest) Reset() { *m = PaginationRequest{} } +func (m *PaginationRequest) String() string { return proto.CompactTextString(m) } +func (*PaginationRequest) ProtoMessage() {} +func (*PaginationRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_4b42ef85f330fbb5, []int{0} +} +func (m *PaginationRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PaginationRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PaginationRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PaginationRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PaginationRequest.Merge(m, src) +} +func (m *PaginationRequest) XXX_Size() int { + return m.Size() +} +func (m *PaginationRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PaginationRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PaginationRequest proto.InternalMessageInfo + +func (m *PaginationRequest) GetPage() uint64 { + if m != nil { + return m.Page + } + return 0 +} + +func (m *PaginationRequest) GetLimit() uint64 { + if m != nil { + return m.Limit + } + return 0 +} + +func (m *PaginationRequest) GetCursor() uint64 { + if m != nil { + return m.Cursor + } + return 0 +} + +func (m *PaginationRequest) GetStrategy() PaginationStrategy { + if m != nil { + return m.Strategy + } + return PaginationStrategy_UNKNOWN +} + +type PaginationResponse struct { + // next_cursor is the cursor to be used in the next request to get the next + // page of results for cursor-based pagination. + // If the next_cursor is -1, it means that there are no more results to be + // fetched. + NextCursor int64 `protobuf:"varint,1,opt,name=next_cursor,json=nextCursor,proto3" json:"next_cursor,omitempty"` + // total_items is the total number of items available in the result set. + // This is useful for pagination when the client requests a subset of the + // result set. + TotalItems uint64 `protobuf:"varint,2,opt,name=total_items,json=totalItems,proto3" json:"total_items,omitempty"` +} + +func (m *PaginationResponse) Reset() { *m = PaginationResponse{} } +func (m *PaginationResponse) String() string { return proto.CompactTextString(m) } +func (*PaginationResponse) ProtoMessage() {} +func (*PaginationResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_4b42ef85f330fbb5, []int{1} +} +func (m *PaginationResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PaginationResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PaginationResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PaginationResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PaginationResponse.Merge(m, src) +} +func (m *PaginationResponse) XXX_Size() int { + return m.Size() +} +func (m *PaginationResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PaginationResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_PaginationResponse proto.InternalMessageInfo + +func (m *PaginationResponse) GetNextCursor() int64 { + if m != nil { + return m.NextCursor + } + return 0 +} + +func (m *PaginationResponse) GetTotalItems() uint64 { + if m != nil { + return m.TotalItems + } + return 0 +} + +func init() { + proto.RegisterEnum("sqs.query.v1beta1.PaginationStrategy", PaginationStrategy_name, PaginationStrategy_value) + proto.RegisterType((*PaginationRequest)(nil), "sqs.query.v1beta1.PaginationRequest") + proto.RegisterType((*PaginationResponse)(nil), "sqs.query.v1beta1.PaginationResponse") +} + +func init() { + proto.RegisterFile("sqs/query/v1beta1/pagination.proto", fileDescriptor_4b42ef85f330fbb5) +} + +var fileDescriptor_4b42ef85f330fbb5 = []byte{ + // 325 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x91, 0xc1, 0x6a, 0xf2, 0x40, + 0x14, 0x85, 0x33, 0x9a, 0xdf, 0x5f, 0xae, 0x50, 0x74, 0x28, 0x25, 0xab, 0x28, 0x42, 0x41, 0x5a, + 0x9a, 0x60, 0xbb, 0xe8, 0xda, 0x8a, 0x94, 0x52, 0x50, 0x89, 0xd8, 0x42, 0x37, 0x32, 0x91, 0x21, + 0x1d, 0x6a, 0x32, 0x49, 0xee, 0xa4, 0xd4, 0xb7, 0xe8, 0xb2, 0x8f, 0xd4, 0xa5, 0xcb, 0x2e, 0x8b, + 0xbe, 0x48, 0xc9, 0x18, 0xad, 0xd0, 0x5d, 0xee, 0xc7, 0xc9, 0x7c, 0x07, 0x0e, 0xb4, 0x31, 0x41, + 0x37, 0xc9, 0x78, 0xba, 0x74, 0x5f, 0xbb, 0x3e, 0x57, 0xac, 0xeb, 0xc6, 0x2c, 0x10, 0x11, 0x53, + 0x42, 0x46, 0x4e, 0x9c, 0x4a, 0x25, 0x69, 0x03, 0x13, 0x74, 0x74, 0xc6, 0x29, 0x32, 0xed, 0x0f, + 0x02, 0x8d, 0xf1, 0x3e, 0xe7, 0xf1, 0x24, 0xe3, 0xa8, 0x28, 0x05, 0x33, 0x66, 0x01, 0xb7, 0x48, + 0x8b, 0x74, 0x4c, 0x4f, 0x7f, 0xd3, 0x63, 0xf8, 0xb7, 0x10, 0xa1, 0x50, 0x56, 0x49, 0xc3, 0xed, + 0x41, 0x4f, 0xa0, 0x32, 0xcf, 0x52, 0x94, 0xa9, 0x55, 0xd6, 0xb8, 0xb8, 0x68, 0x0f, 0xaa, 0xa8, + 0x52, 0xa6, 0x78, 0xb0, 0xb4, 0xcc, 0x16, 0xe9, 0x1c, 0x5d, 0x9e, 0x3a, 0x7f, 0xec, 0xce, 0xaf, + 0x79, 0x52, 0x84, 0xbd, 0xfd, 0x6f, 0xed, 0x07, 0xa0, 0x87, 0xcd, 0x30, 0x96, 0x11, 0x72, 0xda, + 0x84, 0x5a, 0xc4, 0xdf, 0xd4, 0xac, 0xb0, 0xe6, 0x0d, 0xcb, 0x1e, 0xe4, 0xa8, 0xbf, 0x35, 0x37, + 0xa1, 0xa6, 0xa4, 0x62, 0x8b, 0x99, 0x50, 0x3c, 0xc4, 0xa2, 0x2d, 0x68, 0x74, 0x97, 0x93, 0xb3, + 0xeb, 0xc3, 0x77, 0x77, 0x5e, 0x5a, 0x83, 0xff, 0xd3, 0xe1, 0xfd, 0x70, 0xf4, 0x38, 0xac, 0x1b, + 0xb4, 0x0a, 0xe6, 0xb8, 0x77, 0x3b, 0xa8, 0x13, 0x0a, 0x50, 0xe9, 0x4f, 0xbd, 0xc9, 0xc8, 0xab, + 0x97, 0x6e, 0x06, 0x9f, 0x6b, 0x9b, 0xac, 0xd6, 0x36, 0xf9, 0x5e, 0xdb, 0xe4, 0x7d, 0x63, 0x1b, + 0xab, 0x8d, 0x6d, 0x7c, 0x6d, 0x6c, 0xe3, 0xe9, 0x3c, 0x10, 0xea, 0x39, 0xf3, 0x9d, 0xb9, 0x0c, + 0x5d, 0x89, 0xa1, 0x44, 0x81, 0x17, 0x0b, 0xe6, 0xa3, 0x9b, 0x8f, 0x12, 0xbf, 0x04, 0x2e, 0x8b, + 0xc5, 0x6e, 0x16, 0xbf, 0xa2, 0xc7, 0xb8, 0xfa, 0x09, 0x00, 0x00, 0xff, 0xff, 0xf2, 0x00, 0x56, + 0x18, 0xb2, 0x01, 0x00, 0x00, +} + +func (m *PaginationRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PaginationRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PaginationRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Strategy != 0 { + i = encodeVarintPagination(dAtA, i, uint64(m.Strategy)) + i-- + dAtA[i] = 0x20 + } + if m.Cursor != 0 { + i = encodeVarintPagination(dAtA, i, uint64(m.Cursor)) + i-- + dAtA[i] = 0x18 + } + if m.Limit != 0 { + i = encodeVarintPagination(dAtA, i, uint64(m.Limit)) + i-- + dAtA[i] = 0x10 + } + if m.Page != 0 { + i = encodeVarintPagination(dAtA, i, uint64(m.Page)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *PaginationResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PaginationResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PaginationResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.TotalItems != 0 { + i = encodeVarintPagination(dAtA, i, uint64(m.TotalItems)) + i-- + dAtA[i] = 0x10 + } + if m.NextCursor != 0 { + i = encodeVarintPagination(dAtA, i, uint64(m.NextCursor)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintPagination(dAtA []byte, offset int, v uint64) int { + offset -= sovPagination(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *PaginationRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Page != 0 { + n += 1 + sovPagination(uint64(m.Page)) + } + if m.Limit != 0 { + n += 1 + sovPagination(uint64(m.Limit)) + } + if m.Cursor != 0 { + n += 1 + sovPagination(uint64(m.Cursor)) + } + if m.Strategy != 0 { + n += 1 + sovPagination(uint64(m.Strategy)) + } + return n +} + +func (m *PaginationResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.NextCursor != 0 { + n += 1 + sovPagination(uint64(m.NextCursor)) + } + if m.TotalItems != 0 { + n += 1 + sovPagination(uint64(m.TotalItems)) + } + return n +} + +func sovPagination(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozPagination(x uint64) (n int) { + return sovPagination(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *PaginationRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PaginationRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PaginationRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Page", wireType) + } + m.Page = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Page |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Limit", wireType) + } + m.Limit = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Limit |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Cursor", wireType) + } + m.Cursor = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Cursor |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Strategy", wireType) + } + m.Strategy = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Strategy |= PaginationStrategy(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipPagination(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthPagination + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PaginationResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PaginationResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PaginationResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NextCursor", wireType) + } + m.NextCursor = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NextCursor |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TotalItems", wireType) + } + m.TotalItems = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPagination + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TotalItems |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipPagination(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthPagination + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipPagination(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPagination + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPagination + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPagination + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthPagination + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupPagination + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthPagination + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthPagination = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowPagination = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupPagination = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/api/v1beta1/pagination_test.go b/pkg/api/v1beta1/pagination_test.go new file mode 100644 index 00000000..b96c97e8 --- /dev/null +++ b/pkg/api/v1beta1/pagination_test.go @@ -0,0 +1,210 @@ +package v1beta1 + +import ( + "net/http/httptest" + "testing" + + "github.com/labstack/echo/v4" + "github.com/stretchr/testify/assert" +) + +func TestPaginationRequestUnmarshalHTTPRequest(t *testing.T) { + tests := []struct { + name string + queryParams map[string]string + want *PaginationRequest + wantErr bool + }{ + { + name: "Valid page and size", + queryParams: map[string]string{"page[number]": "5", "page[size]": "20"}, + want: &PaginationRequest{Page: 5, Limit: 20, Strategy: PaginationStrategy_PAGE}, + wantErr: false, + }, + { + name: "Only page provided", + queryParams: map[string]string{"page[number]": "3"}, + want: &PaginationRequest{Page: 3, Limit: 0, Strategy: PaginationStrategy_PAGE}, + wantErr: false, + }, + { + name: "Only size provided", + queryParams: map[string]string{"page[size]": "15"}, + want: &PaginationRequest{Page: 0, Limit: 15, Strategy: PaginationStrategy_UNKNOWN}, + wantErr: false, + }, + { + name: "Invalid page (not a number)", + queryParams: map[string]string{"page[number]": "invalid", "page[size]": "10"}, + want: &PaginationRequest{}, + wantErr: true, + }, + { + name: "Invalid size (not a number)", + queryParams: map[string]string{"page[number]": "1", "page[size]": "invalid"}, + want: &PaginationRequest{}, + wantErr: true, + }, + { + name: "No parameters provided", + queryParams: map[string]string{}, + want: &PaginationRequest{Strategy: PaginationStrategy_UNKNOWN}, + wantErr: false, + }, + { + name: "Valid cursor and size", + queryParams: map[string]string{"page[cursor]": "100", "page[size]": "20"}, + want: &PaginationRequest{Cursor: 100, Limit: 20, Strategy: PaginationStrategy_CURSOR}, + wantErr: false, + }, + { + name: "Invalid cursor (not a number)", + queryParams: map[string]string{"page[cursor]": "invalid", "page[size]": "10"}, + want: &PaginationRequest{}, + wantErr: true, + }, + { + name: "Cursor takes precedence over page", + queryParams: map[string]string{"page[cursor]": "100", "page[number]": "5", "page[size]": "20"}, + want: &PaginationRequest{Cursor: 100, Page: 5, Limit: 20, Strategy: PaginationStrategy_CURSOR}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := echo.New() + req := httptest.NewRequest(echo.GET, "/", nil) + q := req.URL.Query() + for k, v := range tt.queryParams { + q.Add(k, v) + } + req.URL.RawQuery = q.Encode() + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + var result PaginationRequest + err := (&result).UnmarshalHTTPRequest(c) + + if tt.wantErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.want, &result) + }) + } +} + +func TestPaginationRequestValidate(t *testing.T) { + tests := []struct { + name string + request PaginationRequest + wantErr error + }{ + { + name: "Valid page-based request", + request: PaginationRequest{Page: 1, Limit: 10, Strategy: PaginationStrategy_PAGE}, + wantErr: nil, + }, + { + name: "Valid cursor-based request", + request: PaginationRequest{Cursor: 100, Limit: 10, Strategy: PaginationStrategy_CURSOR}, + wantErr: nil, + }, + { + name: "Page is zero", + request: PaginationRequest{Page: 0, Limit: 10, Strategy: PaginationStrategy_PAGE}, + wantErr: ErrPageNotValid, + }, + { + name: "Limit is zero", + request: PaginationRequest{Page: 1, Limit: 0, Strategy: PaginationStrategy_PAGE}, + wantErr: ErrLimitNotValid, + }, + { + name: "Page exceeds maximum", + request: PaginationRequest{Page: MaxPage + 1, Limit: 10, Strategy: PaginationStrategy_PAGE}, + wantErr: ErrPageTooLarge, + }, + { + name: "Limit exceeds maximum", + request: PaginationRequest{Page: 1, Limit: MaxLimit + 1, Strategy: PaginationStrategy_PAGE}, + wantErr: ErrLimitTooLarge, + }, + { + name: "Unknown strategy", + request: PaginationRequest{Page: 1, Limit: 10, Strategy: PaginationStrategy_UNKNOWN}, + wantErr: ErrPaginationStrategyNotSupported, + }, + { + name: "Cursor-based with page set", + request: PaginationRequest{Page: 1, Cursor: 100, Limit: 10, Strategy: PaginationStrategy_CURSOR}, + wantErr: nil, // This should be valid as we're not checking for this case in Validate() + }, + { + name: "Page-based with cursor set", + request: PaginationRequest{Page: 1, Cursor: 100, Limit: 10, Strategy: PaginationStrategy_PAGE}, + wantErr: nil, // This should be valid as we're not checking for this case in Validate() + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.request.Validate() + if got != tt.wantErr { + t.Errorf("PaginationRequest.Validate() got error = %v, wantErr %v", got, tt.wantErr) + } + }) + } +} + +func TestPaginationRequestCalculateNextCursor(t *testing.T) { + tests := []struct { + name string + request PaginationRequest + totalItems uint64 + nextCursor int64 + }{ + { + name: "Fetch first page", + request: PaginationRequest{Cursor: 0, Limit: 3}, + totalItems: 5, + nextCursor: 3, + }, + { + name: "Fetch second page", + request: PaginationRequest{Cursor: 3, Limit: 2}, + totalItems: 5, + nextCursor: -1, + }, + { + name: "Fetch beyond available data", + request: PaginationRequest{Cursor: 5, Limit: 2}, + totalItems: 5, + nextCursor: -1, + }, + { + name: "Fetch with limit greater than remaining items", + request: PaginationRequest{Cursor: 3, Limit: 5}, + totalItems: 5, + nextCursor: -1, + }, + { + name: "Zero total items", + request: PaginationRequest{Cursor: 0, Limit: 10}, + totalItems: 0, + nextCursor: -1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.request.CalculateNextCursor(tt.totalItems) + if got != tt.nextCursor { + t.Errorf("PaginationRequest.CalculateNextCursor() = %v, want %v", got, tt.nextCursor) + } + }) + } +} diff --git a/proto/Dockerfile b/proto/Dockerfile new file mode 100644 index 00000000..49443ca6 --- /dev/null +++ b/proto/Dockerfile @@ -0,0 +1,39 @@ +# This Dockerfile is used for proto generation +# To build, run `make proto-image-build` + +FROM bufbuild/buf:1.7.0 as BUILDER + +FROM golang:1.22-alpine + + +RUN apk add --no-cache \ + nodejs \ + npm \ + git \ + make + +ENV GOLANG_PROTOBUF_VERSION=1.28.0 \ + GOGO_PROTOBUF_VERSION=1.3.2 \ + GRPC_GATEWAY_VERSION=1.16.0 + + +RUN go install github.com/cosmos/cosmos-proto/cmd/protoc-gen-go-pulsar@latest +RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@v${GOLANG_PROTOBUF_VERSION} +RUN go install github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway@v${GRPC_GATEWAY_VERSION} \ + github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger@v${GRPC_GATEWAY_VERSION} + +# install all gogo protobuf binaries +RUN git clone https://github.com/regen-network/protobuf.git; \ + cd protobuf; \ + go mod download; \ + make install + +# we need to use git clone because we use 'replace' directive in go.mod +# protoc-gen-gocosmos was moved to to in cosmos/gogoproto but pending a migration there. +RUN git clone https://github.com/regen-network/cosmos-proto.git; \ + cd cosmos-proto/protoc-gen-gocosmos; \ + go install . + +RUN npm install -g swagger-combine + +COPY --from=BUILDER /usr/local/bin /usr/local/bin diff --git a/proto/buf.gen.gogo.yaml b/proto/buf.gen.gogo.yaml new file mode 100644 index 00000000..9c8ba0a4 --- /dev/null +++ b/proto/buf.gen.gogo.yaml @@ -0,0 +1,8 @@ +version: v1 +plugins: + - name: gocosmos + out: .. + opt: plugins=grpc,Mgoogle/protobuf/any.proto=github.com/cosmos/cosmos-sdk/codec/types + - name: grpc-gateway + out: .. + opt: logtostderr=true,allow_colon_final_segments=true diff --git a/proto/buf.gen.swagger.yaml b/proto/buf.gen.swagger.yaml new file mode 100644 index 00000000..39d0bd87 --- /dev/null +++ b/proto/buf.gen.swagger.yaml @@ -0,0 +1,5 @@ +version: v1 +plugins: + - name: openapiv2 + out: ../tmp-swagger-gen + opt: logtostderr=true,fqn_for_openapi_name=true,simple_operation_ids=true,json_names_for_fields=false diff --git a/proto/buf.lock b/proto/buf.lock new file mode 100644 index 00000000..6a7ba9b9 --- /dev/null +++ b/proto/buf.lock @@ -0,0 +1,23 @@ +# Generated by buf. DO NOT EDIT. +version: v1 +deps: + - remote: buf.build + owner: cosmos + repository: cosmos-proto + commit: 04467658e59e44bbb22fe568206e1f70 + digest: shake256:73a640bd60e0c523b0f8237ff34eab67c45a38b64bbbde1d80224819d272dbf316ac183526bd245f994af6608b025f5130483d0133c5edd385531326b5990466 + - remote: buf.build + owner: cosmos + repository: cosmos-sdk + commit: 954f7b05f38440fc8250134b15adec47 + digest: shake256:2ab4404fd04a7d1d52df0e2d0f2d477a3d83ffd88d876957bf3fedfd702c8e52833d65b3ce1d89a3c5adf2aab512616b0e4f51d8463f07eda9a8a3317ee3ac54 + - remote: buf.build + owner: cosmos + repository: gogo-proto + commit: 88ef6483f90f478fb938c37dde52ece3 + digest: shake256:89c45df2aa11e0cff97b0d695436713db3d993d76792e9f8dc1ae90e6ab9a9bec55503d48ceedd6b86069ab07d3041b32001b2bfe0227fa725dd515ff381e5ba + - remote: buf.build + owner: googleapis + repository: googleapis + commit: 553fd4b4b3a640be9b69a3fa0c17b383 + digest: shake256:e30e3247f84b7ff9d09941ce391eb4b6f04734e1e5fae796bfc471f167e6f90813630cc39397ee46b8bc0ea7d6935c416d15c219cc5732d9778cbfdf73a1ed6e diff --git a/proto/buf.yaml b/proto/buf.yaml new file mode 100644 index 00000000..a10b37f7 --- /dev/null +++ b/proto/buf.yaml @@ -0,0 +1,35 @@ +version: v1 +name: buf.build/osmosis-labs/sqs +deps: + - buf.build/cometbft/cometbft:4a62c99d422068a5165429b62a7eb824df46cca9 + - buf.build/cosmos/gogo-proto + - buf.build/cosmos/cosmos-sdk:v0.47.0 + - buf.build/cosmos/cosmos-proto + - buf.build/googleapis/googleapis +breaking: + use: + - FILE +lint: + use: + - DEFAULT + - COMMENTS + - FILE_LOWER_SNAKE_CASE + except: + - UNARY_RPC + - COMMENT_FIELD + - COMMENT_MESSAGE + - COMMENT_SERVICE + - COMMENT_RPC + - SERVICE_SUFFIX + - PACKAGE_VERSION_SUFFIX + - RPC_REQUEST_STANDARD_NAME + - PACKAGE_SAME_GO_PACKAGE + - PACKAGE_SAME_DIRECTORY + - PACKAGE_DIRECTORY_MATCH + - RPC_RESPONSE_STANDARD_NAME + - COMMENT_ENUM_VALUE + - COMMENT_ENUM + - ENUM_ZERO_VALUE_SUFFIX + - ENUM_ZERO_VALUE + ignore: + - tendermint diff --git a/proto/sqs/query/v1beta1/pagination.proto b/proto/sqs/query/v1beta1/pagination.proto new file mode 100644 index 00000000..4b552418 --- /dev/null +++ b/proto/sqs/query/v1beta1/pagination.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; +package sqs.query.v1beta1; + +option go_package = "github.com/osmosis-labs/sqs/pkg/api/v1beta1"; + +// PaginationStrategy is an enum that defines the pagination strategy to be used +// in the PaginationRequest request. +enum PaginationStrategy { + // unknown strategy indicates that the pagination strategy is not known + // or not set. + UNKNOWN = 0; + + // page-based pagination is the most common pagination strategy. It is + // compatible with offset pagination. If you have a page of 2 and a limit of + // 20, you will get items 20-39. + PAGE = 1; + + // cursor-based pagination is compatible with and similar to offset + // pagination. If you have a cursor of 50 and a limit of 20, you will get + // items 50-69. Client should re-request all data if it changes, as the cursor + // is not a pointer to a page, but to an item index. + CURSOR = 2; +} + +message PaginationRequest { + // page is the page number to query, starts at 0. If not provided, will + // default to first page. + uint64 page = 1; + + // limit is the total number of results to be returned in the result page. + // If left empty it will default to a value to be set by each app. + uint64 limit = 2; + + // cursor is the query offset to start from. If not provided, will default to + // the 0. + uint64 cursor = 3; + + // strategy is the pagination strategy to be used. If not provided, will + // default to PAGE. + PaginationStrategy strategy = 4; +} + +message PaginationResponse { + // next_cursor is the cursor to be used in the next request to get the next + // page of results for cursor-based pagination. + // If the next_cursor is -1, it means that there are no more results to be + // fetched. + int64 next_cursor = 1; + + // total_items is the total number of items available in the result set. + // This is useful for pagination when the client requests a subset of the + // result set. + uint64 total_items = 2; +} From a38c862c86d9ff315953a589c1d774dde8a9a901 Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 16:34:12 +0200 Subject: [PATCH 03/12] BE-636-pipeline | Iterator return error for Next() instead of bool --- domain/pipeline/iterator.go | 17 ++++++++++------- domain/pipeline/iterator_test.go | 25 +++++++++++++------------ domain/pipeline/paginator.go | 8 ++++---- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/domain/pipeline/iterator.go b/domain/pipeline/iterator.go index 4774de38..491cad28 100644 --- a/domain/pipeline/iterator.go +++ b/domain/pipeline/iterator.go @@ -1,10 +1,13 @@ package pipeline -import "sync" +import ( + "fmt" + "sync" +) // Iterator interface defines methods for filtering, sorting, and chunked access type Iterator[K, V any] interface { - Next() (V, bool) // Retrieves the next element and a bool indicating if it's valid + Next() (V, error) // Retrieves the next element and a bool indicating if it's valid HasNext() bool // Checks if there are more elements SetOffset(offset int) // Sets the offset for starting point of iteration Reset() // Resets the iterator to the start @@ -27,24 +30,24 @@ type SyncMapIterator[K, V any] struct { } // Next retrieves the next element that matches the filter (if set), advancing the index -func (it *SyncMapIterator[K, V]) Next() (V, bool) { +func (it *SyncMapIterator[K, V]) Next() (V, error) { if it.HasNext() { key := it.keys[it.index] it.index++ mp, ok := it.data.Load(key) if !ok { - return *new(V), false + return *new(V), fmt.Errorf("key %v not found", key) } value, ok := mp.(V) if !ok { - return *new(V), false + return *new(V), fmt.Errorf("invalid type assertion for key %v", key) } - return value, true + return value, nil } - return *new(V), false + return *new(V), fmt.Errorf("no more elements") } // SetOffset sets the offset for the iterator. diff --git a/domain/pipeline/iterator_test.go b/domain/pipeline/iterator_test.go index 0a9644b1..518e5c72 100644 --- a/domain/pipeline/iterator_test.go +++ b/domain/pipeline/iterator_test.go @@ -1,6 +1,7 @@ package pipeline import ( + "fmt" "reflect" "sync" "testing" @@ -25,13 +26,13 @@ func (m *MockIterator) SetOffset(offset int) { m.index = offset } -func (m *MockIterator) Next() (int, bool) { +func (m *MockIterator) Next() (int, error) { if m.HasNext() { item := m.items[m.index] m.index++ - return item, true + return item, nil } - return 0, false + return 0, fmt.Errorf("no more elements") } func (m *MockIterator) Reset() { @@ -74,8 +75,8 @@ func TestSyncMapIteratorNext(t *testing.T) { var result []testdata for { - val, ok := it.Next() - if !ok { + val, err := it.Next() + if err != nil { break } result = append(result, val) @@ -86,8 +87,8 @@ func TestSyncMapIteratorNext(t *testing.T) { } // Test that after full iteration, Next() returns false - _, ok := it.Next() - if ok { + _, err := it.Next() + if err == nil { t.Errorf("Expected Next() to return false after full iteration") } }) @@ -147,8 +148,8 @@ func TestSyncMapIteratorSetOffset(t *testing.T) { var result []testdata for { - val, ok := it.Next() - if !ok { + val, err := it.Next() + if err != nil { break } result = append(result, val) @@ -159,9 +160,9 @@ func TestSyncMapIteratorSetOffset(t *testing.T) { } // Test that after full iteration, Next() returns false - _, ok := it.Next() - if ok { - t.Errorf("Expected Next() to return false after full iteration") + _, err := it.Next() + if err == nil { + t.Errorf("Expected Next() to return err after full iteration") } }) } diff --git a/domain/pipeline/paginator.go b/domain/pipeline/paginator.go index e69f85e9..83e89a63 100644 --- a/domain/pipeline/paginator.go +++ b/domain/pipeline/paginator.go @@ -35,8 +35,8 @@ func (p *Paginator[K, V]) FetchPageByPageNumber() []V { items := make([]V, 0, p.pagination.Limit) for i := uint64(0); i < p.pagination.Limit && p.iterator.HasNext(); i++ { - elem, valid := p.iterator.Next() - if valid { + elem, err := p.iterator.Next() + if err == nil { items = append(items, elem) } } @@ -54,8 +54,8 @@ func (p *Paginator[K, V]) FetchPageByCursor() []V { items := make([]V, 0, p.pagination.Limit) for i := uint64(0); i < p.pagination.Limit && p.iterator.HasNext(); i++ { - elem, valid := p.iterator.Next() - if valid { + elem, err := p.iterator.Next() + if err == nil { items = append(items, elem) } } From bfdf60272a7aeaba3dfc9e16d48104f3551bb995 Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 16:35:11 +0200 Subject: [PATCH 04/12] BE-636-pipeline | Validate that the offset for SetOffset method --- domain/pipeline/iterator.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/domain/pipeline/iterator.go b/domain/pipeline/iterator.go index 491cad28..b4f30f91 100644 --- a/domain/pipeline/iterator.go +++ b/domain/pipeline/iterator.go @@ -54,6 +54,14 @@ func (it *SyncMapIterator[K, V]) Next() (V, error) { // This is useful when client requests a subset of the result set // and wants to start from a specific index. func (it *SyncMapIterator[K, V]) SetOffset(offset int) { + if offset < 0 { + offset = 0 + } + + if offset > len(it.keys) { + offset = len(it.keys) + } + it.index = offset } From 79648da07ea4f157d4db8704c20e54dd08fb7a8e Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 16:42:31 +0200 Subject: [PATCH 05/12] BE-636-pipeline | NewPaginator constructor checks --- domain/pipeline/paginator.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/domain/pipeline/paginator.go b/domain/pipeline/paginator.go index 83e89a63..7b780567 100644 --- a/domain/pipeline/paginator.go +++ b/domain/pipeline/paginator.go @@ -4,6 +4,10 @@ import v1beta1 "github.com/osmosis-labs/sqs/pkg/api/v1beta1" // NewPaginator initializes a Paginator with an Iterator func NewPaginator[K, V any](iterator Iterator[K, V], p *v1beta1.PaginationRequest) *Paginator[K, V] { + if p == nil { + p = &v1beta1.PaginationRequest{} + } + return &Paginator[K, V]{ iterator: iterator, pagination: p, From 7abcfaadbb663374f3d65a09bd8f75e764bf1d60 Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 16:43:55 +0200 Subject: [PATCH 06/12] BE-636-pipeline | Handle potential side effects for Clone method --- domain/pipeline/transformer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/domain/pipeline/transformer.go b/domain/pipeline/transformer.go index ddbd9536..2433c868 100644 --- a/domain/pipeline/transformer.go +++ b/domain/pipeline/transformer.go @@ -114,9 +114,12 @@ func (dt *SyncMapTransformer[K, V]) Data() []V { // Clone returns a new transformer with the same underlying data at the current state. func (dt *SyncMapTransformer[K, V]) Clone() *SyncMapTransformer[K, V] { + keys := make([]K, len(dt.keys)) + copy(keys, dt.keys) + return &SyncMapTransformer[K, V]{ data: dt.data, - keys: dt.keys, + keys: keys, } } From 988e9f78af51f97b7cdd76a646474f8f75816a30 Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 16:46:10 +0200 Subject: [PATCH 07/12] BE-636-pipeline | Fix method signatures and return types in Transformer interface --- domain/pipeline/transformer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/domain/pipeline/transformer.go b/domain/pipeline/transformer.go index 2433c868..9020aa9b 100644 --- a/domain/pipeline/transformer.go +++ b/domain/pipeline/transformer.go @@ -7,9 +7,9 @@ import ( // Transformer defines a generic interface for filtering and sorting data. type Transformer[K, V any] interface { - Filter(fn func(V) bool) *Transformer[K, V] // Filter applies a filter to the data - Sort(less ...func(V, V) bool) *Transformer[K, V] // Sort sorts the data - Keys() []string // Keys returns the list of transformed keys + Filter(fn func(V) bool) Transformer[K, V] // Filter applies a filter to the data + Sort(less ...func(V, V) bool) Transformer[K, V] // Sort sorts the data + Keys() []string // Keys returns the list of transformed keys } // SyncMapTransformer is a generic data transformer for map data From 96666b8ebc239cd54ec0f740fb898401d880b715 Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 16:49:17 +0200 Subject: [PATCH 08/12] BE-636-pipeline | Handle potential integer overflow when calculating next cursor --- pkg/api/v1beta1/pagination.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/api/v1beta1/pagination.go b/pkg/api/v1beta1/pagination.go index aba0e1be..df7fc305 100644 --- a/pkg/api/v1beta1/pagination.go +++ b/pkg/api/v1beta1/pagination.go @@ -2,6 +2,7 @@ package v1beta1 import ( "fmt" + math "math" "strconv" "github.com/labstack/echo/v4" @@ -119,6 +120,10 @@ func (r *PaginationRequest) CalculateNextCursor(totalItems uint64) (nextCursor i return -1 // cursor is out of range } + if r.Cursor > math.MaxUint64-r.Limit { + return -1 // overflow detected + } + endIndex := r.Cursor + r.Limit if endIndex >= totalItems { return -1 // end index is out of range From c68f2d2f0c58f1e35071dfbd1ce8295815e62988 Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Thu, 28 Nov 2024 16:50:12 +0200 Subject: [PATCH 09/12] BE-636-pipeline | fmt --- ingest/usecase/plugins/orderbook/claimbot/export_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest/usecase/plugins/orderbook/claimbot/export_test.go b/ingest/usecase/plugins/orderbook/claimbot/export_test.go index c75f7f35..2ab99f41 100644 --- a/ingest/usecase/plugins/orderbook/claimbot/export_test.go +++ b/ingest/usecase/plugins/orderbook/claimbot/export_test.go @@ -9,8 +9,8 @@ import ( "github.com/osmosis-labs/sqs/domain/mvc" orderbookdomain "github.com/osmosis-labs/sqs/domain/orderbook" - txfeestypes "github.com/osmosis-labs/osmosis/v27/x/txfees/types" "github.com/osmosis-labs/osmosis/v27/app/params" + txfeestypes "github.com/osmosis-labs/osmosis/v27/x/txfees/types" "github.com/osmosis-labs/osmosis/osmomath" From cc70649aa2e83124442ae7085361deff93d4524e Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Fri, 29 Nov 2024 10:44:12 +0200 Subject: [PATCH 10/12] BE-636-pipeline | Improve docs for Next method. --- domain/pipeline/iterator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/domain/pipeline/iterator.go b/domain/pipeline/iterator.go index b4f30f91..eacfcf88 100644 --- a/domain/pipeline/iterator.go +++ b/domain/pipeline/iterator.go @@ -30,6 +30,7 @@ type SyncMapIterator[K, V any] struct { } // Next retrieves the next element that matches the filter (if set), advancing the index +// Error is returned when given key is not found, type assertion for value fails, or there are no more elements to iterate func (it *SyncMapIterator[K, V]) Next() (V, error) { if it.HasNext() { key := it.keys[it.index] From 8752a1d5e652066650784fdc7299fb17a736c122 Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Fri, 29 Nov 2024 10:45:36 +0200 Subject: [PATCH 11/12] BE-636-pipeline | Use require for assertions for stylistic consistency --- domain/pipeline/iterator_test.go | 37 ++++++++++------------------- domain/pipeline/paginator_test.go | 15 ++++-------- domain/pipeline/transformer_test.go | 36 ++++++++-------------------- 3 files changed, 27 insertions(+), 61 deletions(-) diff --git a/domain/pipeline/iterator_test.go b/domain/pipeline/iterator_test.go index 518e5c72..1a764890 100644 --- a/domain/pipeline/iterator_test.go +++ b/domain/pipeline/iterator_test.go @@ -2,9 +2,10 @@ package pipeline import ( "fmt" - "reflect" "sync" "testing" + + "github.com/stretchr/testify/require" ) type testdata struct { @@ -82,15 +83,11 @@ func TestSyncMapIteratorNext(t *testing.T) { result = append(result, val) } - if !reflect.DeepEqual(result, tt.expected) { - t.Errorf("Iteration result = %v, want %v", result, tt.expected) - } + require.Equal(t, tt.expected, result, "Iteration result should match expected") - // Test that after full iteration, Next() returns false + // Test that after full iteration, Next() returns an error _, err := it.Next() - if err == nil { - t.Errorf("Expected Next() to return false after full iteration") - } + require.Error(t, err, "Expected Next() to return an error after full iteration") }) } } @@ -155,15 +152,11 @@ func TestSyncMapIteratorSetOffset(t *testing.T) { result = append(result, val) } - if !reflect.DeepEqual(result, tt.expected) { - t.Errorf("Iteration result after SetOffset(%d) = %v, want %v", tt.offset, result, tt.expected) - } + require.Equalf(t, tt.expected, result, "Iteration result after SetOffset(%d) should match expected", tt.offset) - // Test that after full iteration, Next() returns false + // Test that after full iteration, Next() returns an error _, err := it.Next() - if err == nil { - t.Errorf("Expected Next() to return err after full iteration") - } + require.Error(t, err, "Expected Next() to return an error after full iteration") }) } } @@ -213,9 +206,8 @@ func TestSyncMapIterator_HasNext(t *testing.T) { keys: tt.keys, index: tt.index, } - if got := iterator.HasNext(); got != tt.want { - t.Errorf("SyncMapIterator.HasNext() = %v, want %v", got, tt.want) - } + got := iterator.HasNext() + require.Equal(t, tt.want, got, "SyncMapIterator.HasNext() should return expected value") }) } } @@ -268,13 +260,8 @@ func TestSyncMapIterator_Reset(t *testing.T) { it.Reset() - if it.index != tt.expectedIndex { - t.Errorf("After Reset(), index = %v, want %v", it.index, tt.expectedIndex) - } - - if it.HasNext() != tt.expectedHasNext { - t.Errorf("After Reset(), HasNext() = %v, want %v", it.HasNext(), tt.expectedHasNext) - } + require.Equal(t, tt.expectedIndex, it.index, "After Reset(), index should match expected") + require.Equal(t, tt.expectedHasNext, it.HasNext(), "After Reset(), HasNext() should return expected value") }) } } diff --git a/domain/pipeline/paginator_test.go b/domain/pipeline/paginator_test.go index aad18aeb..83ffb6c2 100644 --- a/domain/pipeline/paginator_test.go +++ b/domain/pipeline/paginator_test.go @@ -1,10 +1,11 @@ package pipeline import ( - "reflect" "testing" v1beta1 "github.com/osmosis-labs/sqs/pkg/api/v1beta1" + + "github.com/stretchr/testify/require" ) func TestGetPage(t *testing.T) { @@ -61,9 +62,7 @@ func TestGetPage(t *testing.T) { iterator := &MockIterator{items: tt.items} paginator := NewPaginator[int, int](iterator, tt.pagination) got := paginator.GetPage() - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetPage() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, got, "GetPage() returned unexpected result") }) } } @@ -118,9 +117,7 @@ func TestFetchPageByPageNumber(t *testing.T) { iterator := &MockIterator{items: tt.items} paginator := NewPaginator[int, int](iterator, tt.pagination) got := paginator.FetchPageByPageNumber() - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("FetchPageByPageNumber() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, got, "FetchPageByPageNumber() returned unexpected result") }) } } @@ -175,9 +172,7 @@ func TestFetchPageByCursor(t *testing.T) { mockIterator := &MockIterator{items: tt.data} paginator := NewPaginator[int, int](mockIterator, tt.pagination) got := paginator.FetchPageByCursor() - if !reflect.DeepEqual(got, tt.expected) { - t.Errorf("FetchPageByCursor() = %v, want %v", got, tt.expected) - } + require.Equal(t, tt.expected, got, "FetchPageByCursor() returned unexpected result") }) } } diff --git a/domain/pipeline/transformer_test.go b/domain/pipeline/transformer_test.go index 145c3bbd..68ec01d3 100644 --- a/domain/pipeline/transformer_test.go +++ b/domain/pipeline/transformer_test.go @@ -1,10 +1,11 @@ package pipeline import ( - "reflect" "slices" "sync" "testing" + + "github.com/stretchr/testify/require" ) func TestSyncMapTransformer_Count(t *testing.T) { @@ -40,9 +41,7 @@ func TestSyncMapTransformer_Count(t *testing.T) { transformer := NewSyncMapTransformer[int, int](&m) got := transformer.Count() - if got != tt.expected { - t.Errorf("Expected count %d, but got %d", tt.expected, got) - } + require.Equal(t, tt.expected, got, "Expected count %d, but got %d", tt.expected, got) }) } } @@ -145,18 +144,11 @@ func TestSyncMapTransformerRange(t *testing.T) { transformer.Range(iterFunc) // Validate collected keys - if len(collectedKeys) != len(tc.expectedKeys) { - t.Errorf("Collected %d keys, want %d", len(collectedKeys), len(tc.expectedKeys)) - } + require.Equal(t, len(tc.expectedKeys), len(collectedKeys), "Collected %d keys, want %d", len(collectedKeys), len(tc.expectedKeys)) // Validate keys and values - if slices.Equal(tc.expectedKeys, collectedKeys) != true { - t.Errorf("Collected keys %v, want %v", collectedKeys, tc.expectedKeys) - } - - if slices.Equal(tc.expectedValues, collectedValues) != true { - t.Errorf("Collected values %v, want %v", collectedValues, tc.expectedValues) - } + require.True(t, slices.Equal(tc.expectedKeys, collectedKeys), "Collected keys %v, want %v", collectedKeys, tc.expectedKeys) + require.True(t, slices.Equal(tc.expectedValues, collectedValues), "Collected values %v, want %v", collectedValues, tc.expectedValues) }) } } @@ -214,9 +206,7 @@ func TestTransformerFilter(t *testing.T) { transformer.Filter(tt.filter) // Check if the original data is unchanged - if !reflect.DeepEqual(transformer.Data(), tt.want) { - t.Errorf("Filter() modified original data. Got %v, want %v", transformer.Data(), tt.want) - } + require.Equal(t, tt.want, transformer.Data(), "Filter() modified original data. Got %v, want %v", transformer.Data(), tt.want) }) } } @@ -258,9 +248,7 @@ func TestMapTransformerSort(t *testing.T) { transformer.Sort(tt.less) got := transformer.Data() - if !reflect.DeepEqual(got, tt.expected) { - t.Errorf("Expected %v, but got %v", tt.expected, got) - } + require.Equal(t, tt.expected, got, "Expected %v, but got %v", tt.expected, got) }) } } @@ -425,14 +413,10 @@ func TestSyncMapTransformerClone(t *testing.T) { clonedTransformer := transformer.Clone() // Verify the clone has the same underlying map - if clonedTransformer.data != transformer.data { - t.Errorf("Clone should share the same underlying sync.Map") - } + require.Equal(t, transformer.data, clonedTransformer.data, "Clone should share the same underlying sync.Map") // Verify the clone has the same keys - if len(clonedTransformer.keys) != len(transformer.keys) { - t.Errorf("Clone should have the same number of keys") - } + require.Equal(t, len(transformer.keys), len(clonedTransformer.keys), "Clone should have the same number of keys") }) } } From 0d46e58675c7e5e0ad24fdc270994cf800b04eda Mon Sep 17 00:00:00 2001 From: Deividas Petraitis Date: Fri, 29 Nov 2024 11:09:40 +0200 Subject: [PATCH 12/12] BE-636-pipeline | Add protocgen makefiles, scripts --- scripts/makefiles/proto.mk | 55 +++++++++++++++++++++++++++++++++ scripts/protocgen.sh | 60 ++++++++++++++++++++++++++++++++++++ sqsdomain/proto/ingest.proto | 18 +++++------ 3 files changed, 124 insertions(+), 9 deletions(-) create mode 100644 scripts/makefiles/proto.mk create mode 100755 scripts/protocgen.sh diff --git a/scripts/makefiles/proto.mk b/scripts/makefiles/proto.mk new file mode 100644 index 00000000..d2b759c5 --- /dev/null +++ b/scripts/makefiles/proto.mk @@ -0,0 +1,55 @@ +############################################################################### +### Proto ### +############################################################################### + +proto-help: + @echo "proto subcommands" + @echo "" + @echo "Usage:" + @echo " make proto-[command]" + @echo "" + @echo "Available Commands:" + @echo " all Run proto-format and proto-gen" + @echo " format Format Protobuf files" + @echo " gen Generate Protobuf files" + @echo " image-build Build the protobuf Docker image" + @echo " image-push Push the protobuf Docker image" + +proto: proto-help +proto-all: proto-format proto-gen + +protoVer=0.15.1 +protoImageName=ghcr.io/cosmos/proto-builder:$(protoVer) +protoImage=$(DOCKER) run --rm -v $(CURDIR):/workspace --workdir /workspace $(protoImageName) + +#? proto-all: Run make proto-format proto-lint proto-gen +proto-all: proto-format proto-lint proto-gen + +#? proto-gen: Generate Protobuf files +proto-gen: + @$(protoImage) sh ./scripts/protocgen.sh + protoc --go_out=./ --go-grpc_out=./ --proto_path=./sqsdomain/proto ./sqsdomain/proto/ingest.proto + +#? proto-swagger-gen: Generate Protobuf Swagger +proto-swagger-gen: + @echo "Generating Protobuf Swagger" + @$(protoImage) sh ./scripts/protoc-swagger-gen.sh + +#? proto-format: Format proto file +proto-format: + @$(protoImage) find ./ -name "*.proto" -exec clang-format -i {} \; + +#? proto-lint: Lint proto file +proto-lint: + @$(protoImage) buf lint --error-format=json + +#? proto-check-breaking: Check proto file is breaking +proto-check-breaking: + @$(protoImage) buf breaking --against $(HTTPS_GIT)#branch=main + +#? proto-update-deps: Update protobuf dependencies +proto-update-deps: + @echo "Updating Protobuf dependencies" + $(DOCKER) run --rm -v $(CURDIR)/proto:/workspace --workdir /workspace $(protoImageName) buf mod update + +.PHONY: proto-all proto-gen proto-swagger-gen proto-format proto-lint proto-check-breaking proto-update-deps diff --git a/scripts/protocgen.sh b/scripts/protocgen.sh new file mode 100755 index 00000000..e5ae4a1c --- /dev/null +++ b/scripts/protocgen.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +# How to run manually: +# docker build --pull --rm -f "contrib/devtools/Dockerfile" -t cosmossdk-proto:latest "contrib/devtools" +# docker run --rm -v $(pwd):/workspace --workdir /workspace cosmossdk-proto sh ./scripts/protocgen.sh + +echo "Formatting protobuf files" +find ./ -name "*.proto" -exec clang-format -i {} \; + +set -e + +home=$PWD + +echo "Generating proto code" +proto_dirs=$(find ./ -name 'buf.yaml' -print0 | xargs -0 -n1 dirname | sort | uniq) +for dir in $proto_dirs; do + echo "Generating proto code for $dir" + + cd $dir + # check if buf.gen.pulsar.yaml exists in the proto directory + if [ -f "buf.gen.pulsar.yaml" ]; then + buf generate --template buf.gen.pulsar.yaml + # move generated files to the right places + if [ -d "../cosmos" -a "$dir" != "./proto" ]; then + cp -r ../cosmos $home/api + rm -rf ../cosmos + fi + fi + + # check if buf.gen.gogo.yaml exists in the proto directory + if [ -f "buf.gen.gogo.yaml" ]; then + for file in $(find . -maxdepth 8 -name '*.proto'); do + # this regex checks if a proto file has its go_package set to cosmossdk.io/api/... + # gogo proto files SHOULD ONLY be generated if this is false + # we don't want gogo proto to run for proto files which are natively built for google.golang.org/protobuf + if grep -q "option go_package" "$file" && grep -H -o -c 'option go_package.*cosmossdk.io/api' "$file" | grep -q ':0$'; then + buf generate --template buf.gen.gogo.yaml $file + fi + done + + # move generated files to the right places + if [ -d "../cosmossdk.io" ]; then + cp -r ../cosmossdk.io/* $home + rm -rf ../cosmossdk.io + fi + + if [ -d "../github.com" -a "$dir" != "./proto" ]; then + cp -r ../github.com/cosmos/cosmos-sdk/* $home + rm -rf ../github.com + fi + fi + + cd $home +done + +# move generated files to the right places +cp -r github.com/osmosis-labs/sqs/pkg ./ +rm -rf github.com + +go mod tidy diff --git a/sqsdomain/proto/ingest.proto b/sqsdomain/proto/ingest.proto index 61e282bb..24f15281 100644 --- a/sqsdomain/proto/ingest.proto +++ b/sqsdomain/proto/ingest.proto @@ -12,18 +12,18 @@ service SQSIngester { // PoolData represents a structure encapsulating an Osmosis liquidity pool. message PoolData { - // ChainModel is the chain representation model of the pool. - bytes chain_model = 1; + // ChainModel is the chain representation model of the pool. + bytes chain_model = 1; - // SqsModel is additional pool data used by the sidecar query server. - bytes sqs_model = 2; + // SqsModel is additional pool data used by the sidecar query server. + bytes sqs_model = 2; - // TickModel is the tick data of a concentrated liquidity pool. - // This field is only valid and set for concentrated pools. It is nil otherwise. - bytes tick_model = 3; + // TickModel is the tick data of a concentrated liquidity pool. + // This field is only valid and set for concentrated pools. It is nil + // otherwise. + bytes tick_model = 3; } - // ProcessBlock //////////////////////////////////////////////////////////////////// @@ -39,4 +39,4 @@ message ProcessBlockRequest { } // The response after completing the block processing. -message ProcessBlockReply{} +message ProcessBlockReply {}