-
Notifications
You must be signed in to change notification settings - Fork 442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DecodeMessage: skip bytes Buffer, reducing alloc overhead #175
Conversation
see nsqio#174 (comment) for more discussion and before and after
aceb6dd
to
34d3c4d
Compare
restarted that one test failure, just looks like a flakey test |
_, err := io.ReadFull(buf, msg.ID[:]) | ||
if err != nil { | ||
return nil, err | ||
if len(b) < 11+MsgIDLength { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we define a constant? I had to sit and think about why you chose 11 for too long 😁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i would say no. because the lines below make it clear. this function has hardcoded slice ranges from 0-7, 8 to 9, etc. pulling out the number 11 (or 11+MsgIDLength) and defining it elsewhere feels weird because than you have a bunch of magic numbers except for one that is defined elsewhere. if we go this route, than we should also constant-ize all the other magic numbers IMHO. (which i wouldn't do, seems overkill. what we could do, is document the message format somewhere - if not already exists - and link to it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's only confusing being 11 because nsqd rejects length 0 bodies. I'd expect in the code for this to be 10, the len(int64)
and len(uint16)
.
FWIW, NewMessage accepts length 0 bodies; it'd be more consistent for this to accept length 0 bodies. Also might make sense for nsqd to do the same. IIRC there was an issue/discussion about that somewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're already doing something similar on the nsqd
side so let's keep it consistent (which, btw, I'm realizing is also eligible for this same optimization).
Feel free to also copy the message format into the function comment:
http://nsq.io/clients/tcp_protocol_spec.html#data-format
[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
| (int64) || || (hex string encoded in ASCII) || (binary)
| 8-byte || || 16-byte || N-byte
------------------------------------------------------------------------------------------...
nanosecond timestamp ^^ message ID message body
(uint16)
2-byte
attempts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so you're referring to https://github.com/nsqio/nsq/blob/fa56e2fda80c08f1428b9139f516b9aadac117c3/nsqd/message.go#L69-L92
-
i guess we could argue for days about this, but i'm personally not a big fan of having part of the numbers hardcoded in the function and part defined elsewhere. if
minValidMsgLength
ever changes then i can guarantee you this function will change as well. so in my mind it makes more sense to keep all these related numbers together in one place. -
that function allows empty bodies. whether nsqd allows empty bodies or not, i actually don't care much (though we should obviously follow the same rules everywhere). i did this check cause i thought
b[10+16:]
would have an index out of bounds if len was 26, but turns out i'm wrong about that so i'm ok with changing it
green light, thanks @Dieterbe If you don't mind addressing that one comment and squashing in, this LGTM! |
package nsq
import (
"bytes"
"math/rand"
"reflect"
"sync"
"testing"
"testing/quick"
"time"
)
var quickID = func() func() MessageID {
t := reflect.TypeOf(MessageID{})
r := rand.New(rand.NewSource(time.Now().UnixNano()))
var m sync.Mutex
return func() MessageID {
m.Lock()
v, _ := quick.Value(t, r)
m.Unlock()
return v.Interface().(MessageID)
}
}()
func TestDecodeMessage(t *testing.T) {
// Create our message: nsqd rejects length 0 messages.
message := NewMessage(quickID(), make([]byte, 1))
// Encode the message.
buf := new(bytes.Buffer)
message.WriteTo(buf)
encoded := buf.Bytes()
// We expect a valid, min length message to decode idempotently.
decodeRight, err := DecodeMessage(encoded)
if err != nil {
t.Errorf("expected idempotent encode/decode, got err: %v", err)
}
if !reflect.DeepEqual(message, decodeRight) {
t.Error("expected idempotent encode/decode, not")
}
// We expect a message with no body to not decode.
encoded = encoded[:len(encoded)-1] // trim the 1 byte body
_, err = DecodeMessage(encoded)
if err == nil {
t.Error("expected invalid encode to not decode")
}
} TIL that nsqd rejects length 0 messages. |
as discussed in nsqio#175 and on IRC
this is the nsqd equivalent of a very similar change in go-nsq: nsqio/go-nsq#175
// DecodeMessage deserializes data (as []byte) and creates a new Message | ||
// message format: | ||
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... | ||
// | (int64) || || (hex string encoded in ASCII) || (binary) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstring nitpick: message ID
is binary w/r/t the protocol. Practically we chose to use ascii printable characters for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(if we want to reword that, that's in the official docs)
this is the nsqd equivalent of a very similar change in go-nsq: nsqio/go-nsq#175
* nsqadmin: fix counter removal issues * nsqadmin: fix memory depth is always 0 * nsqadmin: send version with UserAgent * nsqadmin: eslint fixes * internal/quantile: fix JSON decoding * nsqadmin: round times to 2 decimal points * nsqadmin: fix column count on /topic * nsqadmin: handle/display AJAX errors * nsqadmin: bin dater * docs: update ChangeLog * internal/clusterinfo: fix dedupe regression in GetLookupdTopicProducers * build: bump go-nsq dependency to v1.0.5 stable * nsqadmin: fix deferred count * nsqadmin: compile static assets * nsqadmin: fix nsqd backwards compatibility when querying nsqd directly * nsqadmin: expand notify context * gofmt * nsqadmin: update header * Fix broken merge from nsqio#654 * nsqadmin: bump gulp/gulp-sass deps * nsqadmin: parse ?t=Xh timeframes * nsqadmin: bin dater * nsqadmin: Update statsd counters prefix from 'stats_counts' to 'stats.counters' * internal/clusterinfo: Expose detailed connection errors * nsqadmin: fix backwards compatible /info parsing * docs: add Code of Conduct * bump v0.3.6 stable * Makefile: BINARIES and EXAMPLES are gone, install depends on APPS * Makefile: no need to write apps/ dir everywhere also, we can line up variables nicely * nsqd: optimize NewGUID() division for "milliseconds" 64-bit division operations appear to be slow on some ARM systems, so replace division by 1000000 with a bitshift equivalent to division by 1048576. adapt guid twepoch comment for revised guid timestamp scheme * dist.sh: cleanup temp install dir * dist.sh: little cleanups * nsqd: bound reads from nsqlookupd peers to prevent OOM * nsqd: flock --data-path for unix-like platforms * dist: build freebsd binaries * nsqadmin: added conf flags for formatting statsd counters and gauges * docs: README tweak (wsup travis) * nsqd topic/channel: reset health on successful backend write - channel and topic put reset ctx.nsqd.SetHealth - change nsqd SetHealth/GetError to use atomic.Value; skip allocation in SetHealth if attempting to set an already healthy queue to healthy - nsqd_test.go: change `exp` to `nexp` in `nequal` output - relates to nsqio#594 * nsqd: simplify health / loading checks * nsqd: add msg header size to diskqueue MaxMsgSize - purpose: diskqueue was enforcing a maxmsgsize of the whole message including header, while protocol enforces maxmsgsize on the body. this could lead to successful writes to the memory queue but failed writes to the diskqueue, intermittently if the body size was within the 26 bytes range. - channel / topic tests assert configuration is passed correctly to diskqueue; it doesn't test PutMessage directly because enforcing msg size is the responsibility of protocol. - add maxMsgSize to diskqueue writeOne error message. * nsqd / nsqlookupd protocol IOLoop updates - nsqd: update IOLoop so FatalClientErr's are returned. current implementation shadows err, returned err was result of client.Reader.ReadSlice. - nsqlookupd: update IOLoop err shadowing in similar manner, rename SendResponse err to sendErr to be closer to nsqd implementation. in this case FatalClientErr's were effectively ignored since they would fail the type assertion. - add nequal to nsqlookupd_test - http.go / nsqd.go: resolve some err shadowing which had no impact. * docs: update README * add common test funcs to internal/test * nsqd: moved runlock in GetStats * nsqd: don't need to RLock channel on every requeue doRequeue is always called inside exitLock now * nsqd: use queue-specific lock for inflight/deferred * nsqd: tighten up stats related locking even more just as we don't need to lock the whole instance for the lifetime of the stats gathering, we don't need to lock the topic for the lifetime of the channel stats gathering * nsqd: separate client locks into write and meta locks * bench: add requirements.txt; update bootstrap/options * nsqd: fix stats data race * bump 0.3.7-alpha * should allow POST method, since the newer go tool pprof need it. * 1.should use rlock while aggregate 2.try rlock first to reduce write lock * nsq_to_*: update hasArg function to use flag.Visit * enable block profile and change block rate from http endpoint * nsqd/nsqlookupd: support running as windows service * dist: add windows * nsqd: fix flakey TestHTTPEmptyChannelPOST * bump v0.3.7 stable * docs: update changelog * nsqadmin: fix connected durations >1h * update npm dependencies & go-bindata * Update README URLs based on HTTP redirects * Makefile: consolidate apps sources deps * Makefile: install: add .exe if building for windows, consolidate recipe * dist.sh: build and install in one make command so that install can see what GOOS was used * nsqadmin: add dependency on gulp-task-listing seems to need it to gulp build * nsqadmin: make "rate" column work without --proxy-graphite Requests to graphite /render endpoint for "rate" column would always be proxied. Further, when not proxied, they need to use "jsonp" to avoid CORS problems. * nsqd: diskqueue syncs when only reads have occurred * godeps: bump go-options to latest * nsqd: use defaults from Options struct and go-options fixes * nsqd: update test certs * DecodeMessage: skip bytes Buffer, reducing alloc overhead this is the nsqd equivalent of a very similar change in go-nsq: nsqio/go-nsq#175 * nsqd: remove channel messagePump performance improvement: SUB: 2014/08/17 10:28:16 duration: 3.875343474s - 49.218mb/s - 258041.644ops/s - 3.875us/op * dist.sh: strip release builds of debug info results in 30% smaller binaries * (re)-handle SIGTERM * nsqd/nsqlookupd: explicitly set exit signals * bump v0.3.8-rc1 * godeps: bump go-nsq to latest * bump v0.3.8-rc2 * docs: update changelog * bump v0.3.8 stable * docker: Adds support for alpine as base image Alpine is equally as small as busybox (~5 MB) but has more features including the ability to install other packages. * docs: add wiredcraft and deis to README * travis: bump golang versions/gpm * nsqadmin: add /config API Provides the ability to modify the configuration nsqadmin for setting `nsqlookupd_http_addresses` to support dynamic configuration. Closes nsqio#769 * nsqd: deflake TestDiskQueueSyncAfterRead * nsqd: configurable HTTP client timeouts Adds configuration options HTTPClientConnectTimeout and HTTPClientRequestTimeout to control the connection and request timeout repectively of the HTTP client. Also added to the following app binaries: - nsqadmin - nsq_stat - nsq_to_file - nsq_to_http Closes nsqio#715 Closes nsqio#680 * coverage.sh: adds Code Coverage generation Adds script for generating code coverage statistics for Go packages. Borrowed from https://github.com/hashicorp/vault * docs: fix coveralls badge on README * nsqlookupd: Adds test cases Adds test cases to improve code coverage to protect against potential compatibility conflicts moving forward. Existing test cases refactored to leverage common test helpers within the internal/test package to avoid code duplication and simplify the testing effort. * nsqd: adds delayed retry to TestDiskQueueSyncAfterRead Adds a delay retry when attempting to opening metadata file as part of TestDiskQueueSyncAfterRead to handle timing issue between the ioloop and possible disk IO delays. * nsqadmin: improve test coverage Adds test cases to improve code coverage to protect against potential compatibility conflicts moving forward. Existing test cases refactored to leverage common test helpers within the internal/test package to avoid code duplication and simplify the testing effort. * nsqd: improve test coverage Adds test cases to improve code coverage to protect against potential compatibility conflicts moving forward. Existing test cases refactored to leverage common test helpers within the internal/test package to avoid code duplication and simplify the testing effort. * nsqlookupd: Add conn.Close() in IOLoop * docs: add sproutsocial to README * apps/to_nsq: add --rate option for throttling message rate * fix nsqadmin root ca verification * add test to ensure nsqadmin connects to nsqd with verified tls * nsq_to_file: require --topic or --topic-pattern fixes nsqio#789 * travis: update build matrix * nsqd: fix diskqueue benchmark test, maxMsgSize too small * nsqadmin: Fix handling of IPv6 broadcast addresses Fixes nsqio#815 * nsqd: deflake TestReconfigure * nsqd: switch to golang/snappy * nsqd: more intuitive channel message accounting * nsqd: deflake TestDiskQueueSyncAfterRead * nsqd: log when we can't get channels for pre-creation * deps: bump go-options * dist: scrub file ownership for tarball * nsqd: bound MPUB messages to derived limit * *: remove deprecated features * *: remove go-simplejson * internal/stringy: remove unused functions * nsqadmin: remove --use-statsd-prefixes * nsqadmin: go bin dater * nsq_to_*: go fmt * nsqlookupd: pass defaults from opts into flags * release: bump verison 1.0.0-alpha * nsqd: log errors if no nsqlookupd broadcast address * Godeps: update rev of go-options (remove extra []float64 log) * Godeps: update rev of go-nsq (switch to golang/snappy) * nsqadmin: allow ctr/meta+click to open a new tab * nsqadmin: generate bindata.go * nsqd: replace missing requeue exit check; cleanup doRequeue * nsqd: add benchmark for multiple topics * nsqd: per-topic message IDs * nsqd: read/write concurrently in MPUB benchmarks * new --log-prefix flag / option for nsqd, nsqlookupd, nsqadmin instantiate Logger in $APP.New() instead of $APP.NewOptions(), only if not already present in opts Now, setting Logger to nil in Options is not sufficient to disable logging. A no-op logger must be created and assigned. * nsqd: no longer need atomicRename() for windows os.Rename() now does the same thing on Windows that atomicRename() did (since go 1.5) * nsqd: rename option --worker-id to --node-id * nsqd: new metadata filename without ID symlink old metadata filename to new when loading, if both exist, ensure they match this makes rollback possible without losing messages (when rolling back forward, some manual intervention is required) on windows, Symlink() needs Administrator privs, so just write plain old metadata file includes tests * tests: ioutil.TempDir() adds a pseudo-random suffix no need to add our own pseudo-random "unique" suffix * nsqd: move diskqueue into nsqio/go-diskqueue * nsqd: GUID benchmark update * nsqadmin: add --allow-config-from-cidr flag/option * bump v1.0.0-compat stable * test: update travis build matrix * apps/to_nsq: fix --rate divide by zero bug * nsqd: clamp requeue timeout to range instead of dropping connection * nsqd: remove back-compat for flag duration ints --statsd-interval and --msg-timeout are the only two duration flags that have special functionality to be backwards-compatible with plain int arguments (meaning seconds or milliseconds). We can clean this up before nsq-1.0 * nsqd: expose memory stats under /stats * nsqd: refresh test TLS certs * test more go versions on Travis CI * test.sh: avoid redundant build of nsqadmin, use bash dir glob bash glob trick to match only folders nsqadmin moved to apps/ and is thus included in the glob already * nsqd: retry sub to ephemeral topic/channel which is Exiting quick hacky fix, instead of "proper" locking * test: ignore failure to push to coveralls avoid coveralls service failure making travis tests fail * replace verbose with log-level to reduce log sizes * nsqd: a few misc logging line cleanups * nsq*: clean up, regularize logging Options minor fix for --log-level flag help text must set opts before logging errors in New() re-order log stuff in Options structs treat logLevel the same in nsqd, nsqlookupd, nsqadmin Fix tests: multiple nsqlookupds need their own Options structs, now that nsqlookupd has logLevel in its Options. Otherwise the race detector complains when one of the nsqlookupds writes the derived int logLevel while another reads it for logging. * new internal/lg/ logging package for nsqd nsqlookupd nsqadmin introduce LogLevel type NilLogger for disabling logging consolidate LogLevel tests into internal/lg/ consolidate TestNoLogger * tests: more delay in TestClientMsgTimeout * use app.logf() for internal packages for nsqd nsqlookupd nsqadmin Instead of setting a Logger for github.com/nsqio/nsq/internal packages, pass a logf() function, so it is called with and honors a LogLevel. * internal/clusterinfo/ * internal/http_api/ * internal/protocol/ nsqd lookupPeer also needed to be converted Get rid of interal.app.Logger type, but internal/test/ needs its own Logger definition to avoid circular import with internal/lg/ tests. * Fix segment.com logo URL in README.md Instead segmentio_logo.png it should be segment_logo.png See: https://github.com/nsqio/nsqio.github.io/blob/master/static/img/segment_logo.png * fix vet error in clusterinfo/data.go * nsqd: fix nil pointer when memstats enabled * nsqadmin: X-Forwarded-User based ACL * nsqadmin: update bindata * nsqadmin: Update bindata for admin-user ACL changes
see #174 (comment)
for more discussion and before and after