Guide to Use a Python UDF
User-defined Functions (UDFs) provide a means to incorporate custom logic into Texera. Texera offers comprehensive Python UDF APIs, enabling users to accomplish various tasks. This guide will delve into the usage of UDFs, breaking down the process step by step.
The UDF operator offers the following interface, requiring the user to provide the following inputs: Python code, worker count, and output schema.
- Users can click the "Edit code content" button to open the UDF code editor, where they can enter custom Python code to define the desired operator.
- Users can adjust the parallelism of the UDF operator by modifying the number of workers. The engine will then create the corresponding number of workers to execute the same operator in parallel.
- Users need to provide the output schema of the UDF operator, which describes the fields of the output data.
  - The "Retain input columns" option allows users to include the input schema as the foundation for the output schema.
  - The "Extra output column(s)" list allows users to define additional fields that should be included in the output schema.
- Optionally, users can click the pencil icon next to the operator name to rename the operator.
In Texera, all operators are implemented as iterators, including Python UDFs. Conceptually, a defined operator is executed as follows:
```python
operator = UDF()  # initialize a UDF operator
...               # some other initialization logic

# the main processing loop
while input_stream.has_more():
    input_data = next_data()
    output_iterator = operator.process(input_data)
    for output_data in output_iterator:
        send(output_data)

...  # some cleanup logic
```

The complete life cycle of a UDF operator consists of the following APIs:
- `open() -> None`: Opens a context for the operator. It is typically used to load or initialize resources, such as a file, a model, or an API client. It is invoked once per operator.
- `process(data, port: int) -> Iterator[Optional[data]]`: Processes one unit of input data from the given port, returning an iterator of optional data as output. It is invoked once for every unit of data.
- `on_finish(port: int) -> Iterator[Optional[data]]`: Callback invoked when an input port is exhausted, returning an iterator of optional data as output. It is invoked once per port.
- `close() -> None`: Closes the context of the operator. It is invoked once per operator.
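To illustrate the call order of this life cycle, here is a minimal sketch using plain Python stand-ins; the class `SimpleUDF` and the driver `run_operator` are hypothetical names invented for this example, and a real UDF would extend pytexera's operator base classes rather than a plain class:

```python
from typing import Iterator, List, Optional


class SimpleUDF:
    """Hypothetical stand-in for a UDF, showing the four life-cycle APIs."""

    def open(self) -> None:
        # e.g., load a file, a model, or an API client
        self.prefix = "item-"

    def process(self, data: str, port: int) -> Iterator[Optional[str]]:
        # invoked once for every unit of data
        yield self.prefix + data

    def on_finish(self, port: int) -> Iterator[Optional[str]]:
        # invoked once when the port is exhausted
        yield self.prefix + "done"

    def close(self) -> None:
        # release resources
        self.prefix = None


def run_operator(operator: SimpleUDF, inputs: List[str]) -> List[str]:
    """Illustrative driver mimicking the engine's call order."""
    outputs = []
    operator.open()
    for data in inputs:
        outputs.extend(x for x in operator.process(data, port=0) if x is not None)
    outputs.extend(x for x in operator.on_finish(port=0) if x is not None)
    operator.close()
    return outputs
```

Running `run_operator(SimpleUDF(), ["a", "b"])` yields `["item-a", "item-b", "item-done"]`, showing that `open()` runs first, `process()` runs per input, `on_finish()` runs once at port exhaustion, and `close()` runs last.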
There are three APIs to process the data in different units.
- Tuple API.
```python
class ProcessTupleOperator(UDFOperatorV2):
    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        yield tuple_
```

The Tuple API takes one input tuple from a port at a time and returns an iterator of optional TupleLike instances. A TupleLike is any data structure that supports key-value pairs, such as pytexera.Tuple, dict, defaultdict, NamedTuple, etc.
Tuple API is useful for implementing functional operations which are applied to tuples one by one, such as map, reduce, and filter.
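As a sketch of such a functional operation, here is a filter in the style of the Tuple API. The class `PositiveFilterOperator` and its field name `value` are hypothetical, and plain dicts stand in for TupleLike; a real operator would subclass UDFOperatorV2:

```python
from typing import Dict, Iterator, Optional


class PositiveFilterOperator:
    """Illustrative Tuple-API-style filter: keep tuples with value > 0."""

    def process_tuple(self, tuple_: Dict, port: int) -> Iterator[Optional[Dict]]:
        if tuple_["value"] > 0:
            yield tuple_
        else:
            # yielding None produces no output for this tuple
            yield None
```

For input tuples `{"value": 3}`, `{"value": -1}`, `{"value": 5}`, the engine would skip the `None` and forward only the two positive tuples downstream.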
- Table API.
```python
class ProcessTableOperator(UDFTableOperator):
    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        yield table
```

The Table API consumes one Table at a time, which consists of all the tuples from a port. It returns an iterator of optional TableLike instances. A TableLike is a collection of TupleLike; currently, pytexera.Table and pandas.DataFrame are supported as TableLike instances. More flexible types will be supported down the road.
Table API is useful for implementing blocking operations that will consume all the data from one port, such as join, sort, and machine learning training.
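As a sketch of such a blocking operation, here is a sort in the style of the Table API. The class `SortByScoreOperator` and the field name `score` are hypothetical, and a plain list of dicts stands in for a Table; a real operator would subclass UDFTableOperator and receive a pytexera.Table or pandas.DataFrame:

```python
from typing import Dict, Iterator, List, Optional


class SortByScoreOperator:
    """Illustrative Table-API-style sort by a 'score' field, descending."""

    def process_table(self, table: List[Dict], port: int) -> Iterator[Optional[List[Dict]]]:
        # Because the whole table from the port is available at once,
        # blocking operations like sorting are straightforward.
        yield sorted(table, key=lambda row: row["score"], reverse=True)
```

Sorting is blocking because no output row can be emitted until every input row has arrived, which is exactly what the Table API guarantees.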
- Batch API.
```python
class ProcessBatchOperator(UDFBatchOperator):
    BATCH_SIZE = 10

    def process_batch(self, batch: Batch, port: int) -> Iterator[Optional[BatchLike]]:
        yield batch
```

The Batch API consumes one batch of tuples at a time. Similar to a Table, a Batch is also a collection of Tuples; however, its size is defined by BATCH_SIZE, and one port can have multiple batches. It returns an iterator of optional BatchLike instances. A BatchLike is a collection of TupleLike; currently, pytexera.Batch and pandas.DataFrame are supported as BatchLike instances. More flexible types will be supported down the road.
The Batch API serves as a hybrid API combining the features of both the Tuple and Table APIs. It is particularly valuable for striking a balance between time and space considerations, offering a trade-off that optimizes efficiency.
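The following sketch shows how a stream of tuples is split into BATCH_SIZE chunks before the operator sees them. The class `UppercaseBatchOperator`, the driver `run_in_batches`, and the field name `text` are hypothetical, with plain dicts and lists standing in for pytexera's Tuple and Batch types:

```python
from typing import Dict, Iterator, List, Optional


class UppercaseBatchOperator:
    """Illustrative Batch-API-style operator: uppercase each tuple's text."""

    BATCH_SIZE = 3

    def process_batch(self, batch: List[Dict], port: int) -> Iterator[Optional[List[Dict]]]:
        yield [{"text": row["text"].upper()} for row in batch]


def run_in_batches(operator, tuples: List[Dict]) -> List[List[Dict]]:
    """Illustrative driver: split the input into BATCH_SIZE chunks."""
    outputs = []
    size = operator.BATCH_SIZE
    for start in range(0, len(tuples), size):
        batch = tuples[start:start + size]
        outputs.extend(b for b in operator.process_batch(batch, port=0) if b is not None)
    return outputs
```

With five input tuples and BATCH_SIZE = 3, the operator is invoked twice: once with a batch of three tuples and once with the remaining two, illustrating the space/time middle ground between per-tuple and whole-table processing.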
All three APIs can produce no output for a given input by yielding None.
A UDF has an input Schema and an output Schema. The input schema is determined by the upstream operator's output schema and the engine will make sure the input data (tuple, table, or batch) matches the input schema. On the other hand, users are required to define the output schema of the UDF, and it is the user's responsibility to make sure the data output from the UDF matches the defined output schema.
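As a sketch of this responsibility, the check below compares an output tuple against a declared output schema. The schema fields `name` and `score` and the helper `matches_schema` are hypothetical; the engine's actual schema validation is not shown here:

```python
# Hypothetical declared output schema: field name -> expected Python type.
declared_schema = {"name": str, "score": float}


def matches_schema(tuple_like: dict, schema: dict) -> bool:
    """Illustrative check that an output tuple has exactly the declared
    fields, each with a value of the declared type."""
    return (set(tuple_like) == set(schema)
            and all(isinstance(tuple_like[k], t) for k, t in schema.items()))
```

A tuple like `{"name": "a", "score": 1.5}` matches this schema, while a tuple missing `score` or carrying an extra field would not, and would cause a runtime error in the engine.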
- Input ports: A UDF can take zero, one, or multiple input ports; different ports can have different input schemas. Each port can accept multiple links, as long as they share the same schema.
- Output ports: Currently, a UDF can have exactly one output port. This means it cannot be used as a terminal operator (i.e., an operator without output ports) or have more than one output port.
This UDF has zero input ports and one output port. It is considered a source operator (an operator that produces data without an upstream). It has a special API:
```python
class GenerateOperator(UDFSourceOperator):
    @overrides
    def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
        yield
```

The produce() API returns an iterator of TupleLike, TableLike, or simply None.
See Generator Operator for an example of 1-out UDF.
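As a sketch of a source operator in the style of produce(), the class `NumberGenerator` below is a hypothetical name, with plain dicts standing in for TupleLike; a real operator would subclass UDFSourceOperator:

```python
from typing import Dict, Iterator, Union


class NumberGenerator:
    """Illustrative source operator: emits three tuples with no upstream."""

    def produce(self) -> Iterator[Union[Dict, None]]:
        for i in range(3):
            yield {"value": i}
```

Since a source operator has no input ports, produce() is the only place data enters the workflow from this operator.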
This UDF has two input ports, namely the model port and the tuples port. The tuples port depends on the model port: during execution, the model port is processed first, and the tuples port starts only after the model port has consumed all of its input data.
This dependency is particularly useful to implement machine learning inference operators, where a machine learning model is sent into the 2-in UDF through the model port, and becomes an operator state, then the tuples are coming in through the tuples port to be processed by the model.
An example of a 2-in UDF:

```python
class SVMClassifier(UDFOperatorV2):
    @overrides
    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        if port == 0:  # model port
            self.model = tuple_['model']
        else:  # tuples port
            tuple_['pred'] = self.model.predict(tuple_['text'])
            yield tuple_
```
Currently, in a 2-in UDF, "Retain input columns" retains only the tuples port's input schema.
Copyright © 2025 The Apache Software Foundation.