Skip to content

Commit

Permalink
Add resource-based concurrency control
Browse files Browse the repository at this point in the history
This is based on the system in saurabhnanda#55, and aims to
address saurabhnanda#38, where jobs need to have some limited
access to resources that controls how many can run simultaneously.

Unlike that PR, this implements a system where jobs can require access
to 0 or more resources, with different amounts of usage. This is because
of a business need for jobs to be able to require multiple resources.

The implementation is intended to have no performance impact on existing
code wherever the user has not opted in to resource-based concurrency.
This is done by having parallel implementations wherever necessary that
switch based on the concurrency control chosen.

This subsumes and replaces the job-type-based concurrency control,
because it is possible to model that system using job types as
resources.
  • Loading branch information
ivb-supercede committed Sep 2, 2021
1 parent 2c6ea1c commit 8ba23b9
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 62 deletions.
156 changes: 111 additions & 45 deletions src/OddJobs/Job.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE RankNTypes, FlexibleInstances, FlexibleContexts, PartialTypeSignatures, TupleSections, DeriveGeneric, UndecidableInstances #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module OddJobs.Job
Expand All @@ -14,13 +15,17 @@ module OddJobs.Job
-- $config
, Config(..)
, ConcurrencyControl(..)
, ResourceCfg(..)

-- * Creating/scheduling jobs
--
-- $createJobs
, createJob
, scheduleJob

, createJobWithResources
, scheduleJobWithResources

-- * @Job@ and associated data-types
--
-- $dataTypes
Expand All @@ -33,6 +38,8 @@ module OddJobs.Job
, Seconds(..)
, JobErrHandler(..)
, AllJobTypes(..)
, ResourceId(..)
, FunctionName

-- ** Structured logging
--
Expand All @@ -46,7 +53,7 @@ module OddJobs.Job
, jobMonitor
, jobEventListener
, jobPoller
, jobPollingSqlM
, jobPollingSql
, JobRunner
, HasJobRunner (..)

Expand Down Expand Up @@ -90,7 +97,8 @@ import Control.Monad.Logger as MLogger (LogLevel(..), LogStr, toLogStr)
import UnliftIO.IORef
import UnliftIO.Exception ( SomeException(..), try, catch, finally
, catchAny, bracket, Exception(..), throwIO
, catches, Handler(..), mask_, throwString
, catches, Handler(..), mask_, onException
, throwString
)
import Data.Proxy
import Control.Monad.Trans.Control
Expand Down Expand Up @@ -373,7 +381,7 @@ runJobWithTimeout timeoutSec job@Job{jobId} = do

a <- async $ liftIO $ jobRunner_ job

x <- atomicModifyIORef' threadsRef $ \threads ->
x <- atomicModifyIORef' threadsRef $ \threads ->
( DM.insert jobId a threads
, DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads
)
Expand Down Expand Up @@ -491,15 +499,19 @@ jobMonitor = do
liftIO $ Dir.removePathForcibly f

-- | Ref: 'jobPoller'
jobPollingSqlM :: (HasJobRunner m)
=> m Query
jobPollingSqlM = do
jobTypeSql <- getJobTypeSql
pure $
"update ? set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1\
\ WHERE id in (select id from ? where (run_at<=? AND (" <> jobTypeSql <> " IN ?)\
\ AND ((status in ?) OR (status = ? and locked_at<?))) ORDER BY attempts asc, run_at ASC LIMIT\
\ 1 FOR UPDATE) RETURNING id"
jobPollingSql :: Query
jobPollingSql =
"update ? set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1\
\ WHERE id in (select id from ? where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?))) \
\ ORDER BY attempts asc, run_at ASC LIMIT 1 FOR UPDATE) RETURNING id"

jobPollingWithResourceSql :: Query
jobPollingWithResourceSql =
" UPDATE ? SET status = ?, locked_at = ?, locked_by = ?, attempts = attempts + 1 \
\ WHERE id in (select id from ? where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?))) \
\ AND ?(id) \
\ ORDER BY attempts ASC, run_at ASC LIMIT 1) \
\ RETURNING id"

-- | Ref: 'killJobPoller'
killJobPollingSqlM :: (HasJobRunner m)
Expand Down Expand Up @@ -529,7 +541,7 @@ waitForJobs = do
data ConcurrencyAction
= DontPoll
| PollAny
| PollOnlyTypes [Text]
| PollWithResources ResourceCfg

getConcurrencyControlFn :: (HasJobRunner m)
=> m (Connection -> m ConcurrencyAction)
Expand All @@ -538,12 +550,7 @@ getConcurrencyControlFn = getConcurrencyControl >>= \case
MaxConcurrentJobs maxJobs -> pure $ const $ do
curJobs <- getRunnerEnv >>= (readIORef . envJobThreadsRef) >>= (return . DM.elems)
pure $ pollIf $ (DL.length curJobs) < maxJobs
MaxConcurrentJobsPerType n -> pure $ \dbConn -> do
tname <- getTableName
jobCountsByTypeSql <- jobCountsByTypeSqlM
countsByType <- liftIO $ PGS.query dbConn jobCountsByTypeSql (PGS.Only tname)
let underresourcedTypes = DL.concatMap (\(type', cnt) -> [type' | cnt < n]) countsByType
pure $ PollOnlyTypes underresourcedTypes
ResourceLimits resCfg -> pure $ const $ pure $ PollWithResources resCfg
DynamicConcurrency fn -> pure $ const $ pollIf <$> liftIO fn

where
Expand Down Expand Up @@ -578,26 +585,35 @@ jobPoller = do
lockTimeout <- getDefaultJobTimeout
log LevelInfo $ LogText $ toS $ "Starting the job monitor via DB polling with processName=" <> processName

let pollWithFilter conn mTypesFilter = do
let poll conn mResCfg = do
nextAction <- mask_ $ do
typesFilter <- case mTypesFilter of
Nothing -> getAllJobTypes
Just types -> pure types
log LevelDebug $ LogText $ toS $ "[" <> processName <> "] Polling the job queue.."
jobPollingSql <- jobPollingSqlM
t <- liftIO getCurrentTime
r <- liftIO $
r <- case mResCfg of
Nothing -> liftIO $
PGS.query conn jobPollingSql
( tname
, Locked
, t
, processName
, tname
, t
, (In typesFilter)
, (In [Queued, Retry])
, Locked
, (addUTCTime (fromIntegral $ negate $ unSeconds lockTimeout) t))
Just ResourceCfg{..} -> liftIO $
PGS.query conn jobPollingWithResourceSql
( tname
, Locked
, t
, processName
, tname
, t
, (In [Queued, Retry])
, Locked
, (addUTCTime (fromIntegral $ negate $ unSeconds lockTimeout) t)
, resCfgCheckResourceFunction
)
case r of
-- When we don't have any jobs to run, we can relax a bit...
[] -> pure delayAction
Expand All @@ -613,16 +629,16 @@ jobPoller = do
concurrencyControlFn <- getConcurrencyControlFn
withResource pool $ \pollerDbConn -> forever $ concurrencyControlFn pollerDbConn >>= \case
DontPoll -> log LevelWarn $ LogText $ "NOT polling the job queue due to concurrency control"
PollAny -> pollWithFilter pollerDbConn Nothing
PollOnlyTypes types -> pollWithFilter pollerDbConn (Just types)
PollAny -> poll pollerDbConn Nothing
PollWithResources resCfg -> poll pollerDbConn (Just resCfg)

where
delayAction = delaySeconds =<< getPollingInterval
noDelayAction = pure ()

-- | Executes 'killJobPollingSql' every 'cfgPollingInterval' seconds to pick up jobs
-- that are cancelled and need to be killed. Uses @UPDATE@ along with @SELECT...
-- ..FOR UPDATE@ to efficiently find a job that matches /all/ of the following
-- ..FOR UPDATE@ to efficiently find a job that matches /all/ of the following
-- conditions:
--
-- * 'jobStatus' should be 'cancelled'
Expand All @@ -641,7 +657,7 @@ killJobPoller = do
currentTime <- liftIO getCurrentTime
result <- liftIO $ PGS.query conn killJobPollingSql
(tname, tname, Cancelled, processName, currentTime)

case result of
[] ->
pure delayAction
Expand Down Expand Up @@ -671,31 +687,32 @@ jobEventListener = do
jwName <- liftIO jobWorkerName
concurrencyControlFn <- getConcurrencyControlFn

let tryLockingJob jid = do
let tryLockingJob jid mResCfg = withDbConnection $ \conn -> do
let q = "UPDATE ? SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 WHERE id=? AND status in ? RETURNING id"
(withDbConnection $ \conn -> (liftIO $ PGS.query conn q (tname, Locked, jwName, jid, In [Queued, Retry]))) >>= \case
qWithResources =
"UPDATE ? SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 \
\ WHERE id=? AND status in ? AND ?(id) RETURNING id"
result <- case mResCfg of
Nothing -> liftIO $ PGS.query conn q (tname, Locked, jwName, jid, In [Queued, Retry])
Just ResourceCfg{..} -> liftIO $ PGS.query conn qWithResources
(tname, Locked, jwName, jid, In [Queued, Retry], resCfgCheckResourceFunction)

case result of
[] -> do
log LevelDebug $ LogText $ toS $ "Job was locked by someone else before I could start. Skipping it. JobId=" <> show jid
pure Nothing
[Only (_ :: JobId)] -> pure $ Just jid
x -> error $ "WTF just happned? Was expecting a single row to be returned, received " ++ (show x)

let getJobType conn jid = do
jobTypeSql <- getJobTypeSql
let q = "SELECT (" <> jobTypeSql <> ") FROM ? WHERE id = ?"
liftIO $ PGS.query conn q (tname, jid) >>= \case
[Only jobType] -> pure jobType
x -> error $ "WTF just happned? Was expecting a single row to be returned, received " ++ (show x)

withResource pool $ \monitorDbConn -> do
void $ liftIO $ PGS.execute monitorDbConn ("LISTEN ?") (Only $ pgEventName tname)
forever $ do
log LevelDebug $ LogText "[LISTEN/NOTIFY] Event loop"
notif <- liftIO $ getNotification monitorDbConn

let pload = notificationData notif
runNotifWithFilter :: HasJobRunner m => Maybe [Text] -> m ()
runNotifWithFilter mTypesFilter = do
runNotifWithFilter :: HasJobRunner m => Maybe ResourceCfg -> m ()
runNotifWithFilter mResCfg = do
log LevelDebug $ LogText $ toS $ "NOTIFY | " <> show pload
case (eitherDecode $ toS pload) of
Left e -> log LevelError $ LogText $ toS $ "Unable to decode notification payload received from Postgres. Payload=" <> show pload <> " Error=" <> show e
Expand All @@ -706,21 +723,20 @@ jobEventListener = do
Nothing -> log LevelError $ LogText $ toS $ "Unable to extract id/run_at/locked_at from " <> show pload
Just (jid, runAt_, mLockedAt_) -> do
t <- liftIO getCurrentTime
jobType <- getJobType monitorDbConn jid
if (runAt_ <= t) && (isNothing mLockedAt_) && maybe True (elem jobType) mTypesFilter
if (runAt_ <= t) && (isNothing mLockedAt_)
then do log LevelDebug $ LogText $ toS $ "Job needs needs to be run immediately. Attempting to fork in background. JobId=" <> show jid
void $ async $ do
-- Let's try to lock the job first... it is possible that it has already
-- been picked up by the poller by the time we get here.
tryLockingJob jid >>= \case
tryLockingJob jid mResCfg >>= \case
Nothing -> pure ()
Just lockedJid -> runJob lockedJid
else log LevelDebug $ LogText $ toS $ "Job is either for future, is already locked, or would violate concurrency constraints. Skipping. JobId=" <> show jid

concurrencyControlFn monitorDbConn >>= \case
DontPoll -> log LevelWarn $ LogText "Received job event, but ignoring it due to concurrency control"
PollAny -> runNotifWithFilter Nothing
PollOnlyTypes types -> runNotifWithFilter (Just types)
PollWithResources resCfg -> runNotifWithFilter (Just resCfg)
where
parser :: Value -> Aeson.Parser (JobId, UTCTime, Maybe UTCTime)
parser = withObject "expecting an object to parse job.run_at and job.locked_at" $ \o -> do
Expand All @@ -734,6 +750,12 @@ jobEventListener = do
createJobQuery :: PGS.Query
createJobQuery = "INSERT INTO ? (run_at, status, payload, last_error, attempts, locked_at, locked_by) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING " <> concatJobDbColumns

ensureResource :: PGS.Query
ensureResource = "INSERT INTO ? (id, usage_limit) VALUES (?, ?) ON CONFLICT DO NOTHING"

registerResourceUsage :: PGS.Query
registerResourceUsage = "INSERT INTO ? (job_id, resource_id, usage) VALUES (?, ?, ?)"

-- $createJobs
--
-- Ideally you'd want to create wrappers for 'createJob' and 'scheduleJob' in
Expand Down Expand Up @@ -779,6 +801,50 @@ scheduleJob conn tname payload runAt = do
[r] -> pure r
_ -> (Prelude.error . (<> "Not expecting multiple rows when creating a single job. Query=")) <$> queryFormatter

type ResourceList = [(ResourceId, Int)]

createJobWithResources
:: ToJSON p
=> Connection
-> TableName
-> ResourceCfg
-> p
-> ResourceList
-> IO Job
createJobWithResources conn tname resCfg payload resources = do
t <- getCurrentTime
scheduleJobWithResources conn tname resCfg payload resources t

scheduleJobWithResources
:: ToJSON p
=> Connection
-> TableName
-> ResourceCfg
-> p
-> ResourceList
-> UTCTime
-> IO Job
scheduleJobWithResources conn tname ResourceCfg{..} payload resources runAt = do
-- We insert everything in a single transaction to delay @NOTIFY@ calls,
-- so a job isn't picked up before its resources are inserted.
PGS.begin conn
let args = ( tname, runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text )
queryFormatter = toS <$> (PGS.formatQuery conn createJobQuery args)
rs <- PGS.query conn createJobQuery args

job <- flip onException (PGS.rollback conn) $ case rs of
[] -> (Prelude.error . (<> "Not expecting a blank result set when creating a job. Query=")) <$> queryFormatter
[r] -> pure r
_ -> (Prelude.error . (<> "Not expecting multiple rows when creating a single job. Query=")) <$> queryFormatter

forM_ resources $ \(resourceId, usage) -> do
PGS.execute conn ensureResource (resCfgResourceTable, rawResourceId resourceId, resCfgDefaultLimit)
PGS.execute conn registerResourceUsage (resCfgUsageTable, jobId job, rawResourceId resourceId, usage)

PGS.commit conn

pure job


-- getRunnerEnv :: (HasJobRunner m) => m RunnerEnv
-- getRunnerEnv = ask
Expand Down
57 changes: 57 additions & 0 deletions src/OddJobs/Migrations.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE RecordWildCards #-}
module OddJobs.Migrations
( module OddJobs.Migrations
, module OddJobs.Types
Expand Down Expand Up @@ -71,3 +72,59 @@ createJobTable conn tname = void $ do
fnName = PGS.Identifier $ "notify_job_monitor_for_" <> (getTnameTxt tname)
trgName = PGS.Identifier $ "trg_notify_job_monitor_for_" <> (getTnameTxt tname)
getTnameTxt (PGS.QualifiedIdentifier _ tname') = tname'

createResourceTableQuery :: Query
createResourceTableQuery = "CREATE TABLE IF NOT EXISTS ?" <>
"( id text primary key" <>
", usage_limit int not null" <>
")";

createUsageTableQuery :: Query
createUsageTableQuery = "CREATE TABLE IF NOT EXISTS ?" <>
"( job_id serial not null REFERENCES ? ON DELETE CASCADE" <>
", resource_id text not null REFERENCES ? ON DELETE CASCADE" <>
", usage int not null" <>
", PRIMARY KEY (job_id, resource_id)" <>
");"

createUsageFunction :: Query
createUsageFunction = "CREATE OR REPLACE FUNCTION ?(resourceId text) RETURNS int as $$" <>
" SELECT sum(usage) FROM ? AS j INNER JOIN ? AS jr ON j.id = jr.job_id " <>
" WHERE jr.resource_id = $1 AND j.status = ? " <>
" $$ LANGUAGE SQL;"

createCheckResourceFunction :: Query
createCheckResourceFunction = "CREATE OR REPLACE FUNCTION ?(jobId int) RETURNS bool as $$" <>
" SELECT coalesce(bool_and(?(resource.id) + job_resource.usage <= resource.usage_limit), true) FROM " <>
" ? AS job_resource INNER JOIN ? AS resource ON job_resource.resource_id = resource.id " <>
" WHERE job_resource.job_id = $1" <>
" $$ LANGUAGE SQL;"

createResourceTables
:: Connection
-> TableName -- ^ Name of the jobs table
-> ResourceCfg
-> IO ()
createResourceTables conn jobTableName ResourceCfg{..} = void $ do
PGS.execute conn createResourceTableQuery (PGS.Only resCfgResourceTable)
PGS.execute conn createUsageTableQuery
( resCfgUsageTable
, jobTableName
, resCfgResourceTable
)
let
PGS.execute conn createUsageFunction
( usageFnName
, jobTableName
, resCfgUsageTable
, Locked
)
PGS.execute conn createCheckResourceFunction
( resCfgCheckResourceFunction
, usageFnName
, resCfgUsageTable
, resCfgResourceTable
)
where
usageFnName = PGS.Identifier $ "calculate_usage_for_resource_from_" <> (getTnameTxt resCfgUsageTable)
getTnameTxt (PGS.QualifiedIdentifier _ tname') = tname'
Loading

0 comments on commit 8ba23b9

Please sign in to comment.