diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesCreated.java b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesCreated.java
new file mode 100644
index 0000000000..b47529ae6d
--- /dev/null
+++ b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesCreated.java
@@ -0,0 +1,20 @@
+package com.comet.opik.api.events;
+
+import com.comet.opik.infrastructure.events.BaseEvent;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.experimental.Accessors;
+
+import java.util.Set;
+import java.util.UUID;
+
+@Getter
+@Accessors(fluent = true)
+public class TracesCreated extends BaseEvent {
+    private final @NonNull Set<UUID> projectIds;
+
+    public TracesCreated(@NonNull Set<UUID> projectIds, @NonNull String workspaceId, @NonNull String userName) {
+        super(workspaceId, userName);
+        this.projectIds = projectIds;
+    }
+}
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesUpdated.java b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesUpdated.java
new file mode 100644
index 0000000000..b4f1aa7753
--- /dev/null
+++ b/apps/opik-backend/src/main/java/com/comet/opik/api/events/TracesUpdated.java
@@ -0,0 +1,20 @@
+package com.comet.opik.api.events;
+
+import com.comet.opik.infrastructure.events.BaseEvent;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.experimental.Accessors;
+
+import java.util.Set;
+import java.util.UUID;
+
+@Getter
+@Accessors(fluent = true)
+public class TracesUpdated extends BaseEvent {
+    private final @NonNull Set<UUID> projectIds;
+
+    public TracesUpdated(@NonNull Set<UUID> projectIds, @NonNull String workspaceId, @NonNull String userName) {
+        super(workspaceId, userName);
+        this.projectIds = projectIds;
+    }
+}
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java
new file mode 100644
index 0000000000..52900cb011
--- /dev/null
+++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/ProjectEventListener.java
@@ -0,0 +1,53 @@
+package com.comet.opik.api.resources.v1.events;
+
+import com.comet.opik.api.ProjectIdLastUpdated;
+import com.comet.opik.api.events.TracesCreated;
+import com.comet.opik.api.events.TracesUpdated;
+import com.comet.opik.domain.ProjectService;
+import com.comet.opik.domain.TraceService;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import jakarta.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Mono;
+import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton;
+
+import java.util.Set;
+import java.util.UUID;
+
+@EagerSingleton
+@Slf4j
+public class ProjectEventListener {
+    private final ProjectService projectService;
+    private final TraceService traceService;
+
+    @Inject
+    public ProjectEventListener(EventBus eventBus, ProjectService projectService, TraceService traceService) {
+        this.projectService = projectService;
+        this.traceService = traceService;
+        eventBus.register(this);
+    }
+
+    @Subscribe
+    public void onTracesCreated(TracesCreated event) {
+        updateProjectsLastUpdatedTraceAt(event.workspaceId(), event.projectIds());
+    }
+
+    @Subscribe
+    public void onTracesUpdated(TracesUpdated event) {
+        updateProjectsLastUpdatedTraceAt(event.workspaceId(), event.projectIds());
+    }
+
+    private void updateProjectsLastUpdatedTraceAt(String workspaceId, Set<UUID> projectIds) {
+        log.info("Recording last traces for projects '{}'", projectIds);
+
+        traceService.getLastUpdatedTraceAt(projectIds, workspaceId)
+                .flatMap(lastTraceByProjectId -> Mono.fromRunnable(() -> projectService.recordLastUpdatedTrace(
+                        workspaceId,
+                        lastTraceByProjectId.entrySet().stream()
+                                .map(entry -> new ProjectIdLastUpdated(entry.getKey(), entry.getValue())).toList())))
+                .block();
+
+        log.info("Recorded last traces for projects '{}'", projectIds);
+    }
+}
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java
index b57105b7b4..4832d79d7e 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectDAO.java
@@ -10,6 +10,7 @@
 import org.jdbi.v3.sqlobject.customizer.BindList;
 import org.jdbi.v3.sqlobject.customizer.BindMethods;
 import org.jdbi.v3.sqlobject.customizer.Define;
+import org.jdbi.v3.sqlobject.statement.SqlBatch;
 import org.jdbi.v3.sqlobject.statement.SqlQuery;
 import org.jdbi.v3.sqlobject.statement.SqlUpdate;
 import org.jdbi.v3.stringtemplate4.UseStringTemplateEngine;
@@ -85,4 +86,11 @@ default Optional<Project> fetch(UUID id, String workspaceId) {
 
     @SqlQuery("SELECT * FROM projects WHERE workspace_id = :workspaceId AND name IN (<names>)")
     List<Project> findByNames(@Bind("workspaceId") String workspaceId, @BindList("names") Collection<String> names);
+
+    @SqlBatch("UPDATE projects SET last_updated_trace_at = :lastUpdatedAt " +
+            "WHERE workspace_id = :workspace_id" +
+            " AND id = :id" +
+            " AND (last_updated_trace_at IS NULL OR last_updated_trace_at < :lastUpdatedAt)")
+    int[] recordLastUpdatedTrace(@Bind("workspace_id") String workspaceId,
+            @BindMethods Collection<ProjectIdLastUpdated> lastUpdatedTraces);
 }
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java
index f16c31c219..6e25544810 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ProjectService.java
@@ -30,6 +30,7 @@
 
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.time.Instant;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -68,11 +69,15 @@ public interface ProjectService {
 
     Page<Project> find(int page, int size, ProjectCriteria criteria, List<SortingField> sortingFields);
 
+    List<Project> findByIds(String workspaceId, Set<UUID> ids);
+
     List<Project> findByNames(String workspaceId, List<String> names);
 
     Project getOrCreate(String workspaceId, String projectName, String userName);
 
     Project retrieveByName(String projectName);
+
+    void recordLastUpdatedTrace(String workspaceId, Collection<ProjectIdLastUpdated> lastUpdatedTraces);
 }
 
 @Slf4j
@@ -277,6 +282,16 @@ public Page<Project> find(int page, int size, @NonNull ProjectCriteria criteria,
                 sortingFactory.getSortableFields());
     }
 
+    @Override
+    public List<Project> findByIds(String workspaceId, Set<UUID> ids) {
+        if (ids.isEmpty()) {
+            log.info("ids list is empty, returning");
+            return List.of();
+        }
+
+        return template.inTransaction(READ_ONLY, handle -> handle.attach(ProjectDAO.class).findByIds(ids, workspaceId));
+    }
+
     private Page<Project> findWithLastTraceSorting(int page, int size, @NonNull ProjectCriteria criteria,
             @NonNull SortingField sortingField) {
         String workspaceId = requestContext.get().getWorkspaceId();
@@ -392,4 +407,10 @@ public Project retrieveByName(@NonNull String projectName) {
         });
     }
 
+    @Override
+    public void recordLastUpdatedTrace(String workspaceId, Collection<ProjectIdLastUpdated> lastUpdatedTraces) {
+        template.inTransaction(WRITE,
+                handle -> handle.attach(ProjectDAO.class).recordLastUpdatedTrace(workspaceId, lastUpdatedTraces));
+    }
+
 }
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java
index 3a4b74d6b5..144f5e3644 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/TraceService.java
@@ -12,12 +12,15 @@
 import com.comet.opik.api.error.EntityAlreadyExistsException;
 import com.comet.opik.api.error.ErrorMessage;
 import com.comet.opik.api.error.IdentifierMismatchException;
+import com.comet.opik.api.events.TracesCreated;
+import com.comet.opik.api.events.TracesUpdated;
 import com.comet.opik.infrastructure.auth.RequestContext;
 import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
 import com.comet.opik.infrastructure.lock.LockService;
 import com.comet.opik.utils.AsyncUtils;
 import com.comet.opik.utils.WorkspaceUtils;
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
 import com.google.inject.ImplementedBy;
 import io.opentelemetry.instrumentation.annotations.WithSpan;
 import jakarta.inject.Inject;
@@ -68,6 +71,8 @@
     Mono<ProjectStats> getStats(TraceSearchCriteria searchCriteria);
 
     Mono<Long> getDailyCreatedCount();
+
+    Mono<Map<UUID, Instant>> getLastUpdatedTraceAt(Set<UUID> projectIds, String workspaceId);
 }
 
 @Slf4j
@@ -85,6 +90,7 @@ class TraceServiceImpl implements TraceService {
     private final @NonNull ProjectService projectService;
     private final @NonNull IdGenerator idGenerator;
    private final @NonNull LockService lockService;
+    private final @NonNull EventBus eventBus;
 
     @Override
     @WithSpan
@@ -93,12 +99,16 @@ public Mono<UUID> create(@NonNull Trace trace) {
         String projectName = WorkspaceUtils.getProjectName(trace.projectName());
         UUID id = trace.id() == null ? idGenerator.generateId() : trace.id();
 
-        return IdGenerator
+        return Mono.deferContextual(ctx -> IdGenerator
                 .validateVersionAsync(id, TRACE_KEY)
                 .then(Mono.defer(() -> getOrCreateProject(projectName)))
                 .flatMap(project -> lockService.executeWithLock(
                         new LockService.Lock(id, TRACE_KEY),
-                        Mono.defer(() -> insertTrace(trace, project, id))));
+                        Mono.defer(() -> insertTrace(trace, project, id)))
+                        .doOnSuccess(__ -> eventBus.post(new TracesCreated(
+                                Set.of(project.id()),
+                                ctx.get(RequestContext.WORKSPACE_ID),
+                                ctx.get(RequestContext.USER_NAME))))));
     }
 
     @WithSpan
@@ -113,14 +123,20 @@ public Mono<Long> create(TraceBatch batch) {
                 .distinct()
                 .toList();
 
-        Mono<List<Trace>> resolveProjects = Flux.fromIterable(projectNames)
-                .flatMap(this::getOrCreateProject)
-                .collectList()
-                .map(projects -> bindTraceToProjectAndId(batch, projects))
-                .subscribeOn(Schedulers.boundedElastic());
+        return Mono.deferContextual(ctx -> {
+            Mono<List<Trace>> resolveProjects = Flux.fromIterable(projectNames)
+                    .flatMap(this::getOrCreateProject)
+                    .collectList()
+                    .map(projects -> bindTraceToProjectAndId(batch, projects))
+                    .subscribeOn(Schedulers.boundedElastic());
 
-        return resolveProjects
-                .flatMap(traces -> template.nonTransaction(connection -> dao.batchInsert(traces, connection)));
+            return resolveProjects
+                    .flatMap(traces -> template.nonTransaction(connection -> dao.batchInsert(traces, connection))
+                            .doOnSuccess(__ -> eventBus.post(new TracesCreated(
+                                    traces.stream().map(Trace::projectId).collect(Collectors.toUnmodifiableSet()),
+                                    ctx.get(RequestContext.WORKSPACE_ID),
+                                    ctx.get(RequestContext.USER_NAME)))));
+        });
     }
 
     private List<Trace> bindTraceToProjectAndId(TraceBatch batch, List<Project> projects) {
@@ -223,7 +239,7 @@ public Mono<Void> update(@NonNull TraceUpdate traceUpdate, @NonNull UUID id) {
 
         var projectName = WorkspaceUtils.getProjectName(traceUpdate.projectName());
 
-        return getProjectById(traceUpdate)
+        return Mono.deferContextual(ctx -> getProjectById(traceUpdate)
                 .switchIfEmpty(Mono.defer(() -> getOrCreateProject(projectName)))
                 .subscribeOn(Schedulers.boundedElastic())
                 .flatMap(project -> lockService.executeWithLock(
@@ -232,8 +248,12 @@ public Mono<Void> update(@NonNull TraceUpdate traceUpdate, @NonNull UUID id) {
                         .flatMap(trace -> updateOrFail(traceUpdate, id, trace, project).thenReturn(id))
                         .switchIfEmpty(Mono.defer(() -> insertUpdate(project, traceUpdate, id))
                                 .thenReturn(id))
-                        .onErrorResume(this::handleDBError))))
-                .then();
+                        .onErrorResume(this::handleDBError)
+                        .doOnSuccess(__ -> eventBus.post(new TracesUpdated(
+                                Set.of(project.id()),
+                                ctx.get(RequestContext.WORKSPACE_ID),
+                                ctx.get(RequestContext.USER_NAME)))))))
+                .then());
     }
 
     private Mono<Long> insertUpdate(Project project, TraceUpdate traceUpdate, UUID id) {
@@ -373,4 +393,9 @@ public Mono<Long> getDailyCreatedCount() {
         return dao.getDailyTraces();
     }
 
+    @Override
+    public Mono<Map<UUID, Instant>> getLastUpdatedTraceAt(Set<UUID> projectIds, String workspaceId) {
+        return template
+                .nonTransaction(connection -> dao.getLastUpdatedTraceAt(projectIds, workspaceId, connection));
+    }
 }
diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql
new file mode 100644
index 0000000000..b3fdebcf45
--- /dev/null
+++ b/apps/opik-backend/src/main/resources/liquibase/db-app-state/migrations/000006_add_projects_last_updated_trace_at.sql
@@ -0,0 +1,6 @@
+--liquibase formatted sql
+--changeset idoberko2:add_projects_last_updated_trace_at
+
+ALTER TABLE projects ADD COLUMN last_updated_trace_at TIMESTAMP(6) DEFAULT NULL;
+
+--rollback ALTER TABLE projects DROP COLUMN last_updated_trace_at;
diff --git a/apps/opik-backend/src/test/java/com/comet/opik/TestComparators.java b/apps/opik-backend/src/test/java/com/comet/opik/TestComparators.java
new file mode 100644
index 0000000000..9ba51b38de
--- /dev/null
+++ b/apps/opik-backend/src/test/java/com/comet/opik/TestComparators.java
@@ -0,0 +1,14 @@
+package com.comet.opik;
+
+import java.time.Instant;
+
+public class TestComparators {
+    public static int compareMicroNanoTime(Instant i1, Instant i2) {
+        // Calculate the difference in nanoseconds
+        long nanoDifference = Math.abs(i1.getNano() - i2.getNano());
+        if (nanoDifference < 1_000) {
+            return 0; // Consider equal if within a microsecond
+        }
+        return i1.compareTo(i2);
+    }
+}
diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java
index fc78668ebe..5e05510943 100644
--- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java
+++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/resources/TraceResourceClient.java
@@ -6,7 +6,9 @@
 import com.comet.opik.api.FeedbackScoreBatchItem;
 import com.comet.opik.api.Trace;
 import com.comet.opik.api.TraceBatch;
+import com.comet.opik.api.TraceUpdate;
 import com.comet.opik.api.resources.utils.TestUtils;
+import jakarta.ws.rs.HttpMethod;
 import jakarta.ws.rs.client.Entity;
 import jakarta.ws.rs.core.HttpHeaders;
 import jakarta.ws.rs.core.MediaType;
@@ -126,4 +128,16 @@ public void deleteTraces(BatchDelete request, String workspaceName, String apiKe
         }
     }
 
+    public void updateTrace(UUID id, TraceUpdate traceUpdate, String apiKey, String workspaceName) {
+        try (var actualResponse = client.target(RESOURCE_PATH.formatted(baseURI))
+                .path(id.toString())
+                .request()
+                .header(HttpHeaders.AUTHORIZATION, apiKey)
+                .header(WORKSPACE_HEADER, workspaceName)
+                .method(HttpMethod.PATCH, Entity.json(traceUpdate))) {
+
+            assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(204);
+            assertThat(actualResponse.hasEntity()).isFalse();
+        }
+    }
 }
diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java
index 4f7a444a45..6b85fc99ab 100644
--- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java
+++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/ProjectsResourceTest.java
@@ -1,10 +1,12 @@
 package com.comet.opik.api.resources.v1.priv;
 
+import com.comet.opik.TestComparators;
 import com.comet.opik.api.BatchDelete;
 import com.comet.opik.api.Project;
 import com.comet.opik.api.ProjectRetrieve;
 import com.comet.opik.api.ProjectUpdate;
 import com.comet.opik.api.Trace;
+import com.comet.opik.api.TraceUpdate;
 import com.comet.opik.api.error.ErrorMessage;
 import com.comet.opik.api.resources.utils.AuthTestUtils;
 import com.comet.opik.api.resources.utils.ClickHouseContainerUtils;
@@ -15,10 +17,12 @@
 import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils;
 import com.comet.opik.api.resources.utils.TestUtils;
 import com.comet.opik.api.resources.utils.WireMockUtils;
+import com.comet.opik.api.resources.utils.resources.TraceResourceClient;
 import com.comet.opik.api.sorting.Direction;
 import com.comet.opik.api.sorting.SortableFields;
 import com.comet.opik.api.sorting.SortingFactory;
 import com.comet.opik.api.sorting.SortingField;
+import com.comet.opik.domain.ProjectService;
 import com.comet.opik.infrastructure.DatabaseAnalyticsFactory;
 import com.comet.opik.podam.PodamFactoryUtils;
 import com.comet.opik.utils.JsonUtils;
@@ -46,6 +50,7 @@
 import org.testcontainers.clickhouse.ClickHouseContainer;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 import ru.vyarus.dropwizard.guice.test.ClientSupport;
 import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension;
 import uk.co.jemos.podam.api.PodamFactory;
@@ -56,8 +61,10 @@
 import java.time.Instant;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
@@ -116,9 +123,11 @@ class ProjectsResourceTest {
 
     private String baseURI;
     private ClientSupport client;
+    private ProjectService projectService;
+    private TraceResourceClient traceResourceClient;
 
     @BeforeAll
-    void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException {
+    void setUpAll(ClientSupport client, Jdbi jdbi, ProjectService projectService) throws SQLException {
 
         MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters());
 
@@ -129,10 +138,13 @@ void setUpAll(ClientSupport client, Jdbi jdbi) throws SQLException {
 
         this.baseURI = "http://localhost:%d".formatted(client.getPort());
         this.client = client;
+        this.projectService = projectService;
 
         ClientSupportUtils.config(client);
 
         mockTargetWorkspace(API_KEY, TEST_WORKSPACE, WORKSPACE_ID);
+
+        this.traceResourceClient = new TraceResourceClient(this.client, baseURI);
     }
 
     private static void mockTargetWorkspace(String apiKey, String workspaceName, String workspaceId) {
@@ -1077,8 +1089,123 @@ void getProjects__whenProjectsHasTraces__thenReturnProjectWithLastUpdatedTraceAt
                     .isEqualTo(expectedProject2.lastUpdatedTraceAt());
             assertThat(actualEntity.content().get(2).lastUpdatedTraceAt())
                     .isEqualTo(expectedProject.lastUpdatedTraceAt());
+
+            assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2,
+                    expectedProject3));
         }
+
+        @Test
+        @DisplayName("when projects is with traces created in batch, then return project with last updated trace at")
+        void getProjects__whenProjectsHasTracesBatch__thenReturnProjectWithLastUpdatedTraceAt() {
+            String workspaceName = UUID.randomUUID().toString();
+            String apiKey = UUID.randomUUID().toString();
+            String workspaceId = UUID.randomUUID().toString();
+
+            mockTargetWorkspace(apiKey, workspaceName, workspaceId);
+
+            var project = factory.manufacturePojo(Project.class);
+            var project2 = factory.manufacturePojo(Project.class);
+            var project3 = factory.manufacturePojo(Project.class);
+
+            var id = createProject(project, apiKey, workspaceName);
+            var id2 = createProject(project2, apiKey, workspaceName);
+            var id3 = createProject(project3, apiKey, workspaceName);
+
+            List<Trace> traces = IntStream.range(0, 5)
+                    .mapToObj(i -> factory.manufacturePojo(Trace.class).toBuilder()
+                            .projectName(project.name())
+                            .build())
+                    .toList();
+            List<Trace> traces2 = IntStream.range(0, 5)
+                    .mapToObj(i -> factory.manufacturePojo(Trace.class).toBuilder()
+                            .projectName(project2.name())
+                            .build())
+                    .toList();
+            List<Trace> traces3 = IntStream.range(0, 5)
+                    .mapToObj(i -> factory.manufacturePojo(Trace.class).toBuilder()
+                            .projectName(project3.name())
+                            .build())
+                    .toList();
+
+            traceResourceClient.batchCreateTraces(
+                    Stream.concat(Stream.concat(traces.stream(), traces2.stream()), traces3.stream()).toList(),
+                    apiKey, workspaceName);
+
+            // all projects should have the same "last_updated_trace_at"
+            Trace actualTrace = traceResourceClient.getById(traces.getFirst().id(), workspaceName, apiKey);
+
+            Project expectedProject = project.toBuilder().id(id)
+                    .lastUpdatedTraceAt(actualTrace.lastUpdatedAt()).build();
+            Project expectedProject2 = project2.toBuilder().id(id2)
+                    .lastUpdatedTraceAt(actualTrace.lastUpdatedAt()).build();
+            Project expectedProject3 = project3.toBuilder().id(id3)
+                    .lastUpdatedTraceAt(actualTrace.lastUpdatedAt()).build();
+
+            var actualResponse = client.target(URL_TEMPLATE.formatted(baseURI))
+                    .request()
+                    .header(HttpHeaders.AUTHORIZATION, apiKey)
+                    .header(WORKSPACE_HEADER, workspaceName)
+                    .get();
+
+            var actualEntity = actualResponse.readEntity(Project.ProjectPage.class);
+            assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(HttpStatus.SC_OK);
+
+            assertThat(actualEntity.content().stream().map(Project::id).toList())
+                    .isEqualTo(List.of(id3, id2, id));
+
+            assertThat(actualEntity.content().get(0).lastUpdatedTraceAt())
+                    .isEqualTo(expectedProject3.lastUpdatedTraceAt());
+            assertThat(actualEntity.content().get(1).lastUpdatedTraceAt())
+                    .isEqualTo(expectedProject2.lastUpdatedTraceAt());
+            assertThat(actualEntity.content().get(2).lastUpdatedTraceAt())
+                    .isEqualTo(expectedProject.lastUpdatedTraceAt());
+
+            assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject, expectedProject2,
+                    expectedProject3));
+        }
+
+        @Test
+        @DisplayName("when updating a trace, then return project with last updated trace at")
+        void getProjects__whenTraceIsUpdated__thenUpdateProjectsLastUpdatedTraceAt() {
+            String workspaceName = UUID.randomUUID().toString();
+            String apiKey = UUID.randomUUID().toString();
+            String workspaceId = UUID.randomUUID().toString();
+
+            mockTargetWorkspace(apiKey, workspaceName, workspaceId);
+
+            var project = factory.manufacturePojo(Project.class);
+
+            var projectId = createProject(project, apiKey, workspaceName);
+
+            UUID traceId = traceResourceClient.createTrace(factory.manufacturePojo(Trace.class).toBuilder()
+                    .projectName(project.name()).build(), apiKey, workspaceName);
+
+            traceResourceClient.updateTrace(traceId, TraceUpdate.builder()
+                    .tags(Set.of("tag1", "tag2"))
+                    .projectName(project.name())
+                    .build(), apiKey, workspaceName);
+
+            Trace trace = getTrace(traceId, apiKey, workspaceName);
+
+            Project expectedProject = project.toBuilder().id(projectId).lastUpdatedTraceAt(trace.lastUpdatedAt())
+                    .build();
+
+            assertAllProjectsHavePersistedLastTraceAt(workspaceId, List.of(expectedProject));
+        }
+
+        private void assertAllProjectsHavePersistedLastTraceAt(String workspaceId, List<Project> expectedProjects) {
+            List<Project> dbProjects = projectService.findByIds(workspaceId, expectedProjects.stream()
+                    .map(Project::id).collect(Collectors.toUnmodifiableSet()));
+
+            for (Project project : expectedProjects) {
+                Awaitility.await().untilAsserted(() -> {
+                    assertThat(dbProjects.stream().filter(dbProject -> dbProject.id().equals(project.id()))
+                            .findFirst().orElseThrow().lastUpdatedTraceAt())
+                            .usingComparator(TestComparators::compareMicroNanoTime)
+                            .isEqualTo(project.lastUpdatedTraceAt());
+                });
+            }
+        }
     }
 
     @Nested
diff --git a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java
index 674d28b7a1..dcc7d63b75 100644
--- a/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java
+++ b/apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java
@@ -6,10 +6,12 @@
 import com.comet.opik.api.error.EntityAlreadyExistsException;
 import com.comet.opik.api.error.ErrorMessage;
 import com.comet.opik.api.error.InvalidUUIDVersionException;
+import com.comet.opik.api.events.TracesCreated;
 import com.comet.opik.infrastructure.auth.RequestContext;
 import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
 import com.comet.opik.infrastructure.lock.LockService;
 import com.fasterxml.uuid.Generators;
+import com.google.common.eventbus.EventBus;
 import io.r2dbc.spi.Connection;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -17,6 +19,7 @@
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import reactor.core.publisher.Mono;
@@ -25,6 +28,7 @@
 
 import java.time.Instant;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
 import static com.comet.opik.domain.ProjectService.DEFAULT_USER;
@@ -33,6 +37,7 @@
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -58,6 +63,9 @@ class TraceServiceImplTest {
     @Mock
     private ProjectService projectService;
 
+    @Mock
+    private EventBus eventBus;
+
     private final PodamFactory factory = new PodamFactoryImpl();
 
     @BeforeEach
@@ -69,7 +77,8 @@ void setUp() {
                 template,
                 projectService,
                 () -> Generators.timeBasedEpochGenerator().generate(),
-                DUMMY_LOCK_SERVICE);
+                DUMMY_LOCK_SERVICE,
+                eventBus);
     }
 
     @Nested
@@ -82,16 +91,20 @@ void create__whenConcurrentTraceCreationsWithSameProjectNameConflict__thenHandle
             // given
             var projectName = "projectName";
+            var projectId = UUID.randomUUID();
             var traceId = Generators.timeBasedEpochGenerator().generate();
             var connection = mock(Connection.class);
 
             String workspaceId = UUID.randomUUID().toString();
+            ArgumentCaptor<TracesCreated> eventCaptor = ArgumentCaptor.forClass(TracesCreated.class);
 
             // when
             when(projectService.getOrCreate(workspaceId, projectName, DEFAULT_USER))
                     .thenThrow(new EntityAlreadyExistsException(new ErrorMessage(List.of("Project already exists"))));
 
             when(projectService.findByNames(workspaceId, List.of(projectName)))
-                    .thenReturn(List.of(Project.builder().name(projectName).build())); // simulate project was already created
+                    .thenReturn(List.of(Project.builder().id(projectId).name(projectName).build())); // simulate project was already created
+
+            doNothing().when(eventBus).post(eventCaptor.capture());
 
             when(template.nonTransaction(any()))
                     .thenAnswer(invocation -> {
@@ -116,7 +129,8 @@ void create__whenConcurrentTraceCreationsWithSameProjectNameConflict__thenHandle
                     .block();
 
             // then
-            Assertions.assertEquals(traceId, actualResult);
+            assertThat(actualResult).isEqualTo(traceId);
+            assertThat(eventCaptor.getValue().projectIds()).isEqualTo(Set.of(projectId));
         }
 
         @Test