Commit: Initial commit

birkagal committed Dec 23, 2021
0 parents commit 8e64813

Showing 9 changed files with 722 additions and 0 deletions.
79 changes: 79 additions & 0 deletions .gitignore
@@ -0,0 +1,79 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# PyBuilder
.pybuilder/
target/

# IPython
profile_default/
ipython_config.py

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Cython debug symbols
cython_debug/
66 changes: 66 additions & 0 deletions README.md
@@ -0,0 +1,66 @@
# UrlScanner

> Lightweight Python CLI utility that uses the [URLScan.io](https://urlscan.io/) APIs to automate scanning and retrieving information about URLs.

[URLScan.io](https://urlscan.io/) is a useful tool for scanning and obtaining information from potentially malicious websites. URLScan provides a useful [API](https://urlscan.io/docs/api/) that can be used to add some automation to your workflow.

## Install & Setup

1. Clone this repository using `git clone https://github.com/birkagal/urlscanner`

2. Consider creating a [virtual environment](https://docs.python.org/3/library/venv.html), then install the dependencies using `pip install -r requirements.txt` (example commands below)

3. All set. Use `python application.py --help` to see the man page.
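
For example, on a Unix-like shell the setup might look like this (assuming the repository clones into a directory named `urlscanner`; the virtual-environment directory name is just a convention):

```sh
git clone https://github.com/birkagal/urlscanner
cd urlscanner
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```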

## How to use

![UrlScanner help output](https://i.ibb.co/cknWf30/Capture.png)

UrlScanner supports two main modes: analyzing multiple URLs from an input file, or analyzing URLs interactively one by one. You can always check `python application.py -h` for more help.

### API Key

[URLScan.io](https://urlscan.io/) uses a personal API key to access the API features. Sign up for an account at [URLScan.io](https://urlscan.io/) and save your personal API key in a `.env` file, with `API_KEY` as the key and your actual key as the value. You can also set an environment variable called `API_KEY`.
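
For example, a `.env` file in the project root would contain a single line (the value below is a placeholder):

```
API_KEY=your-personal-api-key
```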

### Logging Level

The `-v` flag determines how verbose the logging is. There are three possible values: 0 (critical), 1 (info), and 2 (debug). The default is 0 when no verbose flag is present. If the flag is given with no value, it is set to 2; otherwise, the specified value is used.
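
For example (hypothetical invocations illustrating the mapping described above):

```sh
python application.py          # no flag: critical only (0)
python application.py -v       # flag with no value: debug (2)
python application.py -v 1     # explicit value: info (1)
```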

### Batch Analysis

You can use the `-b` flag with a filename whose every line contains a URLScan.io query. Each query should be in JSON format and contain a `url` key and a `visibility` key. The output is a CSV file containing the scanned URL, a screenshot URL, the maliciousness score given by the API, and a link to the full online report.
If your `urls.txt` is inside the `input` directory, you can use this command to execute:

python application.py -b /input/urls.txt
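
For example, a hypothetical `input/urls.txt` with one JSON query per line could look like this:

```
{"url": "https://example.com", "visibility": "public"}
{"url": "https://example.org", "visibility": "unlisted"}
```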

### Interactive Analysis

If no mode flag is provided, the utility asks you to enter a `URL` and a `visibility` parameter manually. It then uses the URLScan.io API to scan the URL and presents you with the result. To run this mode, simply run the application without specifying any other mode. (You can still use flags.)

python application.py -v

### Display User Quotas

The URLScan.io API has a rate-limit mechanism to limit use of the API. There are different quotas per day, hour, and minute for each action. You can use the `-q` flag to list the user's remaining quotas for each action.

python application.py -q

## TODO

In this section I list my thoughts for the future: the features I didn't have time to implement.

- [ ] Display the output in an HTML file, using templating to render a single output page with all the information in a visual way.
- [ ] The full history of queries and results is already stored in a database. Use the results table to add an option to search for past results from the utility.

## Implementation

UrlScanner has two main objects, UrlScanner and IOManager.

- UrlScanner is responsible for communicating with the URLScan.io API service. It holds the logic for submission requests, retrieval requests, parsing the information, and the quota rate limiter. It follows URLScan.io implementation advice, such as respecting the 429 (too many requests) status code and waiting before polling for results. HTTP requests are sent using Python's requests module.
- IOManager is responsible for input/output and database logic. The tool uses Python's sqlite3 module to manage an SQLite database with two tables: queries and results. The queries table is used to make sure each URL is only sent once, even if it was already sent in the past. The results table currently just stores the data and may become useful in the future. The IOManager also reads the input from a file (when working in batch mode), validates each query, and adds it to the work queue. A hypothetical sketch of this schema follows below.
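
As a rough illustration of that layout, here is a minimal sketch of what the schema could look like. Only the two table names ("queries" and "results") come from the description above; the column names and the database filename are assumptions, and the real definitions live inside `IOManager`:

```python
import sqlite3

# Hypothetical schema sketch: columns and filename are illustrative guesses.
conn = sqlite3.connect("urlscanner.db")
conn.execute(
    "CREATE TABLE IF NOT EXISTS queries (url TEXT PRIMARY KEY, visibility TEXT)"
)
conn.execute(
    "CREATE TABLE IF NOT EXISTS results ("
    "url TEXT, screenshot_url TEXT, maliciousness INTEGER, report_url TEXT)"
)
conn.commit()
conn.close()
```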

The main feature of this tool is batch analysis. The tool uses Python's queue module and threading to manage a work queue that all the threads can access and take work from. Once the work is done and the queue is empty, the IOManager writes the results to a designated CSV file.

The application also uses Python's logging module to provide the different logging levels the user can choose from; each level shows a different amount of information.

The argparse module is used to manage user arguments and flags, and to display the man page when running with the `-h` flag.
128 changes: 128 additions & 0 deletions application.py
@@ -0,0 +1,128 @@
import os
import logging
from dotenv import load_dotenv
from queue import Queue
from threading import Thread
from src.iomanager import IOManager
from src.urlscanner import UrlScanner
from src.util import create_arg_parser, convert_int_to_logging_level

NUM_THREADS = 10


def main():
    # Create argparse menu and get program args
    parser = create_arg_parser()
    args = parser.parse_args()
    log_level = convert_int_to_logging_level(args.verbose)

    # Create logging configuration
    logging.basicConfig(
        format="%(asctime)s : %(levelname)s : %(message)s", datefmt="%H:%M:%S"
    )

    # Load the environment variables and instantiate the scanner and IO objects
    load_dotenv()
    scanner = UrlScanner(os.getenv("API_KEY"), log_level)
    io = IOManager(log_level)

    # Run program in the user-specified mode
    if args.batch_investigate:
        batch_investigate(scanner, io, args.batch_investigate)
    elif args.quotas:
        show_user_quotas(scanner)
    else:
        interactive_query(scanner)
    print("Exiting...")


def interactive_query(scanner: UrlScanner) -> None:
    print("Welcome to UrlScanner interactive cli tool.\n")
    url = input("Please enter the requested URL: ")
    while True:  # Loop until the user provides a valid visibility parameter
        visibility = input(
            "Please enter the requested visibility [public, private, unlisted]: "
        ).lower()
        if visibility not in ["public", "private", "unlisted"]:
            print("Please enter either public, private or unlisted.")
            continue
        break
    print("Fetching results...")
    report = scanner.generate_report(url, visibility)
    if report == {}:
        print(f"Couldn't analyze {url}")
    else:
        print(
            f"""
            FINISHED!
            Results:
            Scanned URL: {report['url']}
            Screenshot URL: {report['screenshotURL']}
            isMalicious: {report['isMalicious']}
            Maliciousness: {report['maliciousness']}
            Report URL: {report['report_url']}\n
            """
        )


def show_user_quotas(scanner: UrlScanner) -> None:
    print(
        f"""
        Public visibility:
            Day: {scanner.quotas['public']['day']['remaining']} remaining.
            Hour: {scanner.quotas['public']['hour']['remaining']} remaining.
            Minute: {scanner.quotas['public']['minute']['remaining']} remaining.
        Unlisted visibility:
            Day: {scanner.quotas['unlisted']['day']['remaining']} remaining.
            Hour: {scanner.quotas['unlisted']['hour']['remaining']} remaining.
            Minute: {scanner.quotas['unlisted']['minute']['remaining']} remaining.
        Private visibility:
            Day: {scanner.quotas['private']['day']['remaining']} remaining.
            Hour: {scanner.quotas['private']['hour']['remaining']} remaining.
            Minute: {scanner.quotas['private']['minute']['remaining']} remaining.
        Result Retrieve:
            Day: {scanner.quotas['retrieve']['day']['remaining']} remaining.
            Hour: {scanner.quotas['retrieve']['hour']['remaining']} remaining.
            Minute: {scanner.quotas['retrieve']['minute']['remaining']} remaining.\n
        """
    )


def batch_investigate(scanner: UrlScanner, io: IOManager, input_file: str) -> None:
    reports = []  # List of all queries results
    q = Queue()  # Queue that will manage the work

    # Instantiate NUM_THREADS threads and send them to worker function where they will wait for work
    for _ in range(NUM_THREADS):
        Thread(
            target=worker,
            args=(
                scanner,
                q,
                reports,
            ),
            daemon=True,
        ).start()

    # Read queries from the given file and add each valid query to the queue
    success = io.add_queries_to_queue_from_file(scanner, q, input_file)
    if success:
        # Save the results to a csv file
        io.save_csv(reports)


def worker(scanner, q, reports):
    report = {}  # The result of a given query
    while True:
        query = q.get()  # Get the next query
        # Use API to scan and retrieve result for that query.
        report = scanner.generate_report(query["url"], query["visibility"])
        reports.append(report)
        q.task_done()  # Mark that query as finished


if __name__ == "__main__":
    main()
30 changes: 30 additions & 0 deletions cherry-picking-algorithm/README.md
@@ -0,0 +1,30 @@
# Alerts Cherry-Picking Algorithm

An “alert” is an object with various keys and values. Each alert has the following keys: Alert ID, Type, Subtype, and Title (in reality there are more, but for the sake of the exercise we are only using those). Some alerts are more important than others. Based on its keys and on identifiers found in the title, each alert can be ranked from 1 (highest priority) to 6 (lowest).

This algorithm is implemented in the `cherry_pick` function.
The function receives a list of `alerts` and a `num_of_results` parameter, which defaults to 4 and determines how many alerts to pick.

The return value is a list of size `num_of_results`, with each element containing a string representing the alert id attribute `_id` of one of the highest-priority alerts in the alerts list.

## Implementation

The algorithm's first step is to check whether the given `num_of_results` is greater than or equal to the number of alerts in the list. If it is, it simply returns the `_id` of every alert in the list.

Next, the algorithm creates an empty `prioritised` list that will hold the highest-priority alerts. It seeds it with the first `num_of_results` elements as the initial highest-priority alerts and keeps track of the `worst_priority` value.

The main loop iterates over the remaining alerts in the list. For each alert, it compares its priority value with the `worst_priority` in the `prioritised` list. If the current alert's value is less than `worst_priority` (note that a value of `1` is the highest priority), we find the index of the alert with the `worst_priority` value in the `prioritised` list, swap that alert with the new alert, and update `worst_priority` based on the updated `prioritised` list.

After the loop ends, the `prioritised` list contains the `num_of_results` highest-priority alerts from the original list.
All that is left is to extract the ids of those alerts and return them as a list.
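
As a hypothetical usage example (the alert ids and a script living next to `cherry_pick.py` are assumptions; the `Details` keys and priority values match the mapping in the code below):

```python
from cherry_pick import cherry_pick

alerts = [
    {"_id": "a1", "Details": {"Type": "DataLeakage", "SubType": "ExposedMentionsOnGithub"}},  # priority 6
    {"_id": "a2", "Details": {"Type": "AttackIndication", "SubType": "BlackMarket"}},         # priority 1
    {"_id": "a3", "Details": {"Type": "DataLeakage", "SubType": "CredentialsLeakage"}},       # priority 3
    {"_id": "a4", "Details": {"Type": "vip", "SubType": "BlackMarket"}},                      # priority 5
    {"_id": "a5", "Details": {"Type": "DataLeakage", "SubType": "ConfidentialInformationExposed"}},  # priority 2
]

# The two highest-priority alerts are a2 (priority 1) and a5 (priority 2),
# so this prints ['a5', 'a2'].
print(cherry_pick(alerts, num_of_results=2))
```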

## Complexity

Let's analyze the complexity of the algorithm. The input is a list with `n` alerts, as well as a variable `k`, which is the number of results to choose. The default value for `k` is 4.
The first part of the algorithm populates the `prioritised` list with the first `k` elements of the input. It also validates the input and keeps track of the `worst_priority` variable. All in all, that loop runs in `O(k)` time.
The second part is the main loop, which iterates over the remaining list and compares each alert with the `prioritised` list to see if a swap is required.
Since `prioritised` has a fixed size of `k`, the inner loop takes `O(k)`, and the outer loop runs through the whole list, so this part takes `O(n*k)`.

Last, we iterate over `prioritised` once more to extract the ids of the alerts in it, which again takes `O(k)` time.

All in all, the algorithm's complexity is `O(k + n*k + k) => O(n*k)`, and if we assume `k=4` we get `O(n)`.
73 changes: 73 additions & 0 deletions cherry-picking-algorithm/cherry_pick.py
@@ -0,0 +1,73 @@
def cherry_pick(alerts: list, num_of_results: int = 4) -> list:

    # If num_of_results is greater than the list size, there is no need to filter; just return the full list
    if len(alerts) <= num_of_results:
        return [alert["_id"] for alert in alerts]

    # Set the first num_of_results elements as the initial highest-priority alerts and maintain worst_priority
    prioritised = []
    worst_priority = 1
    for index in range(num_of_results):
        if not validate_alert(alerts[index]):
            continue
        prioritised.append(alerts[index])
        priority = convert_alert_to_priority(
            alerts[index]["Details"]["Type"], alerts[index]["Details"]["SubType"]
        )
        worst_priority = priority if priority > worst_priority else worst_priority

    # Iterate over the remaining alerts
    for index in range(num_of_results, len(alerts)):
        if not validate_alert(alerts[index]):
            continue
        priority = convert_alert_to_priority(
            alerts[index]["Details"]["Type"], alerts[index]["Details"]["SubType"]
        )
        # If the current alert's priority is better than the worst one, replace the worst alert with the current one
        if priority < worst_priority:
            for prioritised_alert_index in range(len(prioritised)):
                if worst_priority == convert_alert_to_priority(
                    prioritised[prioritised_alert_index]["Details"]["Type"],
                    prioritised[prioritised_alert_index]["Details"]["SubType"],
                ):
                    prioritised[prioritised_alert_index] = alerts[index]
                    break
            # Update worst for the current prioritised list
            worst_priority = get_worst_priority(prioritised)

    return [alert["_id"] for alert in prioritised]


def validate_alert(alert: dict) -> bool:
    # Make sure the alert has _id, Details, Type and SubType attributes
    if not all(key in alert for key in ("_id", "Details")):
        return False
    if not all(key in alert["Details"] for key in ("Type", "SubType")):
        return False
    return True


def get_worst_priority(alerts: list) -> int:
    # Find the worst priority value in the alerts list
    worst_priority = 1
    for alert in alerts:
        priority = convert_alert_to_priority(
            alert["Details"]["Type"], alert["Details"]["SubType"]
        )
        worst_priority = priority if priority > worst_priority else worst_priority
    return worst_priority


def convert_alert_to_priority(type: str, subtype: str) -> int:
    # Map type and subtype to a priority value
    mapping = {
        "AttackIndication": {"BlackMarket": 1, "BotDataForSale": 1},
        "DataLeakage": {
            "ConfidentialDocumentLeakage": 4,
            "ConfidentialInformationExposed": 2,
            "CredentialsLeakage": 3,
            "ExposedMentionsOnGithub": 6,
        },
        "vip": {"BlackMarket": 5},
    }
    return mapping[type][subtype]
14 changes: 14 additions & 0 deletions requirements.txt
@@ -0,0 +1,14 @@
black==21.12b0
certifi==2021.10.8
charset-normalizer==2.0.9
click==8.0.3
colorama==0.4.4
idna==3.3
mypy-extensions==0.4.3
pathspec==0.9.0
platformdirs==2.4.0
python-dotenv==0.19.2
requests==2.26.0
tomli==1.2.3
typing_extensions==4.0.1
urllib3==1.26.7