From 5dfdf8ba51e29af26b5e4c1864d739aaf797541b Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 25 Nov 2015 20:28:42 +0100 Subject: [PATCH 1/8] Fix redis protocol plugin nil pointer Do not set message.Bulks to nil after setting message to nil --- protos/redis/redis.go | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/protos/redis/redis.go b/protos/redis/redis.go index edbbeb386d2a..8edae21b66cd 100644 --- a/protos/redis/redis.go +++ b/protos/redis/redis.go @@ -266,7 +266,6 @@ func (redis *Redis) InitDefaults() { } func (redis *Redis) setFromConfig(config config.Redis) error { - redis.Ports = config.Ports if config.SendRequest != nil { @@ -304,7 +303,6 @@ func (stream *RedisStream) PrepareForNewMessage() { stream.data = stream.data[stream.parseOffset:] stream.parseOffset = 0 stream.message = nil - stream.message.Bulks = []string{} } func redisMessageParser(s *RedisStream) (bool, bool) { @@ -342,14 +340,13 @@ func redisMessageParser(s *RedisStream) (bool, bool) { } else { m.NumberOfBulks, err = strconv.ParseInt(line[1:], 10, 64) - if err != nil { logp.Err("Failed to read number of bulk messages: %s", err) return false, false } - s.parseOffset = off - m.Bulks = []string{} + s.parseOffset = off + m.Bulks = make([]string, 0, m.NumberOfBulks) continue } @@ -536,7 +533,7 @@ func (redis *Redis) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uin priv.Data[dir] = &RedisStream{ tcptuple: tcptuple, data: pkt.Payload, - message: &RedisMessage{Ts: pkt.Ts}, + message: &RedisMessage{Ts: pkt.Ts, Bulks: []string{}}, } } else { // concatenate bytes @@ -564,23 +561,19 @@ func (redis *Redis) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uin return priv } - if complete { - - if stream.message.IsRequest { - logp.Debug("redis", "REDIS request message: %s", stream.message.Message) - } else { - logp.Debug("redis", "REDIS response message: %s", stream.message.Message) - } - - // all ok, go to next level - redis.handleRedis(stream.message, tcptuple, dir) - - // and reset message - stream.PrepareForNewMessage() - } else { + if !complete { // wait for more data break } + + if stream.message.IsRequest { + logp.Debug("redis", "REDIS request message: %s", stream.message.Message) + } else { + logp.Debug("redis", "REDIS response message: %s", stream.message.Message) + } + // all ok, go to next level and reset stream for new message + redis.handleRedis(stream.message, tcptuple, dir) + stream.PrepareForNewMessage() } return priv From c400b0dfcbf3387fa90bc2317bbe4f36bc2fd18e Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 25 Nov 2015 20:30:37 +0100 Subject: [PATCH 2/8] Fix redis string parsing Parse strings not until CRLF, but use message length field to get full message. Fixes poblems with messages bodies containing CRLF. --- protos/redis/redis.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/protos/redis/redis.go b/protos/redis/redis.go index 8edae21b66cd..665d3f315a96 100644 --- a/protos/redis/redis.go +++ b/protos/redis/redis.go @@ -379,18 +379,30 @@ func redisMessageParser(s *RedisStream) (bool, bool) { s.parseOffset = off - found, line, off = readLine(s.data, s.parseOffset) - if !found { - logp.Debug("redis", "End of line not found, waiting for more data") + // check all content in buffer (length + CRLF) + if int64(len(s.data[s.parseOffset:])) < length+2 { + logp.Debug("redis", "Message incomplete, waiting for more data") s.parseOffset = starting_offset return true, false } - logp.Debug("redis", "line %s: %d", line, off) + // check content ends with CRLF + off = s.parseOffset + int(length) + if s.data[off] != '\r' || s.data[off+1] != '\n' { + logp.Err("Expected end of line not found") + return false, false + } + + // extract line + line = string(s.data[s.parseOffset:off]) + off += 2 + + logp.Debug("redis", "line %s: %d", line, s.parseOffset) if int64(len(line)) != length { logp.Err("Wrong length of data: %d instead of %d", len(line), length) return false, false } + value = line s.parseOffset = off } From 2f4706b411a2420b3413f8743a8fafda4d6a3f40 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 25 Nov 2015 21:19:36 +0100 Subject: [PATCH 3/8] Redis refactoring split parser and unexport/rename some types --- protos/redis/redis.go | 567 ++++++------------------------------ protos/redis/redis_parse.go | 418 ++++++++++++++++++++++++++ protos/redis/redis_test.go | 84 +++--- 3 files changed, 556 insertions(+), 513 deletions(-) create mode 100644 protos/redis/redis_parse.go diff --git a/protos/redis/redis.go b/protos/redis/redis.go index 665d3f315a96..ebe92f29cec3 100644 --- a/protos/redis/redis.go +++ b/protos/redis/redis.go @@ -1,8 +1,6 @@ package redis import ( - "bytes" - "strconv" "strings" "time" @@ -16,34 +14,7 @@ import ( "github.com/elastic/packetbeat/protos/tcp" ) -const ( - START = iota - BULK_ARRAY - SIMPLE_MESSAGE -) - -type RedisMessage struct { - Ts time.Time - NumberOfBulks int64 - Bulks []string - - TcpTuple common.TcpTuple - CmdlineTuple *common.CmdlineTuple - Direction uint8 - - IsRequest bool - IsError bool - Message string - Method string - Path string - Size int - - parseState int - start int - end int -} - -type RedisStream struct { +type stream struct { tcptuple *common.TcpTuple data []byte @@ -51,10 +22,10 @@ type RedisStream struct { parseOffset int bytesReceived int - message *RedisMessage + message *redisMessage } -type RedisTransaction struct { +type transaction struct { Type string tuple common.TcpTuple Src common.Endpoint @@ -73,177 +44,20 @@ type RedisTransaction struct { Redis common.MapStr - Request_raw string - Response_raw string + RequestRaw string + ResponseRaw string } -// Keep sorted for future command addition -var RedisCommands = map[string]struct{}{ - "APPEND": struct{}{}, - "AUTH": struct{}{}, - "BGREWRITEAOF": struct{}{}, - "BGSAVE": struct{}{}, - "BITCOUNT": struct{}{}, - "BITOP": struct{}{}, - "BITPOS": struct{}{}, - "BLPOP": struct{}{}, - "BRPOP": struct{}{}, - "BRPOPLPUSH": struct{}{}, - "CLIENT GETNAME": struct{}{}, - "CLIENT KILL": struct{}{}, - "CLIENT LIST": struct{}{}, - "CLIENT PAUSE": struct{}{}, - "CLIENT SETNAME": struct{}{}, - "CONFIG GET": struct{}{}, - "CONFIG RESETSTAT": struct{}{}, - "CONFIG REWRITE": struct{}{}, - "CONFIG SET": struct{}{}, - "DBSIZE": struct{}{}, - "DEBUG OBJECT": struct{}{}, - "DEBUG SEGFAULT": struct{}{}, - "DECR": struct{}{}, - "DECRBY": struct{}{}, - "DEL": struct{}{}, - "DISCARD": struct{}{}, - "DUMP": struct{}{}, - "ECHO": struct{}{}, - "EVAL": struct{}{}, - "EVALSHA": struct{}{}, - "EXEC": struct{}{}, - "EXISTS": struct{}{}, - "EXPIRE": struct{}{}, - "EXPIREAT": struct{}{}, - "FLUSHALL": struct{}{}, - "FLUSHDB": struct{}{}, - "GET": struct{}{}, - "GETBIT": struct{}{}, - "GETRANGE": struct{}{}, - "GETSET": struct{}{}, - "HDEL": struct{}{}, - "HEXISTS": struct{}{}, - "HGET": struct{}{}, - "HGETALL": struct{}{}, - "HINCRBY": struct{}{}, - "HINCRBYFLOAT": struct{}{}, - "HKEYS": struct{}{}, - "HLEN": struct{}{}, - "HMGET": struct{}{}, - "HMSET": struct{}{}, - "HSCAN": struct{}{}, - "HSET": struct{}{}, - "HSETINX": struct{}{}, - "HVALS": struct{}{}, - "INCR": struct{}{}, - "INCRBY": struct{}{}, - "INCRBYFLOAT": struct{}{}, - "INFO": struct{}{}, - "KEYS": struct{}{}, - "LASTSAVE": struct{}{}, - "LINDEX": struct{}{}, - "LINSERT": struct{}{}, - "LLEN": struct{}{}, - "LPOP": struct{}{}, - "LPUSH": struct{}{}, - "LPUSHX": struct{}{}, - "LRANGE": struct{}{}, - "LREM": struct{}{}, - "LSET": struct{}{}, - "LTRIM": struct{}{}, - "MGET": struct{}{}, - "MIGRATE": struct{}{}, - "MONITOR": struct{}{}, - "MOVE": struct{}{}, - "MSET": struct{}{}, - "MSETNX": struct{}{}, - "MULTI": struct{}{}, - "OBJECT": struct{}{}, - "PERSIST": struct{}{}, - "PEXPIRE": struct{}{}, - "PEXPIREAT": struct{}{}, - "PFADD": struct{}{}, - "PFCOUNT": struct{}{}, - "PFMERGE": struct{}{}, - "PING": struct{}{}, - "PSETEX": struct{}{}, - "PSUBSCRIBE": struct{}{}, - "PTTL": struct{}{}, - "PUBLISH": struct{}{}, - "PUBSUB": struct{}{}, - "PUNSUBSCRIBE": struct{}{}, - "QUIT": struct{}{}, - "RANDOMKEY": struct{}{}, - "RENAME": struct{}{}, - "RENAMENX": struct{}{}, - "RESTORE": struct{}{}, - "RPOP": struct{}{}, - "RPOPLPUSH": struct{}{}, - "RPUSH": struct{}{}, - "RPUSHX": struct{}{}, - "SADD": struct{}{}, - "SAVE": struct{}{}, - "SCAN": struct{}{}, - "SCARD": struct{}{}, - "SCRIPT EXISTS": struct{}{}, - "SCRIPT FLUSH": struct{}{}, - "SCRIPT KILL": struct{}{}, - "SCRIPT LOAD": struct{}{}, - "SDIFF": struct{}{}, - "SDIFFSTORE": struct{}{}, - "SELECT": struct{}{}, - "SET": struct{}{}, - "SETBIT": struct{}{}, - "SETEX": struct{}{}, - "SETNX": struct{}{}, - "SETRANGE": struct{}{}, - "SHUTDOWN": struct{}{}, - "SINTER": struct{}{}, - "SINTERSTORE": struct{}{}, - "SISMEMBER": struct{}{}, - "SLAVEOF": struct{}{}, - "SLOWLOG": struct{}{}, - "SMEMBERS": struct{}{}, - "SMOVE": struct{}{}, - "SORT": struct{}{}, - "SPOP": struct{}{}, - "SRANDMEMBER": struct{}{}, - "SREM": struct{}{}, - "SSCAN": struct{}{}, - "STRLEN": struct{}{}, - "SUBSCRIBE": struct{}{}, - "SUNION": struct{}{}, - "SUNIONSTORE": struct{}{}, - "SYNC": struct{}{}, - "TIME": struct{}{}, - "TTL": struct{}{}, - "TYPE": struct{}{}, - "UNSUBSCRIBE": struct{}{}, - "UNWATCH": struct{}{}, - "WATCH": struct{}{}, - "ZADD": struct{}{}, - "ZCARD": struct{}{}, - "ZCOUNT": struct{}{}, - "ZINCRBY": struct{}{}, - "ZINTERSTORE": struct{}{}, - "ZRANGE": struct{}{}, - "ZRANGEBYSCORE": struct{}{}, - "ZRANK": struct{}{}, - "ZREM": struct{}{}, - "ZREMRANGEBYLEX": struct{}{}, - "ZREMRANGEBYRANK": struct{}{}, - "ZREMRANGEBYSCORE": struct{}{}, - "ZREVRANGE": struct{}{}, - "ZREVRANGEBYSCORE": struct{}{}, - "ZREVRANK": struct{}{}, - "ZSCAN": struct{}{}, - "ZSCORE": struct{}{}, - "ZUNIONSTORE": struct{}{}, +type redisConnectionData struct { + Streams [2]*stream } +// Redis protocol plugin type Redis struct { // config - Ports []int - Send_request bool - Send_response bool + Ports []int + SendRequest bool + SendResponse bool transactions *common.Cache transactionTimeout time.Duration @@ -251,17 +65,19 @@ type Redis struct { results publisher.Client } -func (redis *Redis) getTransaction(k common.HashableTcpTuple) *RedisTransaction { +var debug = logp.MakeDebug("redis") + +func (redis *Redis) getTransaction(k common.HashableTcpTuple) *transaction { v := redis.transactions.Get(k) if v != nil { - return v.(*RedisTransaction) + return v.(*transaction) } return nil } func (redis *Redis) InitDefaults() { - redis.Send_request = false - redis.Send_response = false + redis.SendRequest = false + redis.SendResponse = false redis.transactionTimeout = protos.DefaultTransactionExpiration } @@ -269,10 +85,10 @@ func (redis *Redis) setFromConfig(config config.Redis) error { redis.Ports = config.Ports if config.SendRequest != nil { - redis.Send_request = *config.SendRequest + redis.SendRequest = *config.SendRequest } if config.SendResponse != nil { - redis.Send_response = *config.SendResponse + redis.SendResponse = *config.SendResponse } if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 { redis.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second @@ -299,278 +115,87 @@ func (redis *Redis) Init(test_mode bool, results publisher.Client) error { return nil } -func (stream *RedisStream) PrepareForNewMessage() { - stream.data = stream.data[stream.parseOffset:] - stream.parseOffset = 0 - stream.message = nil +func (s *stream) PrepareForNewMessage() { + s.data = s.data[s.parseOffset:] + s.parseOffset = 0 + s.message = nil } -func redisMessageParser(s *RedisStream) (bool, bool) { - - var err error - var value string - m := s.message - - iserror := false - - for s.parseOffset < len(s.data) { - - if s.data[s.parseOffset] == '*' { - //Arrays - - m.parseState = BULK_ARRAY - m.start = s.parseOffset - logp.Debug("redis", "start %d", m.start) - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - logp.Debug("redis", "End of line not found, waiting for more data") - return true, false - } - logp.Debug("redis", "line %s: %d", line, off) - - if len(line) == 3 && line[1] == '-' && line[2] == '1' { - //Null array - s.parseOffset = off - value = "nil" - } else if len(line) == 2 && line[1] == '0' { - // Empty array - s.parseOffset = off - value = "[]" - } else { - - m.NumberOfBulks, err = strconv.ParseInt(line[1:], 10, 64) - if err != nil { - logp.Err("Failed to read number of bulk messages: %s", err) - return false, false - } - - s.parseOffset = off - m.Bulks = make([]string, 0, m.NumberOfBulks) - continue - } - - } else if s.data[s.parseOffset] == '$' { - // Bulk Strings - if m.parseState == START { - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - starting_offset := s.parseOffset - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - logp.Debug("redis", "End of line not found, waiting for more data") - s.parseOffset = starting_offset - return true, false - } - logp.Debug("redis", "line %s: %d", line, off) - - if len(line) == 3 && line[1] == '-' && line[2] == '1' { - // NULL Bulk Reply - value = "nil" - s.parseOffset = off - } else { - length, err := strconv.ParseInt(line[1:], 10, 64) - if err != nil { - logp.Err("Failed to read bulk message: %s", err) - return false, false - } - - s.parseOffset = off - - // check all content in buffer (length + CRLF) - if int64(len(s.data[s.parseOffset:])) < length+2 { - logp.Debug("redis", "Message incomplete, waiting for more data") - s.parseOffset = starting_offset - return true, false - } - - // check content ends with CRLF - off = s.parseOffset + int(length) - if s.data[off] != '\r' || s.data[off+1] != '\n' { - logp.Err("Expected end of line not found") - return false, false - } - - // extract line - line = string(s.data[s.parseOffset:off]) - off += 2 - - logp.Debug("redis", "line %s: %d", line, s.parseOffset) - if int64(len(line)) != length { - logp.Err("Wrong length of data: %d instead of %d", len(line), length) - return false, false - } - - value = line - s.parseOffset = off - } - - } else if s.data[s.parseOffset] == ':' { - // Integers - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - n, err := strconv.ParseInt(line[1:], 10, 64) - - if err != nil { - logp.Err("Failed to read integer reply: %s", err) - return false, false - } - value = strconv.Itoa(int(n)) - s.parseOffset = off - - } else if s.data[s.parseOffset] == '+' { - // Simple Strings - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - - value = line[1:] - s.parseOffset = off - } else if s.data[s.parseOffset] == '-' { - // Errors - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - iserror = true - - value = line[1:] - s.parseOffset = off - } else { - logp.Debug("redis", "Unexpected message starting with %s", s.data[s.parseOffset:]) - return false, false - } - - // add value - if m.NumberOfBulks > 0 { - m.NumberOfBulks = m.NumberOfBulks - 1 - m.Bulks = append(m.Bulks, value) - - if len(m.Bulks) == 1 { - logp.Debug("redis", "Value: %s", value) - // first word. - // check if it's a command - if isRedisCommand(value) { - logp.Debug("redis", "is request") - m.IsRequest = true - m.Method = value - } - } - - if len(m.Bulks) == 2 { - // second word. This is usually the path - if m.IsRequest { - m.Path = value - } - } - - if m.NumberOfBulks == 0 { - // the last bulk received - if m.IsRequest { - m.Message = strings.Join(m.Bulks, " ") - } else { - m.Message = "[" + strings.Join(m.Bulks, ", ") + "]" - } - m.end = s.parseOffset - m.Size = m.end - m.start - return true, true - } - } else { - m.Message = value - m.end = s.parseOffset - m.Size = m.end - m.start - if iserror { - m.IsError = true - } - return true, true - } +func (redis *Redis) ConnectionTimeout() time.Duration { + return redis.transactionTimeout +} - } //end for +func (redis *Redis) Parse( + pkt *protos.Packet, + tcptuple *common.TcpTuple, + dir uint8, + private protos.ProtocolData, +) protos.ProtocolData { + defer logp.Recover("ParseRedis exception") - return true, false + conn := ensureRedisConnection(private) + debug("redis connection: %p", conn) + conn = redis.doParse(conn, pkt, tcptuple, dir) + if conn == nil { + return nil + } + return conn } -func readLine(data []byte, offset int) (bool, string, int) { - q := bytes.Index(data[offset:], []byte("\r\n")) - if q == -1 { - return false, "", 0 +func ensureRedisConnection(private protos.ProtocolData) *redisConnectionData { + if private == nil { + return &redisConnectionData{} } - return true, string(data[offset : offset+q]), offset + q + 2 -} -type redisPrivateData struct { - Data [2]*RedisStream -} + priv, ok := private.(*redisConnectionData) + if !ok { + logp.Warn("redis connection data type error, create new one") + return &redisConnectionData{} + } + if priv == nil { + logp.Warn("Unexpected: redis connection data not set, create new one") + return &redisConnectionData{} + } -func (redis *Redis) ConnectionTimeout() time.Duration { - return redis.transactionTimeout + return priv } -func (redis *Redis) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - - defer logp.Recover("ParseRedis exception") - - priv := redisPrivateData{} - if private != nil { - var ok bool - priv, ok = private.(redisPrivateData) - if !ok { - priv = redisPrivateData{} - } - } +func (redis *Redis) doParse( + conn *redisConnectionData, + pkt *protos.Packet, + tcptuple *common.TcpTuple, + dir uint8, +) *redisConnectionData { - if priv.Data[dir] == nil { - priv.Data[dir] = &RedisStream{ + st := conn.Streams[dir] + if st == nil { + st = &stream{ tcptuple: tcptuple, data: pkt.Payload, - message: &RedisMessage{Ts: pkt.Ts, Bulks: []string{}}, + message: newMessage(pkt.Ts), } + conn.Streams[dir] = st } else { - // concatenate bytes - priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...) - if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM { + st.data = append(st.data, pkt.Payload...) + if len(st.data) > tcp.TCP_MAX_DATA_IN_STREAM { logp.Debug("redis", "Stream data too large, dropping TCP stream") - priv.Data[dir] = nil - return priv + conn.Streams[dir] = nil } + return conn } - stream := priv.Data[dir] - for len(stream.data) > 0 { - if stream.message == nil { - stream.message = &RedisMessage{Ts: pkt.Ts} + for len(st.data) > 0 { + if st.message == nil { + st.message = newMessage(pkt.Ts) } - ok, complete := redisMessageParser(priv.Data[dir]) - + ok, complete := redisMessageParser(st) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it - priv.Data[dir] = nil - logp.Debug("redis", "Ignore Redis message. Drop tcp stream. Try parsing with the next segment") - return priv + conn.Streams[dir] = nil + debug("Ignore Redis message. Drop tcp stream. Try parsing with the next segment") + return conn } if !complete { @@ -578,25 +203,25 @@ func (redis *Redis) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uin break } - if stream.message.IsRequest { - logp.Debug("redis", "REDIS request message: %s", stream.message.Message) + if st.message.IsRequest { + debug("REDIS request message: %s", st.message.Message) } else { - logp.Debug("redis", "REDIS response message: %s", stream.message.Message) + debug("REDIS response message: %s", st.message.Message) } + // all ok, go to next level and reset stream for new message - redis.handleRedis(stream.message, tcptuple, dir) - stream.PrepareForNewMessage() + redis.handleRedis(st.message, tcptuple, dir) + st.PrepareForNewMessage() } - return priv + return conn } -func isRedisCommand(key string) bool { - _, exists := RedisCommands[strings.ToUpper(key)] - return exists +func newMessage(ts time.Time) *redisMessage { + return &redisMessage{Ts: ts, Bulks: []string{}} } -func (redis *Redis) handleRedis(m *RedisMessage, tcptuple *common.TcpTuple, +func (redis *Redis) handleRedis(m *redisMessage, tcptuple *common.TcpTuple, dir uint8) { m.TcpTuple = *tcptuple @@ -610,7 +235,7 @@ func (redis *Redis) handleRedis(m *RedisMessage, tcptuple *common.TcpTuple, } } -func (redis *Redis) receivedRedisRequest(msg *RedisMessage) { +func (redis *Redis) receivedRedisRequest(msg *redisMessage) { tuple := msg.TcpTuple trans := redis.getTransaction(tuple.Hashable()) if trans != nil { @@ -618,7 +243,7 @@ func (redis *Redis) receivedRedisRequest(msg *RedisMessage) { logp.Warn("Two requests without a Response. Dropping old request") } } else { - trans = &RedisTransaction{Type: "redis", tuple: tuple} + trans = &transaction{Type: "redis", tuple: tuple} redis.transactions.Put(tuple.Hashable(), trans) } @@ -626,7 +251,7 @@ func (redis *Redis) receivedRedisRequest(msg *RedisMessage) { trans.Method = msg.Method trans.Path = msg.Path trans.Query = msg.Message - trans.Request_raw = msg.Message + trans.RequestRaw = msg.Message trans.BytesIn = msg.Size trans.cmdline = msg.CmdlineTuple @@ -648,7 +273,7 @@ func (redis *Redis) receivedRedisRequest(msg *RedisMessage) { } } -func (redis *Redis) receivedRedisResponse(msg *RedisMessage) { +func (redis *Redis) receivedRedisResponse(msg *redisMessage) { tuple := msg.TcpTuple trans := redis.getTransaction(tuple.Hashable()) if trans == nil { @@ -670,14 +295,14 @@ func (redis *Redis) receivedRedisResponse(msg *RedisMessage) { } trans.BytesOut = msg.Size - trans.Response_raw = msg.Message + trans.ResponseRaw = msg.Message trans.ResponseTime = int32(msg.Ts.Sub(trans.ts).Nanoseconds() / 1e6) // resp_time in milliseconds redis.publishTransaction(trans) redis.transactions.Delete(trans.tuple.Hashable()) - logp.Debug("redis", "Redis transaction completed: %s", trans.Redis) + debug("Redis transaction completed: %s", trans.Redis) } func (redis *Redis) GapInStream(tcptuple *common.TcpTuple, dir uint8, @@ -697,7 +322,7 @@ func (redis *Redis) ReceivedFin(tcptuple *common.TcpTuple, dir uint8, return private } -func (redis *Redis) publishTransaction(t *RedisTransaction) { +func (redis *Redis) publishTransaction(t *transaction) { if redis.results == nil { return @@ -711,11 +336,11 @@ func (redis *Redis) publishTransaction(t *RedisTransaction) { event["status"] = common.ERROR_STATUS } event["responsetime"] = t.ResponseTime - if redis.Send_request { - event["request"] = t.Request_raw + if redis.SendRequest { + event["request"] = t.RequestRaw } - if redis.Send_response { - event["response"] = t.Response_raw + if redis.SendResponse { + event["response"] = t.ResponseRaw } event["redis"] = common.MapStr(t.Redis) event["method"] = strings.ToUpper(t.Method) diff --git a/protos/redis/redis_parse.go b/protos/redis/redis_parse.go new file mode 100644 index 000000000000..3ac13a703262 --- /dev/null +++ b/protos/redis/redis_parse.go @@ -0,0 +1,418 @@ +package redis + +import ( + "bytes" + "strconv" + "strings" + "time" + + "github.com/elastic/libbeat/common" + "github.com/elastic/libbeat/logp" +) + +type redisMessage struct { + Ts time.Time + NumberOfBulks int64 + Bulks []string + + TcpTuple common.TcpTuple + CmdlineTuple *common.CmdlineTuple + Direction uint8 + + IsRequest bool + IsError bool + Message string + Method string + Path string + Size int + + parseState int + start int + end int +} + +const ( + START = iota + BULK_ARRAY + SIMPLE_MESSAGE +) + +// Keep sorted for future command addition +var redisCommands = map[string]struct{}{ + "APPEND": struct{}{}, + "AUTH": struct{}{}, + "BGREWRITEAOF": struct{}{}, + "BGSAVE": struct{}{}, + "BITCOUNT": struct{}{}, + "BITOP": struct{}{}, + "BITPOS": struct{}{}, + "BLPOP": struct{}{}, + "BRPOP": struct{}{}, + "BRPOPLPUSH": struct{}{}, + "CLIENT GETNAME": struct{}{}, + "CLIENT KILL": struct{}{}, + "CLIENT LIST": struct{}{}, + "CLIENT PAUSE": struct{}{}, + "CLIENT SETNAME": struct{}{}, + "CONFIG GET": struct{}{}, + "CONFIG RESETSTAT": struct{}{}, + "CONFIG REWRITE": struct{}{}, + "CONFIG SET": struct{}{}, + "DBSIZE": struct{}{}, + "DEBUG OBJECT": struct{}{}, + "DEBUG SEGFAULT": struct{}{}, + "DECR": struct{}{}, + "DECRBY": struct{}{}, + "DEL": struct{}{}, + "DISCARD": struct{}{}, + "DUMP": struct{}{}, + "ECHO": struct{}{}, + "EVAL": struct{}{}, + "EVALSHA": struct{}{}, + "EXEC": struct{}{}, + "EXISTS": struct{}{}, + "EXPIRE": struct{}{}, + "EXPIREAT": struct{}{}, + "FLUSHALL": struct{}{}, + "FLUSHDB": struct{}{}, + "GET": struct{}{}, + "GETBIT": struct{}{}, + "GETRANGE": struct{}{}, + "GETSET": struct{}{}, + "HDEL": struct{}{}, + "HEXISTS": struct{}{}, + "HGET": struct{}{}, + "HGETALL": struct{}{}, + "HINCRBY": struct{}{}, + "HINCRBYFLOAT": struct{}{}, + "HKEYS": struct{}{}, + "HLEN": struct{}{}, + "HMGET": struct{}{}, + "HMSET": struct{}{}, + "HSCAN": struct{}{}, + "HSET": struct{}{}, + "HSETINX": struct{}{}, + "HVALS": struct{}{}, + "INCR": struct{}{}, + "INCRBY": struct{}{}, + "INCRBYFLOAT": struct{}{}, + "INFO": struct{}{}, + "KEYS": struct{}{}, + "LASTSAVE": struct{}{}, + "LINDEX": struct{}{}, + "LINSERT": struct{}{}, + "LLEN": struct{}{}, + "LPOP": struct{}{}, + "LPUSH": struct{}{}, + "LPUSHX": struct{}{}, + "LRANGE": struct{}{}, + "LREM": struct{}{}, + "LSET": struct{}{}, + "LTRIM": struct{}{}, + "MGET": struct{}{}, + "MIGRATE": struct{}{}, + "MONITOR": struct{}{}, + "MOVE": struct{}{}, + "MSET": struct{}{}, + "MSETNX": struct{}{}, + "MULTI": struct{}{}, + "OBJECT": struct{}{}, + "PERSIST": struct{}{}, + "PEXPIRE": struct{}{}, + "PEXPIREAT": struct{}{}, + "PFADD": struct{}{}, + "PFCOUNT": struct{}{}, + "PFMERGE": struct{}{}, + "PING": struct{}{}, + "PSETEX": struct{}{}, + "PSUBSCRIBE": struct{}{}, + "PTTL": struct{}{}, + "PUBLISH": struct{}{}, + "PUBSUB": struct{}{}, + "PUNSUBSCRIBE": struct{}{}, + "QUIT": struct{}{}, + "RANDOMKEY": struct{}{}, + "RENAME": struct{}{}, + "RENAMENX": struct{}{}, + "RESTORE": struct{}{}, + "RPOP": struct{}{}, + "RPOPLPUSH": struct{}{}, + "RPUSH": struct{}{}, + "RPUSHX": struct{}{}, + "SADD": struct{}{}, + "SAVE": struct{}{}, + "SCAN": struct{}{}, + "SCARD": struct{}{}, + "SCRIPT EXISTS": struct{}{}, + "SCRIPT FLUSH": struct{}{}, + "SCRIPT KILL": struct{}{}, + "SCRIPT LOAD": struct{}{}, + "SDIFF": struct{}{}, + "SDIFFSTORE": struct{}{}, + "SELECT": struct{}{}, + "SET": struct{}{}, + "SETBIT": struct{}{}, + "SETEX": struct{}{}, + "SETNX": struct{}{}, + "SETRANGE": struct{}{}, + "SHUTDOWN": struct{}{}, + "SINTER": struct{}{}, + "SINTERSTORE": struct{}{}, + "SISMEMBER": struct{}{}, + "SLAVEOF": struct{}{}, + "SLOWLOG": struct{}{}, + "SMEMBERS": struct{}{}, + "SMOVE": struct{}{}, + "SORT": struct{}{}, + "SPOP": struct{}{}, + "SRANDMEMBER": struct{}{}, + "SREM": struct{}{}, + "SSCAN": struct{}{}, + "STRLEN": struct{}{}, + "SUBSCRIBE": struct{}{}, + "SUNION": struct{}{}, + "SUNIONSTORE": struct{}{}, + "SYNC": struct{}{}, + "TIME": struct{}{}, + "TTL": struct{}{}, + "TYPE": struct{}{}, + "UNSUBSCRIBE": struct{}{}, + "UNWATCH": struct{}{}, + "WATCH": struct{}{}, + "ZADD": struct{}{}, + "ZCARD": struct{}{}, + "ZCOUNT": struct{}{}, + "ZINCRBY": struct{}{}, + "ZINTERSTORE": struct{}{}, + "ZRANGE": struct{}{}, + "ZRANGEBYSCORE": struct{}{}, + "ZRANK": struct{}{}, + "ZREM": struct{}{}, + "ZREMRANGEBYLEX": struct{}{}, + "ZREMRANGEBYRANK": struct{}{}, + "ZREMRANGEBYSCORE": struct{}{}, + "ZREVRANGE": struct{}{}, + "ZREVRANGEBYSCORE": struct{}{}, + "ZREVRANK": struct{}{}, + "ZSCAN": struct{}{}, + "ZSCORE": struct{}{}, + "ZUNIONSTORE": struct{}{}, +} + +func isRedisCommand(key string) bool { + _, exists := redisCommands[strings.ToUpper(key)] + return exists +} + +func redisMessageParser(s *stream) (bool, bool) { + var err error + var value string + m := s.message + + iserror := false + + for s.parseOffset < len(s.data) { + + if s.data[s.parseOffset] == '*' { + //Arrays + + m.parseState = BULK_ARRAY + m.start = s.parseOffset + debug("start %d", m.start) + + found, line, off := readLine(s.data, s.parseOffset) + if !found { + debug("End of line not found, waiting for more data") + return true, false + } + debug("line %s: %d", line, off) + + if len(line) == 3 && line[1] == '-' && line[2] == '1' { + //Null array + s.parseOffset = off + value = "nil" + } else if len(line) == 2 && line[1] == '0' { + // Empty array + s.parseOffset = off + value = "[]" + } else { + + m.NumberOfBulks, err = strconv.ParseInt(line[1:], 10, 64) + if err != nil { + logp.Err("Failed to read number of bulk messages: %s", err) + return false, false + } + + s.parseOffset = off + m.Bulks = make([]string, 0, m.NumberOfBulks) + continue + } + + } else if s.data[s.parseOffset] == '$' { + // Bulk Strings + if m.parseState == START { + m.parseState = SIMPLE_MESSAGE + m.start = s.parseOffset + } + starting_offset := s.parseOffset + + found, line, off := readLine(s.data, s.parseOffset) + if !found { + debug("End of line not found, waiting for more data") + s.parseOffset = starting_offset + return true, false + } + debug("line %s: %d", line, off) + + if len(line) == 3 && line[1] == '-' && line[2] == '1' { + // NULL Bulk Reply + value = "nil" + s.parseOffset = off + } else { + length, err := strconv.ParseInt(line[1:], 10, 64) + if err != nil { + logp.Err("Failed to read bulk message: %s", err) + return false, false + } + + s.parseOffset = off + + // check all content in buffer (length + CRLF) + if int64(len(s.data[s.parseOffset:])) < length+2 { + debug("Message incomplete, waiting for more data") + s.parseOffset = starting_offset + return true, false + } + + // check content ends with CRLF + off = s.parseOffset + int(length) + if s.data[off] != '\r' || s.data[off+1] != '\n' { + logp.Err("Expected end of line not found") + return false, false + } + + // extract line + line = string(s.data[s.parseOffset:off]) + off += 2 + + debug("line %s: %d", line, s.parseOffset) + if int64(len(line)) != length { + logp.Err("Wrong length of data: %d instead of %d", len(line), length) + return false, false + } + + value = line + s.parseOffset = off + } + + } else if s.data[s.parseOffset] == ':' { + // Integers + if m.parseState == START { + // it's not in a bulk message + m.parseState = SIMPLE_MESSAGE + m.start = s.parseOffset + } + + found, line, off := readLine(s.data, s.parseOffset) + if !found { + return true, false + } + n, err := strconv.ParseInt(line[1:], 10, 64) + + if err != nil { + logp.Err("Failed to read integer reply: %s", err) + return false, false + } + value = strconv.Itoa(int(n)) + s.parseOffset = off + + } else if s.data[s.parseOffset] == '+' { + // Simple Strings + if m.parseState == START { + // it's not in a bulk message + m.parseState = SIMPLE_MESSAGE + m.start = s.parseOffset + } + found, line, off := readLine(s.data, s.parseOffset) + if !found { + return true, false + } + + value = line[1:] + s.parseOffset = off + } else if s.data[s.parseOffset] == '-' { + // Errors + if m.parseState == START { + // it's not in a bulk message + m.parseState = SIMPLE_MESSAGE + m.start = s.parseOffset + } + found, line, off := readLine(s.data, s.parseOffset) + if !found { + return true, false + } + iserror = true + + value = line[1:] + s.parseOffset = off + } else { + debug("Unexpected message starting with %s", s.data[s.parseOffset:]) + return false, false + } + + // add value + if m.NumberOfBulks > 0 { + m.NumberOfBulks = m.NumberOfBulks - 1 + m.Bulks = append(m.Bulks, value) + + if len(m.Bulks) == 1 { + debug("Value: %s", value) + // first word. + // check if it's a command + if isRedisCommand(value) { + debug("is request") + m.IsRequest = true + m.Method = value + } + } + + if len(m.Bulks) == 2 { + // second word. This is usually the path + if m.IsRequest { + m.Path = value + } + } + + if m.NumberOfBulks == 0 { + // the last bulk received + if m.IsRequest { + m.Message = strings.Join(m.Bulks, " ") + } else { + m.Message = "[" + strings.Join(m.Bulks, ", ") + "]" + } + m.end = s.parseOffset + m.Size = m.end - m.start + return true, true + } + } else { + m.Message = value + m.end = s.parseOffset + m.Size = m.end - m.start + if iserror { + m.IsError = true + } + return true, true + } + + } //end for + + return true, false +} + +func readLine(data []byte, offset int) (bool, string, int) { + q := bytes.Index(data[offset:], []byte("\r\n")) + if q == -1 { + return false, "", 0 + } + return true, string(data[offset : offset+q]), offset + q + 2 +} diff --git a/protos/redis/redis_test.go b/protos/redis/redis_test.go index d95a6ac9267f..56badb7fca1f 100644 --- a/protos/redis/redis_test.go +++ b/protos/redis/redis_test.go @@ -12,9 +12,9 @@ func TestRedisParser_ArrayRequest(t *testing.T) { "$5\r\n" + "Hello\r\n") - stream := &RedisStream{data: message, message: new(RedisMessage)} + st := &stream{data: message, message: new(redisMessage)} - ok, complete := redisMessageParser(stream) + ok, complete := redisMessageParser(st) if !ok { t.Errorf("Parsing returned error") @@ -22,14 +22,14 @@ func TestRedisParser_ArrayRequest(t *testing.T) { if !complete { t.Errorf("Expecting a complete message") } - if !stream.message.IsRequest { + if !st.message.IsRequest { t.Errorf("Failed to parse Redis request") } - if stream.message.Message != "SET key1 Hello" { - t.Errorf("Failed to parse Redis request: %s", stream.message.Message) + if st.message.Message != "SET key1 Hello" { + t.Errorf("Failed to parse Redis request: %s", st.message.Message) } - if stream.message.Size != 34 { - t.Errorf("Wrong message size %d", stream.message.Size) + if st.message.Size != 34 { + t.Errorf("Wrong message size %d", st.message.Size) } } @@ -43,9 +43,9 @@ func TestRedisParser_ArrayResponse(t *testing.T) { "bar\r\n" + ":23\r\n") - stream := &RedisStream{data: message, message: new(RedisMessage)} + st := &stream{data: message, message: new(redisMessage)} - ok, complete := redisMessageParser(stream) + ok, complete := redisMessageParser(st) if !ok { t.Errorf("Parsing returned error") @@ -53,14 +53,14 @@ func TestRedisParser_ArrayResponse(t *testing.T) { if !complete { t.Errorf("Expecting a complete message") } - if stream.message.IsRequest { + if st.message.IsRequest { t.Errorf("Failed to parse Redis response") } - if stream.message.Message != "[foo, nil, bar, 23]" { - t.Errorf("Failed to parse Redis request: %s", stream.message.Message) + if st.message.Message != "[foo, nil, bar, 23]" { + t.Errorf("Failed to parse Redis request: %s", st.message.Message) } - if stream.message.Size != 32 { - t.Errorf("Wrong message size %d", stream.message.Size) + if st.message.Size != 32 { + t.Errorf("Wrong message size %d", st.message.Size) } } @@ -68,9 +68,9 @@ func TestRedisParser_SimpleString(t *testing.T) { message := []byte("+OK\r\n") - stream := &RedisStream{data: message, message: new(RedisMessage)} + st := &stream{data: message, message: new(redisMessage)} - ok, complete := redisMessageParser(stream) + ok, complete := redisMessageParser(st) if !ok { t.Errorf("Parsing returned error") @@ -78,14 +78,14 @@ func TestRedisParser_SimpleString(t *testing.T) { if !complete { t.Errorf("Expecting a complete message") } - if stream.message.IsRequest { + if st.message.IsRequest { t.Errorf("Failed to parse Redis response") } - if stream.message.Message != "OK" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) + if st.message.Message != "OK" { + t.Errorf("Failed to parse Redis response: %s", st.message.Message) } - if stream.message.Size != 5 { - t.Errorf("Wrong message size %d", stream.message.Size) + if st.message.Size != 5 { + t.Errorf("Wrong message size %d", st.message.Size) } } @@ -93,9 +93,9 @@ func TestRedisParser_NilString(t *testing.T) { message := []byte("$-1\r\n") - stream := &RedisStream{data: message, message: new(RedisMessage)} + st := &stream{data: message, message: new(redisMessage)} - ok, complete := redisMessageParser(stream) + ok, complete := redisMessageParser(st) if !ok { t.Errorf("Parsing returned error") @@ -103,14 +103,14 @@ func TestRedisParser_NilString(t *testing.T) { if !complete { t.Errorf("Expecting a complete message") } - if stream.message.IsRequest { + if st.message.IsRequest { t.Errorf("Failed to parse Redis response") } - if stream.message.Message != "nil" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) + if st.message.Message != "nil" { + t.Errorf("Failed to parse Redis response: %s", st.message.Message) } - if stream.message.Size != 5 { - t.Errorf("Wrong message size %d", stream.message.Size) + if st.message.Size != 5 { + t.Errorf("Wrong message size %d", st.message.Size) } } @@ -118,9 +118,9 @@ func TestRedisParser_EmptyString(t *testing.T) { message := []byte("$0\r\n\r\n") - stream := &RedisStream{data: message, message: new(RedisMessage)} + st := &stream{data: message, message: new(redisMessage)} - ok, complete := redisMessageParser(stream) + ok, complete := redisMessageParser(st) if !ok { t.Errorf("Parsing returned error") @@ -128,14 +128,14 @@ func TestRedisParser_EmptyString(t *testing.T) { if !complete { t.Errorf("Expecting a complete message") } - if stream.message.IsRequest { + if st.message.IsRequest { t.Errorf("Failed to parse Redis response") } - if stream.message.Message != "" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) + if st.message.Message != "" { + t.Errorf("Failed to parse Redis response: %s", st.message.Message) } - if stream.message.Size != 6 { - t.Errorf("Wrong message size %d", stream.message.Size) + if st.message.Size != 6 { + t.Errorf("Wrong message size %d", st.message.Size) } } @@ -143,9 +143,9 @@ func TestRedisParser_EmptyArray(t *testing.T) { message := []byte("*0\r\n") - stream := &RedisStream{data: message, message: new(RedisMessage)} + st := &stream{data: message, message: new(redisMessage)} - ok, complete := redisMessageParser(stream) + ok, complete := redisMessageParser(st) if !ok { t.Errorf("Parsing returned error") @@ -153,13 +153,13 @@ func TestRedisParser_EmptyArray(t *testing.T) { if !complete { t.Errorf("Expecting a complete message") } - if stream.message.IsRequest { + if st.message.IsRequest { t.Errorf("Failed to parse Redis response") } - if stream.message.Message != "[]" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) + if st.message.Message != "[]" { + t.Errorf("Failed to parse Redis response: %s", st.message.Message) } - if stream.message.Size != 4 { - t.Errorf("Wrong message size %d", stream.message.Size) + if st.message.Size != 4 { + t.Errorf("Wrong message size %d", st.message.Size) } } From 313651867329d363a942e7071c9c9e5efbdee52e Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 25 Nov 2015 23:20:15 +0100 Subject: [PATCH 4/8] Redis: support for pipelining Add support for pipelining by storing all requests and responses in lists to be correlated by order. Responses without requests are dropped (original behavior). If TCP connection times out, the response will be lost too. --- protos/redis/redis.go | 173 ++++++++++++++++++++---------------- protos/redis/redis_parse.go | 2 + 2 files changed, 100 insertions(+), 75 deletions(-) diff --git a/protos/redis/redis.go b/protos/redis/redis.go index ebe92f29cec3..3ac6e3e72cf5 100644 --- a/protos/redis/redis.go +++ b/protos/redis/redis.go @@ -49,7 +49,13 @@ type transaction struct { } type redisConnectionData struct { - Streams [2]*stream + Streams [2]*stream + requests messageList + responses messageList +} + +type messageList struct { + head, tail *redisMessage } // Redis protocol plugin @@ -59,7 +65,6 @@ type Redis struct { SendRequest bool SendResponse bool - transactions *common.Cache transactionTimeout time.Duration results publisher.Client @@ -67,14 +72,6 @@ type Redis struct { var debug = logp.MakeDebug("redis") -func (redis *Redis) getTransaction(k common.HashableTcpTuple) *transaction { - v := redis.transactions.Get(k) - if v != nil { - return v.(*transaction) - } - return nil -} - func (redis *Redis) InitDefaults() { redis.SendRequest = false redis.SendResponse = false @@ -106,10 +103,6 @@ func (redis *Redis) Init(test_mode bool, results publisher.Client) error { redis.setFromConfig(config.ConfigSingleton.Protocols.Redis) } - redis.transactions = common.NewCache( - redis.transactionTimeout, - protos.DefaultTransactionHashSize) - redis.transactions.StartJanitor(redis.transactionTimeout) redis.results = results return nil @@ -134,7 +127,6 @@ func (redis *Redis) Parse( defer logp.Recover("ParseRedis exception") conn := ensureRedisConnection(private) - debug("redis connection: %p", conn) conn = redis.doParse(conn, pkt, tcptuple, dir) if conn == nil { return nil @@ -204,13 +196,13 @@ func (redis *Redis) doParse( } if st.message.IsRequest { - debug("REDIS request message: %s", st.message.Message) + debug("REDIS (%p) request message: %s", conn, st.message.Message) } else { - debug("REDIS response message: %s", st.message.Message) + debug("REDIS (%p) response message: %s", conn, st.message.Message) } // all ok, go to next level and reset stream for new message - redis.handleRedis(st.message, tcptuple, dir) + redis.handleRedis(conn, st.message, tcptuple, dir) st.PrepareForNewMessage() } @@ -221,88 +213,88 @@ func newMessage(ts time.Time) *redisMessage { return &redisMessage{Ts: ts, Bulks: []string{}} } -func (redis *Redis) handleRedis(m *redisMessage, tcptuple *common.TcpTuple, - dir uint8) { - +func (redis *Redis) handleRedis( + conn *redisConnectionData, + m *redisMessage, + tcptuple *common.TcpTuple, + dir uint8, +) { m.TcpTuple = *tcptuple m.Direction = dir m.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IpPort()) if m.IsRequest { - redis.receivedRedisRequest(m) + conn.requests.append(m) // wait for response } else { - redis.receivedRedisResponse(m) + conn.responses.append(m) + redis.correlate(conn) } } -func (redis *Redis) receivedRedisRequest(msg *redisMessage) { - tuple := msg.TcpTuple - trans := redis.getTransaction(tuple.Hashable()) - if trans != nil { - if trans.Redis != nil { - logp.Warn("Two requests without a Response. Dropping old request") +func (redis *Redis) correlate(conn *redisConnectionData) { + // drop responses with missing requests + if conn.requests.empty() { + for !conn.responses.empty() { + logp.Warn("Response from unknown transaction. Ignoring") + conn.responses.pop() } - } else { - trans = &transaction{Type: "redis", tuple: tuple} - redis.transactions.Put(tuple.Hashable(), trans) + return + } + + // merge requests with responses into transactions + for !conn.responses.empty() && !conn.requests.empty() { + requ := conn.requests.pop() + resp := conn.responses.pop() + trans := newTransaction(requ, resp) + + debug("REDIS (%p) transaction completed: %s", conn, trans.Redis) + redis.publishTransaction(trans) } +} + +func newTransaction(requ, resp *redisMessage) *transaction { + trans := &transaction{Type: "redis", tuple: requ.TcpTuple} + // init from request trans.Redis = common.MapStr{} - trans.Method = msg.Method - trans.Path = msg.Path - trans.Query = msg.Message - trans.RequestRaw = msg.Message - trans.BytesIn = msg.Size - - trans.cmdline = msg.CmdlineTuple - trans.ts = msg.Ts + trans.Method = requ.Method + trans.Path = requ.Path + trans.Query = requ.Message + trans.RequestRaw = requ.Message + trans.BytesIn = requ.Size + + trans.cmdline = requ.CmdlineTuple + trans.ts = requ.Ts trans.Ts = int64(trans.ts.UnixNano() / 1000) // transactions have microseconds resolution - trans.JsTs = msg.Ts + trans.JsTs = requ.Ts trans.Src = common.Endpoint{ - Ip: msg.TcpTuple.Src_ip.String(), - Port: msg.TcpTuple.Src_port, - Proc: string(msg.CmdlineTuple.Src), + Ip: requ.TcpTuple.Src_ip.String(), + Port: requ.TcpTuple.Src_port, + Proc: string(requ.CmdlineTuple.Src), } trans.Dst = common.Endpoint{ - Ip: msg.TcpTuple.Dst_ip.String(), - Port: msg.TcpTuple.Dst_port, - Proc: string(msg.CmdlineTuple.Dst), + Ip: requ.TcpTuple.Dst_ip.String(), + Port: requ.TcpTuple.Dst_port, + Proc: string(requ.CmdlineTuple.Dst), } - if msg.Direction == tcp.TcpDirectionReverse { + if requ.Direction == tcp.TcpDirectionReverse { trans.Src, trans.Dst = trans.Dst, trans.Src } -} - -func (redis *Redis) receivedRedisResponse(msg *redisMessage) { - tuple := msg.TcpTuple - trans := redis.getTransaction(tuple.Hashable()) - if trans == nil { - logp.Warn("Response from unknown transaction. Ignoring.") - return - } - // check if the request was received - if trans.Redis == nil { - logp.Warn("Response from unknown transaction. Ignoring.") - return - } - - trans.IsError = msg.IsError - if msg.IsError { - trans.Redis["error"] = msg.Message + // init from response + trans.IsError = resp.IsError + if resp.IsError { + trans.Redis["error"] = resp.Message } else { - trans.Redis["return_value"] = msg.Message + trans.Redis["return_value"] = resp.Message } - trans.BytesOut = msg.Size - trans.ResponseRaw = msg.Message - - trans.ResponseTime = int32(msg.Ts.Sub(trans.ts).Nanoseconds() / 1e6) // resp_time in milliseconds + trans.BytesOut = resp.Size + trans.ResponseRaw = resp.Message - redis.publishTransaction(trans) - redis.transactions.Delete(trans.tuple.Hashable()) + trans.ResponseTime = int32(resp.Ts.Sub(trans.ts).Nanoseconds() / 1e6) // resp_time in milliseconds - debug("Redis transaction completed: %s", trans.Redis) + return trans } func (redis *Redis) GapInStream(tcptuple *common.TcpTuple, dir uint8, @@ -323,7 +315,6 @@ func (redis *Redis) ReceivedFin(tcptuple *common.TcpTuple, dir uint8, } func (redis *Redis) publishTransaction(t *transaction) { - if redis.results == nil { return } @@ -355,3 +346,35 @@ func (redis *Redis) publishTransaction(t *transaction) { redis.results.PublishEvent(event) } + +func (ml *messageList) append(msg *redisMessage) { + if ml.tail == nil { + ml.head = msg + } else { + ml.tail.next = msg + } + msg.next = nil + ml.tail = msg +} + +func (ml *messageList) empty() bool { + return ml.head == nil +} + +func (ml *messageList) pop() *redisMessage { + if ml.head == nil { + return nil + } + + msg := ml.head + ml.head = ml.head.next + if ml.head == nil { + ml.tail = nil + } + debug("new head=%p", ml.head) + return msg +} + +func (ml *messageList) last() *redisMessage { + return ml.tail +} diff --git a/protos/redis/redis_parse.go b/protos/redis/redis_parse.go index 3ac13a703262..fc7c46dd21c6 100644 --- a/protos/redis/redis_parse.go +++ b/protos/redis/redis_parse.go @@ -29,6 +29,8 @@ type redisMessage struct { parseState int start int end int + + next *redisMessage } const ( From a74d95eabb6b15bfa4e15ab1593fb1187bccb394 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 26 Nov 2015 02:02:44 +0100 Subject: [PATCH 5/8] Rewrite redis parser Added unit tests for cases redis parser has had problems with + rewrite parser to fix those issues. - length prefixed strings containing '\r\n' have not been fully read => message dropped => connection/correlation in invalid state - nested arrays parsing 'unreliable' (code did assume no nesting) - restart/continue parsing is message is split in multiple TCP segments. Internal parser state was invalid such that TCP stream was sometimes not fully processed at all and transaction have been dropped or message required to be dropped ending in potentially wrong correlations. --- protos/redis/redis.go | 64 ++++--- protos/redis/redis_parse.go | 370 ++++++++++++++++-------------------- protos/redis/redis_test.go | 240 ++++++++++++----------- 3 files changed, 314 insertions(+), 360 deletions(-) diff --git a/protos/redis/redis.go b/protos/redis/redis.go index 3ac6e3e72cf5..9aaf115ac210 100644 --- a/protos/redis/redis.go +++ b/protos/redis/redis.go @@ -11,18 +11,14 @@ import ( "github.com/elastic/packetbeat/config" "github.com/elastic/packetbeat/procs" "github.com/elastic/packetbeat/protos" + "github.com/elastic/packetbeat/protos/applayer" "github.com/elastic/packetbeat/protos/tcp" ) type stream struct { + applayer.Stream + parser parser tcptuple *common.TcpTuple - - data []byte - - parseOffset int - bytesReceived int - - message *redisMessage } type transaction struct { @@ -109,9 +105,9 @@ func (redis *Redis) Init(test_mode bool, results publisher.Client) error { } func (s *stream) PrepareForNewMessage() { - s.data = s.data[s.parseOffset:] - s.parseOffset = 0 - s.message = nil + parser := &s.parser + s.Stream.Reset() + parser.reset() } func (redis *Redis) ConnectionTimeout() time.Duration { @@ -161,27 +157,23 @@ func (redis *Redis) doParse( st := conn.Streams[dir] if st == nil { - st = &stream{ - tcptuple: tcptuple, - data: pkt.Payload, - message: newMessage(pkt.Ts), - } + st = newStream(pkt.Ts, tcptuple) conn.Streams[dir] = st - } else { - st.data = append(st.data, pkt.Payload...) - if len(st.data) > tcp.TCP_MAX_DATA_IN_STREAM { - logp.Debug("redis", "Stream data too large, dropping TCP stream") - conn.Streams[dir] = nil - } - return conn + debug("new stream: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload)) } - for len(st.data) > 0 { - if st.message == nil { - st.message = newMessage(pkt.Ts) + if err := st.Append(pkt.Payload); err != nil { + debug("%v, dropping TCP stream: ", err) + return nil + } + debug("stream add data: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload)) + + for st.Buf.Len() > 0 { + if st.parser.message == nil { + st.parser.message = newMessage(pkt.Ts) } - ok, complete := redisMessageParser(st) + ok, complete := st.parser.parse(&st.Buf) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it @@ -195,22 +187,32 @@ func (redis *Redis) doParse( break } - if st.message.IsRequest { - debug("REDIS (%p) request message: %s", conn, st.message.Message) + msg := st.parser.message + if msg.IsRequest { + debug("REDIS (%p) request message: %s", conn, msg.Message) } else { - debug("REDIS (%p) response message: %s", conn, st.message.Message) + debug("REDIS (%p) response message: %s", conn, msg.Message) } // all ok, go to next level and reset stream for new message - redis.handleRedis(conn, st.message, tcptuple, dir) + redis.handleRedis(conn, msg, tcptuple, dir) st.PrepareForNewMessage() } return conn } +func newStream(ts time.Time, tcptuple *common.TcpTuple) *stream { + s := &stream{ + tcptuple: tcptuple, + } + s.parser.message = newMessage(ts) + s.Stream.Init(tcp.TCP_MAX_DATA_IN_STREAM) + return s +} + func newMessage(ts time.Time) *redisMessage { - return &redisMessage{Ts: ts, Bulks: []string{}} + return &redisMessage{Ts: ts} } func (redis *Redis) handleRedis( diff --git a/protos/redis/redis_parse.go b/protos/redis/redis_parse.go index fc7c46dd21c6..b7c0bed185a0 100644 --- a/protos/redis/redis_parse.go +++ b/protos/redis/redis_parse.go @@ -1,19 +1,25 @@ package redis import ( - "bytes" "strconv" "strings" "time" "github.com/elastic/libbeat/common" + "github.com/elastic/libbeat/common/streambuf" "github.com/elastic/libbeat/logp" ) +type parser struct { + parseOffset int + // bytesReceived int + message *redisMessage +} + type redisMessage struct { - Ts time.Time - NumberOfBulks int64 - Bulks []string + Ts time.Time + // NumberOfBulks int64 + // Bulks []string TcpTuple common.TcpTuple CmdlineTuple *common.CmdlineTuple @@ -26,9 +32,9 @@ type redisMessage struct { Path string Size int - parseState int - start int - end int + // parseState int + // start int + // end int next *redisMessage } @@ -206,215 +212,163 @@ func isRedisCommand(key string) bool { return exists } -func redisMessageParser(s *stream) (bool, bool) { - var err error +func (p *parser) reset() { + p.parseOffset = 0 + p.message = nil +} + +func (parser *parser) parse(buf *streambuf.Buffer) (bool, bool) { + snapshot := buf.Snapshot() + + content, iserror, ok, complete := parser.dispatch(0, buf) + if !ok || !complete { + // on error or incomplete message drop all parsing progress, due to + // parse not being statefull among multiple calls + // => parser needs to restart parsing all content + buf.Restore(snapshot) + return ok, complete + } + + parser.message.IsError = iserror + parser.message.Size = buf.BufferConsumed() + parser.message.Message = content + return true, true +} + +func (p *parser) dispatch(depth int, buf *streambuf.Buffer) (string, bool, bool, bool) { + if buf.Len() == 0 { + return "", false, true, false + } + var value string - m := s.message + var iserror, ok, complete bool + snapshot := buf.Snapshot() + + switch buf.Bytes()[0] { + case '*': + value, iserror, ok, complete = p.parseArray(depth, buf) + case '$': + value, ok, complete = p.parseString(buf) + case ':': + value, ok, complete = p.parseInt(buf) + case '+': + value, ok, complete = p.parseSimpleString(buf) + case '-': + iserror = true + value, ok, complete = p.parseSimpleString(buf) + default: + debug("Unexpected message starting with %s", buf.Bytes()[0]) + return "", false, false, false + } - iserror := false + if !ok || !complete { + buf.Restore(snapshot) + } + return value, iserror, ok, complete +} + +func (p *parser) parseInt(buf *streambuf.Buffer) (string, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + return "", true, false + } + + number := string(line[1:]) + if _, err := strconv.ParseInt(number, 10, 64); err != nil { + logp.Err("Failed to read integer reply: %s", err) + } + + return number, true, true +} + +func (p *parser) parseSimpleString(buf *streambuf.Buffer) (string, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + return "", true, false + } + + return string(line[1:]), true, true +} - for s.parseOffset < len(s.data) { - - if s.data[s.parseOffset] == '*' { - //Arrays - - m.parseState = BULK_ARRAY - m.start = s.parseOffset - debug("start %d", m.start) - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - debug("End of line not found, waiting for more data") - return true, false - } - debug("line %s: %d", line, off) - - if len(line) == 3 && line[1] == '-' && line[2] == '1' { - //Null array - s.parseOffset = off - value = "nil" - } else if len(line) == 2 && line[1] == '0' { - // Empty array - s.parseOffset = off - value = "[]" - } else { - - m.NumberOfBulks, err = strconv.ParseInt(line[1:], 10, 64) - if err != nil { - logp.Err("Failed to read number of bulk messages: %s", err) - return false, false - } - - s.parseOffset = off - m.Bulks = make([]string, 0, m.NumberOfBulks) - continue - } - - } else if s.data[s.parseOffset] == '$' { - // Bulk Strings - if m.parseState == START { - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - starting_offset := s.parseOffset - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - debug("End of line not found, waiting for more data") - s.parseOffset = starting_offset - return true, false - } - debug("line %s: %d", line, off) - - if len(line) == 3 && line[1] == '-' && line[2] == '1' { - // NULL Bulk Reply - value = "nil" - s.parseOffset = off - } else { - length, err := strconv.ParseInt(line[1:], 10, 64) - if err != nil { - logp.Err("Failed to read bulk message: %s", err) - return false, false - } - - s.parseOffset = off - - // check all content in buffer (length + CRLF) - if int64(len(s.data[s.parseOffset:])) < length+2 { - debug("Message incomplete, waiting for more data") - s.parseOffset = starting_offset - return true, false - } - - // check content ends with CRLF - off = s.parseOffset + int(length) - if s.data[off] != '\r' || s.data[off+1] != '\n' { - logp.Err("Expected end of line not found") - return false, false - } - - // extract line - line = string(s.data[s.parseOffset:off]) - off += 2 - - debug("line %s: %d", line, s.parseOffset) - if int64(len(line)) != length { - logp.Err("Wrong length of data: %d instead of %d", len(line), length) - return false, false - } - - value = line - s.parseOffset = off - } - - } else if s.data[s.parseOffset] == ':' { - // Integers - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - n, err := strconv.ParseInt(line[1:], 10, 64) - - if err != nil { - logp.Err("Failed to read integer reply: %s", err) - return false, false - } - value = strconv.Itoa(int(n)) - s.parseOffset = off - - } else if s.data[s.parseOffset] == '+' { - // Simple Strings - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - - value = line[1:] - s.parseOffset = off - } else if s.data[s.parseOffset] == '-' { - // Errors - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - iserror = true - - value = line[1:] - s.parseOffset = off - } else { - debug("Unexpected message starting with %s", s.data[s.parseOffset:]) - return false, false +func (p *parser) parseString(buf *streambuf.Buffer) (string, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + return "", true, false + } + + if len(line) == 3 && line[1] == '-' && line[2] == '1' { + return "nil", true, true + } + + length, err := strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + logp.Err("Failed to read bulk message: %s", err) + return "", false, false + } + + content, err := buf.CollectWithSuffix(int(length), []byte("\r\n")) + if err != nil { + if err != streambuf.ErrNoMoreBytes { + return "", false, false } + return "", true, false + } + + return string(content), true, true +} + +func (p *parser) parseArray(depth int, buf *streambuf.Buffer) (string, bool, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + debug("End of line not found, waiting for more data") + return "", false, false, false + } + debug("line %s: %d", line, buf.BufferConsumed()) + + if len(line) == 3 && line[1] == '-' && line[2] == '1' { + return "nil", false, true, true + } + + if len(line) == 2 && line[1] == '0' { + return "[]", false, true, true + } + + count, err := strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + logp.Err("Failed to read number of bulk messages: %s", err) + return "", false, false, false + } + if count < 0 { + return "nil", false, true, true + } - // add value - if m.NumberOfBulks > 0 { - m.NumberOfBulks = m.NumberOfBulks - 1 - m.Bulks = append(m.Bulks, value) - - if len(m.Bulks) == 1 { - debug("Value: %s", value) - // first word. - // check if it's a command - if isRedisCommand(value) { - debug("is request") - m.IsRequest = true - m.Method = value - } - } - - if len(m.Bulks) == 2 { - // second word. This is usually the path - if m.IsRequest { - m.Path = value - } - } - - if m.NumberOfBulks == 0 { - // the last bulk received - if m.IsRequest { - m.Message = strings.Join(m.Bulks, " ") - } else { - m.Message = "[" + strings.Join(m.Bulks, ", ") + "]" - } - m.end = s.parseOffset - m.Size = m.end - m.start - return true, true - } - } else { - m.Message = value - m.end = s.parseOffset - m.Size = m.end - m.start - if iserror { - m.IsError = true - } - return true, true + content := make([]string, 0, count) + // read sub elements + + iserror := false + for i := 0; i < int(count); i++ { + var value string + var ok, complete bool + + value, iserror, ok, complete := p.dispatch(depth+1, buf) + if !ok || !complete { + debug("Array incomplete") + return "", iserror, ok, complete } - } //end for + content = append(content, value) + } - return true, false -} + if depth == 0 && isRedisCommand(content[0]) { // we've got a request + p.message.IsRequest = true + p.message.Method = content[0] + } -func readLine(data []byte, offset int) (bool, string, int) { - q := bytes.Index(data[offset:], []byte("\r\n")) - if q == -1 { - return false, "", 0 + var value string + if depth == 0 && p.message.IsRequest { + value = strings.Join(content, " ") + } else { + value = "[" + strings.Join(content, ", ") + "]" } - return true, string(data[offset : offset+q]), offset + q + 2 + return value, iserror, true, true } diff --git a/protos/redis/redis_test.go b/protos/redis/redis_test.go index 56badb7fca1f..587016426e12 100644 --- a/protos/redis/redis_test.go +++ b/protos/redis/redis_test.go @@ -1,9 +1,25 @@ package redis -import "testing" +import ( + "testing" + "time" -func TestRedisParser_ArrayRequest(t *testing.T) { + "github.com/stretchr/testify/assert" +) + +func newTestStream(content []byte) *stream { + st := newStream(time.Now(), nil) + st.Append(content) + return st +} + +func parse(content []byte) (*redisMessage, bool, bool) { + st := newTestStream(content) + ok, complete := st.parser.parse(&st.Buf) + return st.parser.message, ok, complete +} +func TestRedisParser_ArrayRequest(t *testing.T) { message := []byte("*3\r\n" + "$3\r\n" + "SET\r\n" + @@ -11,30 +27,16 @@ func TestRedisParser_ArrayRequest(t *testing.T) { "key1\r\n" + "$5\r\n" + "Hello\r\n") + msg, ok, complete := parse(message) - st := &stream{data: message, message: new(redisMessage)} - - ok, complete := redisMessageParser(st) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if !st.message.IsRequest { - t.Errorf("Failed to parse Redis request") - } - if st.message.Message != "SET key1 Hello" { - t.Errorf("Failed to parse Redis request: %s", st.message.Message) - } - if st.message.Size != 34 { - t.Errorf("Wrong message size %d", st.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.True(t, msg.IsRequest) + assert.Equal(t, "SET key1 Hello", msg.Message) + assert.Equal(t, len(message), msg.Size) } func TestRedisParser_ArrayResponse(t *testing.T) { - message := []byte("*4\r\n" + "$3\r\n" + "foo\r\n" + @@ -42,124 +44,120 @@ func TestRedisParser_ArrayResponse(t *testing.T) { "$3\r\n" + "bar\r\n" + ":23\r\n") + msg, ok, complete := parse(message) - st := &stream{data: message, message: new(redisMessage)} - - ok, complete := redisMessageParser(st) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if st.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if st.message.Message != "[foo, nil, bar, 23]" { - t.Errorf("Failed to parse Redis request: %s", st.message.Message) - } - if st.message.Size != 32 { - t.Errorf("Wrong message size %d", st.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "[foo, nil, bar, 23]", msg.Message) + assert.Equal(t, len(message), msg.Size) } -func TestRedisParser_SimpleString(t *testing.T) { +func TestRedisParser_ArrayNested(t *testing.T) { + message := []byte("*3\r\n" + + "*-1\r\n" + + "+foo\r\n" + + "*2\r\n" + + ":1\r\n" + + "+bar\r\n") + msg, ok, complete := parse(message) + + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "[nil, foo, [1, bar]]", msg.Message) + assert.Equal(t, len(message), msg.Size) +} +func TestRedisParser_SimpleString(t *testing.T) { message := []byte("+OK\r\n") + msg, ok, complete := parse(message) - st := &stream{data: message, message: new(redisMessage)} - - ok, complete := redisMessageParser(st) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if st.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if st.message.Message != "OK" { - t.Errorf("Failed to parse Redis response: %s", st.message.Message) - } - if st.message.Size != 5 { - t.Errorf("Wrong message size %d", st.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "OK", msg.Message) + assert.Equal(t, len(message), msg.Size) } func TestRedisParser_NilString(t *testing.T) { - message := []byte("$-1\r\n") + msg, ok, complete := parse(message) - st := &stream{data: message, message: new(redisMessage)} - - ok, complete := redisMessageParser(st) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if st.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if st.message.Message != "nil" { - t.Errorf("Failed to parse Redis response: %s", st.message.Message) - } - if st.message.Size != 5 { - t.Errorf("Wrong message size %d", st.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "nil", msg.Message) + assert.Equal(t, len(message), msg.Size) } func TestRedisParser_EmptyString(t *testing.T) { - message := []byte("$0\r\n\r\n") + msg, ok, complete := parse(message) - st := &stream{data: message, message: new(redisMessage)} - - ok, complete := redisMessageParser(st) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if st.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if st.message.Message != "" { - t.Errorf("Failed to parse Redis response: %s", st.message.Message) - } - if st.message.Size != 6 { - t.Errorf("Wrong message size %d", st.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "", msg.Message) + assert.Equal(t, len(message), msg.Size) } -func TestRedisParser_EmptyArray(t *testing.T) { +func TestRedisParser_LenString(t *testing.T) { + message := []byte("$5\r\n" + + "12345\r\n") + msg, ok, complete := parse(message) + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "12345", msg.Message) + assert.Equal(t, len(message), msg.Size) +} + +func TestRedisParser_LenStringWithCRLF(t *testing.T) { + message := []byte("$7\r\n" + + "123\r\n45\r\n") + msg, ok, complete := parse(message) + + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "123\r\n45", msg.Message) + assert.Equal(t, len(message), msg.Size) +} + +func TestRedisParser_EmptyArray(t *testing.T) { message := []byte("*0\r\n") + msg, ok, complete := parse(message) + + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "[]", msg.Message) + assert.Equal(t, len(message), msg.Size) +} + +func TestRedisParser_Array2Passes(t *testing.T) { + part1 := []byte("*3\r\n" + + "$3\r\n" + + "SET\r\n" + + "$4\r\n") + part2 := []byte("key1\r\n" + + "$5\r\n" + + "Hello\r\n") + + st := newTestStream(part1) + ok, complete := st.parser.parse(&st.Buf) + assert.True(t, ok) + assert.False(t, complete) + + st.Stream.Append(part2) + ok, complete = st.parser.parse(&st.Buf) + msg := st.parser.message - st := &stream{data: message, message: new(redisMessage)} - - ok, complete := redisMessageParser(st) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if st.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if st.message.Message != "[]" { - t.Errorf("Failed to parse Redis response: %s", st.message.Message) - } - if st.message.Size != 4 { - t.Errorf("Wrong message size %d", st.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.True(t, msg.IsRequest) + assert.Equal(t, "SET key1 Hello", msg.Message) + assert.Equal(t, len(part1)+len(part2), msg.Size) } From 96c62db8249fc530448b158ad3e18266e0af3566 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 26 Nov 2015 02:10:40 +0100 Subject: [PATCH 6/8] remove unused fields --- protos/redis/redis_parse.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/protos/redis/redis_parse.go b/protos/redis/redis_parse.go index b7c0bed185a0..cb143b973d67 100644 --- a/protos/redis/redis_parse.go +++ b/protos/redis/redis_parse.go @@ -18,8 +18,6 @@ type parser struct { type redisMessage struct { Ts time.Time - // NumberOfBulks int64 - // Bulks []string TcpTuple common.TcpTuple CmdlineTuple *common.CmdlineTuple @@ -32,10 +30,6 @@ type redisMessage struct { Path string Size int - // parseState int - // start int - // end int - next *redisMessage } From 7d89c7af0c303dc42c3c194fb8769264b1fa212c Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 26 Nov 2015 02:16:47 +0100 Subject: [PATCH 7/8] Add redis changes to changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 352798c7480a..20ce4781c9a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,13 @@ All notable changes to this project will be documented in this file based on the ### Backward Compatibility Breaks ### Bugfixes +- Fix panic on nil in redis protocol parser. #384 +- Fix errors redis parser when messages are split in multiple TCP segments. #402 +- Fix errors in redis parser when length prefixed strings contain sequences of CRLF. #402 +- Fix errors in redis parser when dealing with nested arrays. #402 ### Added +- Added piping support to redis protocol. #402 ### Deprecated From 23b7e8ac13c43be15eec49dcb1805558cb7f9f3f Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 26 Nov 2015 02:21:49 +0100 Subject: [PATCH 8/8] Set redis request path --- protos/redis/redis_parse.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protos/redis/redis_parse.go b/protos/redis/redis_parse.go index cb143b973d67..227ceed981ac 100644 --- a/protos/redis/redis_parse.go +++ b/protos/redis/redis_parse.go @@ -356,6 +356,7 @@ func (p *parser) parseArray(depth int, buf *streambuf.Buffer) (string, bool, boo if depth == 0 && isRedisCommand(content[0]) { // we've got a request p.message.IsRequest = true p.message.Method = content[0] + p.message.Path = content[1] } var value string