diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index 0dba8a844b8..28d3375c542 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -5503,10 +5503,6 @@ "steps": [ { "color": "green" - }, - { - "color": "red", - "value": 80 } ] }, @@ -5539,12 +5535,12 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "expr": "metrics_FetchChunkTime_Mean{instance=~\"${instance}\"}", + "expr": "metrics_FetchMemoryChunkTime_Mean{instance=~\"${instance}\"}", "legendFormat": "${baseLegend}", "refId": "A" } ], - "title": "metrics_FetchChunkTime_Mean", + "title": "metrics_FetchMemoryChunkTime_Mean", "type": "timeseries" }, { @@ -5595,10 +5591,6 @@ "steps": [ { "color": "green" - }, - { - "color": "red", - "value": 80 } ] }, @@ -5631,12 +5623,188 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "expr": "metrics_FetchChunkTime_Max{instance=~\"${instance}\"}", + "expr": "metrics_FetchMemoryChunkTime_Max{instance=~\"${instance}\"}", + "legendFormat": "${baseLegend}", + "refId": "A" + } + ], + "title": "metrics_FetchMemoryChunkTime_Max", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 103 + }, + "id": 21, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_FetchLocalChunkTime_Mean{instance=~\"${instance}\"}", + "legendFormat": "${baseLegend}", + "refId": "A" + } + ], + "title": "metrics_FetchLocalChunkTime_Mean", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 103 + }, + "id": 22, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_LocalFetchChunkTime_Max{instance=~\"${instance}\"}", "legendFormat": "${baseLegend}", "refId": "A" } ], - "title": "metrics_FetchChunkTime_Max", + "title": "metrics_LocalFetchChunkTime_Max", "type": "timeseries" }, { @@ -5909,13 +6077,13 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "metrics_FetchChunkSuccessCount_Count{instance=~\"${instance}\"}", + "expr": "metrics_FetchMemoryChunkSuccessCount_Count{instance=~\"${instance}\"}", "legendFormat": "${baseLegend}", "range": true, "refId": "A" } ], - "title": "metrics_FetchChunkSuccessCount_Count", + "title": "metrics_FetchMemoryChunkSuccessCount_Count", "type": "timeseries" }, { @@ -5966,10 +6134,95 @@ "steps": [ { "color": "green" - }, + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 119 + }, + "id": 86, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "metrics_FetchLocalChunkSuccessCount_Count{instance=~\"${instance}\"}", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_FetchLocalChunkSuccessCount_Count", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ { - "color": "red", - "value": 80 + "color": "green" } ] } @@ -6002,13 +6255,102 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "metrics_FetchChunkFailCount_Count{instance=~\"${instance}\"}", + "expr": "metrics_FetchMemoryChunkFailCount_Count{instance=~\"${instance}\"}", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_FetchMemoryChunkFailCount_Count", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 119 + }, + "id": 85, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "metrics_FetchLocalChunkFailCount_Count{instance=~\"${instance}\"}", "legendFormat": "${baseLegend}", "range": true, "refId": "A" } ], - "title": "metrics_FetchChunkFailCount_Count", + "title": "metrics_FetchLocalChunkFailCount_Count", "type": "timeseries" }, { diff --git a/docs/monitoring.md b/docs/monitoring.md index 18bb29c8f35..3469ef08027 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -184,13 +184,16 @@ These metrics are exposed by Celeborn worker. | ActiveShuffleSize | The active shuffle size of a worker including master replica and slave replica. | | ActiveShuffleFileCount | The active shuffle file count of a worker including master replica and slave replica. | | OpenStreamTime | The time for a worker to process openStream RPC and return StreamHandle. | - | FetchChunkTime | The time for a worker to fetch a chunk which is 8MB by default from a reduced partition. | + | FetchMemoryChunkTime | The time for a worker to fetch a memory chunk which is 8MB by default from a reduced partition. | + | FetchLocalChunkTime | The time for a worker to fetch a local disk chunk which is 8MB by default from a reduced partition. | | FetchChunkTransferTime | The time for a worker to transfer for fetching a chunk from a reduced partition. | | ActiveChunkStreamCount | Active stream count for reduce partition reading streams. | | OpenStreamSuccessCount | The count of opening stream succeed in current worker. | | OpenStreamFailCount | The count of opening stream failed in current worker. | - | FetchChunkSuccessCount | The count of fetching chunk succeed in current worker. | - | FetchChunkFailCount | The count of fetching chunk failed in current worker. | + | FetchMemoryChunkSuccessCount | The count of fetching memory chunk succeed in current worker. | + | FetchLocalChunkSuccessCount | The count of fetching local disk chunk succeed in current worker. | + | FetchMemoryChunkFailCount | The count of fetching memory chunk failed in current worker. | + | FetchLocalChunkFailCount | The count of fetching local disk chunk failed in current worker. | | FetchChunkTransferSize | The size of transfer for fetching chunk in current worker. | | PrimaryPushDataTime | The time for a worker to handle a pushData RPC sent from a celeborn client. | | ReplicaPushDataTime | The time for a worker to handle a pushData RPC sent from a celeborn worker by replicating. | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index edf71039f58..ca9138e8fbc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -553,11 +553,21 @@ class FetchHandler( s" to fetch block $streamChunkSlice") val streamState = chunkStreamManager.getStreamState(streamChunkSlice.streamId) + val storageMetrics = streamState.buffers match { + case _: FileChunkBuffers => ( + WorkerSource.FETCH_LOCAL_CHUNK_TIME, + WorkerSource.FETCH_LOCAL_CHUNK_SUCCESS_COUNT, + WorkerSource.FETCH_LOCAL_CHUNK_FAIL_COUNT) + case _: MemoryChunkBuffers => ( + WorkerSource.FETCH_MEMORY_CHUNK_TIME, + WorkerSource.FETCH_MEMORY_CHUNK_SUCCESS_COUNT, + WorkerSource.FETCH_MEMORY_CHUNK_FAIL_COUNT) + } if (streamState == null) { val message = s"Stream ${streamChunkSlice.streamId} is not registered with worker. " + "This can happen if the worker was restart recently." logError(message) - workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT) + workerSource.incCounter(storageMetrics._3) client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice, message)) return } @@ -569,7 +579,7 @@ class FetchHandler( s"$chunksBeingTransferred exceeds ${MAX_CHUNKS_BEING_TRANSFERRED.key} " + s"${Utils.bytesToString(threshold)}." logError(message) - workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT) + workerSource.incCounter(storageMetrics._3) client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice, message)) return } @@ -578,7 +588,7 @@ class FetchHandler( workerSource.recordAppActiveConnection(client, streamState.shuffleKey) val reqStr = req.toString - workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr) + workerSource.startTimer(storageMetrics._1, reqStr) val fetchTimeMetric = chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId) val fetchBeginTime = System.nanoTime() try { @@ -596,18 +606,18 @@ class FetchHandler( logDebug( s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk $streamChunkSlice") } - workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT) + workerSource.incCounter(storageMetrics._2) } else { logWarning( s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk $streamChunkSlice", future.cause()) - workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT) + workerSource.incCounter(storageMetrics._3) } chunkStreamManager.chunkSent(streamChunkSlice.streamId) if (fetchTimeMetric != null) { fetchTimeMetric.update(System.nanoTime() - fetchBeginTime) } - workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr) + workerSource.stopTimer(storageMetrics._1, reqStr) } }) } catch { @@ -616,11 +626,11 @@ class FetchHandler( s"Error opening block $streamChunkSlice for request from " + NettyUtils.getRemoteAddress(client.getChannel), e) - workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT) + workerSource.incCounter(storageMetrics._3) client.getChannel.writeAndFlush(new ChunkFetchFailure( streamChunkSlice, Throwables.getStackTraceAsString(e))) - workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr) + workerSource.stopTimer(storageMetrics._1, reqStr) } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index f3f5446cc8f..e4ca4b14a17 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -40,8 +40,10 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) // add counters addCounter(OPEN_STREAM_SUCCESS_COUNT) addCounter(OPEN_STREAM_FAIL_COUNT) - addCounter(FETCH_CHUNK_SUCCESS_COUNT) - addCounter(FETCH_CHUNK_FAIL_COUNT) + addCounter(FETCH_MEMORY_CHUNK_SUCCESS_COUNT) + addCounter(FETCH_LOCAL_CHUNK_SUCCESS_COUNT) + addCounter(FETCH_MEMORY_CHUNK_FAIL_COUNT) + addCounter(FETCH_LOCAL_CHUNK_FAIL_COUNT) addCounter(WRITE_DATA_HARD_SPLIT_COUNT) addCounter(WRITE_DATA_SUCCESS_COUNT) addCounter(WRITE_DATA_FAIL_COUNT) @@ -91,7 +93,8 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) addTimer(PRIMARY_SEGMENT_START_TIME) addTimer(REPLICA_SEGMENT_START_TIME) - addTimer(FETCH_CHUNK_TIME) + addTimer(FETCH_MEMORY_CHUNK_TIME) + addTimer(FETCH_LOCAL_CHUNK_TIME) addTimer(OPEN_STREAM_TIME) addTimer(TAKE_BUFFER_TIME) addTimer(SORT_TIME) @@ -161,12 +164,15 @@ object WorkerSource { // fetch data val OPEN_STREAM_TIME = "OpenStreamTime" - val FETCH_CHUNK_TIME = "FetchChunkTime" + val FETCH_MEMORY_CHUNK_TIME = "FetchMemoryChunkTime" + val FETCH_LOCAL_CHUNK_TIME = "FetchLocalChunkTime" val ACTIVE_CHUNK_STREAM_COUNT = "ActiveChunkStreamCount" val OPEN_STREAM_SUCCESS_COUNT = "OpenStreamSuccessCount" val OPEN_STREAM_FAIL_COUNT = "OpenStreamFailCount" - val FETCH_CHUNK_SUCCESS_COUNT = "FetchChunkSuccessCount" - val FETCH_CHUNK_FAIL_COUNT = "FetchChunkFailCount" + val FETCH_MEMORY_CHUNK_SUCCESS_COUNT = "FetchMemoryChunkSuccessCount" + val FETCH_LOCAL_CHUNK_SUCCESS_COUNT = "FetchLocalChunkSuccessCount" + val FETCH_MEMORY_CHUNK_FAIL_COUNT = "FetchMemoryChunkFailCount" + val FETCH_LOCAL_CHUNK_FAIL_COUNT = "FetchLocalChunkFailCount" val FETCH_CHUNK_TRANSFER_SIZE = "FetchChunkTransferSize" val FETCH_CHUNK_TRANSFER_TIME = "FetchChunkTransferTime"