Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics about all Matrix API calls #4

Closed
wants to merge 9 commits into from
13 changes: 12 additions & 1 deletion mautrix/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@

from mautrix.errors import make_request_error, MatrixConnectionError, MatrixRequestError
from mautrix.util.logging import TraceLogger
from mautrix.util.opt_prometheus import Counter
from mautrix import __version__ as mautrix_version

if TYPE_CHECKING:
from mautrix.types import JSON

API_CALLS = Counter("bridge_matrix_api_calls",
"The number of Matrix client API calls made", ("method",))
API_CALLS_FAILED = Counter("bridge_matrix_api_calls_failed",
"The number of Matrix client API calls which failed", ("method",))

class APIPath(Enum):
"""
Expand Down Expand Up @@ -212,7 +217,8 @@ async def request(self, method: Method, path: Union[PathBuilder, str],
content: Optional[Union[dict, list, bytes, str]] = None,
headers: Optional[Dict[str, str]] = None,
query_params: Optional[Union[Dict[str, str], CIMultiDict[str, str]]] = None,
retry_count: Optional[int] = None) -> 'JSON':
retry_count: Optional[int] = None,
metrics_method: Optional[str] = "") -> 'JSON':
"""
Make a raw Matrix API request.

Expand All @@ -226,6 +232,7 @@ async def request(self, method: Method, path: Union[PathBuilder, str],
The ``Authorization`` header is always overridden if :attr:`token` is set.
query_params: A dict of query parameters to send.
retry_count: Number of times to retry if the homeserver isn't reachable.
metrics_method: A Matrix method which is reported as a Prometheus label

Returns:
The parsed response JSON.
Expand Down Expand Up @@ -255,8 +262,12 @@ async def request(self, method: Method, path: Union[PathBuilder, str],
backoff = 4
while True:
self._log_request(method, path, content, orig_content, query_params, req_id)
API_CALLS.labels(method=metrics_method).inc()
try:
return await self._send(method, full_url, content, query_params, headers or {})
except Exception:
API_CALLS_FAILED.labels(method=metrics_method).inc()
raise
except MatrixRequestError as e:
if retry_count > 0 and e.http_status in (502, 503, 504):
self.log.warning(f"Request #{req_id} failed with HTTP {e.http_status}, "
Expand Down
5 changes: 3 additions & 2 deletions mautrix/appservice/api/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ def request(self, method: Method, path: PathBuilder,
content: Optional[Union[Dict, bytes, str]] = None, timestamp: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
query_params: Optional[Dict[str, Any]] = None,
retry_count: Optional[int] = None) -> Awaitable[Dict]:
retry_count: Optional[int] = None,
**kwargs) -> Awaitable[Dict]:
"""
Make a raw HTTP request, with optional AppService timestamp massaging and external_url
setting.
Expand All @@ -196,7 +197,7 @@ def request(self, method: Method, path: PathBuilder,
if not self.is_real_user:
query_params["user_id"] = self.identity or self.bot_mxid

return super().request(method, path, content, headers, query_params, retry_count)
return super().request(method, path, content, headers, query_params, retry_count, **kwargs)


class ChildAppServiceAPI(AppServiceAPI):
Expand Down
232 changes: 58 additions & 174 deletions mautrix/client/api/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,8 @@
MediaMessageEventContent, PresenceState, EventContent, Membership,
ReactionEventContent, RelationType, Obj, Serializable)
from mautrix.types.event.state import state_event_content_map
from mautrix.util.opt_prometheus import Counter, Histogram
from .base import BaseClientAPI

API_CALLS = Counter("bridge_matrix_api_calls",
"The number of Matrix client API calls made", ("method",))
API_CALLS_FAILED = Counter("bridge_matrix_api_calls_failed",
"The number of Matrix client API calls which failed", ("method",))
MATRIX_REQUEST_SECONDS = Histogram("matrix_request_seconds",
"Time spent on Matrix client API calls", ("method","outcome",))

class EventMethods(BaseClientAPI):
"""
Expand Down Expand Up @@ -61,18 +54,7 @@ def sync(self, since: SyncToken = None, timeout: int = 30000, filter_id: FilterI
request["full_state"] = "true" if full_state else "false"
if set_presence:
request["set_presence"] = str(set_presence)
method = "sync"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
try:
return await self.api.request(Method.GET, Path.sync, query_params=request, retry_count=0)
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return self.api.request(Method.GET, Path.sync, query_params=request, retry_count=0, metrics_method="sync")

# endregion
# region 8.3 Getting events for a room
Expand All @@ -94,22 +76,11 @@ async def get_event(self, room_id: RoomID, event_id: EventID) -> Event:
.. _/event/{eventId} API reference:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid
"""
method = "getEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
content = await self.api.request(Method.GET, Path.rooms[room_id].event[event_id], metrics_method="getEvent")
try:
content = await self.api.request(Method.GET, Path.rooms[room_id].event[event_id])
try:
return Event.deserialize(content)
except SerializerError as e:
raise MatrixResponseError("Invalid event in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return Event.deserialize(content)
except SerializerError as e:
raise MatrixResponseError("Invalid event in response") from e

async def get_state_event(self, room_id: RoomID, event_type: EventType,
state_key: Optional[str] = None) -> StateEventContent:
Expand All @@ -130,25 +101,15 @@ async def get_state_event(self, room_id: RoomID, event_type: EventType,
.. _GET /state/{eventType}/{stateKey} API reference:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-state-eventtype-statekey
"""
method = "getStateEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
content = await self.api.request(Method.GET,
Path.rooms[room_id].state[event_type][state_key],
metrics_method="getStateEvent")
try:
content = await self.api.request(Method.GET,
Path.rooms[room_id].state[event_type][state_key])
try:
return state_event_content_map[event_type].deserialize(content)
except KeyError:
return Obj(**content)
except SerializerError as e:
raise MatrixResponseError("Invalid state event in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return state_event_content_map[event_type].deserialize(content)
except KeyError:
return Obj(**content)
except SerializerError as e:
raise MatrixResponseError("Invalid state event in response") from e

async def get_state(self, room_id: RoomID) -> List[StateEvent]:
"""
Expand All @@ -163,22 +124,11 @@ async def get_state(self, room_id: RoomID) -> List[StateEvent]:
.. _/state API reference:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-state
"""
method = "getState"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
content = await self.api.request(Method.GET, Path.rooms[room_id].state, metrics_method="getState")
try:
content = await self.api.request(Method.GET, Path.rooms[room_id].state)
try:
return [StateEvent.deserialize(event) for event in content]
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return [StateEvent.deserialize(event) for event in content]
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e

async def get_members(self, room_id: RoomID, at: Optional[SyncToken] = None,
membership: Optional[Membership] = None,
Expand Down Expand Up @@ -211,25 +161,14 @@ async def get_members(self, room_id: RoomID, at: Optional[SyncToken] = None,
query["membership"] = membership.value
if not_membership:
query["not_membership"] = not_membership.value
method = "getMembers"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
content = await self.api.request(Method.GET, Path.rooms[room_id].members,
query_params=query, metrics_method="getMembers")
try:
content = await self.api.request(Method.GET, Path.rooms[room_id].members,
query_params=query)
try:
return [StateEvent.deserialize(event) for event in content["chunk"]]
except KeyError:
raise MatrixResponseError("`chunk` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return [StateEvent.deserialize(event) for event in content["chunk"]]
except KeyError:
raise MatrixResponseError("`chunk` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid state events in response") from e

async def get_joined_members(self, room_id: RoomID) -> Dict[UserID, Member]:
"""
Expand All @@ -250,27 +189,16 @@ async def get_joined_members(self, room_id: RoomID) -> Dict[UserID, Member]:
.. _/members:
https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-members
"""
method = "getJoinedMembers"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
content = await self.api.request(Method.GET, Path.rooms[room_id].joined_members, metrics_method="getJoinedMembers")
try:
content = await self.api.request(Method.GET, Path.rooms[room_id].joined_members)
try:
return {user_id: Member(membership=Membership.JOIN,
displayname=member.get("display_name", ""),
avatar_url=member.get("avatar_url", ""))
for user_id, member in content["joined"].items()}
except KeyError:
raise MatrixResponseError("`joined` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid member objects in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return {user_id: Member(membership=Membership.JOIN,
displayname=member.get("display_name", ""),
avatar_url=member.get("avatar_url", ""))
for user_id, member in content["joined"].items()}
except KeyError:
raise MatrixResponseError("`joined` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid member objects in response") from e

async def get_messages(self, room_id: RoomID, direction: PaginationDirection,
from_token: SyncToken, to_token: Optional[SyncToken] = None,
Expand Down Expand Up @@ -306,30 +234,19 @@ async def get_messages(self, room_id: RoomID, direction: PaginationDirection,
"limit": str(limit) if limit else None,
"filter_json": filter_json,
}
method = "getMessages"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
content = await self.api.request(Method.GET, Path.rooms[room_id].messages,
query_params=query_params, metrics_method="getMessages")
try:
content = await self.api.request(Method.GET, Path.rooms[room_id].messages,
query_params=query_params)
try:
return PaginatedMessages(content["start"], content["end"],
[Event.deserialize(event) for event in content["chunk"]])
except KeyError:
if "start" not in content:
raise MatrixResponseError("`start` not in response.")
elif "end" not in content:
raise MatrixResponseError("`start` not in response.")
raise MatrixResponseError("`content` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid events in response") from e
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return PaginatedMessages(content["start"], content["end"],
[Event.deserialize(event) for event in content["chunk"]])
except KeyError:
if "start" not in content:
raise MatrixResponseError("`start` not in response.")
elif "end" not in content:
raise MatrixResponseError("`start` not in response.")
raise MatrixResponseError("`content` not in response.")
except SerializerError as e:
raise MatrixResponseError("Invalid events in response") from e

# endregion
# region 8.4 Sending events to a room
Expand Down Expand Up @@ -359,23 +276,12 @@ async def send_state_event(self, room_id: RoomID, event_type: EventType,
https://matrix.org/docs/spec/client_server/r0.5.0#put-matrix-client-r0-rooms-roomid-state-eventtype-statekey
"""
content = content.serialize() if isinstance(content, Serializable) else content
method = "sendStateEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
resp = await self.api.request(Method.PUT, Path.rooms[room_id].state[event_type][state_key],
content, **kwargs, metrics_method="sendStateEvent")
try:
resp = await self.api.request(Method.PUT, Path.rooms[room_id].state[event_type][state_key],
content, **kwargs)
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")

async def send_message_event(self, room_id: RoomID, event_type: EventType,
content: EventContent, txn_id: Optional[str] = None,
Expand Down Expand Up @@ -406,22 +312,11 @@ async def send_message_event(self, room_id: RoomID, event_type: EventType,
raise ValueError("Event type not given")
url = Path.rooms[room_id].send[event_type][txn_id or self.api.get_txn_id()]
content = content.serialize() if isinstance(content, Serializable) else content
method = "sendMessageEvent"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
resp = await self.api.request(Method.PUT, url, content, **kwargs, metrics_method="sendMessageEvent")
try:
resp = await self.api.request(Method.PUT, url, content, **kwargs)
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")

# region Message send helper functions
def send_message(self, room_id: RoomID, content: MessageEventContent, **kwargs
Expand Down Expand Up @@ -621,21 +516,10 @@ async def redact(self, room_id: RoomID, event_id: EventID, reason: Optional[str]
https://matrix.org/docs/spec/client_server/r0.5.0#put-matrix-client-r0-rooms-roomid-redact-eventid-txnid
"""
url = Path.rooms[room_id].redact[event_id][self.api.get_txn_id()]
method = "redact"
API_CALLS.labels(method=method).inc()
start_time = time.time()
outcome = "success"
resp = await self.api.request(Method.PUT, url, content={"reason": reason}, **kwargs, metrics_method="redact")
try:
resp = await self.api.request(Method.PUT, url, content={"reason": reason}, **kwargs)
try:
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")
except Exception:
API_CALLS_FAILED.labels(method=method).inc()
outcome = "fail"
raise
finally:
MATRIX_REQUEST_SECONDS.labels(method=method, outcome=outcome).observe(time.time() - start_time)
return resp["event_id"]
except KeyError:
raise MatrixResponseError("`event_id` not in response.")

# endregion