diff --git a/internal/io/mqtt/sink.go b/internal/io/mqtt/sink.go index 7bf7c32ec1..5906915fb3 100644 --- a/internal/io/mqtt/sink.go +++ b/internal/io/mqtt/sink.go @@ -22,6 +22,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/pkg/util" + "github.com/lf-edge/ekuiper/v2/internal/topo/node/tracenode" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/connection" ) @@ -115,7 +116,16 @@ func (ms *Sink) Collect(ctx api.StreamContext, item api.RawTuple) error { } } } - + traced, _, span := tracenode.TraceInput(ctx, item, fmt.Sprintf("%s_emit", ctx.GetOpId())) + if traced { + defer span.End() + traceID := span.SpanContext().TraceID() + spanID := span.SpanContext().SpanID() + if props == nil { + props = make(map[string]string) + } + props["traceparent"] = tracenode.BuildTraceParentId(traceID, spanID) + } ctx.GetLogger().Debugf("publishing to topic %s", tpc) return ms.cli.Publish(ctx, tpc, ms.adconf.Qos, ms.adconf.Retained, item.Raw(), props) } diff --git a/internal/io/mqtt/source.go b/internal/io/mqtt/source.go index 7646a8279a..3c6687dcfc 100644 --- a/internal/io/mqtt/source.go +++ b/internal/io/mqtt/source.go @@ -113,12 +113,17 @@ func (ms *SourceConnector) Subscribe(ctx api.StreamContext, ingest api.BytesInge func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg any, ingest api.BytesIngest) { rcvTime := timex.GetNow() - payload, meta, _ := ms.cli.ParseMsg(ctx, msg) + payload, meta, props := ms.cli.ParseMsg(ctx, msg) if ms.eof != nil && ms.eofPayload != nil && bytes.Equal(ms.eofPayload, payload) { ms.eof(ctx) return } - // TODO property trace + // extract trace id + if props != nil { + if tid, ok := props["traceparent"]; ok { + meta["traceId"] = tid + } + } ingest(ctx, payload, meta, rcvTime) }