Skip to content

Commit

Permalink
fix dotnet distributed tracing (autogenhub#4)
Browse files Browse the repository at this point in the history
* fix dotnet distributed tracing

* regenerate protos for python
  • Loading branch information
kostapetan authored Oct 1, 2024
1 parent 7fade2d commit 54c6382
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 17 deletions.
7 changes: 3 additions & 4 deletions dotnet/src/Microsoft.AutoGen.Agents.Client/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ private async Task HandleRpcMessage(Message msg)
{
case Message.MessageOneofCase.CloudEvent:
{
// TODO: fix activity extraction
var activity = default(Activity); // ExtractActivity(msg.Event.Type, msg.Event.Attributes);
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata);
await this.InvokeWithActivityAsync(
static ((AgentBase Agent, CloudEvent Item) state) => state.Agent.CallHandler(state.Item),
(this, msg.CloudEvent),
Expand Down Expand Up @@ -190,11 +189,11 @@ protected async ValueTask PublishEvent(CloudEvent item)
{
//TODO: Reimplement
var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
//activity?.SetTag("peer.service", $"{item.DataType}/{item.Namespace}");
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");

var completion = new TaskCompletionSource<CloudEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
// TODO: fix activity
//Context.DistributedContextPropagator.Inject(activity, item., static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
Context.DistributedContextPropagator.Inject(activity, item.Metadata, static (carrier, key, value) => ((IDictionary<string, string>)carrier!)[key] = value);
await this.InvokeWithActivityAsync(
static async ((AgentBase Agent, CloudEvent Event, TaskCompletionSource<CloudEvent>) state) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private async Task RunReadPump()
message.Response.RequestId = request.OriginalRequestId;
request.Agent.ReceiveMessage(message);
break;
case Message.MessageOneofCase.Event:
case Message.MessageOneofCase.CloudEvent:
// TODO: Reimplement

// HACK: Send the message to an instance of each agent type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public async ValueTask BroadcastEvent(CloudEvent evt)

public async ValueTask<RpcResponse> InvokeRequest(RpcRequest request)
{
//TODO: Reimplement
(string Type, string Key) agentId = (request.Target.Type, request.Target.Key);
if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted)
{
Expand Down
9 changes: 5 additions & 4 deletions protos/cloudevent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ message CloudEvent {

// Optional & Extension Attributes
map<string, CloudEventAttributeValue> attributes = 5;
map<string, string> metadata = 6;

// -- CloudEvent Data (Bytes, Text, or Proto)
oneof data {
bytes binary_data = 6;
string text_data = 7;
google.protobuf.Any proto_data = 8;
bytes binary_data = 7;
string text_data = 8;
google.protobuf.Any proto_data = 9;
}

/**
Expand All @@ -45,4 +46,4 @@ message CloudEvent {
google.protobuf.Timestamp ce_timestamp = 7;
}
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ class CloudEvent(google.protobuf.message.Message):
def HasField(self, field_name: typing.Literal["value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ...

@typing.final
class MetadataEntry(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

KEY_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
key: builtins.str
value: builtins.str
def __init__(
self,
*,
key: builtins.str = ...,
value: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ...

@typing.final
class CloudEventAttributeValue(google.protobuf.message.Message):
"""*
Expand Down Expand Up @@ -80,6 +96,7 @@ class CloudEvent(google.protobuf.message.Message):
SPEC_VERSION_FIELD_NUMBER: builtins.int
TYPE_FIELD_NUMBER: builtins.int
ATTRIBUTES_FIELD_NUMBER: builtins.int
METADATA_FIELD_NUMBER: builtins.int
BINARY_DATA_FIELD_NUMBER: builtins.int
TEXT_DATA_FIELD_NUMBER: builtins.int
PROTO_DATA_FIELD_NUMBER: builtins.int
Expand All @@ -98,6 +115,8 @@ class CloudEvent(google.protobuf.message.Message):
def attributes(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, global___CloudEvent.CloudEventAttributeValue]:
"""Optional & Extension Attributes"""

@property
def metadata(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: ...
@property
def proto_data(self) -> google.protobuf.any_pb2.Any: ...
def __init__(
Expand All @@ -108,12 +127,13 @@ class CloudEvent(google.protobuf.message.Message):
spec_version: builtins.str = ...,
type: builtins.str = ...,
attributes: collections.abc.Mapping[builtins.str, global___CloudEvent.CloudEventAttributeValue] | None = ...,
metadata: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
binary_data: builtins.bytes = ...,
text_data: builtins.str = ...,
proto_data: google.protobuf.any_pb2.Any | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["binary_data", b"binary_data", "data", b"data", "proto_data", b"proto_data", "text_data", b"text_data"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["attributes", b"attributes", "binary_data", b"binary_data", "data", b"data", "id", b"id", "proto_data", b"proto_data", "source", b"source", "spec_version", b"spec_version", "text_data", b"text_data", "type", b"type"]) -> None: ...
def ClearField(self, field_name: typing.Literal["attributes", b"attributes", "binary_data", b"binary_data", "data", b"data", "id", b"id", "metadata", b"metadata", "proto_data", b"proto_data", "source", b"source", "spec_version", b"spec_version", "text_data", b"text_data", "type", b"type"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["data", b"data"]) -> typing.Literal["binary_data", "text_data", "proto_data"] | None: ...

global___CloudEvent = CloudEvent

0 comments on commit 54c6382

Please sign in to comment.