Skip to content

Commit

Permalink
Offload multi run upload processing to message queue: Fixes Hyperfoil…
Browse files Browse the repository at this point in the history
  • Loading branch information
johnaohara committed Jul 24, 2024
1 parent b809cbe commit 43de3c1
Show file tree
Hide file tree
Showing 19 changed files with 284 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public DatastoreResponse handleRun(JsonNode payload,
request = new Request(
"GET",
"/" + apiRequest.index + "/_search");
request.addParameter("size", "1000");
request.setJsonEntity(mapper.writeValueAsString(apiRequest.query));
finalString = extracted(restClient, request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ String setRoles(Iterable<String> roles) {
return setRoles(String.join(",", roles));
}

String setRoles(String roles) {
public String setRoles(String roles) {
if (roles == null || roles.isEmpty() || Roles.HORREUM_SYSTEM.equals(roles)) {
return "";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.hyperfoil.tools.horreum.server;

import io.vertx.ext.web.handler.StaticHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import java.util.regex.Pattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,10 @@ void startRecalculation(int testId, boolean notify, boolean debug, boolean clear
recalculation.clearDatapoints = clearDatapoints;

try {
log.debugf("Updating fingerprints for test %d", testId);
//update fingerprints before starting recalculation
//TODO: check if we need to update fingerprints for all tests
mediator.updateFingerprints(testId);
log.debugf("About to recalculate datapoints in test %d between %s and %s", testId, from, to);
//TODO:: determine if we should clear datapoints
recalculation.datasets = getDatasetsForRecalculation(testId, from, to, clearDatapoints);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ private void createFingerprint(int datasetId, int testId) {
}

@Transactional
@WithRoles( extras = Roles.HORREUM_SYSTEM)
void updateFingerprints(int testId) {
for(var dataset : DatasetDAO.<DatasetDAO>find("testid", testId).list()) {
FingerprintDAO.deleteById(dataset.id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.hyperfoil.tools.horreum.hibernate.JsonBinaryType;
import io.hyperfoil.tools.horreum.hibernate.JsonbSetType;
import io.hyperfoil.tools.horreum.mapper.DatasetMapper;
import io.hyperfoil.tools.horreum.server.RoleManager;
import jakarta.annotation.security.PermitAll;
import jakarta.annotation.security.RolesAllowed;
import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -116,6 +117,9 @@ WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)-
@Inject
SecurityIdentity identity;

@Inject
RoleManager roleManager;

@Inject
TransactionManager tm;

Expand Down Expand Up @@ -570,17 +574,53 @@ Response addRunFromData(String start, String stop, String test,
DatastoreResponse response = datastore.handleRun(data, metadata, testEntity.backendConfig, Optional.ofNullable(schemaUri), mapper);

List<Integer> runIds = new ArrayList<>();
if (datastore.uploadType() == Datastore.UploadType.MUILTI && response.payload instanceof ArrayNode){
response.payload.forEach(jsonNode -> {
runIds.add(getPersistRun(start, stop, test, owner, access, token, schemaUri, description, null, jsonNode, testEntity));
});
String responseString;
if (datastore.uploadType() == Datastore.UploadType.MUILTI
&& response.payload instanceof ArrayNode ){

//if we return more than 10 results, offload to async queue to process - this might take a LOOONG time
if ( response.payload.size() > 10 ) {
response.payload.forEach(jsonNode -> {
mediator.queueRunUpload(start, stop, test, owner, access, token, schemaUri, description, null, jsonNode, testEntity);
});
} else { //process synchronously
response.payload.forEach(jsonNode -> {
runIds.add(getPersistRun(start, stop, test, owner, access, token, schemaUri, description, metadata, response.payload, testEntity));
});
}
} else {
runIds.add(getPersistRun(start, stop, test, owner, access, token, schemaUri, description, metadata, response.payload, testEntity));
}
// return Response.status(Response.Status.OK).entity(String.valueOf(runId)).header(HttpHeaders.LOCATION, "/run/" + runId).build();
String reponseString = String.valueOf(runIds.stream().map(val -> Integer.toString(val)).collect(Collectors.joining(", ")));
return Response.status(Response.Status.OK).entity(reponseString).build();

responseString = runIds.size() > 0 ?
String.valueOf(runIds.stream().map(val -> Integer.toString(val)).collect(Collectors.joining(", ")))
: "More than 10 runs uploaded, processing asynchronously";

return Response.status(Response.Status.OK).entity(responseString).build();
}
@Transactional
public void persistRun(ServiceMediator.RunUpload runUpload) {
runUpload.roles.add("horreum.system");
roleManager.setRoles(runUpload.roles.stream().collect(Collectors.joining(",")));
TestDAO testEntity = TestDAO.findById(runUpload.testId);
if ( testEntity == null ){
log.errorf("Could not find Test (%d) for Run Upload", runUpload.testId);
return;
}
try {
Integer runID = getPersistRun(runUpload.start, runUpload.stop, runUpload.test,
runUpload.owner, runUpload.access, runUpload.token, runUpload.schemaUri,
runUpload.description, runUpload.metaData, runUpload.payload, testEntity);

if (runID == null) {
log.errorf("Could not persist Run for Test: %d", testEntity.name);
}
} catch (ServiceException serviceException){
log.errorf("Could not persist Run for Test: %d", testEntity.name, serviceException);

}
}


private Integer getPersistRun(String start, String stop, String test, String owner, Access access, String token, String schemaUri, String description, JsonNode metadata, JsonNode data, TestDAO testEntity) {
Object foundStart = findIfNotSet(start, data);
Expand Down Expand Up @@ -646,6 +686,7 @@ private Object findIfNotSet(String value, JsonNode data) {



@WithRoles(extras = Roles.HORREUM_SYSTEM)
private Integer addAuthenticated(RunDAO run, TestDAO test) {
// Id will be always generated anew
run.id = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package io.hyperfoil.tools.horreum.svc;

import com.fasterxml.jackson.databind.JsonNode;
import io.hyperfoil.tools.horreum.api.alerting.Change;
import io.hyperfoil.tools.horreum.api.alerting.DataPoint;
import io.hyperfoil.tools.horreum.api.data.Action;
import io.hyperfoil.tools.horreum.api.data.Dataset;
import io.hyperfoil.tools.horreum.api.data.Run;
import io.hyperfoil.tools.horreum.api.data.Test;
import io.hyperfoil.tools.horreum.api.data.TestExport;
import io.hyperfoil.tools.horreum.api.data.*;
import io.hyperfoil.tools.horreum.api.services.ExperimentService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.entity.data.ActionDAO;
import io.hyperfoil.tools.horreum.entity.data.TestDAO;
import io.hyperfoil.tools.horreum.events.DatasetChanges;
import io.hyperfoil.tools.horreum.server.RoleManager;
import io.hyperfoil.tools.horreum.server.WithRoles;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -26,6 +27,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -72,6 +74,10 @@ public class ServiceMediator {
@Inject
private SchemaServiceImpl schemaService;

@Inject
SecurityIdentity identity;


@Inject
@ConfigProperty(name = "horreum.test-mode", defaultValue = "false")
private Boolean testMode;
Expand All @@ -88,6 +94,10 @@ public class ServiceMediator {
@Channel("schema-sync-out")
Emitter<Integer> schemaEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("run-upload-out")
Emitter<RunUpload> runUploadEmitter;

private Map<AsyncEventChannels, Map<Integer, BlockingQueue<Object>>> events = new ConcurrentHashMap<>();

public ServiceMediator() {
Expand Down Expand Up @@ -119,6 +129,7 @@ void deleteTest(int testId) {
}

@Transactional
@WithRoles(extras = Roles.HORREUM_SYSTEM)
void newRun(Run run) {
actionService.onNewRun(run);
alertingService.removeExpected(run);
Expand Down Expand Up @@ -150,6 +161,7 @@ void newChange(Change.Event event) {
@Incoming("dataset-event-in")
@Blocking(ordered = false, value = "horreum.dataset.pool")
@ActivateRequestContext
@WithRoles(extras = Roles.HORREUM_SYSTEM)
public void processDatasetEvents(Dataset.EventNew newEvent) {
datasetService.onNewDatasetNoLock(newEvent);
validateDataset(newEvent.datasetId);
Expand Down Expand Up @@ -178,11 +190,27 @@ public void processSchemaSync(int schemaId) {
runService.onNewOrUpdatedSchema(schemaId);
}

@Incoming("run-upload-in")
@Blocking(ordered = false, value = "horreum.run.pool")
@ActivateRequestContext
public void processRunUpload(RunUpload runUpload) {
log.debugf("Run Upload: %d", runUpload.testId);
runService.persistRun(runUpload);

}

@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueSchemaSync(int schemaId) {
schemaEmitter.send(schemaId);
}

@Transactional(Transactional.TxType.NOT_SUPPORTED)
public void queueRunUpload(String start, String stop, String test, String owner, Access access, String token, String schemaUri, String description, JsonNode metadata, JsonNode jsonNode, TestDAO testEntity) {
RunUpload upload = new RunUpload(start, stop, test, owner, access, token, schemaUri, description, metadata, jsonNode, testEntity.id, identity.getRoles());
runUploadEmitter.send(upload);
}


void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down Expand Up @@ -267,4 +295,40 @@ public <T> BlockingQueue<T> getEventQueue(AsyncEventChannels channel, Integer id
}
}

static class RunUpload {
public String start;
public String stop;
public String test;
public String owner;
public Access access;
public String token;
public String schemaUri;
public String description;
public JsonNode metaData;
public JsonNode payload;
public Integer testId;
public Set<String> roles;

public RunUpload() {
}

public RunUpload(String start, String stop, String test, String owner,
Access access, String token, String schemaUri, String description,
JsonNode metaData, JsonNode payload, Integer testId,
Set<String> roles) {
this.start = start;
this.stop = stop;
this.test = test;
this.owner = owner;
this.access = access;
this.token = token;
this.schemaUri = schemaUri;
this.description = description;
this.metaData = metaData;
this.payload = payload;
this.testId = testId;
this.roles = roles;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void exportSubscriptions(TestExport test) {
void importSubscriptions(TestExport test) {
WatchDAO watch = WatchMapper.to(test.subscriptions);
watch.test = em.getReference(TestDAO.class, test.id);
if(watch.id != null && WatchDAO.findById(watch.id) == null) {
if( watch.id != null && WatchDAO.findById(watch.id) == null) {
watch.id = null;
watch.persist();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,8 @@ public void importTest(ObjectNode node) {
//first check if datastore already exists
boolean exists = DatastoreConfigDAO.findById(newTest.datastore.id) != null;
DatastoreConfigDAO datastore = DatasourceMapper.to(newTest.datastore);
if ( !exists)
datastore.id = null;
datastore.persist();
if(!exists) {
newTest.datastore.id = datastore.id;
Expand Down
22 changes: 22 additions & 0 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ mp.messaging.outgoing.schema-sync-out.address=schema-sync
mp.messaging.outgoing.schema-sync-out.durable=true
mp.messaging.outgoing.schema-sync-out.container-id=horreum-broker
mp.messaging.outgoing.schema-sync-out.link-name=schema-sync
# schema-sync incoming
mp.messaging.incoming.run-upload-in.connector=smallrye-amqp
mp.messaging.incoming.run-upload-in.address=run-upload
mp.messaging.incoming.run-upload-in.durable=true
mp.messaging.incoming.run-upload-in.container-id=horreum-broker
mp.messaging.incoming.run-upload-in.link-name=run-upload
# schema-sync outgoing
mp.messaging.outgoing.run-upload-out.connector=smallrye-amqp
mp.messaging.outgoing.run-upload-out.address=run-upload
mp.messaging.outgoing.run-upload-out.durable=true
mp.messaging.outgoing.run-upload-out.container-id=horreum-broker
mp.messaging.outgoing.run-upload-out.link-name=run-upload

## Datasource updated by Liquibase - the same as app but always with superuser credentials

Expand Down Expand Up @@ -226,3 +238,13 @@ quarkus.datasource.devservices.enabled=false
quarkus.datasource.migration.devservices.enabled=false
quarkus.keycloak.devservices.enabled=false
quarkus.elasticsearch.devservices.enabled=false

## Add a dummy administrator in dev mode with name "user" and password "secret" with Basic HTTP authentication
%dev.quarkus.http.auth.basic=true
%dev.quarkus.security.users.embedded.enabled=true
%dev.quarkus.security.users.embedded.plain-text=true
%dev.quarkus.security.users.embedded.users.user=secret
%dev.quarkus.security.users.embedded.roles.user=admin

quarkus.transaction-manager.default-transaction-timeout=1h
quarkus.arc.fail-on-intercepted-private-method=false
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,31 @@ public void multidocPayload(TestInfo info) throws InterruptedException {

}

@org.junit.jupiter.api.Test
public void largeMultidocPayload(TestInfo info) throws InterruptedException {
TestConfig testConfig = createNewTestAndDatastores(info);

String payload = """
{
"index": "large",
"type": "SEARCH",
"query": {
"query": {
"match_all" : {}
}
}
}
""";

String runResponse = uploadRun(payload, testConfig.test.name, testConfig.schema.uri);

assertNotNull(runResponse);
Assert.assertEquals("More than 10 runs uploaded, processing asynchronously", runResponse);


}


@org.junit.jupiter.api.Test
public void multiQueryPayload(TestInfo info) throws InterruptedException {
TestConfig testConfig = createNewTestAndDatastores(info);
Expand Down Expand Up @@ -237,6 +262,19 @@ public void configureElasticDatasets(){
uploadDoc("tfb", "uid", "data/experiment-ds3.json");
uploadDoc("tfb", "uid", "data/config-quickstart.jvm.json");

uploadDoc("large", "uid", "data/experiment-ds1.json");
uploadDoc("large", "uid", "data/experiment-ds2.json");
uploadDoc("large", "uid", "data/experiment-ds3.json");
uploadDoc("large", "uid", "data/experiment-ds4.json");
uploadDoc("large", "uid", "data/experiment-ds5.json");
uploadDoc("large", "uid", "data/experiment-ds6.json");
uploadDoc("large", "uid", "data/experiment-ds7.json");
uploadDoc("large", "uid", "data/experiment-ds8.json");
uploadDoc("large", "uid", "data/experiment-ds9.json");
uploadDoc("large", "uid", "data/experiment-ds10.json");
uploadDoc("large", "uid", "data/experiment-ds11.json");


//nasty hack; sleep for 10 seconds to "ensure" that the uploaded test data is indexed by ES
try {
Thread.currentThread().sleep(10_000);
Expand Down
12 changes: 12 additions & 0 deletions horreum-backend/src/test/resources/data/experiment-ds10.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"uid": "9a170eaf-d072-40f3-8fdd-eaa7f1266d70",
"start": "2021-07-14T04:17:48Z",
"stop": "2021-07-14T04:25:59Z",
"job": "Dummy",
"build-id": 10,
"build-timestamp": "2021-07-14T14:47:10Z",
"data": {
"cpu": 49.1,
"throughput": 12300
}
}
12 changes: 12 additions & 0 deletions horreum-backend/src/test/resources/data/experiment-ds11.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"uid": "86e69bdb-9aeb-4399-9c27-d8ecc87bf460",
"start": "2021-07-14T04:17:48Z",
"stop": "2021-07-14T04:25:59Z",
"job": "Dummy",
"build-id": 11,
"build-timestamp": "2021-07-14T14:47:10Z",
"data": {
"cpu": 49.1,
"throughput": 12300
}
}
Loading

0 comments on commit 43de3c1

Please sign in to comment.