chore_: use constant for magic number
kaichaosun committed Jun 3, 2024
1 parent 3cdc7b8 commit 86a0f31
Showing 1 changed file with 15 additions and 10 deletions.
wakuv2/waku.go: 25 changes (15 additions, 10 deletions)
@@ -84,6 +84,9 @@ const requestTimeout = 30 * time.Second
 const bootnodesQueryBackoffMs = 200
 const bootnodesMaxRetries = 7
 const cacheTTL = 20 * time.Minute
+const maxHashQueryLength = 20
+const hashQueryInterval = 5 * time.Second
+const messageSentPeriod = 5 // in seconds
 
 type ITelemetryClient interface {
 	PushReceivedEnvelope(*protocol.Envelope)
@@ -980,31 +983,31 @@ func (w *Waku) broadcast() {
 }
 
 func (w *Waku) checkIfMessagesStored() {
-	ticker := time.NewTicker(time.Second * 5)
+	ticker := time.NewTicker(hashQueryInterval)
 	defer ticker.Stop()
 
 	for {
 		select {
 		case <-w.ctx.Done():
-			w.logger.Debug("Stop the look for message stored check")
+			w.logger.Debug("stop the look for message stored check")
 			return
 		case <-ticker.C:
-			w.logger.Debug("Running loop for messages stored check")
+			w.logger.Debug("running loop for messages stored check")
 			w.sendMsgIDsMu.Lock()
 			pubsubTopics := make([]string, 0, len(w.sendMsgIDs))
 			pubsubMessageIds := make([][]gethcommon.Hash, 0, len(w.sendMsgIDs))
 			for pubsubTopic, subMsgs := range w.sendMsgIDs {
 				var queryMsgIds []gethcommon.Hash
 				for msgID, sendTime := range subMsgs {
-					if len(queryMsgIds) >= 20 {
+					if len(queryMsgIds) >= maxHashQueryLength {
 						break
 					}
 					// message is sent 5 seconds ago, check if it's stored
-					if uint32(w.timesource.Now().Unix()) > sendTime+5 {
+					if uint32(w.timesource.Now().Unix()) > sendTime+messageSentPeriod {
 						queryMsgIds = append(queryMsgIds, msgID)
 					}
 				}
-				w.logger.Debug("Store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic))
+				w.logger.Debug("store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic))
 				if len(queryMsgIds) > 0 {
 					pubsubTopics = append(pubsubTopics, pubsubTopic)
 					pubsubMessageIds = append(pubsubMessageIds, queryMsgIds)
@@ -1110,17 +1113,19 @@ func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Ha
 	requestID := protocol.GenerateRequestID()
 	opts = append(opts, store.WithRequestID(requestID))
 	opts = append(opts, store.WithPeer(selectedPeer))
+	opts = append(opts, store.WithPaging(false, maxHashQueryLength))
+	opts = append(opts, store.IncludeData(false))
 
 	messageHashes := make([]pb.MessageHash, len(hashes))
 	for i, hash := range hashes {
 		messageHashes[i] = pb.ToMessageHash(hash.Bytes())
 	}
 
-	w.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.String("peerID", selectedPeer.String()), zap.Any("messageHashes", messageHashes))
+	w.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes))
 
 	result, err := w.node.Store().QueryByHash(ctx, messageHashes, opts...)
 	if err != nil {
-		w.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.String("peerID", selectedPeer.String()), zap.Error(err))
+		w.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
 		return []gethcommon.Hash{}
 	}
 
@@ -1152,8 +1157,8 @@ func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Ha
 		}
 	}
 
-	w.logger.Debug("Ack message hashes", zap.Any("ackHashes", ackHashes))
-	w.logger.Debug("Missed message hashes", zap.Any("missedHashes", missedHashes))
+	w.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes))
+	w.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes))
 
 	return append(ackHashes, missedHashes...)
 }
