diff --git a/CHANGELOG.md b/CHANGELOG.md index 352798c7480a..20ce4781c9a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,13 @@ All notable changes to this project will be documented in this file based on the ### Backward Compatibility Breaks ### Bugfixes +- Fix panic on nil in redis protocol parser. #384 +- Fix errors redis parser when messages are split in multiple TCP segments. #402 +- Fix errors in redis parser when length prefixed strings contain sequences of CRLF. #402 +- Fix errors in redis parser when dealing with nested arrays. #402 ### Added +- Added piping support to redis protocol. #402 ### Deprecated diff --git a/protos/redis/redis.go b/protos/redis/redis.go index edbbeb386d2a..9aaf115ac210 100644 --- a/protos/redis/redis.go +++ b/protos/redis/redis.go @@ -1,8 +1,6 @@ package redis import ( - "bytes" - "strconv" "strings" "time" @@ -13,48 +11,17 @@ import ( "github.com/elastic/packetbeat/config" "github.com/elastic/packetbeat/procs" "github.com/elastic/packetbeat/protos" + "github.com/elastic/packetbeat/protos/applayer" "github.com/elastic/packetbeat/protos/tcp" ) -const ( - START = iota - BULK_ARRAY - SIMPLE_MESSAGE -) - -type RedisMessage struct { - Ts time.Time - NumberOfBulks int64 - Bulks []string - - TcpTuple common.TcpTuple - CmdlineTuple *common.CmdlineTuple - Direction uint8 - - IsRequest bool - IsError bool - Message string - Method string - Path string - Size int - - parseState int - start int - end int -} - -type RedisStream struct { +type stream struct { + applayer.Stream + parser parser tcptuple *common.TcpTuple - - data []byte - - parseOffset int - bytesReceived int - - message *RedisMessage } -type RedisTransaction struct { +type transaction struct { Type string tuple common.TcpTuple Src common.Endpoint @@ -73,207 +40,48 @@ type RedisTransaction struct { Redis common.MapStr - Request_raw string - Response_raw string + RequestRaw string + ResponseRaw string } -// Keep sorted for future command addition -var RedisCommands = map[string]struct{}{ - "APPEND": struct{}{}, - "AUTH": struct{}{}, - "BGREWRITEAOF": struct{}{}, - "BGSAVE": struct{}{}, - "BITCOUNT": struct{}{}, - "BITOP": struct{}{}, - "BITPOS": struct{}{}, - "BLPOP": struct{}{}, - "BRPOP": struct{}{}, - "BRPOPLPUSH": struct{}{}, - "CLIENT GETNAME": struct{}{}, - "CLIENT KILL": struct{}{}, - "CLIENT LIST": struct{}{}, - "CLIENT PAUSE": struct{}{}, - "CLIENT SETNAME": struct{}{}, - "CONFIG GET": struct{}{}, - "CONFIG RESETSTAT": struct{}{}, - "CONFIG REWRITE": struct{}{}, - "CONFIG SET": struct{}{}, - "DBSIZE": struct{}{}, - "DEBUG OBJECT": struct{}{}, - "DEBUG SEGFAULT": struct{}{}, - "DECR": struct{}{}, - "DECRBY": struct{}{}, - "DEL": struct{}{}, - "DISCARD": struct{}{}, - "DUMP": struct{}{}, - "ECHO": struct{}{}, - "EVAL": struct{}{}, - "EVALSHA": struct{}{}, - "EXEC": struct{}{}, - "EXISTS": struct{}{}, - "EXPIRE": struct{}{}, - "EXPIREAT": struct{}{}, - "FLUSHALL": struct{}{}, - "FLUSHDB": struct{}{}, - "GET": struct{}{}, - "GETBIT": struct{}{}, - "GETRANGE": struct{}{}, - "GETSET": struct{}{}, - "HDEL": struct{}{}, - "HEXISTS": struct{}{}, - "HGET": struct{}{}, - "HGETALL": struct{}{}, - "HINCRBY": struct{}{}, - "HINCRBYFLOAT": struct{}{}, - "HKEYS": struct{}{}, - "HLEN": struct{}{}, - "HMGET": struct{}{}, - "HMSET": struct{}{}, - "HSCAN": struct{}{}, - "HSET": struct{}{}, - "HSETINX": struct{}{}, - "HVALS": struct{}{}, - "INCR": struct{}{}, - "INCRBY": struct{}{}, - "INCRBYFLOAT": struct{}{}, - "INFO": struct{}{}, - "KEYS": struct{}{}, - "LASTSAVE": struct{}{}, - "LINDEX": struct{}{}, - "LINSERT": struct{}{}, - "LLEN": struct{}{}, - "LPOP": struct{}{}, - "LPUSH": struct{}{}, - "LPUSHX": struct{}{}, - "LRANGE": struct{}{}, - "LREM": struct{}{}, - "LSET": struct{}{}, - "LTRIM": struct{}{}, - "MGET": struct{}{}, - "MIGRATE": struct{}{}, - "MONITOR": struct{}{}, - "MOVE": struct{}{}, - "MSET": struct{}{}, - "MSETNX": struct{}{}, - "MULTI": struct{}{}, - "OBJECT": struct{}{}, - "PERSIST": struct{}{}, - "PEXPIRE": struct{}{}, - "PEXPIREAT": struct{}{}, - "PFADD": struct{}{}, - "PFCOUNT": struct{}{}, - "PFMERGE": struct{}{}, - "PING": struct{}{}, - "PSETEX": struct{}{}, - "PSUBSCRIBE": struct{}{}, - "PTTL": struct{}{}, - "PUBLISH": struct{}{}, - "PUBSUB": struct{}{}, - "PUNSUBSCRIBE": struct{}{}, - "QUIT": struct{}{}, - "RANDOMKEY": struct{}{}, - "RENAME": struct{}{}, - "RENAMENX": struct{}{}, - "RESTORE": struct{}{}, - "RPOP": struct{}{}, - "RPOPLPUSH": struct{}{}, - "RPUSH": struct{}{}, - "RPUSHX": struct{}{}, - "SADD": struct{}{}, - "SAVE": struct{}{}, - "SCAN": struct{}{}, - "SCARD": struct{}{}, - "SCRIPT EXISTS": struct{}{}, - "SCRIPT FLUSH": struct{}{}, - "SCRIPT KILL": struct{}{}, - "SCRIPT LOAD": struct{}{}, - "SDIFF": struct{}{}, - "SDIFFSTORE": struct{}{}, - "SELECT": struct{}{}, - "SET": struct{}{}, - "SETBIT": struct{}{}, - "SETEX": struct{}{}, - "SETNX": struct{}{}, - "SETRANGE": struct{}{}, - "SHUTDOWN": struct{}{}, - "SINTER": struct{}{}, - "SINTERSTORE": struct{}{}, - "SISMEMBER": struct{}{}, - "SLAVEOF": struct{}{}, - "SLOWLOG": struct{}{}, - "SMEMBERS": struct{}{}, - "SMOVE": struct{}{}, - "SORT": struct{}{}, - "SPOP": struct{}{}, - "SRANDMEMBER": struct{}{}, - "SREM": struct{}{}, - "SSCAN": struct{}{}, - "STRLEN": struct{}{}, - "SUBSCRIBE": struct{}{}, - "SUNION": struct{}{}, - "SUNIONSTORE": struct{}{}, - "SYNC": struct{}{}, - "TIME": struct{}{}, - "TTL": struct{}{}, - "TYPE": struct{}{}, - "UNSUBSCRIBE": struct{}{}, - "UNWATCH": struct{}{}, - "WATCH": struct{}{}, - "ZADD": struct{}{}, - "ZCARD": struct{}{}, - "ZCOUNT": struct{}{}, - "ZINCRBY": struct{}{}, - "ZINTERSTORE": struct{}{}, - "ZRANGE": struct{}{}, - "ZRANGEBYSCORE": struct{}{}, - "ZRANK": struct{}{}, - "ZREM": struct{}{}, - "ZREMRANGEBYLEX": struct{}{}, - "ZREMRANGEBYRANK": struct{}{}, - "ZREMRANGEBYSCORE": struct{}{}, - "ZREVRANGE": struct{}{}, - "ZREVRANGEBYSCORE": struct{}{}, - "ZREVRANK": struct{}{}, - "ZSCAN": struct{}{}, - "ZSCORE": struct{}{}, - "ZUNIONSTORE": struct{}{}, +type redisConnectionData struct { + Streams [2]*stream + requests messageList + responses messageList } +type messageList struct { + head, tail *redisMessage +} + +// Redis protocol plugin type Redis struct { // config - Ports []int - Send_request bool - Send_response bool + Ports []int + SendRequest bool + SendResponse bool - transactions *common.Cache transactionTimeout time.Duration results publisher.Client } -func (redis *Redis) getTransaction(k common.HashableTcpTuple) *RedisTransaction { - v := redis.transactions.Get(k) - if v != nil { - return v.(*RedisTransaction) - } - return nil -} +var debug = logp.MakeDebug("redis") func (redis *Redis) InitDefaults() { - redis.Send_request = false - redis.Send_response = false + redis.SendRequest = false + redis.SendResponse = false redis.transactionTimeout = protos.DefaultTransactionExpiration } func (redis *Redis) setFromConfig(config config.Redis) error { - redis.Ports = config.Ports if config.SendRequest != nil { - redis.Send_request = *config.SendRequest + redis.SendRequest = *config.SendRequest } if config.SendResponse != nil { - redis.Send_response = *config.SendResponse + redis.SendResponse = *config.SendResponse } if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 { redis.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second @@ -291,388 +99,204 @@ func (redis *Redis) Init(test_mode bool, results publisher.Client) error { redis.setFromConfig(config.ConfigSingleton.Protocols.Redis) } - redis.transactions = common.NewCache( - redis.transactionTimeout, - protos.DefaultTransactionHashSize) - redis.transactions.StartJanitor(redis.transactionTimeout) redis.results = results return nil } -func (stream *RedisStream) PrepareForNewMessage() { - stream.data = stream.data[stream.parseOffset:] - stream.parseOffset = 0 - stream.message = nil - stream.message.Bulks = []string{} +func (s *stream) PrepareForNewMessage() { + parser := &s.parser + s.Stream.Reset() + parser.reset() } -func redisMessageParser(s *RedisStream) (bool, bool) { - - var err error - var value string - m := s.message - - iserror := false - - for s.parseOffset < len(s.data) { - - if s.data[s.parseOffset] == '*' { - //Arrays - - m.parseState = BULK_ARRAY - m.start = s.parseOffset - logp.Debug("redis", "start %d", m.start) - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - logp.Debug("redis", "End of line not found, waiting for more data") - return true, false - } - logp.Debug("redis", "line %s: %d", line, off) - - if len(line) == 3 && line[1] == '-' && line[2] == '1' { - //Null array - s.parseOffset = off - value = "nil" - } else if len(line) == 2 && line[1] == '0' { - // Empty array - s.parseOffset = off - value = "[]" - } else { - - m.NumberOfBulks, err = strconv.ParseInt(line[1:], 10, 64) - - if err != nil { - logp.Err("Failed to read number of bulk messages: %s", err) - return false, false - } - s.parseOffset = off - m.Bulks = []string{} - - continue - } - - } else if s.data[s.parseOffset] == '$' { - // Bulk Strings - if m.parseState == START { - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - starting_offset := s.parseOffset - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - logp.Debug("redis", "End of line not found, waiting for more data") - s.parseOffset = starting_offset - return true, false - } - logp.Debug("redis", "line %s: %d", line, off) - - if len(line) == 3 && line[1] == '-' && line[2] == '1' { - // NULL Bulk Reply - value = "nil" - s.parseOffset = off - } else { - length, err := strconv.ParseInt(line[1:], 10, 64) - if err != nil { - logp.Err("Failed to read bulk message: %s", err) - return false, false - } - - s.parseOffset = off - - found, line, off = readLine(s.data, s.parseOffset) - if !found { - logp.Debug("redis", "End of line not found, waiting for more data") - s.parseOffset = starting_offset - return true, false - } - logp.Debug("redis", "line %s: %d", line, off) - - if int64(len(line)) != length { - logp.Err("Wrong length of data: %d instead of %d", len(line), length) - return false, false - } - value = line - s.parseOffset = off - } - - } else if s.data[s.parseOffset] == ':' { - // Integers - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - n, err := strconv.ParseInt(line[1:], 10, 64) - - if err != nil { - logp.Err("Failed to read integer reply: %s", err) - return false, false - } - value = strconv.Itoa(int(n)) - s.parseOffset = off - - } else if s.data[s.parseOffset] == '+' { - // Simple Strings - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - - value = line[1:] - s.parseOffset = off - } else if s.data[s.parseOffset] == '-' { - // Errors - if m.parseState == START { - // it's not in a bulk message - m.parseState = SIMPLE_MESSAGE - m.start = s.parseOffset - } - found, line, off := readLine(s.data, s.parseOffset) - if !found { - return true, false - } - iserror = true - - value = line[1:] - s.parseOffset = off - } else { - logp.Debug("redis", "Unexpected message starting with %s", s.data[s.parseOffset:]) - return false, false - } - - // add value - if m.NumberOfBulks > 0 { - m.NumberOfBulks = m.NumberOfBulks - 1 - m.Bulks = append(m.Bulks, value) - - if len(m.Bulks) == 1 { - logp.Debug("redis", "Value: %s", value) - // first word. - // check if it's a command - if isRedisCommand(value) { - logp.Debug("redis", "is request") - m.IsRequest = true - m.Method = value - } - } - - if len(m.Bulks) == 2 { - // second word. This is usually the path - if m.IsRequest { - m.Path = value - } - } - - if m.NumberOfBulks == 0 { - // the last bulk received - if m.IsRequest { - m.Message = strings.Join(m.Bulks, " ") - } else { - m.Message = "[" + strings.Join(m.Bulks, ", ") + "]" - } - m.end = s.parseOffset - m.Size = m.end - m.start - return true, true - } - } else { - m.Message = value - m.end = s.parseOffset - m.Size = m.end - m.start - if iserror { - m.IsError = true - } - return true, true - } +func (redis *Redis) ConnectionTimeout() time.Duration { + return redis.transactionTimeout +} - } //end for +func (redis *Redis) Parse( + pkt *protos.Packet, + tcptuple *common.TcpTuple, + dir uint8, + private protos.ProtocolData, +) protos.ProtocolData { + defer logp.Recover("ParseRedis exception") - return true, false + conn := ensureRedisConnection(private) + conn = redis.doParse(conn, pkt, tcptuple, dir) + if conn == nil { + return nil + } + return conn } -func readLine(data []byte, offset int) (bool, string, int) { - q := bytes.Index(data[offset:], []byte("\r\n")) - if q == -1 { - return false, "", 0 +func ensureRedisConnection(private protos.ProtocolData) *redisConnectionData { + if private == nil { + return &redisConnectionData{} } - return true, string(data[offset : offset+q]), offset + q + 2 -} -type redisPrivateData struct { - Data [2]*RedisStream -} + priv, ok := private.(*redisConnectionData) + if !ok { + logp.Warn("redis connection data type error, create new one") + return &redisConnectionData{} + } + if priv == nil { + logp.Warn("Unexpected: redis connection data not set, create new one") + return &redisConnectionData{} + } -func (redis *Redis) ConnectionTimeout() time.Duration { - return redis.transactionTimeout + return priv } -func (redis *Redis) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, - private protos.ProtocolData) protos.ProtocolData { - - defer logp.Recover("ParseRedis exception") - - priv := redisPrivateData{} - if private != nil { - var ok bool - priv, ok = private.(redisPrivateData) - if !ok { - priv = redisPrivateData{} - } +func (redis *Redis) doParse( + conn *redisConnectionData, + pkt *protos.Packet, + tcptuple *common.TcpTuple, + dir uint8, +) *redisConnectionData { + + st := conn.Streams[dir] + if st == nil { + st = newStream(pkt.Ts, tcptuple) + conn.Streams[dir] = st + debug("new stream: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload)) } - if priv.Data[dir] == nil { - priv.Data[dir] = &RedisStream{ - tcptuple: tcptuple, - data: pkt.Payload, - message: &RedisMessage{Ts: pkt.Ts}, - } - } else { - // concatenate bytes - priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...) - if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM { - logp.Debug("redis", "Stream data too large, dropping TCP stream") - priv.Data[dir] = nil - return priv - } + if err := st.Append(pkt.Payload); err != nil { + debug("%v, dropping TCP stream: ", err) + return nil } + debug("stream add data: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload)) - stream := priv.Data[dir] - for len(stream.data) > 0 { - if stream.message == nil { - stream.message = &RedisMessage{Ts: pkt.Ts} + for st.Buf.Len() > 0 { + if st.parser.message == nil { + st.parser.message = newMessage(pkt.Ts) } - ok, complete := redisMessageParser(priv.Data[dir]) - + ok, complete := st.parser.parse(&st.Buf) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it - priv.Data[dir] = nil - logp.Debug("redis", "Ignore Redis message. Drop tcp stream. Try parsing with the next segment") - return priv + conn.Streams[dir] = nil + debug("Ignore Redis message. Drop tcp stream. Try parsing with the next segment") + return conn } - if complete { - - if stream.message.IsRequest { - logp.Debug("redis", "REDIS request message: %s", stream.message.Message) - } else { - logp.Debug("redis", "REDIS response message: %s", stream.message.Message) - } - - // all ok, go to next level - redis.handleRedis(stream.message, tcptuple, dir) - - // and reset message - stream.PrepareForNewMessage() - } else { + if !complete { // wait for more data break } + + msg := st.parser.message + if msg.IsRequest { + debug("REDIS (%p) request message: %s", conn, msg.Message) + } else { + debug("REDIS (%p) response message: %s", conn, msg.Message) + } + + // all ok, go to next level and reset stream for new message + redis.handleRedis(conn, msg, tcptuple, dir) + st.PrepareForNewMessage() } - return priv + return conn } -func isRedisCommand(key string) bool { - _, exists := RedisCommands[strings.ToUpper(key)] - return exists +func newStream(ts time.Time, tcptuple *common.TcpTuple) *stream { + s := &stream{ + tcptuple: tcptuple, + } + s.parser.message = newMessage(ts) + s.Stream.Init(tcp.TCP_MAX_DATA_IN_STREAM) + return s } -func (redis *Redis) handleRedis(m *RedisMessage, tcptuple *common.TcpTuple, - dir uint8) { +func newMessage(ts time.Time) *redisMessage { + return &redisMessage{Ts: ts} +} +func (redis *Redis) handleRedis( + conn *redisConnectionData, + m *redisMessage, + tcptuple *common.TcpTuple, + dir uint8, +) { m.TcpTuple = *tcptuple m.Direction = dir m.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IpPort()) if m.IsRequest { - redis.receivedRedisRequest(m) + conn.requests.append(m) // wait for response } else { - redis.receivedRedisResponse(m) + conn.responses.append(m) + redis.correlate(conn) } } -func (redis *Redis) receivedRedisRequest(msg *RedisMessage) { - tuple := msg.TcpTuple - trans := redis.getTransaction(tuple.Hashable()) - if trans != nil { - if trans.Redis != nil { - logp.Warn("Two requests without a Response. Dropping old request") +func (redis *Redis) correlate(conn *redisConnectionData) { + // drop responses with missing requests + if conn.requests.empty() { + for !conn.responses.empty() { + logp.Warn("Response from unknown transaction. Ignoring") + conn.responses.pop() } - } else { - trans = &RedisTransaction{Type: "redis", tuple: tuple} - redis.transactions.Put(tuple.Hashable(), trans) + return + } + + // merge requests with responses into transactions + for !conn.responses.empty() && !conn.requests.empty() { + requ := conn.requests.pop() + resp := conn.responses.pop() + trans := newTransaction(requ, resp) + + debug("REDIS (%p) transaction completed: %s", conn, trans.Redis) + redis.publishTransaction(trans) } +} + +func newTransaction(requ, resp *redisMessage) *transaction { + trans := &transaction{Type: "redis", tuple: requ.TcpTuple} + // init from request trans.Redis = common.MapStr{} - trans.Method = msg.Method - trans.Path = msg.Path - trans.Query = msg.Message - trans.Request_raw = msg.Message - trans.BytesIn = msg.Size - - trans.cmdline = msg.CmdlineTuple - trans.ts = msg.Ts + trans.Method = requ.Method + trans.Path = requ.Path + trans.Query = requ.Message + trans.RequestRaw = requ.Message + trans.BytesIn = requ.Size + + trans.cmdline = requ.CmdlineTuple + trans.ts = requ.Ts trans.Ts = int64(trans.ts.UnixNano() / 1000) // transactions have microseconds resolution - trans.JsTs = msg.Ts + trans.JsTs = requ.Ts trans.Src = common.Endpoint{ - Ip: msg.TcpTuple.Src_ip.String(), - Port: msg.TcpTuple.Src_port, - Proc: string(msg.CmdlineTuple.Src), + Ip: requ.TcpTuple.Src_ip.String(), + Port: requ.TcpTuple.Src_port, + Proc: string(requ.CmdlineTuple.Src), } trans.Dst = common.Endpoint{ - Ip: msg.TcpTuple.Dst_ip.String(), - Port: msg.TcpTuple.Dst_port, - Proc: string(msg.CmdlineTuple.Dst), + Ip: requ.TcpTuple.Dst_ip.String(), + Port: requ.TcpTuple.Dst_port, + Proc: string(requ.CmdlineTuple.Dst), } - if msg.Direction == tcp.TcpDirectionReverse { + if requ.Direction == tcp.TcpDirectionReverse { trans.Src, trans.Dst = trans.Dst, trans.Src } -} - -func (redis *Redis) receivedRedisResponse(msg *RedisMessage) { - tuple := msg.TcpTuple - trans := redis.getTransaction(tuple.Hashable()) - if trans == nil { - logp.Warn("Response from unknown transaction. Ignoring.") - return - } - // check if the request was received - if trans.Redis == nil { - logp.Warn("Response from unknown transaction. Ignoring.") - return - - } - trans.IsError = msg.IsError - if msg.IsError { - trans.Redis["error"] = msg.Message + // init from response + trans.IsError = resp.IsError + if resp.IsError { + trans.Redis["error"] = resp.Message } else { - trans.Redis["return_value"] = msg.Message + trans.Redis["return_value"] = resp.Message } - trans.BytesOut = msg.Size - trans.Response_raw = msg.Message + trans.BytesOut = resp.Size + trans.ResponseRaw = resp.Message - trans.ResponseTime = int32(msg.Ts.Sub(trans.ts).Nanoseconds() / 1e6) // resp_time in milliseconds + trans.ResponseTime = int32(resp.Ts.Sub(trans.ts).Nanoseconds() / 1e6) // resp_time in milliseconds - redis.publishTransaction(trans) - redis.transactions.Delete(trans.tuple.Hashable()) - - logp.Debug("redis", "Redis transaction completed: %s", trans.Redis) + return trans } func (redis *Redis) GapInStream(tcptuple *common.TcpTuple, dir uint8, @@ -692,8 +316,7 @@ func (redis *Redis) ReceivedFin(tcptuple *common.TcpTuple, dir uint8, return private } -func (redis *Redis) publishTransaction(t *RedisTransaction) { - +func (redis *Redis) publishTransaction(t *transaction) { if redis.results == nil { return } @@ -706,11 +329,11 @@ func (redis *Redis) publishTransaction(t *RedisTransaction) { event["status"] = common.ERROR_STATUS } event["responsetime"] = t.ResponseTime - if redis.Send_request { - event["request"] = t.Request_raw + if redis.SendRequest { + event["request"] = t.RequestRaw } - if redis.Send_response { - event["response"] = t.Response_raw + if redis.SendResponse { + event["response"] = t.ResponseRaw } event["redis"] = common.MapStr(t.Redis) event["method"] = strings.ToUpper(t.Method) @@ -725,3 +348,35 @@ func (redis *Redis) publishTransaction(t *RedisTransaction) { redis.results.PublishEvent(event) } + +func (ml *messageList) append(msg *redisMessage) { + if ml.tail == nil { + ml.head = msg + } else { + ml.tail.next = msg + } + msg.next = nil + ml.tail = msg +} + +func (ml *messageList) empty() bool { + return ml.head == nil +} + +func (ml *messageList) pop() *redisMessage { + if ml.head == nil { + return nil + } + + msg := ml.head + ml.head = ml.head.next + if ml.head == nil { + ml.tail = nil + } + debug("new head=%p", ml.head) + return msg +} + +func (ml *messageList) last() *redisMessage { + return ml.tail +} diff --git a/protos/redis/redis_parse.go b/protos/redis/redis_parse.go new file mode 100644 index 000000000000..227ceed981ac --- /dev/null +++ b/protos/redis/redis_parse.go @@ -0,0 +1,369 @@ +package redis + +import ( + "strconv" + "strings" + "time" + + "github.com/elastic/libbeat/common" + "github.com/elastic/libbeat/common/streambuf" + "github.com/elastic/libbeat/logp" +) + +type parser struct { + parseOffset int + // bytesReceived int + message *redisMessage +} + +type redisMessage struct { + Ts time.Time + + TcpTuple common.TcpTuple + CmdlineTuple *common.CmdlineTuple + Direction uint8 + + IsRequest bool + IsError bool + Message string + Method string + Path string + Size int + + next *redisMessage +} + +const ( + START = iota + BULK_ARRAY + SIMPLE_MESSAGE +) + +// Keep sorted for future command addition +var redisCommands = map[string]struct{}{ + "APPEND": struct{}{}, + "AUTH": struct{}{}, + "BGREWRITEAOF": struct{}{}, + "BGSAVE": struct{}{}, + "BITCOUNT": struct{}{}, + "BITOP": struct{}{}, + "BITPOS": struct{}{}, + "BLPOP": struct{}{}, + "BRPOP": struct{}{}, + "BRPOPLPUSH": struct{}{}, + "CLIENT GETNAME": struct{}{}, + "CLIENT KILL": struct{}{}, + "CLIENT LIST": struct{}{}, + "CLIENT PAUSE": struct{}{}, + "CLIENT SETNAME": struct{}{}, + "CONFIG GET": struct{}{}, + "CONFIG RESETSTAT": struct{}{}, + "CONFIG REWRITE": struct{}{}, + "CONFIG SET": struct{}{}, + "DBSIZE": struct{}{}, + "DEBUG OBJECT": struct{}{}, + "DEBUG SEGFAULT": struct{}{}, + "DECR": struct{}{}, + "DECRBY": struct{}{}, + "DEL": struct{}{}, + "DISCARD": struct{}{}, + "DUMP": struct{}{}, + "ECHO": struct{}{}, + "EVAL": struct{}{}, + "EVALSHA": struct{}{}, + "EXEC": struct{}{}, + "EXISTS": struct{}{}, + "EXPIRE": struct{}{}, + "EXPIREAT": struct{}{}, + "FLUSHALL": struct{}{}, + "FLUSHDB": struct{}{}, + "GET": struct{}{}, + "GETBIT": struct{}{}, + "GETRANGE": struct{}{}, + "GETSET": struct{}{}, + "HDEL": struct{}{}, + "HEXISTS": struct{}{}, + "HGET": struct{}{}, + "HGETALL": struct{}{}, + "HINCRBY": struct{}{}, + "HINCRBYFLOAT": struct{}{}, + "HKEYS": struct{}{}, + "HLEN": struct{}{}, + "HMGET": struct{}{}, + "HMSET": struct{}{}, + "HSCAN": struct{}{}, + "HSET": struct{}{}, + "HSETINX": struct{}{}, + "HVALS": struct{}{}, + "INCR": struct{}{}, + "INCRBY": struct{}{}, + "INCRBYFLOAT": struct{}{}, + "INFO": struct{}{}, + "KEYS": struct{}{}, + "LASTSAVE": struct{}{}, + "LINDEX": struct{}{}, + "LINSERT": struct{}{}, + "LLEN": struct{}{}, + "LPOP": struct{}{}, + "LPUSH": struct{}{}, + "LPUSHX": struct{}{}, + "LRANGE": struct{}{}, + "LREM": struct{}{}, + "LSET": struct{}{}, + "LTRIM": struct{}{}, + "MGET": struct{}{}, + "MIGRATE": struct{}{}, + "MONITOR": struct{}{}, + "MOVE": struct{}{}, + "MSET": struct{}{}, + "MSETNX": struct{}{}, + "MULTI": struct{}{}, + "OBJECT": struct{}{}, + "PERSIST": struct{}{}, + "PEXPIRE": struct{}{}, + "PEXPIREAT": struct{}{}, + "PFADD": struct{}{}, + "PFCOUNT": struct{}{}, + "PFMERGE": struct{}{}, + "PING": struct{}{}, + "PSETEX": struct{}{}, + "PSUBSCRIBE": struct{}{}, + "PTTL": struct{}{}, + "PUBLISH": struct{}{}, + "PUBSUB": struct{}{}, + "PUNSUBSCRIBE": struct{}{}, + "QUIT": struct{}{}, + "RANDOMKEY": struct{}{}, + "RENAME": struct{}{}, + "RENAMENX": struct{}{}, + "RESTORE": struct{}{}, + "RPOP": struct{}{}, + "RPOPLPUSH": struct{}{}, + "RPUSH": struct{}{}, + "RPUSHX": struct{}{}, + "SADD": struct{}{}, + "SAVE": struct{}{}, + "SCAN": struct{}{}, + "SCARD": struct{}{}, + "SCRIPT EXISTS": struct{}{}, + "SCRIPT FLUSH": struct{}{}, + "SCRIPT KILL": struct{}{}, + "SCRIPT LOAD": struct{}{}, + "SDIFF": struct{}{}, + "SDIFFSTORE": struct{}{}, + "SELECT": struct{}{}, + "SET": struct{}{}, + "SETBIT": struct{}{}, + "SETEX": struct{}{}, + "SETNX": struct{}{}, + "SETRANGE": struct{}{}, + "SHUTDOWN": struct{}{}, + "SINTER": struct{}{}, + "SINTERSTORE": struct{}{}, + "SISMEMBER": struct{}{}, + "SLAVEOF": struct{}{}, + "SLOWLOG": struct{}{}, + "SMEMBERS": struct{}{}, + "SMOVE": struct{}{}, + "SORT": struct{}{}, + "SPOP": struct{}{}, + "SRANDMEMBER": struct{}{}, + "SREM": struct{}{}, + "SSCAN": struct{}{}, + "STRLEN": struct{}{}, + "SUBSCRIBE": struct{}{}, + "SUNION": struct{}{}, + "SUNIONSTORE": struct{}{}, + "SYNC": struct{}{}, + "TIME": struct{}{}, + "TTL": struct{}{}, + "TYPE": struct{}{}, + "UNSUBSCRIBE": struct{}{}, + "UNWATCH": struct{}{}, + "WATCH": struct{}{}, + "ZADD": struct{}{}, + "ZCARD": struct{}{}, + "ZCOUNT": struct{}{}, + "ZINCRBY": struct{}{}, + "ZINTERSTORE": struct{}{}, + "ZRANGE": struct{}{}, + "ZRANGEBYSCORE": struct{}{}, + "ZRANK": struct{}{}, + "ZREM": struct{}{}, + "ZREMRANGEBYLEX": struct{}{}, + "ZREMRANGEBYRANK": struct{}{}, + "ZREMRANGEBYSCORE": struct{}{}, + "ZREVRANGE": struct{}{}, + "ZREVRANGEBYSCORE": struct{}{}, + "ZREVRANK": struct{}{}, + "ZSCAN": struct{}{}, + "ZSCORE": struct{}{}, + "ZUNIONSTORE": struct{}{}, +} + +func isRedisCommand(key string) bool { + _, exists := redisCommands[strings.ToUpper(key)] + return exists +} + +func (p *parser) reset() { + p.parseOffset = 0 + p.message = nil +} + +func (parser *parser) parse(buf *streambuf.Buffer) (bool, bool) { + snapshot := buf.Snapshot() + + content, iserror, ok, complete := parser.dispatch(0, buf) + if !ok || !complete { + // on error or incomplete message drop all parsing progress, due to + // parse not being statefull among multiple calls + // => parser needs to restart parsing all content + buf.Restore(snapshot) + return ok, complete + } + + parser.message.IsError = iserror + parser.message.Size = buf.BufferConsumed() + parser.message.Message = content + return true, true +} + +func (p *parser) dispatch(depth int, buf *streambuf.Buffer) (string, bool, bool, bool) { + if buf.Len() == 0 { + return "", false, true, false + } + + var value string + var iserror, ok, complete bool + snapshot := buf.Snapshot() + + switch buf.Bytes()[0] { + case '*': + value, iserror, ok, complete = p.parseArray(depth, buf) + case '$': + value, ok, complete = p.parseString(buf) + case ':': + value, ok, complete = p.parseInt(buf) + case '+': + value, ok, complete = p.parseSimpleString(buf) + case '-': + iserror = true + value, ok, complete = p.parseSimpleString(buf) + default: + debug("Unexpected message starting with %s", buf.Bytes()[0]) + return "", false, false, false + } + + if !ok || !complete { + buf.Restore(snapshot) + } + return value, iserror, ok, complete +} + +func (p *parser) parseInt(buf *streambuf.Buffer) (string, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + return "", true, false + } + + number := string(line[1:]) + if _, err := strconv.ParseInt(number, 10, 64); err != nil { + logp.Err("Failed to read integer reply: %s", err) + } + + return number, true, true +} + +func (p *parser) parseSimpleString(buf *streambuf.Buffer) (string, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + return "", true, false + } + + return string(line[1:]), true, true +} + +func (p *parser) parseString(buf *streambuf.Buffer) (string, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + return "", true, false + } + + if len(line) == 3 && line[1] == '-' && line[2] == '1' { + return "nil", true, true + } + + length, err := strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + logp.Err("Failed to read bulk message: %s", err) + return "", false, false + } + + content, err := buf.CollectWithSuffix(int(length), []byte("\r\n")) + if err != nil { + if err != streambuf.ErrNoMoreBytes { + return "", false, false + } + return "", true, false + } + + return string(content), true, true +} + +func (p *parser) parseArray(depth int, buf *streambuf.Buffer) (string, bool, bool, bool) { + line, err := buf.UntilCRLF() + if err != nil { + debug("End of line not found, waiting for more data") + return "", false, false, false + } + debug("line %s: %d", line, buf.BufferConsumed()) + + if len(line) == 3 && line[1] == '-' && line[2] == '1' { + return "nil", false, true, true + } + + if len(line) == 2 && line[1] == '0' { + return "[]", false, true, true + } + + count, err := strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + logp.Err("Failed to read number of bulk messages: %s", err) + return "", false, false, false + } + if count < 0 { + return "nil", false, true, true + } + + content := make([]string, 0, count) + // read sub elements + + iserror := false + for i := 0; i < int(count); i++ { + var value string + var ok, complete bool + + value, iserror, ok, complete := p.dispatch(depth+1, buf) + if !ok || !complete { + debug("Array incomplete") + return "", iserror, ok, complete + } + + content = append(content, value) + } + + if depth == 0 && isRedisCommand(content[0]) { // we've got a request + p.message.IsRequest = true + p.message.Method = content[0] + p.message.Path = content[1] + } + + var value string + if depth == 0 && p.message.IsRequest { + value = strings.Join(content, " ") + } else { + value = "[" + strings.Join(content, ", ") + "]" + } + return value, iserror, true, true +} diff --git a/protos/redis/redis_test.go b/protos/redis/redis_test.go index d95a6ac9267f..587016426e12 100644 --- a/protos/redis/redis_test.go +++ b/protos/redis/redis_test.go @@ -1,9 +1,25 @@ package redis -import "testing" +import ( + "testing" + "time" -func TestRedisParser_ArrayRequest(t *testing.T) { + "github.com/stretchr/testify/assert" +) + +func newTestStream(content []byte) *stream { + st := newStream(time.Now(), nil) + st.Append(content) + return st +} + +func parse(content []byte) (*redisMessage, bool, bool) { + st := newTestStream(content) + ok, complete := st.parser.parse(&st.Buf) + return st.parser.message, ok, complete +} +func TestRedisParser_ArrayRequest(t *testing.T) { message := []byte("*3\r\n" + "$3\r\n" + "SET\r\n" + @@ -11,30 +27,16 @@ func TestRedisParser_ArrayRequest(t *testing.T) { "key1\r\n" + "$5\r\n" + "Hello\r\n") + msg, ok, complete := parse(message) - stream := &RedisStream{data: message, message: new(RedisMessage)} - - ok, complete := redisMessageParser(stream) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if !stream.message.IsRequest { - t.Errorf("Failed to parse Redis request") - } - if stream.message.Message != "SET key1 Hello" { - t.Errorf("Failed to parse Redis request: %s", stream.message.Message) - } - if stream.message.Size != 34 { - t.Errorf("Wrong message size %d", stream.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.True(t, msg.IsRequest) + assert.Equal(t, "SET key1 Hello", msg.Message) + assert.Equal(t, len(message), msg.Size) } func TestRedisParser_ArrayResponse(t *testing.T) { - message := []byte("*4\r\n" + "$3\r\n" + "foo\r\n" + @@ -42,124 +44,120 @@ func TestRedisParser_ArrayResponse(t *testing.T) { "$3\r\n" + "bar\r\n" + ":23\r\n") + msg, ok, complete := parse(message) - stream := &RedisStream{data: message, message: new(RedisMessage)} - - ok, complete := redisMessageParser(stream) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if stream.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if stream.message.Message != "[foo, nil, bar, 23]" { - t.Errorf("Failed to parse Redis request: %s", stream.message.Message) - } - if stream.message.Size != 32 { - t.Errorf("Wrong message size %d", stream.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "[foo, nil, bar, 23]", msg.Message) + assert.Equal(t, len(message), msg.Size) } -func TestRedisParser_SimpleString(t *testing.T) { +func TestRedisParser_ArrayNested(t *testing.T) { + message := []byte("*3\r\n" + + "*-1\r\n" + + "+foo\r\n" + + "*2\r\n" + + ":1\r\n" + + "+bar\r\n") + msg, ok, complete := parse(message) + + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "[nil, foo, [1, bar]]", msg.Message) + assert.Equal(t, len(message), msg.Size) +} +func TestRedisParser_SimpleString(t *testing.T) { message := []byte("+OK\r\n") + msg, ok, complete := parse(message) - stream := &RedisStream{data: message, message: new(RedisMessage)} - - ok, complete := redisMessageParser(stream) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if stream.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if stream.message.Message != "OK" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) - } - if stream.message.Size != 5 { - t.Errorf("Wrong message size %d", stream.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "OK", msg.Message) + assert.Equal(t, len(message), msg.Size) } func TestRedisParser_NilString(t *testing.T) { - message := []byte("$-1\r\n") + msg, ok, complete := parse(message) - stream := &RedisStream{data: message, message: new(RedisMessage)} - - ok, complete := redisMessageParser(stream) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if stream.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if stream.message.Message != "nil" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) - } - if stream.message.Size != 5 { - t.Errorf("Wrong message size %d", stream.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "nil", msg.Message) + assert.Equal(t, len(message), msg.Size) } func TestRedisParser_EmptyString(t *testing.T) { - message := []byte("$0\r\n\r\n") + msg, ok, complete := parse(message) - stream := &RedisStream{data: message, message: new(RedisMessage)} - - ok, complete := redisMessageParser(stream) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if stream.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if stream.message.Message != "" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) - } - if stream.message.Size != 6 { - t.Errorf("Wrong message size %d", stream.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "", msg.Message) + assert.Equal(t, len(message), msg.Size) } -func TestRedisParser_EmptyArray(t *testing.T) { +func TestRedisParser_LenString(t *testing.T) { + message := []byte("$5\r\n" + + "12345\r\n") + msg, ok, complete := parse(message) + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "12345", msg.Message) + assert.Equal(t, len(message), msg.Size) +} + +func TestRedisParser_LenStringWithCRLF(t *testing.T) { + message := []byte("$7\r\n" + + "123\r\n45\r\n") + msg, ok, complete := parse(message) + + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "123\r\n45", msg.Message) + assert.Equal(t, len(message), msg.Size) +} + +func TestRedisParser_EmptyArray(t *testing.T) { message := []byte("*0\r\n") + msg, ok, complete := parse(message) + + assert.True(t, ok) + assert.True(t, complete) + assert.False(t, msg.IsRequest) + assert.Equal(t, "[]", msg.Message) + assert.Equal(t, len(message), msg.Size) +} + +func TestRedisParser_Array2Passes(t *testing.T) { + part1 := []byte("*3\r\n" + + "$3\r\n" + + "SET\r\n" + + "$4\r\n") + part2 := []byte("key1\r\n" + + "$5\r\n" + + "Hello\r\n") + + st := newTestStream(part1) + ok, complete := st.parser.parse(&st.Buf) + assert.True(t, ok) + assert.False(t, complete) + + st.Stream.Append(part2) + ok, complete = st.parser.parse(&st.Buf) + msg := st.parser.message - stream := &RedisStream{data: message, message: new(RedisMessage)} - - ok, complete := redisMessageParser(stream) - - if !ok { - t.Errorf("Parsing returned error") - } - if !complete { - t.Errorf("Expecting a complete message") - } - if stream.message.IsRequest { - t.Errorf("Failed to parse Redis response") - } - if stream.message.Message != "[]" { - t.Errorf("Failed to parse Redis response: %s", stream.message.Message) - } - if stream.message.Size != 4 { - t.Errorf("Wrong message size %d", stream.message.Size) - } + assert.True(t, ok) + assert.True(t, complete) + assert.True(t, msg.IsRequest) + assert.Equal(t, "SET key1 Hello", msg.Message) + assert.Equal(t, len(part1)+len(part2), msg.Size) }