Skip to content

Commit

Permalink
Stream JSON events
Browse files Browse the repository at this point in the history
  • Loading branch information
arcz committed Nov 3, 2023
1 parent fcff1cc commit ca9ca93
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 22 deletions.
2 changes: 1 addition & 1 deletion lib/Echidna/Campaign.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ replayCorpus vm txSeqs =
-- optional dictionary to generate calls with. Return the 'Campaign' state once
-- we can't solve or shrink anything.
runWorker
:: (MonadIO m, MonadThrow m, MonadRandom m, MonadReader Env m)
:: (MonadIO m, MonadThrow m, MonadReader Env m)
=> StateT WorkerState m ()
-- ^ Callback to run after each state update (for instrumentation)
-> VM -- ^ Initial VM state
Expand Down
55 changes: 55 additions & 0 deletions lib/Echidna/SSE.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module Echidna.SSE where

import Control.Concurrent
import Control.Monad (when)
import Data.Aeson
import Data.Binary.Builder (fromLazyByteString)
import Data.IORef
import Data.Time (LocalTime)
import Network.Wai.EventSource (ServerEvent(..), eventSourceAppIO)
import Network.Wai.Handler.Warp (run)

import Echidna.Types.Campaign (CampaignEvent (..))
import Echidna.Types.Config (Env(..))

newtype SSE = SSE (Int, LocalTime, CampaignEvent)

instance ToJSON SSE where
toJSON (SSE (workerId, time, event)) =
object [ "worker" .= workerId
, "timestamp" .= time
, "data" .= event
]

runSSEServer :: Env -> Int -> IO (MVar ())
runSSEServer env nworkers = do
aliveRef <- newIORef nworkers
sseFinished <- newEmptyMVar
sseChan <- dupChan env.eventQueue

let sseListener = do
aliveNow <- readIORef aliveRef
if aliveNow == 0 then
pure CloseEvent
else do
event@(_, _, campaignEvent) <- readChan sseChan
let eventName = \case
TestFalsified _ -> "test_falsified"
TestOptimized _ -> "test_optimized"
NewCoverage {} -> "new_coverage"
TxSequenceReplayed _ _ -> "tx_sequence_replayed"
WorkerStopped _ -> "worker_stopped"
case campaignEvent of
WorkerStopped _ -> do
aliveAfter <- atomicModifyIORef' aliveRef (\n -> (n-1, n-1))
when (aliveAfter == 0) $ putMVar sseFinished ()
_ -> pure ()
pure $ ServerEvent
{ eventName = Just (eventName campaignEvent)
, eventId = Nothing
, eventData = [ fromLazyByteString $ encode (SSE event) ]
}

_serverTid <- forkIO $ do
run 3413 $ eventSourceAppIO sseListener
pure sseFinished
10 changes: 10 additions & 0 deletions lib/Echidna/Types/Campaign.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Echidna.Types.Campaign where

import Data.Aeson
import Data.Map (Map)
import Data.Text (Text)
import Data.Text qualified as T
Expand Down Expand Up @@ -51,6 +52,15 @@ data CampaignEvent
-- this one
deriving Show

instance ToJSON CampaignEvent where
toJSON = \case
TestFalsified test -> toJSON test
TestOptimized test -> toJSON test
NewCoverage coverage numContracts corpusSize ->
object [ "coverage" .= coverage, "contracts" .= numContracts, "corpus_size" .= corpusSize]
TxSequenceReplayed current total -> object [ "current" .= current, "total" .= total ]
WorkerStopped reason -> object [ "reason" .= show reason ]

data WorkerStopReason
= TestLimitReached
| TimeLimitReached
Expand Down
22 changes: 19 additions & 3 deletions lib/Echidna/Types/Test.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{-# LANGUAGE DeriveAnyClass #-}

module Echidna.Types.Test where

import Data.Aeson (ToJSON(..), object)
import Data.Aeson
import Data.DoubleWord (Int256)
import Data.Maybe (maybeToList)
import Data.Text (Text)
Expand All @@ -12,6 +14,7 @@ import Echidna.Events (Events)
import Echidna.Types (ExecException)
import Echidna.Types.Signature (SolSignature)
import Echidna.Types.Tx (Tx, TxResult)
import GHC.Generics (Generic)

-- | Test mode is parsed from a string
type TestMode = String
Expand Down Expand Up @@ -40,7 +43,7 @@ data TestValue
= BoolValue Bool
| IntValue Int256
| NoValue
deriving (Eq, Ord)
deriving (Eq, Ord, Generic, ToJSON)

instance Show TestValue where
show (BoolValue x) = show x
Expand Down Expand Up @@ -70,6 +73,19 @@ instance Show TestType where
CallTest t _ -> show t
Exploration -> "Exploration"

instance ToJSON TestType where
toJSON = \case
PropertyTest name addr ->
object [ "type" .= ("property_test" :: String), "name" .= name, "addr" .= addr ]
OptimizationTest name addr ->
object [ "type" .= ("optimization_test" :: String), "name" .= name, "addr" .= addr ]
AssertionTest _ sig addr ->
object [ "type" .= ("assertion_test" :: String), "signature" .= sig, "addr" .= addr ]
CallTest name _ ->
object [ "type" .= ("call_test" :: String), "name" .= name ]
Exploration ->
object [ "type" .= ("exploration_test" :: String) ]

instance Eq TestState where
Open == Open = True
Large i == Large j = i == j
Expand All @@ -85,7 +101,7 @@ data EchidnaTest = EchidnaTest
, reproducer :: [Tx]
, result :: TxResult
, events :: Events
} deriving (Eq, Show)
} deriving (Eq, Show, Generic, ToJSON)

isOptimizationTest :: EchidnaTest -> Bool
isOptimizationTest EchidnaTest{testType = OptimizationTest _ _} = True
Expand Down
28 changes: 10 additions & 18 deletions lib/Echidna/UI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,28 @@ import Control.Concurrent (killThread, threadDelay)
import Control.Exception (AsyncException)
import Control.Monad
import Control.Monad.Catch
import Control.Monad.Random.Strict (MonadRandom)
import Control.Monad.Reader
import Control.Monad.State.Strict hiding (state)
import Data.Binary.Builder
import Data.ByteString.Lazy qualified as BS
import Data.List.Split (chunksOf)
import Data.Map (Map)
import Data.Maybe (fromMaybe, isJust)
import Data.Maybe (fromMaybe)
import Data.Time
import UnliftIO
( MonadUnliftIO, newIORef, readIORef, atomicWriteIORef, hFlush, stdout
, writeIORef, atomicModifyIORef', timeout
)
( MonadUnliftIO, newIORef, readIORef, hFlush, stdout , writeIORef, timeout)
import UnliftIO.Concurrent hiding (killThread, threadDelay)
import Network.Wai.Handler.Warp (run)
import Network.Wai.EventSource (eventSourceAppIO, ServerEvent(..))

import EVM.Types (Addr, Contract, VM, W256)

import Echidna.ABI
import Echidna.Campaign (runWorker)
import Echidna.Output.JSON qualified
import Echidna.SSE (runSSEServer)
import Echidna.Types.Campaign
import Echidna.Types.Config
import Echidna.Types.Corpus (corpusSize)
import Echidna.Types.Coverage (scoveragePoints)
import Echidna.Types.Test (EchidnaTest(..), didFail, isOptimizationTest, TestType, TestState(..))
import Echidna.Types.Test (EchidnaTest(..), didFail, isOptimizationTest)
import Echidna.Types.Tx (Tx)
import Echidna.Types.World (World)
import Echidna.UI.Report
Expand All @@ -59,7 +54,7 @@ data UIEvent =
-- | Set up and run an Echidna 'Campaign' and display interactive UI or
-- print non-interactive output in desired format at the end
ui
:: (MonadCatch m, MonadRandom m, MonadReader Env m, MonadUnliftIO m)
:: (MonadCatch m, MonadReader Env m, MonadUnliftIO m)
=> VM -- ^ Initial VM state
-> World -- ^ Initial world state
-> GenDict
Expand Down Expand Up @@ -176,14 +171,7 @@ ui vm world dict initialCorpus = do
putStrLn $ time <> "[status] " <> line
hFlush stdout

sseChan <- dupChan env.eventQueue

let streamStatus = do
(_, _, event) <- readChan sseChan
return $ ServerEvent { eventName = Nothing, eventId = Nothing, eventData = [ putStringUtf8 $ show event ]}

server <- liftIO . forkIO $ do
run 3413 $ eventSourceAppIO streamStatus
sseFinished <- liftIO $ runSSEServer env nworkers

ticker <- liftIO . forkIO . forever $ do
threadDelay 3_000_000 -- 3 seconds
Expand All @@ -197,6 +185,10 @@ ui vm world dict initialCorpus = do
-- print final status regardless the last scheduled update
liftIO printStatus

-- wait until we send all SSE events
liftIO $ putStrLn "Waiting until all SSE are received..."
readMVar sseFinished

states <- liftIO $ workerStates workers

case outputFormat of
Expand Down

0 comments on commit ca9ca93

Please sign in to comment.