Skip to content

Commit

Permalink
Add better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
disa6302 committed Mar 24, 2023
1 parent a1d7887 commit 9b7df28
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 31 deletions.
4 changes: 2 additions & 2 deletions CMake/Dependencies/libkvscproducer-CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ include(ExternalProject)
# clone repo only
ExternalProject_Add(libkvscproducer-download
GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.git
GIT_TAG 1497d3c6c8299548f53eb077533fd3f87bf3eebf
GIT_TAG cf21f56d5bc47d9a64d983438fcc39fbefa352b7
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/kvscproducer-src"
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/kvscproducer-build"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
)
28 changes: 15 additions & 13 deletions src/KinesisVideoStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ KinesisVideoStream::KinesisVideoStream(const KinesisVideoProducer& kinesis_video

bool KinesisVideoStream::putFrame(KinesisVideoFrame& frame) const {
if (debug_dump_frame_info_) {
LOG_DEBUG("pts: " << frame.presentationTs << ", dts: " << frame.decodingTs << ", duration: " << frame.duration << ", size: " << frame.size << ", trackId: " << frame.trackId
LOG_DEBUG("[" << this->stream_name_ << "] pts: " << frame.presentationTs << ", dts: " << frame.decodingTs << ", duration: " << frame.duration << ", size: " << frame.size << ", trackId: " << frame.trackId
<< ", isKey: " << CHECK_FRAME_FLAG_KEY_FRAME(frame.flags));
}

assert(0 != stream_handle_);
STATUS status = putKinesisVideoFrame(stream_handle_, &frame);
if (STATUS_FAILED(status)) {
LOG_ERROR("Put frame for " << this->stream_name_ << "failed with 0x" << std::hex << status);
return false;
}

Expand All @@ -40,7 +41,8 @@ bool KinesisVideoStream::putFrame(KinesisVideoFrame& frame) const {
auto total_transfer_tate = 8 * client_metrics.getTotalTransferRate();
auto transfer_rate = 8 * stream_metrics.getCurrentTransferRate();

LOG_DEBUG("Kinesis Video client and stream metrics"
LOG_DEBUG("Kinesis Video client and stream metrics for "
<< this->stream_name_
<< "\n\t>> Overall storage byte size: " << client_metrics.getContentStoreSizeSize()
<< "\n\t>> Available storage byte size: " << client_metrics.getContentStoreAvailableSize()
<< "\n\t>> Allocated storage byte size: " << client_metrics.getContentStoreAllocatedSize()
Expand All @@ -66,19 +68,19 @@ bool KinesisVideoStream::start(const std::string& hexEncodedCodecPrivateData, ui
STATUS status;

if (STATUS_FAILED(status = hexDecode((PCHAR) pStrCpd, 0, NULL, &size))) {
LOG_ERROR("Failed to get the size of the buffer for hex decoding the codec private data with: " << status);
LOG_ERROR("Failed to get the size of the buffer for hex decoding the codec private data with: " << status << " for " << this->stream_name_);
return false;
}

// Allocate the buffer needed
pBuffer = reinterpret_cast<PBYTE>(malloc(size));
if (nullptr == pBuffer) {
LOG_ERROR("Failed to allocate enough buffer for hex decoding. Size: " << size);
LOG_ERROR("Failed to allocate enough buffer for hex decoding. Size: " << size << " for " << this->stream_name_);
return false;
}

if (STATUS_FAILED(status = hexDecode((PCHAR) pStrCpd, 0, pBuffer, &size))) {
LOG_ERROR("Failed to hex decode the codec private data with: " << status);
LOG_ERROR("Failed to hex decode the codec private data with: " << status << " for " << this->stream_name_);
::free(pBuffer);
return false;
}
Expand All @@ -97,7 +99,7 @@ bool KinesisVideoStream::start(const unsigned char* codecPrivateData, size_t cod

if (STATUS_FAILED(status = kinesisVideoStreamFormatChanged(stream_handle_, (UINT32) codecPrivateDataSize,
(PBYTE) codecPrivateData, (UINT64) trackId))) {
LOG_ERROR("Failed to set the codec private data with: " << status);
LOG_ERROR("Failed to set the codec private data with: " << status << " for " << this->stream_name_);
return false;
}

Expand All @@ -115,7 +117,7 @@ bool KinesisVideoStream::resetConnection() {
STATUS status = STATUS_SUCCESS;

if (STATUS_FAILED(status = kinesisVideoStreamResetConnection(stream_handle_))) {
LOG_ERROR("Failed to reset the connection with: " << status);
LOG_ERROR("Failed to reset the connection with: " << status << " for " << this->stream_name_);
return false;
}

Expand All @@ -126,15 +128,15 @@ bool KinesisVideoStream::resetStream() {
STATUS status = STATUS_SUCCESS;

if (STATUS_FAILED(status = kinesisVideoStreamResetStream(stream_handle_))) {
LOG_ERROR("Failed to reset the stream with: " << status);
LOG_ERROR("Failed to reset the stream with: " << status << " for " << this->stream_name_);
return false;
}

return true;
}

void KinesisVideoStream::free() {
LOG_INFO("Freeing Kinesis Video Stream " << stream_name_);
LOG_INFO("Freeing Kinesis Video Stream for " << this->stream_name_);

// Free the underlying stream
std::call_once(free_kinesis_video_stream_flag_, freeKinesisVideoStream, getStreamHandle());
Expand All @@ -144,7 +146,7 @@ bool KinesisVideoStream::stop() {
STATUS status;

if (STATUS_FAILED(status = stopKinesisVideoStream(stream_handle_))) {
LOG_ERROR("Failed to stop the stream with: " << status);
LOG_ERROR("Failed to stop the stream with: " << status << " for " << this->stream_name_);
return false;
}

Expand All @@ -155,7 +157,7 @@ bool KinesisVideoStream::stopSync() {
STATUS status;

if (STATUS_FAILED(status = stopKinesisVideoStreamSync(stream_handle_))) {
LOG_ERROR("Failed to stop the stream with: " << status);
LOG_ERROR("Failed to stop the stream with: " << status << " for " << this->stream_name_);
return false;
}

Expand All @@ -164,7 +166,7 @@ bool KinesisVideoStream::stopSync() {

KinesisVideoStreamMetrics KinesisVideoStream::getMetrics() const {
STATUS status = ::getKinesisVideoStreamMetrics(stream_handle_, (PStreamMetrics) stream_metrics_.getRawMetrics());
LOG_AND_THROW_IF(STATUS_FAILED(status), "Failed to get stream metrics with: " << status);
LOG_AND_THROW_IF(STATUS_FAILED(status), "Failed to get stream metrics with: " << status << " for " << this->stream_name_);

return stream_metrics_;
}
Expand All @@ -174,7 +176,7 @@ bool KinesisVideoStream::putFragmentMetadata(const std::string &name, const std:
const char* pMetadataValue = value.c_str();
STATUS status = ::putKinesisVideoFragmentMetadata(stream_handle_, (PCHAR) pMetadataName, (PCHAR) pMetadataValue, persistent);
if (STATUS_FAILED(status)) {
LOG_ERROR("Failed to insert fragment metadata with: " << status);
LOG_ERROR("Failed to insert fragment metadata with: " << status << " for " << this->stream_name_);
return false;
}

Expand Down
3 changes: 2 additions & 1 deletion src/gstreamer/KvsSinkStreamCallbackProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ KvsSinkStreamCallbackProvider::fragmentAckReceivedHandler(UINT64 custom_data,
auto customDataObj = reinterpret_cast<KvsSinkCustomData*>(custom_data);

if(customDataObj != NULL && customDataObj->kvsSink != NULL && pFragmentAck != NULL) {
LOG_TRACE("[" << customDataObj->kvssink->streamName << "] Fragment Ack received for " << pFragmentAck->ackType << " with timestamp " << pFragmentAck->timestamp);
LOG_TRACE("[" << customDataObj->kvsSink->stream_name << "] Ack timestamp for " << pFragmentAck->ackType << " is " << pFragmentAck->timestamp);
g_signal_emit(G_OBJECT(customDataObj->kvsSink), customDataObj->ack_signal_id, 0, pFragmentAck);
}

return STATUS_SUCCESS;
}

Expand Down
30 changes: 15 additions & 15 deletions src/gstreamer/gstkvssink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ void kinesis_video_producer_init(GstKvsSink *kvssink)
if (0 == strcmp(kvssink->session_token, DEFAULT_SESSION_TOKEN)) {
session_token_str = "";
if (nullptr != (session_token = getenv(SESSION_TOKEN_ENV_VAR))) {
LOG_INFO("Setting session token from env");
LOG_INFO("Setting session token from env for " << kvssink->stream_name);
session_token_str = string(session_token);
}
} else {
LOG_INFO("Setting session token from config");
LOG_INFO("Setting session token from config for " << kvssink->stream_name);
session_token_str = string(kvssink->session_token);
}

Expand All @@ -317,10 +317,10 @@ void kinesis_video_producer_init(GstKvsSink *kvssink)
unique_ptr<CredentialProvider> credential_provider;

if (kvssink->iot_certificate) {
LOG_INFO("Using iot credential provider within KVS sink");
LOG_INFO("Using iot credential provider within KVS sink for " << kvssink->stream_name);
std::map<std::string, std::string> iot_cert_params;
if (!kvs_sink_util::parseIotCredentialGstructure(kvssink->iot_certificate, iot_cert_params)){
LOG_AND_THROW("Failed to parse Iot credentials");
LOG_AND_THROW("Failed to parse Iot credentials for " << kvssink->stream_name);
}
std::map<std::string, std::string>::iterator it = iot_cert_params.find(IOT_THING_NAME);
if (it == iot_cert_params.end()) {
Expand All @@ -345,7 +345,7 @@ void kinesis_video_producer_init(GstKvsSink *kvssink)

// Handle env for providing CP URL
if(nullptr != (control_plane_uri = getenv(CONTROL_PLANE_URI_ENV_VAR))) {
LOG_INFO("Getting URL from env");
LOG_INFO("Getting URL from env for " << kvssink->stream_name);
control_plane_uri_str = string(control_plane_uri);
}

Expand All @@ -367,7 +367,7 @@ void create_kinesis_video_stream(GstKvsSink *kvssink) {
gboolean ret;
ret = kvs_sink_util::gstructToMap(kvssink->stream_tags, &stream_tags);
if (!ret) {
LOG_WARN("Failed to parse stream tags");
LOG_WARN("Failed to parse stream tags for " << kvssink->stream_name);
} else {
p_stream_tags = &stream_tags;
}
Expand Down Expand Up @@ -448,7 +448,7 @@ bool kinesis_video_stream_init(GstKvsSink *kvssink, string &err_msg) {
bool do_retry = true;
while(do_retry) {
try {
LOG_INFO("try creating stream");
LOG_INFO("try creating stream for " << kvssink->stream_name);
// stream is freed when createStreamSync fails
create_kinesis_video_stream(kvssink);
break;
Expand Down Expand Up @@ -1102,12 +1102,12 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads,
goto CleanUp;
}

LOG_INFO("received kvs-add-metadata event");
LOG_INFO("received kvs-add-metadata event for " << kvssink->stream_name);
if (NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_NAME) ||
NULL == gst_structure_get_string(structure, KVS_ADD_METADATA_VALUE) ||
!gst_structure_get_boolean(structure, KVS_ADD_METADATA_PERSISTENT, &persistent)) {

LOG_WARN("Event structure contains invalid field: " << std::string(gst_structure_to_string (structure)));
LOG_WARN("Event structure contains invalid field: " << std::string(gst_structure_to_string (structure)) << " for " << kvssink->stream_name);
goto CleanUp;
}

Expand All @@ -1117,7 +1117,7 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads,

bool result = data->kinesis_video_stream->putFragmentMetadata(metadata_name, metadata_value, is_persist);
if (!result) {
LOG_WARN("Failed to putFragmentMetadata. name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist);
LOG_WARN("Failed to putFragmentMetadata. name: " << metadata_name << ", value: " << metadata_value << ", persistent: " << is_persist << " for " << kvssink->stream_name);
}
gst_event_unref (event);
event = NULL;
Expand Down Expand Up @@ -1196,9 +1196,9 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,

// eos reached
if (buf == NULL && track_data == NULL) {
LOG_INFO("Received event");
LOG_INFO("Received event for " << kvssink->stream_name);
data->kinesis_video_stream->stopSync();
LOG_INFO("Sending eos");
LOG_INFO("Sending eos for " << kvssink->stream_name);

// send out eos message to gstreamer bus
message = gst_message_new_eos (GST_OBJECT_CAST (kvssink));
Expand All @@ -1214,7 +1214,7 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
if (IS_OFFLINE_STREAMING_MODE(kvssink->streaming_type) || !IS_RETRIABLE_ERROR(stream_status)) {
// fatal cases
GST_ELEMENT_ERROR (kvssink, STREAM, FAILED, (NULL),
("Stream error occurred. Status: 0x%08x", stream_status));
("[%s] Stream error occurred. Status: 0x%08x", kvssink->stream_name, stream_status));
ret = GST_FLOW_ERROR;
goto CleanUp;
} else {
Expand All @@ -1233,7 +1233,7 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
// drop if buffer contains header and has invalid timestamp
(GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_HEADER) && (!GST_BUFFER_PTS_IS_VALID(buf) || !GST_BUFFER_DTS_IS_VALID(buf)));
if (isDroppable) {
LOG_DEBUG("Dropping frame with flag: " << GST_BUFFER_FLAGS(buf));
LOG_DEBUG("Dropping frame with flag: " << GST_BUFFER_FLAGS(buf) << " for " << kvssink->stream_name);
goto CleanUp;
}

Expand Down Expand Up @@ -1288,7 +1288,7 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
data->frame_count++;
}
else {
LOG_WARN("GStreamer buffer is invalid");
LOG_WARN("GStreamer buffer is invalid for " << kvssink->stream_name);
}

CleanUp:
Expand Down

0 comments on commit 9b7df28

Please sign in to comment.