Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5134311
add APIs for llm obs
gary-huang Sep 19, 2024
b5c7a0a
add llm message class to support llm spans
gary-huang Mar 6, 2025
7f8f586
add llm message class to support llm spans
gary-huang Mar 6, 2025
9206dbe
impl llmobs agent and llmobs apis
gary-huang Jan 27, 2025
f5efd26
support llm messages with tool calls
gary-huang Mar 6, 2025
76740e9
handle default model name and provider
gary-huang Mar 26, 2025
5e66a2f
rm unneeded file
gary-huang Apr 11, 2025
75290af
impl llmobs agent and llmobs apis
gary-huang Jan 27, 2025
7b2db7c
impl llmobs agent
gary-huang Jan 27, 2025
d76c94d
working writer
gary-huang Feb 7, 2025
e0b4510
add support for llm message and tool calls
gary-huang Mar 6, 2025
87cdaf1
impl llmobs agent and llmobs apis
gary-huang Jan 27, 2025
f78b2c5
use new ctx api to track parent span
gary-huang Apr 1, 2025
5fc9069
add api for evals
gary-huang Mar 5, 2025
a74e456
working impl supporting both agentless and agent
gary-huang Mar 6, 2025
19d95d7
handle null tags and default to default ml app if null or empty strin…
gary-huang Apr 21, 2025
52d3502
cleaned up whitespace
Jul 2, 2025
f54c468
resolve merge conflicts
Jul 2, 2025
688d984
remaining merge conflicts
Jul 2, 2025
e62d569
merge with feature branch
Jul 2, 2025
157e53e
fix bad method call
Jul 2, 2025
ade08ca
fixed llmobs intake creation if llmobs not enabled
Jul 3, 2025
973c09f
removed print statements
Jul 3, 2025
384141d
Merge branch 'gary/add-llm-obs-writer' into gary/use-ctx-api
nayeem-kamal Jul 3, 2025
c74a642
ran spotless
Jul 3, 2025
84f1a67
ran spotless
Jul 4, 2025
1eb8db3
added tests for llmobsspanmapper
Jul 7, 2025
75d85a2
fixed coverage for tags
Jul 7, 2025
ec3cf25
Merge branch 'gary/add-llm-obs-writer' into gary/use-ctx-api
nayeem-kamal Jul 8, 2025
7c16dd2
Merge branch 'gary/llmobs-sdk-merge' into gary/use-ctx-api
gary-huang Jul 8, 2025
8c08304
Merge branch 'gary/llmobs-sdk-merge' into gary/use-ctx-api
gary-huang Jul 8, 2025
a8ef30d
Merge branch 'gary/use-ctx-api' into gary/submit-evals-2
gary-huang Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dd-java-agent/agent-llmobs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ minimumInstructionCoverage = 0.0

dependencies {
api libs.slf4j
implementation libs.jctools

implementation project(':communication')
implementation project(':components:json')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package datadog.trace.llmobs;

import static datadog.trace.util.AgentThreadFactory.AgentThread.LLMOBS_EVALS_PROCESSOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.http.HttpRetryPolicy;
import datadog.communication.http.OkHttpUtils;
import datadog.trace.api.Config;
import datadog.trace.llmobs.domain.LLMObsEval;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EvalProcessingWorker implements AutoCloseable {

private static final String EVAL_METRIC_API_DOMAIN = "api";
private static final String EVAL_METRIC_API_PATH = "api/intake/llm-obs/v1/eval-metric";

private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain";
private static final String DD_API_KEY_HEADER_NAME = "DD-API-KEY";

private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);

private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
private final Thread serializerThread;

public EvalProcessingWorker(
final int capacity,
final long flushInterval,
final TimeUnit timeUnit,
final SharedCommunicationObjects sco,
Config config) {
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);

boolean isAgentless = config.isLlmObsAgentlessEnabled();
if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) {
log.error("Agentless eval metric submission requires an API key");
}

Headers headers;
HttpUrl submissionUrl;
if (isAgentless) {
submissionUrl =
HttpUrl.get(
"https://"
+ EVAL_METRIC_API_DOMAIN
+ "."
+ config.getSite()
+ "/"
+ EVAL_METRIC_API_PATH);
headers = Headers.of(DD_API_KEY_HEADER_NAME, config.getApiKey());
} else {
submissionUrl =
HttpUrl.get(
sco.agentUrl.toString()
+ DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
+ EVAL_METRIC_API_PATH);
headers = Headers.of(EVP_SUBDOMAIN_HEADER_NAME, EVAL_METRIC_API_DOMAIN);
}

EvalSerializingHandler serializingHandler =
new EvalSerializingHandler(queue, flushInterval, timeUnit, submissionUrl, headers);
this.serializerThread = newAgentThread(LLMOBS_EVALS_PROCESSOR, serializingHandler);
}

public void start() {
this.serializerThread.start();
}

public boolean addToQueue(final LLMObsEval eval) {
return queue.offer(eval);
}

@Override
public void close() {
serializerThread.interrupt();
try {
serializerThread.join(THREAD_JOIN_TIMOUT_MS);
} catch (InterruptedException ignored) {
}
}

public static class EvalSerializingHandler implements Runnable {

private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
private static final int FLUSH_THRESHOLD = 50;

private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
private final long ticksRequiredToFlush;
private long lastTicks;

private final Moshi moshi;
private final JsonAdapter<LLMObsEval.Request> evalJsonAdapter;
private final OkHttpClient httpClient;
private final HttpUrl submissionUrl;
private final Headers headers;

private final List<LLMObsEval> buffer = new ArrayList<>();

public EvalSerializingHandler(
final MpscBlockingConsumerArrayQueue<LLMObsEval> queue,
final long flushInterval,
final TimeUnit timeUnit,
final HttpUrl submissionUrl,
final Headers headers) {
this.queue = queue;
this.moshi = new Moshi.Builder().add(LLMObsEval.class, new LLMObsEval.Adapter()).build();

this.evalJsonAdapter = moshi.adapter(LLMObsEval.Request.class);
this.httpClient = new OkHttpClient();
this.submissionUrl = submissionUrl;
this.headers = headers;

this.lastTicks = System.nanoTime();
this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);

log.debug("starting eval metric serializer, url={}", submissionUrl);
}

@Override
public void run() {
try {
runDutyCycle();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.debug(
"eval processor worker exited. submitting evals stopped. unsubmitted evals left: "
+ !queuesAreEmpty());
}

private void runDutyCycle() throws InterruptedException {
Thread thread = Thread.currentThread();
while (!thread.isInterrupted()) {
consumeBatch();
flushIfNecessary();
}
}

private void consumeBatch() {
queue.drain(buffer::add, queue.size());
}

protected void flushIfNecessary() {
if (buffer.isEmpty()) {
return;
}
if (shouldFlush()) {
LLMObsEval.Request llmobsEvalReq = new LLMObsEval.Request(this.buffer);
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);

String reqBod = evalJsonAdapter.toJson(llmobsEvalReq);

RequestBody requestBody =
RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod);
Request request =
new Request.Builder().headers(headers).url(submissionUrl).post(requestBody).build();

try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {

if (response.isSuccessful()) {
log.debug("successfully flushed evaluation request with {} evals", this.buffer.size());
this.buffer.clear();
} else {
log.error(
"Could not submit eval metrics (HTTP code "
+ response.code()
+ ")"
+ (response.body() != null ? ": " + response.body().string() : ""));
}
} catch (Exception e) {
log.error("Could not submit eval metrics", e);
}
}
}

private boolean shouldFlush() {
long nanoTime = System.nanoTime();
long ticks = nanoTime - lastTicks;
if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) {
lastTicks = nanoTime;
return true;
}
return false;
}

protected boolean queuesAreEmpty() {
return queue.isEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import datadog.trace.api.llmobs.LLMObsTags;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.llmobs.domain.DDLLMObsSpan;
import datadog.trace.llmobs.domain.LLMObsEval;
import datadog.trace.llmobs.domain.LLMObsInternal;
import java.lang.instrument.Instrumentation;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,6 +33,98 @@ public static void start(Instrumentation inst, SharedCommunicationObjects sco) {

LLMObsInternal.setLLMObsSpanFactory(
new LLMObsManualSpanFactory(config.getLlmObsMlApp(), config.getServiceName()));

String mlApp = config.getLlmObsMlApp();
LLMObsInternal.setLLMObsSpanFactory(
new LLMObsManualSpanFactory(mlApp, config.getServiceName()));

LLMObsInternal.setLLMObsEvalProcessor(new LLMObsCustomEvalProcessor(mlApp, sco, config));
}

private static class LLMObsCustomEvalProcessor implements LLMObs.LLMObsEvalProcessor {
private final String defaultMLApp;
private final EvalProcessingWorker evalProcessingWorker;

public LLMObsCustomEvalProcessor(
String defaultMLApp, SharedCommunicationObjects sco, Config config) {

this.defaultMLApp = defaultMLApp;
this.evalProcessingWorker =
new EvalProcessingWorker(1024, 100, TimeUnit.MILLISECONDS, sco, config);
this.evalProcessingWorker.start();
}

@Override
public void SubmitEvaluation(
LLMObsSpan llmObsSpan, String label, double scoreValue, Map<String, Object> tags) {
SubmitEvaluation(llmObsSpan, label, scoreValue, defaultMLApp, tags);
}

@Override
public void SubmitEvaluation(
LLMObsSpan llmObsSpan,
String label,
double scoreValue,
String mlApp,
Map<String, Object> tags) {
if (llmObsSpan == null) {
LOGGER.error("null llm obs span provided, eval not recorded");
return;
}

if (mlApp == null || mlApp.isEmpty()) {
mlApp = defaultMLApp;
}
String traceID = llmObsSpan.getTraceId().toHexString();
long spanID = llmObsSpan.getSpanId();
LLMObsEval.Score score =
new LLMObsEval.Score(
traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, scoreValue);
if (!this.evalProcessingWorker.addToQueue(score)) {
LOGGER.warn(
"queue full, failed to add score eval, ml_app={}, trace_id={}, span_id={}, label={}",
mlApp,
traceID,
spanID,
label);
}
}

@Override
public void SubmitEvaluation(
LLMObsSpan llmObsSpan, String label, String categoricalValue, Map<String, Object> tags) {
SubmitEvaluation(llmObsSpan, label, categoricalValue, defaultMLApp, tags);
}

@Override
public void SubmitEvaluation(
LLMObsSpan llmObsSpan,
String label,
String categoricalValue,
String mlApp,
Map<String, Object> tags) {
if (llmObsSpan == null) {
LOGGER.error("null llm obs span provided, eval not recorded");
return;
}

if (mlApp == null || mlApp.isEmpty()) {
mlApp = defaultMLApp;
}
String traceID = llmObsSpan.getTraceId().toHexString();
long spanID = llmObsSpan.getSpanId();
LLMObsEval.Categorical category =
new LLMObsEval.Categorical(
traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, categoricalValue);
if (!this.evalProcessingWorker.addToQueue(category)) {
LOGGER.warn(
"queue full, failed to add categorical eval, ml_app={}, trace_id={}, span_id={}, label={}",
mlApp,
traceID,
spanID,
label);
}
}
}

private static class LLMObsManualSpanFactory implements LLMObs.LLMObsSpanFactory {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package datadog.trace.llmobs.domain;

import datadog.context.ContextScope;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.llmobs.LLMObs;
import datadog.trace.api.llmobs.LLMObsSpan;
import datadog.trace.api.llmobs.LLMObsTags;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import java.util.Collections;
Expand All @@ -29,13 +32,15 @@ public class DDLLMObsSpan implements LLMObsSpan {
private static final String OUTPUT = LLMOBS_TAG_PREFIX + "output";
private static final String SPAN_KIND = LLMOBS_TAG_PREFIX + Tags.SPAN_KIND;
private static final String METADATA = LLMOBS_TAG_PREFIX + LLMObsTags.METADATA;
private static final String PARENT_ID_TAG_INTERNAL = "parent_id";

private static final String LLM_OBS_INSTRUMENTATION_NAME = "llmobs";

private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class);

private final AgentSpan span;
private final String spanKind;
private final ContextScope scope;

private boolean finished = false;

Expand Down Expand Up @@ -63,6 +68,24 @@ public DDLLMObsSpan(
if (sessionId != null && !sessionId.isEmpty()) {
this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID, sessionId);
}

AgentSpanContext parent = LLMObsState.getLLMObsParentContext();
String parentSpanID = LLMObsState.ROOT_SPAN_ID;
if (null != parent) {
if (parent.getTraceId() != this.span.getTraceId()) {
LOGGER.error(
"trace ID mismatch, retrieved parent from context trace_id={}, span_id={}, started span trace_id={}, span_id={}",
parent.getTraceId(),
parent.getSpanId(),
this.span.getTraceId(),
this.span.getSpanId());
} else {
parentSpanID = String.valueOf(parent.getSpanId());
}
}
this.span.setTag(LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, parentSpanID);
this.scope = LLMObsState.attach();
LLMObsState.setLLMObsParentContext(this.span.context());
}

@Override
Expand Down Expand Up @@ -271,6 +294,17 @@ public void finish() {
return;
}
this.span.finish();
this.scope.close();
this.finished = true;
}

@Override
public DDTraceId getTraceId() {
return this.span.getTraceId();
}

@Override
public long getSpanId() {
return this.span.getSpanId();
}
}
Loading