Commit 5e51379

feat(api): add execute method and structured output support
1 parent 6698e71 commit 5e51379

16 files changed: +751 -36 lines changed
README.md

Lines changed: 144 additions & 29 deletions
@@ -5,12 +5,14 @@
 The Parallel Python library provides convenient access to the Parallel REST API from any Python 3.8+
 application. The library includes type definitions for all request params and response fields,
 and offers both synchronous and asynchronous clients powered by [httpx](https://github.com/encode/httpx).
+It is strongly encouraged to use the asynchronous client for best performance.

 It is generated with [Stainless](https://www.stainless.com/).

 ## Documentation

-The REST API documentation can be found on [docs.parallel.ai](https://docs.parallel.ai). The full API of this library can be found in [api.md](api.md).
+The REST API documentation can be found on our [docs](https://docs.parallel.ai).
+The full API of this Python library can be found in [api.md](api.md).

 ## Installation

@@ -34,11 +36,12 @@ client = Parallel(
     api_key=os.environ.get("PARALLEL_API_KEY"),  # This is the default and can be omitted
 )

-task_run = client.task_run.create(
+run_result = client.task_run.execute(
     input="France (2023)",
-    processor="processor",
+    processor="core",
+    output="GDP"
 )
-print(task_run.run_id)
+print(run_result.output)
 ```

 While you can provide an `api_key` keyword argument,
@@ -61,33 +64,133 @@ client = AsyncParallel(


 async def main() -> None:
-    task_run = await client.task_run.create(
+    run_result = await client.task_run.execute(
         input="France (2023)",
-        processor="processor",
+        processor="core",
+        output="GDP"
     )
-    print(task_run.run_id)
+    print(run_result.output)


-asyncio.run(main())
+if __name__ == "__main__":
+    asyncio.run(main())
 ```

-Functionality between the synchronous and asynchronous clients is otherwise identical.
+To get the best performance out of Parallel's API, we recommend
+using the asynchronous client, especially for executing multiple Task Runs concurrently.
+Functionality between the synchronous and asynchronous clients is identical.

-## Using types
+## Convenience methods

-Nested request parameters are [TypedDicts](https://docs.python.org/3/library/typing.html#typing.TypedDict). Responses are [Pydantic models](https://docs.pydantic.dev) which also provide helper methods for things like:
+### Execute

-- Serializing back into JSON, `model.to_json()`
-- Converting to a dictionary, `model.to_dict()`
+The `execute` method provides a single call which combines creating a task run,
+polling until it is completed, and parsing structured outputs (if specified).

-Typed requests and responses provide autocomplete and documentation within your editor. If you would like to see type errors in VS Code to help catch bugs earlier, set `python.analysis.typeCheckingMode` to `basic`.
+If an output type which inherits from `BaseModel` is
+specified in the call to `.execute()`, the response content will be parsed into an
+instance of the provided output type. The parsed output can be accessed via the
+`parsed` property on the output field of the response.

-## Nested params
+```python
+import os
+import asyncio
+from parallel import AsyncParallel
+from pydantic import BaseModel

-Nested parameters are dictionaries, typed using `TypedDict`, for example:
+client = AsyncParallel()
+
+class SampleOutputStructure(BaseModel):
+    output: str
+
+async def main() -> None:
+    # with pydantic
+    run_result = await client.task_run.execute(
+        input="France (2023)",
+        processor="core",
+        output=SampleOutputStructure,
+    )
+    # parsed output of type SampleOutputStructure
+    print(run_result.output.parsed)
+    # without pydantic
+    run_result = await client.task_run.execute(
+        input="France (2023)",
+        processor="core",
+        output="GDP"
+    )
+    print(run_result.output)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
+
+The async client allows creating several task runs without blocking.
+To create multiple task runs in one go, call execute and then gather the results at the end.
+
+```python
+import asyncio
+import os
+
+from parallel import AsyncParallel
+from pydantic import BaseModel, Field
+from typing import List
+
+class CountryInput(BaseModel):
+    country: str = Field(
+        description="Name of the country to research. Must be a recognized "
+        "sovereign nation (e.g., 'France', 'Japan')."
+    )
+    year: int = Field(
+        description="Year for which to retrieve data. Must be 2000 or later. "
+        "Use most recent full-year estimates if year is current."
+    )
+
+class CountryOutput(BaseModel):
+    gdp: str = Field(
+        description="GDP in USD for the year, formatted like '$3.1 trillion (2023)'."
+    )
+    top_exports: List[str] = Field(
+        description="Top 3 exported goods/services by value. Use credible sources."
+    )
+    top_imports: List[str] = Field(
+        description="Top 3 imported goods/services by value. Use credible sources."
+    )
+
+async def main():
+    # Initialize the Parallel client
+    client = AsyncParallel(api_key=os.environ.get("PARALLEL_API_KEY"))
+
+    # Prepare structured input
+    input_data = [
+        CountryInput(country="France", year=2023),
+        CountryInput(country="Germany", year=2023),
+        CountryInput(country="Italy", year=2023)
+    ]
+
+    run_results = await asyncio.gather(*[
+        client.task_run.execute(
+            input=datum,
+            output=CountryOutput,
+            processor="core"
+        )
+        for datum in input_data
+    ])
+
+    for run_input, run_result in zip(input_data, run_results):
+        print(f"Task run output for {run_input}: {run_result.output.parsed}")
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
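The README example above pairs inputs with results using `zip`, which is only safe because `asyncio.gather` returns results in the order the awaitables were passed, not the order in which they finish. A minimal sketch of that property follows; `fake_execute` is a hypothetical stand-in for `client.task_run.execute` and makes no network calls.

```python
import asyncio
import random
from typing import List, Tuple


async def fake_execute(country: str) -> str:
    """Hypothetical stand-in for `client.task_run.execute` (assumption, not the SDK)."""
    # Sleep for a random time so that completion order differs from call order.
    await asyncio.sleep(random.uniform(0.0, 0.02))
    return f"result for {country}"


async def run_all(countries: List[str]) -> List[Tuple[str, str]]:
    # gather() yields results in argument order, regardless of which
    # coroutine finishes first, so zip() pairs each input with its own result.
    results = await asyncio.gather(*(fake_execute(c) for c in countries))
    return list(zip(countries, results))


if __name__ == "__main__":
    for country, result in asyncio.run(run_all(["France", "Germany", "Italy"])):
        print(country, "->", result)
```

If one call raises, `gather` propagates the first exception by default; passing `return_exceptions=True` keeps the positional pairing and returns the exception objects in place of results.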
+
+## Low level API Access
+
+The library also provides access to the low level API for accessing the Parallel API.

 ```python
 from parallel import Parallel
+from parallel.types import TaskSpecParam

 client = Parallel()

@@ -125,9 +228,16 @@ task_run = client.task_run.create(
         },
     },
 )
-print(task_run.task_spec)
+
+run_result = client.task_run.result(task_run.id)
+print(run_result.output)
 ```

+For more information, please check out the relevant section in our docs:
+
+- [Task Spec](https://docs.parallel.ai/core-concepts/task-spec)
+- [Task Runs](https://docs.parallel.ai/core-concepts/task-runs)
+
 ## Handling errors

 When the library is unable to connect to the API (for example, due to network connection problems or a timeout), a subclass of `parallel.APIConnectionError` is raised.
@@ -144,9 +254,10 @@ from parallel import Parallel
 client = Parallel()

 try:
-    client.task_run.create(
+    client.task_run.execute(
         input="France (2023)",
-        processor="processor",
+        processor="core",
+        output="GDP"
     )
 except parallel.APIConnectionError as e:
     print("The server could not be reached")
@@ -190,9 +301,10 @@ client = Parallel(
 )

 # Or, configure per-request:
-client.with_options(max_retries=5).task_run.create(
+client.with_options(max_retries=5).task_run.execute(
     input="France (2023)",
-    processor="processor",
+    processor="core",
+    output="GDP"
 )
 ```

@@ -216,9 +328,10 @@ client = Parallel(
 )

 # Override per-request:
-client.with_options(timeout=5.0).task_run.create(
+client.with_options(timeout=5.0).task_run.execute(
     input="France (2023)",
-    processor="processor",
+    processor="core",
+    output="GDP"
 )
 ```

@@ -260,14 +373,15 @@ The "raw" Response object can be accessed by prefixing `.with_raw_response.` to
 from parallel import Parallel

 client = Parallel()
-response = client.task_run.with_raw_response.create(
+response = client.task_run.with_raw_response.execute(
     input="France (2023)",
-    processor="processor",
+    processor="core",
+    output="GDP"
 )
 print(response.headers.get('X-My-Header'))

-task_run = response.parse()  # get the object that `task_run.create()` would have returned
-print(task_run.run_id)
+task_run = response.parse()  # get the object that `task_run.execute()` would have returned
+print(task_run.output)
 ```

 These methods return an [`APIResponse`](https://github.com/shapleyai/parallel-sdk-python/tree/main/src/parallel/_response.py) object.
@@ -281,9 +395,10 @@ The above interface eagerly reads the full response body when you make the reque
 To stream the response body, use `.with_streaming_response` instead, which requires a context manager and only reads the response body once you call `.read()`, `.text()`, `.json()`, `.iter_bytes()`, `.iter_text()`, `.iter_lines()` or `.parse()`. In the async client, these are async methods.

 ```python
-with client.task_run.with_streaming_response.create(
+with client.task_run.with_streaming_response.execute(
     input="France (2023)",
-    processor="processor",
+    processor="core",
+    output="GDP"
 ) as response:
     print(response.headers.get("X-My-Header"))

api.md

Lines changed: 6 additions & 1 deletion
@@ -3,11 +3,16 @@
 Types:

 ```python
-from parallel.types import Input, JsonSchema, TaskRun, TaskRunResult, TaskSpec, TextSchema
+from parallel.types import Input, JsonSchema, ParsedTaskRunResult, TaskRun, TaskRunResult, TaskSpec, TextSchema
 ```

 Methods:

 - <code title="post /v1/tasks/runs">client.task_run.<a href="./src/parallel/resources/task_run.py">create</a>(\*\*<a href="src/parallel/types/task_run_create_params.py">params</a>) -> <a href="./src/parallel/types/task_run.py">TaskRun</a></code>
 - <code title="get /v1/tasks/runs/{run_id}">client.task_run.<a href="./src/parallel/resources/task_run.py">retrieve</a>(run_id) -> <a href="./src/parallel/types/task_run.py">TaskRun</a></code>
 - <code title="get /v1/tasks/runs/{run_id}/result">client.task_run.<a href="./src/parallel/resources/task_run.py">result</a>(run_id, \*\*<a href="src/parallel/types/task_run_result_params.py">params</a>) -> <a href="./src/parallel/types/task_run_result.py">TaskRunResult</a></code>
+
+Convenience methods:
+
+- <code title="post /v1/tasks/runs">client.task_run.<a href="./src/parallel/resources/task_run.py">execute</a>(input, processor, output: <a href="./src/parallel/types/task_spec_param.py">OutputSchema</a>) -> <a href="./src/parallel/types/task_run_result.py">TaskRunResult</a></code>
+- <code title="post /v1/tasks/runs">client.task_run.<a href="./src/parallel/resources/task_run.py">execute</a>(input, processor, output: Type[OutputT]) -> <a href="./src/parallel/types/parsed_task_run_result.py">ParsedTaskRunResult[OutputT]</a></code>
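The two `execute` entries in api.md describe one method whose return type depends on what is passed as `output`. One common way to express this in Python is `typing.overload`. The sketch below illustrates the pattern only and is not the SDK's implementation: `PlainResult`, `ParsedResult`, `Answer`, and the fake payload are hypothetical, and the first overload uses `str` as a simplification of `OutputSchema`.

```python
from dataclasses import dataclass
from typing import Any, Generic, Type, TypeVar, overload

OutputT = TypeVar("OutputT")


@dataclass
class PlainResult:
    """Hypothetical stand-in for `TaskRunResult`."""
    output: Any


@dataclass
class ParsedResult(Generic[OutputT]):
    """Hypothetical stand-in for `ParsedTaskRunResult[OutputT]`."""
    output: Any
    parsed: OutputT


@overload
def execute(input: str, processor: str, output: str) -> PlainResult: ...
@overload
def execute(input: str, processor: str, output: Type[OutputT]) -> "ParsedResult[OutputT]": ...


def execute(input, processor, output):
    # Fake payload standing in for the API response (assumption for this sketch).
    raw = {"answer": f"{input} via {processor}"}
    if isinstance(output, type):
        # A class was passed: build an instance of it from the payload.
        return ParsedResult(output=raw, parsed=output(**raw))
    # A plain description was passed: return the unparsed result.
    return PlainResult(output=raw)


@dataclass
class Answer:
    answer: str


if __name__ == "__main__":
    print(execute("France (2023)", "core", "GDP"))
    print(execute("France (2023)", "core", Answer).parsed)
```

With overloads like these, a type checker can infer that `.parsed` is an `Answer` in the second call, while the first call has no `parsed` attribute at all.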

src/parallel/_compat.py

Lines changed: 11 additions & 0 deletions
@@ -164,6 +164,17 @@ def model_parse(model: type[_ModelT], data: Any) -> _ModelT:
     return model.parse_obj(data)  # pyright: ignore[reportDeprecated]


+def model_parse_json(model: type[_ModelT], data: str | bytes) -> _ModelT:
+    if PYDANTIC_V2:
+        return model.model_validate_json(data)
+    return model.parse_raw(data)  # pyright: ignore[reportDeprecated]
+
+
+def model_json_schema(model: type[_ModelT]) -> dict[str, Any]:
+    if PYDANTIC_V2:
+        return model.model_json_schema()
+    return model.schema()  # pyright: ignore[reportDeprecated]
+
 # generic models
 if TYPE_CHECKING:

src/parallel/_constants.py

Lines changed: 4 additions & 2 deletions
@@ -5,10 +5,12 @@
 RAW_RESPONSE_HEADER = "X-Stainless-Raw-Response"
 OVERRIDE_CAST_TO_HEADER = "____stainless_override_cast_to"

-# default timeout is 1 minute
-DEFAULT_TIMEOUT = httpx.Timeout(timeout=60, connect=5.0)
+# default timeout for http requests is 10 minutes.
+DEFAULT_TIMEOUT_SECONDS = 600
+DEFAULT_TIMEOUT = httpx.Timeout(timeout=DEFAULT_TIMEOUT_SECONDS, connect=5.0)
 DEFAULT_MAX_RETRIES = 2
 DEFAULT_CONNECTION_LIMITS = httpx.Limits(max_connections=100, max_keepalive_connections=20)
+DEFAULT_POLL_INTERVAL_MS = 20000

 INITIAL_RETRY_DELAY = 0.5
 MAX_RETRY_DELAY = 8.0
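The shown part of the diff does not include the code that consumes `DEFAULT_POLL_INTERVAL_MS`, so the following is only a sketch of how a poll interval and an overall deadline are commonly combined. `poll_until_done` and its parameters are hypothetical names; the two constants are copied from the hunk above, and the injectable `sleep` and `clock` exist only so the loop can be exercised without waiting.

```python
import time
from typing import Callable

DEFAULT_TIMEOUT_SECONDS = 600     # value taken from the diff above
DEFAULT_POLL_INTERVAL_MS = 20000  # value taken from the diff above


def poll_until_done(
    is_done: Callable[[], bool],
    timeout_s: float = DEFAULT_TIMEOUT_SECONDS,
    interval_ms: int = DEFAULT_POLL_INTERVAL_MS,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call is_done() until it returns True and return the number of checks made.

    Raises TimeoutError if the deadline has passed after an unsuccessful check.
    """
    deadline = clock() + timeout_s
    checks = 0
    while True:
        checks += 1
        if is_done():
            return checks
        if clock() >= deadline:
            raise TimeoutError(f"not done after {timeout_s}s")
        # The interval is stored in milliseconds; sleep() expects seconds.
        sleep(interval_ms / 1000)


if __name__ == "__main__":
    # Simulated clock so the example finishes instantly.
    now = [0.0]
    statuses = iter([False, False, True])
    checks = poll_until_done(
        lambda: next(statuses),
        sleep=lambda s: now.__setitem__(0, now[0] + s),
        clock=lambda: now[0],
    )
    print(checks, now[0])
```

Using a monotonic clock for the deadline avoids surprises when the wall clock is adjusted during a long wait.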

src/parallel/_utils/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 from ._sync import asyncify as asyncify
 from ._proxy import LazyProxy as LazyProxy
 from ._utils import (
+    is_str as is_str,
     flatten as flatten,
     is_dict as is_dict,
     is_list as is_list,

src/parallel/_utils/_utils.py

Lines changed: 4 additions & 0 deletions
@@ -169,6 +169,10 @@ def is_dict(obj: object) -> TypeGuard[dict[object, object]]:
     return isinstance(obj, dict)


+def is_str(obj: object) -> TypeGuard[str]:
+    return isinstance(obj, str)
+
+
 def is_list(obj: object) -> TypeGuard[list[object]]:
     return isinstance(obj, list)
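A `TypeGuard` helper such as `is_str` lets a type checker narrow a value inside the branch where the check succeeds. The sketch below shows that narrowing; `describe_output` is a hypothetical caller written for illustration, and how the SDK itself uses `is_str` is not visible in the shown part of the diff.

```python
from typing import Union

try:
    from typing import TypeGuard  # available in the standard library from Python 3.10
except ImportError:
    from typing_extensions import TypeGuard  # backport for older interpreters


def is_str(obj: object) -> TypeGuard[str]:
    return isinstance(obj, str)


def describe_output(output: Union[str, type]) -> str:
    """Hypothetical dispatcher that treats strings and classes differently."""
    if is_str(output):
        # Inside this branch a type checker treats `output` as `str`.
        return "text:" + output.upper()
    # TypeGuard narrows only the positive branch, so `output` is still the
    # full union here; getattr keeps the access safe for the checker.
    return "schema:" + getattr(output, "__name__", repr(output))


if __name__ == "__main__":
    print(describe_output("gdp"))
    print(describe_output(dict))
```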

src/parallel/lib/__init__.py

Whitespace-only changes.
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+from ._task_spec import build_task_spec_param as build_task_spec_param
+from ._task_run_result import task_run_result_parser as task_run_result_parser
