Skip to content

Commit

Permalink
Introduce Api Telemetry (#1487)
Browse files Browse the repository at this point in the history
* Introduce Api Telemetry

* Add telemetry for session run and add configuration/hint

* Fix typo

* Do not send telmetry on retries

* Add tests for messages, connection and protocol changes

* add tests for telemetry work

* Add tests for Unmanaged transactions

* Update Config documentation

* Refactoring

* Fix docs paragraph

* Set telemetry flag on connection upon creation

* Refactoring

* Remove redundant header

* Exclude vulnerable dependency

* Refactoring

* Refactoring

* Add config builder tests

* Update documentation

* Update documentation

* Add NetworkSession tests

---------

Co-authored-by: Dmitriy Tverdiakov <dmitriy.tverdiakov@neo4j.com>
  • Loading branch information
bigmontz and injectives authored Oct 4, 2023
1 parent f0f46ba commit 92e7341
Show file tree
Hide file tree
Showing 69 changed files with 2,273 additions and 118 deletions.
47 changes: 47 additions & 0 deletions driver/src/main/java/org/neo4j/driver/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ public final class Config implements Serializable {
*/
private final MetricsAdapter metricsAdapter;

/**
* Specify if telemetry collection is disabled.
* <p>
* By default, the driver will send anonymous usage statistics to the server it connects to if the server requests those.
*/
private final boolean telemetryDisabled;

private Config(ConfigBuilder builder) {
this.logging = builder.logging;
this.logLeakedSessions = builder.logLeakedSessions;
Expand All @@ -169,6 +176,7 @@ private Config(ConfigBuilder builder) {

this.eventLoopThreads = builder.eventLoopThreads;
this.metricsAdapter = builder.metricsAdapter;
this.telemetryDisabled = builder.telemetryDisabled;
}

/**
Expand Down Expand Up @@ -335,6 +343,18 @@ public String userAgent() {
return userAgent;
}

/**
* Returns if the telemetry is disabled on the driver side.
* <p>
* The telemetry is collected only when it is enabled both the server and the driver.
*
* @return {@code true} if telemetry is disabled or {@code false} otherwise
* @since 5.13
*/
public boolean isTelemetryDisabled() {
return telemetryDisabled;
}

/**
* Used to build new config instances
*/
Expand All @@ -357,6 +377,8 @@ public static final class ConfigBuilder {
private int eventLoopThreads = 0;
private NotificationConfig notificationConfig = NotificationConfig.defaultConfig();

private boolean telemetryDisabled = false;

private ConfigBuilder() {}

/**
Expand Down Expand Up @@ -748,6 +770,31 @@ public ConfigBuilder withNotificationConfig(NotificationConfig notificationConfi
return this;
}

/**
* Sets if telemetry is disabled on the driver side.
* <p>
* By default, the driver sends anonymous telemetry data to the server it connects to if the server has
* telemetry enabled. This can be explicitly disabled on the driver side by setting this setting to
* {@code true}.
* <p>
* At present, the driver sends which API type is used, like:
* <ul>
* <li>Managed transaction ({@link Session#executeWrite(TransactionCallback)},
* {@link Session#executeRead(TransactionCallback)} and similar options)</li>
* <li>Unmanaged transaction ({@link Session#beginTransaction()} and similar options)</li>
* <li>Autocommit transaction ({@link Session#run(Query)} and similar options)</li>
* <li>Executable query ({@link Driver#executableQuery(String)} and similar options)</li>
* </ul>
*
* @param telemetryDisabled {@code true} if telemetry is disabled or {@code false} otherwise
* @return this builder
* @since 5.13
*/
public ConfigBuilder withTelemetryDisabled(boolean telemetryDisabled) {
this.telemetryDisabled = telemetryDisabled;
return this;
}

/**
* Create a config instance from this builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ protected InternalDriver createRoutingDriver(
*/
protected InternalDriver createDriver(
SecurityPlan securityPlan, SessionFactory sessionFactory, MetricsProvider metricsProvider, Config config) {
return new InternalDriver(securityPlan, sessionFactory, metricsProvider, config.logging());
return new InternalDriver(
securityPlan, sessionFactory, metricsProvider, config.isTelemetryDisabled(), config.logging());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,22 @@ public class InternalDriver implements Driver {
private final SessionFactory sessionFactory;
private final Logger log;

private final boolean telemetryDisabled;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final MetricsProvider metricsProvider;

InternalDriver(
SecurityPlan securityPlan,
SessionFactory sessionFactory,
MetricsProvider metricsProvider,
boolean telemetryDisabled,
Logging logging) {
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
this.metricsProvider = metricsProvider;
this.log = logging.getLog(getClass());
this.telemetryDisabled = telemetryDisabled;
}

@Override
Expand Down Expand Up @@ -215,7 +219,7 @@ private static RuntimeException driverCloseException() {

public NetworkSession newSession(SessionConfig config, AuthToken overrideAuthToken) {
assertOpen();
var session = sessionFactory.newInstance(config, overrideAuthToken);
var session = sessionFactory.newInstance(config, overrideAuthToken, telemetryDisabled);
if (closed.get()) {
// session does not immediately acquire connection, it is fine to just throw
throw driverCloseException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.TransactionCallback;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.internal.telemetry.TelemetryApi;

public class InternalExecutableQuery implements ExecutableQuery {
private final Driver driver;
Expand Down Expand Up @@ -81,7 +82,8 @@ public <A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinish
return resultFinisher.finish(result.keys(), finishedValue, summary);
};
var accessMode = config.routing().equals(RoutingControl.WRITE) ? AccessMode.WRITE : AccessMode.READ;
return session.execute(accessMode, txCallback, TransactionConfig.empty(), false);
return session.execute(
accessMode, txCallback, TransactionConfig.empty(), TelemetryApi.EXECUTABLE_QUERY, false);
}
}

Expand Down
31 changes: 21 additions & 10 deletions driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.telemetry.TelemetryApi;
import org.neo4j.driver.internal.util.Futures;

public class InternalSession extends AbstractQueryRunner implements Session {
Expand Down Expand Up @@ -93,7 +95,7 @@ public Transaction beginTransaction(TransactionConfig config) {

public Transaction beginTransaction(TransactionConfig config, String txType) {
var tx = Futures.blockingGet(
session.beginTransactionAsync(config, txType),
session.beginTransactionAsync(config, txType, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION)),
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
return new InternalTransaction(tx);
}
Expand All @@ -107,12 +109,12 @@ public <T> T readTransaction(TransactionWork<T> work) {
@Override
@Deprecated
public <T> T readTransaction(TransactionWork<T> work, TransactionConfig config) {
return transaction(AccessMode.READ, work, config, true);
return transaction(AccessMode.READ, work, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
public <T> T executeRead(TransactionCallback<T> callback, TransactionConfig config) {
return execute(AccessMode.READ, callback, config, true);
return execute(AccessMode.READ, callback, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
Expand All @@ -124,12 +126,12 @@ public <T> T writeTransaction(TransactionWork<T> work) {
@Override
@Deprecated
public <T> T writeTransaction(TransactionWork<T> work, TransactionConfig config) {
return transaction(AccessMode.WRITE, work, config, true);
return transaction(AccessMode.WRITE, work, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
public <T> T executeWrite(TransactionCallback<T> callback, TransactionConfig config) {
return execute(AccessMode.WRITE, callback, config, true);
return execute(AccessMode.WRITE, callback, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
Expand All @@ -151,21 +153,29 @@ public void reset() {
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while resetting the session"));
}

<T> T execute(AccessMode accessMode, TransactionCallback<T> callback, TransactionConfig config, boolean flush) {
return transaction(accessMode, tx -> callback.execute(new DelegatingTransactionContext(tx)), config, flush);
<T> T execute(
AccessMode accessMode,
TransactionCallback<T> callback,
TransactionConfig config,
TelemetryApi telemetryApi,
boolean flush) {
return transaction(
accessMode, tx -> callback.execute(new DelegatingTransactionContext(tx)), config, telemetryApi, flush);
}

private <T> T transaction(
AccessMode mode,
@SuppressWarnings("deprecation") TransactionWork<T> work,
TransactionConfig config,
TelemetryApi telemetryApi,
boolean flush) {
// use different code path compared to async so that work is executed in the caller thread
// caller thread will also be the one who sleeps between retries;
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
// event loop thread will bock and wait for itself to read some data
var apiTelemetryWork = new ApiTelemetryWork(telemetryApi);
return session.retryLogic().retry(() -> {
try (var tx = beginTransaction(mode, config, flush)) {
try (var tx = beginTransaction(mode, config, apiTelemetryWork, flush)) {

var result = work.execute(tx);
if (result instanceof Result) {
Expand All @@ -182,9 +192,10 @@ private <T> T transaction(
});
}

private Transaction beginTransaction(AccessMode mode, TransactionConfig config, boolean flush) {
private Transaction beginTransaction(
AccessMode mode, TransactionConfig config, ApiTelemetryWork apiTelemetryWork, boolean flush) {
var tx = Futures.blockingGet(
session.beginTransactionAsync(mode, config, null, flush),
session.beginTransactionAsync(mode, config, null, apiTelemetryWork, flush),
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
return new InternalTransaction(tx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.neo4j.driver.internal.async.NetworkSession;

public interface SessionFactory {
NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrideAuthToken);
NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrideAuthToken, boolean telemetryDisabled);

CompletionStage<Void> verifyConnectivity();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class SessionFactoryImpl implements SessionFactory {
}

@Override
public NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrideAuthToken) {
public NetworkSession newInstance(
SessionConfig sessionConfig, AuthToken overrideAuthToken, boolean telemetryDisabled) {
return createSession(
connectionProvider,
retryLogic,
Expand All @@ -65,7 +66,8 @@ public NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrid
logging,
sessionConfig.bookmarkManager().orElse(NoOpBookmarkManager.INSTANCE),
sessionConfig.notificationConfig(),
overrideAuthToken);
overrideAuthToken,
telemetryDisabled);
}

private Set<Bookmark> toDistinctSet(Iterable<Bookmark> bookmarks) {
Expand Down Expand Up @@ -142,7 +144,8 @@ private NetworkSession createSession(
Logging logging,
BookmarkManager bookmarkManager,
NotificationConfig notificationConfig,
AuthToken authToken) {
AuthToken authToken,
boolean telemetryDisabled) {
Objects.requireNonNull(bookmarks, "bookmarks may not be null");
Objects.requireNonNull(bookmarkManager, "bookmarkManager may not be null");
return leakedSessionsLoggingEnabled
Expand All @@ -157,7 +160,8 @@ private NetworkSession createSession(
logging,
bookmarkManager,
notificationConfig,
authToken)
authToken,
telemetryDisabled)
: new NetworkSession(
connectionProvider,
retryLogic,
Expand All @@ -169,6 +173,7 @@ private NetworkSession createSession(
logging,
bookmarkManager,
notificationConfig,
authToken);
authToken,
telemetryDisabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.telemetry.TelemetryApi;
import org.neo4j.driver.internal.util.Futures;

public class InternalAsyncSession extends AsyncAbstractQueryRunner implements AsyncSession {
Expand Down Expand Up @@ -80,7 +82,8 @@ public CompletionStage<AsyncTransaction> beginTransactionAsync() {

@Override
public CompletionStage<AsyncTransaction> beginTransactionAsync(TransactionConfig config) {
return session.beginTransactionAsync(config).thenApply(InternalAsyncTransaction::new);
return session.beginTransactionAsync(config, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION))
.thenApply(InternalAsyncTransaction::new);
}

@Override
Expand Down Expand Up @@ -136,9 +139,10 @@ private <T> CompletionStage<T> transactionAsync(
AccessMode mode,
@SuppressWarnings("deprecation") AsyncTransactionWork<CompletionStage<T>> work,
TransactionConfig config) {
var apiTelemetryWork = new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION);
return session.retryLogic().retryAsync(() -> {
var resultFuture = new CompletableFuture<T>();
var txFuture = session.beginTransactionAsync(mode, config);
var txFuture = session.beginTransactionAsync(mode, config, apiTelemetryWork);

txFuture.whenComplete((tx, completionError) -> {
var error = Futures.completionExceptionCause(completionError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public LeakLoggingNetworkSession(
Logging logging,
BookmarkManager bookmarkManager,
NotificationConfig notificationConfig,
AuthToken overrideAuthToken) {
AuthToken overrideAuthToken,
boolean telemetryDisabled) {
super(
connectionProvider,
retryLogic,
Expand All @@ -60,7 +61,8 @@ public LeakLoggingNetworkSession(
logging,
bookmarkManager,
notificationConfig,
overrideAuthToken);
overrideAuthToken,
telemetryDisabled);
this.stackTrace = captureStackTrace();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class NetworkConnection implements Connection {
private final InboundMessageDispatcher messageDispatcher;
private final String serverAgent;
private final BoltServerAddress serverAddress;
private final boolean telemetryEnabled;
private final BoltProtocol protocol;
private final ExtendedChannelPool channelPool;
private final CompletableFuture<Void> releaseFuture;
Expand All @@ -92,6 +93,7 @@ public NetworkConnection(
this.messageDispatcher = ChannelAttributes.messageDispatcher(channel);
this.serverAgent = ChannelAttributes.serverAgent(channel);
this.serverAddress = ChannelAttributes.serverAddress(channel);
this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel);
this.protocol = BoltProtocol.forChannel(channel);
this.channelPool = channelPool;
this.releaseFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -136,6 +138,11 @@ public void writeAndFlush(Message message, ResponseHandler handler) {
}
}

@Override
public boolean isTelemetryEnabled() {
return telemetryEnabled;
}

@Override
public CompletionStage<Void> reset(Throwable throwable) {
var result = new CompletableFuture<Void>();
Expand Down
Loading

0 comments on commit 92e7341

Please sign in to comment.