Conversation

@tdas
Contributor

@tdas tdas commented Jun 19, 2017

What changes were proposed in this pull request?

StateStoreProvider instances are loaded on demand in an executor when a query is started. When a query is restarted, the loaded provider instance gets reused. There is a non-trivial chance that a task from the previous query run is still running while the tasks of the restarted run have started, so for a given stateful partition there may be two concurrent tasks operating on the same partition, and therefore using the same provider instance. This can lead to inconsistent results and possibly random failures, as state store implementations are not designed to be thread-safe.

To fix this, I have introduced a StateStoreProviderId that uniquely identifies a provider loaded in an executor. It includes the query run id, ensuring that restarted queries force the executor to load a new provider instance, and thereby preventing two concurrent tasks (from two different runs) from reusing the same provider instance.

Additional minor bug fixes

  • All state stores related to a query run are marked as deactivated in the StateStoreCoordinator so that the executors can unload them and free their resources.
  • Moved the code that determines the checkpoint directory of a state store from implementation-specific code (HDFSBackedStateStoreProvider) to implementation-agnostic code (StateStoreId), so that implementations do not accidentally get it wrong.
    • Also added the store name to the path, to support multiple stores per SQL operator partition.
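
The checkpoint-directory move described in the bullet above can be sketched as follows. This is an illustrative assumption, not Spark's exact code: the class name `SketchStateStoreId` and the exact path layout are hypothetical, but the point stands — the id type itself derives the location, and the store name appears in the path so one operator partition can own several stores.

```scala
// Hedged sketch: checkpoint location derived in the id type itself,
// rather than inside a specific provider implementation.
// Names and path layout are assumptions for illustration only.
case class SketchStateStoreId(
    checkpointRootLocation: String,
    operatorId: Long,
    partitionId: Int,
    storeName: String = "default") {

  // root/<operatorId>/<partitionId>/<storeName>; including storeName
  // allows multiple stores per SQL operator partition
  def storeCheckpointLocation: String =
    s"$checkpointRootLocation/$operatorId/$partitionId/$storeName"
}
```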

Note: This change does not address the scenario where two tasks of the same run (e.g. speculative tasks) run concurrently in the same executor. The chance of this is very small because, ideally, speculative tasks should never run in the same executor.
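
The effect of keying on the run id can be sketched with a minimal stand-in cache. The actual class added by this PR is `case class StateStoreProviderId(storeId: StateStoreId, queryRunId: UUID)` (quoted in the test-build summary below); the `ProviderCache` and `Provider` types here are illustrative stand-ins, not Spark's executor internals.

```scala
import java.util.UUID
import scala.collection.mutable

// Stand-in types: same partition, different run id => different cache key.
final case class StoreId(operatorId: Long, partitionId: Int)
final case class ProviderId(storeId: StoreId, queryRunId: UUID)

class Provider // stand-in for a StateStoreProvider instance

object ProviderCache {
  private val loaded = mutable.HashMap.empty[ProviderId, Provider]

  // Load on demand; a restarted query carries a new runId, so it can
  // never collide with a provider still in use by the previous run.
  def getOrLoad(id: ProviderId): Provider =
    loaded.getOrElseUpdate(id, new Provider)
}
```

Because the run id is part of the key, a lingering task from the old run and a task from the restarted run resolve to two distinct provider instances instead of sharing one.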

How was this patch tested?

Existing unit tests + new unit test.


/** Used to identify the state store for a given operator. */
-case class OperatorStateId(
+case class StatefulOperatorStateInfo(
Contributor Author

Renamed to ***Info, so that there is less confusion with StateStoreId

// Stop and verify whether the stores are deactivated in the coordinator
query.stop()
assert(coordRef.getLocation(providerId).isEmpty)

Contributor Author

remove this line.

}
awaitTerminationLock.notifyAll()
}
stateStoreCoordinator.deactivateInstances(terminatedQuery.runId)
Contributor Author

@tdas tdas Jun 19, 2017

this is the change that deactivates all the state stores related to the query, thus enabling the executors to lazily unload all the related provider instances.
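
The deactivation step explained above can be sketched as a coordinator that drops every registered store whose provider id carries the terminated query's run id. All names here (`SketchCoordinator`, `SketchProviderId`, `reportActive`) are hypothetical stand-ins for illustration, not the real StateStoreCoordinator API.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical provider id: (operator, partition) plus the query run id.
final case class SketchProviderId(operatorId: Long, partitionId: Int, queryRunId: UUID)

class SketchCoordinator {
  // provider id -> executor currently hosting it
  private val instances = mutable.HashMap.empty[SketchProviderId, String]

  def reportActive(id: SketchProviderId, executor: String): Unit =
    instances(id) = executor

  def getLocation(id: SketchProviderId): Option[String] =
    instances.get(id)

  // Drop every store belonging to the terminated run; executors that later
  // see no active location for their provider can unload it lazily.
  def deactivateInstances(runId: UUID): Unit = {
    val stale = instances.keys.filter(_.queryRunId == runId).toList
    stale.foreach(instances.remove)
  }
}
```

This mirrors the check in the test snippet earlier in the thread, where `getLocation(providerId).isEmpty` is asserted after the query stops.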

@tdas tdas changed the title Added StateStoreProviderId with queryRunId to reload StateStoreProviders when query is restarted [SPARK-21145][SS] Added StateStoreProviderId with queryRunId to reload StateStoreProviders when query is restarted Jun 19, 2017
@SparkQA

SparkQA commented Jun 20, 2017

Test build #78266 has finished for PR 18355 at commit 3da6b0f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StateStoreProviderId(storeId: StateStoreId, queryRunId: UUID)
  • case class StatefulOperatorStateInfo(

@SparkQA

SparkQA commented Jun 20, 2017

Test build #78270 has finished for PR 18355 at commit d2f1676.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 20, 2017

Test build #78317 has finished for PR 18355 at commit 35b1bb6.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 21, 2017

Test build #3804 has finished for PR 18355 at commit 35b1bb6.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78349 has finished for PR 18355 at commit a5fefab.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shaneknapp
Contributor

test this please

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78358 has finished for PR 18355 at commit a5fefab.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.


// Both providers should have the same StateStoreId, but they should be different objects
assert(loadedProvidersAfterRun2(0).stateStoreId === loadedProvidersAfterRun2(1).stateStoreId)
assert(loadedProvidersAfterRun2(0).hashCode !== loadedProvidersAfterRun2(1).hashCode)
Member

nit: assert(loadedProvidersAfterRun2(0) ne loadedProvidersAfterRun2(1))

@zsxwing
Member

zsxwing commented Jun 21, 2017

LGTM. Just one nit.

@zsxwing
Member

zsxwing commented Jun 21, 2017

retest this please

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78408 has finished for PR 18355 at commit 0ad5a5c.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Jun 21, 2017

retest this please.

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78410 has finished for PR 18355 at commit 0ad5a5c.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #3806 has finished for PR 18355 at commit 0ad5a5c.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 23, 2017

Test build #78492 has finished for PR 18355 at commit 762fe60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Jun 23, 2017

Merging this to master. Thank you @zsxwing for reviewing and @HyukjinKwon for suggesting the workaround to the unidoc issue.

@asfgit asfgit closed this in fe24634 Jun 23, 2017
robert3005 pushed a commit to palantir/spark that referenced this pull request Jun 29, 2017
…d StateStoreProviders when query is restarted


Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#18355 from tdas/SPARK-21145.
val env = SparkEnv.get
if (env != null) {
if (_coordRef == null) {
logInfo("Env is not null")
Contributor

@tdas could you explain the reason for adding this message at the INFO level?
