Smallpond provides both high-level and low-level APIs.
Note
Currently, smallpond provides two different APIs, supporting dynamic and static construction of data flow graphs respectively. Due to historical reasons, these two APIs use different scheduler backends and support different configuration options.
- The High-level API currently uses Ray as the backend, supporting dynamic construction and execution of data flow graphs.
- The Low-level API uses a built-in scheduler and only supports one-time execution of static data flow graphs. However, it offers more performance optimizations and richer configuration options.
We are working to merge them so that in the future, you can use a unified high-level API and freely choose between Ray or the built-in scheduler.
The high-level API is centered around :ref:`dataframe`. It allows dynamic construction of data flow graphs, execution, and result retrieval.
A typical workflow looks like this:
import smallpond
sp = smallpond.init()
df = sp.read_parquet("path/to/dataset/*.parquet")
df = df.repartition(10)
df = df.map("x + 1")
df.write_parquet("path/to/output")
.. toctree:: :maxdepth: 2 api/dataframe
It is recommended to use the DataFrame API.
In the low-level API, users manually create :ref:`nodes` to construct static data flow graphs, then submit them to smallpond to generate :ref:`tasks` and wait for all tasks to complete.
A complete example is shown below.
from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import Context, DataSourceNode, DataSetPartitionNode, SqlEngineNode, LogicalPlan
from smallpond.execution.driver import Driver
def my_pipeline(input_paths: List[str], npartitions: int):
ctx = Context()
dataset = ParquetDataSet(input_paths)
node = DataSourceNode(ctx, dataset)
node = DataSetPartitionNode(ctx, (node,), npartitions=npartitions)
node = SqlEngineNode(ctx, (node,), "SELECT * FROM {0}")
return LogicalPlan(ctx, node)
if __name__ == "__main__":
driver = Driver()
driver.add_argument("-i", "--input_paths", nargs="+")
driver.add_argument("-n", "--npartitions", type=int, default=10)
plan = my_pipeline(**driver.get_arguments())
driver.run(plan)
To run this script:
python script.py -i "path/to/*.parquet" -n 10
.. toctree:: :maxdepth: 2 api/dataset api/nodes api/tasks api/execution