Skip to content

Commit

Permalink
feat: better logging for tx, provider, query, processor (#120)
Browse files Browse the repository at this point in the history
Co-authored-by: izyak <test@test.com>
  • Loading branch information
2 people authored and DeepakBomjan committed Aug 15, 2023
1 parent 2eca6fd commit adcde89
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 67 deletions.
86 changes: 57 additions & 29 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Verifier struct {

func NewIconChainProcessor(log *zap.Logger, provider *IconProvider, metrics *processor.PrometheusMetrics) *IconChainProcessor {
return &IconChainProcessor{
log: log.With(zap.String("chain_name", "Icon")),
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
latestClientState: make(latestClientState),
connectionStateCache: make(processor.ConnectionStateCache),
Expand Down Expand Up @@ -149,7 +149,7 @@ func (icp *IconChainProcessor) Run(ctx context.Context, initialBlockHistory uint
}

// start_query_cycle
icp.log.Debug(" **************** Entering main query loop **************** ")
icp.log.Debug("Starting query cycle")
err := icp.monitoring(ctx, &persistence)
return err
}
Expand All @@ -168,6 +168,14 @@ func (icp *IconChainProcessor) StartFromHeight(ctx context.Context) int {
return snapshotHeight
}

func (icp *IconChainProcessor) getLastSavedHeight() int {
snapshotHeight, err := rlycommon.LoadSnapshotHeight(icp.Provider().ChainId())
if err != nil || snapshotHeight < 0 {
return 0
}
return snapshotHeight
}

func (icp *IconChainProcessor) initializeConnectionState(ctx context.Context) error {
// TODO: review
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
Expand All @@ -187,9 +195,9 @@ func (icp *IconChainProcessor) initializeConnectionState(ctx context.Context) er
CounterpartyClientID: c.Counterparty.ClientId,
}] = c.State == conntypes.OPEN

icp.log.Info("found connection",
zap.String("ClientId ", c.ClientId),
zap.String("ConnectionID ", c.Id),
icp.log.Debug("Found open connection",
zap.String("client-id ", c.ClientId),
zap.String("connection-id ", c.Id),
)
}
return nil
Expand Down Expand Up @@ -221,11 +229,11 @@ func (icp *IconChainProcessor) initializeChannelState(ctx context.Context) error
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN

icp.log.Info("Found channel",
zap.String("channelID", ch.ChannelId),
zap.String("Port id ", ch.PortId))
zap.String("Counterparty Channel Id ", ch.Counterparty.ChannelId)
zap.String("Counterparty Port Id", ch.Counterparty.PortId)
icp.log.Debug("Found open channel",
zap.String("channel-id", ch.ChannelId),
zap.String("port-id ", ch.PortId),
zap.String("counterparty-channel-id", ch.Counterparty.ChannelId),
zap.String("counterparty-port-id", ch.Counterparty.PortId))
}

return nil
Expand Down Expand Up @@ -276,7 +284,7 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer
}
// }

icp.log.Debug("Start to query from height", zap.Int64("height", processedheight))
icp.log.Info("Start to query from height", zap.Int64("height", processedheight))
// subscribe to monitor block
ctxMonitorBlock, cancelMonitorBlock := context.WithCancel(ctx)
reconnect()
Expand Down Expand Up @@ -304,7 +312,7 @@ loop:

go func(ctx context.Context, cancel context.CancelFunc) {
blockReq.Height = types.NewHexInt(processedheight)
icp.log.Debug("Querying Height", zap.Int64("height", processedheight))
icp.log.Debug("Try to reconnect from", zap.Int64("height", processedheight))
err := icp.chainProvider.client.MonitorBlock(ctx, blockReq, func(conn *websocket.Conn, v *types.BlockNotification) error {
if !errors.Is(ctx.Err(), context.Canceled) {
btpBlockNotifCh <- v
Expand All @@ -313,7 +321,10 @@ loop:
}, func(conn *websocket.Conn) {
}, func(conn *websocket.Conn, err error) {})
if err != nil {
icp.SnapshotHeight(int(processedheight) - 5)
ht := icp.getHeightToSave(processedheight)
if ht != icp.getLastSavedHeight() {
icp.SnapshotHeight(ht)
}
if errors.Is(err, context.Canceled) {
return
}
Expand All @@ -329,8 +340,8 @@ loop:
err := icp.verifyBlock(ctx, br.Header)
if err != nil {
reconnect()
icp.log.Warn("failed to Verify BTP Block",
zap.Int64("got", br.Height),
icp.log.Warn("Failed to verify BTP Block",
zap.Int64("height", br.Height),
zap.Error(err),
)
break
Expand All @@ -348,8 +359,7 @@ loop:
}

ibcHeaderCache[uint64(br.Height)] = br.Header
icp.log.Info("Queried Latest height: ",
zap.String("chain id ", icp.chainProvider.ChainId()),
icp.log.Debug("Queried block ",
zap.Int64("height", br.Height))
err = icp.handlePathProcessorUpdate(ctx, br.Header, ibcMessageCache, ibcHeaderCache.Clone())
if err != nil {
Expand All @@ -364,7 +374,10 @@ loop:
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
}
icp.SnapshotHeight(int(icp.latestBlock.Height) - 5)
ht, takeSnapshot := icp.shouldSnapshot(int(icp.latestBlock.Height))
if takeSnapshot {
icp.SnapshotHeight(ht)
}
}
// remove unprocessed blockResponses
for len(btpBlockRespCh) > 0 {
Expand Down Expand Up @@ -455,19 +468,32 @@ loop:
}
}

func (icp *IconChainProcessor) SnapshotHeight(height int) {

func (icp *IconChainProcessor) shouldSnapshot(height int) (int, bool) {
blockInterval := icp.Provider().ProviderConfig().GetBlockInterval()
snapshotThreshold := rlycommon.ONE_HOUR / int(blockInterval)

retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
snapshotHeight := height - int(retryAfter)
snapshotHeight := icp.getHeightToSave(int64(height))

if snapshotHeight%snapshotThreshold == 0 {
err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height)
if err != nil {
icp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))
}
return snapshotHeight, true
}
return 0, false
}

func (icp *IconChainProcessor) getHeightToSave(height int64) int {
retryAfter := icp.Provider().ProviderConfig().GetFirstRetryBlockAfter()
ht := int(height - int64(retryAfter))
if ht < 0 {
return 0
}
return ht
}

func (icp *IconChainProcessor) SnapshotHeight(height int) {
icp.log.Info("Save height for snapshot", zap.Int("height", height))
err := rlycommon.SnapshotHeight(icp.Provider().ChainId(), height)
if err != nil {
icp.log.Warn("Failed saving height snapshot for height", zap.Int("height", height))
}
}

Expand Down Expand Up @@ -526,6 +552,8 @@ func (icp *IconChainProcessor) verifyBlock(ctx context.Context, ibcHeader provid
icp.verifier.nextProofContext = header.Validators
icp.verifier.verifiedHeight = int64(header.Height())
icp.verifier.prevNetworkSectionHash = types.NewNetworkSection(header.Header).Hash()
icp.log.Debug("Verified block ",
zap.Uint64("height", header.Height()))
return nil
}

Expand Down Expand Up @@ -602,8 +630,8 @@ func (icp *IconChainProcessor) handleBTPBlockRequest(
request.err = errors.Wrapf(err, "event.UnmarshalFromBytes: %v", err)
return
}
icp.log.Info("Detected eventlog: ", zap.Int64("Height", request.height),
zap.String("Eventlog", string(el.Indexed[0])))
icp.log.Info("Detected eventlog ", zap.Int64("height", request.height),
zap.String("eventlog", IconCosmosEventMap[string(el.Indexed[0])]))
eventlogs = append(eventlogs, el)
}

Expand All @@ -629,7 +657,7 @@ func (icp *IconChainProcessor) handleBTPBlockRequest(
request.response.IsProcessed = processed
return
}
request.err = errors.Wrapf(err, "failed to get btp header: %v", err)
request.err = errors.Wrapf(err, "Failed to get btp header: %v", err)
return
}
request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight))
Expand Down
2 changes: 0 additions & 2 deletions relayer/chains/icon/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,5 @@ func (icp *IconProvider) NewIconMessage(msg interface{}, method string) provider
Method: method,
}

// icp.log.Debug("Icon Message ", zap.String("method", method), zap.Any("message ", msg))

return im
}
3 changes: 1 addition & 2 deletions relayer/chains/icon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (pp *IconProviderConfig) GetFirstRetryBlockAfter() uint64 {
return 8
}

// NewProvider should provide a new Icon provider
// NewProvider should provide a new Icon provider
func (pp *IconProviderConfig) NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) {

Expand All @@ -118,7 +117,7 @@ func (pp *IconProviderConfig) NewProvider(log *zap.Logger, homepath string, debu
codec := MakeCodec(ModuleBasics, []string{})

return &IconProvider{
log: log.With(zap.String("sys", "chain_client")),
log: log.With(zap.String("chain_id", pp.ChainID)),
client: NewClient(pp.getRPCAddr(), log),
PCfg: pp,
wallet: wallet,
Expand Down
6 changes: 3 additions & 3 deletions relayer/chains/icon/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,20 +615,20 @@ func (icp *IconProvider) QueryChannels(ctx context.Context) ([]*chantypes.Identi
"portId": portId,
}), &_channel)
if err != nil {
icp.log.Error("unable to fetch channel for ", zap.String("channel id ", channelId), zap.Error(err))
icp.log.Error("unable to fetch channel for ", zap.String("channel-id ", channelId), zap.Error(err))
continue
}

if _channel == "" {
icp.log.Debug("channel not present for ", zap.String("Channel id ", channelId), zap.String("port id ", portId))
icp.log.Debug("Channel not present for ", zap.String("channel-id ", channelId), zap.String("port-id ", portId))
continue
}

var channel chantypes.Channel
_, err = HexBytesToProtoUnmarshal(_channel, &channel)
if err != nil {
icp.log.Info("Unable to unmarshal channel for ",
zap.String("channel id ", channelId), zap.Error(err))
zap.String("channel-id ", channelId), zap.Error(err))
continue
}

Expand Down
7 changes: 3 additions & 4 deletions relayer/chains/wasm/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ func (ap *WasmProvider) buildMessages(clientCtx client.Context, txf tx.Factory,
}

txf = txf.WithGas(adjusted)
_, _ = fmt.Fprintf(os.Stderr, "%s\n", tx.GasEstimateResponse{GasEstimate: txf.Gas()})
// _, _ = fmt.Fprintf(os.Stderr, "%s\n", tx.GasEstimateResponse{GasEstimate: txf.Gas()})
}

if clientCtx.Simulate {
Expand Down Expand Up @@ -1003,9 +1003,8 @@ func (ap *WasmProvider) BroadcastTx(

ap.log.Info("Submitted transaction",
zap.String("chain_id", ap.PCfg.ChainID),
zap.String("txHash", res.TxHash),
zap.Int64("Height", res.Height),
zap.Any("Methods called", msgTypesField(msgs)),
zap.String("tx_hash", res.TxHash),
msgTypesField(msgs),
)

if shouldWait {
Expand Down
Loading

0 comments on commit adcde89

Please sign in to comment.