From ad8ba4f3bc4afa16333aee667acb747be4776bce Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 4 Dec 2024 11:33:42 +0200 Subject: [PATCH 01/17] OPIK-419 insert single and batch traces --- .../com/comet/opik/domain/ProjectDAO.java | 8 +++ .../com/comet/opik/domain/ProjectService.java | 9 +++ .../java/com/comet/opik/domain/TraceDAO.java | 16 +++++- .../com/comet/opik/domain/TraceService.java | 56 +++++++++++++------ ...006_add_projects_last_updated_trace_at.sql | 6 ++ 5 files changed, 76 insertions(+), 19 deletions(-) create mode 100644 apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java index b57105b7b4..f7b5015665 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java @@ -10,6 +10,7 @@ import org.jdbi.v3.sqlobject.customizer.BindList; import org.jdbi.v3.sqlobject.customizer.BindMethods; import org.jdbi.v3.sqlobject.customizer.Define; +import org.jdbi.v3.sqlobject.statement.SqlBatch; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; import org.jdbi.v3.stringtemplate4.UseStringTemplateEngine; @@ -85,4 +86,11 @@ default Optional fetch(UUID id, String workspaceId) { @SqlQuery("SELECT * FROM projects WHERE workspace_id = :workspaceId AND name IN ()") List findByNames(@Bind("workspaceId") String workspaceId, @BindList("names") Collection names); + + @SqlBatch("UPDATE projects SET last_updated_trace_at = :lastUpdatedAt " + + "WHERE workspace_id = :workspace_id" + + " AND id = :id" + + " AND last_updated_trace_at < :lastUpdatedAt") + int[] recordLastUpdatedTrace(@Bind("workspace_id") String workspaceId, + @BindMethods Collection lastUpdatedTraces); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java index f16c31c219..1fb4931108 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java @@ -30,6 +30,7 @@ import java.sql.SQLIntegrityConstraintViolationException; import java.time.Instant; +import java.util.Collection; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -73,6 +74,8 @@ public interface ProjectService { Project getOrCreate(String workspaceId, String projectName, String userName); Project retrieveByName(String projectName); + + void recordLastUpdatedTrace(String workspaceId, Collection lastUpdatedTraces); } @Slf4j @@ -392,4 +395,10 @@ public Project retrieveByName(@NonNull String projectName) { }); } + @Override + public void recordLastUpdatedTrace(String workspaceId, Collection lastUpdatedTraces) { + template.inTransaction(WRITE, + handle -> handle.attach(ProjectDAO.class).recordLastUpdatedTrace(workspaceId, lastUpdatedTraces)); + } + } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java index 69ccf87baf..a94502b706 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java @@ -147,7 +147,8 @@ INSERT INTO traces( tags, created_at, created_by, - last_updated_by + last_updated_by, + last_updated_at ) SELECT new_trace.id as id, @@ -197,7 +198,8 @@ INSERT INTO traces( LENGTH(old_trace.created_by) > 0, old_trace.created_by, new_trace.created_by ) as created_by, - new_trace.last_updated_by as last_updated_by + new_trace.last_updated_by as last_updated_by, + new_trace.last_updated_at as last_updated_at FROM ( SELECT :id as id, @@ -212,7 +214,8 @@ INSERT INTO traces( :tags as tags, now64(9) as created_at, :user_name as created_by, - :user_name as last_updated_by + :user_name as last_updated_by, + parseDateTime64BestEffort(:last_updated_at, 9) as last_updated_at ) as new_trace LEFT JOIN ( SELECT @@ -735,6 +738,10 @@ private Statement buildInsertStatement(Trace trace, Connection connection, ST te statement.bind("end_time", trace.endTime().toString()); } + if (trace.lastUpdatedAt() != null) { + statement.bind("last_updated_at", trace.lastUpdatedAt().toString()); + } + if (trace.metadata() != null) { statement.bind("metadata", trace.metadata().toString()); } else { @@ -756,6 +763,9 @@ private ST buildInsertTemplate(Trace trace) { Optional.ofNullable(trace.endTime()) .ifPresent(endTime -> template.add("end_time", endTime)); + Optional.ofNullable(trace.lastUpdatedAt()) + .ifPresent(lastUpdatedAt -> template.add("last_updated_at", lastUpdatedAt)); + return template; } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java index 3a4b74d6b5..561ca9dda4 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java @@ -3,6 +3,7 @@ import com.clickhouse.client.ClickHouseException; import com.comet.opik.api.BiInformationResponse; import com.comet.opik.api.Project; +import com.comet.opik.api.ProjectIdLastUpdated; import com.comet.opik.api.ProjectStats; import com.comet.opik.api.Trace; import com.comet.opik.api.TraceBatch; @@ -33,6 +34,7 @@ import reactor.core.scheduler.Schedulers; import java.time.Instant; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -92,35 +94,57 @@ public Mono create(@NonNull Trace trace) { String projectName = WorkspaceUtils.getProjectName(trace.projectName()); UUID id = trace.id() == null ? idGenerator.generateId() : trace.id(); + var finalTrace = trace.toBuilder().lastUpdatedAt(Instant.now()).build(); - return IdGenerator + return AsyncUtils.makeMonoContextAware((userName, workspaceId) -> IdGenerator .validateVersionAsync(id, TRACE_KEY) .then(Mono.defer(() -> getOrCreateProject(projectName))) .flatMap(project -> lockService.executeWithLock( new LockService.Lock(id, TRACE_KEY), - Mono.defer(() -> insertTrace(trace, project, id)))); + insertTrace(finalTrace, project, id) + .then(Mono.fromRunnable(() -> projectService.recordLastUpdatedTrace( + workspaceId, List.of(new ProjectIdLastUpdated( + project.id(), finalTrace.lastUpdatedAt()))))) + .thenReturn(id)))); } @WithSpan public Mono create(TraceBatch batch) { - Preconditions.checkArgument(!batch.traces().isEmpty(), "Batch traces cannot be empty"); - List projectNames = batch.traces() - .stream() - .map(Trace::projectName) - .map(WorkspaceUtils::getProjectName) - .distinct() - .toList(); + return AsyncUtils.makeMonoContextAware(((userName, workspaceId) -> { + List projectNames = batch.traces() + .stream() + .map(Trace::projectName) + .map(WorkspaceUtils::getProjectName) + .distinct() + .toList(); + + Mono> resolveProjects = Flux.fromIterable(projectNames) + .flatMap(this::getOrCreateProject) + .collectList() + .map(projects -> bindTraceToProjectAndId(batch, projects)) + .subscribeOn(Schedulers.boundedElastic()); - Mono> resolveProjects = Flux.fromIterable(projectNames) - .flatMap(this::getOrCreateProject) - .collectList() - .map(projects -> bindTraceToProjectAndId(batch, projects)) - .subscribeOn(Schedulers.boundedElastic()); + return resolveProjects + .flatMap(traces -> template.nonTransaction(connection -> dao.batchInsert(traces, connection) + .flatMap(count -> Mono.fromRunnable(() -> projectService.recordLastUpdatedTrace(workspaceId, + getLastUpdatedProjects(traces))) + .thenReturn(count)))); + })); + } + + private List getLastUpdatedProjects(List traces) { + return traces.stream().collect(Collectors.groupingBy(Trace::projectId)).entrySet().stream() + .map(entry -> new ProjectIdLastUpdated(entry.getKey(), findLatestUpdatedAt(entry.getValue()))) + .toList(); + } - return resolveProjects - .flatMap(traces -> template.nonTransaction(connection -> dao.batchInsert(traces, connection))); + private Instant findLatestUpdatedAt(List traces) { + return traces.stream() + .map(Trace::lastUpdatedAt) + .sorted(Comparator.reverseOrder()) + .toList().getFirst(); } private List bindTraceToProjectAndId(TraceBatch batch, List projects) { diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql new file mode 100644 index 0000000000..d4b7b77b89 --- /dev/null +++ b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql @@ -0,0 +1,6 @@ +--liquibase formatted sql +--changeset idoberko2:add_projects_last_updated_trace_at + +ALTER TABLE projects ADD COLUMN last_updated_trace_at TIMESTAMP(6) NOT NULL DEFAULT '1970-01-01 00:00:01'; + +--rollback ALTER TABLE projects DROP COLUMN last_updated_trace_at; From 4f1289ed34ba964adc1fb93b5c797eef55369f88 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 4 Dec 2024 12:30:46 +0200 Subject: [PATCH 02/17] OPIK-419 null timestamp --- .../migrations/000006_add_projects_last_updated_trace_at.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql index d4b7b77b89..ea9f7dce8e 100644 --- a/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql +++ b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql @@ -1,6 +1,6 @@ --liquibase formatted sql --changeset idoberko2:add_projects_last_updated_trace_at -ALTER TABLE projects ADD COLUMN last_updated_trace_at TIMESTAMP(6) NOT NULL DEFAULT '1970-01-01 00:00:01'; +ALTER TABLE projects ADD COLUMN last_updated_trace_at TIMESTAMP(6); --rollback ALTER TABLE projects DROP COLUMN last_updated_trace_at; From e4457100ee24d132044c3821b9545118cdc5d97a Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 4 Dec 2024 14:30:35 +0200 Subject: [PATCH 03/17] OPIK-419 create single and batch --- .../comet/opik/api/events/TracesCreated.java | 20 ++++++ .../v1/events/ProjectEventListener.java | 41 ++++++++++++ .../com/comet/opik/domain/ProjectDAO.java | 2 +- .../java/com/comet/opik/domain/TraceDAO.java | 16 +---- .../com/comet/opik/domain/TraceService.java | 65 +++++++++---------- .../opik/domain/TraceServiceImplTest.java | 7 +- 6 files changed, 102 insertions(+), 49 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesCreated.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesCreated.java b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesCreated.java new file mode 100644 index 0000000000..b47529ae6d --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesCreated.java @@ -0,0 +1,20 @@ +package com.comet.opik.api.events; + +import com.comet.opik.infrastructure.events.BaseEvent; +import lombok.Getter; +import lombok.NonNull; +import lombok.experimental.Accessors; + +import java.util.Set; +import java.util.UUID; + +@Getter +@Accessors(fluent = true) +public class TracesCreated extends BaseEvent { + private final @NonNull Set projectIds; + + public TracesCreated(@NonNull Set projectIds, @NonNull String workspaceId, @NonNull String userName) { + super(workspaceId, userName); + this.projectIds = projectIds; + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java new file mode 100644 index 0000000000..38ae57ccdd --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java @@ -0,0 +1,41 @@ +package com.comet.opik.api.resources.v1.events; + +import com.comet.opik.api.ProjectIdLastUpdated; +import com.comet.opik.api.events.TracesCreated; +import com.comet.opik.domain.ProjectService; +import com.comet.opik.domain.TraceService; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import jakarta.inject.Inject; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; +import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton; + +@EagerSingleton +@Slf4j +public class ProjectEventListener { + private final ProjectService projectService; + private final TraceService traceService; + + @Inject + public ProjectEventListener(EventBus eventBus, ProjectService projectService, TraceService traceService) { + this.projectService = projectService; + this.traceService = traceService; + eventBus.register(this); + } + + @Subscribe + public void onTracesCreated(TracesCreated event) { + log.info("Recording last traces for projects '{}'", event.projectIds()); + + traceService.getLastUpdatedTraceAt(event.projectIds(), event.workspaceId()) + .flatMap(lastTraceByProjectId -> Mono.fromRunnable(() -> projectService.recordLastUpdatedTrace( + event.workspaceId(), + lastTraceByProjectId.entrySet().stream() + .map(entry -> new ProjectIdLastUpdated(entry.getKey(), entry.getValue())).toList()))) + .block(); + + log.info("Recorded last traces for projects '{}'", event.projectIds()); + } + +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java index f7b5015665..4832d79d7e 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java @@ -90,7 +90,7 @@ default Optional fetch(UUID id, String workspaceId) { @SqlBatch("UPDATE projects SET last_updated_trace_at = :lastUpdatedAt " + "WHERE workspace_id = :workspace_id" + " AND id = :id" + - " AND last_updated_trace_at < :lastUpdatedAt") + " AND (last_updated_trace_at IS NULL OR last_updated_trace_at < :lastUpdatedAt)") int[] recordLastUpdatedTrace(@Bind("workspace_id") String workspaceId, @BindMethods Collection lastUpdatedTraces); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java index a94502b706..69ccf87baf 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java @@ -147,8 +147,7 @@ INSERT INTO traces( tags, created_at, created_by, - last_updated_by, - last_updated_at + last_updated_by ) SELECT new_trace.id as id, @@ -198,8 +197,7 @@ INSERT INTO traces( LENGTH(old_trace.created_by) > 0, old_trace.created_by, new_trace.created_by ) as created_by, - new_trace.last_updated_by as last_updated_by, - new_trace.last_updated_at as last_updated_at + new_trace.last_updated_by as last_updated_by FROM ( SELECT :id as id, @@ -214,8 +212,7 @@ INSERT INTO traces( :tags as tags, now64(9) as created_at, :user_name as created_by, - :user_name as last_updated_by, - parseDateTime64BestEffort(:last_updated_at, 9) as last_updated_at + :user_name as last_updated_by ) as new_trace LEFT JOIN ( SELECT @@ -738,10 +735,6 @@ private Statement buildInsertStatement(Trace trace, Connection connection, ST te statement.bind("end_time", trace.endTime().toString()); } - if (trace.lastUpdatedAt() != null) { - statement.bind("last_updated_at", trace.lastUpdatedAt().toString()); - } - if (trace.metadata() != null) { statement.bind("metadata", trace.metadata().toString()); } else { @@ -763,9 +756,6 @@ private ST buildInsertTemplate(Trace trace) { Optional.ofNullable(trace.endTime()) .ifPresent(endTime -> template.add("end_time", endTime)); - Optional.ofNullable(trace.lastUpdatedAt()) - .ifPresent(lastUpdatedAt -> template.add("last_updated_at", lastUpdatedAt)); - return template; } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java index 561ca9dda4..bfee37faed 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java @@ -3,7 +3,6 @@ import com.clickhouse.client.ClickHouseException; import com.comet.opik.api.BiInformationResponse; import com.comet.opik.api.Project; -import com.comet.opik.api.ProjectIdLastUpdated; import com.comet.opik.api.ProjectStats; import com.comet.opik.api.Trace; import com.comet.opik.api.TraceBatch; @@ -13,12 +12,14 @@ import com.comet.opik.api.error.EntityAlreadyExistsException; import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.api.error.IdentifierMismatchException; +import com.comet.opik.api.events.TracesCreated; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.infrastructure.db.TransactionTemplateAsync; import com.comet.opik.infrastructure.lock.LockService; import com.comet.opik.utils.AsyncUtils; import com.comet.opik.utils.WorkspaceUtils; import com.google.common.base.Preconditions; +import com.google.common.eventbus.EventBus; import com.google.inject.ImplementedBy; import io.opentelemetry.instrumentation.annotations.WithSpan; import jakarta.inject.Inject; @@ -34,7 +35,6 @@ import reactor.core.scheduler.Schedulers; import java.time.Instant; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,6 +70,8 @@ public interface TraceService { Mono getStats(TraceSearchCriteria searchCriteria); Mono getDailyCreatedCount(); + + Mono> getLastUpdatedTraceAt(Set projectIds, String workspaceId); } @Slf4j @@ -87,6 +89,7 @@ class TraceServiceImpl implements TraceService { private final @NonNull ProjectService projectService; private final @NonNull IdGenerator idGenerator; private final @NonNull LockService lockService; + private final @NonNull EventBus eventBus; @Override @WithSpan @@ -94,32 +97,32 @@ public Mono create(@NonNull Trace trace) { String projectName = WorkspaceUtils.getProjectName(trace.projectName()); UUID id = trace.id() == null ? idGenerator.generateId() : trace.id(); - var finalTrace = trace.toBuilder().lastUpdatedAt(Instant.now()).build(); - return AsyncUtils.makeMonoContextAware((userName, workspaceId) -> IdGenerator + return Mono.deferContextual(ctx -> IdGenerator .validateVersionAsync(id, TRACE_KEY) .then(Mono.defer(() -> getOrCreateProject(projectName))) .flatMap(project -> lockService.executeWithLock( new LockService.Lock(id, TRACE_KEY), - insertTrace(finalTrace, project, id) - .then(Mono.fromRunnable(() -> projectService.recordLastUpdatedTrace( - workspaceId, List.of(new ProjectIdLastUpdated( - project.id(), finalTrace.lastUpdatedAt()))))) - .thenReturn(id)))); + Mono.defer(() -> insertTrace(trace, project, id))) + .doOnSuccess(__ -> eventBus.post(new TracesCreated( + Set.of(project.id()), + ctx.get(RequestContext.WORKSPACE_ID), + ctx.get(RequestContext.USER_NAME)))))); } @WithSpan public Mono create(TraceBatch batch) { + Preconditions.checkArgument(!batch.traces().isEmpty(), "Batch traces cannot be empty"); - return AsyncUtils.makeMonoContextAware(((userName, workspaceId) -> { - List projectNames = batch.traces() - .stream() - .map(Trace::projectName) - .map(WorkspaceUtils::getProjectName) - .distinct() - .toList(); + List projectNames = batch.traces() + .stream() + .map(Trace::projectName) + .map(WorkspaceUtils::getProjectName) + .distinct() + .toList(); + return Mono.deferContextual(ctx -> { Mono> resolveProjects = Flux.fromIterable(projectNames) .flatMap(this::getOrCreateProject) .collectList() @@ -127,24 +130,12 @@ public Mono create(TraceBatch batch) { .subscribeOn(Schedulers.boundedElastic()); return resolveProjects - .flatMap(traces -> template.nonTransaction(connection -> dao.batchInsert(traces, connection) - .flatMap(count -> Mono.fromRunnable(() -> projectService.recordLastUpdatedTrace(workspaceId, - getLastUpdatedProjects(traces))) - .thenReturn(count)))); - })); - } - - private List getLastUpdatedProjects(List traces) { - return traces.stream().collect(Collectors.groupingBy(Trace::projectId)).entrySet().stream() - .map(entry -> new ProjectIdLastUpdated(entry.getKey(), findLatestUpdatedAt(entry.getValue()))) - .toList(); - } - - private Instant findLatestUpdatedAt(List traces) { - return traces.stream() - .map(Trace::lastUpdatedAt) - .sorted(Comparator.reverseOrder()) - .toList().getFirst(); + .flatMap(traces -> template.nonTransaction(connection -> dao.batchInsert(traces, connection)) + .doOnSuccess(__ -> eventBus.post(new TracesCreated( + traces.stream().map(Trace::projectId).collect(Collectors.toUnmodifiableSet()), + ctx.get(RequestContext.WORKSPACE_ID), + ctx.get(RequestContext.USER_NAME))))); + }); } private List bindTraceToProjectAndId(TraceBatch batch, List projects) { @@ -397,4 +388,10 @@ public Mono getDailyCreatedCount() { return dao.getDailyTraces(); } + + @Override + public Mono> getLastUpdatedTraceAt(Set projectIds, String workspaceId) { + return template + .nonTransaction(connection -> dao.getLastUpdatedTraceAt(projectIds, workspaceId, connection)); + } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java index 674d28b7a1..77b53c4086 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java @@ -10,6 +10,7 @@ import com.comet.opik.infrastructure.db.TransactionTemplateAsync; import com.comet.opik.infrastructure.lock.LockService; import com.fasterxml.uuid.Generators; +import com.google.common.eventbus.EventBus; import io.r2dbc.spi.Connection; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -58,6 +59,9 @@ class TraceServiceImplTest { @Mock private ProjectService projectService; + @Mock + private EventBus eventBus; + private final PodamFactory factory = new PodamFactoryImpl(); @BeforeEach @@ -69,7 +73,8 @@ void setUp() { template, projectService, () -> Generators.timeBasedEpochGenerator().generate(), - DUMMY_LOCK_SERVICE); + DUMMY_LOCK_SERVICE, + eventBus); } @Nested From 5a4c51687bc6568393c8c7f25f9d9f111e3fbb9b Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 4 Dec 2024 16:17:06 +0200 Subject: [PATCH 04/17] OPIK-419 coverage create single trace --- .../com/comet/opik/domain/ProjectDAO.java | 2 +- .../v1/priv/ProjectsResourceTest.java | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java index 4832d79d7e..ef544a85c0 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java @@ -24,7 +24,7 @@ @RegisterConstructorMapper(Project.class) @RegisterConstructorMapper(ProjectIdLastUpdated.class) @RegisterArgumentFactory(UUIDArgumentFactory.class) -interface ProjectDAO { +public interface ProjectDAO { @SqlUpdate("INSERT INTO projects (id, name, description, workspace_id, created_by, last_updated_by) VALUES (:bean.id, :bean.name, :bean.description, :workspaceId, :bean.createdBy, :bean.lastUpdatedBy)") void save(@Bind("workspaceId") String workspaceId, @BindMethods("bean") Project project); diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 4f7a444a45..845eaa1374 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -19,6 +19,7 @@ import com.comet.opik.api.sorting.SortableFields; import com.comet.opik.api.sorting.SortingFactory; import com.comet.opik.api.sorting.SortingField; +import com.comet.opik.domain.ProjectDAO; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.podam.PodamFactoryUtils; import com.comet.opik.utils.JsonUtils; @@ -30,6 +31,7 @@ import jakarta.ws.rs.core.MediaType; import org.apache.hc.core5.http.HttpStatus; import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.sqlobject.SqlObjectPlugin; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -56,6 +58,7 @@ import java.time.Instant; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; import java.util.stream.IntStream; @@ -116,6 +119,7 @@ class ProjectsResourceTest { private String baseURI; private ClientSupport client; + private Jdbi jdbi; @BeforeAll void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { @@ -129,10 +133,13 @@ void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { this.baseURI = "http://localhost:%d".formatted(client.getPort()); this.client = client; + this.jdbi = jdbi; ClientSupportUtils.config(client); mockTargetWorkspace(API_KEY, TEST_WORKSPACE, WORKSPACE_ID); + + this.jdbi.installPlugin(new SqlObjectPlugin()); } private static void mockTargetWorkspace(String apiKey, String workspaceName, String workspaceId) { @@ -1077,8 +1084,32 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt .isEqualTo(expectedProject2.lastUpdatedTraceAt()); assertThat(actualEntity.content().get(2).lastUpdatedTraceAt()) .isEqualTo(expectedProject.lastUpdatedTraceAt()); + + List dbProjects = jdbi.withExtension(ProjectDAO.class, projectDao -> projectDao.findByIds( + Set.of(expectedProject.id(), expectedProject2.id(), expectedProject3.id()), workspaceId)); + + assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject.id())) + .findFirst().orElseThrow().lastUpdatedTraceAt()) + .usingComparator(this::compareInstants) + .isEqualTo(expectedProject.lastUpdatedTraceAt()); + assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject2.id())) + .findFirst().orElseThrow().lastUpdatedTraceAt()) + .usingComparator(this::compareInstants) + .isEqualTo(expectedProject2.lastUpdatedTraceAt()); + assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject3.id())) + .findFirst().orElseThrow().lastUpdatedTraceAt()) + .usingComparator(this::compareInstants) + .isEqualTo(expectedProject3.lastUpdatedTraceAt()); } + private int compareInstants(Instant i1, Instant i2) { + // Calculate the difference in nanoseconds + long nanoDifference = Math.abs(i1.getNano() - i2.getNano()); + if (nanoDifference < 1_000) { + return 0; // Consider equal if within a microsecond + } + return i1.compareTo(i2); + } } @Nested From 9d148faa92cb7e2efdf1789cb7a91c07fb009c4f Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 4 Dec 2024 17:30:58 +0200 Subject: [PATCH 05/17] OPIK-419 add a couple of todos to not forget --- .../opik/api/resources/v1/priv/ProjectsResourceTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 845eaa1374..f7df0fb326 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1102,6 +1102,11 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt .isEqualTo(expectedProject3.lastUpdatedTraceAt()); } + // todo: similarly cover trace batch + + // todo: similarly cover trace update + + // timestamps in mysql are microsecond while in clickhouse they are stored as nano private int compareInstants(Instant i1, Instant i2) { // Calculate the difference in nanoseconds long nanoDifference = Math.abs(i1.getNano() - i2.getNano()); From aec888289b75e8416efc643ef2e27049ba95e334 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 10 Dec 2024 16:12:21 +0200 Subject: [PATCH 06/17] OPIK-419 refactor --- .../java/com/comet/opik/TestComparators.java | 14 ++++++++++++++ .../resources/v1/priv/ProjectsResourceTest.java | 17 ++++------------- 2 files changed, 18 insertions(+), 13 deletions(-) create mode 100644 apps/opik-backend/src/test/java/com/comet/opik/TestComparators.java diff --git a/apps/opik-backend/src/test/java/com/comet/opik/TestComparators.java b/apps/opik-backend/src/test/java/com/comet/opik/TestComparators.java new file mode 100644 index 0000000000..9ba51b38de --- /dev/null +++ b/apps/opik-backend/src/test/java/com/comet/opik/TestComparators.java @@ -0,0 +1,14 @@ +package com.comet.opik; + +import java.time.Instant; + +public class TestComparators { + public static int compareMicroNanoTime(Instant i1, Instant i2) { + // Calculate the difference in nanoseconds + long nanoDifference = Math.abs(i1.getNano() - i2.getNano()); + if (nanoDifference < 1_000) { + return 0; // Consider equal if within a microsecond + } + return i1.compareTo(i2); + } +} diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index f7df0fb326..b285bb7a34 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1,5 +1,6 @@ package com.comet.opik.api.resources.v1.priv; +import com.comet.opik.TestComparators; import com.comet.opik.api.BatchDelete; import com.comet.opik.api.Project; import com.comet.opik.api.ProjectRetrieve; @@ -1090,31 +1091,21 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject.id())) .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(this::compareInstants) + .usingComparator(TestComparators::compareMicroNanoTime) .isEqualTo(expectedProject.lastUpdatedTraceAt()); assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject2.id())) .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(this::compareInstants) + .usingComparator(TestComparators::compareMicroNanoTime) .isEqualTo(expectedProject2.lastUpdatedTraceAt()); assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject3.id())) .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(this::compareInstants) + .usingComparator(TestComparators::compareMicroNanoTime) .isEqualTo(expectedProject3.lastUpdatedTraceAt()); } // todo: similarly cover trace batch // todo: similarly cover trace update - - // timestamps in mysql are microsecond while in clickhouse they are stored as nano - private int compareInstants(Instant i1, Instant i2) { - // Calculate the difference in nanoseconds - long nanoDifference = Math.abs(i1.getNano() - i2.getNano()); - if (nanoDifference < 1_000) { - return 0; // Consider equal if within a microsecond - } - return i1.compareTo(i2); - } } @Nested From 10ddfe6680e1a166e7c040bede33626384547539 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 10 Dec 2024 16:30:29 +0200 Subject: [PATCH 07/17] OPIK-419 refactor --- .../v1/priv/ProjectsResourceTest.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index b285bb7a34..fc02748486 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -59,9 +59,9 @@ import java.time.Instant; import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -1086,21 +1086,20 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt assertThat(actualEntity.content().get(2).lastUpdatedTraceAt()) .isEqualTo(expectedProject.lastUpdatedTraceAt()); + assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2, + expectedProject3)); + } + + private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List expectedProjects) { List dbProjects = jdbi.withExtension(ProjectDAO.class, projectDao -> projectDao.findByIds( - Set.of(expectedProject.id(), expectedProject2.id(), expectedProject3.id()), workspaceId)); + expectedProjects.stream().map(Project::id).collect(Collectors.toUnmodifiableSet()), workspaceId)); - assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject.id())) - .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(TestComparators::compareMicroNanoTime) - .isEqualTo(expectedProject.lastUpdatedTraceAt()); - assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject2.id())) - .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(TestComparators::compareMicroNanoTime) - .isEqualTo(expectedProject2.lastUpdatedTraceAt()); - assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(expectedProject3.id())) - .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(TestComparators::compareMicroNanoTime) - .isEqualTo(expectedProject3.lastUpdatedTraceAt()); + for (Project project : expectedProjects) { + assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(project.id())) + .findFirst().orElseThrow().lastUpdatedTraceAt()) + .usingComparator(TestComparators::compareMicroNanoTime) + .isEqualTo(project.lastUpdatedTraceAt()); + } } // todo: similarly cover trace batch From 21784e50a76b24251591309f25237ac868a79a92 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 10 Dec 2024 16:50:32 +0200 Subject: [PATCH 08/17] OPIK-419 cover batch --- .../v1/priv/ProjectsResourceTest.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index fc02748486..cae9c38eae 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -16,6 +16,7 @@ import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils; import com.comet.opik.api.resources.utils.TestUtils; import com.comet.opik.api.resources.utils.WireMockUtils; +import com.comet.opik.api.resources.utils.resources.TraceResourceClient; import com.comet.opik.api.sorting.Direction; import com.comet.opik.api.sorting.SortableFields; import com.comet.opik.api.sorting.SortingFactory; @@ -121,6 +122,7 @@ class ProjectsResourceTest { private String baseURI; private ClientSupport client; private Jdbi jdbi; + private TraceResourceClient traceResourceClient; @BeforeAll void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { @@ -141,6 +143,7 @@ void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { mockTargetWorkspace(API_KEY, TEST_WORKSPACE, WORKSPACE_ID); this.jdbi.installPlugin(new SqlObjectPlugin()); + this.traceResourceClient = new TraceResourceClient(this.client, baseURI); } private static void mockTargetWorkspace(String apiKey, String workspaceName, String workspaceId) { @@ -1090,6 +1093,76 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt expectedProject3)); } + @Test + @DisplayName("when projects is with traces created in batch, then return project with last updated trace at") + void getProjects__whenProjectsHasTracesBatch__thenReturnProjectWithLastUpdatedTraceAt() { + String workspaceName = UUID.randomUUID().toString(); + String apiKey = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + var project = factory.manufacturePojo(Project.class); + var project2 = factory.manufacturePojo(Project.class); + var project3 = factory.manufacturePojo(Project.class); + + var id = createProject(project, apiKey, workspaceName); + var id2 = createProject(project2, apiKey, workspaceName); + var id3 = createProject(project3, apiKey, workspaceName); + + List traces = IntStream.range(0, 5) + .mapToObj(i -> factory.manufacturePojo(Trace.class).toBuilder() + .projectName(project.name()) + .build()) + .toList(); + List traces2 = IntStream.range(0, 5) + .mapToObj(i -> factory.manufacturePojo(Trace.class).toBuilder() + .projectName(project2.name()) + .build()) + .toList(); + List traces3 = IntStream.range(0, 5) + .mapToObj(i -> factory.manufacturePojo(Trace.class).toBuilder() + .projectName(project3.name()) + .build()) + .toList(); + + traceResourceClient.batchCreateTraces( + Stream.concat(Stream.concat(traces.stream(), traces2.stream()), traces3.stream()).toList(), + apiKey, workspaceName); + + // all projects should have the same "last_updated_trace_at" + Trace actualTrace = traceResourceClient.getById(traces.getFirst().id(), workspaceName, apiKey); + + Project expectedProject = project.toBuilder().id(id) + .lastUpdatedTraceAt(actualTrace.lastUpdatedAt()).build(); + Project expectedProject2 = project2.toBuilder().id(id2) + .lastUpdatedTraceAt(actualTrace.lastUpdatedAt()).build(); + Project expectedProject3 = project3.toBuilder().id(id3) + .lastUpdatedTraceAt(actualTrace.lastUpdatedAt()).build(); + + var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI)) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .get(); + + var actualEntity = actualResponse.readEntity(Project.ProjectPage.class); + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(HttpStatus.SC_OK); + + assertThat(actualEntity.content().stream().map(Project::id).toList()) + .isEqualTo(List.of(id3, id2, id)); + + assertThat(actualEntity.content().get(0).lastUpdatedTraceAt()) + .isEqualTo(expectedProject3.lastUpdatedTraceAt()); + assertThat(actualEntity.content().get(1).lastUpdatedTraceAt()) + .isEqualTo(expectedProject2.lastUpdatedTraceAt()); + assertThat(actualEntity.content().get(2).lastUpdatedTraceAt()) + .isEqualTo(expectedProject.lastUpdatedTraceAt()); + + assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2, + expectedProject3)); + } + private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List expectedProjects) { List dbProjects = jdbi.withExtension(ProjectDAO.class, projectDao -> projectDao.findByIds( expectedProjects.stream().map(Project::id).collect(Collectors.toUnmodifiableSet()), workspaceId)); From 18248d003aa04118f761e607d4618df504acc6da Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 10 Dec 2024 17:14:46 +0200 Subject: [PATCH 09/17] OPIK-419 update trace failing test --- .../utils/resources/TraceResourceClient.java | 14 +++++++++ .../v1/priv/ProjectsResourceTest.java | 31 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java index fc78668ebe..5e05510943 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java @@ -6,7 +6,9 @@ import com.comet.opik.api.FeedbackScoreBatchItem; import com.comet.opik.api.Trace; import com.comet.opik.api.TraceBatch; +import com.comet.opik.api.TraceUpdate; import com.comet.opik.api.resources.utils.TestUtils; +import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; @@ -126,4 +128,16 @@ public void deleteTraces(BatchDelete request, String workspaceName, String apiKe } } + public void updateTrace(UUID id, TraceUpdate traceUpdate, String apiKey, String workspaceName) { + try (var actualResponse = client.target(RESOURCE_PATH.formatted(baseURI)) + .path(id.toString()) + .request() + .header(HttpHeaders.AUTHORIZATION, apiKey) + .header(WORKSPACE_HEADER, workspaceName) + .method(HttpMethod.PATCH, Entity.json(traceUpdate))) { + + assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(204); + assertThat(actualResponse.hasEntity()).isFalse(); + } + } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index cae9c38eae..8f2a29f39c 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -6,6 +6,7 @@ import com.comet.opik.api.ProjectRetrieve; import com.comet.opik.api.ProjectUpdate; import com.comet.opik.api.Trace; +import com.comet.opik.api.TraceUpdate; import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.api.resources.utils.AuthTestUtils; import com.comet.opik.api.resources.utils.ClickHouseContainerUtils; @@ -60,6 +61,7 @@ import java.time.Instant; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1163,6 +1165,35 @@ void getProjects__whenProjectsHasTracesBatch__thenReturnProjectWithLastUpdatedTr expectedProject3)); } + @Test + @DisplayName("when projects with traces, then return project with last updated trace at") + void getProjects__whenTraceIsUpdated__thenUpdateProjectsLastUpdatedTraceAt() { + String workspaceName = UUID.randomUUID().toString(); + String apiKey = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + mockTargetWorkspace(apiKey, workspaceName, workspaceId); + + var project = factory.manufacturePojo(Project.class); + + var projectId = createProject(project, apiKey, workspaceName); + + UUID traceId = traceResourceClient.createTrace(factory.manufacturePojo(Trace.class).toBuilder() + .projectName(project.name()).build(), apiKey, workspaceName); + + traceResourceClient.updateTrace(traceId, TraceUpdate.builder() + .tags(Set.of("tag1", "tag2")) + .projectName(project.name()) + .build(), apiKey, workspaceName); + + Trace trace = getTrace(traceId, apiKey, workspaceName); + + Project expectedProject = project.toBuilder().id(projectId).lastUpdatedTraceAt(trace.lastUpdatedAt()) + .build(); + + assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject)); + } + private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List expectedProjects) { List dbProjects = jdbi.withExtension(ProjectDAO.class, projectDao -> projectDao.findByIds( expectedProjects.stream().map(Project::id).collect(Collectors.toUnmodifiableSet()), workspaceId)); From 265f1549573c8225479cc3fedd101a542bb4ea8f Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 10 Dec 2024 17:25:47 +0200 Subject: [PATCH 10/17] OPIK-419 update trace failing test green --- .../comet/opik/api/events/TracesUpdated.java | 20 +++++++++++++++++ .../v1/events/ProjectEventListener.java | 22 ++++++++++++++----- .../com/comet/opik/domain/TraceService.java | 12 ++++++---- 3 files changed, 45 insertions(+), 9 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesUpdated.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesUpdated.java b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesUpdated.java new file mode 100644 index 0000000000..b4f1aa7753 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesUpdated.java @@ -0,0 +1,20 @@ +package com.comet.opik.api.events; + +import com.comet.opik.infrastructure.events.BaseEvent; +import lombok.Getter; +import lombok.NonNull; +import lombok.experimental.Accessors; + +import java.util.Set; +import java.util.UUID; + +@Getter +@Accessors(fluent = true) +public class TracesUpdated extends BaseEvent { + private final @NonNull Set projectIds; + + public TracesUpdated(@NonNull Set projectIds, @NonNull String workspaceId, @NonNull String userName) { + super(workspaceId, userName); + this.projectIds = projectIds; + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java index 38ae57ccdd..52900cb011 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java @@ -2,6 +2,7 @@ import com.comet.opik.api.ProjectIdLastUpdated; import com.comet.opik.api.events.TracesCreated; +import com.comet.opik.api.events.TracesUpdated; import com.comet.opik.domain.ProjectService; import com.comet.opik.domain.TraceService; import com.google.common.eventbus.EventBus; @@ -11,6 +12,9 @@ import reactor.core.publisher.Mono; import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton; +import java.util.Set; +import java.util.UUID; + @EagerSingleton @Slf4j public class ProjectEventListener { @@ -26,16 +30,24 @@ public ProjectEventListener(EventBus eventBus, ProjectService projectService, Tr @Subscribe public void onTracesCreated(TracesCreated event) { - log.info("Recording last traces for projects '{}'", event.projectIds()); + updateProjectsLastUpdatedTraceAt(event.workspaceId(), event.projectIds()); + } - traceService.getLastUpdatedTraceAt(event.projectIds(), event.workspaceId()) + @Subscribe + public void onTracesUpdated(TracesUpdated event) { + updateProjectsLastUpdatedTraceAt(event.workspaceId(), event.projectIds()); + } + + private void updateProjectsLastUpdatedTraceAt(String workspaceId, Set projectIds) { + log.info("Recording last traces for projects '{}'", projectIds); + + traceService.getLastUpdatedTraceAt(projectIds, workspaceId) .flatMap(lastTraceByProjectId -> Mono.fromRunnable(() -> projectService.recordLastUpdatedTrace( - event.workspaceId(), + workspaceId, lastTraceByProjectId.entrySet().stream() .map(entry -> new ProjectIdLastUpdated(entry.getKey(), entry.getValue())).toList()))) .block(); - log.info("Recorded last traces for projects '{}'", event.projectIds()); + log.info("Recorded last traces for projects '{}'", projectIds); } - } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java index bfee37faed..144f5e3644 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java @@ -13,6 +13,7 @@ import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.api.error.IdentifierMismatchException; import com.comet.opik.api.events.TracesCreated; +import com.comet.opik.api.events.TracesUpdated; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.infrastructure.db.TransactionTemplateAsync; import com.comet.opik.infrastructure.lock.LockService; @@ -238,7 +239,7 @@ public Mono update(@NonNull TraceUpdate traceUpdate, @NonNull UUID id) { var projectName = WorkspaceUtils.getProjectName(traceUpdate.projectName()); - return getProjectById(traceUpdate) + return Mono.deferContextual(ctx -> getProjectById(traceUpdate) .switchIfEmpty(Mono.defer(() -> getOrCreateProject(projectName))) .subscribeOn(Schedulers.boundedElastic()) .flatMap(project -> lockService.executeWithLock( @@ -247,8 +248,12 @@ public Mono update(@NonNull TraceUpdate traceUpdate, @NonNull UUID id) { .flatMap(trace -> updateOrFail(traceUpdate, id, trace, project).thenReturn(id)) .switchIfEmpty(Mono.defer(() -> insertUpdate(project, traceUpdate, id)) .thenReturn(id)) - .onErrorResume(this::handleDBError)))) - .then(); + .onErrorResume(this::handleDBError) + .doOnSuccess(__ -> eventBus.post(new TracesUpdated( + Set.of(project.id()), + ctx.get(RequestContext.WORKSPACE_ID), + ctx.get(RequestContext.USER_NAME))))))) + .then()); } private Mono insertUpdate(Project project, TraceUpdate traceUpdate, UUID id) { @@ -388,7 +393,6 @@ public Mono getDailyCreatedCount() { return dao.getDailyTraces(); } - @Override public Mono> getLastUpdatedTraceAt(Set projectIds, String workspaceId) { return template From 6e174adbc1c1c24b9446bbbcacc34f586d668412 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 10 Dec 2024 17:30:20 +0200 Subject: [PATCH 11/17] OPIK-419 remove comment --- .../opik/api/resources/v1/priv/ProjectsResourceTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 8f2a29f39c..d078ed8f8a 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -1205,10 +1205,6 @@ private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List< .isEqualTo(project.lastUpdatedTraceAt()); } } - - // todo: similarly cover trace batch - - // todo: similarly cover trace update } @Nested From aa32f995fa7542b327a0b60e7e82e1fae5f31036 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Tue, 10 Dec 2024 20:29:37 +0200 Subject: [PATCH 12/17] OPIK-419 fix broken test --- .../test/java/com/comet/opik/domain/TraceServiceImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java index 77b53c4086..e1b1738b5e 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java @@ -96,7 +96,7 @@ void create__whenConcurrentTraceCreationsWithSameProjectNameConflict__thenHandle .thenThrow(new EntityAlreadyExistsException(new ErrorMessage(List.of("Project already exists")))); when(projectService.findByNames(workspaceId, List.of(projectName))) - .thenReturn(List.of(Project.builder().name(projectName).build())); // simulate project was already created + .thenReturn(List.of(Project.builder().id(UUID.randomUUID()).name(projectName).build())); // simulate project was already created when(template.nonTransaction(any())) .thenAnswer(invocation -> { From a2d06d850a0ef4b189e174363a231e28bdb98df6 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 11 Dec 2024 10:50:29 +0200 Subject: [PATCH 13/17] OPIK-419 projectdao not public --- .../java/com/comet/opik/domain/ProjectDAO.java | 2 +- .../com/comet/opik/domain/ProjectService.java | 12 ++++++++++++ .../resources/v1/priv/ProjectsResourceTest.java | 15 +++++++-------- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java index ef544a85c0..4832d79d7e 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java @@ -24,7 +24,7 @@ @RegisterConstructorMapper(Project.class) @RegisterConstructorMapper(ProjectIdLastUpdated.class) @RegisterArgumentFactory(UUIDArgumentFactory.class) -public interface ProjectDAO { +interface ProjectDAO { @SqlUpdate("INSERT INTO projects (id, name, description, workspace_id, created_by, last_updated_by) VALUES (:bean.id, :bean.name, :bean.description, :workspaceId, :bean.createdBy, :bean.lastUpdatedBy)") void save(@Bind("workspaceId") String workspaceId, @BindMethods("bean") Project project); diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java index 1fb4931108..6e25544810 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java @@ -69,6 +69,8 @@ public interface ProjectService { Page find(int page, int size, ProjectCriteria criteria, List sortingFields); + List findByIds(String workspaceId, Set ids); + List findByNames(String workspaceId, List names); Project getOrCreate(String workspaceId, String projectName, String userName); @@ -280,6 +282,16 @@ public Page find(int page, int size, @NonNull ProjectCriteria criteria, sortingFactory.getSortableFields()); } + @Override + public List findByIds(String workspaceId, Set ids) { + if (ids.isEmpty()) { + log.info("ids list is empty, returning"); + return List.of(); + } + + return template.inTransaction(READ_ONLY, handle -> handle.attach(ProjectDAO.class).findByIds(ids, workspaceId)); + } + private Page findWithLastTraceSorting(int page, int size, @NonNull ProjectCriteria criteria, @NonNull SortingField sortingField) { String workspaceId = requestContext.get().getWorkspaceId(); diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index d078ed8f8a..0c33674a24 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -22,7 +22,7 @@ import com.comet.opik.api.sorting.SortableFields; import com.comet.opik.api.sorting.SortingFactory; import com.comet.opik.api.sorting.SortingField; -import com.comet.opik.domain.ProjectDAO; +import com.comet.opik.domain.ProjectService; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.podam.PodamFactoryUtils; import com.comet.opik.utils.JsonUtils; @@ -34,7 +34,6 @@ import jakarta.ws.rs.core.MediaType; import org.apache.hc.core5.http.HttpStatus; import org.jdbi.v3.core.Jdbi; -import org.jdbi.v3.sqlobject.SqlObjectPlugin; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -83,6 +82,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; +//@TestGuiceyApp(OpikApplication.class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) @DisplayName("Project Resource Test") class ProjectsResourceTest { @@ -123,11 +123,11 @@ class ProjectsResourceTest { private String baseURI; private ClientSupport client; - private Jdbi jdbi; + private ProjectService projectService; private TraceResourceClient traceResourceClient; @BeforeAll - void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { + void setUpAll(ClientSupport client, Jdbi jdbi, ProjectService projectService) throws SQLException { MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters()); @@ -138,13 +138,12 @@ void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException { this.baseURI = "http://localhost:%d".formatted(client.getPort()); this.client = client; - this.jdbi = jdbi; + this.projectService = projectService; ClientSupportUtils.config(client); mockTargetWorkspace(API_KEY, TEST_WORKSPACE, WORKSPACE_ID); - this.jdbi.installPlugin(new SqlObjectPlugin()); this.traceResourceClient = new TraceResourceClient(this.client, baseURI); } @@ -1195,8 +1194,8 @@ void getProjects__whenTraceIsUpdated__thenUpdateProjectsLastUpdatedTraceAt() { } private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List expectedProjects) { - List dbProjects = jdbi.withExtension(ProjectDAO.class, projectDao -> projectDao.findByIds( - expectedProjects.stream().map(Project::id).collect(Collectors.toUnmodifiableSet()), workspaceId)); + List dbProjects = projectService.findByIds(workspaceId, expectedProjects.stream() + .map(Project::id).collect(Collectors.toUnmodifiableSet())); for (Project project : expectedProjects) { assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(project.id())) From 335b41fbc8bc7c665eec0afeab6efbffc8406ee6 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 11 Dec 2024 11:49:36 +0200 Subject: [PATCH 14/17] OPIK-419 pr comments --- .../comet/opik/api/resources/v1/priv/ProjectsResourceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 0c33674a24..2f057dc3b4 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -82,7 +82,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; -//@TestGuiceyApp(OpikApplication.class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) @DisplayName("Project Resource Test") class ProjectsResourceTest { From 925724b547a613716239b035974bd945ff655224 Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 11 Dec 2024 12:01:26 +0200 Subject: [PATCH 15/17] OPIK-419 cover new functionality in service test --- .../com/comet/opik/domain/TraceServiceImplTest.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java index e1b1738b5e..42f041cd11 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java @@ -6,6 +6,7 @@ import com.comet.opik.api.error.EntityAlreadyExistsException; import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.api.error.InvalidUUIDVersionException; +import com.comet.opik.api.events.TracesCreated; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.infrastructure.db.TransactionTemplateAsync; import com.comet.opik.infrastructure.lock.LockService; @@ -18,7 +19,9 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import reactor.core.publisher.Mono; import uk.co.jemos.podam.api.PodamFactory; @@ -26,6 +29,7 @@ import java.time.Instant; import java.util.List; +import java.util.Set; import java.util.UUID; import static com.comet.opik.domain.ProjectService.DEFAULT_USER; @@ -87,16 +91,20 @@ void create__whenConcurrentTraceCreationsWithSameProjectNameConflict__thenHandle // given var projectName = "projectName"; + var projectId = UUID.randomUUID(); var traceId = Generators.timeBasedEpochGenerator().generate(); var connection = mock(Connection.class); String workspaceId = UUID.randomUUID().toString(); + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(TracesCreated.class); // when when(projectService.getOrCreate(workspaceId, projectName, DEFAULT_USER)) .thenThrow(new EntityAlreadyExistsException(new ErrorMessage(List.of("Project already exists")))); when(projectService.findByNames(workspaceId, List.of(projectName))) - .thenReturn(List.of(Project.builder().id(UUID.randomUUID()).name(projectName).build())); // simulate project was already created + .thenReturn(List.of(Project.builder().id(projectId).name(projectName).build())); // simulate project was already created + + Mockito.doNothing().when(eventBus).post(eventCaptor.capture()); when(template.nonTransaction(any())) .thenAnswer(invocation -> { @@ -121,7 +129,8 @@ void create__whenConcurrentTraceCreationsWithSameProjectNameConflict__thenHandle .block(); // then - Assertions.assertEquals(traceId, actualResult); + assertThat(actualResult).isEqualTo(traceId); + assertThat(eventCaptor.getValue().projectIds()).isEqualTo(Set.of(projectId)); } @Test From 0c9b6e4c0ade96b999b660d8e9cabd9b6307d64a Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 11 Dec 2024 12:08:52 +0200 Subject: [PATCH 16/17] OPIK-419 handle async update to last_trace_updated_at --- .../api/resources/v1/priv/ProjectsResourceTest.java | 13 ++++++++----- .../com/comet/opik/domain/TraceServiceImplTest.java | 4 ++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java index 2f057dc3b4..6b85fc99ab 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java @@ -50,6 +50,7 @@ import org.testcontainers.clickhouse.ClickHouseContainer; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; import ru.vyarus.dropwizard.guice.test.ClientSupport; import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension; import uk.co.jemos.podam.api.PodamFactory; @@ -1164,7 +1165,7 @@ void getProjects__whenProjectsHasTracesBatch__thenReturnProjectWithLastUpdatedTr } @Test - @DisplayName("when projects with traces, then return project with last updated trace at") + @DisplayName("when updating a trace, then return project with last updated trace at") void getProjects__whenTraceIsUpdated__thenUpdateProjectsLastUpdatedTraceAt() { String workspaceName = UUID.randomUUID().toString(); String apiKey = UUID.randomUUID().toString(); @@ -1197,10 +1198,12 @@ private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List< .map(Project::id).collect(Collectors.toUnmodifiableSet())); for (Project project : expectedProjects) { - assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(project.id())) - .findFirst().orElseThrow().lastUpdatedTraceAt()) - .usingComparator(TestComparators::compareMicroNanoTime) - .isEqualTo(project.lastUpdatedTraceAt()); + Awaitility.await().untilAsserted(() -> { + assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(project.id())) + .findFirst().orElseThrow().lastUpdatedTraceAt()) + .usingComparator(TestComparators::compareMicroNanoTime) + .isEqualTo(project.lastUpdatedTraceAt()); + }); } } } diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java index 42f041cd11..dcc7d63b75 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java @@ -21,7 +21,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import reactor.core.publisher.Mono; import uk.co.jemos.podam.api.PodamFactory; @@ -38,6 +37,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -104,7 +104,7 @@ void create__whenConcurrentTraceCreationsWithSameProjectNameConflict__thenHandle when(projectService.findByNames(workspaceId, List.of(projectName))) .thenReturn(List.of(Project.builder().id(projectId).name(projectName).build())); // simulate project was already created - Mockito.doNothing().when(eventBus).post(eventCaptor.capture()); + doNothing().when(eventBus).post(eventCaptor.capture()); when(template.nonTransaction(any())) .thenAnswer(invocation -> { From 53675a5e5f2073b1d479c2e2c84e67491ed95cef Mon Sep 17 00:00:00 2001 From: Ido Berkovich Date: Wed, 11 Dec 2024 14:44:06 +0200 Subject: [PATCH 17/17] OPIK-419 default null --- .../migrations/000006_add_projects_last_updated_trace_at.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql index ea9f7dce8e..b3fdebcf45 100644 --- a/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql +++ b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql @@ -1,6 +1,6 @@ --liquibase formatted sql --changeset idoberko2:add_projects_last_updated_trace_at -ALTER TABLE projects ADD COLUMN last_updated_trace_at TIMESTAMP(6); +ALTER TABLE projects ADD COLUMN last_updated_trace_at TIMESTAMP(6) DEFAULT NULL; --rollback ALTER TABLE projects DROP COLUMN last_updated_trace_at;