Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions lib/block_view_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,11 @@ func (bav *UtxoView) GetMessagingGroupEntriesForUser(ownerPublicKey []byte) (
func (bav *UtxoView) GetMessagesForUser(publicKey []byte) (
_messageEntries []*MessageEntry, _messagingKeyEntries []*MessagingGroupEntry, _err error) {

return bav.GetLimitedMessagesForUser(publicKey, math.MaxUint64)
return bav.GetLimitedMessagesForUser(publicKey, math.MaxUint64, math.MaxUint64)
}

// TODO: Update for Postgres
func (bav *UtxoView) GetLimitedMessagesForUser(ownerPublicKey []byte, limit uint64) (
func (bav *UtxoView) GetLimitedMessagesForUser(ownerPublicKey []byte, maxTimestampNanos uint64, limit uint64) (
_messageEntries []*MessageEntry, _messagingGroupEntries []*MessagingGroupEntry, _err error) {

// This function will fetch up to limit number of messages for a public key. To accomplish
Expand All @@ -257,30 +257,30 @@ func (bav *UtxoView) GetLimitedMessagesForUser(ownerPublicKey []byte, limit uint
// We define an auxiliary map to keep track of messages in UtxoView and DB.
messagesMap := make(map[MessageKey]*MessageEntry)

// First look for messages in the UtxoView. We don't skip deleted entries for now as we will do it later.
for messageKey, messageEntry := range bav.MessageKeyToMessageEntry {
for _, messagingKeyEntry := range messagingGroupEntries {
if reflect.DeepEqual(messageKey.PublicKey[:], messagingKeyEntry.MessagingPublicKey[:]) {
// We will add the messages with the sender messaging public key as the MessageKey
// so that we have no overlaps in the DB in some weird edge cases.
mapKey := MakeMessageKey(messageEntry.SenderMessagingPublicKey[:], messageEntry.TstampNanos)
messagesMap[mapKey] = messageEntry
break
}
}
}

// We fetched all UtxoView entries, so now look for messages in the DB.
dbMessageEntries, err := DBGetLimitedMessageForMessagingKeys(bav.Handle, messagingGroupEntries, limit)
// First look for messages in the DB. We don't skip deleted entries for now as we will do it later.
dbMessageEntries, err := DBGetLimitedMessageForMessagingKeys(bav.Handle, messagingGroupEntries, maxTimestampNanos, limit)
if err != nil {
return nil, nil, errors.Wrapf(err, "GetMessagesForUser: Problem fetching MessageEntries from db: ")
}
// Now iterate through all the db message entries and add them to our auxiliary map.
for _, messageEntry := range dbMessageEntries {
// Use the sender messaging public key for the MessageKey to make sure they match the UtxoView entries.
// We will add the messages with the sender messaging public key as the MessageKey
// so that we have no overlaps in the UtxoView in some weird edge cases.
mapKey := MakeMessageKey(messageEntry.SenderMessagingPublicKey[:], messageEntry.TstampNanos)
if _, exists := messagesMap[mapKey]; !exists {
messagesMap[mapKey] = messageEntry
messagesMap[mapKey] = messageEntry
}

// We fetched all DB entries, so now look for messages in the UtxoView to override DB entries.
for messageKey, messageEntry := range bav.MessageKeyToMessageEntry {
for _, messagingKeyEntry := range messagingGroupEntries {
if reflect.DeepEqual(messageKey.PublicKey[:], messagingKeyEntry.MessagingPublicKey[:]) {
// Use the sender messaging public key for the MessageKey to make sure they match the DB entries.
mapKey := MakeMessageKey(messageEntry.SenderMessagingPublicKey[:], messageEntry.TstampNanos)
if _, exists := messagesMap[mapKey]; exists {
messagesMap[mapKey] = messageEntry
}
break
}
}
}

Expand Down
20 changes: 11 additions & 9 deletions lib/db_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,7 @@ func DBGetMessageEntriesForPublicKey(handle *badger.DB, publicKey []byte) (

func _enumerateLimitedMessagesForMessagingKeysReversedWithTxn(
txn *badger.Txn, messagingGroupEntries []*MessagingGroupEntry,
limit uint64) (_privateMessages []*MessageEntry, _err error) {
maxTimestampNanos uint64, limit uint64) (_privateMessages []*MessageEntry, _err error) {

// Users can have many messaging keys. By default, a users has the base messaging key, which
// is just their main public key. Users can also register messaging keys, e.g. keys like the
Expand All @@ -1389,13 +1389,19 @@ func _enumerateLimitedMessagesForMessagingKeysReversedWithTxn(
//prefixes = append(prefixes, _dbSeekPrefixForMessagePartyPublicKey(keyEntry.MessagingPublicKey[:]))
}

// Initialize all iterators, add the 0xff byte to the seek prefix so that we can iterate backwards.
// Setting the prefix to a tstamp of MaxUint64 when unspecified should return all the messages
// for the public key in sorted order since MaxUint64 >> the maximum timestamp in the db.
if maxTimestampNanos == 0 {
maxTimestampNanos = math.MaxUint64
}

// Initialize all iterators, add timestampKeyset bytes to the seek prefix so that we can iterate backwards.
var messagingIterators []*badger.Iterator
for _, prefix := range prefixes {
opts := badger.DefaultIteratorOptions
opts.Reverse = true
iterator := txn.NewIterator(opts)
iterator.Seek(append(prefix, 0xff))
iterator.Seek(append(prefix, UintToBuf(maxTimestampNanos)...))
defer iterator.Close()
messagingIterators = append(messagingIterators, iterator)
}
Expand Down Expand Up @@ -1454,20 +1460,16 @@ func _enumerateLimitedMessagesForMessagingKeysReversedWithTxn(
return privateMessages, nil
}

func DBGetLimitedMessageForMessagingKeys(handle *badger.DB, messagingKeys []*MessagingGroupEntry, limit uint64) (
func DBGetLimitedMessageForMessagingKeys(handle *badger.DB, messagingKeys []*MessagingGroupEntry, maxTimestampNanos uint64, limit uint64) (
_privateMessages []*MessageEntry, _err error) {

// Setting the prefix to a tstamp of zero should return all the messages
// for the public key in sorted order since 0 << the minimum timestamp in
// the db.

// Goes backwards to get messages in time sorted order.
// Limit the number of keys to speed up load times.
// Get all user messaging keys.

err := handle.Update(func(txn *badger.Txn) error {
var err error
_privateMessages, err = _enumerateLimitedMessagesForMessagingKeysReversedWithTxn(txn, messagingKeys, limit)
_privateMessages, err = _enumerateLimitedMessagesForMessagingKeysReversedWithTxn(txn, messagingKeys, maxTimestampNanos, limit)
if err != nil {
return errors.Wrapf(err, "DBGetLimitedMessageForMessagingKeys: problem getting user messages")
}
Expand Down