Skip to content

Commit

Permalink
issue #1053: handles queries and emitter exceptions with the right lo…
Browse files Browse the repository at this point in the history
…g level
  • Loading branch information
mrk-vi committed Jul 31, 2024
1 parent 1b9c608 commit 1a4e798
Showing 1 changed file with 50 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,48 +82,68 @@ Uni<Void> send(SendRequest sendRequest) {
var vScheduleId = scheduleId + VectorPipeline.VECTOR_PIPELINE_SUFFIX;

return QuarkusCacheUtil.getAsync(
cache,
new CompositeCacheKey(scheduleId),
sessionFactory.withStatelessSession(tenantId, (s -> s
.createNamedQuery(VectorIndex.FETCH_BY_SCHEDULE_ID, VectorIndex.class)
.setParameter("scheduleId", scheduleId)
.getSingleResult()
.flatMap(vectorIndex -> s.createNamedQuery(
EmbeddingModel.FETCH_CURRENT,
EmbeddingModel.class
cache,
new CompositeCacheKey(scheduleId),
sessionFactory.withStatelessSession(tenantId, s -> s
.createNamedQuery(
VectorIndex.FETCH_BY_SCHEDULE_ID, VectorIndex.class)
.setParameter("scheduleId", scheduleId)
.getSingleResult()
.flatMap(ignore -> s.createNamedQuery(
EmbeddingModel.FETCH_CURRENT,
EmbeddingModel.class
)
.getSingleResult()
)
)
.getSingleResult()
.map(ignore -> Status.ENABLED)
.onFailure().recoverWithItem(Status.DISABLED)
)
))
).onItem().invoke(() -> {
.invoke((status) -> {

log.infof("VectorIndex is active for scheduleId %s", scheduleId);
switch (status) {
case ENABLED -> {

var payload = sendRequest.payload();
log.infof("VectorIndex is active for scheduleId %s", scheduleId);

var dataPayload = Json.decodeValue(Buffer.buffer(payload), DataPayload.class);
var ingestionPayload = ingestionPayloadMapper.map(dataPayload);
var payload = sendRequest.payload();

var metadata = Metadata.of(
OutgoingRabbitMQMetadata
.builder()
.withRoutingKey(ShardingKey.asString(tenantId, vScheduleId))
.withDeliveryMode(2)
.build()
);
var dataPayload = Json.decodeValue(
Buffer.buffer(payload), DataPayload.class);

var ingestionIndexWriterPayload = IngestionIndexWriterPayload.builder()
.ingestionPayload(ingestionPayload)
.build();
var ingestionPayload = ingestionPayloadMapper.map(dataPayload);

emitter.send(Message.of(ingestionIndexWriterPayload, metadata));
var metadata = Metadata.of(
OutgoingRabbitMQMetadata
.builder()
.withRoutingKey(ShardingKey.asString(tenantId, vScheduleId))
.withDeliveryMode(2)
.build()
);

}).onFailure().invoke((throwable) ->
log.infof("No active vector index for scheduleId %s", scheduleId)
).replaceWithVoid();
var ingestionIndexWriterPayload = IngestionIndexWriterPayload.builder()
.ingestionPayload(ingestionPayload)
.build();

emitter.send(Message.of(ingestionIndexWriterPayload, metadata));

}
case DISABLED -> log.infof(
"No active vector index for scheduleId %s", scheduleId);
}
})
.onFailure()
.invoke(throwable -> log.errorf(
throwable, "Cannot send payload to VectorPipeline for scheduleId %s", scheduleId)
)
.replaceWithVoid();

}

private record SendRequest(ShardingKey shardingKey, byte[] payload) {}

private enum Status {
ENABLED,
DISABLED
}
}

0 comments on commit 1a4e798

Please sign in to comment.