Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Fix compatibility with Python 3.13 #176

Merged
merged 2 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/176.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Unix socket handling was corrected to support changes in Python 3.13.
1 change: 1 addition & 0 deletions src/gbulb/glib_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class GLibBaseEventLoopPlatformExt(unix_events.SelectorEventLoop):

def __init__(self):
self._sighandlers = {}
self._unix_server_sockets = {}

def close(self):
for sig in list(self._sighandlers):
Expand Down
40 changes: 40 additions & 0 deletions tests/test_glib_events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import os
import sys
import tempfile
from unittest import mock, skipIf

import pytest
Expand Down Expand Up @@ -575,3 +577,41 @@ async def run():
assert server_success

glib_loop.run_until_complete(run())


def test_unix_sockets(glib_loop):
server_done = asyncio.Event()
server_done._loop = glib_loop
server_success = False

async def cb(reader, writer):
nonlocal server_success

writer.write(b"cool data\n")
await writer.drain()

d = await reader.readline()
server_success = d == b"thank you\n"

writer.close()
server_done.set()

async def run():
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, "socket")
await asyncio.start_unix_server(cb, path)
reader, writer = await asyncio.open_unix_connection(path)

d = await reader.readline()
assert d == b"cool data\n"

writer.write(b"thank you\n")
await writer.drain()

writer.close()

await server_done.wait()

assert server_success

glib_loop.run_until_complete(run())