Skip to content

Commit

Permalink
Add PCAP file parser for protocol analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
rytilahti committed Feb 23, 2022
1 parent 3abffd8 commit 1d7f985
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
9 changes: 9 additions & 0 deletions devtools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

This directory contains tooling useful for developers

## PCAP parser (parse_pcap.py)

This tool parses PCAP file and tries to decrypt the traffic using the given tokens. Requires typer, dpkt, and rich.
Token option can be used multiple times. All tokens are tested for decryption until decryption succeeds or there are no tokens left to try.

```
python pcap_parser.py <pcap file> --token <token> [--token <token>]
```

## MiOT generator

This tool generates some boilerplate code for adding support for MIoT devices
Expand Down
84 changes: 84 additions & 0 deletions devtools/parse_pcap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Parse PCAP files for miio traffic."""
from collections import Counter, defaultdict
from ipaddress import ip_address

import dpkt
import typer
from dpkt.ethernet import ETH_TYPE_IP, Ethernet
from rich import print

from miio import Message

app = typer.Typer()


def read_payloads_from_file(file, tokens: list[str]):
"""Read the given pcap file and yield src, dst, and result."""
pcap = dpkt.pcap.Reader(file)

stats: defaultdict[str, Counter] = defaultdict(Counter)
for _ts, pkt in pcap:
eth = Ethernet(pkt)
if eth.type != ETH_TYPE_IP:
continue

ip = eth.ip

if ip.p != 17:
continue

transport = ip.udp

if transport.dport != 54321 and transport.sport != 54321:
continue

data = transport.data

src_addr = str(ip_address(ip.src))
dst_addr = str(ip_address(ip.dst))

decrypted = None
for token in tokens:
try:
decrypted = Message.parse(data, token=bytes.fromhex(token))

break
except BaseException:
continue

if decrypted is None:
continue

stats["stats"]["miio_packets"] += 1

if decrypted.data.length == 0:
stats["stats"]["empty_packets"] += 1
continue

stats["dst_addr"][dst_addr] += 1
stats["src_addr"][src_addr] += 1

payload = decrypted.data.value

if "result" in payload:
stats["stats"]["results"] += 1
if "method" in payload:
method = payload["method"]
stats["commands"][method] += 1

yield src_addr, dst_addr, payload

print(stats) # noqa: T001


@app.command()
def read_file(
file: typer.FileBinaryRead, token: list[str] = typer.Option(...) # noqa: B008
):
"""Read PCAP file and output decrypted miio communication."""
for src_addr, dst_addr, payload in read_payloads_from_file(file, token):
print(f"{src_addr:<15} -> {dst_addr:<15} {payload}") # noqa: T001


if __name__ == "__main__":
app()

0 comments on commit 1d7f985

Please sign in to comment.