-
Notifications
You must be signed in to change notification settings - Fork 20.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rpc, whisper, xeth: fix RPC message retrieval data race
- Loading branch information
Showing
6 changed files
with
119 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package whisper | ||
|
||
import ( | ||
"bytes" | ||
"testing" | ||
) | ||
|
||
func TestEnvelopeOpen(t *testing.T) { | ||
payload := []byte("hello world") | ||
message := NewMessage(payload) | ||
|
||
envelope, err := message.Wrap(DefaultPoW, Options{}) | ||
if err != nil { | ||
t.Fatalf("failed to wrap message: %v", err) | ||
} | ||
opened, err := envelope.Open(nil) | ||
if err != nil { | ||
t.Fatalf("failed to open envelope: %v.", err) | ||
} | ||
if opened.Flags != message.Flags { | ||
t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags) | ||
} | ||
if bytes.Compare(opened.Signature, message.Signature) != 0 { | ||
t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature) | ||
} | ||
if bytes.Compare(opened.Payload, message.Payload) != 0 { | ||
t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload) | ||
} | ||
if opened.Sent != message.Sent { | ||
t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent) | ||
} | ||
|
||
if opened.Hash != envelope.Hash() { | ||
t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,84 @@ | ||
// Contains the external API side message filter for watching, pooling and polling | ||
// matched whisper messages. | ||
// matched whisper messages, also serializing data access to avoid duplications. | ||
|
||
package xeth | ||
|
||
import "time" | ||
import ( | ||
"sync" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
) | ||
|
||
// whisperFilter is the message cache matching a specific filter, accumulating | ||
// inbound messages until the are requested by the client. | ||
type whisperFilter struct { | ||
id int // Filter identifier | ||
cache []WhisperMessage // Cache of messages not yet polled | ||
timeout time.Time // Time when the last message batch was queries | ||
id int // Filter identifier for old message retrieval | ||
ref *Whisper // Whisper reference for old message retrieval | ||
|
||
cache []WhisperMessage // Cache of messages not yet polled | ||
skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication | ||
update time.Time // Time of the last message query | ||
|
||
lock sync.RWMutex // Lock protecting the filter internals | ||
} | ||
|
||
// newWhisperFilter creates a new serialized, poll based whisper topic filter. | ||
func newWhisperFilter(id int, ref *Whisper) *whisperFilter { | ||
return &whisperFilter{ | ||
id: id, | ||
ref: ref, | ||
|
||
update: time.Now(), | ||
skip: make(map[common.Hash]struct{}), | ||
} | ||
} | ||
|
||
// messages retrieves all the cached messages from the entire pool matching the | ||
// filter, resetting the filter's change buffer. | ||
func (w *whisperFilter) messages() []WhisperMessage { | ||
w.lock.Lock() | ||
defer w.lock.Unlock() | ||
|
||
w.cache = nil | ||
w.update = time.Now() | ||
|
||
w.skip = make(map[common.Hash]struct{}) | ||
messages := w.ref.Messages(w.id) | ||
for _, message := range messages { | ||
w.skip[message.ref.Hash] = struct{}{} | ||
} | ||
return messages | ||
} | ||
|
||
// insert injects a new batch of messages into the filter cache. | ||
func (w *whisperFilter) insert(msgs ...WhisperMessage) { | ||
w.cache = append(w.cache, msgs...) | ||
func (w *whisperFilter) insert(messages ...WhisperMessage) { | ||
w.lock.Lock() | ||
defer w.lock.Unlock() | ||
|
||
for _, message := range messages { | ||
if _, ok := w.skip[message.ref.Hash]; !ok { | ||
w.cache = append(w.cache, messages...) | ||
} | ||
} | ||
} | ||
|
||
// retrieve fetches all the cached messages from the filter. | ||
func (w *whisperFilter) retrieve() (messages []WhisperMessage) { | ||
w.lock.Lock() | ||
defer w.lock.Unlock() | ||
|
||
messages, w.cache = w.cache, nil | ||
w.timeout = time.Now() | ||
w.update = time.Now() | ||
|
||
return | ||
} | ||
|
||
// activity returns the last time instance when client requests were executed on | ||
// the filter. | ||
func (w *whisperFilter) activity() time.Time { | ||
w.lock.RLock() | ||
defer w.lock.RUnlock() | ||
|
||
return w.update | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters