diff --git a/dd-java-agent/agent-llmobs/build.gradle b/dd-java-agent/agent-llmobs/build.gradle index 5e4d7cd6a78..3b81fa1e2e2 100644 --- a/dd-java-agent/agent-llmobs/build.gradle +++ b/dd-java-agent/agent-llmobs/build.gradle @@ -22,6 +22,7 @@ minimumInstructionCoverage = 0.0 dependencies { api libs.slf4j + implementation libs.jctools implementation project(':communication') implementation project(':components:json') diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java new file mode 100644 index 00000000000..ef75000c7bf --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/EvalProcessingWorker.java @@ -0,0 +1,205 @@ +package datadog.trace.llmobs; + +import static datadog.trace.util.AgentThreadFactory.AgentThread.LLMOBS_EVALS_PROCESSOR; +import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; +import static datadog.trace.util.AgentThreadFactory.newAgentThread; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.communication.http.HttpRetryPolicy; +import datadog.communication.http.OkHttpUtils; +import datadog.trace.api.Config; +import datadog.trace.llmobs.domain.LLMObsEval; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import okhttp3.Headers; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import org.jctools.queues.MpscBlockingConsumerArrayQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EvalProcessingWorker implements AutoCloseable { + + private static final String EVAL_METRIC_API_DOMAIN = "api"; + private static final String EVAL_METRIC_API_PATH = "api/intake/llm-obs/v1/eval-metric"; + + private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain"; + private static final String DD_API_KEY_HEADER_NAME = "DD-API-KEY"; + + private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class); + + private final MpscBlockingConsumerArrayQueue queue; + private final Thread serializerThread; + + public EvalProcessingWorker( + final int capacity, + final long flushInterval, + final TimeUnit timeUnit, + final SharedCommunicationObjects sco, + Config config) { + this.queue = new MpscBlockingConsumerArrayQueue<>(capacity); + + boolean isAgentless = config.isLlmObsAgentlessEnabled(); + if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) { + log.error("Agentless eval metric submission requires an API key"); + } + + Headers headers; + HttpUrl submissionUrl; + if (isAgentless) { + submissionUrl = + HttpUrl.get( + "https://" + + EVAL_METRIC_API_DOMAIN + + "." + + config.getSite() + + "/" + + EVAL_METRIC_API_PATH); + headers = Headers.of(DD_API_KEY_HEADER_NAME, config.getApiKey()); + } else { + submissionUrl = + HttpUrl.get( + sco.agentUrl.toString() + + DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT + + EVAL_METRIC_API_PATH); + headers = Headers.of(EVP_SUBDOMAIN_HEADER_NAME, EVAL_METRIC_API_DOMAIN); + } + + EvalSerializingHandler serializingHandler = + new EvalSerializingHandler(queue, flushInterval, timeUnit, submissionUrl, headers); + this.serializerThread = newAgentThread(LLMOBS_EVALS_PROCESSOR, serializingHandler); + } + + public void start() { + this.serializerThread.start(); + } + + public boolean addToQueue(final LLMObsEval eval) { + return queue.offer(eval); + } + + @Override + public void close() { + serializerThread.interrupt(); + try { + serializerThread.join(THREAD_JOIN_TIMOUT_MS); + } catch (InterruptedException ignored) { + } + } + + public static class EvalSerializingHandler implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class); + private static final int FLUSH_THRESHOLD = 50; + + private final MpscBlockingConsumerArrayQueue queue; + private final long ticksRequiredToFlush; + private long lastTicks; + + private final Moshi moshi; + private final JsonAdapter evalJsonAdapter; + private final OkHttpClient httpClient; + private final HttpUrl submissionUrl; + private final Headers headers; + + private final List buffer = new ArrayList<>(); + + public EvalSerializingHandler( + final MpscBlockingConsumerArrayQueue queue, + final long flushInterval, + final TimeUnit timeUnit, + final HttpUrl submissionUrl, + final Headers headers) { + this.queue = queue; + this.moshi = new Moshi.Builder().add(LLMObsEval.class, new LLMObsEval.Adapter()).build(); + + this.evalJsonAdapter = moshi.adapter(LLMObsEval.Request.class); + this.httpClient = new OkHttpClient(); + this.submissionUrl = submissionUrl; + this.headers = headers; + + this.lastTicks = System.nanoTime(); + this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval); + + log.debug("starting eval metric serializer, url={}", submissionUrl); + } + + @Override + public void run() { + try { + runDutyCycle(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + log.debug( + "eval processor worker exited. submitting evals stopped. unsubmitted evals left: " + + !queuesAreEmpty()); + } + + private void runDutyCycle() throws InterruptedException { + Thread thread = Thread.currentThread(); + while (!thread.isInterrupted()) { + consumeBatch(); + flushIfNecessary(); + } + } + + private void consumeBatch() { + queue.drain(buffer::add, queue.size()); + } + + protected void flushIfNecessary() { + if (buffer.isEmpty()) { + return; + } + if (shouldFlush()) { + LLMObsEval.Request llmobsEvalReq = new LLMObsEval.Request(this.buffer); + HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true); + + String reqBod = evalJsonAdapter.toJson(llmobsEvalReq); + + RequestBody requestBody = + RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod); + Request request = + new Request.Builder().headers(headers).url(submissionUrl).post(requestBody).build(); + + try (okhttp3.Response response = + OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) { + + if (response.isSuccessful()) { + log.debug("successfully flushed evaluation request with {} evals", this.buffer.size()); + this.buffer.clear(); + } else { + log.error( + "Could not submit eval metrics (HTTP code " + + response.code() + + ")" + + (response.body() != null ? ": " + response.body().string() : "")); + } + } catch (Exception e) { + log.error("Could not submit eval metrics", e); + } + } + } + + private boolean shouldFlush() { + long nanoTime = System.nanoTime(); + long ticks = nanoTime - lastTicks; + if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) { + lastTicks = nanoTime; + return true; + } + return false; + } + + protected boolean queuesAreEmpty() { + return queue.isEmpty(); + } + } +} diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java index fbfef5c771f..e184a645f71 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java @@ -7,8 +7,11 @@ import datadog.trace.api.llmobs.LLMObsTags; import datadog.trace.bootstrap.instrumentation.api.Tags; import datadog.trace.llmobs.domain.DDLLMObsSpan; +import datadog.trace.llmobs.domain.LLMObsEval; import datadog.trace.llmobs.domain.LLMObsInternal; import java.lang.instrument.Instrumentation; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,98 @@ public static void start(Instrumentation inst, SharedCommunicationObjects sco) { LLMObsInternal.setLLMObsSpanFactory( new LLMObsManualSpanFactory(config.getLlmObsMlApp(), config.getServiceName())); + + String mlApp = config.getLlmObsMlApp(); + LLMObsInternal.setLLMObsSpanFactory( + new LLMObsManualSpanFactory(mlApp, config.getServiceName())); + + LLMObsInternal.setLLMObsEvalProcessor(new LLMObsCustomEvalProcessor(mlApp, sco, config)); + } + + private static class LLMObsCustomEvalProcessor implements LLMObs.LLMObsEvalProcessor { + private final String defaultMLApp; + private final EvalProcessingWorker evalProcessingWorker; + + public LLMObsCustomEvalProcessor( + String defaultMLApp, SharedCommunicationObjects sco, Config config) { + + this.defaultMLApp = defaultMLApp; + this.evalProcessingWorker = + new EvalProcessingWorker(1024, 100, TimeUnit.MILLISECONDS, sco, config); + this.evalProcessingWorker.start(); + } + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, double scoreValue, Map tags) { + SubmitEvaluation(llmObsSpan, label, scoreValue, defaultMLApp, tags); + } + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + double scoreValue, + String mlApp, + Map tags) { + if (llmObsSpan == null) { + LOGGER.error("null llm obs span provided, eval not recorded"); + return; + } + + if (mlApp == null || mlApp.isEmpty()) { + mlApp = defaultMLApp; + } + String traceID = llmObsSpan.getTraceId().toHexString(); + long spanID = llmObsSpan.getSpanId(); + LLMObsEval.Score score = + new LLMObsEval.Score( + traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, scoreValue); + if (!this.evalProcessingWorker.addToQueue(score)) { + LOGGER.warn( + "queue full, failed to add score eval, ml_app={}, trace_id={}, span_id={}, label={}", + mlApp, + traceID, + spanID, + label); + } + } + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, String categoricalValue, Map tags) { + SubmitEvaluation(llmObsSpan, label, categoricalValue, defaultMLApp, tags); + } + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + String categoricalValue, + String mlApp, + Map tags) { + if (llmObsSpan == null) { + LOGGER.error("null llm obs span provided, eval not recorded"); + return; + } + + if (mlApp == null || mlApp.isEmpty()) { + mlApp = defaultMLApp; + } + String traceID = llmObsSpan.getTraceId().toHexString(); + long spanID = llmObsSpan.getSpanId(); + LLMObsEval.Categorical category = + new LLMObsEval.Categorical( + traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, categoricalValue); + if (!this.evalProcessingWorker.addToQueue(category)) { + LOGGER.warn( + "queue full, failed to add categorical eval, ml_app={}, trace_id={}, span_id={}, label={}", + mlApp, + traceID, + spanID, + label); + } + } } private static class LLMObsManualSpanFactory implements LLMObs.LLMObsSpanFactory { diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java index 734bf1df526..71d13596090 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java @@ -1,10 +1,13 @@ package datadog.trace.llmobs.domain; +import datadog.context.ContextScope; import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTraceId; import datadog.trace.api.llmobs.LLMObs; import datadog.trace.api.llmobs.LLMObsSpan; import datadog.trace.api.llmobs.LLMObsTags; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.Tags; import java.util.Collections; @@ -29,6 +32,7 @@ public class DDLLMObsSpan implements LLMObsSpan { private static final String OUTPUT = LLMOBS_TAG_PREFIX + "output"; private static final String SPAN_KIND = LLMOBS_TAG_PREFIX + Tags.SPAN_KIND; private static final String METADATA = LLMOBS_TAG_PREFIX + LLMObsTags.METADATA; + private static final String PARENT_ID_TAG_INTERNAL = "parent_id"; private static final String LLM_OBS_INSTRUMENTATION_NAME = "llmobs"; @@ -36,6 +40,7 @@ public class DDLLMObsSpan implements LLMObsSpan { private final AgentSpan span; private final String spanKind; + private final ContextScope scope; private boolean finished = false; @@ -63,6 +68,24 @@ public DDLLMObsSpan( if (sessionId != null && !sessionId.isEmpty()) { this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID, sessionId); } + + AgentSpanContext parent = LLMObsState.getLLMObsParentContext(); + String parentSpanID = LLMObsState.ROOT_SPAN_ID; + if (null != parent) { + if (parent.getTraceId() != this.span.getTraceId()) { + LOGGER.error( + "trace ID mismatch, retrieved parent from context trace_id={}, span_id={}, started span trace_id={}, span_id={}", + parent.getTraceId(), + parent.getSpanId(), + this.span.getTraceId(), + this.span.getSpanId()); + } else { + parentSpanID = String.valueOf(parent.getSpanId()); + } + } + this.span.setTag(LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, parentSpanID); + this.scope = LLMObsState.attach(); + LLMObsState.setLLMObsParentContext(this.span.context()); } @Override @@ -271,6 +294,17 @@ public void finish() { return; } this.span.finish(); + this.scope.close(); this.finished = true; } + + @Override + public DDTraceId getTraceId() { + return this.span.getTraceId(); + } + + @Override + public long getSpanId() { + return this.span.getSpanId(); + } } diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsEval.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsEval.java new file mode 100644 index 00000000000..8438e398e38 --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsEval.java @@ -0,0 +1,135 @@ +package datadog.trace.llmobs.domain; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.JsonDataException; +import com.squareup.moshi.JsonReader; +import com.squareup.moshi.JsonWriter; +import com.squareup.moshi.Moshi; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.jetbrains.annotations.Nullable; + +public abstract class LLMObsEval { + private static final String METRIC_TYPE_SCORE = "score"; + private static final String METRIC_TYPE_CATEGORICAL = "categorical"; + + public final String trace_id; + public final String span_id; + public final long timestamp_ms; + public final String ml_app; + public final String metric_type; + public final String label; + public final List tags; + + public LLMObsEval( + String traceID, + String spanID, + long timestampMs, + String mlApp, + String metricType, + String label, + Map tags) { + this.trace_id = traceID; + this.span_id = spanID; + this.timestamp_ms = timestampMs; + this.ml_app = mlApp; + this.metric_type = metricType; + this.label = label; + if (tags != null) { + List tagsList = new ArrayList<>(tags.size()); + for (Map.Entry entry : tags.entrySet()) { + tagsList.add(entry.getKey() + ":" + entry.getValue()); + } + this.tags = tagsList; + } else { + this.tags = null; + } + } + + public static final class Adapter extends JsonAdapter { + private final Moshi moshi = new Moshi.Builder().build(); + private final JsonAdapter scoreJsonAdapter = moshi.adapter(Score.class); + private final JsonAdapter categoricalJsonAdapter = + moshi.adapter(Categorical.class); + + @Nullable + @Override + public LLMObsEval fromJson(JsonReader reader) { + return null; + } + + @Override + public void toJson(JsonWriter writer, LLMObsEval value) throws IOException { + if (value == null) { + throw new JsonDataException("unexpectedly got null llm obs eval "); + } + if (value instanceof Score) { + scoreJsonAdapter.toJson(writer, (Score) value); + } else if (value instanceof Categorical) { + categoricalJsonAdapter.toJson(writer, (Categorical) value); + } else { + throw new JsonDataException("Unknown llm obs eval subclass: " + value.getClass()); + } + } + } + + public static final class Score extends LLMObsEval { + public final double score_value; + + public Score( + String traceID, + long spanID, + long timestampMS, + String mlApp, + String label, + Map tags, + double scoreValue) { + super(traceID, String.valueOf(spanID), timestampMS, mlApp, METRIC_TYPE_SCORE, label, tags); + this.score_value = scoreValue; + } + } + + public static final class Categorical extends LLMObsEval { + public final String categorical_value; + + public Categorical( + String traceID, + long spanID, + long timestampMS, + String mlApp, + String label, + Map tags, + String categoricalValue) { + super( + traceID, + String.valueOf(spanID), + timestampMS, + mlApp, + METRIC_TYPE_CATEGORICAL, + label, + tags); + this.categorical_value = categoricalValue; + } + } + + public static final class Request { + public final Data data; + + public static class Data { + public final String type = "evaluation_metric"; + public Attributes attributes; + } + + public static class Attributes { + public List metrics; + } + + public Request(List metrics) { + this.data = new Data(); + this.data.attributes = new Attributes(); + this.data.attributes.metrics = metrics; + } + } +} diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java index 42b0c097e48..85e1482b412 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java @@ -3,8 +3,11 @@ import datadog.trace.api.llmobs.LLMObs; public class LLMObsInternal extends LLMObs { - public static void setLLMObsSpanFactory(final LLMObsSpanFactory factory) { LLMObs.SPAN_FACTORY = factory; } + + public static void setLLMObsEvalProcessor(final LLMObsEvalProcessor evalProcessor) { + LLMObs.EVAL_PROCESSOR = evalProcessor; + } } diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsState.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsState.java new file mode 100644 index 00000000000..84f05afc94a --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsState.java @@ -0,0 +1,37 @@ +package datadog.trace.llmobs.domain; + +import datadog.context.Context; +import datadog.context.ContextKey; +import datadog.context.ContextScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; + +public class LLMObsState { + public static final String ROOT_SPAN_ID = "undefined"; + + private static final ContextKey CONTEXT_KEY = ContextKey.named("llmobs_span"); + + private AgentSpanContext parentSpanID; + + public static ContextScope attach() { + return Context.current().with(CONTEXT_KEY, new LLMObsState()).attach(); + } + + private static LLMObsState fromContext() { + return Context.current().get(CONTEXT_KEY); + } + + public static AgentSpanContext getLLMObsParentContext() { + LLMObsState state = fromContext(); + if (state != null) { + return state.parentSpanID; + } + return null; + } + + public static void setLLMObsParentContext(AgentSpanContext llmObsParentContext) { + LLMObsState state = fromContext(); + if (state != null) { + state.parentSpanID = llmObsParentContext; + } + } +} diff --git a/dd-trace-api/build.gradle b/dd-trace-api/build.gradle index bbf7d4a0953..405d4fc6dd5 100644 --- a/dd-trace-api/build.gradle +++ b/dd-trace-api/build.gradle @@ -38,6 +38,7 @@ excludedClassesCoverage += [ 'datadog.trace.api.llmobs.LLMObsSpan', 'datadog.trace.api.llmobs.noop.NoOpLLMObsSpan', 'datadog.trace.api.llmobs.noop.NoOpLLMObsSpanFactory', + 'datadog.trace.api.llmobs.noop.NoOpLLMObsEvalProcessor', 'datadog.trace.api.experimental.DataStreamsCheckpointer', 'datadog.trace.api.experimental.DataStreamsCheckpointer.NoOp', 'datadog.trace.api.experimental.DataStreamsContextCarrier', @@ -56,6 +57,7 @@ excludedClassesCoverage += [ description = 'dd-trace-api' dependencies { + api libs.slf4j testImplementation libs.guava testImplementation project(':utils:test-utils') } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java index fd3e1f0a952..392e76a4a82 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java @@ -1,5 +1,6 @@ package datadog.trace.api.llmobs; +import datadog.trace.api.llmobs.noop.NoOpLLMObsEvalProcessor; import datadog.trace.api.llmobs.noop.NoOpLLMObsSpanFactory; import java.util.List; import java.util.Map; @@ -9,6 +10,7 @@ public class LLMObs { protected LLMObs() {} protected static LLMObsSpanFactory SPAN_FACTORY = NoOpLLMObsSpanFactory.INSTANCE; + protected static LLMObsEvalProcessor EVAL_PROCESSOR = NoOpLLMObsEvalProcessor.INSTANCE; public static LLMObsSpan startLLMSpan( String spanName, @@ -44,6 +46,34 @@ public static LLMObsSpan startWorkflowSpan( return SPAN_FACTORY.startWorkflowSpan(spanName, mlApp, sessionId); } + public static void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, String categoricalValue, Map tags) { + EVAL_PROCESSOR.SubmitEvaluation(llmObsSpan, label, categoricalValue, tags); + } + + public static void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + String categoricalValue, + String mlApp, + Map tags) { + EVAL_PROCESSOR.SubmitEvaluation(llmObsSpan, label, categoricalValue, mlApp, tags); + } + + public static void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, double scoreValue, Map tags) { + EVAL_PROCESSOR.SubmitEvaluation(llmObsSpan, label, scoreValue, tags); + } + + public static void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + double scoreValue, + String mlApp, + Map tags) { + EVAL_PROCESSOR.SubmitEvaluation(llmObsSpan, label, scoreValue, mlApp, tags); + } + public interface LLMObsSpanFactory { LLMObsSpan startLLMSpan( String spanName, @@ -62,6 +92,28 @@ LLMObsSpan startWorkflowSpan( String spanName, @Nullable String mlApp, @Nullable String sessionId); } + public interface LLMObsEvalProcessor { + void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, double scoreValue, Map tags); + + void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + double scoreValue, + String mlApp, + Map tags); + + void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, String categoricalValue, Map tags); + + void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + String categoricalValue, + String mlApp, + Map tags); + } + public static class ToolCall { private String name; private String type; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java index 80668eabd57..e86c9717e24 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java @@ -1,5 +1,6 @@ package datadog.trace.api.llmobs; +import datadog.trace.api.DDTraceId; import java.util.List; import java.util.Map; @@ -142,4 +143,18 @@ public interface LLMObsSpan { /** Finishes (closes) a span */ void finish(); + + /** + * Gets the TraceId of the span's trace. + * + * @return The TraceId of the span's trace, or {@link DDTraceId#ZERO} if not set. + */ + DDTraceId getTraceId(); + + /** + * Gets the SpanId. + * + * @return The span identifier. + */ + long getSpanId(); } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsEvalProcessor.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsEvalProcessor.java new file mode 100644 index 00000000000..403c0afce02 --- /dev/null +++ b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsEvalProcessor.java @@ -0,0 +1,33 @@ +package datadog.trace.api.llmobs.noop; + +import datadog.trace.api.llmobs.LLMObs; +import datadog.trace.api.llmobs.LLMObsSpan; +import java.util.Map; + +public class NoOpLLMObsEvalProcessor implements LLMObs.LLMObsEvalProcessor { + public static final NoOpLLMObsEvalProcessor INSTANCE = new NoOpLLMObsEvalProcessor(); + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, double scoreValue, Map tags) {} + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + double scoreValue, + String mlApp, + Map tags) {} + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, String label, String categoricalValue, Map tags) {} + + @Override + public void SubmitEvaluation( + LLMObsSpan llmObsSpan, + String label, + String categoricalValue, + String mlApp, + Map tags) {} +} diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java index a1b160616e7..17ea94101b4 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java @@ -1,5 +1,6 @@ package datadog.trace.api.llmobs.noop; +import datadog.trace.api.DDTraceId; import datadog.trace.api.llmobs.LLMObs; import datadog.trace.api.llmobs.LLMObsSpan; import java.util.List; @@ -58,4 +59,14 @@ public void addThrowable(Throwable throwable) {} @Override public void finish() {} + + @Override + public DDTraceId getTraceId() { + return DDTraceId.ZERO; + } + + @Override + public long getSpanId() { + return 0; + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java index 1d832bf3523..fce921787e2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java +++ b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java @@ -75,6 +75,8 @@ public class LLMObsSpanMapper implements RemoteMapper { private static final byte[] LLM_TOOL_CALL_ARGUMENTS = "arguments".getBytes(StandardCharsets.UTF_8); + private static final String PARENT_ID_TAG_INTERNAL_FULL = LLMOBS_TAG_PREFIX + "parent_id"; + private final LLMObsSpanMapper.MetaWriter metaWriter = new MetaWriter(); private final int size; @@ -113,8 +115,8 @@ public void map(List> trace, Writable writable) { // 3 writable.writeUTF8(PARENT_ID); - // TODO fix after parent ID tracking is in place - writable.writeString("undefined", null); + writable.writeString(span.getTag(PARENT_ID_TAG_INTERNAL_FULL), null); + span.removeTag(PARENT_ID_TAG_INTERNAL_FULL); // 4 writable.writeUTF8(NAME); diff --git a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java index 67bdf576607..33affffa533 100644 --- a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java +++ b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java @@ -57,7 +57,9 @@ public enum AgentThread { RETRANSFORMER("dd-retransformer"), - LOGS_INTAKE("dd-logs-intake"); + LOGS_INTAKE("dd-logs-intake"), + + LLMOBS_EVALS_PROCESSOR("dd-llmobs-evals-processor"); public final String threadName;