-
Notifications
You must be signed in to change notification settings - Fork 15
/
all_my_enis.py
executable file
·186 lines (156 loc) · 6.92 KB
/
all_my_enis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3
import sys
import Inventory_Modules
from Inventory_Modules import display_results, get_all_credentials
from ArgumentsClass import CommonArguments
from datetime import datetime
from colorama import init, Fore
from botocore.exceptions import ClientError
from queue import Queue
from threading import Thread
from time import time
import logging
# Initialise colorama (imported above) before any of the Fore colour codes are printed
init()
# Version string handed to the argument parser's version option in parse_args()
__version__ = '2024.05.07'
def parse_args(f_args):
    """
    Description: Builds this script's command-line parser and parses the supplied arguments
    @param f_args: the list of argument strings to parse (the caller passes sys.argv[1:])
    @return: the namespace object returned by the underlying parser's parse_args()
    """
    cli = CommonArguments()

    # Switch on the shared option groups provided by CommonArguments, in a fixed order
    shared_option_groups = (
        cli.multiprofile,
        cli.multiregion,
        cli.extendedargs,
        cli.rootOnly,
        cli.timing,
        cli.save_to_file,
        cli.verbosity,
    )
    for enable_group in shared_option_groups:
        enable_group()
    cli.version(__version__)

    # Script-specific option: zero or more IP addresses, stored on the namespace as 'pipaddresses'
    ip_option_settings = {
        'dest': "pipaddresses",
        'nargs': "*",
        'metavar': "IP address",
        'default': None,
        'help': "IP address(es) you're looking for within your accounts",
    }
    cli.my_parser.add_argument("--ipaddress", "--ip", **ip_option_settings)

    return cli.my_parser.parse_args(f_args)
def check_accounts_for_enis(fCredentialList, fip=None):
    """
    Description: Takes a list of credentials and looks for ENIs in every account and region it has creds for,
        using a pool of (at most 50) worker threads.
    @param fCredentialList: list of credential dicts; each one is read for 'AccountId', 'Region' and 'MgmtAccount'
    @param fip: optional IP address(es), passed through unchanged to Inventory_Modules.find_account_enis2
    @return: list of the ENI records returned by find_account_enis2, each with a 'MgmtAccount' key added
    """

    class FindENIs(Thread):
        """Worker thread: pulls one (credential, region) work item at a time off the queue and collects its ENIs"""

        def __init__(self, queue):
            Thread.__init__(self)
            self.queue = queue

        def run(self):
            while True:
                # Get the work from the queue and expand the tuple
                c_account_credentials, c_region, c_fip, c_PlacesToLook, c_PlaceCount = self.queue.get()
                logging.info(f"De-queued info for account {c_account_credentials['AccountId']}")
                try:
                    logging.info(f"Attempting to connect to {c_account_credentials['AccountId']}")
                    account_enis = Inventory_Modules.find_account_enis2(c_account_credentials, c_region, c_fip)
                    logging.info(f"Successfully connected to account {c_account_credentials['AccountId']} in region {c_region}")
                    for eni in account_enis:
                        eni['MgmtAccount'] = c_account_credentials['MgmtAccount']
                    Results.extend(account_enis)
                except KeyError as my_Error:
                    logging.error(f"Account Access failed - trying to access {c_account_credentials['AccountId']}")
                    logging.info(f"Actual Error: {my_Error}")
                except AttributeError as my_Error:
                    # Deliberately does not reference script-level globals, so this function also works when imported
                    logging.error("Error: Likely that one of the supplied profiles was wrong")
                    logging.warning(my_Error)
                except ClientError as my_Error:
                    # Handled here (not around queue.put) because the lookup call is the only place an AWS API
                    # error can surface; an uncaught error would kill this worker and could leave join() waiting.
                    if "AuthFailure" in str(my_Error):
                        logging.error(f"Authorization Failure accessing account {c_account_credentials['AccountId']} in {c_region} region")
                        logging.warning(f"It's possible that the region {c_region} hasn't been opted-into")
                    else:
                        logging.error(f"Error accessing account {c_account_credentials['AccountId']} in {c_region} region: {my_Error}")
                finally:
                    # Always report progress and mark the item done, otherwise checkqueue.join() never returns
                    print(f"{ERASE_LINE}Finished finding ENIs in account {c_account_credentials['AccountId']} in region {c_region} - {c_PlaceCount} / {c_PlacesToLook}", end='\r')
                    self.queue.task_done()

    checkqueue = Queue()
    Results = []
    # The progress total is the number of work items; only the thread count is capped at 50
    PlacesToLook = len(fCredentialList)
    WorkerThreads = min(PlacesToLook, 50)

    for _ in range(WorkerThreads):
        worker = FindENIs(checkqueue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()

    # PlaceCount is 1-based so the progress line reads "1 / N" ... "N / N"
    for PlaceCount, credential in enumerate(fCredentialList, start=1):
        logging.info(f"Connecting to account {credential['AccountId']} in region {credential['Region']}")
        checkqueue.put((credential, credential['Region'], fip, PlacesToLook, PlaceCount))

    # Block until every queued item has been marked done by a worker
    checkqueue.join()
    return Results
def present_results(ENIsFound: list):
    """
    Description: Shows the ENI records via display_results() and then prints a summary to the screen
    @param ENIsFound: The list of ENI records (dicts) to show. Each record is read for
        'MgmtAccount', 'AccountId', 'Region', 'VpcId' and 'Status'.
    Note: relies on the script-level names pFilename, pSkipAccounts, pSkipProfiles, CredentialList and verbose.
    """
    # Statuses treated here as "not in-use"
    idle_states = ('available', 'attaching', 'detaching')

    # Column layout handed to display_results: field name -> display order and heading
    column_headings = [
        ('MgmtAccount', 'Mgmt Acct'),
        ('AccountId', 'Acct Number'),
        ('Region', 'Region'),
        ('PrivateDnsName', 'ENI Name'),
        ('Status', 'Status'),
        ('PublicIp', 'Public IP Address'),
        ('ENIId', 'ENI Id'),
        ('PrivateIpAddress', 'Assoc. IP'),
    ]
    display_dict = {
        field: {'DisplayOrder': position, 'Heading': heading}
        for position, (field, heading) in enumerate(column_headings, start=1)
    }
    # Only the Status column carries a 'Condition' list; it is passed through to display_results as-is
    display_dict['Status']['Condition'] = list(idle_states)

    def _record_order(record):
        return record['MgmtAccount'], record['AccountId'], record['Region'], record['VpcId']

    sorted_ENIs_Found = sorted(ENIsFound, key=_record_order)
    display_results(sorted_ENIs_Found, display_dict, 'None', pFilename)

    DetachedENIs = [record for record in sorted_ENIs_Found if record['Status'] in idle_states]
    # Distinct regions / accounts among the credentials that were checked (not among the ENIs found)
    region_count = len({credential['Region'] for credential in CredentialList})
    account_count = len({credential['AccountId'] for credential in CredentialList})

    print()
    if pSkipAccounts is not None:
        print(f"These accounts were skipped - as requested: {pSkipAccounts}")
    if pSkipProfiles is not None:
        print(f"These profiles were skipped - as requested: {pSkipProfiles}")
    print()
    if pFilename is not None:
        print(f"Your output will be saved to {Fore.GREEN}'{pFilename}-{datetime.now().strftime('%y-%m-%d--%H:%M:%S')}'{Fore.RESET}")
    print(f"Found {len(ENIsFound)} ENIs across {account_count} accounts across {region_count} regions")
    print(f"{Fore.RED}Found {len(DetachedENIs)} ENIs that are not listed as 'in-use' and may therefore be costing you additional money while they're unused.{Fore.RESET}")
    print()

    # Below log level 40 (logging.ERROR), also dump every not-in-use ENI record
    if verbose < 40:
        for detached_eni in DetachedENIs:
            print(detached_eni)
##################
# ANSI escape sequence that erases the current terminal line (used for the in-place progress output)
ERASE_LINE = '\x1b[2K'

if __name__ == '__main__':
    args = parse_args(sys.argv[1:])

    # Account / profile / region selection
    pProfiles = args.Profiles
    pRegionList = args.Regions
    pSkipAccounts = args.SkipAccounts
    pSkipProfiles = args.SkipProfiles
    pAccounts = args.Accounts
    pRootOnly = args.RootOnly
    # What to look for, and how to report it
    pIPaddressList = args.pipaddresses
    pFilename = args.Filename
    pTiming = args.Time
    verbose = args.loglevel

    # Setup logging levels: this script logs at the requested level, the AWS / HTTP libraries only at CRITICAL
    logging.basicConfig(level=verbose, format="[%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s")
    for library_logger in ("boto3", "botocore", "s3transfer", "urllib3"):
        logging.getLogger(library_logger).setLevel(logging.CRITICAL)

    begin_time = time()

    print()
    print("Checking for Elastic Network Interfaces... ")
    print()

    logging.info(f"Profiles: {pProfiles}")

    # Get credentials for all relevant children accounts
    CredentialList = get_all_credentials(pProfiles, pTiming, pSkipProfiles, pSkipAccounts, pRootOnly, pAccounts, pRegionList)
    # Find ENIs across all children accounts
    ENIsFound = check_accounts_for_enis(CredentialList, fip=pIPaddressList)
    # Display results
    present_results(ENIsFound)

    if pTiming:
        print(ERASE_LINE)
        print(f"{Fore.GREEN}This script took {time() - begin_time:.2f} seconds{Fore.RESET}")

    print()
    print("Thank you for using this script")
    print()