Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tippmar-nr committed Nov 11, 2024
1 parent a0919a5 commit f843d47
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho
public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget;
string queueName = serviceBusReceiver.EntityPath; // marty-test-queue
//string identifier = serviceBusReceiver.Identifier; // -9e860ed4-b16b-4d02-96e4-d8ed224ae24b
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // mt-test-servicebus.servicebus.windows.net
string queueName = serviceBusReceiver.EntityPath; // some-queue-name
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net

MessageBrokerAction action =
instrumentedMethodCall.MethodCall.Method.MethodName switch
{
"ReceiveMessagesAsync" => MessageBrokerAction.Consume,
"ReceiveDeferredMessagesAsync" => MessageBrokerAction.Consume,
"PeekMessagesInternalAsync" => MessageBrokerAction.Peek,
"AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ???,
"AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? Abandon sends the message back to the queue for re-delivery
"CompleteMessageAsync" => MessageBrokerAction.Consume,
"DeadLetterInternalAsync" => MessageBrokerAction.Purge, // TODO is this correct ???
"DeferMessageAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values???
Expand All @@ -52,40 +51,39 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho
queueName,
serverAddress: fqns );

if (instrumentedMethodCall.IsAsync)
{
return instrumentedMethodCall.IsAsync
?
// return an async delegate
return Delegates.GetAsyncDelegateFor<Task>(
Delegates.GetAsyncDelegateFor<Task>(
agent,
segment,
false,
HandleResponse,
TaskContinuationOptions.ExecuteSynchronously);
TaskContinuationOptions.ExecuteSynchronously)
: Delegates.GetDelegateFor<object>(
onFailure: transaction.NoticeError,
onComplete: segment.End,
onSuccess: ExtractDTHeadersIfAvailable);

void HandleResponse(Task responseTask)
void HandleResponse(Task responseTask)
{
try
{
try
{
if (responseTask.IsFaulted)
{
transaction.NoticeError(responseTask.Exception); // TODO ???
return;
}

var resultObj = GetTaskResultFromObject(responseTask);
ExtractDTHeadersIfAvailable(resultObj);
}
finally
if (responseTask.IsFaulted)
{
segment.End();
transaction.NoticeError(responseTask.Exception);
return;
}

var resultObj = GetTaskResultFromObject(responseTask);
ExtractDTHeadersIfAvailable(resultObj);
}
finally
{
segment.End();
}
}

return Delegates.GetDelegateFor<object>(
onFailure: transaction.NoticeError,
onComplete: () => segment.End(),
onSuccess: ExtractDTHeadersIfAvailable);


void ExtractDTHeadersIfAvailable(object resultObj)
Expand All @@ -97,7 +95,7 @@ void ExtractDTHeadersIfAvailable(object resultObj)
case "ReceiveMessagesAsync":
case "ReceiveDeferredMessagesAsync":
case "PeekMessagesInternalAsync":
// if the response contains a list of messages,
// the response contains a list of messages.
// get the first message from the response and extract DT headers
dynamic messages = resultObj;
if (messages.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho
public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget;
string queueName = serviceBusReceiver.EntityPath; // marty-test-queue
//string identifier = serviceBusReceiver.Identifier; // -9e860ed4-b16b-4d02-96e4-d8ed224ae24b
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // mt-test-servicebus.servicebus.windows.net
string queueName = serviceBusReceiver.EntityPath; // some-queue-name
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net

// determine message broker action based on method name
MessageBrokerAction action =
instrumentedMethodCall.MethodCall.Method.MethodName switch
{
"SendMessagesAsync" => MessageBrokerAction.Produce,
"ScheduleMessagesAsync" => MessageBrokerAction.Produce,
"CancelScheduledMessagesAsync" => MessageBrokerAction.Purge, // TODO is this correct ???,
"CancelScheduledMessagesAsync" => MessageBrokerAction.Purge, // TODO is this correct ???
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
};

Expand All @@ -54,14 +53,13 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho
if (message.ApplicationProperties is IDictionary<string, object> applicationProperties)
transaction.InsertDistributedTraceHeaders(applicationProperties, ProcessHeaders);
}
}

// return an async delegate
return Delegates.GetAsyncDelegateFor<Task>(agent, segment);

void ProcessHeaders(IDictionary<string, object> applicationProperties, string key, string value)
{
applicationProperties.Add(key, value);
void ProcessHeaders(IDictionary<string, object> applicationProperties, string key, string value)
{
applicationProperties.Add(key, value);
}
}

return instrumentedMethodCall.IsAsync ? Delegates.GetAsyncDelegateFor<Task>(agent, segment) : Delegates.GetDelegateFor(segment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public abstract class AzureServiceBusWrapperBase : IWrapper
protected const string BrokerVendorName = "AzureServiceBus";

public bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction

public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo);

public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent,ITransaction transaction);
Expand Down

0 comments on commit f843d47

Please sign in to comment.