forked from OpenDataServices/cove
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsecret_data_test.py
97 lines (71 loc) · 3.62 KB
/
secret_data_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
import glob
import json
import time
import signal
import subprocess
from urllib.parse import urlparse, urlunparse
import requests
"""
This is a test that uses old/unpublished data files that we don't want to make
public.
It checks that the JSON produced by ocds-cli is the same as previously.
Travis is set up with a password to download the files from a password
protected url, see .travis.yml
"""
os.makedirs('secret_data_test/ocds_cli', exist_ok=True)
os.chdir(os.path.join('secret_data_test/ocds_cli'))
for fname in glob.glob('../../ocds/*'):
subprocess.call(['../../ocds-cli', fname])
os.chdir('../..')
server_proc = os.fork()
if not server_proc:
os.environ['DJANGO_SETTINGS_MODULE'] = 'cove_360.settings'
subprocess.call(['python', 'manage.py', 'runserver', '8008'])
exit()
time.sleep(5)
for original_file in glob.glob('360/*'):
output_dir = os.path.join('secret_data_test', '360', original_file.split('/')[-1].split('.')[0])
os.makedirs(output_dir, exist_ok=True)
output_filename = os.path.join(output_dir, 'validation_errors.json')
with open(output_filename, 'w+') as output_file:
response = requests.post('http://localhost:8008/', files={'original_file': open(original_file, 'rb')}, data={'csrfmiddlewaretoken': 'foo'}, headers={'Cookie': 'csrftoken=' + 'foo'})
parsed = urlparse(response.url)
new_tuple = parsed.scheme, parsed.netloc, '/media/' + parsed.path.split('/')[-1] + '/validation_errors-3.json', '', '', ''
new_url = urlunparse(new_tuple)
output_file.write(requests.get(new_url).text)
os.kill(server_proc, signal.SIGTERM)
server_proc = os.fork()
if not server_proc:
os.environ['DJANGO_SETTINGS_MODULE'] = 'cove_ocds.settings'
subprocess.call(['python', 'manage.py', 'runserver', '8009'])
exit()
time.sleep(5)
for original_file in glob.glob('ocds/*'):
output_dir = os.path.join('secret_data_test', 'ocds', original_file.split('/')[-1].split('.')[0])
os.makedirs(output_dir, exist_ok=True)
output_filename = os.path.join(output_dir, 'validation_errors.json')
with open(output_filename, 'w+') as output_file:
response = requests.post('http://localhost:8009/validator/', files={'original_file': open(original_file, 'rb')}, data={'csrfmiddlewaretoken': 'foo'}, headers={'Cookie': 'csrftoken=' + 'foo'})
parsed = urlparse(response.url)
new_tuple = parsed.scheme, parsed.netloc, '/media/' + parsed.path.split('/')[-1] + '/validation_errors-3.json', '', '', ''
new_url = urlunparse(new_tuple)
output_file.write(requests.get(new_url).text)
os.kill(server_proc, signal.SIGTERM)
os.chdir(os.path.join('secret_data_test/ocds_cli'))
for dirname in os.listdir('.'):
print(dirname)
with open(os.path.join(dirname, 'results.json')) as fp, open(os.path.join('..', '..', 'secret_data_test_archive', 'ocds_cli', dirname, 'results.json')) as fp_archive:
assert json.load(fp) == json.load(fp_archive)
os.chdir('../..')
os.chdir(os.path.join('secret_data_test', 'ocds'))
for dirname in os.listdir('.'):
print(dirname)
with open(os.path.join(dirname, 'validation_errors.json')) as fp, open(os.path.join('..', '..', 'secret_data_test_archive', 'ocds', dirname, 'validation_errors.json')) as fp_archive:
assert json.load(fp) == json.load(fp_archive)
os.chdir('../..')
os.chdir(os.path.join('secret_data_test', '360'))
for dirname in os.listdir('.'):
print(dirname)
with open(os.path.join(dirname, 'validation_errors.json')) as fp, open(os.path.join('..', '..', 'secret_data_test_archive', '360', dirname, 'validation_errors.json')) as fp_archive:
assert json.load(fp) == json.load(fp_archive)