From 8e64813cd7608225d4801c2ebfa87dafa3cfd4ff Mon Sep 17 00:00:00 2001
From: Gal
Date: Thu, 23 Dec 2021 15:13:59 +0200
Subject: [PATCH] Initial commit

---
 .gitignore                              |  79 +++++++++++++
 README.md                               |  66 +++++++++++
 application.py                          | 128 +++++++++++++++++++++
 cherry-picking-algorithm/README.md      |  30 +++++
 cherry-picking-algorithm/cherry_pick.py |  73 ++++++++++++
 requirements.txt                        |  14 +++
 src/iomanager.py                        | 127 ++++++++++++++++++++
 src/urlscanner.py                       | 147 ++++++++++++++++++++++++
 src/util.py                             |  58 ++++++++++
 9 files changed, 722 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 README.md
 create mode 100644 application.py
 create mode 100644 cherry-picking-algorithm/README.md
 create mode 100644 cherry-picking-algorithm/cherry_pick.py
 create mode 100644 requirements.txt
 create mode 100644 src/iomanager.py
 create mode 100644 src/urlscanner.py
 create mode 100644 src/util.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9d9fc98
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,79 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# PyBuilder
+.pybuilder/
+target/
+
+# IPython
+profile_default/
+ipython_config.py
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Cython debug symbols
+cython_debug/
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..5bd9e3e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,66 @@
+# UrlScanner
+
+> Lightweight Python CLI utility that uses the [URLScan.io](https://urlscan.io/) API to automate scanning and retrieving information about URLs
+
+[URLScan.io](https://urlscan.io/) is a useful tool for scanning and obtaining information from potentially malicious websites. URLScan.io provides a useful [API](https://urlscan.io/docs/api/) which can be used to add some automation to your workflow.
+
+## Install & Setup
+
+1. Clone this repository using `git clone https://github.com/birkagal/urlscanner`
+
+2. Consider creating a [virtual environment](https://docs.python.org/3/library/venv.html), then install the dependencies using `pip install -r requirements.txt`
+
+3. All set. Use `python application.py --help` to see the man page.
+
+## How to use
+
+![enter image description here](https://i.ibb.co/cknWf30/Capture.png)
+
+UrlScanner supports two main modes: batch analysis of multiple URLs from an input file, or interactive one-by-one analysis. You can always check `python application.py -h` for more help.
+
+### API Key
+
+[URLScan.io](https://urlscan.io/) uses a personal API key to control access to its API features. You need to sign up for an account at [URLScan.io](https://urlscan.io/) and save your personal API key in a `.env` file, with `API_KEY` as the key and your actual key as the value.
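+
+For example, a minimal `.env` file contains a single line (the value below is only a placeholder; use your own key):
+
+    API_KEY=your-urlscan-api-key
+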
+You can also set an environment variable called `API_KEY` instead.
+
+### Logging Level
+
+The `-v` flag determines how verbose the logging will be. There are three possible values: 0 (critical), 1 (info), and 2 (debug). The default value is 0 when no verbose flag is present. If the flag is added with no value specified, it is set to 2. Otherwise, the specified value is used.
+
+### Batch Analysis
+
+You can use the `-b` flag alongside a filename; the file should contain one URLScan.io query per line. Each query should be a JSON object with a `url` key and, optionally, a `visibility` key (defaults to `public`), for example `{"url": "https://example.com", "visibility": "public"}`. The output is a CSV file containing the searched URL, screenshot URL, maliciousness score given by the API, and a link to the full online report.
+If your `urls.txt` is inside the `input` directory, you can use this command to execute:
+
+    python application.py -b /input/urls.txt
+
+### Interactive Analysis
+
+If no mode flag is provided, the utility asks you to manually enter a `URL` and a `visibility` parameter to scan. It then uses the URLScan.io API to scan the URL and presents you with the result. To run this mode, simply run the application without specifying any other mode. (You can still use flags.)
+
+    python application.py -v
+
+### Display User Quotas
+
+The URLScan.io API has a rate-limit mechanism to limit use of the API. There are different quotas per day, hour and minute for each action. You can use the `-q` flag to list the user's remaining quotas for each action.
+
+    python application.py -q
+
+## TODO
+
+This section lists my thoughts for the future: the features I didn't have the time to implement.
+
+- [ ] Display the output in an HTML file, using templating to render a single output page with all the information in a visual way.
+- [ ] All the history of queries and results is already stored in a database. Make use of the results table to offer searching past results from the utility.
+
+## Implementation
+
+UrlScanner has two main objects, UrlScanner and IOManager.
+
+- UrlScanner is responsible for communicating with the URLScan.io API service. It holds the logic for submission requests, retrieval requests, parsing the information, and the quota rate limiter. It follows URLScan.io implementation advice, such as respecting the 429 (too many requests) status code and waiting before polling for results. HTTP requests are sent using Python's requests module.
+- IOManager is responsible for input/output and database logic. The tool uses Python's sqlite3 module to manage an SQLite database. The database has two tables: queries and results. The queries table is used to make sure each URL is only sent once, even if it was already sent in the past. The results table currently just stores the data; it may be helpful in the future. The IOManager also reads the input from a file (when working in batch mode), validates each query and adds it to the work queue.
+
+The main feature of this tool is batch analysis. The tool uses Python's queue module and its threading capabilities to manage a work queue that all the threads can access and take work from. Once the work is done and the queue is empty, the IOManager writes the results to a designated CSV file.
+
+The application also uses Python's logging module to provide different logging levels that the user can choose from; each level shows a different amount of information.
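+
+For reference, the conversion from the numeric `-v` value to a logging level is implemented in `src/util.py` as a simple dictionary lookup:
+
+    def convert_int_to_logging_level(log_level: int) -> int:
+        mapping = {0: logging.CRITICAL, 1: logging.INFO, 2: logging.DEBUG}
+        return mapping[log_level]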
+
+The argparse module is used to manage user arguments and flags, and to display the man page when running with the `-h` flag.
diff --git a/application.py b/application.py
new file mode 100644
index 0000000..e2219ef
--- /dev/null
+++ b/application.py
@@ -0,0 +1,128 @@
+import os
+import logging
+from dotenv import load_dotenv
+from queue import Queue
+from threading import Thread
+from src.iomanager import IOManager
+from src.urlscanner import UrlScanner
+from src.util import create_arg_parser, convert_int_to_logging_level
+
+NUM_THREADS = 10
+
+
+def main():
+    # Create the argparse menu and get the program args
+    parser = create_arg_parser()
+    args = parser.parse_args()
+    log_level = convert_int_to_logging_level(args.verbose)
+
+    # Create the logging configuration
+    logging.basicConfig(
+        format="%(asctime)s : %(levelname)s : %(message)s", datefmt="%H:%M:%S"
+    )
+
+    # Load the environment variables and instantiate the scanner and IO objects
+    load_dotenv()
+    scanner = UrlScanner(os.getenv("API_KEY"), log_level)
+    io = IOManager(log_level)
+
+    # Run the program in the user-specified mode
+    if args.batch_investigate:
+        batch_investigate(scanner, io, args.batch_investigate)
+    elif args.quotas:
+        show_user_quotas(scanner)
+    else:
+        interactive_query(scanner)
+    print("Exiting...")
+
+
+def interactive_query(scanner: UrlScanner) -> None:
+    print("Welcome to UrlScanner interactive cli tool.\n")
+    url = input("Please enter the requested URL: ")
+    while True:  # Loop until the user provides a valid visibility parameter
+        visibility = input(
+            "Please enter the requested visibility [public, private, unlisted]: "
+        ).lower()
+        if visibility not in ["public", "private", "unlisted"]:
+            print("Please enter either public, private or unlisted.")
+            continue
+        break
+    print("Fetching results...")
+    report = scanner.generate_report(url, visibility)
+    if report == {}:
+        print(f"Couldn't analyze {url}")
+    else:
+        print(
+            f"""
+            FINISHED!
+            Results:
+            Scanned URL: {report['url']}
+            Screenshot URL: {report['screenshotURL']}
+            isMalicious: {report['isMalicious']}
+            Maliciousness: {report['maliciousness']}
+            Report URL: {report['report_url']}\n
+            """
+        )
+
+
+def show_user_quotas(scanner: UrlScanner) -> None:
+    print(
+        f"""
+        Public visibility:
+            Day: {scanner.quotas['public']['day']['remaining']} remaining.
+            Hour: {scanner.quotas['public']['hour']['remaining']} remaining.
+            Minute: {scanner.quotas['public']['minute']['remaining']} remaining.
+
+        Unlisted visibility:
+            Day: {scanner.quotas['unlisted']['day']['remaining']} remaining.
+            Hour: {scanner.quotas['unlisted']['hour']['remaining']} remaining.
+            Minute: {scanner.quotas['unlisted']['minute']['remaining']} remaining.
+
+        Private visibility:
+            Day: {scanner.quotas['private']['day']['remaining']} remaining.
+            Hour: {scanner.quotas['private']['hour']['remaining']} remaining.
+            Minute: {scanner.quotas['private']['minute']['remaining']} remaining.
+
+        Result Retrieve:
+            Day: {scanner.quotas['retrieve']['day']['remaining']} remaining.
+            Hour: {scanner.quotas['retrieve']['hour']['remaining']} remaining.
+            Minute: {scanner.quotas['retrieve']['minute']['remaining']} remaining.\n
+        """
+    )
+
+
+def batch_investigate(scanner: UrlScanner, io: IOManager, input_file: str) -> None:
+    reports = []  # List of all query results
+    q = Queue()  # Queue that will manage the work
+
+    # Instantiate NUM_THREADS threads and send them to the worker function where they will wait for work
+    for _ in range(NUM_THREADS):
+        Thread(
+            target=worker,
+            args=(
+                scanner,
+                q,
+                reports,
+            ),
+            daemon=True,
+        ).start()
+
+    # Read queries from the given file and add each valid query to the queue
+    success = io.add_queries_to_queue_from_file(scanner, q, input_file)
+    if success:
+        # Save the results to a csv file
+        io.save_csv(reports)
+
+
+def worker(scanner, q, reports):
+    report = {}  # The result of a given query
+    while True:
+        query = q.get()  # Get the next query
+        # Use the API to scan and retrieve the result for that query.
+        report = scanner.generate_report(query["url"], query["visibility"])
+        reports.append(report)
+        q.task_done()  # Mark that query as finished
+
+
+if __name__ == "__main__":
+    main()
diff --git a/cherry-picking-algorithm/README.md b/cherry-picking-algorithm/README.md
new file mode 100644
index 0000000..819876a
--- /dev/null
+++ b/cherry-picking-algorithm/README.md
@@ -0,0 +1,30 @@
+# Alerts Cherry-Picking Algorithm
+
+An “alert” is an object with various keys and values. Each alert has the following keys: Alert ID, Type, Subtype, and Title (in reality, there are more, but for the sake of the exercise, we are only using those). Some alerts are more important than others. Based on its keys and on identifiers found in the title, each alert can be ranked from 1 (highest priority) to 6 (lowest).
+
+This algorithm is implemented in the `cherry_pick` function.
+The function receives a list of `alerts` and a `num_of_results` argument, which defaults to 4 and determines how many alerts to pick.
+
+The return value is a list of size `num_of_results`, where each element is a string holding the alert id attribute `_id` of one of the highest-priority alerts in the list.
+
+## Implementation
+
+The algorithm's first step is to check whether the given `num_of_results` is greater than or equal to the number of alerts in the list. If it is, it simply returns the `_id` of every alert present in the list.
+
+Next, the algorithm creates an empty `prioritised` list which will hold the highest-priority alerts. It sets the first `num_of_results` elements as the initial highest-priority alerts and keeps track of the `worst_priority` value among them.
+
+The main loop iterates over the remaining alerts in the list. For each alert, it compares its priority value with the `worst_priority` in the `prioritised` list. If the current alert's value is less than `worst_priority` (note that a value of `1` is the highest priority), we find the index of the alert holding the `worst_priority` value in the `prioritised` list, swap that alert with the new alert, and update `worst_priority` based on the new `prioritised` list.
+
+After the loop ends, the `prioritised` list contains the `num_of_results` highest-priority alerts from the original list.
+All that is left is to extract the ids of those alerts and return them as a list.
+
+## Complexity
+
+Let's analyze the complexity of the algorithm. The input is a list of `n` alerts, as well as a variable `k` which is the number of results to choose. The default value for `k` is 4.
+The first part of the algorithm populates the `prioritised` list with the first `k` elements of the input.
+It also validates the input and keeps track of the `worst_priority` variable. All in all, that loop runs in `O(k)` time.
+The second part is the main loop, which iterates over the remaining list and compares each alert with the `prioritised` list to see if a swap is required.
+Since `prioritised` has a fixed size of `k`, the inner loop takes `O(k)`, and the outer loop runs through the rest of the list, which means this part takes `O(n*k)`.
+
+Last, we iterate over `prioritised` once more to extract the ids of the alerts in it, which again takes `O(k)` time.
+
+All in all, the algorithm's complexity is `O(k + n*k + k) => O(n*k)`, and if we assume `k=4` we get `O(n)`.
diff --git a/cherry-picking-algorithm/cherry_pick.py b/cherry-picking-algorithm/cherry_pick.py
new file mode 100644
index 0000000..fb32ce9
--- /dev/null
+++ b/cherry-picking-algorithm/cherry_pick.py
@@ -0,0 +1,73 @@
+def cherry_pick(alerts: list, num_of_results: int = 4) -> list:
+
+    # If num_of_results is greater than the list size, there is no need to filter; just return the full list
+    if len(alerts) <= num_of_results:
+        return [alert["_id"] for alert in alerts]
+
+    # Set the first num_of_results elements as the first highest-priority alerts and maintain worst_priority
+    prioritised = []
+    worst_priority = 1
+    for index in range(num_of_results):
+        if not validate_alert(alerts[index]):
+            continue
+        prioritised.append(alerts[index])
+        priority = convert_alert_to_priority(
+            alerts[index]["Details"]["Type"], alerts[index]["Details"]["SubType"]
+        )
+        worst_priority = priority if priority > worst_priority else worst_priority
+
+    # Iterate over the remaining alerts
+    for index in range(num_of_results, len(alerts)):
+        if not validate_alert(alerts[index]):
+            continue
+        priority = convert_alert_to_priority(
+            alerts[index]["Details"]["Type"], alerts[index]["Details"]["SubType"]
+        )
+        # If the current alert's priority is better than the worst one, replace the worst alert with the current one
+        if priority < worst_priority:
+            for prioritised_alert_index in range(len(prioritised)):
+                if worst_priority == convert_alert_to_priority(
+                    prioritised[prioritised_alert_index]["Details"]["Type"],
+                    prioritised[prioritised_alert_index]["Details"]["SubType"],
+                ):
+                    prioritised[prioritised_alert_index] = alerts[index]
+                    break
+            # Update the worst priority for the current prioritised list
+            worst_priority = get_worst_priority(prioritised)
+
+    return [alert["_id"] for alert in prioritised]
+
+
+def validate_alert(alert: dict) -> bool:
+    # Make sure the alert has _id, Details, Type and SubType attributes
+    if not all(key in alert for key in ("_id", "Details")):
+        return False
+    if not all(key in alert["Details"] for key in ("Type", "SubType")):
+        return False
+    return True
+
+
+def get_worst_priority(alerts: list) -> int:
+    # Find the worst priority value in the alerts list
+    worst_priority = 1
+    for alert in alerts:
+        priority = convert_alert_to_priority(
+            alert["Details"]["Type"], alert["Details"]["SubType"]
+        )
+        worst_priority = priority if priority > worst_priority else worst_priority
+    return worst_priority
+
+
+def convert_alert_to_priority(type: str, subtype: str) -> int:
+    # Map type and subtype to a priority value
+    mapping = {
+        "AttackIndication": {"BlackMarket": 1, "BotDataForSale": 1},
+        "DataLeakage": {
+            "ConfidentialDocumentLeakage": 4,
+            "ConfidentialInformationExposed": 2,
+            "CredentialsLeakage": 3,
+            "ExposedMentionsOnGithub": 6,
+        },
+        "vip": {"BlackMarket": 5},
+    }
+    return mapping[type][subtype]
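
As an illustration of the behaviour described above, here is a small, hypothetical invocation of `cherry_pick` (the alert values are made up for the example; run it from inside the `cherry-picking-algorithm` directory so the import resolves):

    from cherry_pick import cherry_pick

    # Made-up alerts; only _id and Details/Type/SubType matter to the algorithm.
    alerts = [
        {"_id": "a1", "Details": {"Type": "DataLeakage", "SubType": "ExposedMentionsOnGithub"}},         # priority 6
        {"_id": "a2", "Details": {"Type": "DataLeakage", "SubType": "ConfidentialDocumentLeakage"}},      # priority 4
        {"_id": "a3", "Details": {"Type": "vip", "SubType": "BlackMarket"}},                              # priority 5
        {"_id": "a4", "Details": {"Type": "AttackIndication", "SubType": "BlackMarket"}},                 # priority 1
        {"_id": "a5", "Details": {"Type": "DataLeakage", "SubType": "CredentialsLeakage"}},               # priority 3
        {"_id": "a6", "Details": {"Type": "DataLeakage", "SubType": "ConfidentialInformationExposed"}},   # priority 2
    ]

    # Picks the two highest-priority alerts: the AttackIndication/BlackMarket alert (1)
    # and the ConfidentialInformationExposed leak (2).
    print(cherry_pick(alerts, num_of_results=2))  # ['a4', 'a6']
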
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..79a3270
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,14 @@
+black==21.12b0
+certifi==2021.10.8
+charset-normalizer==2.0.9
+click==8.0.3
+colorama==0.4.4
+idna==3.3
+mypy-extensions==0.4.3
+pathspec==0.9.0
+platformdirs==2.4.0
+python-dotenv==0.19.2
+requests==2.26.0
+tomli==1.2.3
+typing_extensions==4.0.1
+urllib3==1.26.7
diff --git a/src/iomanager.py b/src/iomanager.py
new file mode 100644
index 0000000..5e4f091
--- /dev/null
+++ b/src/iomanager.py
@@ -0,0 +1,127 @@
+import os
+import csv
+import json
+import sqlite3
+import logging
+from datetime import datetime
+
+
+class IOManager:
+    def __init__(self, log_level: int, output_dir="./output"):
+        self.output_dir = output_dir  # Path to the output directory
+
+        # Get a logger and set its level based on the user's choice
+        self.logger = logging.getLogger(__name__)
+        self.logger.setLevel(log_level)
+
+        # Connect to the SQLite database and create the tables if they don't exist
+        self.db = sqlite3.connect("urlscanner.db", isolation_level=None).cursor()
+        self.db.execute(
+            "create table if not exists queries (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, url TEXT NOT NULL, visibility TEXT NOT NULL)"
+        )
+        self.db.execute(
+            "create table if not exists results (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, url TEXT NOT NULL, screenshotURL TEXT NOT NULL, isMalicious TEXT NOT NULL, maliciousness TEXT NOT NULL, report_url TEXT NOT NULL)"
+        )
+
+    def __check_query_in_database(self, query: dict) -> bool:
+        self.db.execute("SELECT id FROM queries WHERE url = ?", (query["url"],))
+        data = self.db.fetchall()
+        return len(data) != 0
+
+    def __add_query_to_database(self, query: dict) -> None:
+        self.db.execute(
+            "INSERT INTO queries VALUES (?, ?, ?)",
+            (None, query["url"], query["visibility"]),
+        )
+
+    def __add_result_to_database(self, result: dict) -> None:
+        self.db.execute(
+            "INSERT INTO results VALUES (?, ?, ?, ?, ?, ?)",
+            (
+                None,
+                result["url"],
+                result["screenshotURL"],
+                result["isMalicious"],
+                result["maliciousness"],
+                result["report_url"],
+            ),
+        )
+
+    def add_queries_to_queue_from_file(
+        self, scanner: object, queue: object, path: str
+    ) -> bool:
+        try:
+            with open(path) as f:
+                for counter, query in enumerate(f):
+                    # Parse the query and validate the existence of the URL attribute
+                    try:
+                        query = json.loads(query)
+                    except ValueError:
+                        self.logger.debug(f"Invalid JSON object. Received: {query}")
+                        continue
+                    if "url" not in query:
+                        self.logger.debug(
+                            f"Query provided without url attribute. Ignoring: {query}"
+                        )
+                        continue
+
+                    # Use the visibility if provided, else default to public
+                    query["visibility"] = (
+                        query["visibility"] if "visibility" in query else "public"
+                    )
+
+                    if self.__check_query_in_database(query):
+                        self.logger.debug(f"{query['url']} already in DB. skipping...")
+                        continue
+
+                    if not scanner.update_quotas(query["visibility"]):
+                        self.logger.critical("Closing the file...")
+                        self.logger.info(
+                            f"Scanned the first {counter} rows in the file."
+                        )
+                        break
+
+                    self.__add_query_to_database(query)
+                    queue.put(query)  # Add the query to the work queue
+
+        except FileNotFoundError:
+            self.logger.critical(f"Couldn't open file at {path}")
+            return False  # Signal failure so the caller skips saving results
+
+        queue.join()  # Wait for the queue to empty
+        return True  # Signal success so the caller saves the results
Saving data...") + + if not reports: + self.logger.critical("Noting to save.") + return + + # Check if output directory exists, create if doesn’t + if not os.path.isdir(self.output_dir): + os.mkdir(self.output_dir) + + # Generate output filename based on current time + filename = datetime.now().strftime("%Y_%m_%d-%I_%M_%S_%p") + + with open(f"{self.output_dir}/{filename}.csv", "w+", newline="") as f: + output = csv.writer(f) + output.writerow( + ["url", "screenshotURL", "isMalicious", "maliciousness", "report_url"] + ) + output.writerow([]) + for report in reports: + if report: + self.__add_result_to_database(report) + output.writerow( + [ + report["url"], + report["screenshotURL"], + report["isMalicious"], + report["maliciousness"], + report["report_url"], + ] + ) + self.logger.critical( + f"Saving completed. You can check the result at {f'{self.output_dir}/{filename}.csv'}" + ) + self.db.close() # Close the database diff --git a/src/urlscanner.py b/src/urlscanner.py new file mode 100644 index 0000000..93c46b5 --- /dev/null +++ b/src/urlscanner.py @@ -0,0 +1,147 @@ +import time +import json +import requests +import logging + + +class UrlScanner: + MAX_POLL_ATTEMPS = 10 # Maximum number of tries to poll from result API + PAUSE_BETWEEN_POLL = 5 # Time in seconds between each poll + PAUSE_BEFORE_RESULT = 10 # Time between scan submission and retrieval + + API_URL = "https://urlscan.io/api/v1" + QUOTAS_URL = "https://urlscan.io/user/quotas/" + + def __init__(self, api_key: str, log_level: int): + self.api_key = api_key + + # Set default header since all requests have the same + self.headers = {"API-Key": self.api_key, "Content-Type": "application/json"} + + # Get logger and set level based on user choice + self.logger = logging.getLogger("__name__") + self.logger.setLevel(log_level) + + self.quotas = self.__quotas() + + def __quotas(self) -> dict: + # Make request to quotas URL and return the 'limits' object with the user quotas + status, body = self.__fetch("GET", self.QUOTAS_URL) + if status >= 400: + self.logger.critical("Error while fetching user quotas.") + return {} + return body["limits"] + + def __fetch( + self, method: str, url: str, headers: dict = None, data: dict = None + ) -> dict: + # Set headers and data to either user provided information or object defaults + headers = self.headers if not headers else headers + data = "" if not data else json.dumps(data) + + # Send HTTP requests to given URL using given method and parse result + response = requests.request(method, url, headers=headers, data=data) + body = json.loads(response.text) + return response.status_code, body + + def __scan(self, url: str, visibility: str) -> str: + # Prepare the request url and header, and make the POST request + api_url = self.API_URL + "/scan/" + data = {"url": url, "visibility": visibility} + status, body = self.__fetch("POST", api_url, data=data) + + # Check if the API returned with 429 - Too many requests + if status == 429: + self.logger.debug( + "Too many requests for scan. Waiting {self.PAUSE_BETWEEN_POLL} seconds and trying again..." + ) + time.sleep( + self.PAUSE_BETWEEN_POLL + ) # Wait a bit before making another request + status, body = self.__fetch( + "POST", api_url, data=data + ) # Try to fetch the data again + if status >= 400: + self.logger.info( + f"Scan was not completed. 
+            self.logger.info(
+                f"Scan was not completed. Description: {body['description']}"
+            )
+
+        return body["uuid"] if "uuid" in body else ""
+
+    def __result(self, uuid: str) -> dict:
+        api_url = self.API_URL + f"/result/{uuid}"
+
+        attemps = 0
+        while attemps < self.MAX_POLL_ATTEMPS:
+            # Try to retrieve the result from the API
+            status, body = self.__fetch("GET", api_url)
+            if status >= 400:
+                # If the result is not ready, wait a bit and try again
+                attemps += 1
+                self.logger.debug(
+                    f"Result for: {uuid} not ready yet. Tried {attemps} times. Waiting {self.PAUSE_BETWEEN_POLL} seconds..."
+                )
+                time.sleep(self.PAUSE_BETWEEN_POLL)
+            else:
+                # Make sure the result is not empty
+                if not body["lists"]["ips"]:
+                    self.logger.info(
+                        f"{body['task']['url']} is inaccessible. Result has been removed."
+                    )
+                    return {}
+                # Return the specific result
+                self.logger.debug(f"Received result for {body['task']['url']}")
+                return {
+                    "url": body["task"]["url"],
+                    "screenshotURL": body["task"]["screenshotURL"],
+                    "isMalicious": body["verdicts"]["overall"]["malicious"],
+                    "maliciousness": body["verdicts"]["overall"]["score"],
+                    "report_url": body["task"]["reportURL"],
+                }
+        return {}
+
+    def generate_report(self, url: str, visibility: str = "public") -> dict:
+        # Make a scan API request for the given URL
+        uuid = self.__scan(url, visibility)
+
+        if uuid == "":
+            self.logger.critical(f"Couldn't get submission for {url}")
+            return {}
+
+        # Wait after sending the scan request and before getting the results
+        self.logger.debug(
+            f"Submitted {url}, waiting {self.PAUSE_BEFORE_RESULT} seconds before polling."
+        )
+        time.sleep(self.PAUSE_BEFORE_RESULT)
+
+        result = self.__result(uuid)
+        if result == {}:
+            self.logger.critical(f"Couldn't get result for {url}.")
+
+        return result
+
+    def update_quotas(self, section: str) -> bool:
+        if self.quotas[section]["day"]["remaining"] <= 0:
+            self.logger.critical(
+                f"Not enough daily quota for section: {section}. Please try again with a different section or wait for midnight UTC."
+            )
+            return False
+
+        if self.quotas[section]["hour"]["remaining"] <= 0:
+            self.logger.critical(
+                f"Not enough hourly quota for section: {section}. Please try again with a different section or wait for the top of the hour."
+            )
+            return False
+
+        if self.quotas[section]["minute"]["remaining"] <= 0:
+            self.logger.critical(
+                f"Not enough minute quota for section {section}. Please try again with a different section or wait a few seconds."
+            )
+            return False
+
+        # If there is enough quota, decrease each counter by one and keep going
+        self.quotas[section]["minute"]["remaining"] -= 1
+        self.quotas[section]["hour"]["remaining"] -= 1
+        self.quotas[section]["day"]["remaining"] -= 1
+
+        return True
diff --git a/src/util.py b/src/util.py
new file mode 100644
index 0000000..f73c1f1
--- /dev/null
+++ b/src/util.py
@@ -0,0 +1,58 @@
+import argparse
+import logging
+
+
+def convert_int_to_logging_level(log_level: int) -> int:
+    mapping = {0: logging.CRITICAL, 1: logging.INFO, 2: logging.DEBUG}
+    return mapping[log_level]
+
+
+def create_arg_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(
+        prog="UrlScanner",
+        description=(
+            """Lightweight CLI for analyzing URLs using the URLScan.io API.
+            Please set the API_KEY environment variable with your API key
+            from URLScan.io or put it inside a .env file.
+            If no mode is specified, you enter queries one by one.
+            """
+        ),
+    )
+    parser.add_argument(
+        "-v",
+        "--verbose",
+        help=(
+            "Determines how verbose the logging will be. There are three "
The default value " + "is set to 0 when no verbose flag is present. If a flag is added with no " + "value specified, it is set to 2. Otherwise, it will simply use the value " + "specified." + ), + choices=[0, 1, 2], + default=0, + nargs="?", + const=2, + type=int, + ) + + group = parser.add_mutually_exclusive_group() + group.add_argument( + "-b", + "--batch-investigate", + help=( + "Investigates the URLScan.io queries included in the specified file. The file format should " + "Contain a JSON in each row, where each json need a url and visibility attributes. " + "The output is a CSV file containing searched url, screenshot url, maliciousness score " + "given by the api and link to the full online report." + ), + type=str, + ) + + group.add_argument( + "-q", + "--quotas", + help="Show the remaining quotas of the user.", + action="store_true", + ) + + return parser