Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POST request to an API #111

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
help="Make a GET request to an API",
action='store_true')

parser.add_argument("-POST",
help="Make a POST request to an API",
action='store_true')

parser.add_argument("-DELETE",
help = "Make a DELETE request to an API",
action = 'store_true')
Expand Down
146 changes: 114 additions & 32 deletions src/arguments/api_test.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import requests, json
from pygments import highlight, lexers, formatters

class ApiTesting():

class ApiTesting:
default_url = "https://127.0.0.1:8000"
default_headers = {}
invalid_schema_message = "Check whether the URL is valid or check if" + "the localhost server is active or not"
invalid_schema_message = (
"Check whether the URL is valid or check if"
+ "the localhost server is active or not"
)
# fetches the input data for making a request
@classmethod
def fetch_input_url(cls):
request_url = cls.default_url
request_headers = cls.default_headers
input_url = input('Enter URL: ')
input_headers = input('Enter Headers: ')
if input_url != '':
input_url = input("Enter URL: ")
input_headers = input("Enter Headers: ")
if input_url != "":
request_url = input_url
if input_headers != '':
if input_headers != "":
try:
request_headers = json.loads(input_headers)
except Exception:
Expand All @@ -25,48 +29,102 @@ def fetch_input_url(cls):
# Check if http:// or https:// is present in request_url
has_protocol = cls.__check_protocol(request_url)

if not(has_protocol):
if not (has_protocol):
request_url = "https://" + request_url

# Ask the user for endpoint if not present in request_url
if not(has_endpoint):
if(request_url[-1] == '/'):
endpoint = input("Input endpoint " +
"(Without the starting slash): ")
if not (has_endpoint):
if request_url[-1] == "/":
endpoint = input("Input endpoint " + "(Without the starting slash): ")
else:
endpoint = input("Input endpoint (With the starting slash): ")
request_url += endpoint

print("Trying ...\u26A1")
return {
"request_url" : request_url,
"request_headers" : request_headers,
"request_url": request_url,
"request_headers": request_headers,
}

#saves the json response into a file
@classmethod
def save_response_data(cls,response_data):
store_data = input('Store response data? (Y/N): ')
if(store_data.lower() == 'y'):
filename = input("Enter a filename (response_data.json)")
if filename == '':
filename = 'response_data.json'
with open(filename, 'w') as jsonFile:
json.dump(response_data, jsonFile, indent=4)
print(f"Response data stored in {filename}")
elif(store_data.lower()) == 'n':
def read_data_from_file(cls):
filename = input("Enter a filename (response_data.json)")
data = {}
if filename.strip() == "":
filename = "response_data.json"
print(f"filename empty, so default file {filename} is used ")
with open(filename, "r") as reader:
file_content = reader.read()
try:
json_data = json.loads(file_content)
data = json_data.get("data")
# Make sure the data is not None and send
except json.JSONDecodeError:
print("Unable to parse the file, Please try again")
cls.read_data_from_file()
return data

@classmethod
def enter_data_payload(cls):
print("Option 1: For sending data payload from terminal\n")
print("Option 2: For sending data payload by reading from json file\n")
store = int(input("Please choose the above options? (1/2)"))

data = {}
if store == 1:
data = input("Enter data as key value pairs")
try:
data = json.loads(data)
except Exception as exception_obj:
print(f"Unable to load the data due to {exception_obj}, please try again \n")
cls.enter_data_payload()
elif store == 2:
data = cls.read_data_from_file()
else:
print(f"you have entered {store}, please choose from above options")
cls.enter_data_payload()
return data

@classmethod
def fetch_payload_data(cls):
store = input("Do you want to send data payload? (Y/N)")
data = None
if store.lower() == "y":
data = cls.enter_data_payload()

elif store.lower() == "n":
data = {}
else:
print(f"You have entered {store}, please enter from the above options")
cls.fetch_payload_data()
return data

# saves the json response into a file
@classmethod
def save_response_data(cls, response_data):
store_data = input("Store response data? (Y/N): ")
if store_data.lower() == "y":
filename = input("Enter a filename (response_data.json)")
if filename == "":
filename = "response_data.json"
with open(filename, "w") as jsonFile:
json.dump(response_data, jsonFile, indent=4)
print(f"Response data stored in {filename}")
elif (store_data.lower()) == "n":
print(f"You have entered {store_data}, So the response is not saved")
else:
print(f"You have entered {store_data}, please enter either Y or N")
cls.save_response_data(response_data)

# formats the response data and prints it in json on console
@classmethod
def print_response_json(cls,response):
def print_response_json(cls, response):
print(f"Reponse Status Code: {response.status_code}")
response_data = json.loads(response.content)
parsed_json = json.dumps(response_data, indent=4)
output_json = highlight(parsed_json, lexers.JsonLexer(),
formatters.TerminalFormatter())
output_json = highlight(
parsed_json, lexers.JsonLexer(), formatters.TerminalFormatter()
)
print(output_json)

# Make GET request
Expand All @@ -75,7 +133,9 @@ def get_request(cls):
request_data = cls.fetch_input_url()
# Make GET request and store the response in response_data.json
try:
response = requests.get(request_data["request_url"], headers= request_data["request_headers"])
response = requests.get(
request_data["request_url"], headers=request_data["request_headers"]
)
cls.print_response_json(response)
response_data = json.loads(response.content)
cls.save_response_data(response_data)
Expand All @@ -84,13 +144,35 @@ def get_request(cls):
print(cls.invalid_schema_message)
except Exception as exception_obj:
print(exception_obj)

# Make a POST request
@classmethod
def post_request(cls):
request_data = cls.fetch_input_url()
data = cls.fetch_payload_data()
try:
response = requests.post(
url=request_data["request_url"],
headers=request_data["request_headers"],
data=data,
)
cls.print_response_json(response)
response_data = json.loads(response.content)
cls.save_response_data(response_data)
except requests.exceptions.InvalidSchema:
print(cls.invalid_schema_message)
except Exception as exception_obj:
print(exception_obj)

# Make a delete request
@classmethod
def delete_request(cls):
# request_data contains dictionary of inputs entered by user
request_data = cls.fetch_input_url()
try:
response = requests.delete(request_data["request_url"], headers= request_data["request_headers"])
response = requests.delete(
request_data["request_url"], headers=request_data["request_headers"]
)
cls.print_response_json(response)
response_data = json.loads(response.content)
cls.save_response_data(response_data)
Expand All @@ -102,14 +184,14 @@ def delete_request(cls):

@classmethod
def __check_endpoint(cls, request_url):
if(request_url == cls.default_url):
if request_url == cls.default_url:
return False
else:
return True

@classmethod
def __check_protocol(cls, request_url):
if(request_url[:4] == 'http'):
if request_url[:4] == "http":
return True
else:
return False
return False
2 changes: 2 additions & 0 deletions src/arguments/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def search_args(self):
update.check_for_updates()
elif self.arguments.GET:
self.api_test_object.get_request()
elif self.arguments.POST:
self.api_test_object.post_request()
elif self.arguments.DELETE:
self.api_test_object.delete_request()
elif self.arguments.notion:
Expand Down