Skip to content

Commit

Permalink
merge main branch
Browse files Browse the repository at this point in the history
Signed-off-by: yaofighting <siyao@zju.edu.cn>
  • Loading branch information
yaofighting committed Dec 1, 2022
2 parents 9410fa1 + 8164aea commit 69a9582
Show file tree
Hide file tree
Showing 38 changed files with 340 additions and 263 deletions.
9 changes: 5 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@


### Enhancements
- Add no_response_threshold(120s) for No response requests. ([#376](https://github.com/KindlingProject/kindling/pull/376))
- Add payload for all protocols. ([#375](https://github.com/KindlingProject/kindling/pull/375))
- Add a new clustering method "blank" that is used to reduce the cardinality of metrics as much as possible. ([#372](https://github.com/KindlingProject/kindling/pull/372))
- Modify the configuration file structure and add parameter fields for subscription events. ([#368](https://github.com/CloudDectective-Harmonycloud/kindling/pull/368))
-
-


### Bug fixes
-
-
- Fix the bug where the pod metadata with persistent IP in the map is deleted incorrectly due to the deleting mechanism with a delay. ([#374](https://github.com/KindlingProject/kindling/pull/374))
- Fix potential deadlock of exited thread delay queue. ([#373](https://github.com/CloudDectective-Harmonycloud/kindling/pull/373))
- Fix the bug that cpuEvent cache size continuously increases even if trace profiling is not enabled. ([#362](https://github.com/CloudDectective-Harmonycloud/kindling/pull/362))
- Fix the bug that duplicate CPU events are indexed into Elasticsearch. ([#359](https://github.com/KindlingProject/kindling/pull/359))
Expand Down
7 changes: 5 additions & 2 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ analyzers:
tcpmetricanalyzer:
networkanalyzer:
connect_timeout: 100
# How many seconds to wait until we consider a request as complete.
fd_reuse_timeout: 15
# How many seconds to wait until we consider a request to have received no response.
request_timeout: 60
no_response_threshold: 120
# How many milliseconds to wait until we consider a request-response as slow.
response_slow_threshold: 500
# Whether enable conntrack module to find pod's ip when calling service
Expand All @@ -56,10 +58,11 @@ analyzers:
protocol_parser: [ http, mysql, dns, redis, kafka, rocketmq ]
# Which URL clustering method should be used to shorten the URL of HTTP request.
# This is useful for decreasing the cardinality of URLs.
# Valid values: ["noparam", "alphabet"]
# Valid values: ["noparam", "alphabet", "blank"]
# - noparam: Only trim the trailing parameters behind the character '?'
# - alphabet: Trim the trailing parameters and convert the segments
# containing non-alphabetical characters to star(*)
# - blank: Set endpoints to an empty string. This is used to reduce the cardinality as much as possible.
url_clustering_method: alphabet
# If the destination port of data is one of the followings, the protocol of such network request
# is set to the corresponding one. Note the program will try to identify the protocol automatically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ analyzers:
num: 10
networkanalyzer:
connect_timeout: 100
request_timeout: 1
fd_reuse_timeout: 15
response_slow_threshold: 500
enable_conntrack: true
conntrack_max_state_size: 131072
Expand Down
27 changes: 19 additions & 8 deletions collector/pkg/component/analyzer/network/config.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package network

const (
defaultRequestTimeout = 1
defaultFdReuseTimeout = 15
defaultNoResponseThreshold = 120
defaultConnectTimeout = 1
defaultResponseSlowThreshold = 500
)

type Config struct {
ConnectTimeout int `mapstructure:"connect_timeout"`
RequestTimeout int `mapstructure:"request_timeout"`
ConnectTimeout int `mapstructure:"connect_timeout"`
FdReuseTimeout int `mapstructure:"fd_reuse_timeout"`
NoResponseThreshold int `mapstructure:"no_response_threshold"`
// unit is ms
ResponseSlowThreshold int `mapstructure:"response_slow_threshold"`

Expand All @@ -25,7 +27,8 @@ type Config struct {
func NewDefaultConfig() *Config {
return &Config{
ConnectTimeout: 100,
RequestTimeout: 60,
FdReuseTimeout: 15,
NoResponseThreshold: 120,
ResponseSlowThreshold: 500,
EnableConntrack: true,
ConntrackMaxStateSize: 131072,
Expand Down Expand Up @@ -87,11 +90,11 @@ func (cfg *Config) GetConnectTimeout() int {
}
}

func (cfg *Config) GetRequestTimeout() int {
if cfg.RequestTimeout > 0 {
return cfg.RequestTimeout
func (cfg *Config) GetFdReuseTimeout() int {
if cfg.FdReuseTimeout > 0 {
return cfg.FdReuseTimeout
} else {
return defaultRequestTimeout
return defaultFdReuseTimeout
}
}

Expand All @@ -102,3 +105,11 @@ func (cfg *Config) getResponseSlowThreshold() int {
return defaultResponseSlowThreshold
}
}

// getNoResponseThreshold returns the configured no-response threshold in
// seconds, falling back to defaultNoResponseThreshold when the value is
// unset or non-positive.
func (cfg *Config) getNoResponseThreshold() int {
	if threshold := cfg.NoResponseThreshold; threshold > 0 {
		return threshold
	}
	return defaultNoResponseThreshold
}
14 changes: 12 additions & 2 deletions collector/pkg/component/analyzer/network/message_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"sync"
"sync/atomic"
"time"

"github.com/Kindling-project/kindling/collector/pkg/metadata/conntracker"
Expand Down Expand Up @@ -114,6 +115,10 @@ func (evts *events) IsTimeout(newEvt *model.KindlingEvent, timeout int) bool {
return false
}

// IsSportChanged reports whether newEvt carries a different source port
// than the first event recorded in evts.
func (evts *events) IsSportChanged(newEvt *model.KindlingEvent) bool {
	recorded := evts.event.GetSport()
	return recorded != newEvt.GetSport()
}

func (evts *events) getDuration() uint64 {
if evts == nil {
return 0
Expand All @@ -131,8 +136,13 @@ type messagePairs struct {
requests *events
responses *events
natTuple *conntracker.IPTranslation
isSend bool
mutex sync.RWMutex // only for update latency and resval now
isSend int32
mutex sync.RWMutex // only for update latency and resval now
}

// checkSend reports whether the caller is the first to mark this pair as
// sent. The atomic increment guarantees that exactly one caller ever
// observes the counter transition from 0 to 1, so the pair is sent once.
func (mps *messagePairs) checkSend() bool {
	sendCount := atomic.AddInt32(&mps.isSend, 1)
	return sendCount == 1
}

func (mps *messagePairs) getKey() messagePairKey {
Expand Down
41 changes: 35 additions & 6 deletions collector/pkg/component/analyzer/network/network_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,15 @@ func (na *NetworkAnalyzer) consumerFdNoReusingTrace() {
na.requestMonitor.Range(func(k, v interface{}) bool {
mps := v.(*messagePairs)
var timeoutTs = mps.getTimeoutTs()
if timeoutTs != 0 && (time.Now().UnixNano()/1000000000-int64(timeoutTs)/1000000000) >= 15 {
na.distributeTraceMetric(mps, nil)
if timeoutTs != 0 {
var duration = (time.Now().UnixNano()/1000000000 - int64(timeoutTs)/1000000000)
if mps.responses != nil && duration >= int64(na.cfg.GetFdReuseTimeout()) {
// No FdReuse Request
na.distributeTraceMetric(mps, nil)
} else if duration >= int64(na.cfg.getNoResponseThreshold()) {
// No Response Request
na.distributeTraceMetric(mps, nil)
}
}
return true
})
Expand Down Expand Up @@ -259,7 +266,7 @@ func (na *NetworkAnalyzer) analyseRequest(evt *model.KindlingEvent) error {
}
}

if oldPairs.responses != nil || oldPairs.requests.IsTimeout(evt, na.cfg.GetRequestTimeout()) {
if oldPairs.responses != nil || oldPairs.requests.IsSportChanged(evt) {
na.distributeTraceMetric(oldPairs, mps)
} else {
oldPairs.mergeRequest(evt)
Expand Down Expand Up @@ -296,6 +303,11 @@ func (na *NetworkAnalyzer) distributeTraceMetric(oldPairs *messagePairs, newPair
return nil
}

if oldPairs.checkSend() == false {
// FIX send twice for request/response with 15s delay.
return nil
}

if newPairs != nil {
na.requestMonitor.Store(newPairs.getKey(), newPairs)
} else {
Expand Down Expand Up @@ -406,7 +418,6 @@ func (na *NetworkAnalyzer) parseProtocol(mps *messagePairs, parser *protocol.Pro
// Parse failure
return nil
}
requestMsg.AddByteArrayUtf8Attribute(constlabels.RequestPayload, mps.requests.getData())

if mps.responses == nil {
return na.getRecords(mps, parser.GetProtocol(), requestMsg.GetAttributes())
Expand All @@ -417,8 +428,6 @@ func (na *NetworkAnalyzer) parseProtocol(mps *messagePairs, parser *protocol.Pro
// Parse failure
return nil
}
responseMsg.AddByteArrayUtf8Attribute(constlabels.ResponsePayload, mps.responses.getData())

return na.getRecords(mps, parser.GetProtocol(), responseMsg.GetAttributes())
}

Expand Down Expand Up @@ -537,6 +546,13 @@ func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attrib
labels.UpdateAddStringValue(constlabels.Protocol, protocol)

labels.Merge(attributes)

if mps.responses == nil {
addProtocolPayload(protocol, labels, mps.requests.getData(), nil)
} else {
addProtocolPayload(protocol, labels, mps.requests.getData(), mps.responses.getData())
}

// If no protocol error found, we check other errors
if !labels.GetBoolValue(constlabels.IsError) && mps.responses == nil {
labels.AddBoolValue(constlabels.IsError, true)
Expand Down Expand Up @@ -586,6 +602,12 @@ func (na *NetworkAnalyzer) getRecordWithSinglePair(mps *messagePairs, mp *messag
labels.UpdateAddStringValue(constlabels.Protocol, protocol)

labels.Merge(attributes)
if mp.response == nil {
addProtocolPayload(protocol, labels, evt.GetData(), nil)
} else {
addProtocolPayload(protocol, labels, evt.GetData(), mp.response.GetData())
}

// If no protocol error found, we check other errors
if !labels.GetBoolValue(constlabels.IsError) && mps.responses == nil {
labels.AddBoolValue(constlabels.IsError, true)
Expand All @@ -609,6 +631,13 @@ func (na *NetworkAnalyzer) getRecordWithSinglePair(mps *messagePairs, mp *messag
return ret
}

// addProtocolPayload attaches the request payload — and the response
// payload, when one exists — to labels, rendered through the
// protocol-specific payload formatter.
func addProtocolPayload(protocolName string, labels *model.AttributeMap, request []byte, response []byte) {
	labels.UpdateAddStringValue(constlabels.RequestPayload, protocol.GetPayloadString(request, protocolName))
	if response == nil {
		return
	}
	labels.UpdateAddStringValue(constlabels.ResponsePayload, protocol.GetPayloadString(response, protocolName))
}

// isSlow reports whether duration (nanoseconds) reaches the per-protocol
// slow threshold, which is configured in milliseconds.
func (na *NetworkAnalyzer) isSlow(duration uint64, protocol string) bool {
	thresholdNs := int64(na.getResponseSlowThreshold(protocol)) * int64(time.Millisecond)
	return int64(duration) >= thresholdNs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol/factory"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
"github.com/Kindling-project/kindling/collector/pkg/model"
Expand Down Expand Up @@ -92,6 +93,17 @@ func prepareNetworkAnalyzer() *NetworkAnalyzer {
nextConsumers: []consumer.Consumer{&NopProcessor{}},
telemetry: component.NewDefaultTelemetryTools(),
}
na.staticPortMap = map[uint32]string{}
for _, config := range na.cfg.ProtocolConfigs {
for _, port := range config.Ports {
na.staticPortMap[port] = config.Key
}
}
na.slowThresholdMap = map[string]int{}
for _, config := range na.cfg.ProtocolConfigs {
protocol.SetPayLoadLength(config.Key, config.PayloadLength)
na.slowThresholdMap[config.Key] = config.Threshold
}
na.parserFactory = factory.NewParserFactory(factory.WithUrlClusteringMethod(na.cfg.UrlClusteringMethod))
na.Start()
}
Expand Down Expand Up @@ -298,11 +310,11 @@ func (evt *TraceEvent) exchange(common *EventCommon) *model.KindlingEvent {
modelEvt := &model.KindlingEvent{
Source: model.Source(common.Source),
Timestamp: evt.Timestamp,
Latency: uint64(evt.UserAttributes.Latency),
Name: evt.Name,
Category: model.Category(common.Category),
ParamsNumber: 3,
UserAttributes: [16]model.KeyValue{
{Key: "latency", ValueType: model.ValueType_UINT64, Value: Int64ToBytes(evt.UserAttributes.Latency)},
{Key: "res", ValueType: model.ValueType_INT64, Value: Int64ToBytes(evt.UserAttributes.Res)},
{Key: "data", ValueType: model.ValueType_BYTEBUF, Value: byteData},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,10 @@ const (
FlagTwoWay = byte(0x40)
FlagEvent = byte(0x20) // for heartbeat
SerialMask = 0x1f

AsciiLow = byte(0x20)
AsciiHigh = byte(0x7e)
AsciiReplace = byte(0x2e) // .
)

// NewDubboParser assembles a ProtocolParser for the Dubbo protocol from
// its request/response fast-fail checks and parse routines.
func NewDubboParser() *protocol.ProtocolParser {
	reqParser := protocol.CreatePkgParser(fastfailDubboRequest(), parseDubboRequest())
	respParser := protocol.CreatePkgParser(fastfailDubboResponse(), parseDubboResponse())
	return protocol.NewProtocolParser(protocol.DUBBO, reqParser, respParser, nil)
}

// getAsciiString renders data as a printable string, replacing every byte
// outside the printable ASCII range (0x20-0x7e) with '.', following the
// same convention wireshark uses.
func getAsciiString(data []byte) string {
	const (
		asciiLow     = byte(0x20)
		asciiHigh    = byte(0x7e)
		asciiReplace = byte(0x2e) // '.'
	)
	if len(data) == 0 {
		return ""
	}
	printable := make([]byte, len(data))
	for i, b := range data {
		if b < asciiLow || b > asciiHigh {
			printable[i] = asciiReplace
		} else {
			printable[i] = b
		}
	}
	return string(printable)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func parseDubboRequest() protocol.ParsePkgFn {
}

message.AddStringAttribute(constlabels.ContentKey, contentKey)
message.AddStringAttribute(constlabels.RequestPayload, getAsciiString(message.GetData(16, protocol.GetDubboPayLoadLength())))
return true, true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func parseDubboResponse() protocol.ParsePkgFn {
message.AddBoolAttribute(constlabels.IsError, true)
message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
}
message.AddStringAttribute(constlabels.ResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubboPayLoadLength())))
return true, true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,7 @@ import (
)

func NewHttpParser(urlClusteringMethod string) *protocol.ProtocolParser {
var method urlclustering.ClusteringMethod
switch urlClusteringMethod {
case "alphabet":
method = urlclustering.NewAlphabeticalClusteringMethod()
case "noparam":
method = urlclustering.NewNoParamClusteringMethod()
default:
method = urlclustering.NewAlphabeticalClusteringMethod()
}
method := urlclustering.NewMethod(urlClusteringMethod)
requestParser := protocol.CreatePkgParser(fastfailHttpRequest(), parseHttpRequest(method))
responseParser := protocol.CreatePkgParser(fastfailHttpResponse(), parseHttpResponse())

Expand Down
Loading

0 comments on commit 69a9582

Please sign in to comment.