Skip to content

Commit

Permalink
fix issue with monitoring thread stops due to inactivity and then nee…
Browse files Browse the repository at this point in the history
…ds to be started again
  • Loading branch information
sergiyvamz committed Nov 2, 2023
1 parent 8138a86 commit b8a8e46
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.HostSpec;
Expand Down Expand Up @@ -63,7 +64,7 @@ static class ConnectionStatus {
private final TelemetryFactory telemetryFactory;
private final Properties properties;
private final HostSpec hostSpec;
private final MonitorService monitorService;
private final MonitorThreadContainer threadContainer;
private final long monitorDisposalTimeMillis;
private volatile long contextLastUsedTimestampNano;
private volatile boolean stopped = false;
Expand All @@ -85,21 +86,21 @@ static class ConnectionStatus {
* @param monitorDisposalTimeMillis Time in milliseconds before stopping the monitoring thread
* where there are no active connection to the server this
* {@link MonitorImpl} instance is monitoring.
* @param monitorService A reference to the {@link MonitorServiceImpl} implementation
* @param threadContainer A reference to the {@link MonitorThreadContainer} implementation
* that initialized this class.
*/
public MonitorImpl(
final @NonNull PluginService pluginService,
@NonNull final HostSpec hostSpec,
@NonNull final Properties properties,
final long monitorDisposalTimeMillis,
@NonNull final MonitorService monitorService) {
@NonNull final MonitorThreadContainer threadContainer) {
this.pluginService = pluginService;
this.telemetryFactory = pluginService.getTelemetryFactory();
this.hostSpec = hostSpec;
this.properties = properties;
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
this.monitorService = monitorService;
this.threadContainer = threadContainer;

this.contextLastUsedTimestampNano = this.getCurrentTimeNano();
this.contextsSizeGauge = telemetryFactory.createGauge("efm.activeContexts.queue.size",
Expand Down Expand Up @@ -143,6 +144,7 @@ public void run() {
this.telemetryContext = telemetryFactory.openTelemetryContext(
"monitoring thread", TelemetryTraceLevel.TOP_LEVEL);
telemetryContext.setAttribute("url", hostSpec.getUrl());

try {
this.stopped = false;
while (true) {
Expand All @@ -152,6 +154,7 @@ public void run() {
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();

while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
Expand Down Expand Up @@ -179,8 +182,7 @@ public void run() {
final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);
final ConnectionStatus status = checkConnectionStatus(this.nodeCheckTimeoutMillis);

long delayMillis = -1;
MonitorConnectionContext monitorContext;
Expand Down Expand Up @@ -240,7 +242,7 @@ public void run() {
} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
threadContainer.releaseResource(this);
break;
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
Expand All @@ -250,25 +252,34 @@ public void run() {
throw intEx;
} catch (final Exception ex) {
// log and ignore
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(
Level.WARNING,
Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[]{this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
}
}
}
} catch (final InterruptedException intEx) {
// exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost(), intEx.getMessage()}));
new Object[] {this.hostSpec.getHost()}));
} catch (final Exception ex) {
// this should not be reached; log and exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(
Level.WARNING,
Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[]{this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
}
} finally {
this.stopped = true;
if (this.monitoringConn != null) {
try {
this.monitoringConn.close();
Expand All @@ -279,7 +290,6 @@ public void run() {
if (telemetryContext != null) {
this.telemetryContext.closeContext();
}
this.stopped = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
/** Interface for initialize a new {@link MonitorImpl}. */
@FunctionalInterface
public interface MonitorInitializer {
Monitor createMonitor(HostSpec hostSpec, Properties properties, MonitorService monitorService);
Monitor createMonitor(HostSpec hostSpec, Properties properties, MonitorThreadContainer threadContainer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,4 @@ MonitorConnectionContext startMonitoring(
void stopMonitoringForAllConnections(Set<String> nodeKeys);

void releaseResources();

/**
* Handle unused {@link Monitor}.
*
* @param monitor The {@link Monitor} in idle.
*/
void notifyUnused(Monitor monitor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package software.amazon.jdbc.plugin.efm;

import java.lang.ref.WeakReference;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
Expand All @@ -42,15 +42,15 @@ public class MonitorServiceImpl implements MonitorService {
public static final AwsWrapperProperty MONITOR_DISPOSAL_TIME_MS =
new AwsWrapperProperty(
"monitorDisposalTime",
"60000",
"600000", // 10min
"Interval in milliseconds for a monitor to be considered inactive and to be disposed.");

private final PluginService pluginService;
private MonitorThreadContainer threadContainer;

final MonitorInitializer monitorInitializer;
private Set<String> cachedMonitorNodeKeys = null;
private Monitor cachedMonitor = null;
private WeakReference<Monitor> cachedMonitor = null;
final TelemetryFactory telemetryFactory;
final TelemetryCounter abortedConnectionsCounter;

Expand Down Expand Up @@ -100,16 +100,15 @@ public MonitorConnectionContext startMonitoring(
new Object[] {hostSpec}));
}

final Monitor monitor;
if (this.cachedMonitor == null
Monitor monitor = this.cachedMonitor == null ? null : this.cachedMonitor.get();
if (monitor == null
|| monitor.isStopped()
|| this.cachedMonitorNodeKeys == null
|| !this.cachedMonitorNodeKeys.equals(nodeKeys)) {

monitor = getMonitor(nodeKeys, hostSpec, properties);
this.cachedMonitor = monitor;
this.cachedMonitor = new WeakReference<>(monitor);
this.cachedMonitorNodeKeys = Collections.unmodifiableSet(nodeKeys);
} else {
monitor = this.cachedMonitor;
}

final MonitorConnectionContext context =
Expand Down Expand Up @@ -139,7 +138,6 @@ public void stopMonitoringForAllConnections(@NonNull final Set<String> nodeKeys)
monitor = this.threadContainer.getMonitor(nodeKey);
if (monitor != null) {
monitor.clearContexts();
this.threadContainer.resetResource(monitor);
return;
}
}
Expand All @@ -148,18 +146,6 @@ public void stopMonitoringForAllConnections(@NonNull final Set<String> nodeKeys)
@Override
public void releaseResources() {
this.threadContainer = null;
MonitorThreadContainer.releaseInstance();
}

@Override
public void notifyUnused(final Monitor monitor) {
if (monitor == null) {
LOGGER.warning(() -> Messages.get("MonitorServiceImpl.nullMonitorParam"));
return;
}

// Remove monitor from the maps
this.threadContainer.releaseResource(monitor);
}

/**
Expand All @@ -172,7 +158,7 @@ public void notifyUnused(final Monitor monitor) {
*/
protected Monitor getMonitor(final Set<String> nodeKeys, final HostSpec hostSpec, final Properties properties) {
return this.threadContainer.getOrCreateMonitor(
nodeKeys, () -> monitorInitializer.createMonitor(hostSpec, properties, this));
nodeKeys, () -> monitorInitializer.createMonitor(hostSpec, properties, this.threadContainer));
}

MonitorThreadContainer getThreadContainer() {
Expand Down
Loading

0 comments on commit b8a8e46

Please sign in to comment.