Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add missing type hints to synapse.logging.context (#11556)
Browse files Browse the repository at this point in the history
  • Loading branch information
squahtx authored Dec 14, 2021
1 parent 2519bea commit 0147b3d
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 122 deletions.
1 change: 1 addition & 0 deletions changelog.d/11556.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add missing type hints to `synapse.logging.context`.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ disallow_untyped_defs = True
[mypy-synapse.http.server]
disallow_untyped_defs = True

[mypy-synapse.logging.context]
disallow_untyped_defs = True

[mypy-synapse.metrics.*]
disallow_untyped_defs = True

Expand Down
9 changes: 5 additions & 4 deletions stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@
from typing import Any, List, Optional, Type, Union

from twisted.internet import protocol
from twisted.internet.defer import Deferred

class RedisProtocol(protocol.Protocol):
def publish(self, channel: str, message: bytes): ...
async def ping(self) -> None: ...
async def set(
def ping(self) -> "Deferred[None]": ...
def set(
self,
key: str,
value: Any,
expire: Optional[int] = None,
pexpire: Optional[int] = None,
only_if_not_exists: bool = False,
only_if_exists: bool = False,
) -> None: ...
async def get(self, key: str) -> Any: ...
) -> "Deferred[None]": ...
def get(self, key: str) -> "Deferred[Any]": ...

class SubscriberProtocol(RedisProtocol):
def __init__(self, *args, **kwargs): ...
Expand Down
9 changes: 4 additions & 5 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

from prometheus_client import Counter, Gauge, Histogram

from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure

Expand Down Expand Up @@ -67,7 +66,7 @@
from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_server_name

Expand Down Expand Up @@ -360,13 +359,13 @@ async def _handle_incoming_transaction(
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
pdu_results, _ = await make_deferred_yieldable(
defer.gatherResults(
[
gather_results(
(
run_in_background(
self._handle_pdus_in_txn, origin, transaction, request_time
),
run_in_background(self._handle_edus_in_txn, origin, transaction),
],
),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
Expand Down
19 changes: 11 additions & 8 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,31 +360,34 @@ async def try_backfill(domains: List[str]) -> bool:

logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
states = await make_deferred_yieldable(
states_list = await make_deferred_yieldable(
defer.gatherResults(
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
)
)

# dict[str, dict[tuple, str]], a map from event_id to state map of
# event_ids.
states = dict(zip(event_ids, [s.state for s in states]))
# A map from event_id to state map of event_ids.
state_ids: Dict[str, StateMap[str]] = dict(
zip(event_ids, [s.state for s in states_list])
)

state_map = await self.store.get_events(
[e_id for ids in states.values() for e_id in ids.values()],
[e_id for ids in state_ids.values() for e_id in ids.values()],
get_prev_content=False,
)
states = {

# A map from event_id to state map of events.
state_events: Dict[str, StateMap[EventBase]] = {
key: {
k: state_map[e_id]
for k, e_id in state_dict.items()
if e_id in state_map
}
for key, state_dict in states.items()
for key, state_dict in state_ids.items()
}

for e_id in event_ids:
likely_extremeties_domains = get_domains_from_state(states[e_id])
likely_extremeties_domains = get_domains_from_state(state_events[e_id])

success = await try_backfill(
[
Expand Down
33 changes: 19 additions & 14 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,27 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, List, Optional, Tuple

from twisted.internet import defer
from typing import TYPE_CHECKING, List, Optional, Tuple, cast

from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
from synapse.types import (
JsonDict,
Requester,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
)
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.async_helpers import concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.visibility import filter_events_for_client

Expand Down Expand Up @@ -190,22 +196,21 @@ async def handle_room(event: RoomsForUser) -> None:
)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
).addCallback(
lambda states: cast(StateMap[EventBase], states[event.event_id])
)

(messages, token), current_state = await make_deferred_yieldable(
defer.gatherResults(
[
gather_results(
(
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
limit=limit,
end_token=room_end_token,
),
deferred_room_state,
]
)
)
).addErrback(unwrapFirstError)

Expand Down Expand Up @@ -454,8 +459,8 @@ async def get_receipts() -> List[JsonDict]:
return receipts

presence, receipts, (messages, token) = await make_deferred_yieldable(
defer.gatherResults(
[
gather_results(
(
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
Expand All @@ -464,7 +469,7 @@ async def get_receipts() -> List[JsonDict]:
limit=limit,
end_token=now_token.room_key,
),
],
),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
Expand Down
13 changes: 6 additions & 7 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from canonicaljson import encode_canonical_json

from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall

from synapse import event_auth
Expand Down Expand Up @@ -57,7 +56,7 @@
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder, log_failure
from synapse.util.async_helpers import Linearizer, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -1168,9 +1167,9 @@ async def handle_new_client_event(

# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
result = await make_deferred_yieldable(
defer.gatherResults(
[
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_event,
requester=requester,
Expand All @@ -1182,12 +1181,12 @@ async def handle_new_client_event(
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
],
),
consumeErrors=True,
)
).addErrback(unwrapFirstError)

return result[0]
return result

async def _persist_event(
self,
Expand Down
7 changes: 5 additions & 2 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import (
IProtocol,
IProtocolFactory,
IReactorCore,
IStreamClientEndpoint,
Expand Down Expand Up @@ -309,12 +310,14 @@ def __init__(

self._srv_resolver = srv_resolver

def connect(self, protocol_factory: IProtocolFactory) -> defer.Deferred:
def connect(
self, protocol_factory: IProtocolFactory
) -> "defer.Deferred[IProtocol]":
"""Implements IStreamClientEndpoint interface"""

return run_in_background(self._do_connect, protocol_factory)

async def _do_connect(self, protocol_factory: IProtocolFactory) -> None:
async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
first_exception = None

server_list = await self._resolve_server()
Expand Down
Loading

0 comments on commit 0147b3d

Please sign in to comment.