@@ -75,11 +75,14 @@ Assuming one has an asynchronous client for the target database, three parts are
7575with asynchronous I/O against the database:
7676
7777 - An implementation of `AsyncFunction` that dispatches the requests
78- - A *callback* that takes the result of the operation and hands it to the `ResultFuture`
78+ - A *callback* that takes the result of the operation and hands it to the `ResultFuture` in the Java API, or an `await` on the result of the operation in the Python API
7979 - Applying the async I/O operation on a DataStream as a transformation with or without retry
8080
8181The following code example illustrates the basic pattern:
8282
83+ {{< tabs "6c8c009c-4c12-4338-9eeb-3be83cfa9e36" >}}
84+ {{< tab "Java" >}}
85+
8386``` java
8487// This example implements the asynchronous request and callback with Futures that have the
8588// interface of Java 8's futures (which is the same one followed by Flink's Future)
@@ -147,7 +150,74 @@ DataStream<Tuple2<String, String>> resultStream =
147150     AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
148151```
149152
150- **Important note**: The `ResultFuture` is completed with the first call of `ResultFuture.complete`.
153+ {{< /tab >}}
154+ {{< tab "Python" >}}
155+
156+ ``` python
157+ from typing import List
158+
159+ from pyflink.common import Time, Types
160+ from pyflink.datastream import AsyncFunction, AsyncDataStream, async_retry_predicates
161+ from pyflink.datastream.functions import RuntimeContext, AsyncRetryStrategy
162+
163+
164+ class AsyncDatabaseRequest(AsyncFunction[str, (str, str)]):
165+
166+     def __init__(self, host, port, credentials):
167+         self._host = host
168+         self._port = port
169+         self._credentials = credentials
170+
171+     def open(self, runtime_context: RuntimeContext):
172+         # The database specific client that can issue concurrent requests with callbacks
173+         self._client = DatabaseClient(self._host, self._port, self._credentials)
174+
175+     def close(self):
176+         if self._client:
177+             self._client.close()
178+
179+     async def async_invoke(self, value: str) -> List[(str, str)]:
180+         try:
181+             # issue the asynchronous request
182+             result = await self._client.query(value)
183+             return [(value, str(result))]
184+         except Exception:
185+             return [(value, None)]
186+
187+
188+ # create the original stream
189+ stream = ...
190+
191+ # apply the async I/O transformation without retry
192+ result_stream = AsyncDataStream.unordered_wait(
193+     data_stream=stream,
194+     async_function=AsyncDatabaseRequest("127.0.0.1", "1234", None),
195+     timeout=Time.seconds(10),
196+     capacity=100,
197+     output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))
198+
199+ # or apply the async I/O transformation with retry
200+ # create an async retry strategy via utility class or a user defined strategy
201+ async_retry_strategy = AsyncRetryStrategy.fixed_delay(
202+     max_attempts=3,
203+     backoff_time_millis=100,
204+     result_predicate=async_retry_predicates.empty_result_predicate,
205+     exception_predicate=async_retry_predicates.has_exception_predicate)
206+
207+ # apply the async I/O transformation with retry
208+ result_stream_with_retry = AsyncDataStream.unordered_wait_with_retry(
209+     data_stream=stream,
210+     async_function=AsyncDatabaseRequest("127.0.0.1", "1234", None),
211+     timeout=Time.seconds(10),
212+     async_retry_strategy=async_retry_strategy,
213+     capacity=1000,
214+     output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))
215+ ```
216+
217+ {{< /tab >}}
218+ {{< /tabs >}}
219+
220+ **Important note**: The `ResultFuture` is completed with the first call of `ResultFuture.complete` in the Java API.
151221All subsequent `complete` calls will be ignored.
152222
153223The following three parameters control the asynchronous operations:
@@ -162,17 +232,21 @@ The following three parameters control the asynchronous operations:
162232 accumulate an ever-growing backlog of pending requests, but that it will trigger backpressure once the capacity
163233 is exhausted.
164234
165- - **AsyncRetryStrategy**: The asyncRetryStrategy defines what conditions will trigger a delayed retry and the delay strategy,
235+ - **AsyncRetryStrategy**: This parameter defines what conditions will trigger a delayed retry and the delay strategy,
166236 e.g., fixed-delay, exponential-backoff-delay, custom implementation, etc.
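The capacity and retry parameters described above can be illustrated with plain `asyncio`. This is a minimal sketch and not Flink's implementation: the names `run_with_capacity` and `retry_fixed_delay` are hypothetical, a semaphore stands in for the operator's capacity limit, and the retry helper assumes that "retry on exception or empty result" is the desired condition.

```python
import asyncio


async def retry_fixed_delay(request, max_attempts, delay_seconds):
    """Await request() up to max_attempts times, pausing delay_seconds between tries.

    An attempt is retried when it raises or returns an empty list
    (an assumed retry condition, chosen for illustration).
    """
    for attempt in range(1, max_attempts + 1):
        error = None
        result = []
        try:
            result = await request()
        except Exception as exc:
            error = exc
        if result:
            return result
        if attempt == max_attempts:
            # Out of attempts: surface the last failure, or the empty result.
            if error is not None:
                raise error
            return result
        await asyncio.sleep(delay_seconds)


async def run_with_capacity(values, lookup, capacity):
    """Run lookup(value) for every value with at most `capacity` requests in flight."""
    slots = asyncio.Semaphore(capacity)
    in_flight = 0
    peak = 0

    async def guarded(value):
        nonlocal in_flight, peak
        async with slots:  # waits here once all slots are taken (backpressure)
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await lookup(value)
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(guarded(v) for v in values))
    return list(results), peak
```

The second return value of `run_with_capacity` is the highest number of concurrent requests observed, which never exceeds `capacity`.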
167237
168238### Timeout Handling
169239
170240When an async I/O request times out, by default an exception is thrown and job is restarted.
171241If you want to handle timeouts, you can override the `AsyncFunction#timeout` method.
172- Make sure you call `ResultFuture.complete()` or `ResultFuture.completeExceptionally()` when overriding
242+
243+ In the Java API, make sure you call `ResultFuture.complete()` or `ResultFuture.completeExceptionally()` when overriding
173244in order to indicate to Flink that the processing of this input record has completed. You can call
174245`ResultFuture.complete(Collections.emptyList())` if you do not want to emit any record when timeouts happen.
175246
247+ In the Python API, you can return a collection of results or raise an exception when overriding
248+ in order to indicate to Flink that the processing of this input record has completed. You can return
249+ an empty list (`return []`) if you do not want to emit any record when timeouts happen.
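The "emit nothing on timeout" behaviour can be sketched outside Flink with `asyncio.wait_for`. This is only an illustration under stated assumptions: `lookup_or_empty` is a hypothetical helper, not part of the PyFlink API, and it does not show the signature of the timeout hook itself.

```python
import asyncio


async def lookup_or_empty(lookup, value, timeout_seconds):
    """Return [(value, result)], or an empty list if the lookup times out."""
    try:
        result = await asyncio.wait_for(lookup(value), timeout=timeout_seconds)
        return [(value, result)]
    except asyncio.TimeoutError:
        # Returning an empty list means no record is emitted for this input.
        return []
```

Raising an exception from the `except` branch instead of returning `[]` would correspond to failing the record rather than dropping it.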
176250
177251### Order of Results
178252
@@ -182,14 +256,14 @@ To control in which order the resulting records are emitted, Flink offers two mo
182256 - **Unordered**: Result records are emitted as soon as the asynchronous request finishes.
183257 The order of the records in the stream is different after the async I/O operator than before.
184258 This mode has the lowest latency and lowest overhead, when used with *processing time* as the basic time characteristic.
185- Use `AsyncDataStream.unorderedWait(...)` for this mode.
259+ Use `AsyncDataStream.unorderedWait(...)` (Java) or `AsyncDataStream.unordered_wait(...)` (Python) for this mode.
186260
187261 - **Ordered**: In that case, the stream order is preserved. Result records are emitted in the same order as the asynchronous
188262 requests are triggered (the order of the operators input records). To achieve that, the operator buffers a result record
189263 until all its preceding records are emitted (or timed out).
190264 This usually introduces some amount of extra latency and some overhead in checkpointing, because records or results are maintained
191265 in the checkpointed state for a longer time, compared to the unordered mode.
192- Use `AsyncDataStream.orderedWait(...)` for this mode.
266+ Use `AsyncDataStream.orderedWait(...)` (Java) or `AsyncDataStream.ordered_wait(...)` (Python) for this mode.
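The difference between the two modes can be seen in a small `asyncio` sketch. It is not how Flink implements either mode: `lookup`, `unordered`, and `ordered` are hypothetical names, and the artificial delays stand in for requests with different latencies.

```python
import asyncio


async def lookup(value, delay_seconds):
    """Stand-in for an asynchronous request that takes delay_seconds to answer."""
    await asyncio.sleep(delay_seconds)
    return value


async def unordered(requests):
    """Collect results as soon as each request finishes."""
    results = []
    for finished in asyncio.as_completed([lookup(v, d) for v, d in requests]):
        results.append(await finished)
    return results


async def ordered(requests):
    """Collect results in input order, holding back faster later requests."""
    return list(await asyncio.gather(*(lookup(v, d) for v, d in requests)))
```

With requests of differing latency, `unordered` yields results in completion order, while `ordered` yields them in the order the requests were issued and therefore waits for the slowest preceding request.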
193267
194268
195269### Event Time
@@ -240,6 +314,7 @@ with the checkpoint bookkeeping happens in a dedicated thread-pool anyways.
240314A `DirectExecutor` can be obtained via `org.apache.flink.util.concurrent.Executors.directExecutor()` or
241315`com.google.common.util.concurrent.MoreExecutors.directExecutor()`.
242316
317+ **NOTE:** This applies only to the Java API. In the Python API, you can simply await the asynchronous result.
243318
244319### Caveats
245320