-
Notifications
You must be signed in to change notification settings - Fork 530
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1160 from rochabr/jobs-python
Jobs API Quickstart for Python - HTTP
- Loading branch information
Showing
6 changed files
with
327 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
# Dapr Jobs API (HTTP Client) | ||
|
||
In this quickstart, you'll schedule, get, and delete a job using Dapr's Job API. This API is responsible for scheduling and running jobs at a specific time or interval. | ||
|
||
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/jobs/) link for more information about Dapr and the Jobs API. | ||
|
||
|
||
This quickstart includes two apps: | ||
|
||
- `job-scheduler/app.py`, responsible for scheduling, retrieving and deleting jobs. | ||
- `job-service/app.py`, responsible for handling the triggered jobs. | ||
|
||
## Prerequisites | ||
|
||
- [Python 3.8+](https://www.python.org/downloads/) | ||
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/) | ||
- [Initialized Dapr environment](https://docs.dapr.io/getting-started/install-dapr-selfhost/) | ||
|
||
## Install dependencies | ||
|
||
```bash | ||
pip install -r requirements.txt | ||
``` | ||
|
||
## Run all apps with multi-app run template file | ||
|
||
This section shows how to run both applications at once using [multi-app run template files](https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/multi-app-overview/) with `dapr run -f .`. This enables you to test the interactions between multiple applications and will `schedule`, `run`, `get`, and `delete` jobs within a single process. | ||
|
||
Open a new terminal window and run the multi app run template: | ||
|
||
<!-- STEP | ||
name: Run multi app run template | ||
expected_stdout_lines: | ||
- '== APP - job-service == Received job request...' | ||
- '== APP - job-service == Executing maintenance job: Oil Change' | ||
- '== APP - job-scheduler == Job Scheduled: C-3PO' | ||
- '== APP - job-service == Received job request...' | ||
- '== APP - job-service == Executing maintenance job: Limb Calibration' | ||
expected_stderr_lines: | ||
output_match_mode: substring | ||
match_order: none | ||
background: true | ||
sleep: 60 | ||
timeout_seconds: 120 | ||
--> | ||
|
||
```bash | ||
dapr run -f . | ||
``` | ||
|
||
The terminal console output should look similar to this, where: | ||
|
||
- The `R2-D2` job is being scheduled. | ||
- The `R2-D2` job is being executed after 2 seconds. | ||
- The `C-3PO` job is being scheduled. | ||
- The `C-3PO` job is being retrieved. | ||
|
||
```text | ||
== APP - job-scheduler == Job Scheduled: R2-D2 | ||
== APP - job-service == Received job request... | ||
== APP - job-service == Starting droid: R2-D2 | ||
== APP - job-service == Executing maintenance job: Oil Change | ||
== APP - job-scheduler == Job Scheduled: C-3PO | ||
== APP - job-scheduler == Job details: {"name":"C-3PO", "dueTime":"30s", "data":{"@type":"ttype.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}} | ||
``` | ||
|
||
After 30 seconds, the terminal output should present the `C-3PO` job being processed: | ||
|
||
```text | ||
== APP - job-service == Received job request... | ||
== APP - job-service == Starting droid: C-3PO | ||
== APP - job-service == Executing maintenance job: Limb Calibration | ||
``` | ||
|
||
<!-- END_STEP --> | ||
|
||
2. Stop and clean up application processes | ||
|
||
<!-- STEP | ||
name: Stop multi-app run | ||
sleep: 5 | ||
--> | ||
|
||
```bash | ||
dapr stop -f . | ||
``` | ||
|
||
<!-- END_STEP --> | ||
|
||
## Run apps individually | ||
|
||
### Start the job service | ||
|
||
1. Open a terminal and run the `job-service` app: | ||
|
||
```bash | ||
cd job-service | ||
dapr run --app-id job-service --app-port 6200 --dapr-http-port 6280 -- python app.py | ||
``` | ||
|
||
### Schedule jobs | ||
|
||
1. On a new terminal window, schedule the `R2-D2` Job using the Jobs API: | ||
|
||
```bash | ||
curl -X POST \ | ||
http://localhost:6280/v1.0-alpha1/jobs/R2D2 \ | ||
-H "Content-Type: application/json" \ | ||
-d '{ | ||
"data": { | ||
"@type": "type.googleapis.com/google.protobuf.StringValue", | ||
"value": "R2-D2:Oil Change" | ||
}, | ||
"dueTime": "2s" | ||
}' | ||
``` | ||
|
||
Back at the `job-service` app terminal window, the output should be: | ||
|
||
```text | ||
== APP - job-service == Received job request... | ||
== APP - job-service == Starting droid: R2-D2 | ||
== APP - job-service == Executing maintenance job: Oil Change | ||
``` | ||
|
||
2. On the same terminal window, schedule the `C-3PO` Job using the Jobs API: | ||
|
||
```bash | ||
curl -X POST \ | ||
http://localhost:6280/v1.0-alpha1/jobs/c-3po \ | ||
-H "Content-Type: application/json" \ | ||
-d '{ | ||
"data": { | ||
"@type": "type.googleapis.com/google.protobuf.StringValue", | ||
"value": "C-3PO:Limb Calibration" | ||
}, | ||
"dueTime": "30s" | ||
}' | ||
``` | ||
|
||
### Get a scheduled job | ||
|
||
1. On the same terminal window, run the command below to get the recently scheduled `C-3PO` job: | ||
|
||
```bash | ||
curl -X GET http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json" | ||
``` | ||
|
||
You should see the following: | ||
|
||
```text | ||
{"name":"C-3PO", "dueTime":"30s", "data":{"@type":"type.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}} | ||
``` | ||
|
||
### Delete a scheduled job | ||
|
||
1. On the same terminal window, run the command below to delete the recently scheduled `C-3PO` job: | ||
|
||
```bash | ||
curl -X DELETE http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json" | ||
``` | ||
|
||
2. Run the command below to attempt to retrieve the deleted job: | ||
|
||
```bash | ||
curl -X GET http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json" | ||
``` | ||
|
||
You should see an error message indicating that the job was not found: | ||
|
||
```text | ||
{"errorCode":"ERR_JOBS_NOT_FOUND","message":"job not found: app||default||job-service||c-3po"} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
version: 1 | ||
apps: | ||
- appDirPath: ./job-service/ | ||
appID: job-service | ||
appPort: 6200 | ||
daprHTTPPort: 6280 | ||
command: ["python3", "app.py"] | ||
- appDirPath: ./job-scheduler/ | ||
appID: job-scheduler | ||
appPort: 6300 | ||
daprHTTPPort: 6380 | ||
command: ["python3", "app.py"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import os | ||
import time | ||
import requests | ||
import json | ||
|
||
C3PO_JOB_BODY = { | ||
"data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "C-3PO:Limb Calibration"}, | ||
"dueTime": "10s", | ||
} | ||
|
||
R2D2_JOB_BODY = { | ||
"data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "R2-D2:Oil Change"}, | ||
"dueTime": "2s" | ||
} | ||
|
||
def schedule_job(host: str, port: str, job_name: str, job_body: dict) -> None: | ||
req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}" | ||
|
||
try: | ||
response = requests.post( | ||
req_url, | ||
json=job_body, | ||
headers={"Content-Type": "application/json"}, | ||
timeout=15 | ||
) | ||
|
||
# Accept both 200 and 204 as success codes | ||
if response.status_code not in [200, 204]: | ||
raise Exception(f"Failed to schedule job. Status code: {response.status_code}, Response: {response.text}") | ||
|
||
print(f"Job Scheduled: {job_name}") | ||
if response.text: | ||
print(f"Response: {response.text}") | ||
|
||
except requests.exceptions.RequestException as e: | ||
print(f"Error scheduling job {job_name}: {str(e)}") | ||
raise | ||
|
||
def get_job_details(host: str, port: str, job_name: str) -> None: | ||
req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}" | ||
|
||
try: | ||
response = requests.get(req_url, timeout=15) | ||
if response.status_code in [200, 204]: | ||
print(f"Job details for {job_name}: {response.text}") | ||
else: | ||
print(f"Failed to get job details. Status code: {response.status_code}, Response: {response.text}") | ||
|
||
except requests.exceptions.RequestException as e: | ||
print(f"Error getting job details for {job_name}: {str(e)}") | ||
raise | ||
|
||
def main(): | ||
# Wait for services to be ready | ||
time.sleep(5) | ||
|
||
dapr_host = os.getenv('DAPR_HOST', 'http://localhost') | ||
scheduler_dapr_http_port = os.getenv('SCHEDULER_DAPR_HTTP_PORT', '6280') | ||
|
||
# Schedule R2-D2 job | ||
schedule_job(dapr_host, scheduler_dapr_http_port, "R2-D2", R2D2_JOB_BODY) | ||
time.sleep(5) | ||
|
||
# Schedule C-3PO job | ||
schedule_job(dapr_host, scheduler_dapr_http_port, "C-3PO", C3PO_JOB_BODY) | ||
time.sleep(5) | ||
|
||
# Get C-3PO job details | ||
get_job_details(dapr_host, scheduler_dapr_http_port, "C-3PO") | ||
time.sleep(5) | ||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import os | ||
import json | ||
import traceback | ||
from http.server import HTTPServer, BaseHTTPRequestHandler | ||
from urllib.parse import urlparse, parse_qs | ||
|
||
class DroidJob: | ||
def __init__(self, droid: str, task: str): | ||
self.droid = droid | ||
self.task = task | ||
|
||
def set_droid_job(decoded_value: str) -> DroidJob: | ||
# Remove newlines from decoded value and split into droid and task | ||
droid_str = decoded_value.replace('\n', '') | ||
droid_array = droid_str.split(':') | ||
return DroidJob(droid_array[0], droid_array[1]) | ||
|
||
class JobHandler(BaseHTTPRequestHandler): | ||
def _send_response(self, status_code: int, message: str = ""): | ||
self.send_response(status_code) | ||
self.send_header('Content-type', 'application/json') | ||
self.end_headers() | ||
if message: | ||
self.wfile.write(json.dumps({"message": message}).encode('utf-8')) | ||
|
||
def do_POST(self): | ||
print('Received job request...', flush=True) | ||
|
||
try: | ||
# Check if path starts with /job/ | ||
if not self.path.startswith('/job/'): | ||
self._send_response(404, "Not Found") | ||
return | ||
|
||
# Read request body | ||
content_length = int(self.headers.get('Content-Length', 0)) | ||
raw_data = self.rfile.read(content_length).decode('utf-8') | ||
|
||
# Parse outer JSON data | ||
job_data = json.loads(raw_data) | ||
|
||
# Extract value directly from the job data | ||
value = job_data.get('value', '') | ||
|
||
# Create DroidJob from value | ||
droid_job = set_droid_job(value) | ||
|
||
print("Starting droid: " + droid_job.droid, flush=True) | ||
print("Executing maintenance job: " + droid_job.task, flush=True) | ||
|
||
self._send_response(200) | ||
|
||
except Exception as e: | ||
print("Error processing job request:", flush= True) | ||
print(traceback.format_exc()) | ||
self._send_response(400, f"Error processing job: {str(e)}") | ||
|
||
def run_server(port: int): | ||
server_address = ('', port) | ||
httpd = HTTPServer(server_address, JobHandler) | ||
print("Server started on port " + str(port), flush=True) | ||
httpd.serve_forever() | ||
|
||
if __name__ == '__main__': | ||
app_port = int(os.getenv('APP_PORT', '6200')) | ||
run_server(app_port) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
include ../../../docker.mk | ||
include ../../../validate.mk |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
requests==2.31.0 |