Skip to content

Add support for Bolt 5.4 an API telemetry #965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ Additional configuration can be provided via the :class:`neo4j.Driver` construct
+ :ref:`user-agent-ref`
+ :ref:`driver-notifications-min-severity-ref`
+ :ref:`driver-notifications-disabled-categories-ref`
+ :ref:`telemetry-disabled-ref`


.. _connection-acquisition-timeout-ref:
Expand Down Expand Up @@ -664,6 +665,30 @@ Notifications are available via :attr:`.ResultSummary.notifications` and :attr:`
.. seealso:: :class:`.NotificationDisabledCategory`, session config :ref:`session-notifications-disabled-categories-ref`


.. _telemetry-disabled-ref:

``telemetry_disabled``
----------------------
By default, the driver will send anonymous usage statistics to the server it connects to if the server requests those.
By setting ``telemetry_disabled=True``, the driver will not send any telemetry data.

The driver transmits the following information:

* Every time one of the following APIs is used to execute a query (for the first time), the server is informed of this
(without any further information like arguments, client identifiers, etc.):

* :meth:`.Driver.execute_query`
* :meth:`.Session.begin_transaction`
* :meth:`.Session.execute_read`, :meth:`.Session.execute_write`
* :meth:`.Session.run`
* the async counterparts of the above methods

:Type: :class:`bool`
:Default: :data:`False`

.. versionadded:: 5.13


Driver Object Lifetime
======================

Expand Down
8 changes: 8 additions & 0 deletions src/neo4j/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"NotificationCategory",
"NotificationSeverity",
"RoutingControl",
"TelemetryAPI"
]


Expand Down Expand Up @@ -227,6 +228,13 @@ class RoutingControl(str, Enum):
WRITE = "w"


class TelemetryAPI(int, Enum):
TX_FUNC = 0
TX = 1
AUTO_COMMIT = 2
DRIVER = 3


if t.TYPE_CHECKING:
T_RoutingControl = t.Union[
RoutingControl,
Expand Down
18 changes: 12 additions & 6 deletions src/neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
T_NotificationMinimumSeverity,
)

from .._api import RoutingControl
from .._api import (
RoutingControl,
TelemetryAPI,
)
from .._async_compat.util import AsyncUtil
from .._conf import (
Config,
Expand Down Expand Up @@ -71,6 +74,7 @@
URI_SCHEME_NEO4J,
URI_SCHEME_NEO4J_SECURE,
URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
WRITE_ACCESS,
)
from ..auth_management import (
AsyncAuthManager,
Expand Down Expand Up @@ -159,7 +163,8 @@ def driver(
fetch_size: int = ...,
impersonated_user: t.Optional[str] = ...,
bookmark_manager: t.Union[AsyncBookmarkManager,
BookmarkManager, None] = ...
BookmarkManager, None] = ...,
telemetry_disabled: bool = ...,
) -> AsyncDriver:
...

Expand Down Expand Up @@ -866,15 +871,16 @@ async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
session = self._session(session_config)
async with session:
if routing_ == RoutingControl.WRITE:
executor = session.execute_write
access_mode = WRITE_ACCESS
elif routing_ == RoutingControl.READ:
executor = session.execute_read
access_mode = READ_ACCESS
else:
raise ValueError("Invalid routing control value: %r"
% routing_)
with session._pipelined_begin:
return await executor(
_work, query_, parameters, result_transformer_
return await session._run_transaction(
access_mode, TelemetryAPI.DRIVER,
_work, (query_, parameters, result_transformer_), {}
)

@property
Expand Down
33 changes: 29 additions & 4 deletions src/neo4j/_async/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from logging import getLogger
from time import perf_counter

from ..._api import TelemetryAPI
from ..._async_compat.network import AsyncBoltSocket
from ..._async_compat.util import AsyncUtil
from ..._codec.hydration import v1 as hydration_v1
Expand Down Expand Up @@ -134,7 +135,8 @@ class AsyncBolt:
def __init__(self, unresolved_address, sock, max_connection_lifetime, *,
auth=None, auth_manager=None, user_agent=None,
routing_context=None, notifications_min_severity=None,
notifications_disabled_categories=None):
notifications_disabled_categories=None,
telemetry_disabled=False):
self.unresolved_address = unresolved_address
self.socket = sock
self.local_port = self.socket.getsockname()[1]
Expand Down Expand Up @@ -172,6 +174,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *,
self.auth = auth
self.auth_dict = self._to_auth_dict(auth)
self.auth_manager = auth_manager
self.telemetry_disabled = telemetry_disabled

self.notifications_min_severity = notifications_min_severity
self.notifications_disabled_categories = \
Expand Down Expand Up @@ -280,6 +283,7 @@ def protocol_handlers(cls, protocol_version=None):
AsyncBolt5x1,
AsyncBolt5x2,
AsyncBolt5x3,
AsyncBolt5x4,
)

handlers = {
Expand All @@ -293,6 +297,7 @@ def protocol_handlers(cls, protocol_version=None):
AsyncBolt5x1.PROTOCOL_VERSION: AsyncBolt5x1,
AsyncBolt5x2.PROTOCOL_VERSION: AsyncBolt5x2,
AsyncBolt5x3.PROTOCOL_VERSION: AsyncBolt5x3,
AsyncBolt5x4.PROTOCOL_VERSION: AsyncBolt5x4,
}

if protocol_version is None:
Expand Down Expand Up @@ -407,7 +412,10 @@ async def open(

# Carry out Bolt subclass imports locally to avoid circular dependency
# issues.
if protocol_version == (5, 3):
if protocol_version == (5, 4):
from ._bolt5 import AsyncBolt5x4
bolt_cls = AsyncBolt5x4
elif protocol_version == (5, 3):
from ._bolt5 import AsyncBolt5x3
bolt_cls = AsyncBolt5x3
elif protocol_version == (5, 2):
Expand Down Expand Up @@ -471,7 +479,8 @@ async def open(
routing_context=routing_context,
notifications_min_severity=pool_config.notifications_min_severity,
notifications_disabled_categories=
pool_config.notifications_disabled_categories
pool_config.notifications_disabled_categories,
telemetry_disabled=pool_config.telemetry_disabled,
)

try:
Expand Down Expand Up @@ -555,7 +564,6 @@ def re_auth(
hydration_hooks=hydration_hooks)
return True


@abc.abstractmethod
async def route(
self, database=None, imp_user=None, bookmarks=None,
Expand Down Expand Up @@ -584,6 +592,23 @@ async def route(
"""
pass

@abc.abstractmethod
def telemetry(self, api: TelemetryAPI, dehydration_hooks=None,
hydration_hooks=None, **handlers) -> None:
"""Send telemetry information about the API usage to the server.

:param api: the API used.
:param dehydration_hooks:
Hooks to dehydrate types (dict from type (class) to dehydration
function). Dehydration functions receive the value and returns an
object of type understood by packstream.
:param hydration_hooks:
Hooks to hydrate types (mapping from type (class) to
dehydration function). Dehydration functions receive the value of
type understood by packstream and are free to return anything.
"""
pass

@abc.abstractmethod
def run(self, query, parameters=None, mode=None, bookmarks=None,
metadata=None, timeout=None, db=None, imp_user=None,
Expand Down
6 changes: 6 additions & 0 deletions src/neo4j/_async/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from logging import getLogger
from ssl import SSLSocket

from ..._api import TelemetryAPI
from ..._exceptions import BoltProtocolError
from ...api import (
READ_ACCESS,
Expand Down Expand Up @@ -225,6 +226,11 @@ def logoff(self, dehydration_hooks=None, hydration_hooks=None):
"""Append a LOGOFF message to the outgoing queue."""
self.assert_re_auth_support()

def telemetry(self, api: TelemetryAPI, dehydration_hooks=None,
hydration_hooks=None, **handlers) -> None:
# TELEMETRY not support by this protocol version, so we ignore it.
pass

async def route(
self, database=None, imp_user=None, bookmarks=None,
dehydration_hooks=None, hydration_hooks=None
Expand Down
6 changes: 6 additions & 0 deletions src/neo4j/_async/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from logging import getLogger
from ssl import SSLSocket

from ..._api import TelemetryAPI
from ..._exceptions import BoltProtocolError
from ...api import (
READ_ACCESS,
Expand Down Expand Up @@ -146,6 +147,11 @@ def logoff(self, dehydration_hooks=None, hydration_hooks=None):
"""Append a LOGOFF message to the outgoing queue."""
self.assert_re_auth_support()

def telemetry(self, api: TelemetryAPI, dehydration_hooks=None,
hydration_hooks=None, **handlers) -> None:
# TELEMETRY not support by this protocol version, so we ignore it.
pass

async def route(
self, database=None, imp_user=None, bookmarks=None,
dehydration_hooks=None, hydration_hooks=None
Expand Down
25 changes: 25 additions & 0 deletions src/neo4j/_async/io/_bolt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from logging import getLogger
from ssl import SSLSocket

from ..._api import TelemetryAPI
from ..._codec.hydration import v2 as hydration_v2
from ..._exceptions import BoltProtocolError
from ..._meta import BOLT_AGENT_DICT
Expand Down Expand Up @@ -168,6 +169,11 @@ def logoff(self, dehydration_hooks=None, hydration_hooks=None):
"""Append a LOGOFF message to the outgoing queue."""
self.assert_re_auth_support()

def telemetry(self, api: TelemetryAPI, dehydration_hooks=None,
hydration_hooks=None, **handlers) -> None:
# TELEMETRY not support by this protocol version, so we ignore it.
pass

async def route(self, database=None, imp_user=None, bookmarks=None,
dehydration_hooks=None, hydration_hooks=None):
routing_context = self.routing_context or {}
Expand Down Expand Up @@ -665,3 +671,22 @@ def get_base_headers(self):
headers = super().get_base_headers()
headers["bolt_agent"] = BOLT_AGENT_DICT
return headers


class AsyncBolt5x4(AsyncBolt5x3):

PROTOCOL_VERSION = Version(5, 4)

def telemetry(self, api: TelemetryAPI, dehydration_hooks=None,
hydration_hooks=None, **handlers) -> None:
if (
self.telemetry_disabled
or not self.configuration_hints.get("telemetry.enabled", False)
):
return
api_raw = int(api)
log.debug("[#%04X] C: TELEMETRY %i # (%r)",
self.local_port, api_raw, api)
self._append(b"\x54", (api_raw,),
Response(self, "telemetry", hydration_hooks, **handlers),
dehydration_hooks=dehydration_hooks)
Loading