Skip to content
231 changes: 174 additions & 57 deletions mautrix/client/api/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
MediaMessageEventContent, PresenceState, EventContent, Membership,
ReactionEventContent, RelationType, Obj, Serializable)
from mautrix.types.event.state import state_event_content_map
from mautrix.util.opt_prometheus import Counter, Histogram
from .base import BaseClientAPI

API_CALLS = Counter("bridge_matrix_api_calls",
"The number of Matrix client API calls made", ("method",))
API_CALLS_FAILED = Counter("bridge_matrix_api_calls_failed",
"The number of Matrix client API calls which failed", ("method",))
MATRIX_REQUEST_SECONDS = Histogram("matrix_request_seconds",
"Time spent on Matrix client API calls", ("method","outcome",))

class EventMethods(BaseClientAPI):
"""
Expand Down Expand Up @@ -54,7 +61,18 @@ def sync(self, since: SyncToken = None, timeout: int = 30000, filter_id: FilterI
request["full_state"] = "true" if full_state else "false"
if set_presence:
request["set_presence"] = str(set_presence)
return self.api.request(Method.GET, Path.sync, query_params=request, retry_count=0)
method = "sync"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return await self.api.request(Method.GET, Path.sync, query_params=request, retry_count=0)
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

# endregion
# region 8.3 Getting events for a room
Expand All @@ -76,11 +94,22 @@ async def get_event(self, room_id: RoomID, event_id: EventID) -> Event:
.. _/event/{eventId} API reference:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid
"""
content = await self.api.request(Method.GET, Path.rooms[room_id].event[event_id])
method = "getEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return Event.deserialize(content)
except SerializerError as e:
raise MatrixResponseError("Invalid event in response") from e
content = await self.api.request(Method.GET, Path.rooms[room_id].event[event_id])
try:
return Event.deserialize(content)
except SerializerError as e:
raise MatrixResponseError("Invalid event in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

async def get_state_event(self, room_id: RoomID, event_type: EventType,
state_key: Optional[str] = None) -> StateEventContent:
Expand All @@ -101,14 +130,25 @@ async def get_state_event(self, room_id: RoomID, event_type: EventType,
.. _GET /state/{eventType}/{stateKey} API reference:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-state-eventtype-statekey
"""
content = await self.api.request(Method.GET,
Path.rooms[room_id].state[event_type][state_key])
method = "getStateEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return state_event_content_map[event_type].deserialize(content)
except KeyError:
return Obj(**content)
except SerializerError as e:
raise MatrixResponseError("Invalid state event in response") from e
content = await self.api.request(Method.GET,
Path.rooms[room_id].state[event_type][state_key])
try:
return state_event_content_map[event_type].deserialize(content)
except KeyError:
return Obj(**content)
except SerializerError as e:
raise MatrixResponseError("Invalid state event in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

async def get_state(self, room_id: RoomID) -> List[StateEvent]:
"""
Expand All @@ -123,11 +163,22 @@ async def get_state(self, room_id: RoomID) -> List[StateEvent]:
.. _/state API reference:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-state
"""
content = await self.api.request(Method.GET, Path.rooms[room_id].state)
method = "getState"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return [StateEvent.deserialize(event) for event in content]
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e
content = await self.api.request(Method.GET, Path.rooms[room_id].state)
try:
return [StateEvent.deserialize(event) for event in content]
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

async def get_members(self, room_id: RoomID, at: Optional[SyncToken] = None,
membership: Optional[Membership] = None,
Expand Down Expand Up @@ -160,14 +211,25 @@ async def get_members(self, room_id: RoomID, at: Optional[SyncToken] = None,
query["membership"] = membership.value
if not_membership:
query["not_membership"] = not_membership.value
content = await self.api.request(Method.GET, Path.rooms[room_id].members,
query_params=query)
method = "getMembers"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return [StateEvent.deserialize(event) for event in content["chunk"]]
except KeyError:
raise MatrixResponseError("`chunk` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e
content = await self.api.request(Method.GET, Path.rooms[room_id].members,
query_params=query)
try:
return [StateEvent.deserialize(event) for event in content["chunk"]]
except KeyError:
raise MatrixResponseError("`chunk` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

async def get_joined_members(self, room_id: RoomID) -> Dict[UserID, Member]:
"""
Expand All @@ -188,16 +250,27 @@ async def get_joined_members(self, room_id: RoomID) -> Dict[UserID, Member]:
.. _/members:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-members
"""
content = await self.api.request(Method.GET, Path.rooms[room_id].joined_members)
method = "getJoinedMembers"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return {user_id: Member(membership=Membership.JOIN,
displayname=member.get("display_name", ""),
avatar_url=member.get("avatar_url", ""))
for user_id, member in content["joined"].items()}
except KeyError:
raise MatrixResponseError("`joined` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid member objects in response") from e
content = await self.api.request(Method.GET, Path.rooms[room_id].joined_members)
try:
return {user_id: Member(membership=Membership.JOIN,
displayname=member.get("display_name", ""),
avatar_url=member.get("avatar_url", ""))
for user_id, member in content["joined"].items()}
except KeyError:
raise MatrixResponseError("`joined` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid member objects in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

async def get_messages(self, room_id: RoomID, direction: PaginationDirection,
from_token: SyncToken, to_token: Optional[SyncToken] = None,
Expand Down Expand Up @@ -233,19 +306,30 @@ async def get_messages(self, room_id: RoomID, direction: PaginationDirection,
"limit": str(limit) if limit else None,
"filter_json": filter_json,
}
content = await self.api.request(Method.GET, Path.rooms[room_id].messages,
query_params=query_params)
method = "getMessages"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return PaginatedMessages(content["start"], content["end"],
[Event.deserialize(event) for event in content["chunk"]])
except KeyError:
if "start" not in content:
raise MatrixResponseError("`start` not in response.")
elif "end" not in content:
raise MatrixResponseError("`start` not in response.")
raise MatrixResponseError("`content` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid events in response") from e
content = await self.api.request(Method.GET, Path.rooms[room_id].messages,
query_params=query_params)
try:
return PaginatedMessages(content["start"], content["end"],
[Event.deserialize(event) for event in content["chunk"]])
except KeyError:
if "start" not in content:
raise MatrixResponseError("`start` not in response.")
elif "end" not in content:
raise MatrixResponseError("`start` not in response.")
raise MatrixResponseError("`content` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid events in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

# endregion
# region 8.4 Sending events to a room
Expand Down Expand Up @@ -275,12 +359,23 @@ async def send_state_event(self, room_id: RoomID, event_type: EventType,
https://matrix.org/docs/spec/client_server/r0.5.0#put-matrix-client-r0-rooms-roomid-state-eventtype-statekey
"""
content = content.serialize() if isinstance(content, Serializable) else content
resp = await self.api.request(Method.PUT, Path.rooms[room_id].state[event_type][state_key],
content, **kwargs)
method = "sendStateEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
resp = await self.api.request(Method.PUT, Path.rooms[room_id].state[event_type][state_key],
content, **kwargs)
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

async def send_message_event(self, room_id: RoomID, event_type: EventType,
content: EventContent, txn_id: Optional[str] = None,
Expand Down Expand Up @@ -311,11 +406,22 @@ async def send_message_event(self, room_id: RoomID, event_type: EventType,
raise ValueError("Event type not given")
url = Path.rooms[room_id].send[event_type][txn_id or self.api.get_txn_id()]
content = content.serialize() if isinstance(content, Serializable) else content
resp = await self.api.request(Method.PUT, url, content, **kwargs)
method = "sendMessageEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
resp = await self.api.request(Method.PUT, url, content, **kwargs)
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

# region Message send helper functions
def send_message(self, room_id: RoomID, content: MessageEventContent, **kwargs
Expand Down Expand Up @@ -515,10 +621,21 @@ async def redact(self, room_id: RoomID, event_id: EventID, reason: Optional[str]
https://matrix.org/docs/spec/client_server/r0.5.0#put-matrix-client-r0-rooms-roomid-redact-eventid-txnid
"""
url = Path.rooms[room_id].redact[event_id][self.api.get_txn_id()]
resp = await self.api.request(Method.PUT, url, content={"reason": reason}, **kwargs)
method = "redact"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
resp = await self.api.request(Method.PUT, url, content={"reason": reason}, **kwargs)
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)

# endregion