From d472fe28b833eb645ba23913b3e731ee638d2650 Mon Sep 17 00:00:00 2001 From: zheng Date: Thu, 1 Dec 2022 18:05:29 +0800 Subject: [PATCH 1/4] Add payload for all protocols (#375) Signed-off-by: huxiangyuan --- CHANGELOG.md | 2 +- .../analyzer/network/network_analyzer.go | 23 ++++- .../analyzer/network/network_analyzer_test.go | 14 ++- .../network/protocol/dubbo/dubbo_parser.go | 24 ----- .../network/protocol/dubbo/dubbo_request.go | 1 - .../network/protocol/dubbo/dubbo_response.go | 1 - .../network/protocol/http/http_parser_test.go | 92 ------------------- .../network/protocol/http/http_request.go | 15 +-- .../network/protocol/http/http_response.go | 6 +- .../analyzer/network/protocol/protocol.go | 14 +-- .../network/protocol/protocol_parser.go | 22 +++++ .../network/protocol/redis/redis_request.go | 2 - .../network/protocol/redis/redis_response.go | 2 - .../testdata/dns/server-trace-multi.yml | 6 +- .../consumer-trace-fetch-multi-topics.yml | 2 + .../kafka/consumer-trace-fetch-split.yml | 2 + .../kafka/provider-trace-produce-split.yml | 2 + .../mysql/server-trace-query-split.yml | 2 + .../testdata/mysql/server-trace-query.yml | 2 + .../protocol/testdata/na-protocol-config.yaml | 11 ++- .../testdata/rocketmq/server-trace-error.yml | 2 + .../testdata/rocketmq/server-trace-json.yml | 2 + .../rocketmq/server-trace-rocketmq.yml | 2 + .../pkg/component/analyzer/tools/ascii.go | 27 ++++++ .../exporter/tools/adapter/net_dict.go | 20 +++- collector/pkg/model/constlabels/const.go | 3 + 26 files changed, 148 insertions(+), 153 deletions(-) create mode 100644 collector/pkg/component/analyzer/tools/ascii.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 796a8a93c..0eee52d26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ ### Enhancements -- +- Add payload for all protocols.([#375](https://github.com/KindlingProject/kindling/pull/375)) - - diff --git a/collector/pkg/component/analyzer/network/network_analyzer.go 
b/collector/pkg/component/analyzer/network/network_analyzer.go index 2d1616311..de0489d17 100644 --- a/collector/pkg/component/analyzer/network/network_analyzer.go +++ b/collector/pkg/component/analyzer/network/network_analyzer.go @@ -406,7 +406,6 @@ func (na *NetworkAnalyzer) parseProtocol(mps *messagePairs, parser *protocol.Pro // Parse failure return nil } - requestMsg.AddByteArrayUtf8Attribute(constlabels.RequestPayload, mps.requests.getData()) if mps.responses == nil { return na.getRecords(mps, parser.GetProtocol(), requestMsg.GetAttributes()) @@ -417,8 +416,6 @@ func (na *NetworkAnalyzer) parseProtocol(mps *messagePairs, parser *protocol.Pro // Parse failure return nil } - responseMsg.AddByteArrayUtf8Attribute(constlabels.ResponsePayload, mps.responses.getData()) - return na.getRecords(mps, parser.GetProtocol(), responseMsg.GetAttributes()) } @@ -537,6 +534,13 @@ func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attrib labels.UpdateAddStringValue(constlabels.Protocol, protocol) labels.Merge(attributes) + + if mps.responses == nil { + addProtocolPayload(protocol, labels, mps.requests.getData(), nil) + } else { + addProtocolPayload(protocol, labels, mps.requests.getData(), mps.responses.getData()) + } + // If no protocol error found, we check other errors if !labels.GetBoolValue(constlabels.IsError) && mps.responses == nil { labels.AddBoolValue(constlabels.IsError, true) @@ -586,6 +590,12 @@ func (na *NetworkAnalyzer) getRecordWithSinglePair(mps *messagePairs, mp *messag labels.UpdateAddStringValue(constlabels.Protocol, protocol) labels.Merge(attributes) + if mp.response == nil { + addProtocolPayload(protocol, labels, evt.GetData(), nil) + } else { + addProtocolPayload(protocol, labels, evt.GetData(), mp.response.GetData()) + } + // If no protocol error found, we check other errors if !labels.GetBoolValue(constlabels.IsError) && mps.responses == nil { labels.AddBoolValue(constlabels.IsError, true) @@ -609,6 +619,13 @@ func (na 
*NetworkAnalyzer) getRecordWithSinglePair(mps *messagePairs, mp *messag return ret } +func addProtocolPayload(protocolName string, labels *model.AttributeMap, request []byte, response []byte) { + labels.UpdateAddStringValue(constlabels.RequestPayload, protocol.GetPayloadString(request, protocolName)) + if response != nil { + labels.UpdateAddStringValue(constlabels.ResponsePayload, protocol.GetPayloadString(response, protocolName)) + } +} + func (na *NetworkAnalyzer) isSlow(duration uint64, protocol string) bool { return int64(duration) >= int64(na.getResponseSlowThreshold(protocol))*int64(time.Millisecond) } diff --git a/collector/pkg/component/analyzer/network/network_analyzer_test.go b/collector/pkg/component/analyzer/network/network_analyzer_test.go index d14dc5a97..8151db3e3 100644 --- a/collector/pkg/component/analyzer/network/network_analyzer_test.go +++ b/collector/pkg/component/analyzer/network/network_analyzer_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/Kindling-project/kindling/collector/pkg/component" + "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol" "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol/factory" "github.com/Kindling-project/kindling/collector/pkg/component/consumer" "github.com/Kindling-project/kindling/collector/pkg/model" @@ -92,6 +93,17 @@ func prepareNetworkAnalyzer() *NetworkAnalyzer { nextConsumers: []consumer.Consumer{&NopProcessor{}}, telemetry: component.NewDefaultTelemetryTools(), } + na.staticPortMap = map[uint32]string{} + for _, config := range na.cfg.ProtocolConfigs { + for _, port := range config.Ports { + na.staticPortMap[port] = config.Key + } + } + na.slowThresholdMap = map[string]int{} + for _, config := range na.cfg.ProtocolConfigs { + protocol.SetPayLoadLength(config.Key, config.PayloadLength) + na.slowThresholdMap[config.Key] = config.Threshold + } na.parserFactory = 
factory.NewParserFactory(factory.WithUrlClusteringMethod(na.cfg.UrlClusteringMethod)) na.Start() } @@ -298,11 +310,11 @@ func (evt *TraceEvent) exchange(common *EventCommon) *model.KindlingEvent { modelEvt := &model.KindlingEvent{ Source: model.Source(common.Source), Timestamp: evt.Timestamp, + Latency: uint64(evt.UserAttributes.Latency), Name: evt.Name, Category: model.Category(common.Category), ParamsNumber: 3, UserAttributes: [16]model.KeyValue{ - {Key: "latency", ValueType: model.ValueType_UINT64, Value: Int64ToBytes(evt.UserAttributes.Latency)}, {Key: "res", ValueType: model.ValueType_INT64, Value: Int64ToBytes(evt.UserAttributes.Res)}, {Key: "data", ValueType: model.ValueType_BYTEBUF, Value: byteData}, }, diff --git a/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_parser.go b/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_parser.go index 1b9347038..c49350634 100644 --- a/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_parser.go +++ b/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_parser.go @@ -17,10 +17,6 @@ const ( FlagTwoWay = byte(0x40) FlagEvent = byte(0x20) // for heartbeat SerialMask = 0x1f - - AsciiLow = byte(0x20) - AsciiHigh = byte(0x7e) - AsciiReplace = byte(0x2e) // . ) func NewDubboParser() *protocol.ProtocolParser { @@ -28,23 +24,3 @@ func NewDubboParser() *protocol.ProtocolParser { responseParser := protocol.CreatePkgParser(fastfailDubboResponse(), parseDubboResponse()) return protocol.NewProtocolParser(protocol.DUBBO, requestParser, responseParser, nil) } - -/** - Get the ascii readable string, replace other value to '.', like wireshark. 
-*/ -func getAsciiString(data []byte) string { - length := len(data) - if length == 0 { - return "" - } - - newData := make([]byte, length) - for i := 0; i < length; i++ { - if data[i] > AsciiHigh || data[i] < AsciiLow { - newData[i] = AsciiReplace - } else { - newData[i] = data[i] - } - } - return string(newData) -} diff --git a/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_request.go b/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_request.go index e5fe33bae..c697e64a8 100644 --- a/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_request.go +++ b/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_request.go @@ -19,7 +19,6 @@ func parseDubboRequest() protocol.ParsePkgFn { } message.AddStringAttribute(constlabels.ContentKey, contentKey) - message.AddStringAttribute(constlabels.RequestPayload, getAsciiString(message.GetData(16, protocol.GetDubboPayLoadLength()))) return true, true } } diff --git a/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_response.go b/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_response.go index e914ff8b2..baf2a1e11 100644 --- a/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_response.go +++ b/collector/pkg/component/analyzer/network/protocol/dubbo/dubbo_response.go @@ -23,7 +23,6 @@ func parseDubboResponse() protocol.ParsePkgFn { message.AddBoolAttribute(constlabels.IsError, true) message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError)) } - message.AddStringAttribute(constlabels.ResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubboPayLoadLength()))) return true, true } } diff --git a/collector/pkg/component/analyzer/network/protocol/http/http_parser_test.go b/collector/pkg/component/analyzer/network/protocol/http/http_parser_test.go index 996574ba5..70f6f75c9 100644 --- a/collector/pkg/component/analyzer/network/protocol/http/http_parser_test.go +++ 
b/collector/pkg/component/analyzer/network/protocol/http/http_parser_test.go @@ -5,8 +5,6 @@ import ( "testing" "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol" - "github.com/Kindling-project/kindling/collector/pkg/model" - "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" ) func Test_urlMerge(t *testing.T) { @@ -83,96 +81,6 @@ func TestHttpParser_getContentKey(t *testing.T) { } } -func TestParseHttpRequest_GetPayLoad(t *testing.T) { - httpData := "POST /test?sleep=0&respbyte=1000&statusCode=200 HTTP/1.1\r\nKey1: value1\r\n\r\nHello world" - - tests := []struct { - name string - size int - want string - }{ - {name: "substring", size: 10, want: "POST /test"}, - {name: "equal", size: 85, want: httpData}, - {name: "overflow", size: 100, want: httpData}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - protocol.SetPayLoadLength(protocol.HTTP, tt.size) - message := protocol.NewRequestMessage([]byte(httpData)) - NewHttpParser("").ParseRequest(message) - - if !message.HasAttribute(constlabels.RequestPayload) { - t.Errorf("Fail to parse HttpRequest()") - } - if got := message.GetStringAttribute(constlabels.RequestPayload); got != tt.want { - t.Errorf("GetHttpPayload() = %v, want %v", got, tt.want) - } - }) - } - tests := []struct { - name string - args args - want map[string]string - }{ - { - name: "normal case", - args: args{ - message: protocol.NewRequestMessage([]byte("HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nAPM-AgentID: TTXvC3EQS6KLwxx3eIqINFjAW2olRm+cr8M+yuvwhkY=\r\nTransfer-Encoding: chunked\r\nContent-Type: application/json\r\nAPM-TransactionID: 5e480579c718a4a6498a9")), - }, - want: map[string]string{ - "connection": "keep-alive", - "apm-agentid": "TTXvC3EQS6KLwxx3eIqINFjAW2olRm+cr8M+yuvwhkY=", - "transfer-encoding": "chunked", - "content-type": "application/json", - "apm-transactionid": "5e480579c718a4a6498a9", - }, - }, - { - name: "no values", - args: args{ - 
protocol.NewRequestMessage([]byte("HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nTransfer-Encoding: ")), - }, - want: map[string]string{ - "connection": "keep-alive", - }, - }, - { - - name: "no spaces", - args: args{ - protocol.NewRequestMessage([]byte("HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nTransfer-Encoding:")), - }, - want: map[string]string{ - "connection": "keep-alive", - }, - }, - { - name: "no colon", - args: args{ - protocol.NewRequestMessage([]byte("HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nTransfer-Encoding")), - }, - want: map[string]string{ - "connection": "keep-alive", - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - protocol.SetPayLoadLength(protocol.HTTP, tt.size) - - message := protocol.NewResponseMessage([]byte(httpData), model.NewAttributeMap()) - NewHttpParser("").ParseResponse(message) - - if !message.HasAttribute(constlabels.ResponsePayload) { - t.Errorf("Fail to parse HttpResponse()") - } - if got := message.GetStringAttribute(constlabels.ResponsePayload); got != tt.want { - t.Errorf("GetHttpPayload() = %v, want %v", got, tt.want) - } - }) - } -} - func Test_parseHeaders(t *testing.T) { type args struct { message *protocol.PayloadMessage diff --git a/collector/pkg/component/analyzer/network/protocol/http/http_request.go b/collector/pkg/component/analyzer/network/protocol/http/http_request.go index 563e66de7..1485b3b39 100644 --- a/collector/pkg/component/analyzer/network/protocol/http/http_request.go +++ b/collector/pkg/component/analyzer/network/protocol/http/http_request.go @@ -18,12 +18,14 @@ func fastfailHttpRequest() protocol.FastFailFn { /* Request line - Method [GET/POST/PUT/DELETE/HEAD/TRACE/OPTIONS/CONNECT] - Blank - Request-URI [eg. /xxx/yyy?parm0=aaa¶m1=bbb] - Blank - HTTP-Version [HTTP/1.0 | HTTP/1.2] - \r\n + + Method [GET/POST/PUT/DELETE/HEAD/TRACE/OPTIONS/CONNECT] + Blank + Request-URI [eg. 
/xxx/yyy?parm0=aaa¶m1=bbb] + Blank + HTTP-Version [HTTP/1.0 | HTTP/1.2] + \r\n + Request header Request body */ @@ -54,7 +56,6 @@ func parseHttpRequest(urlClusteringMethod urlclustering.ClusteringMethod) protoc message.AddStringAttribute(constlabels.HttpMethod, string(method)) message.AddByteArrayUtf8Attribute(constlabels.HttpUrl, url) - message.AddByteArrayUtf8Attribute(constlabels.RequestPayload, message.GetData(0, protocol.GetHttpPayLoadLength())) contentKey := urlClusteringMethod.Clustering(string(url)) if len(contentKey) == 0 { diff --git a/collector/pkg/component/analyzer/network/protocol/http/http_response.go b/collector/pkg/component/analyzer/network/protocol/http/http_response.go index 94311bc00..5c45fe370 100644 --- a/collector/pkg/component/analyzer/network/protocol/http/http_response.go +++ b/collector/pkg/component/analyzer/network/protocol/http/http_response.go @@ -9,14 +9,17 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" ) -/** +/* +* Status line + HTTP-Version[HTTP/1.0 | HTTP/1.1] Blank Status-Code Blank Reason-Phrase \r\n + Response header Response body */ @@ -57,7 +60,6 @@ func parseHttpResponse() protocol.ParsePkgFn { } message.AddIntAttribute(constlabels.HttpStatusCode, statusCodeI) - message.AddByteArrayUtf8Attribute(constlabels.ResponsePayload, message.GetData(0, protocol.GetHttpPayLoadLength())) if statusCodeI >= 400 { message.AddBoolAttribute(constlabels.IsError, true) message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError)) diff --git a/collector/pkg/component/analyzer/network/protocol/protocol.go b/collector/pkg/component/analyzer/network/protocol/protocol.go index 1e3ce94cb..823e25281 100644 --- a/collector/pkg/component/analyzer/network/protocol/protocol.go +++ b/collector/pkg/component/analyzer/network/protocol/protocol.go @@ -14,7 +14,11 @@ const ( var payloadLength map[string]int = map[string]int{} func SetPayLoadLength(protocol string, length int) { - payloadLength[protocol] 
= length + if length > 0 { + payloadLength[protocol] = length + } else { + payloadLength[protocol] = 200 + } } func GetPayLoadLength(protocol string) int { @@ -23,11 +27,3 @@ func GetPayLoadLength(protocol string) int { } return 200 } - -func GetHttpPayLoadLength() int { - return GetPayLoadLength(HTTP) -} - -func GetDubboPayLoadLength() int { - return GetPayLoadLength(DUBBO) -} diff --git a/collector/pkg/component/analyzer/network/protocol/protocol_parser.go b/collector/pkg/component/analyzer/network/protocol/protocol_parser.go index 043f92f92..33d4be18e 100644 --- a/collector/pkg/component/analyzer/network/protocol/protocol_parser.go +++ b/collector/pkg/component/analyzer/network/protocol/protocol_parser.go @@ -495,3 +495,25 @@ func (parser *ProtocolParser) ResetPort(port uint32) { key := strconv.Itoa(int(port)) parser.portCounter.Remove(key) } + +func GetPayloadString(data []byte, protocolName string) string { + switch protocolName { + case HTTP, REDIS: + return tools.FormatByteArrayToUtf8(getSubstrBytes(data, protocolName, 0)) + case DUBBO: + return tools.GetAsciiString(getSubstrBytes(data, protocolName, 16)) + default: + return tools.GetAsciiString(getSubstrBytes(data, protocolName, 0)) + } +} + +func getSubstrBytes(data []byte, protocolName string, offset int) []byte { + length := GetPayLoadLength(protocolName) + if offset >= length { + return data[0:0] + } + if offset+length > len(data) { + return data[offset:] + } + return data[offset : offset+length] +} diff --git a/collector/pkg/component/analyzer/network/protocol/redis/redis_request.go b/collector/pkg/component/analyzer/network/protocol/redis/redis_request.go index 699566e5a..79255ea50 100644 --- a/collector/pkg/component/analyzer/network/protocol/redis/redis_request.go +++ b/collector/pkg/component/analyzer/network/protocol/redis/redis_request.go @@ -2,7 +2,6 @@ package redis import ( "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol" - 
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels" ) /* @@ -21,7 +20,6 @@ func fastfailRedisRequest() protocol.FastFailFn { func parseRedisRequest() protocol.ParsePkgFn { return func(message *protocol.PayloadMessage) (bool, bool) { - message.AddByteArrayUtf8Attribute(constlabels.RequestPayload, message.GetData(0, protocol.GetPayLoadLength(protocol.REDIS))) return true, false } } diff --git a/collector/pkg/component/analyzer/network/protocol/redis/redis_response.go b/collector/pkg/component/analyzer/network/protocol/redis/redis_response.go index a3ba4b76e..17eeccfd8 100644 --- a/collector/pkg/component/analyzer/network/protocol/redis/redis_response.go +++ b/collector/pkg/component/analyzer/network/protocol/redis/redis_response.go @@ -2,7 +2,6 @@ package redis import ( "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol" - "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" ) /* @@ -25,7 +24,6 @@ func fastfailResponse() protocol.FastFailFn { func parseResponse() protocol.ParsePkgFn { return func(message *protocol.PayloadMessage) (bool, bool) { - message.AddByteArrayUtf8Attribute(constlabels.ResponsePayload, message.GetData(0, protocol.GetPayLoadLength(protocol.REDIS))) return true, false } } diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/dns/server-trace-multi.yml b/collector/pkg/component/analyzer/network/protocol/testdata/dns/server-trace-multi.yml index 2053bd74d..1b42e0a50 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/dns/server-trace-multi.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/dns/server-trace-multi.yml @@ -86,6 +86,8 @@ trace: dns_ip: "121.227.7.33" is_error: false error_type: 0 + request_payload: ".............ss0.baidu.com.......)........" + response_payload: ".............ss0.baidu.com..................sslbaidu.jomodns...+.......2..y..!.." 
- Timestamp: 100396000 Values: @@ -113,4 +115,6 @@ trace: dns_domain: "ss0.baidu.com." dns_rcode: 0 is_error: false - error_type: 0 \ No newline at end of file + error_type: 0 + request_payload: "9............ss0.baidu.com.......)........" + response_payload: "9............ss0.baidu.com..................sslbaidu.jomodns....)........" diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-multi-topics.yml b/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-multi-topics.yml index a1200e81e..2e36409b4 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-multi-topics.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-multi-topics.yml @@ -57,3 +57,5 @@ trace: kafka_error_code: 0 is_error: false error_type: 0 + request_payload: "...\"..........consumer-merge-2............. .....(...=^......npm_request_trace.................tS...........................A...............npm_detail_topology_request...................:............." + response_payload: "................(....." diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-split.yml b/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-split.yml index cb05dda48..83932cc61 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-split.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/kafka/consumer-trace-fetch-split.yml @@ -64,3 +64,5 @@ trace: kafka_error_code: 0 is_error: false error_type: 0 + request_payload: "...g..........rdkafka...............................container-monitor..........." + response_payload: "...S....................container-monitor......................................." 
diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/kafka/provider-trace-produce-split.yml b/collector/pkg/component/analyzer/network/protocol/testdata/kafka/provider-trace-produce-split.yml index 71ac22600..fb9d27c36 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/kafka/provider-trace-produce-split.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/kafka/provider-trace-produce-split.yml @@ -63,3 +63,5 @@ trace: kafka_error_code: 0 is_error: false error_type: 0 + request_payload: "...........@..rdkafka......u0......container-monitor...........O...........C...." + response_payload: "...A...@......container-monitor.................u...................." diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query-split.yml b/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query-split.yml index 2e70d10ba..172c8a0ab 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query-split.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query-split.yml @@ -63,5 +63,7 @@ trace: protocol: "mysql" content_key: "select dummy *" sql: "SELECT * FROM dummy" + request_payload: ".....SELECT * FROM dummy" + response_payload: ".....9....def.container-monitor.dummy.dummy.name.name.-...........;....def.conta" is_error: false error_type: 0 diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query.yml b/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query.yml index b16813bcb..7947c8c2c 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/mysql/server-trace-query.yml @@ -56,5 +56,7 @@ trace: protocol: "mysql" content_key: "select dummy *" sql: "SELECT * FROM dummy" + request_payload: ".....SELECT * FROM dummy" + 
response_payload: ".....9....def.container-monitor.dummy.dummy.name.name.-...........;....def.conta" is_error: false error_type: 0 diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml b/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml index bd53ffea0..a9c85add4 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml @@ -8,20 +8,21 @@ analyzers: conntrack_rate_limit: 500 proc_root: /proc protocol_parser: [ http, mysql, dns, redis, kafka, dubbo, rocketmq ] + url_clustering_method: alphabet protocol_config: - key: "http" + ports: [ 80 ] payload_length: 80 - key: "dubbo" payload_length: 80 - key: "mysql" + ports: [ 3306 ] slow_threshold: 100 - key: "kafka" + ports: [ 9092 ] slow_threshold: 100 - - key: "cassandra" - ports: [ 9042 ] - slow_threshold: 100 - - key: "s3" - ports: [ 9190 ] + - key: "redis" + ports: [ 6379 ] slow_threshold: 100 - key: "dns" ports: [ 53 ] diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-error.yml b/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-error.yml index fcd86c9f5..c75da08e2 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-error.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-error.yml @@ -51,3 +51,5 @@ trace: rocketmq_error_msg: "TOPIC_NOT_EXIST" rocketmq_error_code: 17 error_type: 3 + request_payload: '........{"code":105,"extFields":{"topic":"TopicTest"},"flag":0,"language":"JAVA","opaque":2,"serializeTypeCurrentRPC":"JSON","version":401}' + response_payload: '........{"code":17,"flag":1,"language":"JAVA","opaque":2,"remark":"No topic route info in name server for the topic: TopicTest\nSee http://rocketmq.apache.org/docs/faq/ for further details.","serializ' \ No 
newline at end of file diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-json.yml b/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-json.yml index 0bbfaf7f1..c59063ef1 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-json.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-json.yml @@ -81,3 +81,5 @@ trace: rocketmq_opaque: 1062 rocketmq_error_code: 0 error_type: 0 + request_payload: '...h...d{"code":106,"flag":0,"language":"JAVA","opaque":1062,"serializeTypeCurrentRPC":"JSON","version":393}' + response_payload: '...H...b{"code":0,"flag":1,"language":"JAVA","opaque":1062,"serializeTypeCurrentRPC":"JSON","version":401}{"brokerAddrTable":{"rocketmq-5668b48cd9-h6gbz":{"brokerAddrs":{0:"10.233.90.93:10911"},"broke' diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-rocketmq.yml b/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-rocketmq.yml index 35d772d96..bea407dbd 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-rocketmq.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/rocketmq/server-trace-rocketmq.yml @@ -91,3 +91,5 @@ trace: rocketmq_opaque: 2 rocketmq_error_code: 0 error_type: 0 + request_payload: '...-...).i.....................topic....TopicTest' + response_payload: '...".........................{"brokerDatas":[{"brokerAddrs":{"0":"192.168.64.1:10911"},"brokerName":"broker-a","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueData' diff --git a/collector/pkg/component/analyzer/tools/ascii.go b/collector/pkg/component/analyzer/tools/ascii.go new file mode 100644 index 000000000..761688f61 --- /dev/null +++ b/collector/pkg/component/analyzer/tools/ascii.go @@ -0,0 +1,27 @@ +package tools + +const ( + AsciiLow = byte(0x20) + AsciiHigh 
= byte(0x7e) + AsciiReplace = byte(0x2e) // . +) + +/* + * Get the ascii readable string, replace other value to '.', like wireshark. + */ +func GetAsciiString(data []byte) string { + length := len(data) + if length == 0 { + return "" + } + + newData := make([]byte, length) + for i := 0; i < length; i++ { + if data[i] > AsciiHigh || data[i] < AsciiLow { + newData[i] = AsciiReplace + } else { + newData[i] = data[i] + } + } + return string(newData) +} diff --git a/collector/pkg/component/consumer/exporter/tools/adapter/net_dict.go b/collector/pkg/component/consumer/exporter/tools/adapter/net_dict.go index 21f75c100..004e6cc25 100644 --- a/collector/pkg/component/consumer/exporter/tools/adapter/net_dict.go +++ b/collector/pkg/component/consumer/exporter/tools/adapter/net_dict.go @@ -234,14 +234,22 @@ var spanProtocol = []extraLabelsParam{ {constlabels.SpanHttpResponseHeaders, constlabels.ResponsePayload, String}, {constlabels.SpanHttpResponseBody, constlabels.STR_EMPTY, StrEmpty}, }, extraLabelsKey{HTTP}}, + {[]dictionary{ + {constlabels.SpanRequestPayload, constlabels.RequestPayload, String}, + {constlabels.SpanResponsePayload, constlabels.ResponsePayload, String}, + }, extraLabelsKey{KAFKA}}, {[]dictionary{ {constlabels.SpanMysqlSql, constlabels.Sql, String}, {constlabels.SpanMysqlErrorCode, constlabels.SqlErrCode, Int64}, {constlabels.SpanMysqlErrorMsg, constlabels.SqlErrMsg, String}, + {constlabels.SpanRequestPayload, constlabels.RequestPayload, String}, + {constlabels.SpanResponsePayload, constlabels.ResponsePayload, String}, }, extraLabelsKey{MYSQL}}, {[]dictionary{ {constlabels.SpanDnsDomain, constlabels.DnsDomain, String}, {constlabels.SpanDnsRCode, constlabels.DnsRcode, FromInt64ToString}, + {constlabels.SpanRequestPayload, constlabels.RequestPayload, String}, + {constlabels.SpanResponsePayload, constlabels.ResponsePayload, String}, }, extraLabelsKey{DNS}}, {[]dictionary{ {constlabels.SpanDubboRequestBody, constlabels.RequestPayload, String}, @@ -257,9 
+265,17 @@ var spanProtocol = []extraLabelsParam{ {[]dictionary{ {constlabels.SpanRocketMQRequestMsg, constlabels.RocketMQRequestMsg, String}, {constlabels.SpanRocketMQErrMsg, constlabels.RocketMQErrMsg, String}, + {constlabels.SpanRequestPayload, constlabels.RequestPayload, String}, + {constlabels.SpanResponsePayload, constlabels.ResponsePayload, String}, }, extraLabelsKey{ROCKETMQ}}, - { - []dictionary{}, extraLabelsKey{UNSUPPORTED}, + {[]dictionary{ + /* + * Currently we add payload span for all protocols everywhere as http\dubbo\redis has it's own key. + * TODO Use unified way to set request/response payload. + */ + {constlabels.SpanRequestPayload, constlabels.RequestPayload, String}, + {constlabels.SpanResponsePayload, constlabels.ResponsePayload, String}, + }, extraLabelsKey{UNSUPPORTED}, }, } diff --git a/collector/pkg/model/constlabels/const.go b/collector/pkg/model/constlabels/const.go index 8a702004a..ab781009d 100644 --- a/collector/pkg/model/constlabels/const.go +++ b/collector/pkg/model/constlabels/const.go @@ -112,6 +112,9 @@ const ( SpanRocketMQRequestMsg = "rocketmq.request_msg" SpanRocketMQErrMsg = "rocketmq.error_msg" + SpanRequestPayload = "request_payload" + SpanResponsePayload = "response_payload" + NetWorkAnalyzeMetricGroup = "netAnalyzeMetrics" // IsSent is used by cpuAnalyzer to label whether an event has been sent. 
From 3fbc93ee5241c2460aa3a5cbbc5e9663b676e0d4 Mon Sep 17 00:00:00 2001 From: Daxin Wang <46807570+dxsup@users.noreply.github.com> Date: Thu, 1 Dec 2022 19:20:21 +0800 Subject: [PATCH 2/4] Add a new clustering method "blank" (#372) Signed-off-by: Daxin Wang --- CHANGELOG.md | 2 +- collector/docker/kindling-collector-config.yml | 3 ++- .../analyzer/network/protocol/http/http_parser.go | 10 +--------- collector/pkg/urlclustering/blank.go | 14 ++++++++++++++ collector/pkg/urlclustering/factory.go | 14 ++++++++++++++ deploy/agent/kindling-collector-config.yml | 3 ++- 6 files changed, 34 insertions(+), 12 deletions(-) create mode 100644 collector/pkg/urlclustering/blank.go create mode 100644 collector/pkg/urlclustering/factory.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 0eee52d26..6aae0b8c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ ### Enhancements - Add payload for all protocols.([#375](https://github.com/KindlingProject/kindling/pull/375)) - -- +- Add a new clustering method "blank" that is used to reduce the cardinality of metrics as much as possible. ([#372](https://github.com/KindlingProject/kindling/pull/372)) ### Bug fixes - diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 42eb4e8cd..d28d55648 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -56,10 +56,11 @@ analyzers: protocol_parser: [ http, mysql, dns, redis, kafka, rocketmq ] # Which URL clustering method should be used to shorten the URL of HTTP request. # This is useful for decrease the cardinality of URLs. - # Valid values: ["noparam", "alphabet"] + # Valid values: ["noparam", "alphabet", "blank"] # - noparam: Only trim the trailing parameters behind the character '?' # - alphabet: Trim the trailing parameters and Convert the segments # containing non-alphabetical characters to star(*) + # - blank: Turn endpoints to empty. 
This is used to reduce the cardinality as much as possible. url_clustering_method: alphabet # If the destination port of data is one of the followings, the protocol of such network request # is set to the corresponding one. Note the program will try to identify the protocol automatically diff --git a/collector/pkg/component/analyzer/network/protocol/http/http_parser.go b/collector/pkg/component/analyzer/network/protocol/http/http_parser.go index f1e614a37..ff8af4c58 100644 --- a/collector/pkg/component/analyzer/network/protocol/http/http_parser.go +++ b/collector/pkg/component/analyzer/network/protocol/http/http_parser.go @@ -8,15 +8,7 @@ import ( ) func NewHttpParser(urlClusteringMethod string) *protocol.ProtocolParser { - var method urlclustering.ClusteringMethod - switch urlClusteringMethod { - case "alphabet": - method = urlclustering.NewAlphabeticalClusteringMethod() - case "noparam": - method = urlclustering.NewNoParamClusteringMethod() - default: - method = urlclustering.NewAlphabeticalClusteringMethod() - } + method := urlclustering.NewMethod(urlClusteringMethod) requestParser := protocol.CreatePkgParser(fastfailHttpRequest(), parseHttpRequest(method)) responseParser := protocol.CreatePkgParser(fastfailHttpResponse(), parseHttpResponse()) diff --git a/collector/pkg/urlclustering/blank.go b/collector/pkg/urlclustering/blank.go new file mode 100644 index 000000000..d8a2b4685 --- /dev/null +++ b/collector/pkg/urlclustering/blank.go @@ -0,0 +1,14 @@ +package urlclustering + +// BlankClusteringMethod removes the endpoint and returns an empty string. +// This method is used to reduce the cardinality as much as possible.
+type BlankClusteringMethod struct { +} + +func NewBlankClusteringMethod() ClusteringMethod { + return &BlankClusteringMethod{} +} + +func (m *BlankClusteringMethod) Clustering(_ string) string { + return "" +} diff --git a/collector/pkg/urlclustering/factory.go b/collector/pkg/urlclustering/factory.go new file mode 100644 index 000000000..ead91dbc5 --- /dev/null +++ b/collector/pkg/urlclustering/factory.go @@ -0,0 +1,14 @@ +package urlclustering + +func NewMethod(urlClusteringMethod string) ClusteringMethod { + switch urlClusteringMethod { + case "alphabet": + return NewAlphabeticalClusteringMethod() + case "noparam": + return NewNoParamClusteringMethod() + case "blank": + return NewBlankClusteringMethod() + default: + return NewAlphabeticalClusteringMethod() + } +} diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 6247c67c6..4ea2e82ac 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -56,10 +56,11 @@ analyzers: protocol_parser: [ http, mysql, dns, redis, kafka, rocketmq ] # Which URL clustering method should be used to shorten the URL of HTTP request. # This is useful for decrease the cardinality of URLs. - # Valid values: ["noparam", "alphabet"] + # Valid values: ["noparam", "alphabet", "blank"] # - noparam: Only trim the trailing parameters behind the character '?' # - alphabet: Trim the trailing parameters and Convert the segments # containing non-alphabetical characters to star(*) + # - blank: Turn endpoints to empty. This is used to reduce the cardinality as much as possible. url_clustering_method: alphabet # If the destination port of data is one of the followings, the protocol of such network request # is set to the corresponding one. 
Note the program will try to identify the protocol automatically From cb0516b1d8464e693a66df07ad6557c7afca04f8 Mon Sep 17 00:00:00 2001 From: Daxin Wang <46807570+dxsup@users.noreply.github.com> Date: Thu, 1 Dec 2022 19:38:24 +0800 Subject: [PATCH 3/4] Fix the pod info with persistent IP in the map is deleted incorrectly (#374) Signed-off-by: Daxin Wang --- CHANGELOG.md | 2 +- collector/pkg/metadata/kubernetes/k8scache.go | 1 + .../pkg/metadata/kubernetes/pod_delete.go | 21 ++- .../pkg/metadata/kubernetes/pod_watch.go | 6 +- .../pkg/metadata/kubernetes/pod_watch_test.go | 152 +++++++++--------- 5 files changed, 102 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6aae0b8c3..7958f0bec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ - Add a new clustering method "blank" that is used to reduce the cardinality of metrics as much as possible. ([#372](https://github.com/KindlingProject/kindling/pull/372)) ### Bug fixes -- +- Fix the bug where the pod metadata with persistent IP in the map is deleted incorrectly due to the deleting mechanism with a delay. ([#374](https://github.com/KindlingProject/kindling/pull/374)) - - Fix potential deadlock of exited thread delay queue. 
([#373](https://github.com/CloudDectective-Harmonycloud/kindling/pull/373)) - Fix the bug that cpuEvent cache size continuously increases even if trace profiling is not enabled.([#362](https://github.com/CloudDectective-Harmonycloud/kindling/pull/362)) diff --git a/collector/pkg/metadata/kubernetes/k8scache.go b/collector/pkg/metadata/kubernetes/k8scache.go index c13843aa2..552d9bc58 100644 --- a/collector/pkg/metadata/kubernetes/k8scache.go +++ b/collector/pkg/metadata/kubernetes/k8scache.go @@ -14,6 +14,7 @@ type K8sContainerInfo struct { } type K8sPodInfo struct { + UID string Ip string PodName string Ports []int32 diff --git a/collector/pkg/metadata/kubernetes/pod_delete.go b/collector/pkg/metadata/kubernetes/pod_delete.go index 95c20d58a..476b37ef0 100644 --- a/collector/pkg/metadata/kubernetes/pod_delete.go +++ b/collector/pkg/metadata/kubernetes/pod_delete.go @@ -20,6 +20,7 @@ type deleteRequest struct { } type deletedPodInfo struct { + uid string name string namespace string containerIds []string @@ -65,18 +66,32 @@ func deletePodInfo(podInfo *deletedPodInfo) { } if len(podInfo.containerIds) != 0 { for i := 0; i < len(podInfo.containerIds); i++ { + // Assume that container id can't be reused in a few seconds MetaDataCache.DeleteByContainerId(podInfo.containerIds[i]) } } if podInfo.ip != "" && len(podInfo.ports) != 0 { for _, port := range podInfo.ports { - // Assume that PodIP:Port can't be reused in a few seconds - MetaDataCache.DeleteContainerByIpPort(podInfo.ip, uint32(port)) + containerInfo, ok := MetaDataCache.GetContainerByIpPort(podInfo.ip, uint32(port)) + if !ok { + continue + } + // PodIP:Port can be reused in a few seconds, so we check its UID + if containerInfo.RefPodInfo.UID == podInfo.uid { + MetaDataCache.DeleteContainerByIpPort(podInfo.ip, uint32(port)) + } } } if podInfo.hostIp != "" && len(podInfo.hostPorts) != 0 { for _, port := range podInfo.hostPorts { - MetaDataCache.DeleteContainerByHostIpPort(podInfo.hostIp, uint32(port)) + 
containerInfo, ok := MetaDataCache.GetContainerByHostIpPort(podInfo.hostIp, uint32(port)) + if !ok { + continue + } + // HostIP:Port can be reused in a few seconds, so we check its UID + if containerInfo.RefPodInfo.UID == podInfo.uid { + MetaDataCache.DeleteContainerByHostIpPort(podInfo.hostIp, uint32(port)) + } } } } diff --git a/collector/pkg/metadata/kubernetes/pod_watch.go b/collector/pkg/metadata/kubernetes/pod_watch.go index 6d63f9006..ffac363e6 100644 --- a/collector/pkg/metadata/kubernetes/pod_watch.go +++ b/collector/pkg/metadata/kubernetes/pod_watch.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/Kindling-project/kindling/collector/pkg/compare" corev1 "k8s.io/api/core/v1" _ "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -16,6 +15,8 @@ import ( "k8s.io/client-go/tools/cache" _ "k8s.io/client-go/tools/clientcmd" _ "k8s.io/client-go/util/homedir" + + "github.com/Kindling-project/kindling/collector/pkg/compare" ) type podMap struct { @@ -141,6 +142,7 @@ func onAdd(obj interface{}) { } var cachePodInfo = &K8sPodInfo{ + UID: string(pod.UID), Ip: pod.Status.PodIP, Namespace: pod.Namespace, PodName: pod.Name, @@ -251,6 +253,7 @@ func OnUpdate(objOld interface{}, objNew interface{}) { // Delay delete the pod using the difference between the old pod and the new one deletedPodInfo := &deletedPodInfo{ + uid: string(oldPod.UID), name: "", namespace: oldPod.Namespace, containerIds: nil, @@ -322,6 +325,7 @@ func OnUpdate(objOld interface{}, objNew interface{}) { func onDelete(obj interface{}) { pod := obj.(*corev1.Pod) podInfo := &deletedPodInfo{ + uid: string(pod.UID), name: pod.Name, namespace: pod.Namespace, containerIds: make([]string, 0), diff --git a/collector/pkg/metadata/kubernetes/pod_watch_test.go b/collector/pkg/metadata/kubernetes/pod_watch_test.go index 68dd8b5e4..37fdb880b 100644 --- a/collector/pkg/metadata/kubernetes/pod_watch_test.go +++ b/collector/pkg/metadata/kubernetes/pod_watch_test.go @@ -148,6 +148,7 @@
func TestOnAddLowercaseWorkload(t *testing.T) { func CreatePod(hasPort bool) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ + UID: "0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba", Name: "deploy-1a2b3c4d-5e6f7", Namespace: "CustomNamespace", Labels: map[string]string{ @@ -197,8 +198,8 @@ func CreatePod(hasPort bool) *corev1.Pod { } func TestUpdateAndDelayDelete(t *testing.T) { - addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" - updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.212\",\"hostIP\":\"10.10.10.102\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + 
addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.212\",\"hostIP\":\"10.10.10.102\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" addObj := new(corev1.Pod) err := json.Unmarshal([]byte(addObjJson), addObj) if err != nil { @@ -215,34 +216,20 @@ func TestUpdateAndDelayDelete(t *testing.T) { _, ok := MetaDataCache.GetContainerByIpPort(podIp, uint32(port)) if !ok { t.Fatalf("Not found container [%s:%d]", podIp, port) - } 
else { - t.Logf("Found container [%s:%d]", podIp, port) } stopCh := make(chan struct{}) go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) OnUpdate(addObj, updateObj) - // Check if new Container can be find - _, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) - assert.True(t, find, "NewContainerId did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) - assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) - assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") - - // Wait for deletes + // Check if the new container can be found + assertFindPod(t, updateObj) + // Wait for the deleting time.Sleep(1000 * time.Millisecond) + // Double check for the new container + assertFindPod(t, updateObj) - // Double Check for NewContainer - _, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) - assert.True(t, find, "NewContainerId did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) - assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) - assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") - - // Check the old Container has been delete - _, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) + // Check if the old container has been deleted + _, find := MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) assert.False(t, find, "OldContainerId should be deletedin MetaDataCache") _, find = MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port)) assert.False(t, find, 
"OldContainer IP should be deleted in MetaDataCache") @@ -253,40 +240,29 @@ func TestUpdateAndDelayDelete(t *testing.T) { } func TestUpdateAndDelayDeleteWhenOnlyPodIpChanged(t *testing.T) { - addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" - updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.212\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + addObjJson := 
"{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.212\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" addObj := new(corev1.Pod) - json.Unmarshal([]byte(addObjJson), addObj) + _ = json.Unmarshal([]byte(addObjJson), addObj) updateObj := new(corev1.Pod) - json.Unmarshal([]byte(updateObjJson), updateObj) - port := addObj.Spec.Containers[0].Ports[0].ContainerPort + _ = json.Unmarshal([]byte(updateObjJson), 
updateObj) + onAdd(addObj) stopCh := make(chan struct{}) go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) OnUpdate(addObj, updateObj) - // Check if new Container can be find - _, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) - assert.True(t, find, "NewContainerId did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) - assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) - assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") - - // Wait for deletes + // Check if the new container can be found + assertFindPod(t, updateObj) + // Wait for the deleting time.Sleep(1000 * time.Millisecond) + // Double check for the new container + assertFindPod(t, updateObj) - // Double Check for NewContainer - _, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) - assert.True(t, find, "NewContainerId did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) - assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) - assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") - - // Check the old Container has been delete - _, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) - assert.False(t, find, "OldContainerId should be deletedin MetaDataCache") + // Check if the old container has been deleted + port := addObj.Spec.Containers[0].Ports[0].ContainerPort + _, find := MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) + assert.False(t, find, "OldContainerId should be deleted in 
MetaDataCache") _, find = MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port)) assert.False(t, find, "OldContainer IP should be deleted in MetaDataCache") @@ -294,45 +270,71 @@ func TestUpdateAndDelayDeleteWhenOnlyPodIpChanged(t *testing.T) { } func TestUpdateAndDelayDeleteWhenOnlyPortChanged(t *testing.T) { - addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" - updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9002,\"protocol\":\"TCP\",\"hostPort\":9002}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + addObjJson := 
"{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9002,\"protocol\":\"TCP\",\"hostPort\":9002}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" addObj := new(corev1.Pod) - json.Unmarshal([]byte(addObjJson), addObj) + _ = json.Unmarshal([]byte(addObjJson), addObj) updateObj := new(corev1.Pod) - json.Unmarshal([]byte(updateObjJson), updateObj) - port := addObj.Spec.Containers[0].Ports[0].ContainerPort - newPort := 
updateObj.Spec.Containers[0].Ports[0].ContainerPort + _ = json.Unmarshal([]byte(updateObjJson), updateObj) + onAdd(addObj) stopCh := make(chan struct{}) go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) OnUpdate(addObj, updateObj) - // Check if new Container can be find - _, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) - assert.True(t, find, "NewContainerId did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(newPort)) - assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(newPort)) - assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") - - // Wait for deletes + // Check if new container can be found + assertFindPod(t, updateObj) + // Wait for the deleting time.Sleep(1000 * time.Millisecond) + // Double check for the new container + assertFindPod(t, updateObj) - // Double Check for NewContainer - _, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) - assert.True(t, find, "NewContainerId did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(newPort)) - assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") - _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(newPort)) - assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") - - // Check the old Container has been delete - _, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) - assert.False(t, find, "OldContainerId should be deletedin MetaDataCache") + // Check the old Container has been deleted + port := addObj.Spec.Containers[0].Ports[0].ContainerPort + _, find := 
MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) + assert.False(t, find, "OldContainerId should be deleted in MetaDataCache") _, find = MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port)) - assert.True(t, find, "If podIp is not changed , Old IP can still be found in MetaDataCache") + assert.True(t, find, "If podIp is not changed, Old IP can still be found in MetaDataCache") _, find = MetaDataCache.GetContainerByHostIpPort(addObj.Status.HostIP, uint32(port)) assert.False(t, find, "OldHostIp Port should be deleted in MetaDataCache") stopCh <- struct{}{} } + +func TestDelayDeleteThenAddWithSameIP(t *testing.T) { + addObjJson := "{\"metadata\":{\"name\":\"testdemo2-0\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + addObj := new(corev1.Pod) + _ = json.Unmarshal([]byte(addObjJson), addObj) + + deletedObjJson := 
"{\"metadata\":{\"name\":\"testdemo2-0\",\"namespace\":\"test-ns\",\"uid\":\"0ae5c03d-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:37Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + deletedObj := new(corev1.Pod) + _ = json.Unmarshal([]byte(deletedObjJson), deletedObj) + + // Note that uid is different + newAddObjJson := "{\"metadata\":{\"name\":\"testdemo2-0\",\"namespace\":\"test-ns\",\"uid\":\"00000000-5fb3-4eb9-9de8-2bd4b51606ba\",\"resourceVersion\":\"44895978\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:38Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + newAddObj := new(corev1.Pod) + _ = json.Unmarshal([]byte(newAddObjJson), newAddObj) + + stopCh := make(chan struct{}) + go podDeleteLoop(20*time.Millisecond, 100*time.Millisecond, stopCh) + + onAdd(addObj) + // 
Check if the container can be found + assertFindPod(t, addObj) + + onDelete(deletedObj) + onAdd(newAddObj) + time.Sleep(200 * time.Millisecond) + + // Check if the new container can be found + assertFindPod(t, newAddObj) +} + +func assertFindPod(t *testing.T, pod *corev1.Pod) { + _, find := MetaDataCache.GetByContainerId(TruncateContainerId(pod.Status.ContainerStatuses[0].ContainerID)) + assert.True(t, find, "Didn't find the new container ID in MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(pod.Status.PodIP, uint32(pod.Spec.Containers[0].Ports[0].ContainerPort)) + assert.True(t, find, "Didn't find the new container IP Port in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(pod.Status.HostIP, uint32(pod.Spec.Containers[0].Ports[0].HostPort)) + assert.True(t, find, "Didn't find the new HostIP Port in MetaDataCache") +} From 8164aeab747d9844c8eeb8e3004f47bfaf848bba Mon Sep 17 00:00:00 2001 From: zheng Date: Thu, 1 Dec 2022 19:51:13 +0800 Subject: [PATCH 4/4] Add no_response_threshold(120s) for No response requests (#376) Signed-off-by: huxiangyuan --- CHANGELOG.md | 3 ++- .../docker/kindling-collector-config.yml | 4 ++- .../testdata/kindling-collector-config.yaml | 2 +- .../pkg/component/analyzer/network/config.go | 27 +++++++++++++------ .../analyzer/network/message_pair.go | 14 ++++++++-- .../analyzer/network/network_analyzer.go | 18 ++++++++++--- .../protocol/testdata/na-protocol-config.yaml | 2 +- deploy/agent/kindling-collector-config.yml | 4 ++- 8 files changed, 56 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7958f0bec..4653ad846 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,11 @@ ### Enhancements +- Add no_response_threshold(120s) for No response requests. 
([#376](https://github.com/KindlingProject/kindling/pull/376)) - Add payload for all protocols.([#375](https://github.com/KindlingProject/kindling/pull/375)) -- - Add a new clustering method "blank" that is used to reduce the cardinality of metrics as much as possible. ([#372](https://github.com/KindlingProject/kindling/pull/372)) + ### Bug fixes - Fix the bug where the pod metadata with persistent IP in the map is deleted incorrectly due to the deleting mechanism with a delay. ([#374](https://github.com/KindlingProject/kindling/pull/374)) - diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index d28d55648..77206822d 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -42,8 +42,10 @@ analyzers: tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 + # How many seconds to wait until we consider a request as complete. + fd_reuse_timeout: 15 # How many seconds to wait until we consider a request as no response. - request_timeout: 60 + no_response_threshold: 120 # How many milliseconds to wait until we consider a request-response as slow. 
response_slow_threshold: 500 # Whether enable conntrack module to find pod's ip when calling service diff --git a/collector/internal/application/testdata/kindling-collector-config.yaml b/collector/internal/application/testdata/kindling-collector-config.yaml index 8c5948526..a8b951aaa 100644 --- a/collector/internal/application/testdata/kindling-collector-config.yaml +++ b/collector/internal/application/testdata/kindling-collector-config.yaml @@ -29,7 +29,7 @@ analyzers: num: 10 networkanalyzer: connect_timeout: 100 - request_timeout: 1 + fd_reuse_timeout: 15 response_slow_threshold: 500 enable_conntrack: true conntrack_max_state_size: 131072 diff --git a/collector/pkg/component/analyzer/network/config.go b/collector/pkg/component/analyzer/network/config.go index 8caf3f844..9b3e0e6c6 100644 --- a/collector/pkg/component/analyzer/network/config.go +++ b/collector/pkg/component/analyzer/network/config.go @@ -1,14 +1,16 @@ package network const ( - defaultRequestTimeout = 1 + defaultFdReuseTimeout = 15 + defaultNoResponseThreshold = 120 defaultConnectTimeout = 1 defaultResponseSlowThreshold = 500 ) type Config struct { - ConnectTimeout int `mapstructure:"connect_timeout"` - RequestTimeout int `mapstructure:"request_timeout"` + ConnectTimeout int `mapstructure:"connect_timeout"` + FdReuseTimeout int `mapstructure:"fd_reuse_timeout"` + NoResponseThreshold int `mapstructure:"no_response_threshold"` // unit is ms ResponseSlowThreshold int `mapstructure:"response_slow_threshold"` @@ -25,7 +27,8 @@ type Config struct { func NewDefaultConfig() *Config { return &Config{ ConnectTimeout: 100, - RequestTimeout: 60, + FdReuseTimeout: 15, + NoResponseThreshold: 120, ResponseSlowThreshold: 500, EnableConntrack: true, ConntrackMaxStateSize: 131072, @@ -87,11 +90,11 @@ func (cfg *Config) GetConnectTimeout() int { } } -func (cfg *Config) GetRequestTimeout() int { - if cfg.RequestTimeout > 0 { - return cfg.RequestTimeout +func (cfg *Config) GetFdReuseTimeout() int { + if 
cfg.FdReuseTimeout > 0 { + return cfg.FdReuseTimeout } else { - return defaultRequestTimeout + return defaultFdReuseTimeout } } @@ -102,3 +105,11 @@ func (cfg *Config) getResponseSlowThreshold() int { return defaultResponseSlowThreshold } } + +func (cfg *Config) getNoResponseThreshold() int { + if cfg.NoResponseThreshold > 0 { + return cfg.NoResponseThreshold + } else { + return defaultNoResponseThreshold + } +} diff --git a/collector/pkg/component/analyzer/network/message_pair.go b/collector/pkg/component/analyzer/network/message_pair.go index 19b3e3365..7daa3a76e 100644 --- a/collector/pkg/component/analyzer/network/message_pair.go +++ b/collector/pkg/component/analyzer/network/message_pair.go @@ -2,6 +2,7 @@ package network import ( "sync" + "sync/atomic" "time" "github.com/Kindling-project/kindling/collector/pkg/metadata/conntracker" @@ -114,6 +115,10 @@ func (evts *events) IsTimeout(newEvt *model.KindlingEvent, timeout int) bool { return false } +func (evts *events) IsSportChanged(newEvt *model.KindlingEvent) bool { + return newEvt.GetSport() != evts.event.GetSport() +} + func (evts *events) getDuration() uint64 { if evts == nil { return 0 @@ -131,8 +136,13 @@ type messagePairs struct { requests *events responses *events natTuple *conntracker.IPTranslation - isSend bool - mutex sync.RWMutex // only for update latency and resval now + isSend int32 + mutex sync.RWMutex // only for update latency and resval now +} + +func (mps *messagePairs) checkSend() bool { + // Check Send Once. 
+ return atomic.AddInt32(&mps.isSend, 1) == 1 } func (mps *messagePairs) getKey() messagePairKey { diff --git a/collector/pkg/component/analyzer/network/network_analyzer.go b/collector/pkg/component/analyzer/network/network_analyzer.go index de0489d17..8b0a9e139 100644 --- a/collector/pkg/component/analyzer/network/network_analyzer.go +++ b/collector/pkg/component/analyzer/network/network_analyzer.go @@ -193,8 +193,15 @@ func (na *NetworkAnalyzer) consumerFdNoReusingTrace() { na.requestMonitor.Range(func(k, v interface{}) bool { mps := v.(*messagePairs) var timeoutTs = mps.getTimeoutTs() - if timeoutTs != 0 && (time.Now().UnixNano()/1000000000-int64(timeoutTs)/1000000000) >= 15 { - na.distributeTraceMetric(mps, nil) + if timeoutTs != 0 { + var duration = (time.Now().UnixNano()/1000000000 - int64(timeoutTs)/1000000000) + if mps.responses != nil && duration >= int64(na.cfg.GetFdReuseTimeout()) { + // No FdReuse Request + na.distributeTraceMetric(mps, nil) + } else if duration >= int64(na.cfg.getNoResponseThreshold()) { + // No Response Request + na.distributeTraceMetric(mps, nil) + } } return true }) @@ -259,7 +266,7 @@ func (na *NetworkAnalyzer) analyseRequest(evt *model.KindlingEvent) error { } } - if oldPairs.responses != nil || oldPairs.requests.IsTimeout(evt, na.cfg.GetRequestTimeout()) { + if oldPairs.responses != nil || oldPairs.requests.IsSportChanged(evt) { na.distributeTraceMetric(oldPairs, mps) } else { oldPairs.mergeRequest(evt) @@ -296,6 +303,11 @@ func (na *NetworkAnalyzer) distributeTraceMetric(oldPairs *messagePairs, newPair return nil } + if oldPairs.checkSend() == false { + // FIX send twice for request/response with 15s delay. 
+ return nil + } + if newPairs != nil { na.requestMonitor.Store(newPairs.getKey(), newPairs) } else { diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml b/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml index a9c85add4..927554594 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/na-protocol-config.yaml @@ -1,7 +1,7 @@ analyzers: networkanalyzer: connect_timeout: 100 - request_timeout: 1 + fd_reuse_timeout: 15 response_slow_threshold: 500 enable_conntrack: false conntrack_max_state_size: 131072 diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 4ea2e82ac..3bae36b1b 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -42,8 +42,10 @@ analyzers: tcpmetricanalyzer: networkanalyzer: connect_timeout: 100 + # How many seconds to wait until we consider a request as complete. + fd_reuse_timeout: 15 # How many seconds to wait until we consider a request as no response. - request_timeout: 60 + no_response_threshold: 120 # How many milliseconds to wait until we consider a request-response as slow. response_slow_threshold: 500 # Whether enable conntrack module to find pod's ip when calling service