
Commit

Insert in Bulk
kderme committed Jan 4, 2024
1 parent bb1e64a commit 929b1a9
Showing 4 changed files with 84 additions and 42 deletions.
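At a high level, this commit makes db-sync accumulate each block's grouped rows in memory and write them to the database in bulk when a flush condition holds (or at the end of the batch), instead of inserting after every block. The sketch below is illustrative only — simplified stand-in types and a made-up flush condition, not the cardano-db-sync code — but it shows the same pattern used in insertListBlocks and insertBlock below: prepend the newest group, then flush with mconcat of the reversed list so blocks land in order.

```haskell
-- Illustrative sketch of the accumulate-and-flush pattern; Grouped, bulkInsert
-- and the flush condition are stand-ins, not db-sync types.
import Control.Monad (foldM, unless)

newtype Grouped = Grouped {rows :: [Int]}

instance Semigroup Grouped where
  Grouped a <> Grouped b = Grouped (a <> b)

instance Monoid Grouped where
  mempty = Grouped []

-- Per block: either keep accumulating (newest group at the head) or flush.
insertBlockSketch :: [Grouped] -> Int -> IO [Grouped]
insertBlockSketch acc blk = do
  let grouped = Grouped [blk]
      flushNow = blk `mod` 10 == 0 -- stand-in for forceInsert / withinTwoMin / tookSnapshot
  if flushNow
    then do
      -- groups were prepended, so reverse before mconcat to keep block order
      bulkInsert (mconcat (reverse (grouped : acc)))
      pure []
    else pure (grouped : acc)

bulkInsert :: Grouped -> IO ()
bulkInsert g = putStrLn ("bulk insert of " <> show (length (rows g)) <> " rows")

syncBlocks :: [Int] -> IO ()
syncBlocks blks = do
  leftover <- foldM insertBlockSketch [] blks
  unless (null leftover) $ bulkInsert (mconcat (reverse leftover))
```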
2 changes: 1 addition & 1 deletion cardano-db-sync/src/Cardano/DbSync/DbAction.hs
@@ -78,7 +78,7 @@ newThreadChannels =
-- The pipeline queue in the LocalChainSync machinery is 50 elements long
-- so we should not exceed that.
ThreadChannels
<$> TBQ.newTBQueueIO 47
<$> TBQ.newTBQueueIO 300
<*> newTVarIO False

writeDbActionQueue :: ThreadChannels -> DbAction -> STM ()
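The queue construction above uses plain stm; the following is a minimal, self-contained sketch of the same applicative construction with newTBQueueIO and newTVarIO. The Channels record and its field names are illustrative, not the actual ThreadChannels type.

```haskell
-- Minimal sketch of a bounded work queue built the same way as ThreadChannels;
-- the Channels record and its fields are illustrative, not the db-sync type.
import Control.Concurrent.STM (TVar, atomically, newTVarIO)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue)

data Channels a = Channels
  { chQueue :: TBQueue a
  , chDone :: TVar Bool
  }

newChannels :: IO (Channels a)
newChannels =
  Channels
    <$> newTBQueueIO 300 -- the larger bound introduced by this commit
    <*> newTVarIO False

main :: IO ()
main = do
  ch <- newChannels
  atomically $ writeTBQueue (chQueue ch) "block"
  putStrLn =<< atomically (readTBQueue (chQueue ch))
```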
40 changes: 29 additions & 11 deletions cardano-db-sync/src/Cardano/DbSync/Default.hs
@@ -24,6 +24,7 @@ import Cardano.DbSync.Era.Shelley.Adjust (adjustEpochRewards)
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
import Cardano.DbSync.Era.Shelley.Insert (insertShelleyBlock, mkAdaPots)
import Cardano.DbSync.Era.Shelley.Insert.Epoch (insertPoolDepositRefunds, insertRewards)
import Cardano.DbSync.Era.Shelley.Insert.Grouped
import Cardano.DbSync.Era.Shelley.Validate (validateEpochRewards)
import Cardano.DbSync.Error
import Cardano.DbSync.Fix.EpochStake
@@ -57,22 +58,24 @@ insertListBlocks ::
[CardanoBlock] ->
IO (Either SyncNodeError ())
insertListBlocks synEnv blocks = do
DB.runDbIohkLogging (envBackend synEnv) tracer
. runExceptT
$ traverse_ (applyAndInsertBlockMaybe synEnv) blocks
DB.runDbIohkLogging (envBackend synEnv) tracer $ runExceptT $ do
groups <- foldM (applyAndInsertBlockMaybe synEnv) [] blocks
unless (null groups) $
void $ insertBlockGroupedData synEnv $ mconcat $ reverse groups
where
tracer = getTrace synEnv

applyAndInsertBlockMaybe ::
SyncEnv ->
[BlockGroupedData] ->
CardanoBlock ->
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
applyAndInsertBlockMaybe syncEnv cblk = do
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData]
applyAndInsertBlockMaybe syncEnv groups cblk = do
bl <- liftIO $ isConsistent syncEnv
(!applyRes, !tookSnapshot) <- liftIO (mkApplyResult bl)
if bl
then -- In the usual case it will be consistent so we don't need to do any queries. Just insert the block
insertBlock syncEnv cblk applyRes False tookSnapshot
insertBlock syncEnv groups cblk applyRes False False tookSnapshot
else do
eiBlockInDbAlreadyId <- lift (DB.queryBlockId (SBS.fromShort . Consensus.getOneEraHash $ blockHash cblk))
-- If the block is already in db, do nothing. If not, delete all blocks with greater 'BlockNo' or
@@ -88,7 +91,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
]
rollbackFromBlockNo syncEnv (blockNo cblk)
void $ migrateStakeDistr syncEnv (apOldLedger applyRes)
insertBlock syncEnv cblk applyRes True tookSnapshot
_ <- insertBlock syncEnv groups cblk applyRes True True tookSnapshot
liftIO $ setConsistentLevel syncEnv Consistent
Right blockId | Just (adaPots, slotNo, epochNo) <- getAdaPots applyRes -> do
replaced <- lift $ DB.replaceAdaPots blockId $ mkAdaPots blockId slotNo epochNo adaPots
@@ -99,6 +102,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
| Just epochNo <- getNewEpoch applyRes ->
liftIO $ logInfo tracer $ "Reached " <> textShow epochNo
_ -> pure ()
pure []
where
tracer = getTrace syncEnv

@@ -122,26 +126,31 @@ applyAndInsertBlockMaybe syncEnv cblk = do

insertBlock ::
SyncEnv ->
[BlockGroupedData] ->
CardanoBlock ->
ApplyResult ->
-- force inserting all data
Bool ->
-- is first Block after rollback
Bool ->
-- has snapshot been taken
Bool ->
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData]
insertBlock syncEnv groupsPrev cblk applyRes forceInsert firstAfterRollback tookSnapshot = do
!epochEvents <- liftIO $ atomically $ generateNewEpochEvents syncEnv (apSlotDetails applyRes)
let !applyResult = applyRes {apEvents = sort $ epochEvents <> apEvents applyRes}
let !details = apSlotDetails applyResult
let !withinTwoMin = isWithinTwoMin details
let !withinHalfHour = isWithinHalfHour details
let !insertAll = forceInsert || withinTwoMin || withinHalfHour || tookSnapshot
insertLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult)
let isNewEpochEvent = hasNewEpochEvent (apEvents applyResult)
let isStartEventOrRollback = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback
let isMember poolId = Set.member poolId (apPoolsRegistered applyResult)
let insertShelley blk =
insertShelleyBlock
syncEnv
groupsPrev
isStartEventOrRollback
withinTwoMin
withinHalfHour
@@ -152,10 +161,11 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do

-- Here we insert the block and its txs, but in addition we also cache some values which we later
-- use when updating the Epoch, thus saving us from having to recalculate them later.
case cblk of
BlockByron blk ->
groups <- case cblk of
BlockByron blk -> do
newExceptT $
insertByronBlock syncEnv isStartEventOrRollback blk details
pure []
BlockShelley blk ->
newExceptT $
insertShelley $
@@ -186,7 +196,15 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
when (unBlockNo blkNo `mod` getPruneInterval syncEnv == 0) $
do
lift $ DB.deleteConsumedTxOut tracer (getSafeBlockNoDiff syncEnv)
groups' <-
if insertAll then do
unless (null groups) $
void $ insertBlockGroupedData syncEnv $ mconcat $ reverse groups
pure []
else
pure groups
commitOrIndexes withinTwoMin withinHalfHour
pure groups'
where
tracer = getTrace syncEnv
iopts = getInsertOptions syncEnv
54 changes: 33 additions & 21 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs
@@ -101,15 +101,16 @@ type IsPoolMember = PoolKeyHash -> Bool
insertShelleyBlock ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
[BlockGroupedData] ->
Bool ->
Bool ->
Bool ->
Generic.Block ->
SlotDetails ->
IsPoolMember ->
ApplyResult ->
ReaderT SqlBackend m (Either SyncNodeError ())
insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do
ReaderT SqlBackend m (Either SyncNodeError [BlockGroupedData])
insertShelleyBlock syncEnv groupsPrev shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do
runExceptT $ do
pbid <- case Generic.blkPreviousHash blk of
Nothing -> liftLookupFail (renderErrorMessage (Generic.blkEra blk)) DB.queryGenesis -- this is for networks that fork from Byron on epoch 0.
@@ -141,12 +142,11 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is

let zippedTx = zip [0 ..] (Generic.blkTxs blk)

txsPrepared <- foldAndAccM (prepareTx syncEnv blkId applyResult) zippedTx
txsPrepared <- foldAndAccM (prepareTx syncEnv txOutPrev blkId applyResult) zippedTx
txIds <- lift $ DB.insertManyTx (ptrTxDb <$> txsPrepared)
let txInserter = insertTx syncEnv blkId isMember (sdEpochNo details) (Generic.blkSlotNo blk) applyResult
let txInserter = insertTx syncEnv txOutPrev blkId isMember (sdEpochNo details) (Generic.blkSlotNo blk) applyResult
let newZip = zipWith3 (\tx txId ptr -> (txId, tx, ptr)) (Generic.blkTxs blk) txIds txsPrepared
blockGroupedData <- foldM txInserter mempty newZip
minIds <- insertBlockGroupedData syncEnv blockGroupedData

-- now that we've inserted the Block and all its txs, let's cache what we'll need
-- when we later update the epoch values.
@@ -158,15 +158,12 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is
EpochBlockDiff
{ ebdBlockId = blkId
, ebdTime = sdSlotTime details
, ebdFees = groupedTxFees blockGroupedData
, ebdFees = sum (ptrFees <$> txsPrepared)
, ebdEpochNo = unEpochNo (sdEpochNo details)
, ebdOutSum = fromIntegral $ groupedTxOutSum blockGroupedData
, ebdOutSum = sum (fromIntegral . ptrOutSum <$> txsPrepared)
, ebdTxCount = fromIntegral $ length (Generic.blkTxs blk)
}

when withinHalfHour $
insertReverseIndex blkId minIds

liftIO $ do
let epoch = unEpochNo epochNo
slotWithinEpoch = unEpochSlot (sdEpochSlot details)
@@ -208,9 +205,20 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is
when (ioOffChainPoolData iopts)
. lift
$ insertOffChainPoolResults tracer (envOffChainPoolResultQueue syncEnv)

if withinHalfHour then do
unless (null groupsPrev) $
void $ insertBlockGroupedData syncEnv $ mconcat $ reverse groupsPrev
minIds <- insertBlockGroupedData syncEnv blockGroupedData
insertReverseIndex blkId minIds
pure []
else do
pure $ blockGroupedData : groupsPrev
where
iopts = getInsertOptions syncEnv

txOutPrev = fmap fst . groupedTxOut <$> groupsPrev

logger :: Trace IO a -> a -> IO ()
logger
| shouldLog = logInfo
@@ -273,12 +281,13 @@ data PrepareTxRes = PrepareTxRes
prepareTx ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
[[ExtendedTxOut]] ->
DB.BlockId ->
ApplyResult ->
[(ByteString, Generic.TxOut)] ->
(Word64, Generic.Tx) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) (PrepareTxRes, [(ByteString, Generic.TxOut)])
prepareTx syncEnv blkId applyResult blockTxOuts (blockIndex, tx) = do
prepareTx syncEnv txOutPrev blkId applyResult blockTxOuts (blockIndex, tx) = do
let !txHash = Generic.txHash tx
let !mdeposits = if not (Generic.txValidContract tx) then Just (Coin 0) else lookupDepositsMap txHash (apDepositsMap applyResult)
let !outSum = fromIntegral $ unCoin $ Generic.txOutSum tx
@@ -293,7 +302,7 @@ prepareTx syncEnv blkId applyResult blockTxOuts (blockIndex, tx) = do
pure (resolvedInputsDB, fees, unCoin <$> mdeposits)
(_, Nothing) -> do
-- Nothing in fees means a phase 2 failure
(resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputsValue blockTxOuts) (Generic.txInputs tx)
(resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputsValue txOutPrev blockTxOuts) (Generic.txInputs tx)
let !inSum = sum $ map unDbLovelace $ catMaybes amounts
!diffSum = if inSum >= outSum then inSum - outSum else 0
!fees = maybe diffSum (fromIntegral . unCoin) (Generic.txFees tx)
@@ -318,6 +327,7 @@ prepareTx syncEnv blkId applyResult blockTxOuts (blockIndex, tx) = do
insertTx ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
[[ExtendedTxOut]] ->
DB.BlockId ->
IsPoolMember ->
EpochNo ->
@@ -326,17 +336,17 @@ insertTx ::
BlockGroupedData ->
(DB.TxId, Generic.Tx, PrepareTxRes) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) BlockGroupedData
insertTx syncEnv blkId isMember epochNo slotNo applyResult grouped (txId, tx, ptr) = do
insertTx syncEnv txOutPrev blkId isMember epochNo slotNo applyResult grouped (txId, tx, ptr) = do
let !txHash = Generic.txHash tx
disInOut <- liftIO $ getDisableInOutState syncEnv
if not (Generic.txValidContract tx)
then do
!txOutsGrouped <- mapM (prepareTxOut tracer cache iopts (txId, txHash)) (Generic.txOutputs tx)

!txIns <- mapM (prepareTxIn txId (fst <$> groupedTxOut grouped) Map.empty) (ptrResolvedTxIn ptr)
!txIns <- mapM (prepareTxIn txId groups Map.empty) (ptrResolvedTxIn ptr)
-- There is a custom semigroup instance for BlockGroupedData which uses addition for the values `fees` and `outSum`.
-- Same happens below on the last line of this function.
pure (grouped <> BlockGroupedData txIns txOutsGrouped [] [] (ptrFees ptr) (ptrOutSum ptr))
pure (grouped <> BlockGroupedData txIns txOutsGrouped [] [])
else do
-- The following operations only happen if the script passes stage 2 validation (or the tx has
-- no script).
@@ -346,7 +356,7 @@ insertTx syncEnv blkId isMember epochNo slotNo applyResult grouped (txId, tx, pt
Map.fromList
<$> whenFalseMempty
(ioPlutusExtra iopts)
(mapM (insertRedeemer tracer disInOut (fst <$> groupedTxOut grouped) txId) (Generic.txRedeemer tx))
(mapM (insertRedeemer tracer disInOut groups txId) (Generic.txRedeemer tx))

when (ioPlutusExtra iopts) $ do
mapM_ (insertDatum tracer cache txId) (Generic.txData tx)
@@ -390,13 +400,15 @@ insertTx syncEnv blkId isMember epochNo slotNo applyResult grouped (txId, tx, pt
mapM_ (insertGovActionProposal cache blkId txId (getGovExpiresAt applyResult epochNo)) $ zip [0 ..] (Generic.txProposalProcedure tx)
mapM_ (insertVotingProcedures tracer cache txId) (Generic.txVotingProcedure tx)

!txIns <- mapM (prepareTxIn txId (fst <$> groupedTxOut grouped) redeemers) (ptrResolvedTxIn ptr)
pure (grouped <> BlockGroupedData txIns txOutsGrouped txMetadata maTxMint (ptrFees ptr) (ptrOutSum ptr))
!txIns <- mapM (prepareTxIn txId groups redeemers) (ptrResolvedTxIn ptr)
pure (grouped <> BlockGroupedData txIns txOutsGrouped txMetadata maTxMint)
where
tracer = getTrace syncEnv
cache = envCache syncEnv
iopts = getInsertOptions syncEnv

groups = (fst <$> groupedTxOut grouped) : txOutPrev

prepareTxOut ::
(MonadBaseControl IO m, MonadIO m) =>
Trace IO Text ->
@@ -481,15 +493,15 @@ insertCollateralTxOut tracer cache iopts (txId, _txHash) (Generic.TxOut index ad
prepareTxIn ::
Monad m =>
DB.TxId ->
[ExtendedTxOut] ->
[[ExtendedTxOut]] ->
Map Word64 DB.RedeemerId ->
(Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) ->
ExceptT SyncNodeError m ExtendedTxIn
prepareTxIn txInId groupedOutputs redeemers (txIn, mtxOutId, mTxOutId) = do
txOutId <- liftLookupFail "resolveScriptHash" $
case mtxOutId of
Just txOutId -> pure $ Right txOutId
Nothing -> case resolveInMemory txIn groupedOutputs of
Nothing -> case resolveInMemoryMany txIn groupedOutputs of
Nothing -> pure $ Left $ DB.DbLookupTxHash (Generic.txInHash txIn)
Just txOut -> pure $ Right $ DB.txOutTxId $ etoTxOut txOut
let txInDB =
Expand Down Expand Up @@ -1138,7 +1150,7 @@ insertRedeemer ::
(MonadBaseControl IO m, MonadIO m) =>
Trace IO Text ->
Bool ->
[ExtendedTxOut] ->
[[ExtendedTxOut]] ->
DB.TxId ->
(Word64, Generic.TxRedeemer) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) (Word64, DB.RedeemerId)
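Because several blocks' worth of outputs can now be pending in memory, prepareTxIn above resolves inputs with resolveInMemoryMany over [[ExtendedTxOut]] instead of a single list. Its definition is not part of this diff, so the snippet below is only a guess at the shape of such a lookup, using simplified hypothetical types.

```haskell
-- Hypothetical sketch only: resolveInMemoryMany's real definition is not in
-- this diff, and TxIn / TxOut here are simplified stand-ins.
import Data.List (find)

data TxIn = TxIn {inHash :: String, inIndex :: Int}
data TxOut = TxOut {outHash :: String, outIndex :: Int, outValue :: Integer}

-- Search the outputs of every still-unflushed block for the referenced output.
resolveInMemoryManySketch :: TxIn -> [[TxOut]] -> Maybe TxOut
resolveInMemoryManySketch txIn groups =
  find matches (concat groups)
  where
    matches out = outHash out == inHash txIn && outIndex out == inIndex txIn
```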