Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show stream health info and average db query time and gpu utilization #6982

Merged
merged 15 commits into from
Jan 28, 2025
98 changes: 70 additions & 28 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

import io.antmedia.analytic.model.PublishEndedEvent;
import io.antmedia.analytic.model.PublishStartedEvent;
import io.antmedia.analytic.model.PublishStatsEvent;
import io.antmedia.analytic.model.ViewerCountEvent;
import io.antmedia.cluster.ClusterNode;
import io.antmedia.cluster.IClusterNotifier;
Expand Down Expand Up @@ -676,7 +677,10 @@ public void closeBroadcast(String streamId) {
}
else {

getDataStore().updateStatus(streamId, BROADCAST_STATUS_FINISHED);
BroadcastUpdate broadcastUpdate = new BroadcastUpdate();
broadcastUpdate.setUpdateTime(System.currentTimeMillis());
broadcastUpdate.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_FINISHED);
getDataStore().updateBroadcastFields(streamId, broadcastUpdate);
// This is resets Viewer map in HLS Viewer Stats
resetHLSStats(streamId);

Expand Down Expand Up @@ -728,6 +732,7 @@ public static Broadcast saveMainBroadcast(String streamId, String mainTrackId, D
mainBroadcast.setZombi(true);
mainBroadcast.setStatus(BROADCAST_STATUS_BROADCASTING);
mainBroadcast.getSubTrackStreamIds().add(streamId);
mainBroadcast.setVirtual(true);
// don't set setOriginAdress because it's not a real stream and it causes extra delay -> mainBroadcast.setOriginAdress(serverSettings.getHostAddress())
mainBroadcast.setStartTime(System.currentTimeMillis());

Expand Down Expand Up @@ -1244,7 +1249,7 @@ public void notifyHook(@NotNull String url, String id, String mainTrackId, Strin

@Override
public void notifyWebhookForStreamStatus(Broadcast broadcast, int width, int height, long totalByteReceived,
int inputQueueSize, double speed) {
int inputQueueSize, int encodingQueueSize, int dropFrameCountInEncoding, int dropPacketCountInIngestion, double speed) {
String listenerHookURL = getListenerHookURL(broadcast);

if (StringUtils.isNotBlank(listenerHookURL)) {
Expand All @@ -1262,6 +1267,9 @@ public void notifyWebhookForStreamStatus(Broadcast broadcast, int width, int hei
variables.put("speed", speed);
variables.put("timestamp", System.currentTimeMillis());
variables.put("streamName",broadcast.getName());
variables.put("encodingQueueSize", encodingQueueSize);
variables.put("dropFrameCountInEncoding", dropFrameCountInEncoding);
variables.put("dropPacketCountInIngestion", dropPacketCountInIngestion);

try {
sendPOST(listenerHookURL, variables, appSettings.getWebhookRetryCount(), appSettings.getWebhookContentType());
Expand Down Expand Up @@ -1449,12 +1457,15 @@ public void setStreamAcceptFilter(StreamAcceptFilter streamAcceptFilter) {
public boolean isValidStreamParameters(int width, int height, int fps, int bitrate, String streamId) {
return streamAcceptFilter.isValidStreamParameters(width, height, fps, bitrate, streamId);
}

public static final boolean isStreaming(Broadcast broadcast) {
//if updatetime is older than 2 times update period time, regard that it's not streaming
return System.currentTimeMillis() - broadcast.getUpdateTime() < STREAM_TIMEOUT_MS &&
(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(broadcast.getStatus())
|| IAntMediaStreamHandler.BROADCAST_STATUS_PREPARING.equals(broadcast.getStatus()));

/**
* Important information: Status field of Broadcast class checks the update time to report the status is broadcasting or not.
* {@link Broadcast#getStatus()}
*/
public static final boolean isStreaming(String status) {

return (IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(status)
|| IAntMediaStreamHandler.BROADCAST_STATUS_PREPARING.equals(status));
}

public Result startStreaming(Broadcast broadcast) {
Expand Down Expand Up @@ -1611,42 +1622,73 @@ public void setStreamFetcherManager(StreamFetcherManager streamFetcherManager) {
}



@Override
public void setQualityParameters(String id, String quality, double speed, int pendingPacketSize, long updateTimeMs) {

public void setQualityParameters(String streamId, PublishStatsEvent stats, long currentTimeMillis) {
vertx.runOnContext(h -> {

Broadcast broadcastLocal = getDataStore().get(id);
Broadcast broadcastLocal = getDataStore().get(streamId);
if (broadcastLocal != null)
{
//round the number to three decimal places,
double roundedSpeed = Math.round(speed * 1000.0) / 1000.0;

logger.debug("update source quality for stream: {} quality:{} speed:{}", id, quality, speed);


BroadcastUpdate broadcastUpdate = new BroadcastUpdate();
broadcastUpdate.setSpeed(roundedSpeed);
broadcastUpdate.setPendingPacketSize(pendingPacketSize);
broadcastUpdate.setUpdateTime(updateTimeMs);
broadcastUpdate.setQuality(quality);
long elapsedTime = System.currentTimeMillis() - broadcastLocal.getStartTime();
broadcastUpdate.setDuration(elapsedTime);

broadcastUpdate.setSpeed(stats.getSpeed());
broadcastUpdate.setPendingPacketSize(stats.getInputQueueSize());
broadcastUpdate.setUpdateTime(currentTimeMillis);
long elapsedTimeMs = System.currentTimeMillis() - broadcastLocal.getStartTime();
broadcastUpdate.setDuration(elapsedTimeMs);
long elapsedSeconds = elapsedTimeMs / 1000;
if (elapsedSeconds > 0 ) { //protect by zero division
long bitrate = (stats.getTotalByteReceived()/elapsedSeconds)*8;
broadcastUpdate.setBitrate(bitrate);
}

getDataStore().updateBroadcastFields(id, broadcastUpdate);


broadcastUpdate.setWidth(stats.getWidth());
broadcastUpdate.setHeight(stats.getHeight());

broadcastUpdate.setEncoderQueueSize(stats.getEncodingQueueSize());
broadcastUpdate.setDropPacketCountInIngestion(stats.getDroppedPacketCountInIngestion());
broadcastUpdate.setDropFrameCountInEncoding(stats.getDroppedFrameCountInEncoding());
broadcastUpdate.setPacketLostRatio(stats.getPacketLostRatio());
broadcastUpdate.setPacketsLost(stats.getPacketsLost());
broadcastUpdate.setJitterMs(stats.getJitterMs());
broadcastUpdate.setRttMs(stats.getRoundTripTimeMs());

broadcastUpdate.setRemoteIp(stats.getRemoteIp());
broadcastUpdate.setUserAgent(stats.getUserAgent());
broadcastUpdate.setReceivedBytes(stats.getTotalByteReceived());

getDataStore().updateBroadcastFields(streamId, broadcastUpdate);

ViewerCountEvent viewerCountEvent = new ViewerCountEvent();
viewerCountEvent.setApp(getScope().getName());
viewerCountEvent.setStreamId(id);
viewerCountEvent.setStreamId(streamId);
viewerCountEvent.setDashViewerCount(broadcastLocal.getDashViewerCount());
viewerCountEvent.setHlsViewerCount(broadcastLocal.getHlsViewerCount());
viewerCountEvent.setWebRTCViewerCount(broadcastLocal.getWebRTCViewerCount());

LoggerUtils.logAnalyticsFromServer(viewerCountEvent);

logger.debug("update source quality for stream:{} width:{} height:{} bitrate:{} input queue size:{} encoding queue size:{} packetsLost:{} packetLostRatio:{} jitter:{} rtt:{}",

streamId, stats.getWidth(), stats.getHeight(), broadcastUpdate.getBitrate(), stats.getInputQueueSize(), stats.getEncodingQueueSize(),
stats.getPacketsLost(), stats.getPacketLostRatio(), stats.getJitterMs(), stats.getRoundTripTimeMs());
}

});


}

@Override
public void setQualityParameters(String id, String quality, double speed, int pendingPacketSize, long updateTimeMs) {
PublishStatsEvent stats = new PublishStatsEvent();
stats.setSpeed(speed);
stats.setInputQueueSize(pendingPacketSize);

setQualityParameters(id, stats, updateTimeMs);
}

@Override
Expand Down Expand Up @@ -1722,7 +1764,7 @@ public void waitUntilLiveStreamsStopped() {
+ " total wait time: {}ms", getScope().getName(), i*waitPeriod);
}
if (i>10) {
logger.error("Not all live streams're stopped gracefully. It will update the streams' status to finished explicitly");
logger.error("Not all live streams're stopped gracefully. It will update the streams' status to finished_unexpectedly");
everythingHasStopped = false;
break;
}
Expand All @@ -1743,8 +1785,8 @@ public void waitUntilLiveStreamsStopped() {
//if it's not closed properly, let's set the state to failed
BroadcastUpdate broadcastUpdate = new BroadcastUpdate();

broadcastUpdate.setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_FINISHED);
broadcastUpdate.setPlayListStatus(IAntMediaStreamHandler.BROADCAST_STATUS_FINISHED);
broadcastUpdate.setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_TERMINATED_UNEXPECTEDLY);
broadcastUpdate.setPlayListStatus(IAntMediaStreamHandler.BROADCAST_STATUS_TERMINATED_UNEXPECTEDLY);
broadcastUpdate.setWebRTCViewerCount(0);
broadcastUpdate.setHlsViewerCount(0);
broadcastUpdate.setDashViewerCount(0);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/antmedia/AppSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -2434,8 +2434,8 @@ public boolean isWriteStatsToDatastore() {
* The size of encoding queue to keep the frames waiting for encoding in Stream Adaptor
* default: 150 (5 seconds frame for 30 fps stream)
*/
@Value("${encodingQueueSize:300}")
private int encodingQueueSize = 300;
@Value("${encodingQueueSize:150}")
private int encodingQueueSize = 150;

/**
* Write subscriber events to datastore. It's false by default
Expand Down
Loading
Loading