Kafka troubleshooting: who exactly is writing to me?

bingoohuang opened this issue on Feb 25, 2022 · 1 comment


Background

Millions of messages had piled up in Kafka waiting to be consumed. The ops folks were getting anxious: staring at more than a hundred connections to Kafka, they wanted an easy way to tell which connections were producers (writers), which of those were busy, and on which topics they were busy.


Thinking it over

"Heaven and earth, Supreme Lord Laojun, show us a sign!" Racking my brain, several options came to mind right away:

  1. Does Kafka have anything built in? I looked around and found nothing.
  2. Is the Kafka source code easy to patch, say logging something when a producer connection is established? Feasible in theory, but in practice it looks like quite a bit of work.
  3. Build a traffic-statistics tool that profiles the TCP connections: the ones with heavy upstream traffic are producers. But we also need the topics, which existing tools apparently can't give us.
  4. Set up a proxy that intercepts Kafka traffic, redirected with iptables... Technically impressive, but it sits on the main traffic path, I'm not familiar enough with the technique, and it's not something to pull off in a hurry.
  5. ...

Ding ding ding: none of these options really works.

Getting to work

Right, then I remembered the HTTP bypass traffic mirroring I had done before. Could the same thing be done for Kafka? If the Kafka producer request format can be parsed easily, the job becomes easy, so the idea holds. A quick search immediately turned up an open-source project, kafka-sniffer, so I grabbed it and tweaked it a little. Go is really convenient for this kind of work, and it was done in no time. The result looks like this:

https://github.com/bingoohuang/kafka-sniffer

# kafka-sniffer
2022/02/24 13:34:42 starting capture on interface "eth0"
2022/02/24 13:34:42 client 192.1.1.15:61285-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-logcenter], correlationID: 117377425, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.15:37953-192.1.1.14:9092 type: *kafka.ProduceRequest topic [dev-metrics], correlationID: 6003063, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.11:24717-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-metrics], correlationID: 196489671, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.7:37233-192.1.1.14:9092 type: *kafka.FetchRequest topic [__consumer_offsets], correlationID: 247189, clientID: consumer-KMOffsetCache-cmak-548974c6c4-sxvgt-1723
2022/02/24 13:34:42 client 192.1.6.17:51404-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-ids], correlationID: 6716609, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.9.23:36866-192.1.1.14:9092 type: *kafka.FetchRequest topic [bq_disaster_recovery], correlationID: 623626, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.7:34038-192.1.1.14:9092 type: *kafka.FetchRequest topic [agent_transaction], correlationID: 12480162, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:55214-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-cloudSignLogServer], correlationID: 3341672, clientID: 2428545257036493
2022/02/24 13:34:42 client 192.1.1.12:6267-192.1.1.14:9092 type: *kafka.FetchRequest topic [judicial_disaster], correlationID: 9009620, clientID: consumer-2
2022/02/24 13:34:42 client 192.1.1.11:33378-192.1.1.14:9092 type: *kafka.ProduceRequest topic [dev-gateway], correlationID: 10948681, clientID: producer-1
2022/02/24 13:34:42 client 192.1.1.12:9195-192.1.1.14:9092 type: *kafka.FetchRequest topic [judicial-2tripartite], correlationID: 9011202, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.12:41426-192.1.1.14:9092 type: *kafka.FetchRequest topic [agent_count_transaction], correlationID: 194647, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.1.11:22615-192.1.1.14:9092 type: *kafka.FetchRequest topic [ids-message-record-1], correlationID: 8999184, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.12:20394-192.1.1.14:9092 type: *kafka.FetchRequest topic [count_transaction_pro], correlationID: 3240311, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.1.12:7273-192.1.1.14:9092 type: *kafka.FetchRequest topic [transaction_pro], correlationID: 3240395, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:6654-192.1.1.14:9092 type: *kafka.FetchRequest topic [count_transaction], correlationID: 572423, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:48249-192.1.1.14:9092 type: *kafka.FetchRequest topic [transaction], correlationID: 8692411, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.9.23:33500-192.1.1.14:9092 type: *kafka.FetchRequest topic [verif_supplement_file_v1], correlationID: 117992, clientID: consumer-2
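
Under the hood the idea is straightforward: capture packets bound for the broker port and read the Kafka request header, whose api_key field distinguishes producers (Produce = 0) from consumers (Fetch = 1). Below is only a minimal sketch of that idea in Go with gopacket, not the actual kafka-sniffer code; the interface name and port are assumptions, and it naively assumes each request starts at a TCP payload boundary, whereas a real tool reassembles the TCP stream first.

package main

import (
	"encoding/binary"
	"fmt"
	"log"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
)

func main() {
	// Interface name and broker port are assumptions for this example.
	handle, err := pcap.OpenLive("eth0", 65536, true, pcap.BlockForever)
	if err != nil {
		log.Fatal(err)
	}
	defer handle.Close()
	if err := handle.SetBPFFilter("tcp and dst port 9092"); err != nil {
		log.Fatal(err)
	}

	src := gopacket.NewPacketSource(handle, handle.LinkType())
	for pkt := range src.Packets() {
		tcpLayer := pkt.Layer(layers.LayerTypeTCP)
		ipLayer := pkt.Layer(layers.LayerTypeIPv4)
		if tcpLayer == nil || ipLayer == nil {
			continue
		}
		tcp := tcpLayer.(*layers.TCP)
		ip := ipLayer.(*layers.IPv4)
		p := tcp.Payload

		// Kafka request header: int32 size, int16 api_key, int16 api_version,
		// int32 correlation_id, int16 client_id length, client_id bytes.
		// Simplification: assumes the request starts at the payload boundary.
		if len(p) < 14 {
			continue
		}
		apiKey := binary.BigEndian.Uint16(p[4:6])
		correlationID := binary.BigEndian.Uint32(p[8:12])
		idLen := int(binary.BigEndian.Uint16(p[12:14]))
		if len(p) < 14+idLen {
			continue
		}
		clientID := string(p[14 : 14+idLen])

		kind := map[uint16]string{0: "ProduceRequest", 1: "FetchRequest"}[apiKey]
		if kind == "" {
			continue // only report produce/fetch in this sketch
		}
		fmt.Printf("client %s:%d type %s correlationID %d clientID %s\n",
			ip.SrcIP, tcp.SrcPort, kind, correlationID, clientID)
	}
}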

And while I was at it, I threw in a statistics API:

# gurl :9870/client
Conn-Session: 127.0.0.1:44828->127.0.0.1:9870 (reused: false, wasIdle: false, idle: 0s)
GET /client? HTTP/1.1
Host: 127.0.0.1:9870
Accept: application/json
Accept-Encoding: gzip, deflate
Content-Type: application/json
Gurl-Date: Thu, 24 Feb 2022 06:30:03 GMT
User-Agent: gurl/1.0.0


HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Thu, 24 Feb 2022 06:30:03 GMT

[
  {
    "Start": "2022-02-24T14:29:19.049701376+08:00",
    "Client": "192.1.6.17:51404",
    "ReqType": "*kafka.FetchRequest",
    "ClientID": "consumer-1",
    "Requests": 89,
    "BytesRead": 7387,
    "Topics": [
      "dev-ids"
    ]
  },
  {
    "Start": "2022-02-24T14:29:19.025437041+08:00",
    "Client": "192.1.8.12:6267",
    "ReqType": "*kafka.FetchRequest",
    "ClientID": "consumer-2",
    "Requests": 89,
    "BytesRead": 7031,
    "Topics": [
      "judicial_disaster"
    ]
  },
  {
    "Start": "2022-02-24T14:29:20.301435997+08:00",
    "Client": "192.1.6.15:56324",
    "ReqType": "*kafka.ProduceRequest",
    "ClientID": "sarama",
    "Requests": 309,
    "BytesRead": 123625,
    "Topics": [
      "dev-metrics"
    ]
  },
  {
    "Start": "2022-02-24T14:29:20.84427283+08:00",
    "Client": "192.1.6.4:54598",
    "ReqType": "*kafka.ProduceRequest",
    "ClientID": "sarama",
    "Requests": 283,
    "BytesRead": 113472,
    "Topics": [
      "dev-metrics"
    ]
  }
]
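
For reference, the statistics endpoint can be as simple as a map of per-client counters behind a small HTTP handler. The following is only a sketch of how such a /client endpoint could look: the struct fields mirror the JSON above, while the Record helper and everything else are illustrative rather than the project's actual code.

package main

import (
	"encoding/json"
	"net/http"
	"sync"
	"time"
)

// ClientStat accumulates per-connection counters, keyed by "ip:port".
type ClientStat struct {
	Start     time.Time
	Client    string
	ReqType   string
	ClientID  string
	Requests  int
	BytesRead int
	Topics    []string
}

var (
	mu    sync.Mutex
	stats = map[string]*ClientStat{}
)

// Record would be called by the capture loop for every parsed Kafka request.
func Record(client, reqType, clientID, topic string, bytes int) {
	mu.Lock()
	defer mu.Unlock()
	s, ok := stats[client]
	if !ok {
		s = &ClientStat{Start: time.Now(), Client: client, ReqType: reqType, ClientID: clientID}
		stats[client] = s
	}
	s.Requests++
	s.BytesRead += bytes
	for _, t := range s.Topics {
		if t == topic {
			return
		}
	}
	s.Topics = append(s.Topics, topic)
}

func main() {
	// Serve the accumulated counters as a JSON array on /client.
	http.HandleFunc("/client", func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		defer mu.Unlock()
		out := make([]*ClientStat, 0, len(stats))
		for _, s := range stats {
			out = append(out, s)
		}
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		json.NewEncoder(w).Encode(out)
	})
	http.ListenAndServe(":9870", nil)
}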

All done

There we go: now it is fairly easy to find out who is writing to me, on which topics, how many requests they have accumulated, and how many bytes.

Dear readers, do you have a better solution? Bring it on; if yours beats this one, dinner is on me.

Addendum:

Regarding option 4 above, the iptables approach: https://github.com/JackOfMostTrades/tls-tproxy uses this technique, and its schematic is shown below:

(tls-tproxy schematic)
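
For completeness, the redirection part of that approach boils down to a couple of nat-table rules that send broker-bound traffic through a local transparent proxy. The rules below are only illustrative and not taken from tls-tproxy; the proxy port 9093 and the kafka-proxy user are assumptions.

# Redirect inbound traffic destined for the broker port to a local proxy on 9093.
iptables -t nat -A PREROUTING -p tcp --dport 9092 -j REDIRECT --to-ports 9093
# For locally generated traffic, OUTPUT needs a similar rule; exclude the
# proxy's own user so its upstream connections to Kafka are not looped back.
iptables -t nat -A OUTPUT -p tcp --dport 9092 -m owner ! --uid-owner kafka-proxy -j REDIRECT --to-ports 9093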

bingoohuang commented Mar 23, 2022

Why can't netstat find the corresponding connections?

Viewed from 192.1.8.14

There are many connections from 192.1.8.11 to the local Kafka port 9092:

[beta19 ~]# hostname -I
192.1.8.14 172.17.0.1 10.42.0.0
[beta19 ~]# netstat -atnp | awk 'NR <= 2 || /9092/ && /8.11/' | awk '{if (NR <= 2) { print $0 } else  {print NR-2,$0}}'
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
1 tcp        0      0 192.1.8.14:9092     192.1.8.11:37594    ESTABLISHED 21239/java
2 tcp        0      0 192.1.8.14:9092     192.1.8.11:60318    ESTABLISHED 21239/java
3 tcp        0      0 192.1.8.14:9092     192.1.8.11:47051    ESTABLISHED 21239/java
4 tcp        0      0 192.1.8.14:9092     192.1.8.11:23547    ESTABLISHED 21239/java
5 tcp        0      0 192.1.8.14:9092     192.1.8.11:41188    ESTABLISHED 21239/java
6 tcp        0      0 192.1.8.14:9092     192.1.8.11:22280    ESTABLISHED 21239/java
7 tcp        0      0 192.1.8.14:9092     192.1.8.11:38398    ESTABLISHED 21239/java
8 tcp        0      0 192.1.8.14:9092     192.1.8.11:23467    ESTABLISHED 21239/java
9 tcp        0      0 192.1.8.14:9092     192.1.8.11:47125    ESTABLISHED 21239/java
10 tcp        0      0 192.1.8.14:9092     192.1.8.11:41180    ESTABLISHED 21239/java
11 tcp        0      0 192.1.8.14:9092     192.1.8.11:41292    ESTABLISHED 21239/java
12 tcp        0      0 192.1.8.14:9092     192.1.8.11:5478     ESTABLISHED 21239/java
13 tcp        0      0 192.1.8.14:9092     192.1.8.11:54091    ESTABLISHED 21239/java

Viewed from 192.1.8.11

So I logged in to 192.1.8.11 and ran the same netstat command, but the corresponding connections were nowhere to be found. They did show up in /proc/net/nf_conntrack, with source 10.42.1.211, which turned out to be a k8s container.

[beta11 ~]# hostname -I
192.1.8.11 172.17.0.1 10.42.1.0
[beta11 ~]# netstat -atnp | grep 192.1.8.11:22280
[beta11 ~]# cat /proc/net/nf_conntrack | grep 22280
ipv4     2 tcp      6 86365 ESTABLISHED src=10.42.1.211 dst=192.1.8.14 sport=39856 dport=9092 src=192.1.8.14 dst=192.1.8.11 sport=9092 dport=22280 [ASSURED] mark=0 zone=0 use=2
[beta17 ~]# kubectl get pod -A -o wide | awk 'NR <=1 || /10.42.1.211/'
NAMESPACE NAME                              READY STATUS  RESTARTS AGE IP          NODE      NOMINATED NODE READINESS GATES
cloudsign timetaskcloudsign-74c59b777-xhkgs 1/1   Running 0        71d 10.42.1.211 192.1.8.11 <none>         <none>

The article 连接跟踪(conntrack):原理、应用及 Linux 内核实现 ("Connection Tracking (conntrack): Principles, Applications, and the Linux Kernel Implementation") notes:

> Because this connection-tracking mechanism is implemented independently of Netfilter, its conntrack and NAT information is not stored in the kernel's (that is, Netfilter's) conntrack table and NAT table, so the usual conntrack/netstat/ss/lsof tools cannot see it.


It appears that this k8s networking solution implements exactly such an independent connection-tracking and NAT mechanism, which is why netstat cannot see these connections.
