Skip to content
This repository has been archived by the owner on Nov 28, 2023. It is now read-only.

Commit

Permalink
Merge pull request #546 from 40huo/develop
Browse files Browse the repository at this point in the history
fix #527
  • Loading branch information
FeeiCN authored Sep 6, 2017
2 parents b4eb988 + 2344c67 commit 8bbe66e
Showing 1 changed file with 57 additions and 28 deletions.
85 changes: 57 additions & 28 deletions tests/test_apiserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
:copyright: Copyright (c) 2017 Feei. All rights reserved
"""

import requests
import json
import multiprocessing
import time
import os
import shutil
import socket
from cobra.config import project_directory
import time

import requests

from cobra.api import start
from cobra.config import project_directory, running_path

p = multiprocessing.Process(target=start, args=('127.0.0.1', 5000, False))
p.start()
Expand All @@ -30,17 +32,39 @@
template_path = os.path.join(project_directory, 'config.template')
shutil.copyfile(template_path, config_path)

a_sid = ''
s_sid = ''


def test_add_job():
url = "http://127.0.0.1:5000/api/add"
post_data = {
"key": "your_secret_key",
"target": ["tests/vulnerabilities"],
"target": [os.path.join(project_directory, 'tests/vulnerabilities')]
}
headers = {
"Content-Type": "application/json",
}
re = requests.post(url=url, data=json.dumps(post_data), headers=headers)
result = json.loads(re.text)

global a_sid
a_sid = result.get('result').get('sid')

a_sid_file = os.path.join(running_path, '{sid}_list'.format(sid=a_sid))

# wait writing scan_list
while True:
with open(a_sid_file, 'r') as f:
scan_list = json.load(f)
print(scan_list)
if len(scan_list.get('sids')) > 0:
break
time.sleep(0.1)

global s_sid
s_sid = list(scan_list.get('sids').keys())[0]

assert "1001" in re.text
assert "Add scan job successfully" in re.text
assert "sid" in re.text
Expand All @@ -50,44 +74,57 @@ def test_job_status():
url = "http://127.0.0.1:5000/api/status"
post_data = {
"key": "your_secret_key",
"sid": 24,
"sid": a_sid,
}
headers = {
"Content-Type": "application/json",
}
re = requests.post(url=url, data=json.dumps(post_data), headers=headers)
assert "1004" in re.text
assert "1001" in re.text
assert "msg" in re.text
assert "sid" in re.text
assert a_sid in re.text
assert "status" in re.text
assert "report" in re.text


def test_result_data():
url = 'http://127.0.0.1:5000/api/list'
post_data = {
'sid': 24,
'sid': s_sid,
}
headers = {
"Content-Type": "application/json",
}
re = requests.post(url=url, data=json.dumps(post_data), headers=headers)
assert '1002' in re.text
assert 'No such target' in re.text

s_sid_file = os.path.join(running_path, '{sid}_data'.format(sid=s_sid))
if os.path.exists(s_sid_file):
assert '1001' in re.text
assert 'result' in re.text
assert 'rule_filter' in re.text
else:
assert '1002' in re.text
assert 'No such target' in re.text


def test_result_detail():
url = 'http://127.0.0.1:5000/api/detail'
post_data = {
'sid': 'abcdeft',
'file_path': 'setup.py',
'sid': s_sid,
'file_path': 'v.php',
}
headers = {
"Content-Type": "application/json",
}
re = requests.post(url=url, data=json.dumps(post_data), headers=headers)
assert '1002' in re.text
assert 'No such target' in re.text

s_sid_file = os.path.join(running_path, '{sid}_data'.format(sid=s_sid))
if os.path.exists(s_sid_file):
assert '1001' in re.text
assert 'file_content' in re.text
else:
assert '1002' in re.text
assert 'No such target' in re.text


def test_index():
Expand All @@ -105,23 +142,15 @@ def test_close_api():
p.terminate()
p.join()

# wait for scan process
while True:
cobra_process = os.popen('ps aux | grep python').read()
cobra_process_num = len(cobra_process.strip().split('\n'))
if cobra_process_num <= 3:
# grep python
# sh -c ps aux | grep python
# python pytest
break
# wait for scan data
s_sid_file = os.path.join(running_path, '{sid}_data'.format(sid=s_sid))
while not os.path.exists(s_sid_file):
time.sleep(1)

# whether port 5000 is closed
# wait for port closed
s = socket.socket()
s.settimeout(0.5)
try:
assert s.connect_ex(('localhost', 5000)) != 0
finally:
s.close()
while s.connect_ex(('localhost', 5000)) == 0:
time.sleep(0.5)

assert not os.path.exists(config_path)

0 comments on commit 8bbe66e

Please sign in to comment.