Skip to content

Commit

Permalink
Add ipinterface type to fieldtypes.net (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
Miauwkeru authored Feb 17, 2025
1 parent 380ff1c commit 1f4e711
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 8 deletions.
9 changes: 8 additions & 1 deletion flow/record/fieldtypes/net/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
from __future__ import annotations

from flow.record.fieldtypes import string
from flow.record.fieldtypes.net.ip import IPAddress, IPNetwork, ipaddress, ipnetwork
from flow.record.fieldtypes.net.ip import (
IPAddress,
IPNetwork,
ipaddress,
ipinterface,
ipnetwork,
)

__all__ = [
"IPAddress",
"IPNetwork",
"ipaddress",
"ipinterface",
"ipnetwork",
]

Expand Down
64 changes: 58 additions & 6 deletions flow/record/fieldtypes/net/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

from ipaddress import (
IPv4Address,
IPv4Interface,
IPv4Network,
IPv6Address,
IPv6Interface,
IPv6Network,
ip_address,
ip_interface,
ip_network,
)
from typing import Union
Expand All @@ -15,16 +18,19 @@

_IPNetwork = Union[IPv4Network, IPv6Network]
_IPAddress = Union[IPv4Address, IPv6Address]
_IPInterface = Union[IPv4Interface, IPv6Interface]
_ConversionTypes = Union[str, int, bytes]
_IPTypes = Union[_IPNetwork, _IPAddress, _IPInterface]


class ipaddress(FieldType):
val = None
val: _IPAddress = None
_type = "net.ipaddress"

def __init__(self, addr: str | int | bytes):
def __init__(self, addr: _ConversionTypes | _IPAddress):
self.val = ip_address(addr)

def __eq__(self, b: str | int | bytes | _IPAddress) -> bool:
def __eq__(self, b: _ConversionTypes | _IPAddress) -> bool:
try:
return self.val == ip_address(b)
except ValueError:
Expand Down Expand Up @@ -53,13 +59,13 @@ def _unpack(data: int) -> ipaddress:


class ipnetwork(FieldType):
val = None
val: _IPNetwork = None
_type = "net.ipnetwork"

def __init__(self, addr: str | int | bytes):
def __init__(self, addr: _ConversionTypes | _IPNetwork):
self.val = ip_network(addr)

def __eq__(self, b: str | int | bytes | _IPNetwork) -> bool:
def __eq__(self, b: _ConversionTypes | _IPNetwork) -> bool:
try:
return self.val == ip_network(b)
except ValueError:
Expand Down Expand Up @@ -98,6 +104,52 @@ def _pack(self) -> str:
def _unpack(data: str) -> ipnetwork:
return ipnetwork(data)

@property
def netmask(self) -> ipaddress:
return ipaddress(self.val.netmask)


class ipinterface(FieldType):
val: _IPInterface = None
_type = "net.ipinterface"

def __init__(self, addr: _ConversionTypes | _IPTypes) -> None:
self.val = ip_interface(addr)

def __eq__(self, b: _ConversionTypes | _IPTypes) -> bool:
try:
return self.val == ip_interface(b)
except ValueError:
return False

def __hash__(self) -> int:
return hash(self.val)

def __str__(self) -> str:
return str(self.val)

def __repr__(self) -> str:
return f"{self._type}({str(self)!r})"

@property
def ip(self) -> ipaddress:
return ipaddress(self.val.ip)

@property
def network(self) -> ipnetwork:
return ipnetwork(self.val.network)

@property
def netmask(self) -> ipaddress:
return ipaddress(self.val.netmask)

def _pack(self) -> str:
return self.val.compressed

@staticmethod
def _unpack(data: str) -> ipinterface:
return ipinterface(data)


# alias: net.IPAddress -> net.ipaddress
# alias: net.IPNetwork -> net.ipnetwork
Expand Down
2 changes: 1 addition & 1 deletion flow/record/jsonpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def pack_obj(self, obj: Any) -> dict | str:
"sha1": obj.sha1,
"sha256": obj.sha256,
}
if isinstance(obj, (fieldtypes.net.ipaddress, fieldtypes.net.ipnetwork)):
if isinstance(obj, (fieldtypes.net.ipaddress, fieldtypes.net.ipnetwork, fieldtypes.net.ipinterface)):
return str(obj)
if isinstance(obj, bytes):
return base64.b64encode(obj).decode()
Expand Down
1 change: 1 addition & 0 deletions flow/record/whitelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"bytes",
"record",
"net.ipaddress",
"net.ipinterface",
"net.ipnetwork",
"net.IPAddress",
"net.IPNetwork",
Expand Down
79 changes: 79 additions & 0 deletions tests/test_fieldtype_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,85 @@ def test_record_ipnetwork() -> None:
assert "::1" not in data


def test_record_ipinterface() -> None:
TestRecord = RecordDescriptor(
"test/ipinterface",
[
("net.ipinterface", "interface"),
],
)

# ipv4
r = TestRecord("192.168.0.0/24")
assert r.interface == "192.168.0.0/24"
assert "bad.ip" not in r.interface.network
assert "192.168.0.1" in r.interface.network
assert isinstance(r.interface, net.ipinterface)
assert repr(r.interface) == "net.ipinterface('192.168.0.0/24')"
assert hash(r.interface) == hash(net.ipinterface("192.168.0.0/24"))

r = TestRecord("192.168.1.1")
assert r.interface.ip == "192.168.1.1"
assert r.interface.network == "192.168.1.1/32"
assert r.interface == "192.168.1.1/32"
assert r.interface.netmask == "255.255.255.255"

r = TestRecord("192.168.1.24/255.255.255.0")
assert r.interface == "192.168.1.24/24"
assert r.interface.ip == "192.168.1.24"
assert r.interface.network == "192.168.1.0/24"
assert r.interface.netmask == "255.255.255.0"

# ipv6 - https://en.wikipedia.org/wiki/IPv6_address
r = TestRecord("::1")
assert r.interface == "::1"
assert r.interface == "::1/128"

r = TestRecord("64:ff9b::2/96")
assert r.interface == "64:ff9b::2/96"
assert r.interface.ip == "64:ff9b::2"
assert r.interface.network == "64:ff9b::/96"
assert r.interface.netmask == "ffff:ffff:ffff:ffff:ffff:ffff::"

# instantiate from different types
assert TestRecord(1).interface == "0.0.0.1/32"
assert TestRecord(0x7F0000FF).interface == "127.0.0.255/32"
assert TestRecord(b"\x7f\xff\xff\xff").interface == "127.255.255.255/32"

# Test whether it functions in a set
data = {TestRecord(x).interface for x in ["192.168.0.0/24", "192.168.0.0/24", "::1", "::1"]}
assert len(data) == 2
assert net.ipinterface("::1") in data
assert net.ipinterface("192.168.0.0/24") in data
assert "::1" not in data


def test_record_ipinterface_types() -> None:
TestRecord = RecordDescriptor(
"test/ipinterface",
[
(
"net.ipinterface",
"interface",
)
],
)

r = TestRecord("192.168.0.255/24")
_if = r.interface
assert isinstance(_if, net.ipinterface)
assert isinstance(_if.ip, net.ipaddress)
assert isinstance(_if.network, net.ipnetwork)
assert isinstance(_if.netmask, net.ipaddress)

r = TestRecord("64:ff9b::/96")
_if = r.interface
assert isinstance(_if, net.ipinterface)
assert isinstance(_if.ip, net.ipaddress)
assert isinstance(_if.network, net.ipnetwork)
assert isinstance(_if.netmask, net.ipaddress)


@pytest.mark.parametrize("PSelector", [Selector, CompiledSelector])
def test_selector_ipaddress(PSelector: type[Selector]) -> None:
TestRecord = RecordDescriptor(
Expand Down

0 comments on commit 1f4e711

Please sign in to comment.