Add kafka support (#890)

Co-authored-by: Nikola Grcevski <grcevski@gmail.com>
marctc and grcevski authored Jun 3, 2024
1 parent cdea3f1 commit ef47785
Showing 68 changed files with 1,351 additions and 31 deletions.
9 changes: 7 additions & 2 deletions Makefile
@@ -287,13 +287,18 @@ oats-test-redis-other-langs: oats-prereq
mkdir -p test/oats/redis_other_langs/$(TEST_OUTPUT)/run
cd test/oats/redis_other_langs && TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r

.PHONY: oats-test-kafka
oats-test-kafka: oats-prereq
mkdir -p test/oats/kafka/$(TEST_OUTPUT)/run
cd test/oats/kafka && TESTCASE_TIMEOUT=120s TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r

.PHONY: oats-test
oats-test: oats-test-sql oats-test-sql-statement oats-test-sql-other-langs oats-test-redis-other-langs
oats-test: oats-test-sql oats-test-sql-statement oats-test-sql-other-langs oats-test-redis-other-langs oats-test-kafka
$(MAKE) itest-coverage-data

.PHONY: oats-test-debug
oats-test-debug: oats-prereq
cd test/oats/sql_statement && TESTCASE_BASE_PATH=./yaml TESTCASE_MANUAL_DEBUG=true TESTCASE_TIMEOUT=1h $(GINKGO) -v -r
cd test/oats/kafka && TESTCASE_BASE_PATH=./yaml TESTCASE_MANUAL_DEBUG=true TESTCASE_TIMEOUT=1h $(GINKGO) -v -r

.PHONY: drone
drone:
2 changes: 1 addition & 1 deletion bpf/http_types.h
@@ -15,7 +15,7 @@
#define KPROBES_LARGE_RESPONSE_LEN 100000 // 100K and above we try to track the response actual time with kretprobes

#define K_TCP_MAX_LEN 256
#define K_TCP_RES_LEN 24
#define K_TCP_RES_LEN 128

#define CONN_INFO_FLAG_TRACE 0x1

18 changes: 10 additions & 8 deletions docs/sources/metrics.md
@@ -25,25 +25,25 @@ The following table describes the exported metrics in both OpenTelemetry and Pro
| Application | `rpc.server.duration` | `rpc_server_duration_seconds` | Histogram | seconds | Duration of RPC service calls from the server side |
| Application | `sql.client.duration` | `sql_client_duration_seconds` | Histogram | seconds | Duration of SQL client operations (Experimental) |
| Application | `redis.client.duration` | `redis_client_duration_seconds` | Histogram | seconds | Duration of Redis client operations (Experimental) |
| Application | `messaging.publish.duration` | `messaging_publish_duration` | Histogram | seconds | Duration of Messaging (Kafka) publish operations (Experimental) |
| Application | `messaging.process.duration` | `messaging_process_duration` | Histogram | seconds | Duration of Messaging (Kafka) process operations (Experimental) |
| Network | `beyla.network.flow.bytes` | `beyla_network_flow_bytes` | Counter | bytes | Bytes submitted from a source network endpoint to a destination network endpoint |

Beyla can also export [Span metrics](/docs/tempo/latest/metrics-generator/span_metrics/) and
[Service graph metrics](/docs/tempo/latest/metrics-generator/service-graph-view/), which can be enabled via the
[Service graph metrics](/docs/tempo/latest/metrics-generator/service-graph-view/), which you can enable via the
[features]({{< relref "./configure/options.md" >}}) configuration option.

## Attributes of Beyla metrics

For the sake of brevity, only the OTEL `dot.notation` of the metrics and attributes is listed, but
the metrics and attributes are exposed `underscore_notation` when a Prometheus exporter is used.
For the sake of brevity, the metrics and attributes in this list use the OTEL `dot.notation`. When using the Prometheus exporter, the metrics use `underscore_notation`.

In order to hide attributes that are shown by default, or show attributes that are hidden by
default, check the `attributes`->`select` section in the [configuration documentation]({{< relref "./configure/options.md" >}}).
To configure which attributes to show or hide, check the `attributes`->`select` section in the [configuration documentation]({{< relref "./configure/options.md" >}}).

| Metrics | Name | Default |
|--------------------------------|-----------------------------|-----------------------------------------------|
| Application (all) | `http.request.method` | shown |
| Application (all) | `http.response.status_code` | shown |
| Application (all) | `http.route` | shown if `routes` configuration is defined |
| Application (all) | `http.route` | shown if `routes` configuration section exists |
| Application (all) | `k8s.daemonset.name` | shown if `attributes.kubernetes.enable` |
| Application (all) | `k8s.deployment.name` | shown if `attributes.kubernetes.enable` |
| Application (all) | `k8s.namespace.name` | shown if `attributes.kubernetes.enable` |
@@ -66,9 +66,11 @@ default, check the `attributes`->`select` section in the [configuration document
| `beyla.network.flow.bytes` | `beyla.ip` | hidden |
| `db.client.operation.duration` | `db.operation.name` | shown |
| `db.client.operation.duration` | `db.collection.name` | hidden |
| `messaging.publish.duration` | `messaging.system` | shown |
| `messaging.publish.duration` | `messaging.destination.name`| shown |
| `beyla.network.flow.bytes` | `direction` | hidden |
| `beyla.network.flow.bytes` | `dst.address` | hidden |
| `beyla.network.flow.bytes` | `dst.cidr` | shown if the `cidrs` configuration is defined |
| `beyla.network.flow.bytes` | `dst.cidr` | shown if the `cidrs` configuration section exists |
| `beyla.network.flow.bytes` | `dst.name` | hidden |
| `beyla.network.flow.bytes` | `dst.port` | hidden |
| `beyla.network.flow.bytes` | `iface` | hidden |
@@ -87,7 +89,7 @@ default, check the `attributes`->`select` section in the [configuration document
| `beyla.network.flow.bytes` | `k8s.src.owner.type` | hidden |
| `beyla.network.flow.bytes` | `k8s.src.type` | hidden |
| `beyla.network.flow.bytes` | `src.address` | hidden |
| `beyla.network.flow.bytes` | `src.cidr` | shown if the `cidrs` configuration is defined |
| `beyla.network.flow.bytes` | `src.cidr` | shown if the `cidrs` configuration section exists |
| `beyla.network.flow.bytes` | `src.name` | hidden |
| `beyla.network.flow.bytes` | `src.port` | hidden |
| `beyla.network.flow.bytes` | `transport` | hidden |
2 changes: 1 addition & 1 deletion pkg/internal/ebpf/common/bpf_bpfel_arm64.go


Binary file modified pkg/internal/ebpf/common/bpf_bpfel_arm64.o
2 changes: 1 addition & 1 deletion pkg/internal/ebpf/common/bpf_bpfel_x86.go


Binary file modified pkg/internal/ebpf/common/bpf_bpfel_x86.o
225 changes: 225 additions & 0 deletions pkg/internal/ebpf/common/kafka_detect_transform.go
@@ -0,0 +1,225 @@
package ebpfcommon

import (
"encoding/binary"
"errors"

"github.com/grafana/beyla/pkg/internal/request"
)

type Operation int8

const (
Produce Operation = 0
Fetch Operation = 1
)

// Header is the fixed Kafka request header; all fields are big-endian on the wire.
type Header struct {
MessageSize int32
APIKey int16
APIVersion int16
CorrelationID int32
ClientIDSize int16
}

type KafkaInfo struct {
Operation Operation
Topic string
ClientID string
TopicOffset int
}

func (k Operation) String() string {
switch k {
case Produce:
return request.MessagingPublish
case Fetch:
return request.MessagingProcess
default:
return "unknown"
}
}

// KafkaMinLength is the size of the fixed request header: message_size (4) +
// api_key (2) + api_version (2) + correlation_id (4) + client_id size (2).
const KafkaMinLength = 14

// ProcessPossibleKafkaEvent tries to parse a TCP packet as a Kafka request:
// first the request buffer and, if that fails, the response buffer.
// It returns a KafkaInfo with the processed data, or an error if neither
// buffer contains a valid Kafka request.
func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) {
k, err := ProcessKafkaRequest(pkt)
if err != nil {
k, err = ProcessKafkaRequest(rpkt)
}

return k, err
}

// ProcessKafkaRequest parses a single buffer as a Kafka request and returns a
// KafkaInfo with the extracted operation, client ID, topic name and topic offset.
func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
k := &KafkaInfo{}
if len(pkt) < KafkaMinLength {
return k, errors.New("packet too short")
}

header := &Header{
MessageSize: int32(binary.BigEndian.Uint32(pkt[0:4])),
APIKey: int16(binary.BigEndian.Uint16(pkt[4:6])),
APIVersion: int16(binary.BigEndian.Uint16(pkt[6:8])),
CorrelationID: int32(binary.BigEndian.Uint32(pkt[8:12])),
ClientIDSize: int16(binary.BigEndian.Uint16(pkt[12:14])),
}

if !isValidKafkaHeader(header) {
return k, errors.New("invalid Kafka request header")
}

offset := KafkaMinLength
if header.ClientIDSize > 0 {
clientID := pkt[offset : offset+int(header.ClientIDSize)]
if !isValidClientID(clientID, int(header.ClientIDSize)) {
return k, errors.New("invalid client ID")
}
offset += int(header.ClientIDSize)
k.ClientID = string(clientID)
} else if header.ClientIDSize < -1 {
return k, errors.New("invalid client ID size")
}

switch Operation(header.APIKey) {
case Produce:
ok, err := getTopicOffsetFromProduceOperation(header, pkt, &offset)
if !ok || err != nil {
return k, err
}
k.Operation = Produce
k.TopicOffset = offset
case Fetch:
offset += getTopicOffsetFromFetchOperation(header)
k.Operation = Fetch
k.TopicOffset = offset
default:
return k, errors.New("invalid Kafka operation")
}
topic, err := getTopicName(pkt, offset)
if err != nil {
return k, err
}
k.Topic = topic
return k, nil
}

// isValidKafkaHeader sanity-checks the fixed request header: a supported API
// key and version range, a non-negative correlation ID, and a client ID size
// of at least -1 (-1 denotes a null client ID).
func isValidKafkaHeader(header *Header) bool {
if header.MessageSize < int32(KafkaMinLength) || header.APIVersion < 0 {
return false
}
switch Operation(header.APIKey) {
case Fetch:
if header.APIVersion > 11 {
return false
}
case Produce:
if header.APIVersion == 0 || header.APIVersion > 8 {
return false
}
default:
return false
}
if header.CorrelationID < 0 {
return false
}
return header.ClientIDSize >= -1
}

// nolint:cyclop
func isValidKafkaString(buffer []byte, maxBufferSize, realSize int, printableOk bool) bool {
for j := 0; j < maxBufferSize; j++ {
if j >= realSize {
break
}
ch := buffer[j]
if ('a' <= ch && ch <= 'z') || ('A' <= ch && ch <= 'Z') || ('0' <= ch && ch <= '9') || ch == '.' || ch == '_' || ch == '-' {
continue
}
if printableOk && (ch >= ' ' && ch <= '~') {
continue
}
return false
}
return true
}

func isValidClientID(buffer []byte, realClientIDSize int) bool {
return isValidKafkaString(buffer, len(buffer), realClientIDSize, true)
}

func getTopicName(pkt []byte, offset int) (string, error) {
offset += 4 // skip the topic array count (int32)
if offset > len(pkt) {
return "", errors.New("invalid buffer length")
}
topicNameSize := int16(binary.BigEndian.Uint16(pkt[offset:]))
if topicNameSize <= 0 || topicNameSize > 255 {
return "", errors.New("invalid topic name size")
}
offset += 2

if offset > len(pkt) {
return "", nil
}
maxLen := offset + int(topicNameSize)
if len(pkt) < maxLen {
maxLen = len(pkt)
}
topicName := pkt[offset:maxLen]
if isValidKafkaString(topicName, len(topicName), int(topicNameSize), false) {
return string(topicName), nil
}
return "", errors.New("invalid topic name")
}

func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int) (bool, error) {
if header.APIVersion >= 3 {
if len(pkt) < *offset+2 {
return false, errors.New("packet too short")
}
transactionalIDSize := int16(binary.BigEndian.Uint16(pkt[*offset:]))
*offset += 2
if transactionalIDSize > 0 {
*offset += int(transactionalIDSize)
}
}

if len(pkt) < *offset+2 {
return false, errors.New("packet too short")
}
acks := int16(binary.BigEndian.Uint16(pkt[*offset:]))
if acks < -1 || acks > 1 {
return false, nil
}
*offset += 2

if len(pkt) < *offset+4 {
return false, errors.New("packet too short")
}
timeoutMS := int32(binary.BigEndian.Uint32(pkt[*offset:]))
if timeoutMS < 0 {
return false, nil
}
*offset += 4

return true, nil
}

func getTopicOffsetFromFetchOperation(header *Header) int {
offset := 3 * 4 // replica_id + max_wait_ms + min_bytes (3 * sizeof(int32))

if header.APIVersion >= 3 {
offset += 4 // max_bytes
if header.APIVersion >= 4 {
offset++ // isolation_level
if header.APIVersion >= 7 {
offset += 2 * 4 // session_id + session_epoch
}
}
}

return offset
}
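
As a quick illustration of the parsing flow in this new file, here is a minimal sketch of a test that feeds a hand-built Kafka Produce v2 request through ProcessKafkaRequest, e.g. from a _test.go file in the same package. The topic name, client ID, and field values are illustrative only, not taken from the commit; version 2 is chosen because it predates the transactional_id field, which keeps the request body at its simplest.

package ebpfcommon

import (
	"encoding/binary"
	"testing"
)

// buildProduceV2 assembles a minimal Kafka Produce v2 request: the fixed
// 14-byte header, a client ID, acks, timeout_ms, a topic array count of 1,
// and a single topic name.
func buildProduceV2(topic, clientID string) []byte {
	pkt := make([]byte, 14)
	binary.BigEndian.PutUint32(pkt[0:4], 36) // message_size; only checked to be >= 14
	binary.BigEndian.PutUint16(pkt[4:6], 0)  // api_key 0 = Produce
	binary.BigEndian.PutUint16(pkt[6:8], 2)  // api_version 2: no transactional_id field
	binary.BigEndian.PutUint32(pkt[8:12], 1) // correlation_id
	binary.BigEndian.PutUint16(pkt[12:14], uint16(len(clientID)))
	pkt = append(pkt, clientID...)

	body := make([]byte, 12)
	binary.BigEndian.PutUint16(body[0:2], 1)    // acks = 1; must be -1, 0 or 1
	binary.BigEndian.PutUint32(body[2:6], 1500) // timeout_ms; must be >= 0
	binary.BigEndian.PutUint32(body[6:10], 1)   // topic array count, skipped by getTopicName
	binary.BigEndian.PutUint16(body[10:12], uint16(len(topic)))
	return append(append(pkt, body...), topic...)
}

func TestProcessKafkaRequestProduce(t *testing.T) {
	k, err := ProcessKafkaRequest(buildProduceV2("my-topic", "go-app"))
	if err != nil {
		t.Fatalf("expected a valid Produce request, got: %v", err)
	}
	if k.Operation != Produce || k.Topic != "my-topic" || k.ClientID != "go-app" {
		t.Fatalf("unexpected result: %+v", k)
	}
}

A Fetch request exercises the other branch: getTopicOffsetFromFetchOperation only computes how many request-body bytes to skip based on the API version, so the same helper pattern applies with api_key 1 and the corresponding body fields.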
