Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added methods to handle new transactions #36

Merged
merged 13 commits into from
May 8, 2024
7 changes: 7 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
}

f.events.SignalEvent(event)

for _, txResult := range txResults {
jinoosss marked this conversation as resolved.
Show resolved Hide resolved
event := &types.NewTransaction{
TxResult: txResult,
}
f.events.SignalEvent(event)
}
}

f.logger.Info(
Expand Down
240 changes: 219 additions & 21 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {

savedTxs = make([]*types.TxResult, 0, txCount*blockNum)
savedBlocks = make([]*types.Block, 0, blockNum)
capturedEvents = make([]*indexerTypes.NewBlock, 0)
capturedEvents = make([]events.Event, 0)

mockEvents = &mockEvents{
signalEventFn: func(e events.Event) {
blockEvent, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)

capturedEvents = append(capturedEvents, blockEvent)
if e.GetType() == indexerTypes.NewBlockEvent {
_, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)
capturedEvents = append(capturedEvents, e)
}
},
}

Expand Down Expand Up @@ -201,16 +202,25 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {
require.Len(t, capturedEvents, len(blocks)-1)

for index, event := range capturedEvents {
// Make sure the block is valid
assert.Equal(t, blocks[index+1], event.Block)
switch event.GetType() {
case indexerTypes.NewBlockEvent:
eventData, ok := event.(*indexerTypes.NewBlock)
require.True(t, ok)

// Make sure the transaction results are valid
require.Len(t, event.Results, txCount)
// Make sure the block is valid
assert.Equal(t, blocks[index+1], eventData.Block)

for txIndex, tx := range event.Results {
assert.EqualValues(t, blocks[index+1].Height, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
// Make sure the transaction results are valid
require.Len(t, eventData.Results, txCount)

for txIndex, tx := range eventData.Results {
assert.EqualValues(t, blocks[index+1].Height, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
}
case indexerTypes.NewTransactionsEvent:
_, ok := event.(*indexerTypes.NewTransaction)
require.True(t, ok)
}
}
})
Expand All @@ -229,14 +239,19 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {

savedTxs = make([]*types.TxResult, 0, txCount*blockNum)
savedBlocks = make([]*types.Block, 0, blockNum)
capturedEvents = make([]*indexerTypes.NewBlock, 0)
capturedEvents = make([]events.Event, 0)

mockEvents = &mockEvents{
signalEventFn: func(e events.Event) {
blockEvent, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)

capturedEvents = append(capturedEvents, blockEvent)
switch e.GetType() {
case indexerTypes.NewBlockEvent:
_, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)
capturedEvents = append(capturedEvents, e)
case indexerTypes.NewTransactionsEvent:
_, ok := e.(*indexerTypes.NewTransaction)
require.True(t, ok)
}
},
}

Expand Down Expand Up @@ -378,12 +393,13 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {

for index, event := range capturedEvents {
// Make sure the block is valid
assert.Equal(t, blocks[index+1], event.Block)
eventData := event.(*indexerTypes.NewBlock)
assert.Equal(t, blocks[index+1], eventData.Block)

// Make sure the transaction results are valid
require.Len(t, event.Results, txCount)
require.Len(t, eventData.Results, txCount)

for txIndex, tx := range event.Results {
for txIndex, tx := range eventData.Results {
assert.EqualValues(t, blocks[index+1].Height, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
Expand All @@ -392,6 +408,188 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {
})
}

func TestFetcher_FetchTransactions_Valid_FullTransactions(t *testing.T) {
t.Parallel()

t.Run("valid txs flow, sequential", func(t *testing.T) {
t.Parallel()

var cancelFn context.CancelFunc

var (
blockNum = 1000
txCount = 10
txs = generateTransactions(t, txCount)
serializedTxs = serializeTxs(t, txs)
blocks = generateBlocks(t, blockNum+1, txs)

savedTxs = make([]*types.TxResult, 0, txCount*blockNum)
savedBlocks = make([]*types.Block, 0, blockNum)
capturedEvents = make([]events.Event, 0)

mockEvents = &mockEvents{
signalEventFn: func(e events.Event) {
if e.GetType() == indexerTypes.NewTransactionsEvent {
_, ok := e.(*indexerTypes.NewTransaction)
require.True(t, ok)
capturedEvents = append(capturedEvents, e)
}
},
}

latestSaved = uint64(0)

mockStorage = &mock.Storage{
GetLatestSavedHeightFn: func() (uint64, error) {
if latestSaved == 0 {
return 0, storageErrors.ErrNotFound
}

return latestSaved, nil
},
GetWriteBatchFn: func() storage.Batch {
return &mock.WriteBatch{
SetBlockFn: func(block *types.Block) error {
savedBlocks = append(savedBlocks, block)

// Check if all blocks are saved
if block.Height == int64(blockNum) {
// At this point, we can cancel the process
cancelFn()
}

latestSaved = uint64(block.Height)

return nil
},
SetTxFn: func(result *types.TxResult) error {
savedTxs = append(savedTxs, result)

return nil
},
}
},
}

mockClient = &mockClient{
createBatchFn: func() clientTypes.Batch {
return &mockBatch{
executeFn: func() ([]any, error) {
// Force an error
return nil, errors.New("something is flaky")
},
countFn: func() int {
return 1 // to trigger execution
},
}
},
getLatestBlockNumberFn: func() (uint64, error) {
return uint64(blockNum), nil
},
getBlockFn: func(num uint64) (*core_types.ResultBlock, error) {
// Sanity check
if num > uint64(blockNum) {
t.Fatalf("invalid block requested, %d", num)
}

if len(blocks[num].Txs) != txCount {
t.Fatalf("invalid transactions, current size: %d", len(blocks[num].Txs))
}

return &core_types.ResultBlock{
Block: blocks[num],
}, nil
},
getBlockResultsFn: func(num uint64) (*core_types.ResultBlockResults, error) {
// Sanity check
if num > uint64(blockNum) {
t.Fatalf("invalid block requested, %d", num)
}

return &core_types.ResultBlockResults{
Height: int64(num),
Results: &state.ABCIResponses{
DeliverTxs: make([]abci.ResponseDeliverTx, txCount),
},
}, nil
},
}
)

// Create the fetcher
f := New(
mockStorage,
mockClient,
mockEvents,
WithMaxSlots(10),
WithMaxChunkSize(50),
)

// Short interval to force spawning
f.queryInterval = 100 * time.Millisecond

// Create the context
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

// Run the fetch
require.NoError(t, f.FetchChainData(ctx))

// Verify the transactions are saved correctly
require.Len(t, savedTxs, blockNum*txCount)

for blockIndex := 0; blockIndex < blockNum; blockIndex++ {
assert.Equal(t, blocks[blockIndex+1], savedBlocks[blockIndex])

for txIndex := 0; txIndex < txCount; txIndex++ {
// since this is a linearized array of transactions
// we can access each item with: blockNum * length + txIndx
// where blockNum is the y-axis, and txIndx is the x-axis
tx := savedTxs[blockIndex*txCount+txIndex]

assert.EqualValues(t, blockIndex+1, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
}
}

// Make sure proper events were emitted
// Blocks each have as many transactions as txCount.
txEventCount := (len(blocks) - 1) * txCount
require.Len(t, capturedEvents, txEventCount)

for index, event := range capturedEvents {
switch event.GetType() {
case indexerTypes.NewBlockEvent:
eventData, ok := event.(*indexerTypes.NewBlock)
require.True(t, ok)

// Make sure the block is valid
assert.Equal(t, blocks[index+1], eventData.Block)

// Make sure the transaction results are valid
require.Len(t, eventData.Results, txCount)

for txIndex, tx := range eventData.Results {
assert.EqualValues(t, blocks[index+1].Height, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
}
case indexerTypes.NewTransactionsEvent:
eventData, ok := event.(*indexerTypes.NewTransaction)
require.True(t, ok)

blockIndex := index / txCount
txIndex := index % txCount

// Make sure the tx is valid
assert.Equal(t, blocks[blockIndex+1].Txs[txIndex], eventData.TxResult.Tx)
assert.Equal(t, blocks[blockIndex+1].Height, eventData.TxResult.Height)
}
}
})
}

func TestFetcher_FetchTransactions_Valid_EmptyBlocks(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 7 additions & 1 deletion serve/filters/filter/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ func (b *BlockFilter) GetChanges() any {
return hashes
}

func (b *BlockFilter) UpdateWithBlock(block *types.Block) {
func (b *BlockFilter) UpdateWith(data any) {
if block, ok := data.(*types.Block); ok {
b.updateWithBlock(block)
}
}

func (b *BlockFilter) updateWithBlock(block *types.Block) {
b.Lock()
defer b.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion serve/filters/filter/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestBlockFilter_GetChanges(t *testing.T) {
for _, block := range blocks {
block := block

f.UpdateWithBlock(block)
f.UpdateWith(block)
}

// Get changes
Expand Down
Loading
Loading