Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix start height greater than latest height issue #151

Merged
merged 6 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
}
}

func (ccp *CosmosChainProcessor) SnapshotHeight(height int) {
func (ccp *CosmosChainProcessor) SnapshotHeight(height int64) {
panic("Not implemented for Cosmos")
}

func (ccp *CosmosChainProcessor) StartFromHeight(ctx context.Context) int {
func (ccp *CosmosChainProcessor) StartFromHeight(ctx context.Context) int64 {
panic("Not implemented for Cosmos")
}

Expand Down
41 changes: 24 additions & 17 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,22 @@ func (icp *IconChainProcessor) Run(ctx context.Context, initialBlockHistory uint
return err
}

func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int {
func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int64 {
cfg := icp.Provider().ProviderConfig().(*IconProviderConfig)

if cfg.StartHeight != 0 {
return int(cfg.StartHeight)
return cfg.StartHeight
}
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil {
icp.log.Warn("Failed to load height from snapshot", zap.Error(err))
} else {
icp.log.Info("Obtained start height from config", zap.Int("height", snapshotHeight))
icp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight))
}
return snapshotHeight
}

func (icp *IconChainProcessor) getLastSavedHeight() int {
func (icp *IconChainProcessor) getLastSavedHeight() int64 {
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil || snapshotHeight < 0 {
return 0
Expand Down Expand Up @@ -275,15 +276,21 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer
}

var err error
// processedheight := int64(icp.chainProvider.lastBTPBlockHeight)
// if processedheight == 0 {
processedheight := int64(icp.StartFromHeight(ctx))
processedheight := icp.StartFromHeight(ctx)
latestHeight, err := icp.chainProvider.QueryLatestHeight(ctx)
if err != nil {
fmt.Println("Error fetching latest block")
izyak marked this conversation as resolved.
Show resolved Hide resolved
return err
}
if processedheight > latestHeight {
icp.log.Warn("Start height set is greater than latest height",
zap.Int64("Start height", processedheight),
zap.Int64("Latest Height", latestHeight),
)
processedheight = latestHeight
}
if processedheight <= 0 {
processedheight, err = icp.chainProvider.QueryLatestHeight(ctx)
if err != nil {
fmt.Println("Error fetching latest block")
return err
}
processedheight = latestHeight
}

icp.log.Info("Start to query from height", zap.Int64("height", processedheight))
Expand Down Expand Up @@ -475,20 +482,20 @@ loop:
}
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int {
func (icp *IconChainProcessor) getHeightToSave(height int64) int64 {
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
ht := height - int64(retryAfter)
if ht < 0 {
return 0
}
return ht
}

func (icp *IconChainProcessor) SnapshotHeight(height int) {
icp.log.Info("Save height for snapshot", zap.Int("height", height))
func (icp *IconChainProcessor) SnapshotHeight(height int64) {
icp.log.Info("Save height for snapshot", zap.Int64("height", height))
err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height)
if err != nil {
icp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))
icp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height))
}
}

Expand Down
4 changes: 2 additions & 2 deletions relayer/chains/mock/mock_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func NewMockChainProcessor(ctx context.Context, log *zap.Logger, chainID string,
}
}

func (mcp *MockChainProcessor) SnapshotHeight(height int) {
func (mcp *MockChainProcessor) SnapshotHeight(height int64) {
panic("")
}

func (mcp *MockChainProcessor) StartFromHeight(ctx context.Context) int {
func (mcp *MockChainProcessor) StartFromHeight(ctx context.Context) int64 {
return 0
}

Expand Down
4 changes: 2 additions & 2 deletions relayer/chains/penumbra/penumbra_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ func (pcp *PenumbraChainProcessor) initializeChannelState(ctx context.Context) e
return nil
}

func (ccp *PenumbraChainProcessor) SnapshotHeight(height int) {
func (ccp *PenumbraChainProcessor) SnapshotHeight(height int64) {
panic("Not implemented for Penumbra")
}

func (ccp *PenumbraChainProcessor) StartFromHeight(ctx context.Context) int {
func (ccp *PenumbraChainProcessor) StartFromHeight(ctx context.Context) int64 {
panic("Not implemented for Penumbra")
}

Expand Down
25 changes: 11 additions & 14 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,16 @@ type queryCyclePersistence struct {
balanceUpdateWaitDuration time.Duration
}

func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int {
func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int64 {
cfg := ccp.Provider().ProviderConfig().(*WasmProviderConfig)
if cfg.StartHeight != 0 {
return int(cfg.StartHeight)
return int64(cfg.StartHeight)
}
snapshotHeight, err := common.LoadSnapshotHeight(ccp.Provider().ChainId())
if err != nil {
ccp.log.Warn("Failed to load height from snapshot", zap.Error(err))
} else {
ccp.log.Info("Obtained start height from config", zap.Int("height", snapshotHeight))
ccp.log.Info("Obtained start height from config", zap.Int64("height", snapshotHeight))
}
return snapshotHeight
}
Expand Down Expand Up @@ -248,16 +248,13 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint

// this will make initial QueryLoop iteration look back initialBlockHistory blocks in history
latestQueriedBlock := ccp.StartFromHeight(ctx)
if latestQueriedBlock < 0 {
latestQueriedBlock = int(persistence.latestHeight - int64(initialBlockHistory))
if latestQueriedBlock < 0 {
latestQueriedBlock = 0
}
if latestQueriedBlock <= 0 || latestQueriedBlock > persistence.latestHeight {
latestQueriedBlock = persistence.latestHeight
}

persistence.latestQueriedBlock = int64(latestQueriedBlock)

ccp.log.Info("Start to query from height ", zap.Int("height", latestQueriedBlock))
ccp.log.Info("Start to query from height ", zap.Int64("height", latestQueriedBlock))

_, lightBlock, err := ccp.chainProvider.QueryLightBlock(ctx, persistence.latestQueriedBlock)
if err != nil {
Expand Down Expand Up @@ -519,20 +516,20 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
return nil
}

func (ccp *WasmChainProcessor) getHeightToSave(height int64) int {
func (ccp *WasmChainProcessor) getHeightToSave(height int64) int64 {
retryAfter := ccp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
ht := height - int64(retryAfter)
if ht < 0 {
return 0
}
return ht
}

func (ccp *WasmChainProcessor) SnapshotHeight(height int) {
ccp.log.Info("Save height for snapshot", zap.Int("height", height))
func (ccp *WasmChainProcessor) SnapshotHeight(height int64) {
ccp.log.Info("Save height for snapshot", zap.Int64("height", height))
err := common.SnapshotHeight(ccp.Provider().ChainId(), height)
if err != nil {
ccp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))
ccp.log.Warn("Failed saving height snapshot for height", zap.Int64("height", height))
}
}

Expand Down
6 changes: 3 additions & 3 deletions relayer/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func getSnapshotPath(chain_name string) (string, error) {
return snapshot, nil
}

func SnapshotHeight(chain_id string, height int) error {
func SnapshotHeight(chain_id string, height int64) error {
snapshot, err := getSnapshotPath(chain_id)
if err != nil {
return fmt.Errorf("Failed to find snapshot path, %w", err)
Expand All @@ -46,7 +46,7 @@ func SnapshotHeight(chain_id string, height int) error {
return nil
}

func LoadSnapshotHeight(chain_id string) (int, error) {
func LoadSnapshotHeight(chain_id string) (int64, error) {
snapshot, err := getSnapshotPath(chain_id)
if err != nil {
return -1, fmt.Errorf("Failed to find snapshot path, %w", err)
Expand All @@ -56,5 +56,5 @@ func LoadSnapshotHeight(chain_id string) (int, error) {
if err != nil {
return -1, fmt.Errorf("Failed reading file, %w", err)
}
return strconv.Atoi(strings.TrimSuffix(string(content), "\n"))
return strconv.ParseInt(strings.TrimSuffix(string(content), "\n"), 10, 64)
}
4 changes: 2 additions & 2 deletions relayer/processor/chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type ChainProcessor interface {

// Take snapshot of height every N blocks or when the chain processor fails, so that the relayer
// can restart from that height
SnapshotHeight(height int)
SnapshotHeight(height int64)

// If the relay goes down, start chain processor from height returned by this function
// CAN return max(snapshotHeight, latestHeightFromClient)
StartFromHeight(ctx context.Context) int
StartFromHeight(ctx context.Context) int64
}

// ChainProcessors is a slice of ChainProcessor instances.
Expand Down