Skip to content

Commit

Permalink
Merge pull request #99 from channable/dd/add-ws-queue-metrics
Browse files Browse the repository at this point in the history
Add metrics on WebSocket queue
  • Loading branch information
diegodiv authored Mar 2, 2023
2 parents 8642cfe + 026808d commit 5e7a6e9
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
26 changes: 16 additions & 10 deletions server/src/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import Control.Concurrent.MVar (MVar, newMVar, putMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue, isFullTBQueue)
import Control.Concurrent.STM.TVar (TVar, newTVarIO)
import Control.Monad (forever, unless, replicateM_, when)
import Control.Monad (forever, unless, when)
import Control.Monad.IO.Class
import Data.Aeson (Value (..))
import Data.Foldable (forM_, for_)
Expand Down Expand Up @@ -110,7 +110,9 @@ postQuit core = do
atomically $ do
writeTBQueue (coreQueue core) Stop
writeTBQueue (coreUpdates core) Nothing
replicateM_ 2 $ for_ (coreMetrics core) Metrics.incrementQueueAdded
for_ (coreMetrics core) $ \metrics -> do
Metrics.incrementQueueAdded metrics
Metrics.incrementWsQueueAdded metrics

-- | Try to enqueue a command. It succeeds if the queue is not full, otherwise,
-- nothing is changed. This should be used for non-critical commands that can
Expand Down Expand Up @@ -169,14 +171,18 @@ runCommandLoop core = go

-- | Post an update to the core's update queue (read by the websocket subscribers)
postUpdate :: Path -> Core -> IO ()
postUpdate path core = atomically $ do
value <- Persistence.getValue (coreCurrentValue core)
full <- isFullTBQueue (coreUpdates core)
-- In order not to block the reader thread, and subsequently stop processing coreQueue,
-- we don't send new updates to subscribers if coreUpdates is full.
if full then
return ()
else writeTBQueue (coreUpdates core) (Just $ Updated path value)
postUpdate path core = do
isWsQueueFull <- atomically $ do
value <- Persistence.getValue (coreCurrentValue core)
full <- isFullTBQueue (coreUpdates core)
-- In order not to block the reader thread, and subsequently stop processing coreQueue,
-- we don't send new updates to subscribers if coreUpdates is full.
unless full $ writeTBQueue (coreUpdates core) (Just $ Updated path value)
return full
for_ (coreMetrics core) $
if isWsQueueFull
then Metrics.incrementWsSkippedUpdates
else Metrics.incrementWsQueueAdded

-- | Periodically send a 'Sync' command to the 'Core' if enabled in the core
-- configuration.
Expand Down
20 changes: 19 additions & 1 deletion server/src/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ data IcepeakMetrics = IcepeakMetrics
, icepeakMetricsQueueAdded :: Counter
, icepeakMetricsQueueRemoved :: Counter
, icepeakMetricsSyncDuration :: Histogram
, icepeakMetricsWsQueueAdded :: Counter
, icepeakMetricsWsQueueRemoved :: Counter
, icepeakMetricsWsSkippedUpdates :: Counter
}

createAndRegisterIcepeakMetrics :: IO IcepeakMetrics
Expand All @@ -51,9 +54,15 @@ createAndRegisterIcepeakMetrics = IcepeakMetrics
<*> register (counter (Info "icepeak_internal_queue_items_added"
"Total number of items added to the queue."))
<*> register (counter (Info "icepeak_internal_queue_items_removed"
"Total number of items removed from the queue.."))
"Total number of items removed from the queue."))
<*> register (histogram (Info "icepeak_sync_duration" "Duration of a Sync command.")
syncBuckets)
<*> register (counter (Info "icepeak_internal_ws_queue_items_added"
"Total number of items added to the WebSocket queue."))
<*> register (counter (Info "icepeak_internal_ws_queue_items_removed"
"Total number of items removed from the WebSocket queue."))
<*> register (counter (Info "icepeak_internal_ws_skipped_updates_total"
"Total number of updates that have not been sent to subscribers."))
where
requestHistogram = histogram (Info "http_request_duration_seconds"
"Duration of HTTP requests since starting Icepeak.")
Expand Down Expand Up @@ -106,3 +115,12 @@ incrementQueueRemoved = incCounter . icepeakMetricsQueueRemoved

measureSyncDuration :: (MonadIO m, MonadMonitor m) => IcepeakMetrics -> m a -> m a
measureSyncDuration = observeDuration . icepeakMetricsSyncDuration

incrementWsQueueAdded :: MonadMonitor m => IcepeakMetrics -> m ()
incrementWsQueueAdded = incCounter . icepeakMetricsWsQueueAdded

incrementWsQueueRemoved :: MonadMonitor m => IcepeakMetrics -> m ()
incrementWsQueueRemoved = incCounter . icepeakMetricsWsQueueRemoved

incrementWsSkippedUpdates :: MonadMonitor m => IcepeakMetrics -> m ()
incrementWsSkippedUpdates = incCounter . icepeakMetricsWsSkippedUpdates
2 changes: 2 additions & 0 deletions server/src/WebsocketServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Control.Concurrent.STM.TBQueue (readTBQueue)
import Control.Exception (SomeAsyncException, SomeException, finally, fromException, catch, throwIO)
import Control.Monad (forever)
import Data.Aeson (Value)
import Data.Foldable (for_)
import Data.Text (Text)
import Data.UUID
import System.Random (randomIO)
Expand Down Expand Up @@ -135,6 +136,7 @@ processUpdates core = go
where
go = do
maybeUpdate <- atomically $ readTBQueue (coreUpdates core)
for_ (coreMetrics core) Metrics.incrementWsQueueRemoved
case maybeUpdate of
Just (Updated path value) -> do
clients <- readMVar (coreClients core)
Expand Down

0 comments on commit 5e7a6e9

Please sign in to comment.