Skip to content

Commit ddcc50e

Browse files
authored
Merge pull request #4485 from hhunter-ms/issue_4424
add python to streaming subscriptions
2 parents 5f6a726 + 0a61f41 commit ddcc50e

File tree

1 file changed

+106
-1
lines changed

1 file changed

+106
-1
lines changed

daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,112 @@ As messages are sent to the given message handler code, there is no concept of r
203203

204204
The example below shows the different ways to stream subscribe to a topic.
205205

206-
{{< tabs Go>}}
206+
{{< tabs Python Go >}}
207+
208+
{{% codetab %}}
209+
210+
You can use the `subscribe` method, which returns a `Subscription` object and allows you to pull messages from the stream by calling the `next_message` method. This runs in and may block the main thread while waiting for messages.
211+
212+
```python
213+
import time
214+
215+
from dapr.clients import DaprClient
216+
from dapr.clients.grpc.subscription import StreamInactiveError
217+
218+
counter = 0
219+
220+
221+
def process_message(message):
222+
global counter
223+
counter += 1
224+
# Process the message here
225+
print(f'Processing message: {message.data()} from {message.topic()}...')
226+
return 'success'
227+
228+
229+
def main():
230+
with DaprClient() as client:
231+
global counter
232+
233+
subscription = client.subscribe(
234+
pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
235+
)
236+
237+
try:
238+
while counter < 5:
239+
try:
240+
message = subscription.next_message()
241+
242+
except StreamInactiveError as e:
243+
print('Stream is inactive. Retrying...')
244+
time.sleep(1)
245+
continue
246+
if message is None:
247+
print('No message received within timeout period.')
248+
continue
249+
250+
# Process the message
251+
response_status = process_message(message)
252+
253+
if response_status == 'success':
254+
subscription.respond_success(message)
255+
elif response_status == 'retry':
256+
subscription.respond_retry(message)
257+
elif response_status == 'drop':
258+
subscription.respond_drop(message)
259+
260+
finally:
261+
print("Closing subscription...")
262+
subscription.close()
263+
264+
265+
if __name__ == '__main__':
266+
main()
267+
268+
```
269+
270+
You can also use the `subscribe_with_handler` method, which accepts a callback function executed for each message received from the stream. This runs in a separate thread, so it doesn't block the main thread.
271+
272+
```python
273+
import time
274+
275+
from dapr.clients import DaprClient
276+
from dapr.clients.grpc._response import TopicEventResponse
277+
278+
counter = 0
279+
280+
281+
def process_message(message):
282+
# Process the message here
283+
global counter
284+
counter += 1
285+
print(f'Processing message: {message.data()} from {message.topic()}...')
286+
return TopicEventResponse('success')
287+
288+
289+
def main():
290+
with (DaprClient() as client):
291+
# This will start a new thread that will listen for messages
292+
# and process them in the `process_message` function
293+
close_fn = client.subscribe_with_handler(
294+
pubsub_name='pubsub', topic='orders', handler_fn=process_message,
295+
dead_letter_topic='orders_dead'
296+
)
297+
298+
while counter < 5:
299+
time.sleep(1)
300+
301+
print("Closing subscription...")
302+
close_fn()
303+
304+
305+
if __name__ == '__main__':
306+
main()
307+
```
308+
309+
[Learn more about streaming subscriptions using the Python SDK client.]({{< ref "python-client.md#streaming-message-subscription" >}})
310+
311+
{{% /codetab %}}
207312
208313
{{% codetab %}}
209314

0 commit comments

Comments
 (0)