From 5e77b8543c01301cfee14e4165004f419cf676aa Mon Sep 17 00:00:00 2001 From: Steven Lee Date: Fri, 1 Jul 2022 10:47:48 -0700 Subject: [PATCH] Added configurable timestamp keyset when getting messages from DB --- lib/block_view_message.go | 40 +++++++++++++++++++-------------------- lib/db_utils.go | 20 +++++++++++--------- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/lib/block_view_message.go b/lib/block_view_message.go index dbcc20b53..0a3aafba6 100644 --- a/lib/block_view_message.go +++ b/lib/block_view_message.go @@ -237,11 +237,11 @@ func (bav *UtxoView) GetMessagingGroupEntriesForUser(ownerPublicKey []byte) ( func (bav *UtxoView) GetMessagesForUser(publicKey []byte) ( _messageEntries []*MessageEntry, _messagingKeyEntries []*MessagingGroupEntry, _err error) { - return bav.GetLimitedMessagesForUser(publicKey, math.MaxUint64) + return bav.GetLimitedMessagesForUser(publicKey, math.MaxUint64, math.MaxUint64) } // TODO: Update for Postgres -func (bav *UtxoView) GetLimitedMessagesForUser(ownerPublicKey []byte, limit uint64) ( +func (bav *UtxoView) GetLimitedMessagesForUser(ownerPublicKey []byte, maxTimestampNanos uint64, limit uint64) ( _messageEntries []*MessageEntry, _messagingGroupEntries []*MessagingGroupEntry, _err error) { // This function will fetch up to limit number of messages for a public key. To accomplish @@ -257,30 +257,30 @@ func (bav *UtxoView) GetLimitedMessagesForUser(ownerPublicKey []byte, limit uint // We define an auxiliary map to keep track of messages in UtxoView and DB. messagesMap := make(map[MessageKey]*MessageEntry) - // First look for messages in the UtxoView. We don't skip deleted entries for now as we will do it later. - for messageKey, messageEntry := range bav.MessageKeyToMessageEntry { - for _, messagingKeyEntry := range messagingGroupEntries { - if reflect.DeepEqual(messageKey.PublicKey[:], messagingKeyEntry.MessagingPublicKey[:]) { - // We will add the messages with the sender messaging public key as the MessageKey - // so that we have no overlaps in the DB in some weird edge cases. - mapKey := MakeMessageKey(messageEntry.SenderMessagingPublicKey[:], messageEntry.TstampNanos) - messagesMap[mapKey] = messageEntry - break - } - } - } - - // We fetched all UtxoView entries, so now look for messages in the DB. - dbMessageEntries, err := DBGetLimitedMessageForMessagingKeys(bav.Handle, messagingGroupEntries, limit) + // First look for messages in the DB. We don't skip deleted entries for now as we will do it later. + dbMessageEntries, err := DBGetLimitedMessageForMessagingKeys(bav.Handle, messagingGroupEntries, maxTimestampNanos, limit) if err != nil { return nil, nil, errors.Wrapf(err, "GetMessagesForUser: Problem fetching MessageEntries from db: ") } // Now iterate through all the db message entries and add them to our auxiliary map. for _, messageEntry := range dbMessageEntries { - // Use the sender messaging public key for the MessageKey to make sure they match the UtxoView entries. + // We will add the messages with the sender messaging public key as the MessageKey + // so that we have no overlaps in the UtxoView in some weird edge cases. mapKey := MakeMessageKey(messageEntry.SenderMessagingPublicKey[:], messageEntry.TstampNanos) - if _, exists := messagesMap[mapKey]; !exists { - messagesMap[mapKey] = messageEntry + messagesMap[mapKey] = messageEntry + } + + // We fetched all DB entries, so now look for messages in the UtxoView to override DB entries. + for messageKey, messageEntry := range bav.MessageKeyToMessageEntry { + for _, messagingKeyEntry := range messagingGroupEntries { + if reflect.DeepEqual(messageKey.PublicKey[:], messagingKeyEntry.MessagingPublicKey[:]) { + // Use the sender messaging public key for the MessageKey to make sure they match the DB entries. + mapKey := MakeMessageKey(messageEntry.SenderMessagingPublicKey[:], messageEntry.TstampNanos) + if _, exists := messagesMap[mapKey]; exists { + messagesMap[mapKey] = messageEntry + } + break + } } } diff --git a/lib/db_utils.go b/lib/db_utils.go index 1cbdeb075..cf2fc9ad9 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1372,7 +1372,7 @@ func DBGetMessageEntriesForPublicKey(handle *badger.DB, publicKey []byte) ( func _enumerateLimitedMessagesForMessagingKeysReversedWithTxn( txn *badger.Txn, messagingGroupEntries []*MessagingGroupEntry, - limit uint64) (_privateMessages []*MessageEntry, _err error) { + maxTimestampNanos uint64, limit uint64) (_privateMessages []*MessageEntry, _err error) { // Users can have many messaging keys. By default, a users has the base messaging key, which // is just their main public key. Users can also register messaging keys, e.g. keys like the @@ -1389,13 +1389,19 @@ func _enumerateLimitedMessagesForMessagingKeysReversedWithTxn( //prefixes = append(prefixes, _dbSeekPrefixForMessagePartyPublicKey(keyEntry.MessagingPublicKey[:])) } - // Initialize all iterators, add the 0xff byte to the seek prefix so that we can iterate backwards. + // Setting the prefix to a tstamp of MaxUint64 when unspecified should return all the messages + // for the public key in sorted order since MaxUint64 >> the maximum timestamp in the db. + if maxTimestampNanos == 0 { + maxTimestampNanos = math.MaxUint64 + } + + // Initialize all iterators, add timestampKeyset bytes to the seek prefix so that we can iterate backwards. var messagingIterators []*badger.Iterator for _, prefix := range prefixes { opts := badger.DefaultIteratorOptions opts.Reverse = true iterator := txn.NewIterator(opts) - iterator.Seek(append(prefix, 0xff)) + iterator.Seek(append(prefix, UintToBuf(maxTimestampNanos)...)) defer iterator.Close() messagingIterators = append(messagingIterators, iterator) } @@ -1454,20 +1460,16 @@ func _enumerateLimitedMessagesForMessagingKeysReversedWithTxn( return privateMessages, nil } -func DBGetLimitedMessageForMessagingKeys(handle *badger.DB, messagingKeys []*MessagingGroupEntry, limit uint64) ( +func DBGetLimitedMessageForMessagingKeys(handle *badger.DB, messagingKeys []*MessagingGroupEntry, maxTimestampNanos uint64, limit uint64) ( _privateMessages []*MessageEntry, _err error) { - // Setting the prefix to a tstamp of zero should return all the messages - // for the public key in sorted order since 0 << the minimum timestamp in - // the db. - // Goes backwards to get messages in time sorted order. // Limit the number of keys to speed up load times. // Get all user messaging keys. err := handle.Update(func(txn *badger.Txn) error { var err error - _privateMessages, err = _enumerateLimitedMessagesForMessagingKeysReversedWithTxn(txn, messagingKeys, limit) + _privateMessages, err = _enumerateLimitedMessagesForMessagingKeysReversedWithTxn(txn, messagingKeys, maxTimestampNanos, limit) if err != nil { return errors.Wrapf(err, "DBGetLimitedMessageForMessagingKeys: problem getting user messages") }