diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index 5e651b77133e0..52dc724ee23af 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -78,6 +78,7 @@ import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable; import org.apache.ignite.jdbc.thin.JdbcThinStatementCancelSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinStatementTimeoutSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinStreamingNotOrderedSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinStreamingOrderedSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinTcpIoTest; @@ -177,6 +178,7 @@ public static TestSuite suite() { suite.addTest(new JUnit4TestAdapter(JdbcThinErrorsSelfTest.class)); suite.addTest(new JUnit4TestAdapter(JdbcThinStatementCancelSelfTest.class)); suite.addTest(new JUnit4TestAdapter(JdbcThinConnectionTimeoutSelfTest.class)); + suite.addTest(new JUnit4TestAdapter(JdbcThinStatementTimeoutSelfTest.class)); suite.addTest(new JUnit4TestAdapter(JdbcThinInsertStatementSelfTest.class)); suite.addTest(new JUnit4TestAdapter(JdbcThinUpdateStatementSelfTest.class)); diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java index c38a8e696a1b0..58ed1e5488575 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java @@ -22,7 +22,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; -import java.sql.SQLTimeoutException; import java.sql.Statement; import java.util.concurrent.Callable; import org.apache.ignite.IgniteCache; @@ -381,29 +380,6 @@ public void testCloseOnCompletionBeforeQuery() throws Exception { assert stmt.isClosed() : "Statement must be closed"; } - /** - * @throws Exception If failed. - */ - @org.junit.Test - public void testExecuteQueryTimeout() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-5438"); - - final String sqlText = "select sleep_func(3)"; - - stmt.setQueryTimeout(1); - - // Timeout - GridTestUtils.assertThrows(log, - new Callable() { - @Override public Object call() throws Exception { - return stmt.executeQuery(sqlText); - } - }, - SQLTimeoutException.class, - "Timeout" - ); - } - /** * @throws Exception If failed. */ @@ -566,29 +542,6 @@ public void testExecuteUpdateProducesResultSet() throws Exception { ); } - /** - * @throws Exception If failed. - */ - @org.junit.Test - public void testExecuteUpdateTimeout() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-5438"); - - final String sqlText = "update test set val=1 where _key=sleep_func(3)"; - - stmt.setQueryTimeout(1); - - // Timeout - GridTestUtils.assertThrows(log, - new Callable() { - @Override public Object call() throws Exception { - return stmt.executeUpdate(sqlText); - } - }, - SQLTimeoutException.class, - "Timeout" - ); - } - /** * @throws Exception If failed. */ diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementTimeoutSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementTimeoutSelfTest.java new file mode 100644 index 0000000000000..2e4f36d356a90 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementTimeoutSelfTest.java @@ -0,0 +1,311 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.io.File; +import java.io.FileWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLTimeoutException; +import java.sql.Statement; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Statement timeout test. + */ +@SuppressWarnings("ThrowableNotThrown") +@RunWith(JUnit4.class) +public class JdbcThinStatementTimeoutSelfTest extends JdbcThinAbstractSelfTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** URL. */ + private static final String URL = "jdbc:ignite:thin://127.0.0.1/"; + + /** Server thread pull size. */ + private static final int SERVER_THREAD_POOL_SIZE = 4; + + /** Connection. */ + private Connection conn; + + /** Statement. */ + private Statement stmt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration cache = defaultCacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setSqlFunctionClasses(TestSQLFunctions.class); + cache.setIndexedTypes(Integer.class, Integer.class, Long.class, Long.class, String.class, + JdbcThinAbstractDmlStatementSelfTest.Person.class); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setThreadPoolSize(SERVER_THREAD_POOL_SIZE)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(3); + + for (int i = 0; i < 10000; ++i) + grid(0).cache(DEFAULT_CACHE_NAME).put(i, i); + + for (int i = 0; i < 10000; ++i) + grid(0).cache(DEFAULT_CACHE_NAME).put((long)i, (long)i); + } + + /** + * Called before execution of every test method in class. + * + * @throws Exception If failed. + */ + @Before + public void before() throws Exception { + conn = DriverManager.getConnection(URL); + + conn.setSchema('"' + DEFAULT_CACHE_NAME + '"'); + + stmt = conn.createStatement(); + + assert stmt != null; + assert !stmt.isClosed(); + } + + /** + * Called after execution of every test method in class. + * + * @throws Exception If failed. + */ + @After + public void after() throws Exception { + if (stmt != null && !stmt.isClosed()) { + stmt.close(); + + assert stmt.isClosed(); + } + + conn.close(); + + assert stmt.isClosed(); + assert conn.isClosed(); + } + + /** + * Trying to set negative timeout. SQLException with message "Invalid timeout value." is expected. + */ + @Test + public void testSettingNegativeQueryTimeout() { + GridTestUtils.assertThrows(log, () -> { + stmt.setQueryTimeout(-1); + + return null; + }, SQLException.class, "Invalid timeout value."); + } + + /** + * Trying to set zero timeout. Zero timeout means no timeout, so no exception is expected. + * + * @throws Exception If failed. + */ + @Test + public void testSettingZeroQueryTimeout() throws Exception { + stmt.setQueryTimeout(0); + + stmt.executeQuery("select sleep_func(1000);"); + } + + /** + * Setting timeout that is greater than query execution time. SQLTimeoutException is expected. + * + * @throws Exception If failed. + */ + @Test + public void testQueryTimeout() throws Exception { + stmt.setQueryTimeout(2); + + GridTestUtils.assertThrows(log, () -> { + stmt.executeQuery("select sleep_func(10) from Integer;"); + + return null; + }, SQLTimeoutException.class, "The query was cancelled while executing."); + } + + /** + * Setting timeout that is greater than query execution time. Running same query multiple times. + * SQLTimeoutException is expected in all cases. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + @Test + public void testQueryTimeoutRepeatable() throws Exception { + stmt.setQueryTimeout(2); + + GridTestUtils.assertThrows(log, () -> { + stmt.executeQuery("select sleep_func(10) from Integer;"); + + return null; + }, SQLTimeoutException.class, "The query was cancelled while executing."); + + GridTestUtils.assertThrows(log, () -> { + stmt.executeQuery("select sleep_func(10) from Integer;"); + + return null; + }, SQLTimeoutException.class, "The query was cancelled while executing."); + } + + /** + * Setting timeout that is greater than file uploading execution time. + * SQLTimeoutException is expected. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + @Test + public void testFileUploadingTimeout() throws Exception { + + File file = File.createTempFile("bulkload", "csv"); + + FileWriter writer = new FileWriter(file); + + for (int i = 1; i <= 1_000_000; i++) + writer.write(String.format("%d,%d,\"FirstName%d MiddleName%d\",LastName%d", i, i, i, i, i)); + + writer.close(); + + stmt.setQueryTimeout(1); + + GridTestUtils.assertThrows(log, () -> { + stmt.executeUpdate( + "copy from '" + file.getAbsolutePath() + "' into Person" + + " (_key, age, firstName, lastName)" + + " format csv"); + + return null; + }, SQLTimeoutException.class, "The query was cancelled while executing."); + } + + /** + * Setting timeout that is greater than batch query execution time. + * SQLTimeoutException is expected. + * + * @throws Exception If failed. + */ + @Test + public void testBatchQuery() throws Exception { + stmt.setQueryTimeout(1); + + GridTestUtils.assertThrows(log, () -> { + stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)"); + stmt.addBatch("update Long set _val = _val + 1 where _key > sleep_func (10)"); + + stmt.executeBatch(); + + return null; + }, SQLTimeoutException.class, "The query was cancelled while executing."); + } + + /** + * Setting timeout that is greater than multiple statements query execution time. + * SQLTimeoutException is expected. + * + * @throws Exception If failed. + */ + @Test + public void testMultipleStatementsQuery() throws Exception { + stmt.setQueryTimeout(1); + + GridTestUtils.assertThrows(log, () -> { + stmt.execute( + "update Long set _val = _val + 1 where _key > sleep_func (10);" + + "update Long set _val = _val + 1 where _key > sleep_func (10);" + + "update Long set _val = _val + 1 where _key > sleep_func (10);" + + "update Long set _val = _val + 1 where _key > sleep_func (10);" + + "select _val, sleep_func(10) as s from Integer limit 10"); + + return null; + }, SQLTimeoutException.class, "The query was cancelled while executing."); + } + + /** + * Setting timeout that is greater than update query execution time. + * SQLTimeoutException is expected. + * + * @throws Exception If failed. + */ + @Test + public void testExecuteUpdateTimeout() throws Exception { + stmt.setQueryTimeout(1); + + GridTestUtils.assertThrows(log, () -> + stmt.executeUpdate("update Integer set _val=1 where _key > sleep_func(10)"), + SQLTimeoutException.class, "The query was cancelled while executing."); + } + + /** + * Utility class with custom SQL functions. + */ + public static class TestSQLFunctions { + /** + * @param v amount of milliseconds to sleep + * @return amount of milliseconds to sleep + */ + @SuppressWarnings("unused") + @QuerySqlFunction + public static int sleep_func(int v) { + try { + Thread.sleep(v); + } + catch (InterruptedException ignored) { + // No-op + } + return v; + } + } +} \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index cc448761f730f..431efdf8decfe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -29,6 +29,7 @@ import java.sql.SQLClientInfoException; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLTimeoutException; import java.sql.SQLPermission; import java.sql.SQLTimeoutException; import java.sql.SQLWarning; @@ -42,15 +43,20 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Timer; +import java.util.TimerTask; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.SqlStateCode; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcOrderedBatchExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; @@ -84,12 +90,11 @@ public class JdbcThinConnection implements Connection { /** Request timeout period. */ private static final int REQUEST_TIMEOUT_PERIOD = 1_000; - /** Network timeout permission */ - private static final String SET_NETWORK_TIMEOUT_PERM = "setNetworkTimeout"; - /** Zero timeout as query timeout means no timeout. */ static final int NO_TIMEOUT = 0; + private static final String SET_NETWORK_TIMEOUT_PERM = "setNetworkTimeout"; + /** Statements modification mutex. */ final private Object stmtsMux = new Object(); @@ -129,6 +134,9 @@ public class JdbcThinConnection implements Connection { /** Tracked statements to close on disconnect. */ private final Set stmts = Collections.newSetFromMap(new IdentityHashMap<>()); + /** Query timeout timer */ + private final Timer timer; + /** * Creates new connection. * @@ -146,6 +154,8 @@ public JdbcThinConnection(ConnectionProperties connProps) throws SQLException { cliIo = new JdbcThinTcpIo(connProps); + timer = new Timer("query-timeout-timer"); + ensureConnected(); } @@ -399,6 +409,8 @@ private void doCommit() throws SQLException { closed = true; cliIo.close(); + + timer.cancel(); } /** {@inheritDoc} */ @@ -772,10 +784,25 @@ R sendRequest(JdbcRequest req) throws SQLException { R sendRequest(JdbcRequest req, JdbcThinStatement stmt) throws SQLException { ensureConnected(); + RequestTimeoutTimerTask reqTimeoutTimerTask = null; + try { + if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) { + reqTimeoutTimerTask = new RequestTimeoutTimerTask( + req instanceof JdbcBulkLoadBatchRequest ? stmt.currentRequestId() : req.requestId(), + stmt.requestTimeout()); + + timer.schedule(reqTimeoutTimerTask, 0, REQUEST_TIMEOUT_PERIOD); + } + JdbcResponse res = cliIo.sendRequest(req, stmt); - if (res.status() != ClientListenerResponse.STATUS_SUCCESS) + if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null && + stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null && reqTimeoutTimerTask.expired.get()) { + throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED, + IgniteQueryErrorCode.QUERY_CANCELED); + } + else if (res.status() != ClientListenerResponse.STATUS_SUCCESS) throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), res.status()); return (R)res.response(); @@ -791,6 +818,10 @@ R sendRequest(JdbcRequest req, JdbcThinStatement stmt) th else throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e); } + finally { + if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTimerTask != null) + reqTimeoutTimerTask.cancel(); + } } /** @@ -865,6 +896,8 @@ private void onDisconnect() { stmts.clear(); } + + timer.cancel(); } /** @@ -903,9 +936,6 @@ private class StreamState { /** Maximum requests count that may be sent before any responses. */ private static final int MAX_REQUESTS_BEFORE_RESPONSE = 10; - /** Wait timeout. */ - private static final long WAIT_TIMEOUT = 1; - /** Batch size for streaming. */ private int streamBatchSize; @@ -1108,4 +1138,52 @@ void readResponses () { boolean isQueryCancellationSupported() { return cliIo.isQueryCancellationSupported(); } + + /** + * Request Timeout Timer Task + */ + private class RequestTimeoutTimerTask extends TimerTask { + + /** Request id. */ + private long reqId; + + /** Remaining query timeout. */ + private int remainingQryTimeout; + + /** Flag that shows whether TimerTask was expired or not. */ + private AtomicBoolean expired; + + /** + * @param reqId Request Id to cancel in case of timeout + * @param initReqTimeout Initial request timeout + */ + RequestTimeoutTimerTask(long reqId, int initReqTimeout) { + this.reqId = reqId; + + remainingQryTimeout = initReqTimeout; + + expired = new AtomicBoolean(false); + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + if (remainingQryTimeout <= 0) { + expired.set(true); + + sendQueryCancelRequest(new JdbcQueryCancelRequest(reqId)); + + cancel(); + } + + remainingQryTimeout -= REQUEST_TIMEOUT_PERIOD; + } + catch (SQLException e) { + LOG.log(Level.WARNING, + "Request timeout processing failure: unable to cancel request [reqId=" + reqId + ']', e); + + cancel(); + } + } + } } \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index 9d0aaa4175c52..a4b9bac221e00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLTimeoutException; import java.sql.SQLWarning; import java.sql.Statement; import java.util.ArrayList; @@ -80,6 +81,9 @@ public class JdbcThinStatement implements Statement { /** Query timeout. */ private int timeout; + /** Request timeout. */ + private int reqTimeout; + /** Fetch size. */ private int pageSize = DFLT_PAGE_SIZE; @@ -304,34 +308,49 @@ private JdbcResult sendFile(JdbcBulkLoadAckResult cmdRes) throws SQLException { byte[] buf = new byte[batchSize]; int readBytes; + int timeSpendMillis = 0; + while ((readBytes = input.read(buf)) != -1) { + long startTime = System.currentTimeMillis(); + if (readBytes == 0) continue; + if (reqTimeout != JdbcThinConnection.NO_TIMEOUT) + reqTimeout -= timeSpendMillis; + JdbcResult res = conn.sendRequest(new JdbcBulkLoadBatchRequest( - cmdRes.cursorId(), - batchNum++, - JdbcBulkLoadBatchRequest.CMD_CONTINUE, - readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes)), + cmdRes.cursorId(), + batchNum++, + JdbcBulkLoadBatchRequest.CMD_CONTINUE, + readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes)), this); if (!(res instanceof JdbcQueryExecuteResult)) throw new SQLException("Unknown response sent by the server: " + res); + + timeSpendMillis = (int)(System.currentTimeMillis() - startTime); } + if (reqTimeout != JdbcThinConnection.NO_TIMEOUT) + reqTimeout -= timeSpendMillis; + return conn.sendRequest(new JdbcBulkLoadBatchRequest( - cmdRes.cursorId(), - batchNum++, - JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF), + cmdRes.cursorId(), + batchNum++, + JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF), this); } } catch (Exception e) { + if (e instanceof SQLTimeoutException) + throw (SQLTimeoutException)e; + try { conn.sendRequest(new JdbcBulkLoadBatchRequest( - cmdRes.cursorId(), - batchNum, - JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR), + cmdRes.cursorId(), + batchNum, + JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR), this); } catch (SQLException e1) { @@ -467,6 +486,8 @@ void closeOnDisconnect() { throw new SQLException("Invalid timeout value."); this.timeout = timeout * 1000; + + reqTimeout = this.timeout; } /** {@inheritDoc} */ @@ -945,7 +966,7 @@ void closeIfAllResultsClosed() throws SQLException { } /** - * @param currReqId Sets curresnt request Id. + * @param currReqId Sets current request Id. */ void currentRequestId(long currReqId) { synchronized (cancellationMux) { @@ -953,6 +974,15 @@ void currentRequestId(long currReqId) { } } + /** + * @return Current request Id. + */ + long currentRequestId() { + synchronized (cancellationMux) { + return currReqId; + } + } + /** * @return Cancellation mutex. */ @@ -966,4 +996,11 @@ Object cancellationMutex() { private boolean isQueryCancellationSupported() { return conn.isQueryCancellationSupported(); } + + /** + * @return Request timeout. + */ + int requestTimeout() { + return reqTimeout; + } }