Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: Reduce event memory footprint #1183

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Cursor struct {
// Tx is the index of the transaction within the ledger which emitted the event.
Tx uint32
// Op is the index of the operation within the transaction which emitted the event.
// Note: Currently, there is no use for it (events are transaction-wide and not operation-specific)
// but we keep it in order to make the API future-proof.
Op uint32
// Event is the index of the event within in the operation which emitted the event.
Event uint32
Expand Down
40 changes: 19 additions & 21 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,16 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

type bucket struct {
ledgerSeq uint32
ledgerCloseTimestamp int64
events []event
}

type event struct {
contents xdr.DiagnosticEvent
txIndex uint32
opIndex uint32
eventIndex uint32
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
}

func (e event) cursor(ledgerSeq uint32) Cursor {
return Cursor{
Ledger: ledgerSeq,
Tx: e.txIndex,
Op: e.opIndex,
Event: e.eventIndex,
}
}
Expand Down Expand Up @@ -129,7 +121,12 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor,
if eventRange.End.Cmp(cur) <= 0 {
return lastLedgerInWindow, nil
}
if !f(event.contents, cur, timestamp) {
var diagnosticEvent xdr.DiagnosticEvent
err := xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent)
if err != nil {
return 0, err
}
if !f(diagnosticEvent, cur, timestamp) {
return lastLedgerInWindow, nil
}
}
Expand Down Expand Up @@ -201,7 +198,9 @@ func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error {
BucketContent: events,
}
m.lock.Lock()
m.eventsByLedger.Append(bucket)
if _, err = m.eventsByLedger.Append(bucket); err != nil {
return err
}
m.lock.Unlock()
m.eventsDurationMetric.With(prometheus.Labels{"operation": "ingest"}).
Observe(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -241,15 +240,14 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
return nil, err
}
for index, e := range txEvents {
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
return nil, err
}
events = append(events, event{
contents: e,
txIndex: tx.Index,
// NOTE: we cannot really index by operation since all events
// are provided as part of the transaction. However,
// that shouldn't matter in practice since a transaction
// can only contain a single Host Function Invocation.
opIndex: 0,
eventIndex: uint32(index),
diagnosticEventXDR: diagnosticEventXDR,
txIndex: tx.Index,
eventIndex: uint32(index),
})
}
}
Expand Down
82 changes: 40 additions & 42 deletions cmd/soroban-rpc/internal/events/events_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"bytes"
"testing"

"github.com/stellar/go/xdr"
Expand All @@ -13,68 +14,64 @@ import (
var (
ledger5CloseTime = ledgerCloseTime(5)
ledger5Events = []event{
newEvent(1, 0, 0, 100),
newEvent(1, 0, 1, 200),
newEvent(2, 0, 0, 300),
newEvent(2, 1, 0, 400),
newEvent(1, 0, 100),
newEvent(1, 1, 200),
newEvent(2, 0, 300),
newEvent(2, 1, 400),
}
ledger6CloseTime = ledgerCloseTime(6)
ledger6Events []event = nil
ledger7CloseTime = ledgerCloseTime(7)
ledger7Events = []event{
newEvent(1, 0, 0, 500),
newEvent(1, 0, 500),
}
ledger8CloseTime = ledgerCloseTime(8)
ledger8Events = []event{
newEvent(1, 0, 0, 600),
newEvent(2, 0, 0, 700),
newEvent(2, 0, 1, 800),
newEvent(2, 0, 2, 900),
newEvent(2, 1, 0, 1000),
newEvent(1, 0, 600),
newEvent(2, 0, 700),
newEvent(2, 1, 800),
newEvent(2, 2, 900),
newEvent(2, 3, 1000),
}
)

func ledgerCloseTime(seq uint32) int64 {
return int64(seq)*25 + 100
}

func newEvent(txIndex, opIndex, eventIndex, val uint32) event {
func newEvent(txIndex, eventIndex, val uint32) event {
v := xdr.Uint32(val)
return event{
contents: xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},

e := xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},
},
},
},
txIndex: txIndex,
opIndex: opIndex,
eventIndex: eventIndex,
}
}

func mustMarshal(e xdr.DiagnosticEvent) string {
result, err := xdr.MarshalBase64(e)
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
panic(err)
}
return result
return event{
diagnosticEventXDR: diagnosticEventXDR,
txIndex: txIndex,
eventIndex: eventIndex,
}
}

func (e event) equals(other event) bool {
return e.txIndex == other.txIndex &&
e.opIndex == other.opIndex &&
e.eventIndex == other.eventIndex &&
mustMarshal(e.contents) == mustMarshal(other.contents)
bytes.Equal(e.diagnosticEventXDR, other.diagnosticEventXDR)
}

func eventsAreEqual(t *testing.T, a, b []event) {
Expand Down Expand Up @@ -291,7 +288,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand Down Expand Up @@ -327,7 +324,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: MaxCursor,
ClampEnd: true,
Expand All @@ -336,7 +333,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand All @@ -354,9 +351,9 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 8, Tx: 1, Op: 4},
End: Cursor{Ledger: 8, Tx: 2},
ClampEnd: false,
},
concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]),
Expand All @@ -367,11 +364,12 @@ func TestScan(t *testing.T) {
iterateAll := true
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp)
diagnosticEventXDR, err := contractEvent.MarshalBinary()
require.NoError(t, err)
events = append(events, event{
contents: contractEvent,
txIndex: cursor.Tx,
opIndex: cursor.Op,
eventIndex: cursor.Event,
diagnosticEventXDR: diagnosticEventXDR,
txIndex: cursor.Tx,
eventIndex: cursor.Event,
})
return iterateAll
}
Expand Down
Loading