Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace timeout handling to use SharedTimer #920

Merged
merged 14 commits into from
Jan 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 17 additions & 47 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Set;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -3016,6 +3017,10 @@ void setDataLoggable(boolean value) {
dataIsLoggable = value;
}

SharedTimer getSharedTimer() {
return con.getSharedTimer();
}

private TDSCommand command = null;

// TDS message type (Query, RPC, DTC, etc.) sent at the beginning
Expand Down Expand Up @@ -6236,7 +6241,7 @@ final class TDSReaderMark {
final class TDSReader {
private final static Logger logger = Logger.getLogger("com.microsoft.sqlserver.jdbc.internals.TDS.Reader");
final private String traceID;
private TimeoutCommand<TDSCommand> timeoutCommand;
private ScheduledFuture<?> timeout;

final public String toString() {
return traceID;
Expand Down Expand Up @@ -6390,9 +6395,8 @@ synchronized final boolean readPacket() throws SQLServerException {
// terminate the connection.
if ((command.getCancelQueryTimeoutSeconds() > 0 && command.getQueryTimeoutSeconds() > 0)) {
rene-ye marked this conversation as resolved.
Show resolved Hide resolved
// if a timeout is configured with this object, add it to the timeout poller
int timeout = command.getCancelQueryTimeoutSeconds() + command.getQueryTimeoutSeconds();
this.timeoutCommand = new TdsTimeoutCommand(timeout, this.command, this.con);
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(this.timeoutCommand);
int seconds = command.getCancelQueryTimeoutSeconds() + command.getQueryTimeoutSeconds();
this.timeout = con.getSharedTimer().schedule(new TDSTimeoutTask(command, con), seconds);
}
}
// First, read the packet header.
Expand All @@ -6413,8 +6417,9 @@ synchronized final boolean readPacket() throws SQLServerException {
}

// if execution was subject to timeout then stop timing
if (this.timeoutCommand != null) {
TimeoutPoller.getTimeoutPoller().remove(this.timeoutCommand);
if (this.timeout != null) {
this.timeout.cancel(false);
this.timeout = null;
}
// Header size is a 2 byte unsigned short integer in big-endian order.
int packetLength = Util.readUnsignedShortBigEndian(newPacket.header, TDS.PACKET_HEADER_MESSAGE_LENGTH);
Expand Down Expand Up @@ -7003,42 +7008,6 @@ final void trySetSensitivityClassification(SensitivityClassification sensitivity
}


/**
* The tds default implementation of a timeout command
*/
class TdsTimeoutCommand extends TimeoutCommand<TDSCommand> {
public TdsTimeoutCommand(int timeout, TDSCommand command, SQLServerConnection sqlServerConnection) {
super(timeout, command, sqlServerConnection);
}

public void interrupt() {
TDSCommand command = getCommand();
SQLServerConnection sqlServerConnection = getSqlServerConnection();
try {
// If TCP Connection to server is silently dropped, exceeding the query timeout
// on the same connection does
// not throw SQLTimeoutException
// The application stops responding instead until SocketTimeoutException is
// thrown. In this case, we must
// manually terminate the connection.
if (null == command && null != sqlServerConnection) {
sqlServerConnection.terminate(SQLServerException.DRIVER_ERROR_IO_FAILED,
SQLServerException.getErrString("R_connectionIsClosed"));
} else {
// If the timer wasn't canceled before it ran out of
// time then interrupt the registered command.
command.interrupt(SQLServerException.getErrString("R_queryTimedOut"));
}
} catch (SQLServerException e) {
// Unfortunately, there's nothing we can do if we
// fail to time out the request. There is no way
// to report back what happened.
assert null != command;
command.log(Level.FINE, "Command could not be timed out. Reason: " + e.getMessage());
}
}
}

/**
* TDSCommand encapsulates an interruptable TDS conversation.
*
Expand Down Expand Up @@ -7160,7 +7129,7 @@ protected void setProcessedResponse(boolean processedResponse) {
private volatile boolean readingResponse;
private int queryTimeoutSeconds;
private int cancelQueryTimeoutSeconds;
private TdsTimeoutCommand timeoutCommand;
private ScheduledFuture<?> timeout;

protected int getQueryTimeoutSeconds() {
return this.queryTimeoutSeconds;
Expand Down Expand Up @@ -7576,8 +7545,8 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException {
// If command execution is subject to timeout then start timing until
// the server returns the first response packet.
if (queryTimeoutSeconds > 0) {
this.timeoutCommand = new TdsTimeoutCommand(queryTimeoutSeconds, this, null);
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(this.timeoutCommand);
SQLServerConnection conn = tdsReader != null ? tdsReader.getConnection() : null;
this.timeout = tdsWriter.getSharedTimer().schedule(new TDSTimeoutTask(this, conn), queryTimeoutSeconds);
}

if (logger.isLoggable(Level.FINEST))
Expand All @@ -7600,8 +7569,9 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException {
} finally {
// If command execution was subject to timeout then stop timing as soon
// as the server returns the first response packet or errors out.
if (this.timeoutCommand != null) {
TimeoutPoller.getTimeoutPoller().remove(this.timeoutCommand);
if (this.timeout != null) {
this.timeout.cancel(false);
this.timeout = null;
}
}

Expand Down
77 changes: 21 additions & 56 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerBulkCopy.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Level;

import javax.sql.RowSet;
Expand Down Expand Up @@ -246,31 +247,7 @@ class BulkColumnMetaData {
*/
private int srcColumnCount;

/**
* Timeout for the bulk copy command
*/
private final class BulkTimeoutCommand extends TimeoutCommand<TDSCommand> {
public BulkTimeoutCommand(int timeout, TDSCommand command, SQLServerConnection sqlServerConnection) {
super(timeout, command, sqlServerConnection);
}

@Override
public void interrupt() {
TDSCommand command = getCommand();
// If the timer wasn't canceled before it ran out of
// time then interrupt the registered command.
try {
command.interrupt(SQLServerException.getErrString("R_queryTimedOut"));
} catch (SQLServerException e) {
// Unfortunately, there's nothing we can do if we
// fail to time out the request. There is no way
// to report back what happened.
command.log(Level.FINE, "Command could not be timed out. Reason: " + e.getMessage());
}
}
}

private BulkTimeoutCommand timeoutCommand;
private ScheduledFuture<?> timeout;

/**
* The maximum temporal precision we can send when using varchar(precision) in bulkcommand, to send a
Expand Down Expand Up @@ -646,16 +623,14 @@ private void sendBulkLoadBCP() throws SQLServerException {
final class InsertBulk extends TDSCommand {
InsertBulk() {
super("InsertBulk", 0, 0);
int timeoutSeconds = copyOptions.getBulkCopyTimeout();
timeoutCommand = timeoutSeconds > 0 ? new BulkTimeoutCommand(timeoutSeconds, this, null) : null;
}

final boolean doExecute() throws SQLServerException {
if (null != timeoutCommand) {
if (logger.isLoggable(Level.FINEST))
logger.finest(this.toString() + ": Starting bulk timer...");

TimeoutPoller.getTimeoutPoller().addTimeoutCommand(timeoutCommand);
int timeoutSeconds = copyOptions.getBulkCopyTimeout();
if (timeoutSeconds > 0) {
connection.checkClosed();
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
timeout = connection.getSharedTimer().schedule(new TDSTimeoutTask(this, connection),
timeoutSeconds);
}

// doInsertBulk inserts the rows in one batch. It returns true if there are more rows in
Expand All @@ -671,21 +646,27 @@ final boolean doExecute() throws SQLServerException {
}

// Check whether it is a timeout exception.
if (rootCause instanceof SQLException) {
checkForTimeoutException((SQLException) rootCause, timeoutCommand);
if (rootCause instanceof SQLException && timeout != null && timeout.isDone()) {
SQLException sqlEx = (SQLException) rootCause;
if (sqlEx.getSQLState() != null
&& sqlEx.getSQLState().equals(SQLState.STATEMENT_CANCELED.getSQLStateCode())) {
// If SQLServerBulkCopy is managing the transaction, a rollback is needed.
if (copyOptions.isUseInternalTransaction()) {
connection.rollback();
}
throw new SQLServerException(SQLServerException.getErrString("R_queryTimedOut"),
SQLState.STATEMENT_CANCELED, DriverError.NOT_SET, sqlEx);
}
}

// It is not a timeout exception. Re-throw.
throw topLevelException;
}

if (null != timeoutCommand) {
if (logger.isLoggable(Level.FINEST))
logger.finest(this.toString() + ": Stopping bulk timer...");

TimeoutPoller.getTimeoutPoller().remove(timeoutCommand);
if (timeout != null) {
timeout.cancel(true);
timeout = null;
}

return true;
}
}
Expand Down Expand Up @@ -1145,22 +1126,6 @@ private void writeColumnMetaData(TDSWriter tdsWriter) throws SQLServerException
}
}

/**
* Helper method that throws a timeout exception if the cause of the exception was that the query was cancelled
*/
private void checkForTimeoutException(SQLException e, BulkTimeoutCommand timeoutCommand) throws SQLServerException {
if ((null != e.getSQLState()) && (e.getSQLState().equals(SQLState.STATEMENT_CANCELED.getSQLStateCode()))
&& timeoutCommand.canTimeout()) {
// If SQLServerBulkCopy is managing the transaction, a rollback is needed.
if (copyOptions.isUseInternalTransaction()) {
connection.rollback();
}

throw new SQLServerException(SQLServerException.getErrString("R_queryTimedOut"),
SQLState.STATEMENT_CANCELED, DriverError.NOT_SET, e);
}
}

/**
* Validates whether the source JDBC types are compatible with the destination table data types. We need to do this
* only once for the whole bulk copy session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,23 @@ public class SQLServerConnection implements ISQLServerConnection, java.io.Serial

private Boolean isAzureDW = null;

private SharedTimer sharedTimer;

/**
* Return an existing cached SharedTimer associated with this Connection or create a new one.
*
* The SharedTimer will be released when the Connection is closed.
*/
SharedTimer getSharedTimer() {
if (state == State.Closed) {
throw new IllegalStateException("Connection is closed");
}
if (sharedTimer == null) {
this.sharedTimer = SharedTimer.getTimer();
}
return this.sharedTimer;
}

static class CityHash128Key implements java.io.Serializable {

/**
Expand Down Expand Up @@ -3174,6 +3191,11 @@ public void close() throws SQLServerException {
// with the connection.
setState(State.Closed);

if (sharedTimer != null) {
sharedTimer.removeRef();
sharedTimer = null;
}

// Close the TDS channel. When the channel is closed, the server automatically
// rolls back any pending transactions and closes associated resources like
// prepared handles.
Expand Down
94 changes: 94 additions & 0 deletions src/main/java/com/microsoft/sqlserver/jdbc/SharedTimer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Microsoft JDBC Driver for SQL Server Copyright(c) Microsoft Corporation All rights reserved. This program is made
* available under the terms of the MIT License. See the LICENSE file in the project root for more information.
*/
package com.microsoft.sqlserver.jdbc;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


class SharedTimer {
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
static final String CORE_THREAD_PREFIX = "mssql-jdbc-shared-timer-core-";
private static final AtomicLong CORE_THREAD_COUNTER = new AtomicLong();
private static SharedTimer instance;

/**
* Unique ID of this SharedTimer
*/
private final long id = CORE_THREAD_COUNTER.getAndIncrement();
/**
* Number of outstanding references to this SharedTimer
*/
private int refCount = 0;
private ScheduledThreadPoolExecutor executor;

private SharedTimer() {
executor = new ScheduledThreadPoolExecutor(1, task -> new Thread(task, CORE_THREAD_PREFIX + id));
executor.setRemoveOnCancelPolicy(true);
}

public long getId() {
return id;
}

/**
* @return Whether there is an instance of the SharedTimer currently allocated.
*/
static synchronized boolean isRunning() {
return instance != null;
}

/**
* Remove a reference to this SharedTimer.
*
* If the reference count reaches zero then the underlying executor will be shutdown so that its thread stops.
*/
public synchronized void removeRef() {
if (refCount <= 0) {
throw new IllegalStateException("removeRef() called more than actual references");
}
refCount -= 1;
if (refCount == 0) {
// Removed last reference so perform cleanup
executor.shutdownNow();
executor = null;
instance = null;
}
}

/**
* Retrieve a reference to existing SharedTimer or create a new one.
*
* The SharedTimer's reference count will be incremented to account for the new reference.
*
* When the caller is finished with the SharedTimer it must be released via {@link#removeRef}
*/
public static synchronized SharedTimer getTimer() {
if (instance == null) {
// No shared object exists so create a new one
instance = new SharedTimer();
}
instance.refCount += 1;
return instance;
}

/**
* Schedule a task to execute in the future using this SharedTimer's internal executor.
*/
public ScheduledFuture<?> schedule(TDSTimeoutTask task, long delaySeconds) {
return schedule(task, delaySeconds, TimeUnit.SECONDS);
}

/**
* Schedule a task to execute in the future using this SharedTimer's internal executor.
*/
public ScheduledFuture<?> schedule(TDSTimeoutTask task, long delay, TimeUnit unit) {
if (executor == null) {
throw new IllegalStateException("Cannot schedule tasks after shutdown");
}
return executor.schedule(task, delay, unit);
}
}
Loading