Skip to content

Commit

Permalink
fix_: ensure storenode requests do not exceed 24h
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Nov 20, 2024
1 parent 11cf42b commit fe52352
Showing 1 changed file with 24 additions and 47 deletions.
71 changes: 24 additions & 47 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/exp/maps"

gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/connection"
Expand Down Expand Up @@ -579,55 +580,15 @@ func (m *Messenger) syncFiltersFrom(ms mailservers.Mailserver, filters []*transp
m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches))
}

var batches24h []MailserverBatch
for pubsubTopic := range batches {
batchKeys := make([]int, 0, len(batches[pubsubTopic]))
for k := range batches[pubsubTopic] {
batchKeys = append(batchKeys, k)
}
batchKeys := maps.Keys(batches[pubsubTopic])
sort.Ints(batchKeys)

keysToIterate := append([]int{}, batchKeys...)
for {
// For all batches
var tmpKeysToIterate []int
for _, k := range keysToIterate {
batch := batches[pubsubTopic][k]

dayBatch := MailserverBatch{
To: batch.To,
Cursor: batch.Cursor,
PubsubTopic: batch.PubsubTopic,
Topics: batch.Topics,
ChatIDs: batch.ChatIDs,
}

from := batch.To - uint32(oneDayDuration.Seconds())
if from > batch.From {
dayBatch.From = from
batches24h = append(batches24h, dayBatch)

// Replace og batch with new dates
batch.To = from
batches[pubsubTopic][k] = batch
tmpKeysToIterate = append(tmpKeysToIterate, k)
} else {
batches24h = append(batches24h, batch)
}
}

if len(tmpKeysToIterate) == 0 {
break
for _, k := range batchKeys {
err := m.processMailserverBatch(ms, batches[pubsubTopic][k])
if err != nil {
m.logger.Error("error syncing topics", zap.Error(err))
return nil, err
}
keysToIterate = tmpKeysToIterate
}
}

for _, batch := range batches24h {
err := m.processMailserverBatch(ms, batch)
if err != nil {
m.logger.Error("error syncing topics", zap.Error(err))
return nil, err
}
}

Expand Down Expand Up @@ -727,6 +688,8 @@ type work struct {
contentTopics []types.TopicType
cursor types.StoreRequestCursor
limit uint32
from uint32
to uint32
}

type messageRequester interface {
Expand Down Expand Up @@ -805,6 +768,8 @@ func processMailserverBatch(
pubsubTopic: batch.PubsubTopic,
contentTopics: batch.Topics[i:j],
limit: pageLimit,
from: batch.From,
to: batch.To,
}
time.Sleep(50 * time.Millisecond)
}
Expand Down Expand Up @@ -846,7 +811,17 @@ loop:
}()

queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, batch.From, batch.To, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)

// If time range is greater than 24 hours, limit the range: to - (to-24h)
from := w.from
to := w.to
nextWorkTo := to
if batch.To-batch.From > uint32(oneDayDuration.Seconds()) {
from = to - uint32(oneDayDuration.Seconds())
nextWorkTo = from
}

cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, from, to, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)
queryCancel()

if err != nil {
Expand Down Expand Up @@ -880,6 +855,8 @@ loop:
contentTopics: w.contentTopics,
cursor: cursor,
limit: nextPageLimit,
from: w.from,
to: nextWorkTo,
}
}(w)
case err := <-errCh:
Expand Down

0 comments on commit fe52352

Please sign in to comment.