Skip to content

Commit

Permalink
Update scan.py
Browse files Browse the repository at this point in the history
Add additional checks for Deeply Nested Query and Batch Requests checks
  • Loading branch information
davidfortytwo authored Apr 23, 2024
1 parent 51d7e26 commit 1f7af6f
Showing 1 changed file with 64 additions and 62 deletions.
126 changes: 64 additions & 62 deletions scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@
import json
from termcolor import colored

def send_query(url, query):
headers = {'Content-Type': 'application/json'}
try:
response = requests.post(url, json=query, headers=headers, timeout=10)
response.raise_for_status()
if 'application/json' in response.headers.get('Content-Type', ''):
return response.json()
else:
print(colored(f"[!] Unexpected Content-Type received from {url}", "yellow"))
return None
except requests.exceptions.RequestException as e:
print(colored(f"[!] HTTP error occurred: {e}", "red"))
return None

def check_introspection(url):
introspection_query = {
'query': '''{
Expand All @@ -15,24 +29,16 @@ def check_introspection(url):
}
}
}'''}

try:
response = requests.post(url, json=introspection_query)
response.raise_for_status()
response_json = response.json()

if 'data' in response_json and '__schema' in response_json['data']:
print(colored(f"[!] Introspection is enabled at {url}", "red"))
print(colored(f"Typical severity: Low", "blue"))
print("Evidence:", json.dumps(response_json, indent=4))
return True
else:
print(colored(f"[-] Introspection is not enabled at {url}", "green"))
return False
except Exception as e:
print(f"Error during introspection check: {e}")
response_json = send_query(url, introspection_query)
if response_json and 'data' in response_json and '__schema' in response_json['data']:
print(colored(f"[!] Introspection is enabled at {url}", "red"))
print(colored(f"Typical severity: Low", "blue"))
print("Evidence:", json.dumps(response_json, indent=4))
return True
else:
print(colored(f"[-] Introspection is not enabled at {url}", "green"))
return False

def check_circular_introspection(url):
circular_introspection_query = {
'query': '''{
Expand Down Expand Up @@ -60,21 +66,14 @@ def check_circular_introspection(url):
}
}'''
}
response_json = send_query(url, circular_introspection_query)
if response_json and 'data' in response_json and '__type' in response_json['data']:
print(colored(f"[!] Circular introspection vulnerability found at {url}", "red"))
print(colored(f"Typical severity: High", "red"))
print("Evidence:", json.dumps(response_json, indent=4))
else:
print(colored(f"[-] No circular introspection vulnerability found at {url}", "green"))

try:
response = requests.post(url, json=circular_introspection_query)
response.raise_for_status()
response_json = response.json()

if 'data' in response_json and '__type' in response_json['data']:
print(colored(f"[!] Circular introspection vulnerability found at {url}", "red"))
print(colored(f"Typical severity: High", "red"))
print("Evidence:", json.dumps(response_json, indent=4))
else:
print(colored(f"[-] No circular introspection vulnerability found at {url}", "green"))
except Exception as e:
print(f"Error during circular introspection check: {e}")

def check_resource_request(url):
resource_query = {
'query': '''{
Expand All @@ -93,20 +92,13 @@ def check_resource_request(url):
}
}
}'''}

try:
response = requests.post(url, json=resource_query)
response.raise_for_status()
response_json = response.json()

if 'data' in response_json and '__type' in response_json['data']:
print(colored(f"[!] Excessive resource request vulnerability found at {url}", "red"))
print(colored(f"Typical severity: High", "red"))
print("Evidence:", json.dumps(response_json, indent=4))
else:
print(colored(f"[-] No excessive resource request vulnerability found at {url}", "green"))
except Exception as e:
print(f"Error during resource request check: {e}")
response_json = send_query(url, resource_query)
if response_json and 'data' in response_json and '__type' in response_json['data']:
print(colored(f"[!] Excessive resource request vulnerability found at {url}", "red"))
print(colored(f"Typical severity: High", "red"))
print("Evidence:", json.dumps(response_json, indent=4))
else:
print(colored(f"[-] No excessive resource request vulnerability found at {url}", "green"))

def check_directive_limit(url):
directive_query = {
Expand All @@ -123,30 +115,40 @@ def check_directive_limit(url):
}
}
}'''}
response_json = send_query(url, directive_query)
if response_json and 'data' in response_json and '__type' in response_json['data']:
print(colored(f"[!] Unlimited number of directives vulnerability found at {url}", "red"))
print(colored(f"Typical severity: Low", "blue"))
print("Evidence:", json.dumps(response_json, indent=4))
else:
print(colored(f"[-] No unlimited number of directives vulnerability found at {url}", "green"))

try:
response = requests.post(url, json=directive_query)
response.raise_for_status()
response_json = response.json()

if 'data' in response_json and '__type' in response_json['data']:
print(colored(f"[!] Unlimited number of directives vulnerability found at {url}", "red"))
print(colored(f"Typical severity: Low", "blue"))
print("Evidence:", json.dumps(response_json, indent=4))
else:
print(colored(f"[-] No unlimited number of directives vulnerability found at {url}", "green"))
except Exception as e:
print(f"Error during directive limit check: {e}")
def check_deeply_nested_query(url):
deeply_nested_query = {
'query': '{ a1: __schema { queryType { name, fields { name, type { name, fields { name, type { name } } } } } } }'
}
response_json = send_query(url, deeply_nested_query)
if response_json and 'data' in response_json:
print(colored(f"[!] Server responds to deeply nested queries at {url}. Possible DoS vulnerability.", "red"))
else:
print(colored(f"[-] No issues with deeply nested queries found at {url}.", "green"))

# Add more checks here...
def check_batch_requests(url):
batch_query = [{'query': '{ __schema { types { name } } }'}] * 10 # Adjust the batch size as needed
response_json = send_query(url, batch_query)
if response_json:
print(colored(f"[!] Server allows batch requests at {url}. May lead to DoS if abused.", "red"))
else:
print(colored(f"[-] No batch request vulnerability found at {url}.", "green"))

def main(target):
introspection_enabled = check_introspection(target)
if introspection_enabled:
print("Starting checks for GraphQL endpoint at:", target)
if check_introspection(target):
check_circular_introspection(target)
check_deeply_nested_query(target)
check_batch_requests(target)
check_resource_request(target)
check_directive_limit(target)
# Call more checks here...

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Check GraphQL endpoint for common vulnerabilities.")
Expand Down

0 comments on commit 1f7af6f

Please sign in to comment.