
spark-snowflake connector does not cancel running queries if the DataFrame command was cancelled in long-lived Spark applications #519

Open
sadikovi opened this issue Aug 3, 2023 · 3 comments


@sadikovi

sadikovi commented Aug 3, 2023

I have observed that the spark-snowflake connector never cancels queries when the corresponding DataFrame command is cancelled in long-lived applications such as the Spark shell or interactive environments such as notebooks.

Although a listener is implemented to cancel all queries at the end of the application, in a long-lived application that may not happen for many hours or days, and those queries continue to run in the meantime. SparkConnectorContext.removeRunningQuery only removes the query metadata from the global tracking map but does not actually cancel the query - one has to log in to Snowflake and cancel the queries manually. I have an example where a query ran for hours because of this limitation.
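
For illustration, a minimal repro sketch (run in a Spark shell; sfOptions and the long-running generator query are placeholders, not connector code):

val df = spark.read
  .format("net.snowflake.spark.snowflake")
  .options(sfOptions) // placeholder for the usual sfURL/sfUser/... options
  .option("query", "select seq4() from table(generator(timelimit => 36000))")
  .load()

// Cancel this command from the shell or notebook: the Spark job stops,
// but the query keeps running in Snowflake until it completes or times out.
df.count()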

I also noticed that thread interruption does not seem to work properly for the ResultSet handling in the connector, particularly in this block of code:

val objects = asyncRs
  .asInstanceOf[SnowflakeResultSet]
  .getResultSetSerializables(params.expectedPartitionSize)

This call does not respond to a thread interrupt signal, so it blocks query cancellation. It may need to be wrapped in an Await block or run on a separate thread to allow interrupts.
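
For example, a rough sketch of the separate-thread approach (a Future-based variant; asyncRs and params are the names from the connector snippet above, the rest is illustrative):

import java.util.concurrent.{Callable, Executors}

// Run the blocking driver call on a helper thread so that the calling
// task thread stays interruptible while it waits for the result.
val executor = Executors.newSingleThreadExecutor()
val future = executor.submit(
  new Callable[java.util.List[net.snowflake.client.jdbc.SnowflakeResultSetSerializable]] {
    override def call(): java.util.List[net.snowflake.client.jdbc.SnowflakeResultSetSerializable] =
      asyncRs
        .asInstanceOf[SnowflakeResultSet]
        .getResultSetSerializables(params.expectedPartitionSize)
  })
val objects =
  try {
    future.get() // throws InterruptedException if this thread is interrupted
  } catch {
    case e: InterruptedException =>
      future.cancel(true) // interrupt the helper thread
      // this is where a server-side query cancel could be issued
      throw e
  } finally {
    executor.shutdownNow()
  }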

@kleinux

kleinux commented Sep 18, 2023

I experience this issue too. Cancelling a query from Spark does not cancel the query in Snowflake. This can lead to very expensive operations continuing to execute when they are not expected to.

@sadikovi

Also, this is not specific to a particular version. I can reproduce the same issue in spark-snowflake 2.12.0+ and on the latest master. The code needs to be refactored to handle failures and query cancellation.

@sadikovi

sadikovi commented Jan 17, 2024

Current issues:

  1. Cancel queries interactively; we should not wait for the application to finish in order to cancel queries (this could take hours).
  2. Fix the JDBC driver to allow cancellation of async queries; currently this is hard to do.
  3. Spark problem: py4j does not track JVM threads, i.e. a socket close in the PVM does not mean a socket close in the JVM - I will fix it.

Code for 1:

private[snowflake] def cancelRunningQuery(sparkContext: SparkContext, queryID: String): Unit = {
  withSyncAndDoNotThrowException {
    val appId = sparkContext.applicationId
    // runningQueries maps applicationId -> tracked queries for that application
    val queries = runningQueries.get(appId)
    // Avoid Option.get: the application may have no tracked queries at all
    val candidates = queries.map(_.filter(_.queryID == queryID).toSeq).getOrElse(Seq.empty)
    logger.info(s"Running queries for $appId: $queries, " +
      s"trying to find $queryID, candidates: $candidates")
    if (candidates.nonEmpty) {
      candidates.foreach { rq =>
        try {
          if (!rq.conn.isClosed) {
            val statement = rq.conn.createStatement()
            val sessionID = rq.conn.getSessionID
            logger.warn(s"Cancelling query ${rq.queryID} for session: $sessionID")
            // Server-side cancel, so it works even while the client call is blocked
            statement.execute(s"select SYSTEM$$CANCEL_QUERY('${rq.queryID}')")
            statement.close()
          }
        } catch {
          case th: Throwable =>
            logger.warn("Failed to cancel running query: ", th)
        }
      }
      logger.warn(s"Finished cancelling all queries for $appId")
      // Note: this drops all tracked queries for the application
      runningQueries.remove(appId)
    } else {
      logger.error(s"No running query for: $appId and $queryID")
    }
  }
}

Code for 2:

// WrapperThread is assumed to be a Thread subclass exposing an abstract getQueryID
wrapperThread = new WrapperThread("Snowflake-Async-Query") {
  private var objects: java.util.List[net.snowflake.client.jdbc.SnowflakeResultSetSerializable] = null
  private var err: Throwable = null

  override def run(): Unit = {
    // The blocking driver call now happens on this thread, not the task thread
    objects = asyncRs
      .asInstanceOf[SnowflakeResultSet]
      .getResultSetSerializables(params.expectedPartitionSize)
  }

  override val getQueryID: String = queryID
  def getObjects: java.util.List[net.snowflake.client.jdbc.SnowflakeResultSetSerializable] = objects
  def setErr(e: Throwable): Unit = this.err = e
  def getErr(): Throwable = this.err
}

// Record any failure on the wrapper thread so the caller can rethrow it
wrapperThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit = {
    if (t.isInstanceOf[WrapperThread]) {
      val thread = t.asInstanceOf[WrapperThread]
      thread.setErr(e)
      thread.interrupt()
    }
  }
})

wrapperThread.start()
wrapperThread.join()

Then you would cancel like this:

if (wrapperThread != null) {
  // Cancel the query server-side first, then drop the tracking entry
  SparkConnectorContext.cancelRunningQuery(sqlContext.sparkContext, wrapperThread.getQueryID)
  SparkConnectorContext.removeRunningQuery(sqlContext.sparkContext, conn, wrapperThread.getQueryID)
  // wrapperThread.interrupt()
  wrapperThread.join()
  wrapperThread = null
}
