Skip to content

Commit

Permalink
Offload multi run upload processing to message queue: Fixes Hyperfoil…
Browse files Browse the repository at this point in the history
  • Loading branch information
johnaohara committed Jul 3, 2024
1 parent 855387a commit 725e1af
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ String setRoles(Iterable<String> roles) {
return setRoles(String.join(",", roles));
}

String setRoles(String roles) {
public String setRoles(String roles) {
Query setRoles = em.createNativeQuery(SET_ROLES);
setRoles.setParameter(1, roles == null ? "" : roles);
Object[] row = (Object[]) setRoles.getSingleResult();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.hyperfoil.tools.horreum.server;

import io.vertx.ext.web.handler.StaticHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import java.util.regex.Pattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,10 @@ void startRecalculation(int testId, boolean notify, boolean debug, boolean clear
recalculation.clearDatapoints = clearDatapoints;

try {
log.debugf("Updating fingerprints for test %d", testId);
//update fingerprints before starting recalculation
//TODO: check if we need to update fingerprints for all tests
mediator.updateFingerprints(testId);
log.debugf("About to recalculate datapoints in test %d between %s and %s", testId, from, to);
//TODO:: determine if we should clear datapoints
recalculation.datasets = getDatasetsForRecalculation(testId, from, to, clearDatapoints);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ private void createFingerprint(int datasetId, int testId) {
}

@Transactional
@WithRoles( extras = Roles.HORREUM_SYSTEM)
void updateFingerprints(int testId) {
for(var dataset : DatasetDAO.<DatasetDAO>find("testid", testId).list()) {
FingerprintDAO.deleteById(dataset.id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType;
import io.hyperfoil.tools.horreum.hibernate.JsonbSetType;
import io.hyperfoil.tools.horreum.mapper.DatasetMapper;
import io.hyperfoil.tools.horreum.server.RoleManager;
import jakarta.annotation.security.PermitAll;
import jakarta.annotation.security.RolesAllowed;
import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -116,6 +117,9 @@ WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)-
@Inject
SecurityIdentity identity;

@Inject
RoleManager roleManager;

@Inject
TransactionManager tm;

Expand Down Expand Up @@ -570,17 +574,40 @@ Response addRunFromData(String start, String stop, String test,
DatastoreResponse response = datastore.handleRun(data, metadata, testEntity.backendConfig, Optional.ofNullable(schemaUri), mapper);

List<Integer> runIds = new ArrayList<>();
if (datastore.uploadType() == Datastore.UploadType.MUILTI && response.payload instanceof ArrayNode){
if (datastore.uploadType() == Datastore.UploadType.MUILTI && response.payload instanceof ArrayNode){
//need to offload to an async process - this might take a LOOONG time
response.payload.forEach(jsonNode -> {
runIds.add(getPersistRun(start, stop, test, owner, access, token, schemaUri, description, null, jsonNode, testEntity));
mediator.queueRunUpload(start, stop, test, owner, access, token, schemaUri, description, null, jsonNode, testEntity);
});
} else {
runIds.add(getPersistRun(start, stop, test, owner, access, token, schemaUri, description, metadata, response.payload, testEntity));
}
// return Response.status(Response.Status.OK).entity(String.valueOf(runId)).header(HttpHeaders.LOCATION, "/run/" + runId).build();
String reponseString = String.valueOf(runIds.stream().map(val -> Integer.toString(val)).collect(Collectors.joining(", ")));
return Response.status(Response.Status.OK).entity(reponseString).build();
}
@Transactional
public void persistRun(ServiceMediator.RunUpload runUpload) {
    // Restore the uploader's roles (captured when the upload was queued) and
    // add the system role so the persistence path has the privileges it needs.
    // NOTE(review): assumes runUpload.roles is mutable after message
    // deserialization — confirm the AMQP mapper produces a mutable Set.
    runUpload.roles.add("horreum.system");
    roleManager.setRoles(String.join(",", runUpload.roles));
    TestDAO testEntity = TestDAO.findById(runUpload.testId);
    if (testEntity == null) {
        // The test may have been deleted between queueing and processing;
        // drop the message rather than retry forever.
        log.errorf("Could not find Test (%d) for Run Upload", runUpload.testId);
        return;
    }
    try {
        Integer runID = getPersistRun(runUpload.start, runUpload.stop, runUpload.test,
                runUpload.owner, runUpload.access, runUpload.token, runUpload.schemaUri,
                runUpload.description, runUpload.metaData, runUpload.payload, testEntity);

        if (runID == null) {
            // %s, not %d: testEntity.name is a String
            log.errorf("Could not persist Run for Test: %s", testEntity.name);
        }
    } catch (ServiceException serviceException) {
        // Pass the Throwable first so JBoss Logging records the stack trace;
        // as a trailing vararg it would be silently ignored by the format.
        log.errorf(serviceException, "Could not persist Run for Test: %s", testEntity.name);
    }
}


private Integer getPersistRun(String start, String stop, String test, String owner, Access access, String token, String schemaUri, String description, JsonNode metadata, JsonNode data, TestDAO testEntity) {
Object foundStart = findIfNotSet(start, data);
Expand Down Expand Up @@ -646,6 +673,7 @@ private Object findIfNotSet(String value, JsonNode data) {



@WithRoles(extras = Roles.HORREUM_SYSTEM)
private Integer addAuthenticated(RunDAO run, TestDAO test) {
// Id will be always generated anew
run.id = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package io.hyperfoil.tools.horreum.svc;

import com.fasterxml.jackson.databind.JsonNode;
import io.hyperfoil.tools.horreum.api.alerting.Change;
import io.hyperfoil.tools.horreum.api.alerting.DataPoint;
import io.hyperfoil.tools.horreum.api.data.Action;
import io.hyperfoil.tools.horreum.api.data.Dataset;
import io.hyperfoil.tools.horreum.api.data.Run;
import io.hyperfoil.tools.horreum.api.data.Test;
import io.hyperfoil.tools.horreum.api.data.TestExport;
import io.hyperfoil.tools.horreum.api.data.*;
import io.hyperfoil.tools.horreum.api.services.ExperimentService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.entity.data.ActionDAO;
import io.hyperfoil.tools.horreum.entity.data.TestDAO;
import io.hyperfoil.tools.horreum.events.DatasetChanges;
import io.hyperfoil.tools.horreum.server.RoleManager;
import io.hyperfoil.tools.horreum.server.WithRoles;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -26,6 +27,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -72,6 +74,10 @@ public class ServiceMediator {
@Inject
private SchemaServiceImpl schemaService;

@Inject
SecurityIdentity identity;


@Inject
@ConfigProperty(name = "horreum.test-mode", defaultValue = "false")
private Boolean testMode;
Expand All @@ -88,6 +94,10 @@ public class ServiceMediator {
@Channel("schema-sync-out")
Emitter<Integer> schemaEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("run-upload-out")
Emitter<RunUpload> runUploadEmitter;

private Map<AsyncEventChannels, Map<Integer, BlockingQueue<Object>>> events = new ConcurrentHashMap<>();

public ServiceMediator() {
Expand Down Expand Up @@ -119,6 +129,7 @@ void deleteTest(int testId) {
}

@Transactional
@WithRoles(extras = Roles.HORREUM_SYSTEM)
void newRun(Run run) {
actionService.onNewRun(run);
alertingService.removeExpected(run);
Expand Down Expand Up @@ -150,6 +161,7 @@ void newChange(Change.Event event) {
@Incoming("dataset-event-in")
@Blocking(ordered = false, value = "horreum.dataset.pool")
@ActivateRequestContext
@WithRoles(extras = Roles.HORREUM_SYSTEM)
public void processDatasetEvents(Dataset.EventNew newEvent) {
datasetService.onNewDatasetNoLock(newEvent);
validateDataset(newEvent.datasetId);
Expand Down Expand Up @@ -178,11 +190,27 @@ public void processSchemaSync(int schemaId) {
runService.onNewOrUpdatedSchema(schemaId);
}

/**
 * Message-queue consumer for offloaded multi-run uploads.
 * Receives a {@link RunUpload} from the "run-upload-in" channel and delegates
 * persistence to the run service. Processing is blocking and unordered on the
 * "horreum.run.pool" worker pool; a request context is activated because the
 * persistence path relies on request-scoped beans.
 */
@Incoming("run-upload-in")
@Blocking(ordered = false, value = "horreum.run.pool")
@ActivateRequestContext
public void processRunUpload(RunUpload runUpload) {
log.debugf("Run Upload: %d", runUpload.testId);
// persistRun restores the uploader's roles carried in the message
runService.persistRun(runUpload);

}

/**
 * Publishes a schema id to the "schema-sync-out" channel for asynchronous
 * re-validation of runs/datasets against the schema.
 * Runs outside any transaction (NOT_SUPPORTED) so the send is not tied to a
 * caller's transaction outcome.
 */
@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueSchemaSync(int schemaId) {
schemaEmitter.send(schemaId);
}

/**
 * Packages a single run upload into a {@link RunUpload} message and publishes
 * it on the "run-upload-out" channel for asynchronous persistence.
 * The caller's security roles are captured in the message so the consumer can
 * restore them before persisting. Runs outside any transaction
 * (NOT_SUPPORTED) so the send is decoupled from the caller's transaction.
 */
@Transactional(Transactional.TxType.NOT_SUPPORTED)
public void queueRunUpload(String start, String stop, String test, String owner, Access access, String token, String schemaUri, String description, JsonNode metadata, JsonNode jsonNode, TestDAO testEntity) {
    RunUpload message = new RunUpload(
            start, stop, test, owner,
            access, token, schemaUri, description,
            metadata, jsonNode,
            testEntity.id,
            identity.getRoles());
    runUploadEmitter.send(message);
}


void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down Expand Up @@ -267,4 +295,40 @@ public <T> BlockingQueue<T> getEventQueue(AsyncEventChannels channel, Integer id
}
}

/**
 * Message payload carrying everything needed to persist one uploaded run
 * asynchronously: the raw upload parameters, the JSON payload/metadata, the
 * target test id, and the uploader's security roles (so the consumer can act
 * with the same privileges). Fields are public for message (de)serialization.
 */
static class RunUpload {
    public String start;
    public String stop;
    public String test;
    public String owner;
    public Access access;
    public String token;
    public String schemaUri;
    public String description;
    public JsonNode metaData;
    public JsonNode payload;
    public Integer testId;
    public Set<String> roles;

    /** No-arg constructor required for deserialization. */
    public RunUpload() {
    }

    public RunUpload(String start, String stop, String test, String owner,
                     Access access, String token, String schemaUri, String description,
                     JsonNode metaData, JsonNode payload, Integer testId,
                     Set<String> roles) {
        this.roles = roles;
        this.testId = testId;
        this.payload = payload;
        this.metaData = metaData;
        this.description = description;
        this.schemaUri = schemaUri;
        this.token = token;
        this.access = access;
        this.owner = owner;
        this.test = test;
        this.stop = stop;
        this.start = start;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void exportSubscriptions(TestExport test) {
void importSubscriptions(TestExport test) {
WatchDAO watch = WatchMapper.to(test.subscriptions);
watch.test = em.getReference(TestDAO.class, test.id);
if(watch.id != null && WatchDAO.findById(watch.id) == null) {
if( watch.id != null && WatchDAO.findById(watch.id) == null) {
watch.id = null;
watch.persist();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,8 @@ public void importTest(ObjectNode node) {
//first check if datastore already exists
boolean exists = DatastoreConfigDAO.findById(newTest.datastore.id) != null;
DatastoreConfigDAO datastore = DatasourceMapper.to(newTest.datastore);
if ( !exists)
datastore.id = null;
datastore.persist();
if(!exists) {
newTest.datastore.id = datastore.id;
Expand Down
22 changes: 22 additions & 0 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ mp.messaging.outgoing.schema-sync-out.address=schema-sync
mp.messaging.outgoing.schema-sync-out.durable=true
mp.messaging.outgoing.schema-sync-out.container-id=horreum-broker
mp.messaging.outgoing.schema-sync-out.link-name=schema-sync
# run-upload incoming
mp.messaging.incoming.run-upload-in.connector=smallrye-amqp
mp.messaging.incoming.run-upload-in.address=run-upload
mp.messaging.incoming.run-upload-in.durable=true
mp.messaging.incoming.run-upload-in.container-id=horreum-broker
mp.messaging.incoming.run-upload-in.link-name=run-upload
# run-upload outgoing
mp.messaging.outgoing.run-upload-out.connector=smallrye-amqp
mp.messaging.outgoing.run-upload-out.address=run-upload
mp.messaging.outgoing.run-upload-out.durable=true
mp.messaging.outgoing.run-upload-out.container-id=horreum-broker
mp.messaging.outgoing.run-upload-out.link-name=run-upload

## Datasource updated by Liquibase - the same as app but always with superuser credentials

Expand Down Expand Up @@ -226,3 +238,13 @@ quarkus.datasource.devservices.enabled=false
quarkus.datasource.migration.devservices.enabled=false
quarkus.keycloak.devservices.enabled=false
quarkus.elasticsearch.devservices.enabled=false

## Add a dummy administrator in dev mode with name "user" and password "secret" with Basic HTTP authentication
%dev.quarkus.http.auth.basic=true
%dev.quarkus.security.users.embedded.enabled=true
%dev.quarkus.security.users.embedded.plain-text=true
%dev.quarkus.security.users.embedded.users.user=secret
%dev.quarkus.security.users.embedded.roles.user=admin

quarkus.transaction-manager.default-transaction-timeout=1h
quarkus.arc.fail-on-intercepted-private-method=false

0 comments on commit 725e1af

Please sign in to comment.