Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Deadlock in PostgreSQL. #4445

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 8 additions & 37 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import org.jooq.impl.DSL.table
import org.jooq.util.mysql.MySQLDSL
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Scheduled
import java.util.concurrent.atomic.AtomicInteger

@KotlinOpen
class SqlQueue(
Expand Down Expand Up @@ -233,31 +234,12 @@ class SqlQueue(
*/
private fun doPoll(maxMessages: Int, callback: (Message, () -> Unit) -> Unit) {
val now = clock.instant().toEpochMilli()
var changed = 0
val changed = AtomicInteger()

/**
* Selects the primary key ulid's of up to ([maxMessages] * 3) ready and unlocked messages,
* sorted by delivery time.
*
* To minimize lock contention, this is a non-locking read. The id's returned may be
* locked or removed by another instance before we can acquire them. We read more id's
* than [maxMessages] and shuffle them to decrease the likelihood that multiple instances
* polling concurrently are all competing for the oldest ready messages when many more
* than [maxMessages] are read.
*
* Candidate rows are locked via an autocommit update query by primary key that will
* only modify unlocked rows. When (candidates > maxMessages), a sliding window is used
* to traverse the shuffled candidates, sized to (maxMessages - changed) with up-to 3
* attempts (and update queries) to grab [maxMessages].
*
* I.e. if maxMessage == 5 and
* candidates == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].shuffle() == [9, 3, 7, 1, 10, 8, 5, 2, 6, 4]
*
* - pass1: attempts to claim [9, 3, 7, 1, 10], locks 3 messages
* - pass2: attempts to claim [8, 5], locks 1 message
* - pass3: attempt to claim [2], succeeds but if not, there are no further attempts
* - proceeds to process 5 messages locked via 3 update queries.
*
* This makes a trade-off between grabbing the maximum number of ready messages per poll cycle
* vs. minimizing [poll] runtime which is also critical to throughput. In testing a scenario
* with up-to 100k ready messages and 7 orca/keiko-sql instances with [fillExecutorEachCycle]
Expand All @@ -281,26 +263,15 @@ class SqlQueue(
return
}

candidates.shuffle()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SO MOSTLY concerned b/c if I'm reading the comments ABOVE correct:

 To minimize lock contention, this is a non-locking read. The id's returned may be
 locked or removed by another instance before we can acquire them. We read more id's
 than [maxMessages] and shuffle them to decrease the likelihood that multiple instances
 polling concurrently are all competing for the oldest ready messages when many more
 than [maxMessages] are read.

The shuffle shouldn't have mattered. Except... it turns out it DOES matter b/c of lock behavior. Wondering if we can update the comments or explanations on this... OR refactor this code to be less... confusing on how it operates in combination with below which does seem to use a lock mechanism...


var position = 0
var passes = 0
while (changed < maxMessages && position < candidates.size && passes < 3) {
passes++
val sliceNext = min(maxMessages - 1 - changed, candidates.size - 1 - position)
val ids = candidates.slice(IntRange(position, position + sliceNext))
when (sliceNext) {
0 -> position++
else -> position += sliceNext
}

changed += jooq.update(queueTable)
// Must use one query per id to avoid Dead Lock in PostgreSQL
candidates.parallelStream().forEach {
changed.addAndGet(jooq.update(queueTable)
.set(lockedField, "$lockId:$now")
.where(idField.`in`(*ids.toTypedArray()), lockedField.eq("0"))
.execute()
.where(idField.eq(it), lockedField.eq("0"))
.execute())
}

if (changed > 0) {
if (changed.get() > 0) {
val rs = withRetry(READ) {
jooq.select(
field("q.id").`as`("id"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,29 @@ import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MonitorableQueue
import com.netflix.spinnaker.q.sql.SqlQueue
import com.zaxxer.hikari.HikariConfig
import de.huxhorn.sulky.ulid.ULID
import java.time.Clock
import java.time.Duration
import java.util.Optional
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.junit.runner.RunWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.test.context.junit4.SpringRunner
import java.time.Clock
import java.time.Duration
import java.util.*

@Configuration
class SqlTestConfig {
@Bean
fun jooq(): DSLContext {
val testDatabase = SqlTestUtil.initTcMysqlDatabase()
val hikariConfig = HikariConfig()
hikariConfig.jdbcUrl = SqlTestUtil.tcJdbcUrl
hikariConfig.maximumPoolSize = 10

val testDatabase = SqlTestUtil.initDatabase(hikariConfig, SQLDialect.MYSQL, "test")
return testDatabase.context
}

Expand Down