|
1 |
| -{-# LANGUAGE AllowAmbiguousTypes #-} |
2 |
| -{-# LANGUAGE LambdaCase #-} |
3 |
| -{-# LANGUAGE MultiWayIf #-} |
4 |
| -{-# LANGUAGE OverloadedStrings #-} |
5 |
| -{-# LANGUAGE RankNTypes #-} |
6 |
| -{-# LANGUAGE TypeFamilyDependencies #-} |
| 1 | +{-# LANGUAGE AllowAmbiguousTypes #-} |
| 2 | +{-# LANGUAGE LambdaCase #-} |
| 3 | +{-# LANGUAGE MultiWayIf #-} |
| 4 | +{-# LANGUAGE OverloadedStrings #-} |
| 5 | +{-# LANGUAGE RankNTypes #-} |
| 6 | +{-# LANGUAGE TupleSections #-} |
7 | 7 | module Mafoc.Core
|
8 | 8 | ( module Mafoc.Core
|
9 | 9 | , module Mafoc.Upstream
|
10 | 10 | ) where
|
11 | 11 |
|
| 12 | +import Control.Concurrent qualified as IO |
| 13 | +import Control.Concurrent.STM qualified as STM |
| 14 | +import Control.Concurrent.STM.TChan qualified as TChan |
| 15 | +import Control.Monad.Except (runExceptT) |
12 | 16 | import Control.Monad.Trans.Class (lift)
|
13 |
| -import Data.Function ((&)) |
14 |
| -import Data.Maybe (fromMaybe) |
| 17 | +import Data.ByteString.Char8 qualified as C8 |
| 18 | +import Data.Coerce (coerce) |
| 19 | +import Data.Function (on, (&)) |
| 20 | +import Data.List qualified as L |
| 21 | +import Data.Maybe (fromMaybe, mapMaybe) |
| 22 | +import Data.Proxy (Proxy (Proxy)) |
15 | 23 | import Data.Text qualified as TS
|
16 | 24 | import Data.Time (UTCTime, diffUTCTime, getCurrentTime)
|
17 |
| -import Data.Word (Word32) |
| 25 | +import Data.Word (Word32, Word64) |
18 | 26 | import Database.SQLite.Simple qualified as SQL
|
19 | 27 | import Numeric.Natural (Natural)
|
| 28 | +import Prettyprinter (Pretty (pretty), defaultLayoutOptions, layoutPretty) |
| 29 | +import Prettyprinter.Render.Text (renderStrict) |
20 | 30 | import Streaming qualified as S
|
21 | 31 | import Streaming.Prelude qualified as S
|
| 32 | +import System.Directory (listDirectory, removeFile) |
22 | 33 | import System.FilePath ((</>))
|
23 |
| - |
24 |
| -import Control.Concurrent qualified as IO |
25 |
| -import Control.Concurrent.STM qualified as STM |
26 |
| -import Control.Concurrent.STM.TChan qualified as TChan |
| 34 | +import Text.Read qualified as Read |
27 | 35 |
|
28 | 36 | import Cardano.Api qualified as C
|
29 |
| --- import Cardano.BM.Data.Trace |
30 | 37 | import Cardano.BM.Setup (withTrace)
|
31 | 38 | import Cardano.BM.Trace qualified as Trace
|
32 | 39 | import Cardano.BM.Tracing (defaultConfigStdout)
|
33 | 40 | import Cardano.Streaming qualified as CS
|
| 41 | +import Marconi.ChainIndex.Indexers.EpochState qualified as Marconi |
34 | 42 | import Marconi.ChainIndex.Indexers.MintBurn ()
|
35 | 43 | import Marconi.ChainIndex.Types qualified as Marconi
|
36 |
| -import Prettyprinter (Pretty (pretty), defaultLayoutOptions, layoutPretty) |
37 |
| -import Prettyprinter.Render.Text (renderStrict) |
| 44 | +import Ouroboros.Consensus.Config qualified as O |
| 45 | +import Ouroboros.Consensus.Ledger.Extended qualified as O |
38 | 46 |
|
39 | 47 | import Mafoc.RollbackRingBuffer qualified as RB
|
40 | 48 | import Mafoc.Logging qualified as Logging
|
@@ -178,6 +186,16 @@ nodeFolderToSocketPath nodeFolder = nodeFolder </> "socket" </> "node.socket"
|
178 | 186 | nodeInfoSocketPath :: Either NodeFolder (SocketPath, a) -> SocketPath
|
179 | 187 | nodeInfoSocketPath nodeInfo_ = either nodeFolderToSocketPath fst nodeInfo_
|
180 | 188 |
|
| 189 | +getNodeConfigSocketPath :: LocalChainsyncConfig NodeConfig -> (NodeConfig, SocketPath) |
| 190 | +getNodeConfigSocketPath chainsyncConfigWithNodeConfig = (nodeConfig, socketPath) |
| 191 | + where |
| 192 | + nodeInfo' = nodeInfo chainsyncConfigWithNodeConfig |
| 193 | + nodeConfig = case nodeInfo' of |
| 194 | + Left nodeFolder -> nodeFolderToConfigPath nodeFolder |
| 195 | + Right (_socketPath, nodeConfig') -> nodeConfig' |
| 196 | + socketPath = nodeInfoSocketPath nodeInfo' |
| 197 | + |
| 198 | + |
181 | 199 | -- | Resolve @LocalChainsyncConfig@ that came from e.g command line
|
182 | 200 | -- arguments into an "actionable" @LocalChainsyncRuntime@ runtime
|
183 | 201 | -- config which can be used to generate a stream of blocks.
|
@@ -306,6 +324,114 @@ traceInfo trace msg = Trace.logInfo trace $ renderStrict $ layoutPretty defaultL
|
306 | 324 | traceDebug :: Trace.Trace IO TS.Text -> String -> IO ()
|
307 | 325 | traceDebug trace msg = Trace.logDebug trace $ renderStrict $ layoutPretty defaultLayoutOptions $ pretty msg
|
308 | 326 |
|
| 327 | +-- * Ledger state checkpoint |
| 328 | + |
| 329 | +listExtLedgerStates :: FilePath -> IO [(FilePath, SlotNoBhh)] |
| 330 | +listExtLedgerStates dirPath = L.sortBy (flip compare `on` snd) . mapMaybe parse <$> listDirectory dirPath |
| 331 | + where |
| 332 | + parse :: FilePath -> Maybe (FilePath, SlotNoBhh) |
| 333 | + parse fn = either (const Nothing) Just . fmap (fn,) $ bhhFromFileName fn |
| 334 | + |
| 335 | +loadLedgerState :: FilePath -> Trace.Trace IO TS.Text -> IO (Marconi.ExtLedgerCfg_, Marconi.ExtLedgerState_, C.ChainPoint) |
| 336 | +loadLedgerState nodeConfig trace = do |
| 337 | + paths <- listExtLedgerStates "." |
| 338 | + case paths of |
| 339 | + -- A ledger state exists on disk, resume from there |
| 340 | + (fn, (slotNo, bhh)) : _ -> do |
| 341 | + cfg <- Marconi.getLedgerConfig nodeConfig |
| 342 | + let O.ExtLedgerCfg topLevelConfig = cfg |
| 343 | + extLedgerState <- Marconi.loadExtLedgerState (O.configCodec topLevelConfig) fn >>= \case |
| 344 | + Right (_, extLedgerState) -> return extLedgerState |
| 345 | + Left msg -> error $ "Error while deserialising file " <> fn <> ", error: " <> show msg |
| 346 | + let cp = C.ChainPoint slotNo bhh |
| 347 | + traceInfo trace $ "Found on-disk ledger state, resuming from: " <> show cp |
| 348 | + return (cfg, extLedgerState, cp) |
| 349 | + -- No existing ledger states, start from the beginning |
| 350 | + [] -> do |
| 351 | + (cfg, st) <- Marconi.getInitialExtLedgerState nodeConfig |
| 352 | + traceInfo trace "No on-disk ledger state found, resuming from genesis" |
| 353 | + return (cfg, st, C.ChainPointAtGenesis) |
| 354 | + |
| 355 | +storeLedgerState :: Marconi.ExtLedgerCfg_ -> SlotNoBhh -> Marconi.ExtLedgerState_ -> IO () |
| 356 | +storeLedgerState ledgerCfg slotNoBhh extLedgerState = do |
| 357 | + let O.ExtLedgerCfg topLevelConfig = ledgerCfg |
| 358 | + putStrLn $ "Write ledger state" |
| 359 | + Marconi.writeExtLedgerState (bhhToFileName slotNoBhh) (O.configCodec topLevelConfig) extLedgerState |
| 360 | + putStrLn $ "Wrote ledger state" |
| 361 | + mapM_ (removeFile . bhhToFileName) . drop 2 . map snd =<< listExtLedgerStates "." |
| 362 | + putStrLn $ "Removed other files" |
| 363 | + |
| 364 | +-- | Initialization for ledger state indexers |
| 365 | +initializeLedgerState |
| 366 | + :: LocalChainsyncConfig NodeConfig |
| 367 | + -> Trace.Trace IO TS.Text |
| 368 | + -> DbPathAndTableName -- ^ Path to sqlite db and table name from cli |
| 369 | + -> (SQL.Connection -> String -> IO ()) -- ^ Function which takes a connection and a table name and creates the table. |
| 370 | + -> String |
| 371 | + -> IO ( Marconi.ExtLedgerState_, Maybe C.EpochNo |
| 372 | + , LocalChainsyncRuntime |
| 373 | + , SQL.Connection, String, Marconi.ExtLedgerCfg_) |
| 374 | +initializeLedgerState chainsyncConfig trace dbPathAndTableName sqliteInit defaultTableName' = do |
| 375 | + let (nodeConfig, socketPath) = getNodeConfigSocketPath chainsyncConfig |
| 376 | + |
| 377 | + networkId <- #getNetworkId nodeConfig |
| 378 | + let localNodeConnectInfo = CS.mkLocalNodeConnectInfo networkId socketPath |
| 379 | + securityParam' <- querySecurityParam localNodeConnectInfo |
| 380 | + let (dbPath, tableName) = defaultTableName defaultTableName' dbPathAndTableName |
| 381 | + sqlCon <- sqliteOpen dbPath |
| 382 | + sqliteInit sqlCon tableName |
| 383 | + |
| 384 | + (ledgerConfig, extLedgerState, startFrom) <- loadLedgerState nodeConfig trace |
| 385 | + |
| 386 | + let chainsyncRuntime' = LocalChainsyncRuntime |
| 387 | + localNodeConnectInfo |
| 388 | + ((interval_ chainsyncConfig) {from = startFrom}) |
| 389 | + securityParam' |
| 390 | + (logging_ chainsyncConfig) |
| 391 | + (pipelineSize_ chainsyncConfig) |
| 392 | + (batchSize_ chainsyncConfig) |
| 393 | + (concurrencyPrimitive_ chainsyncConfig) |
| 394 | + |
| 395 | + return ( extLedgerState, Marconi.getEpochNo extLedgerState |
| 396 | + , chainsyncRuntime' |
| 397 | + , sqlCon, tableName, ledgerConfig) |
| 398 | + |
| 399 | + |
| 400 | +bhhToFileName :: SlotNoBhh -> FilePath |
| 401 | +bhhToFileName (slotNo, blockHeaderHash) = L.intercalate "_" |
| 402 | + [ "ledgerState" |
| 403 | + , show slotNo' |
| 404 | + , TS.unpack (C.serialiseToRawBytesHexText blockHeaderHash) |
| 405 | + ] |
| 406 | + where |
| 407 | + slotNo' = coerce slotNo :: Word64 |
| 408 | + |
| 409 | +bhhFromFileName :: String -> Either String SlotNoBhh |
| 410 | +bhhFromFileName str = case splitOn '_' str of |
| 411 | + _ : slotNoStr : blockHeaderHashHex : _ -> (,) |
| 412 | + <$> parseSlotNo_ slotNoStr |
| 413 | + <*> eitherParseHashBlockHeader_ blockHeaderHashHex |
| 414 | + _ -> Left "Can't parse ledger state file name, must be <slot no> _ <block header hash>" |
| 415 | + where |
| 416 | + |
| 417 | +splitOn :: Eq a => a -> [a] -> [[a]] |
| 418 | +splitOn x xs = case span (/= x) xs of |
| 419 | + (prefix, _x : rest) -> prefix : recurse rest |
| 420 | + (lastChunk, []) -> [lastChunk] |
| 421 | + where |
| 422 | + recurse = splitOn x |
| 423 | + |
| 424 | +parseSlotNo_ :: String -> Either String C.SlotNo |
| 425 | +parseSlotNo_ str = maybe (leftError "Can't read SlotNo" str) (Right . C.SlotNo) $ Read.readMaybe str |
| 426 | + |
| 427 | +-- eitherParseHashBlockHeader :: String -> Either RawBytesHexError (C.Hash C.BlockHeader) -- cardano-api-1.35.4:Cardano.Api.SerialiseRaw. |
| 428 | +eitherParseHashBlockHeader = C.deserialiseFromRawBytesHex (C.proxyToAsType Proxy) . C8.pack |
| 429 | + |
| 430 | +eitherParseHashBlockHeader_ :: String -> Either String (C.Hash C.BlockHeader) |
| 431 | +eitherParseHashBlockHeader_ = either (Left . show) Right . eitherParseHashBlockHeader |
| 432 | + |
| 433 | +leftError :: String -> String -> Either String a |
| 434 | +leftError label str = Left $ label <> ": '" <> str <> "'" |
309 | 435 |
|
310 | 436 | -- * Sqlite
|
311 | 437 |
|
|
0 commit comments