Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EG] update samples #36060

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,89 +47,90 @@ async def run():
cloud_event_ack = CloudEvent(data="acknowledge", source="https://example.com", type="example")
cloud_event_renew = CloudEvent(data="renew", source="https://example.com", type="example")

# Send Cloud Events
await publisher.send(
[
cloud_event_reject,
cloud_event_release,
cloud_event_ack,
cloud_event_renew,
]
)

# Receive Published Cloud Events
try:
receive_results = await client.receive(
max_events=10,
max_wait_time=10,
async with publisher, client:
# Send Cloud Events
await publisher.send(
[
cloud_event_reject,
cloud_event_release,
cloud_event_ack,
cloud_event_renew,
]
)
except HttpResponseError:
raise

# Iterate through the results and collect the lock tokens for events we want to release/acknowledge/reject/renew:

release_events = []
acknowledge_events = []
reject_events = []
renew_events = []

for detail in receive_results:
data = detail.event.data
broker_properties = detail.broker_properties
if data == "release":
release_events.append(broker_properties.lock_token)
elif data == "acknowledge":
acknowledge_events.append(broker_properties.lock_token)
elif data == "renew":
renew_events.append(broker_properties.lock_token)
else:
reject_events.append(broker_properties.lock_token)

# Release/Acknowledge/Reject/Renew events

if len(release_events) > 0:
try:
release_result = await client.release(
lock_tokens=release_events,
)
except HttpResponseError:
raise

for succeeded_lock_token in release_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")

if len(acknowledge_events) > 0:
try:
ack_result = await client.acknowledge(
lock_tokens=acknowledge_events,
)
except HttpResponseError:
raise

for succeeded_lock_token in ack_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")

if len(reject_events) > 0:
try:
reject_result = await client.reject(
lock_tokens=reject_events,
)
except HttpResponseError:
raise

for succeeded_lock_token in reject_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")

if len(renew_events) > 0:
# Receive Published Cloud Events
try:
renew_result = await client.renew_locks(
lock_tokens=renew_events,
receive_results = await client.receive(
max_events=10,
max_wait_time=10,
)
except HttpResponseError:
raise

for succeeded_lock_token in renew_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")
# Iterate through the results and collect the lock tokens for events we want to release/acknowledge/reject/renew:

release_events = []
acknowledge_events = []
reject_events = []
renew_events = []

for detail in receive_results:
data = detail.event.data
broker_properties = detail.broker_properties
if data == "release":
release_events.append(broker_properties.lock_token)
elif data == "acknowledge":
acknowledge_events.append(broker_properties.lock_token)
elif data == "renew":
renew_events.append(broker_properties.lock_token)
else:
reject_events.append(broker_properties.lock_token)

# Release/Acknowledge/Reject/Renew events

if len(release_events) > 0:
try:
release_result = await client.release(
lock_tokens=release_events,
)
except HttpResponseError:
raise

for succeeded_lock_token in release_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")

if len(acknowledge_events) > 0:
try:
ack_result = await client.acknowledge(
lock_tokens=acknowledge_events,
)
except HttpResponseError:
raise

for succeeded_lock_token in ack_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")

if len(reject_events) > 0:
try:
reject_result = await client.reject(
lock_tokens=reject_events,
)
except HttpResponseError:
raise

for succeeded_lock_token in reject_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")

if len(renew_events) > 0:
try:
renew_result = await client.renew_locks(
lock_tokens=renew_events,
)
except HttpResponseError:
raise

for succeeded_lock_token in renew_result.succeeded_lock_tokens:
print(f"Succeeded Lock Token:{succeeded_lock_token}")


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@


async def publish():
client.send(
[
{
"type": "Contoso.Items.ItemReceived",
"source": "/contoso/items",
"data": {"itemSku": "Contoso Item SKU #1"},
"subject": "Door1",
"specversion": "1.0",
"id": "randomclouduuid11",
}
]
)
async with client:
await client.send(
[
{
"type": "Contoso.Items.ItemReceived",
"source": "/contoso/items",
"data": {"itemSku": "Contoso Item SKU #1"},
"subject": "Door1",
"specversion": "1.0",
"id": "randomclouduuid11",
}
]
)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@


async def publish():
await client.send(
[
CloudEvent(
attributes={"type": "cloudevent", "source": "/cncf/cloud/event/1.0", "subject": "testing-cncf-event"},
data=b"This is a cncf cloud event.",
)
]
)
async with client:
await client.send(
[
CloudEvent(
attributes={"type": "cloudevent", "source": "/cncf/cloud/event/1.0", "subject": "testing-cncf-event"},
data=b"This is a cncf cloud event.",
)
]
)


if __name__ == "__main__":
Expand Down