Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stopped monitoring threads #718

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.HostSpec;
Expand Down Expand Up @@ -63,7 +64,7 @@ static class ConnectionStatus {
private final TelemetryFactory telemetryFactory;
private final Properties properties;
private final HostSpec hostSpec;
private final MonitorService monitorService;
private final MonitorThreadContainer threadContainer;
private final long monitorDisposalTimeMillis;
private volatile long contextLastUsedTimestampNano;
private volatile boolean stopped = false;
Expand All @@ -85,21 +86,21 @@ static class ConnectionStatus {
* @param monitorDisposalTimeMillis Time in milliseconds before stopping the monitoring thread
* where there are no active connection to the server this
* {@link MonitorImpl} instance is monitoring.
* @param monitorService A reference to the {@link MonitorServiceImpl} implementation
* @param threadContainer A reference to the {@link MonitorThreadContainer} implementation
* that initialized this class.
*/
public MonitorImpl(
final @NonNull PluginService pluginService,
@NonNull final HostSpec hostSpec,
@NonNull final Properties properties,
final long monitorDisposalTimeMillis,
@NonNull final MonitorService monitorService) {
@NonNull final MonitorThreadContainer threadContainer) {
this.pluginService = pluginService;
this.telemetryFactory = pluginService.getTelemetryFactory();
this.hostSpec = hostSpec;
this.properties = properties;
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
this.monitorService = monitorService;
this.threadContainer = threadContainer;

this.contextLastUsedTimestampNano = this.getCurrentTimeNano();
this.contextsSizeGauge = telemetryFactory.createGauge("efm.activeContexts.queue.size",
Expand Down Expand Up @@ -143,6 +144,7 @@ public void run() {
this.telemetryContext = telemetryFactory.openTelemetryContext(
"monitoring thread", TelemetryTraceLevel.TOP_LEVEL);
telemetryContext.setAttribute("url", hostSpec.getUrl());

try {
this.stopped = false;
while (true) {
Expand All @@ -152,6 +154,7 @@ public void run() {
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();

while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
Expand Down Expand Up @@ -179,8 +182,7 @@ public void run() {
final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);
final ConnectionStatus status = checkConnectionStatus(this.nodeCheckTimeoutMillis);

long delayMillis = -1;
MonitorConnectionContext monitorContext;
Expand Down Expand Up @@ -240,7 +242,7 @@ public void run() {
} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
threadContainer.releaseResource(this);
break;
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
Expand All @@ -250,25 +252,34 @@ public void run() {
throw intEx;
} catch (final Exception ex) {
// log and ignore
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(
Level.WARNING,
Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[]{this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
}
}
}
} catch (final InterruptedException intEx) {
// exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost(), intEx.getMessage()}));
new Object[] {this.hostSpec.getHost()}));
} catch (final Exception ex) {
// this should not be reached; log and exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(
Level.WARNING,
Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[]{this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
}
} finally {
this.stopped = true;
if (this.monitoringConn != null) {
try {
this.monitoringConn.close();
Expand All @@ -279,7 +290,6 @@ public void run() {
if (telemetryContext != null) {
this.telemetryContext.closeContext();
}
this.stopped = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
/** Interface for initialize a new {@link MonitorImpl}. */
@FunctionalInterface
public interface MonitorInitializer {
Monitor createMonitor(HostSpec hostSpec, Properties properties, MonitorService monitorService);
Monitor createMonitor(HostSpec hostSpec, Properties properties, MonitorThreadContainer threadContainer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,4 @@ MonitorConnectionContext startMonitoring(
void stopMonitoringForAllConnections(Set<String> nodeKeys);

void releaseResources();

/**
* Handle unused {@link Monitor}.
*
* @param monitor The {@link Monitor} in idle.
*/
void notifyUnused(Monitor monitor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package software.amazon.jdbc.plugin.efm;

import java.lang.ref.WeakReference;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
Expand All @@ -42,15 +42,15 @@ public class MonitorServiceImpl implements MonitorService {
public static final AwsWrapperProperty MONITOR_DISPOSAL_TIME_MS =
new AwsWrapperProperty(
"monitorDisposalTime",
"60000",
"600000", // 10min
"Interval in milliseconds for a monitor to be considered inactive and to be disposed.");

private final PluginService pluginService;
private MonitorThreadContainer threadContainer;

final MonitorInitializer monitorInitializer;
private Set<String> cachedMonitorNodeKeys = null;
private Monitor cachedMonitor = null;
private WeakReference<Monitor> cachedMonitor = null;
final TelemetryFactory telemetryFactory;
final TelemetryCounter abortedConnectionsCounter;

Expand Down Expand Up @@ -100,16 +100,15 @@ public MonitorConnectionContext startMonitoring(
new Object[] {hostSpec}));
}

final Monitor monitor;
if (this.cachedMonitor == null
Monitor monitor = this.cachedMonitor == null ? null : this.cachedMonitor.get();
karenc-bq marked this conversation as resolved.
Show resolved Hide resolved
if (monitor == null
|| monitor.isStopped()
|| this.cachedMonitorNodeKeys == null
|| !this.cachedMonitorNodeKeys.equals(nodeKeys)) {

monitor = getMonitor(nodeKeys, hostSpec, properties);
this.cachedMonitor = monitor;
this.cachedMonitor = new WeakReference<>(monitor);
this.cachedMonitorNodeKeys = Collections.unmodifiableSet(nodeKeys);
} else {
monitor = this.cachedMonitor;
}

final MonitorConnectionContext context =
Expand Down Expand Up @@ -139,7 +138,6 @@ public void stopMonitoringForAllConnections(@NonNull final Set<String> nodeKeys)
monitor = this.threadContainer.getMonitor(nodeKey);
if (monitor != null) {
monitor.clearContexts();
this.threadContainer.resetResource(monitor);
return;
}
}
Expand All @@ -148,18 +146,6 @@ public void stopMonitoringForAllConnections(@NonNull final Set<String> nodeKeys)
@Override
public void releaseResources() {
this.threadContainer = null;
MonitorThreadContainer.releaseInstance();
}

@Override
public void notifyUnused(final Monitor monitor) {
if (monitor == null) {
LOGGER.warning(() -> Messages.get("MonitorServiceImpl.nullMonitorParam"));
return;
}

// Remove monitor from the maps
this.threadContainer.releaseResource(monitor);
}

/**
Expand All @@ -172,7 +158,7 @@ public void notifyUnused(final Monitor monitor) {
*/
protected Monitor getMonitor(final Set<String> nodeKeys, final HostSpec hostSpec, final Properties properties) {
return this.threadContainer.getOrCreateMonitor(
nodeKeys, () -> monitorInitializer.createMonitor(hostSpec, properties, this));
nodeKeys, () -> monitorInitializer.createMonitor(hostSpec, properties, this.threadContainer));
}

MonitorThreadContainer getThreadContainer() {
Expand Down
Loading
Loading