Skip to content

Commit

Permalink
migrations,txapp,voting: optimize migration by omitting empty changesets
Browse files Browse the repository at this point in the history
This optimizes the migration process on the new network by not submitting
the resolutions with empty changesets thereby reducing the number of
resolutions to be processed and voted on the new network.

Also fixes any issues concerning if the changeset migration resolution
can fit within a block. All the changeset chunks are readjusted to the
blocksize/3 on the new network.
  • Loading branch information
charithabandi authored Sep 20, 2024
1 parent 2f6b18e commit d175b7f
Show file tree
Hide file tree
Showing 11 changed files with 379 additions and 103 deletions.
238 changes: 216 additions & 22 deletions internal/migrations/changeset_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type migrationListener struct {
// eventStore is the event store to broadcast the changeset migration events
eventStore listeners.EventStore

changesetSize int64

// logger
logger log.SugaredLogger
}
Expand Down Expand Up @@ -103,23 +105,26 @@ func Start(ctx context.Context, service *common.Service, eventStore listeners.Ev
currentHeight = lastHeight + 1
}

blockSize := service.GenesisConfig.ConsensusParams.Block.MaxBytes

// Create the migration listener
listener := &migrationListener{
config: cfg,
client: clt,
currentHeight: currentHeight,
eventStore: eventStore,
logger: service.Logger,
changesetSize: blockSize / 3, // 1/3 of the block size
}

// Start polling the admin server for changesets
service.Logger.Info("start syncing changesets from old chain", log.Int("startHeight", int64(cfg.StartHeight)))
return listener.RetrieveChangesets(ctx)
return listener.retrieveChangesets(ctx)
}

func (ml *migrationListener) RetrieveChangesets(ctx context.Context) error {
func (ml *migrationListener) retrieveChangesets(ctx context.Context) error {
for {
if ml.currentHeight >= ml.config.EndHeight {
if ml.currentHeight > ml.config.EndHeight {
// Synced up with the old chain, no more changesets to pull. Close the listener
ml.logger.Info("changesets have been synchronized with the old chain", log.Int("height", int64(ml.currentHeight)))
return nil
Expand All @@ -134,47 +139,53 @@ func (ml *migrationListener) RetrieveChangesets(ctx context.Context) error {
}
btsReceived := int64(0)

ml.logger.Debug("received changeset metadata", log.Int("height", int64(ml.currentHeight)), log.Int("numChunks", numChunks), log.Any("chunkSizes", chunkSizes))
ml.logger.Info("received changeset metadata", log.Int("height", int64(ml.currentHeight)), log.Int("numChunks", numChunks), log.Any("chunkSizes", chunkSizes))

wg := sync.WaitGroup{}
errChan := make(chan error, numChunks)
for i := int64(0); i < numChunks; i++ {
wg.Add(1)
go func(chunkIdx int64) {
cs, err := ml.LoadChangeset(ctx, chunkIdx, chunkSizes[chunkIdx])
defer wg.Done()
// Check if the chunk is already received
cs, err := getChangesetChunk(ctx, ml.eventStore, ml.currentHeight, uint64(chunkIdx))
if err != nil {
errChan <- err
return
}

// Create the changeset migration event
totalChunks := uint64(numChunks)
idx := uint64(chunkIdx)
evt := &changesetMigration{
Height: ml.currentHeight,
TotalChunks: totalChunks,
ChunkIdx: idx,
Changeset: cs,
if len(cs) > 0 {
ml.logger.Debug("changeset chunk already received", log.Int("height", int64(ml.currentHeight)), log.Int("chunk", int(chunkIdx)))
// Chunk already received, skip
if int64(len(cs)) != chunkSizes[chunkIdx] {
errChan <- fmt.Errorf("changeset size mismatch: expected %d, got %d", chunkSizes[chunkIdx], len(cs))
return
}

btsReceived += int64(len(cs))
wg.Done()
return
}

csEvt, err := evt.MarshalBinary()
// else request the changeset chunk from the old chain
cs, err = ml.LoadChangeset(ctx, chunkIdx, chunkSizes[chunkIdx])
if err != nil {
ml.logger.Error("failed to marshal changeset migration event", "error", err)
errChan <- err
return
}

ml.logger.Info("broadcasting changeset migration event", "height", ml.currentHeight, "chunk", chunkIdx, "size", len(cs))
if int64(len(cs)) != chunkSizes[chunkIdx] {
errChan <- fmt.Errorf("changeset size mismatch for height: %d: expected %d, got %d", ml.currentHeight, chunkSizes[chunkIdx], len(cs))
return
}

// Broadcast the changeset migration event to the event store for voting
err = ml.eventStore.Broadcast(ctx, voting.ChangesetMigrationEventType, csEvt)
if err != nil {
errChan <- fmt.Errorf("failed to broadcast changeset migration event: %w", err)
// Store the changeset chunk in the event store
if err = setChangesetChunk(ctx, ml.eventStore, ml.currentHeight, uint64(chunkIdx), cs); err != nil {
errChan <- err
return
}

btsReceived += int64(len(cs))
wg.Done()
}(i)
}

Expand All @@ -192,6 +203,11 @@ func (ml *migrationListener) RetrieveChangesets(ctx context.Context) error {
return errs
}

// Add the changeset migration event to the event store for voting
if err = ml.addChangesetEvent(ctx, ml.currentHeight, numChunks, btsReceived); err != nil {
return err
}

if err = setLastStoredHeight(ctx, ml.eventStore, ml.currentHeight); err != nil {
return err
}
Expand All @@ -205,6 +221,134 @@ func (ml *migrationListener) RetrieveChangesets(ctx context.Context) error {
}
}

const (
emptyChangesetChunksCount = 1
emptyChangesetChunkIdx = 0
)

// AddEvent adds the chaneset migration event to the event store for voting
// only if the changeset is not empty. If the changeset is empty, it is skipped.
// This method readjusts the changeset sizes depending on the current chains block size.
func (ml *migrationListener) addChangesetEvent(ctx context.Context, height uint64, numChunks int64, changesetSize int64) error {
// Check if the changeset is empty and skip it if it is not at the end height
// Add the changeset for the end height even if it is empty to signal the end of the migration
if height != ml.config.EndHeight && numChunks == 0 {
ml.logger.Debug("empty changesets for height", log.Int("height", int64(height)))
return nil
}

prevHeight, err := getLastChangesetBlockHeight(ctx, ml.eventStore)
if err != nil {
return err
}

// numChunks is the total number of chunks for the changeset for the given height
// received from the old chain. totalChunks is the total number of chunks
// the changeset will be split into based on the current chain's block size.
totalChunks := changesetSize / ml.changesetSize
if changesetSize%ml.changesetSize != 0 {
totalChunks++
}

// reader to read the changeset chunks according to adjusted chunk size
reader := newChunkReader(height, totalChunks)

if numChunks == 0 && height == ml.config.EndHeight {
// Add the changeset migration event for the end height even if it is empty
if err := ml.addEvent(height, prevHeight, emptyChangesetChunksCount, emptyChangesetChunkIdx, nil); err != nil {
return err
}
} else {
for i := int64(0); i < totalChunks; i++ {
// Read as many bytes as the changeset size
cs, err := reader.Read(ctx, ml.eventStore, int(ml.changesetSize))
if err != nil {
return err
}

if err = ml.addEvent(height, prevHeight, uint64(totalChunks), uint64(i), cs); err != nil {
return err
}
}
}

// Set the last changeset block height
if err = setLastChangesetBlockHeight(ctx, ml.eventStore, height); err != nil {
return err
}

// delete the changeset chunks from the event store
for i := int64(0); i < totalChunks; i++ {
if err = deleteChangesetChunk(ctx, ml.eventStore, height, uint64(i)); err != nil {
return err
}
}

return nil
}

func (ml *migrationListener) addEvent(height, prevHeight, totalChunks, chunkIdx uint64, csData []byte) error {
evt := &changesetMigration{
Height: height,
TotalChunks: totalChunks,
ChunkIdx: chunkIdx,
Changeset: csData,
PreviousBlock: prevHeight,
}

csEvt, err := evt.MarshalBinary()
if err != nil {
ml.logger.Error("failed to marshal changeset migration event", "error", err)
return err
}

ml.logger.Info("adding changeset migration event", log.Int("height", int64(height)), log.Int("size", len(csData)), log.Int("prevHeight", int(prevHeight)))

// Broadcast the changeset migration event to the event store for voting
err = ml.eventStore.Broadcast(context.Background(), voting.ChangesetMigrationEventType, csEvt)
if err != nil {
return fmt.Errorf("failed to broadcast changeset migration event: %w", err)
}

return nil
}

type chunkReader struct {
height uint64
chunkIdx uint64
totalChunks int64
data []byte
}

func newChunkReader(height uint64, totalChunks int64) *chunkReader {
return &chunkReader{
height: height,
totalChunks: totalChunks,
}
}

// Read function returns next numBytesToRead bytes from the changeset chunks
func (r *chunkReader) Read(ctx context.Context, eventStore listeners.EventStore, numBytesToRead int) ([]byte, error) {
for len(r.data) < numBytesToRead {
if r.chunkIdx >= uint64(r.totalChunks) {
return r.data, nil // no more chunks to read
}

bts, err := getChangesetChunk(ctx, eventStore, r.height, r.chunkIdx)
if err != nil {
return nil, err
}

r.data = append(r.data, bts...)
r.chunkIdx++
}

data := r.data[:numBytesToRead]
r.data = r.data[numBytesToRead:]

return data, nil
}

func (ml *migrationListener) GetChangesetMetadata(ctx context.Context) (totalChunks int64, chunkSizes []int64, err error) {
err = retry(ctx, maxRetries, func() error {
totalChunks, chunkSizes, err = ml.client.ChangesetMetadata(ctx, int64(ml.currentHeight))
Expand Down Expand Up @@ -238,7 +382,9 @@ func (c *MigrationConfig) Map() map[string]string {

var (
// lastHeightKey is the key used to store the last height processed by the listener
lastHeightKey = []byte("lh")
lastHeightKey = []byte("lh")
lastChangesetKey = []byte("lc")
chunkKey = []byte("ck")
)

// getLastStoredHeight gets the last height stored by the KV store
Expand Down Expand Up @@ -269,6 +415,54 @@ func setLastStoredHeight(ctx context.Context, eventStore listeners.EventStore, h
return nil
}

// getLastStoredHeight gets the last height stored by the KV store
func getLastChangesetBlockHeight(ctx context.Context, eventStore listeners.EventStore) (uint64, error) {
// get the last confirmed block height processed by the listener
lastHeight, err := eventStore.Get(ctx, lastChangesetKey)
if err != nil {
return 0, fmt.Errorf("failed to get last block height: %w", err)
}

if len(lastHeight) == 0 {
return 0, nil
}

return binary.LittleEndian.Uint64(lastHeight), nil
}

// setLastStoredHeight sets the last height stored by the KV store
func setLastChangesetBlockHeight(ctx context.Context, eventStore listeners.EventStore, height uint64) error {
heightBts := make([]byte, 8)
binary.LittleEndian.PutUint64(heightBts, height)

// set the last confirmed block height processed by the listener
err := eventStore.Set(ctx, lastChangesetKey, heightBts)
if err != nil {
return fmt.Errorf("failed to set last block height: %w", err)
}
return nil
}

// SetChunk sets the changeset chunk of the given height and chunk index
func setChangesetChunk(ctx context.Context, eventStore listeners.EventStore, height, chunkIdx uint64, chunk []byte) error {
suffix := []byte(fmt.Sprintf("%d-%d", height, chunkIdx))
key := append(chunkKey, suffix...)
return eventStore.Set(ctx, key, chunk)
}

// GetChunk gets the changeset chunk of the given height and chunk index
func getChangesetChunk(ctx context.Context, eventStore listeners.EventStore, height, chunkIdx uint64) ([]byte, error) {
suffix := []byte(fmt.Sprintf("%d-%d", height, chunkIdx))
key := append(chunkKey, suffix...)
return eventStore.Get(ctx, key)
}

func deleteChangesetChunk(ctx context.Context, eventStore listeners.EventStore, height, chunkIdx uint64) error {
suffix := []byte(fmt.Sprintf("%d-%d", height, chunkIdx))
key := append(chunkKey, suffix...)
return eventStore.Delete(ctx, key)
}

// retry will retry the function until it is successful, or reached the max retries
func retry(ctx context.Context, maxRetries int64, fn func() error) error {
retrier := &backoff.Backoff{
Expand Down
Loading

0 comments on commit d175b7f

Please sign in to comment.