Skip to content

Commit

Permalink
refactor: put in-memory anonymization into a separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
Kircheneer committed May 23, 2023
1 parent 7633e36 commit 72824f8
Show file tree
Hide file tree
Showing 3 changed files with 5,203 additions and 5,235 deletions.
284 changes: 132 additions & 152 deletions netconan/anonymize_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,111 @@
_CHAR_CHOICES = string.ascii_letters + string.digits


class FileAnonymizer:
    """Anonymize configuration text with a fixed set of anonymizers.

    The anonymizers are built once in ``__init__`` from the supplied options
    and then reused for every file or stream handed to this instance, so all
    inputs share the same salt and the same password lookup table.
    """

    def __init__(
        self,
        anon_pwd,
        anon_ip,
        salt=None,
        sensitive_words=None,
        undo_ip_anon=False,
        as_numbers=None,
        reserved_words=None,
        preserve_prefixes=None,
        preserve_networks=None,
        preserve_suffix_v4=None,
        preserve_suffix_v6=None,
    ):
        """Build the anonymizers selected by the given options.

        Args:
            anon_pwd: When truthy, enable sensitive-item replacement (the
                default regexes are generated and an empty lookup is created).
            anon_ip: When truthy, create the IPv4 and IPv6 anonymizers.
            salt: Salt handed to the IP, sensitive-word and AS-number
                anonymizers. When None, a random salt is generated and
                reported via a warning log message.
            sensitive_words: Passed to ``SensitiveWordAnonymizer``; None
                disables sensitive-word anonymization.
            undo_ip_anon: Forwarded to ``anonymize_ip_addr`` for every line;
                a truthy value also causes the IP anonymizers to be created.
            as_numbers: Passed to ``AsNumberAnonymizer``; None disables
                AS-number anonymization.
            reserved_words: Extra words merged into the shared
                ``default_reserved_words`` collection; None leaves it as is.
            preserve_prefixes: Forwarded to ``IpAnonymizer``.
            preserve_networks: Forwarded to ``IpAnonymizer``.
            preserve_suffix_v4: ``preserve_suffix`` for ``IpAnonymizer``.
            preserve_suffix_v6: ``preserve_suffix`` for ``IpV6Anonymizer``.
        """
        self.undo_ip_anon = undo_ip_anon

        # Every anonymizer starts disabled; the options below switch them on.
        self.anonymizer4 = None
        self.anonymizer6 = None
        self.anonymizer_as_num = None
        self.anonymizer_sensitive_word = None
        self.compiled_regexes = None
        self.pwd_lookup = None

        # The salt feeds the IP, sensitive-word and AS-number anonymizers.
        self.salt = salt
        if self.salt is None:
            self.salt = "".join(
                random.choice(_CHAR_CHOICES) for _ in range(_DEFAULT_SALT_LENGTH)
            )
            logging.warning(
                'No salt was provided; using randomly generated "%s"', self.salt
            )
        logging.debug('Using salt: "%s"', self.salt)

        if anon_pwd:
            self.compiled_regexes = generate_default_sensitive_item_regexes()
            self.pwd_lookup = {}
        if reserved_words is not None:
            # NOTE: this mutates the shared default_reserved_words object that
            # is defined outside this class, not a per-instance copy.
            default_reserved_words.update(reserved_words)
        if sensitive_words is not None:
            self.anonymizer_sensitive_word = SensitiveWordAnonymizer(
                sensitive_words, self.salt
            )
        if anon_ip or undo_ip_anon:
            self.anonymizer4 = IpAnonymizer(
                self.salt,
                preserve_prefixes,
                preserve_networks,
                preserve_suffix=preserve_suffix_v4,
            )
            self.anonymizer6 = IpV6Anonymizer(
                self.salt, preserve_suffix=preserve_suffix_v6
            )
        if as_numbers is not None:
            self.anonymizer_as_num = AsNumberAnonymizer(as_numbers, self.salt)

    def anonymize_file(self, in_file, out_file):
        """Anonymize the file at ``in_file`` and write the result to ``out_file``.

        Raises:
            ValueError: If ``out_file`` is an existing directory.
        """
        if os.path.isdir(out_file):
            raise ValueError(
                "Cannot write output file; "
                "output file is a directory ({})".format(out_file)
            )
        with open(in_file, "r") as in_io, open(out_file, "w") as out_io:
            self.anonymize_io(in_io, out_io)

    def anonymize_io(self, in_io, out_io):
        """Read lines from ``in_io`` and write their anonymized form to ``out_io``.

        ``in_io`` can be
          - an actual file (`io.TextIOWrapper` as returned by 'open')
          - in memory (`io.StringIO`)
          - any other iterable that yields lines of text
        ``out_io`` only needs a ``write`` method accepting a string.

        Lines are processed one at a time, so the whole input is never held
        in memory at once.
        """
        # Iterate the stream directly instead of calling readlines(): the
        # lines produced are the same, but they are consumed lazily.
        for line in in_io:
            output_line = line
            if self.compiled_regexes is not None and self.pwd_lookup is not None:
                output_line = replace_matching_item(
                    self.compiled_regexes, output_line, self.pwd_lookup
                )

            # The IPv6 pass runs before the IPv4 pass.
            if self.anonymizer6 is not None:
                output_line = anonymize_ip_addr(
                    self.anonymizer6, output_line, self.undo_ip_anon
                )
            if self.anonymizer4 is not None:
                output_line = anonymize_ip_addr(
                    self.anonymizer4, output_line, self.undo_ip_anon
                )

            if self.anonymizer_sensitive_word is not None:
                output_line = self.anonymizer_sensitive_word.anonymize(output_line)

            if self.anonymizer_as_num is not None:
                output_line = anonymize_as_numbers(self.anonymizer_as_num, output_line)

            # Only log lines that were actually modified.
            if line != output_line:
                logging.debug("Input line: %s", line.rstrip())
                logging.debug("Output line: %s", output_line.rstrip())
            out_io.write(output_line)


def anonymize_files(
input_path,
output_path,
Expand All @@ -52,21 +157,6 @@ def anonymize_files(
preserve_suffix_v6=None,
):
"""Anonymize each file in input and save to output."""
anonymizer_configuration = {
"anon_ip": anon_ip,
"anon_pwd": anon_pwd,
"as_numbers": as_numbers,
"preserve_networks": preserve_networks,
"preserve_prefixes": preserve_prefixes,
"preserve_suffix_v4": preserve_suffix_v4,
"preserve_suffix_v6": preserve_suffix_v6,
"reserved_words": reserved_words,
"salt": salt,
"sensitive_words": sensitive_words,
"undo_ip_anon": undo_ip_anon
}
anonymizers = build_anonymizers(anonymizer_configuration)

if not os.path.exists(input_path):
raise ValueError("Input does not exist")

Expand Down Expand Up @@ -95,151 +185,41 @@ def anonymize_files(
]
)

file_anonymizer = FileAnonymizer(
anon_ip=anon_ip,
anon_pwd=anon_pwd,
as_numbers=as_numbers,
preserve_networks=preserve_networks,
preserve_prefixes=preserve_prefixes,
preserve_suffix_v4=preserve_suffix_v4,
preserve_suffix_v6=preserve_suffix_v6,
reserved_words=reserved_words,
salt=salt,
sensitive_words=sensitive_words,
undo_ip_anon=undo_ip_anon,
)

for in_path, out_path in file_list:
logging.debug("File in %s", in_path)
logging.debug("File out %s", out_path)
try:
anonymize_file(
in_path,
out_path,
compiled_regexes=anonymizers["compiled_regexes"],
pwd_lookup=anonymizers["pwd_lookup"],
anonymizer_sensitive_word=anonymizers["anonymizer_sensitive_word"],
anonymizer_as_num=anonymizers["anonymizer_as_num"],
undo_ip_anon=undo_ip_anon,
anonymizer4=anonymizers["anonymizer4"],
anonymizer6=anonymizers["anonymizer6"],
)
# Make parent dirs for output file if they don't exist
_mkdirs(out_path)
if os.path.isdir(out_path):
raise ValueError(
"Cannot write output file; "
"output file is a directory ({})".format(out_path)
)
with open(in_path, "r") as f_in, open(out_path, "w") as f_out:
file_anonymizer.anonymize_io(f_in, f_out)
except Exception:
logging.error("Failed to anonymize file %s", in_path, exc_info=True)

if dumpfile is not None:
with open(dumpfile, "w") as f_out:
anonymizers["anonymizer4"].dump_to_file(f_out)
anonymizers["anonymizer6"].dump_to_file(f_out)


def build_anonymizers(settings):
    """Create the anonymizer objects requested by a settings mapping.

    ``settings`` must provide the keys ``salt``, ``anon_pwd``,
    ``reserved_words``, ``sensitive_words``, ``anon_ip``, ``undo_ip_anon``
    and ``as_numbers``; the ``preserve_*`` keys are only read when IP
    anonymization is enabled.

    Returns a dict with the keys ``anonymizer4``, ``anonymizer6``,
    ``anonymizer_as_num``, ``anonymizer_sensitive_word``,
    ``compiled_regexes`` and ``pwd_lookup``. Entries for features that were
    not requested are None.
    """
    # Everything starts disabled; enabled features overwrite their entry.
    built = {
        "anonymizer4": None,
        "anonymizer6": None,
        "anonymizer_as_num": None,
        "anonymizer_sensitive_word": None,
        "compiled_regexes": None,
        "pwd_lookup": None,
    }

    # The salt feeds the sensitive-word, IP and AS-number anonymizers.
    salt = settings["salt"]
    if salt is None:
        salt = "".join(
            random.choice(_CHAR_CHOICES) for _ in range(_DEFAULT_SALT_LENGTH)
        )
        logging.warning('No salt was provided; using randomly generated "%s"', salt)
    logging.debug('Using salt: "%s"', salt)

    if settings["anon_pwd"]:
        built["compiled_regexes"] = generate_default_sensitive_item_regexes()
        built["pwd_lookup"] = {}

    if settings["reserved_words"] is not None:
        # Extends the shared default_reserved_words object in place.
        default_reserved_words.update(settings["reserved_words"])

    if settings["sensitive_words"] is not None:
        built["anonymizer_sensitive_word"] = SensitiveWordAnonymizer(
            settings["sensitive_words"], salt
        )

    if settings["anon_ip"] or settings["undo_ip_anon"]:
        built["anonymizer4"] = IpAnonymizer(
            salt,
            settings["preserve_prefixes"],
            settings["preserve_networks"],
            preserve_suffix=settings["preserve_suffix_v4"],
        )
        built["anonymizer6"] = IpV6Anonymizer(
            salt, preserve_suffix=settings["preserve_suffix_v6"]
        )

    if settings["as_numbers"] is not None:
        built["anonymizer_as_num"] = AsNumberAnonymizer(settings["as_numbers"], salt)

    return built


def anonymize_file(
    filename_in,
    filename_out,
    compiled_regexes=None,
    anonymizer4=None,
    anonymizer6=None,
    pwd_lookup=None,
    anonymizer_sensitive_word=None,
    anonymizer_as_num=None,
    undo_ip_anon=False,
):
    """Read ``filename_in``, anonymize its lines and write ``filename_out``.

    The anonymizer arguments are handed unchanged to
    ``anonymize_configuration``, which performs the per-line work. Parent
    directories of the output file are created through ``_mkdirs`` first.

    Raises:
        ValueError: If ``filename_out`` is an existing directory.
    """
    logging.debug("File in %s", filename_in)
    logging.debug("File out %s", filename_out)

    # Make parent dirs for output file if they don't exist
    _mkdirs(filename_out)

    if os.path.isdir(filename_out):
        message = (
            "Cannot write output file; "
            "output file is a directory ({})".format(filename_out)
        )
        raise ValueError(message)

    # The whole input is read before the output file is opened for writing.
    with open(filename_in, "r") as source:
        original_lines = source.readlines()

    anonymizer_options = {
        "compiled_regexes": compiled_regexes,
        "anonymizer4": anonymizer4,
        "anonymizer6": anonymizer6,
        "pwd_lookup": pwd_lookup,
        "anonymizer_sensitive_word": anonymizer_sensitive_word,
        "anonymizer_as_num": anonymizer_as_num,
        "undo_ip_anon": undo_ip_anon,
    }
    rewritten_lines = anonymize_configuration(original_lines, **anonymizer_options)

    with open(filename_out, "w") as sink:
        sink.writelines(rewritten_lines)


def anonymize_configuration(
input_configuration,
compiled_regexes=None,
anonymizer4=None,
anonymizer6=None,
pwd_lookup=None,
anonymizer_sensitive_word=None,
anonymizer_as_num=None,
undo_ip_anon=False,
):
anonymized_configuration = []
for line in input_configuration:
output_line = line
if compiled_regexes is not None and pwd_lookup is not None:
output_line = replace_matching_item(
compiled_regexes, output_line, pwd_lookup
)

if anonymizer6 is not None:
output_line = anonymize_ip_addr(anonymizer6, output_line, undo_ip_anon)
if anonymizer4 is not None:
output_line = anonymize_ip_addr(anonymizer4, output_line, undo_ip_anon)

if anonymizer_sensitive_word is not None:
output_line = anonymizer_sensitive_word.anonymize(output_line)

if anonymizer_as_num is not None:
output_line = anonymize_as_numbers(anonymizer_as_num, output_line)
file_anonymizer.anonymizer4.dump_to_file(f_out)
file_anonymizer.anonymizer6.dump_to_file(f_out)

if line != output_line:
logging.debug("Input line: %s", line.rstrip())
logging.debug("Output line: %s", output_line.rstrip())
anonymized_configuration.append(output_line)
return anonymized_configuration

def _mkdirs(file_path):
"""Make parent directories for the specified file if they don't exist."""
Expand Down
Loading

0 comments on commit 72824f8

Please sign in to comment.