Skip to content

Commit

Permalink
Merge branch 'release/v0.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian A. Underwood committed Apr 3, 2023
2 parents 9eefd75 + 8f262ff commit 8c90356
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 8 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Change Log

### 0.1.2 - 2022-07- - Minor Updates
* Modify: Expand compatibility to Python 3.10

### 0.1.1 - 2021-11-28 - Minor Updates
* Modify: Changes to the build environment so PyPi would properly work.

### 0.1 - 2021-11-02 - Initial Commit

#### misc.py
Expand Down
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
# iupy
Ian Underwood Python Kit
Ian Underwood Python Kit (IUPY)

## About
This package is a set of Python functions that I have built out and found useful in my travels as a network engineer. This is the base package for all iupy additional packages to follow.

This functions in this package are meant to be standalone packages and have no other package dependencies beyond a base Python installation. These functions are designed to be light on requirements.

This package has been built and tested against Python 3.9, and may work in earlier versions.
This package has been built and tested against Python 3.9, and should work down to 3.7 but I can't guarantee that.

## Function Summary

### misc.py

iupy.text_header : Returns a text header returning the time, user, and system a given output was generated on. Additional options allow for identifying a source, as well as another user if specified. A footer may also be defined.

iupy.merge_dict : Returns a combined dictionary given two dictionary inputs.

### myconfig.py

iupy.get_file_handle_ro : Returns a file handle given a name, or None if the file cannot be found or is inaccessible. Exceptions are logged to DEBUG.
Expand All @@ -22,10 +24,22 @@ iupy.get_my_config : Moves through a series of directories looking for a configu

### network.py

iupy.get_my_ip : Returns a string, the source IP given a remote IP address or destination. Returns None if there is a SocketError which prevents the source from being determined.
#### Host Information

iupy.get_my_ip : Returns a string, the source IP given a remote IP address or destination. Returns None if there is a SocketError which prevents the source from being determined. If no destination is specified, the function assumes well-known DNS server 4.2.2.1

#### ACL and Bitmask Tools

iupy.v4_bits_to_mask : Returns a string, a netmask based upon the number of bits in relation to 0.0.0.0/bits. Returns None if the mask is out of range.

ippy.v4_mask_to_bits : Returns a string, as the largest number of consecutive bits given an IP address. This is far more forgiving than the ipaddress library, which may be useful to some.

iupy.v4_wildcard : This function returns a compliant wildcard for a given IP address. This is similar to, but different than a host mask as a wildcard does not need to have a series of consecutive bits in order to be valid.
iupy.v4_wildcard : This function returns a compliant wildcard for a given IP address. This is similar to, but different than a host mask as a wildcard does not need to have a series of consecutive bits in order to be valid.

iupy.aclv4_hostmask : Takes an IPv4 host or host/mask and returns a Cisco-compatible string for use in ACL generation.

iupy.routev4_hostmask : Takes an IPv4 host or host/mask and returns a cisco-compatible string for use in generating static route statements.

#### Session Announcement Protocol

iupy.sap_segment : This function generates the segment portion of an RFC-2974 Session Announcement Protocol packet. The segment still needs to but put on the wire with a UDP socket. The keywords cover the fields the segment requires. Returns None if any of the keywords are invalid or incomplete.
5 changes: 4 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = iupy
version = 0.1.1
version = 0.2.0
author = Ian A. Underwood
author_email = ian@underwood-hq.org
description = Ian Underwood Python kit
Expand All @@ -17,8 +17,11 @@ classifiers =
Natural Language :: English
Operating System :: OS Independent
Programming Language :: Python :: 3
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Topic :: Utilities

[options]
Expand Down
25 changes: 25 additions & 0 deletions src/iupy/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,28 @@ def text_header(**kwargs):
header += "{}\n".format(comment)

return header


def merge_dict(ad, bd):
"""
Function that merges two dictionaries together, uses recursion to handle the nesting.
Returns an empty dictionary if any input is not a dictionary.
Inspired by:
https://stackoverflow.com/questions/43797333/how-to-merge-two-nested-dict-in-python
:param ad:
:param bd:
:return:
"""
if not isinstance(ad, dict) or not isinstance(bd, dict):
return {}

for key in bd:
if key in ad:
ad[key] = merge_dict(ad[key], bd[key])
else:
ad[key] = bd[key]

return ad
187 changes: 184 additions & 3 deletions src/iupy/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import re
import socket
import struct

# Default module logger

Expand All @@ -28,7 +29,7 @@ def get_my_ip(destination=None, **kwargs):
:return:
"""

logger = logging.getLogger("iupy/network/get_my_ip")
_logger = logging.getLogger("iupy/network/get_my_ip")

# Only do an IPv4 socket if specified, otherwise leverage dual-stack.

Expand All @@ -50,7 +51,7 @@ def get_my_ip(destination=None, **kwargs):
s.connect((test_ip, 1))
my_ip = ipaddress.ip_address(s.getsockname()[0])
except socket.error as error_message:
logger.debug("Socket error for {} / {}".format(test_ip, error_message))
_logger.debug("Socket error for {} / {}".format(test_ip, error_message))
my_ip = None
finally:
s.close()
Expand All @@ -67,7 +68,7 @@ def get_my_ip(destination=None, **kwargs):
if my_ip.ipv4_mapped and kwargs.get('version', 0) != 6:
my_ip = ipaddress.IPv4Address(int(ipaddress.ip_address(str(my_ip).replace('::ffff:', '::'))))

logger.debug("Source address to reach {} is {}.".format(test_ip, my_ip))
_logger.debug("Source address to reach {} is {}.".format(test_ip, my_ip))

return str(my_ip)

Expand Down Expand Up @@ -188,3 +189,183 @@ def v4_wildcard(v4_mask):
logger.debug("Wildcard for {} is {}".format(v4_mask, wildcard_mask))

return wildcard_mask


def aclv4_hostmask(host_mask):
"""
Returns a Cisco friendly text for a given host and netmask.
:param host_mask:
:return:
"""
logger = logging.getLogger("iupy/network/v4_wildcard")

logger.debug("Input: {}".format(host_mask))

ip_info = host_mask.split("/")

# Host Exceptions
try:
# Check /32 host exception
if int(ip_info[1]) == 32:
output = "host {}".format(ip_info[0])
return output
except IndexError:
# Host if no mask was provided
output = "host {}".format(ip_info[0])
return output

# Get the wildcard mask
wildcard = v4_wildcard(ip_info[1])

# Return the valid host wildcard value, otherwise return None.
if wildcard is not None:
return "{} {}".format(ip_info[0], wildcard)
else:
return None


def routev4_hostmask(host_mask):
"""
Returns a Cisco friendly text for a given host and netmask.
:param host_mask:
:return:
"""
_logger = logging.getLogger("iupy/network/routev4")

_logger.debug("Input: {}".format(host_mask))

ip_info = host_mask.split("/")

# Host Exceptions
try:
# Check /32 host exception
if int(ip_info[1]) == 32:
mask = v4_bits_to_mask(32)
return "{} {}".format(ip_info[0], mask)
except IndexError:
# Host if no mask was provided
mask = v4_bits_to_mask(32)
return "{} {}".format(ip_info[0], mask)

# Get the wildcard mask
mask = v4_wildcard(ip_info[1])

# Return the valid host wildcard value, otherwise return None.
if mask is not None:
return "{} {}".format(ip_info[0], mask)
else:
return None


def sap_segment(*, version=1, src_ip_addr=None, reserved=False, announce=True, encrypted=False, compressed=False,
auth_data=None, id_hash=None, payload_type=None, payload=None):
"""
This function returns a SAP segment based on RFC-2974.
The segment generated strictly matches the format of the packet only. it is possible to generate a
segment that is not RFC-compliant. For example, generating a SAP announcement with version 6, which
is not a valid value.
https://www.rfc-editor.org/rfc/rfc2974
"""

_logger = logging.getLogger("iupy/network/sap_segment")

# Segment Header - SAP Version
if type(version) is not int:
_logger.debug("SAP Version {} must be an integer value.".format(version))
return None
if version < 0 or version > 7:
_logger.debug("SAP Version {} is not between 0 and 7.".format(version))
return None
hdr_v = format(version, '0>3b') # Version

# Segment Header - Address Type
try:
ip_addr_object = ipaddress.ip_address(src_ip_addr)
except ValueError:
_logger.debug("{} is not a valid IP address.".format(src_ip_addr))
return None
if ip_addr_object.version == 6:
hdr_a = 1
else:
hdr_a = 0

# Segment Header - Reserved Bit
if reserved is True:
hdr_r = 1
else:
hdr_r = 0

# Segment Header - Message Type
if announce is True:
hdr_t = 0
else:
hdr_t = 1

# Segment Header - Encryption
if encrypted is True:
hdr_e = 1
else:
hdr_e = 0

# Segment Header - Compression
if compressed is True:
hdr_c = 1
else:
hdr_c = 0

# Segment Header - First Byte
hdr_bits = "{}{}{}{}{}{}".format(hdr_v, hdr_a, hdr_r, hdr_t, hdr_e, hdr_c)
hdr_byte = struct.pack('b', int(hdr_bits, 2))

# Authentication length as a number of 4-byte words.
if auth_data is not None:
if len(auth_data) % 4 != 0:
_logger.debug("Authentication data length must be a multiple of 4.")
return None
hdr_auth_length = struct.pack('b', (len(auth_data) // 4))
else:
hdr_auth_length = struct.pack('b', 0)

# Message ID Hash
if (id_hash is None) or (type(id_hash) is not int):
_logger.debug("ID Hash must be an integer between 0 and 65535")
_logger.debug("{}, type {}".format(id_hash, type(id_hash)))
return None
if id_hash < 0 or id_hash > 65535:
_logger.debug("ID hash is out of range.")
return None
hdr_id_hash = struct.pack('H', id_hash)

# Originating Source IP Address
hdr_src_addr = ip_addr_object.packed

# Start building the segment.
segment = hdr_byte + hdr_auth_length + hdr_id_hash + hdr_src_addr

# Optional - Authentication Data
if auth_data is None:
_logger.debug("No authentication data provided.")
elif type(auth_data) is not bytes:
_logger.debug("Authentication data must be of bytes type. Skipping.")
else:
segment += auth_data

# Optional - Payload Type
if len(payload_type) == 0:
_logger.debug("No payload type provided.")
else:
segment += bytes(payload_type, 'utf-8')
segment += b'\x00'

# Payload
if payload is None:
_logger.debug("Payload cannot be empty.")
return None
else:
segment += bytes(payload, 'utf-8')

return segment
57 changes: 57 additions & 0 deletions tests/test_sap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import datetime
import logging
import socket
import struct
import time

from src import iupy

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("iupy/test_sap")

myip = iupy.get_my_ip("172.30.0.141")

datetime_int = int(datetime.datetime.utcnow().timestamp())

session_info = "v=0\r\n" \
"o=- {} {} IN IP4 172.30.0.141\r\n" \
"s=4.1: WBZ-DT\r\n" \
"i=DVBlast Video Network\r\n" \
"c=IN IP4 239.255.1.1/255\r\n" \
"t=0 0\r\n" \
"a=recvonly\r\na=type:broadcast\r\n" \
"a=source-filter: incl IN * 172.30.0.141\r\n" \
"m=video 28001 udp mpeg\r\n".format(datetime_int, datetime_int)

sap_struct = iupy.sap_segment(version=1, src_ip_addr=myip, id_hash=2,
payload_type='application/sdp', payload=session_info)

if sap_struct is None:
print("Unable to build SAP Packet. Check debug Logs.")
exit()

print("SAP Structure: ", ' '.join(hex(byte)[2:].zfill(2) for byte in sap_struct))

beaconSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
beaconSocket.settimeout(0.2)
beaconSocket.setsockopt(socket.IPPROTO_IP, socket.SO_REUSEADDR, 1)
# beaconSocket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(myip))
beaconSocket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack('B', 16))

beaconDest = ('224.2.127.254', 9875)

done = False
i = 0
while not done:
beaconSocket.sendto(sap_struct, beaconDest)

time.sleep(5)

i += 1;
if i > 6:
done = True

beaconSocket.close()

print("SAP Complete")

0 comments on commit 8c90356

Please sign in to comment.