Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for anonymization of in-memory configurations #186

Merged
merged 5 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 132 additions & 98 deletions netconan/anonymize_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,111 @@
# Alphabet (ASCII letters and digits) from which the characters of a randomly
# generated salt are drawn when the caller does not supply one.
_CHAR_CHOICES = string.ascii_letters + string.digits


class FileAnonymizer:
    """Class that handles anonymization of files and corresponding configuration.

    An instance holds whichever anonymizers were enabled at construction time
    and applies them, line by line, to configuration text coming either from
    files on disk or from in-memory buffers.
    """

    def __init__(
        self,
        anon_pwd,
        anon_ip,
        salt=None,
        sensitive_words=None,
        undo_ip_anon=False,
        as_numbers=None,
        reserved_words=None,
        preserve_prefixes=None,
        preserve_networks=None,
        preserve_suffix_v4=None,
        preserve_suffix_v6=None,
    ):
        """Create the anonymizers selected by the arguments.

        Args:
            anon_pwd: If truthy, build the default sensitive-item regexes and
                an empty password lookup table.
            anon_ip: If truthy, create the IPv4 and IPv6 anonymizers.
            salt: Salt handed to the IP, sensitive-word and AS-number
                anonymizers. When None, a random salt is generated and
                reported with a warning.
            sensitive_words: If not None, passed to a SensitiveWordAnonymizer.
            undo_ip_anon: If truthy, the IP anonymizers are created as well,
                and the flag is forwarded to anonymize_ip_addr for each line.
            as_numbers: If not None, passed to an AsNumberAnonymizer.
            reserved_words: If not None, merged into the shared
                default_reserved_words collection via update().
            preserve_prefixes: Forwarded to IpAnonymizer.
            preserve_networks: Forwarded to IpAnonymizer.
            preserve_suffix_v4: Forwarded to IpAnonymizer as preserve_suffix.
            preserve_suffix_v6: Forwarded to IpV6Anonymizer as
                preserve_suffix.
        """
        self.undo_ip_anon = undo_ip_anon

        # Every anonymizer starts disabled (None); anonymize_io skips any
        # stage whose anonymizer is still None.
        self.anonymizer4 = None
        self.anonymizer6 = None
        self.anonymizer_as_num = None
        self.anonymizer_sensitive_word = None
        self.compiled_regexes = None
        self.pwd_lookup = None

        # The salt is shared by the IP, sensitive-word and AS-number
        # anonymizers created below.
        self.salt = salt
        if self.salt is None:
            self.salt = "".join(
                random.choice(_CHAR_CHOICES) for _ in range(_DEFAULT_SALT_LENGTH)
            )
            logging.warning(
                'No salt was provided; using randomly generated "%s"', self.salt
            )
        logging.debug('Using salt: "%s"', self.salt)

        if anon_pwd:
            self.compiled_regexes = generate_default_sensitive_item_regexes()
            self.pwd_lookup = {}
        if reserved_words is not None:
            # NOTE: this mutates a collection that is not owned by this
            # instance, so the added words outlive it.
            default_reserved_words.update(reserved_words)
        if sensitive_words is not None:
            self.anonymizer_sensitive_word = SensitiveWordAnonymizer(
                sensitive_words, self.salt
            )
        if anon_ip or undo_ip_anon:
            self.anonymizer4 = IpAnonymizer(
                self.salt,
                preserve_prefixes,
                preserve_networks,
                preserve_suffix=preserve_suffix_v4,
            )
            self.anonymizer6 = IpV6Anonymizer(
                self.salt, preserve_suffix=preserve_suffix_v6
            )
        if as_numbers is not None:
            self.anonymizer_as_num = AsNumberAnonymizer(as_numbers, self.salt)

    def anonymize_file(self, in_file, out_file):
        """Anonymize a single file.

        Args:
            in_file: Path of the file to read.
            out_file: Path of the file to write the anonymized text to.

        Raises:
            ValueError: If out_file is an existing directory.
        """
        if os.path.isdir(out_file):
            raise ValueError(
                "Cannot write output file; "
                "output file is a directory ({})".format(out_file)
            )
        with open(in_file, "r") as in_io, open(out_file, "w") as out_io:
            self.anonymize_io(in_io, out_io)

    def anonymize_io(self, in_io, out_io):
        """Reads from the in_io buffer, writing anonymized configuration into the out_io buffer.

        in_io can be any iterable that yields lines of text and out_io any
        object with a ``write`` method, for example
        - an actual file (`io.TextIOWrapper` as returned by 'open')
        - in memory (`io.StringIO`)

        The input is consumed lazily, one line at a time, so it is never held
        in memory as a whole.
        """
        # Iterate the input directly instead of calling readlines(): this
        # streams the data and also accepts plain iterables of lines.
        for line in in_io:
            output_line = line
            if self.compiled_regexes is not None and self.pwd_lookup is not None:
                output_line = replace_matching_item(
                    self.compiled_regexes, output_line, self.pwd_lookup
                )

            # IPv6 addresses are handled before IPv4 addresses.
            if self.anonymizer6 is not None:
                output_line = anonymize_ip_addr(
                    self.anonymizer6, output_line, self.undo_ip_anon
                )
            if self.anonymizer4 is not None:
                output_line = anonymize_ip_addr(
                    self.anonymizer4, output_line, self.undo_ip_anon
                )

            if self.anonymizer_sensitive_word is not None:
                output_line = self.anonymizer_sensitive_word.anonymize(output_line)

            if self.anonymizer_as_num is not None:
                output_line = anonymize_as_numbers(self.anonymizer_as_num, output_line)

            # Only log lines that were actually modified.
            if line != output_line:
                logging.debug("Input line: %s", line.rstrip())
                logging.debug("Output line: %s", output_line.rstrip())
            out_io.write(output_line)


def anonymize_files(
input_path,
output_path,
Expand All @@ -52,38 +157,6 @@ def anonymize_files(
preserve_suffix_v6=None,
):
"""Anonymize each file in input and save to output."""
anonymizer4 = None
anonymizer6 = None
anonymizer_as_num = None
anonymizer_sensitive_word = None
compiled_regexes = None
pwd_lookup = None
# The salt is only used for IP and sensitive word anonymization:
if salt is None:
salt = "".join(
random.choice(_CHAR_CHOICES) for _ in range(_DEFAULT_SALT_LENGTH)
)
logging.warning('No salt was provided; using randomly generated "%s"', salt)
logging.debug('Using salt: "%s"', salt)
if anon_pwd:
compiled_regexes = generate_default_sensitive_item_regexes()
pwd_lookup = {}
if reserved_words is not None:
default_reserved_words.update(reserved_words)

if sensitive_words is not None:
anonymizer_sensitive_word = SensitiveWordAnonymizer(sensitive_words, salt)
if anon_ip or undo_ip_anon:
anonymizer4 = IpAnonymizer(
salt,
preserve_prefixes,
preserve_networks,
preserve_suffix=preserve_suffix_v4,
)
anonymizer6 = IpV6Anonymizer(salt, preserve_suffix=preserve_suffix_v6)
if as_numbers is not None:
anonymizer_as_num = AsNumberAnonymizer(as_numbers, salt)

if not os.path.exists(input_path):
raise ValueError("Input does not exist")

Expand Down Expand Up @@ -112,79 +185,40 @@ def anonymize_files(
]
)

file_anonymizer = FileAnonymizer(
anon_ip=anon_ip,
anon_pwd=anon_pwd,
as_numbers=as_numbers,
preserve_networks=preserve_networks,
preserve_prefixes=preserve_prefixes,
preserve_suffix_v4=preserve_suffix_v4,
preserve_suffix_v6=preserve_suffix_v6,
reserved_words=reserved_words,
salt=salt,
sensitive_words=sensitive_words,
undo_ip_anon=undo_ip_anon,
)

for in_path, out_path in file_list:
logging.debug("File in %s", in_path)
logging.debug("File out %s", out_path)
try:
anonymize_file(
in_path,
out_path,
compiled_regexes=compiled_regexes,
pwd_lookup=pwd_lookup,
anonymizer_sensitive_word=anonymizer_sensitive_word,
anonymizer_as_num=anonymizer_as_num,
undo_ip_anon=undo_ip_anon,
anonymizer4=anonymizer4,
anonymizer6=anonymizer6,
)
# Make parent dirs for output file if they don't exist
_mkdirs(out_path)
if os.path.isdir(out_path):
raise ValueError(
"Cannot write output file; "
"output file is a directory ({})".format(out_path)
)
with open(in_path, "r") as f_in, open(out_path, "w") as f_out:
file_anonymizer.anonymize_io(f_in, f_out)
except Exception:
logging.error("Failed to anonymize file %s", in_path, exc_info=True)

if dumpfile is not None:
with open(dumpfile, "w") as f_out:
anonymizer4.dump_to_file(f_out)
anonymizer6.dump_to_file(f_out)


def anonymize_file(
    filename_in,
    filename_out,
    compiled_regexes=None,
    anonymizer4=None,
    anonymizer6=None,
    pwd_lookup=None,
    anonymizer_sensitive_word=None,
    anonymizer_as_num=None,
    undo_ip_anon=False,
):
    """Write an anonymized copy of filename_in to filename_out.

    Each stage runs only when its helper object was supplied: sensitive-item
    replacement needs both compiled_regexes and pwd_lookup, IP anonymization
    needs anonymizer6 and/or anonymizer4, and the sensitive-word and AS-number
    stages need their respective anonymizers.

    Raises:
        ValueError: If filename_out is an existing directory.
    """
    logging.debug("File in %s", filename_in)
    logging.debug("File out %s", filename_out)

    # Ensure the parent directories of the output file exist.
    _mkdirs(filename_out)

    if os.path.isdir(filename_out):
        raise ValueError(
            "Cannot write output file; "
            "output file is a directory ({})".format(filename_out)
        )

    # Decide once, up front, which stages are active.
    scrub_sensitive_items = compiled_regexes is not None and pwd_lookup is not None
    # Order matters: IPv6 is processed before IPv4.
    ip_anonymizers = [
        candidate for candidate in (anonymizer6, anonymizer4) if candidate is not None
    ]

    with open(filename_out, "w") as sink, open(filename_in, "r") as source:
        for original in source:
            rewritten = original

            if scrub_sensitive_items:
                rewritten = replace_matching_item(
                    compiled_regexes, rewritten, pwd_lookup
                )

            for ip_anonymizer in ip_anonymizers:
                rewritten = anonymize_ip_addr(ip_anonymizer, rewritten, undo_ip_anon)

            if anonymizer_sensitive_word is not None:
                rewritten = anonymizer_sensitive_word.anonymize(rewritten)

            if anonymizer_as_num is not None:
                rewritten = anonymize_as_numbers(anonymizer_as_num, rewritten)

            # Log only the lines that anonymization actually changed.
            if rewritten != original:
                logging.debug("Input line: %s", original.rstrip())
                logging.debug("Output line: %s", rewritten.rstrip())
            sink.write(rewritten)
file_anonymizer.anonymizer4.dump_to_file(f_out)
file_anonymizer.anonymizer6.dump_to_file(f_out)


def _mkdirs(file_path):
Expand Down
Loading