1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ # To run the tests:
16+ # nox -s "lint(sample='./dataflow/run_template')"
17+ # nox -s "py27(sample='./dataflow/run_template')"
18+ # nox -s "py36(sample='./dataflow/run_template')"
19+
1520import flask
1621import json
1722import os
1823import pytest
19- import subprocess as sp
2024import time
2125
2226from datetime import datetime
27+ from googleapiclient .discovery import build
28+ from googleapiclient .errors import HttpError
2329from werkzeug .urls import url_encode
2430
2531import main
2632
2733PROJECT = os .environ ['GCLOUD_PROJECT' ]
2834BUCKET = os .environ ['CLOUD_STORAGE_BUCKET' ]
2935
30- # Wait time until a job can be cancelled, as a best effort.
31- # If it fails to be cancelled, the job will run for ~8 minutes.
32- WAIT_TIME = 5 # seconds
36+ dataflow = build ('dataflow' , 'v1b3' )
3337
3438# Create a fake "app" for generating test request contexts.
3539@pytest .fixture (scope = "module" )
3640def app ():
3741 return flask .Flask (__name__ )
3842
3943
40- def test_run_template_empty_args (app ):
44+ def test_run_template_python_empty_args (app ):
45+ project = PROJECT
46+ job = datetime .now ().strftime ('test_run_template_python-%Y%m%d-%H%M%S' )
47+ template = 'gs://dataflow-templates/latest/Word_Count'
48+ with pytest .raises (HttpError ):
49+ main .run (project , job , template )
50+
51+
52+ def test_run_template_python (app ):
53+ project = PROJECT
54+ job = datetime .now ().strftime ('test_run_template_python-%Y%m%d-%H%M%S' )
55+ template = 'gs://dataflow-templates/latest/Word_Count'
56+ parameters = {
57+ 'inputFile' : 'gs://apache-beam-samples/shakespeare/kinglear.txt' ,
58+ 'output' : 'gs://{}/dataflow/wordcount/outputs' .format (BUCKET ),
59+ }
60+ res = main .run (project , job , template , parameters )
61+ dataflow_jobs_cancel (res ['job' ]['id' ])
62+
63+
64+ def test_run_template_http_empty_args (app ):
4165 with app .test_request_context ():
4266 with pytest .raises (KeyError ):
4367 main .run_template (flask .request )
4468
4569
46- def test_run_template_url (app ):
70+ def test_run_template_http_url (app ):
4771 args = {
4872 'project' : PROJECT ,
4973 'job' : datetime .now ().strftime ('test_run_template_url-%Y%m%d-%H%M%S' ),
@@ -54,12 +78,10 @@ def test_run_template_url(app):
5478 with app .test_request_context ('/?' + url_encode (args )):
5579 res = main .run_template (flask .request )
5680 data = json .loads (res )
57- job_id = data ['job' ]['id' ]
58- time .sleep (WAIT_TIME )
59- assert sp .call (['gcloud' , 'dataflow' , 'jobs' , 'cancel' , job_id ]) == 0
81+ dataflow_jobs_cancel (data ['job' ]['id' ])
6082
6183
62- def test_run_template_data (app ):
84+ def test_run_template_http_data (app ):
6385 args = {
6486 'project' : PROJECT ,
6587 'job' : datetime .now ().strftime ('test_run_template_data-%Y%m%d-%H%M%S' ),
@@ -70,12 +92,10 @@ def test_run_template_data(app):
7092 with app .test_request_context (data = args ):
7193 res = main .run_template (flask .request )
7294 data = json .loads (res )
73- job_id = data ['job' ]['id' ]
74- time .sleep (WAIT_TIME )
75- assert sp .call (['gcloud' , 'dataflow' , 'jobs' , 'cancel' , job_id ]) == 0
95+ dataflow_jobs_cancel (data ['job' ]['id' ])
7696
7797
78- def test_run_template_json (app ):
98+ def test_run_template_http_json (app ):
7999 args = {
80100 'project' : PROJECT ,
81101 'job' : datetime .now ().strftime ('test_run_template_json-%Y%m%d-%H%M%S' ),
@@ -86,6 +106,16 @@ def test_run_template_json(app):
86106 with app .test_request_context (json = args ):
87107 res = main .run_template (flask .request )
88108 data = json .loads (res )
89- job_id = data ['job' ]['id' ]
90- time .sleep (WAIT_TIME )
91- assert sp .call (['gcloud' , 'dataflow' , 'jobs' , 'cancel' , job_id ]) == 0
109+ dataflow_jobs_cancel (data ['job' ]['id' ])
110+
111+
112+ def dataflow_jobs_cancel (job_id ):
113+ # Wait time until a job can be cancelled, as a best effort.
114+ # If it fails to be cancelled, the job will run for ~8 minutes.
115+ time .sleep (5 ) # seconds
116+ request = dataflow .projects ().jobs ().update (
117+ projectId = PROJECT ,
118+ jobId = job_id ,
119+ body = {'requestedState' : 'JOB_STATE_CANCELLED' }
120+ )
121+ request .execute ()
0 commit comments