From 0d51f89e9de6c782fdf36a0e799253020719df6d Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 29 Feb 2024 16:59:52 -0500 Subject: [PATCH] feat(graphql): add top-level query for targetNodes (#307) * feat(graphql): add top-level query for targetNodes * return targets with distinct JVM IDs, implement archived recordings query * implement active recordings query * refactor enum registration * implement subqueries (actually nested mutations) for doStartRecording and doSnapshot * add mbeanMetrics query * messaging server uses shared ObjectMapper * add Jackson serialization customizer so Map is encoded the same way GraphQL extension does --- .../cryostat/ObjectMapperCustomization.java | 83 ++++ .../java/io/cryostat/graphql/RootNode.java | 6 +- .../java/io/cryostat/graphql/TargetNodes.java | 363 ++++++++++++++++++ .../cryostat/recordings/RecordingHelper.java | 270 ++++++++----- .../io/cryostat/recordings/Recordings.java | 108 ++---- .../java/io/cryostat/rules/RuleService.java | 73 ++-- .../java/io/cryostat/ws/MessagingServer.java | 2 +- src/main/resources/application.properties | 1 + src/test/java/itest/CustomTargetsTest.java | 12 +- 9 files changed, 697 insertions(+), 221 deletions(-) create mode 100644 src/main/java/io/cryostat/ObjectMapperCustomization.java create mode 100644 src/main/java/io/cryostat/graphql/TargetNodes.java diff --git a/src/main/java/io/cryostat/ObjectMapperCustomization.java b/src/main/java/io/cryostat/ObjectMapperCustomization.java new file mode 100644 index 000000000..7284b0282 --- /dev/null +++ b/src/main/java/io/cryostat/ObjectMapperCustomization.java @@ -0,0 +1,83 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat; + +import java.io.IOException; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationConfig; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.BeanSerializerModifier; +import com.fasterxml.jackson.databind.type.MapType; +import io.quarkus.jackson.ObjectMapperCustomizer; +import jakarta.inject.Singleton; + +@Singleton +public class ObjectMapperCustomization implements ObjectMapperCustomizer { + + @Override + public void customize(ObjectMapper objectMapper) { + // FIXME get this version information from the maven build somehow + SimpleModule mapModule = + new SimpleModule( + "MapSerialization", new Version(3, 0, 0, null, "io.cryostat", "cryostat")); + + mapModule.setSerializerModifier(new MapSerializerModifier()); + + objectMapper.registerModule(mapModule); + } + + static class MapSerializerModifier extends BeanSerializerModifier { + @Override + public JsonSerializer modifyMapSerializer( + SerializationConfig config, + MapType valueType, + BeanDescription beanDesc, + JsonSerializer serializer) { + if (valueType.getKeyType().getRawClass().equals(String.class) + && valueType.getContentType().getRawClass().equals(String.class)) { + return new MapSerializer(); + } + return serializer; + } + } + + static class MapSerializer extends JsonSerializer> { + + @Override + public void serialize(Map map, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + gen.writeStartArray(); + + for (var entry : map.entrySet()) { + gen.writeStartObject(); + + gen.writePOJOField("key", entry.getKey()); + gen.writePOJOField("value", entry.getValue()); + + gen.writeEndObject(); + } + + gen.writeEndArray(); + } + } +} diff --git a/src/main/java/io/cryostat/graphql/RootNode.java b/src/main/java/io/cryostat/graphql/RootNode.java index c8b687d5c..d74f77c27 100644 --- a/src/main/java/io/cryostat/graphql/RootNode.java +++ b/src/main/java/io/cryostat/graphql/RootNode.java @@ -24,6 +24,7 @@ import io.cryostat.discovery.DiscoveryNode; import io.cryostat.graphql.matchers.LabelSelectorMatcher; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.smallrye.graphql.api.Nullable; import org.eclipse.microprofile.graphql.Description; import org.eclipse.microprofile.graphql.GraphQLApi; @@ -43,7 +44,7 @@ public DiscoveryNode getRootNode() { "Get target nodes that are descendants of this node. That is, get the set of leaf nodes" + " from anywhere below this node's subtree.") public List descendantTargets( - @Source DiscoveryNode discoveryNode, DescendantTargetsFilterInput filter) { + @Source DiscoveryNode discoveryNode, DiscoveryNodeFilter filter) { // TODO do this filtering at the database query level as much as possible. As is, this will // load the entire discovery tree out of the database, then perform the filtering at the // application level. @@ -64,7 +65,8 @@ private Set recurseChildren( return result; } - public static class DescendantTargetsFilterInput implements Predicate { + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class DiscoveryNodeFilter implements Predicate { public @Nullable Long id; public @Nullable String name; public @Nullable List names; diff --git a/src/main/java/io/cryostat/graphql/TargetNodes.java b/src/main/java/io/cryostat/graphql/TargetNodes.java new file mode 100644 index 000000000..5ea0e203a --- /dev/null +++ b/src/main/java/io/cryostat/graphql/TargetNodes.java @@ -0,0 +1,363 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.graphql; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.function.Predicate; + +import org.openjdk.jmc.common.unit.QuantityConversionException; + +import io.cryostat.core.net.JFRConnection; +import io.cryostat.core.net.MBeanMetrics; +import io.cryostat.core.templates.Template; +import io.cryostat.core.templates.TemplateType; +import io.cryostat.discovery.DiscoveryNode; +import io.cryostat.graphql.RootNode.DiscoveryNodeFilter; +import io.cryostat.graphql.matchers.LabelSelectorMatcher; +import io.cryostat.recordings.ActiveRecording; +import io.cryostat.recordings.RecordingHelper; +import io.cryostat.recordings.RecordingHelper.RecordingOptions; +import io.cryostat.recordings.RecordingHelper.RecordingReplace; +import io.cryostat.recordings.Recordings.ArchivedRecording; +import io.cryostat.recordings.Recordings.Metadata; +import io.cryostat.targets.Target; +import io.cryostat.targets.TargetConnectionManager; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import graphql.schema.DataFetchingEnvironment; +import graphql.schema.GraphQLEnumType; +import graphql.schema.GraphQLEnumValueDefinition; +import graphql.schema.GraphQLSchema; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.graphql.api.Context; +import io.smallrye.graphql.api.Nullable; +import io.smallrye.mutiny.Uni; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; +import jdk.jfr.RecordingState; +import org.eclipse.microprofile.graphql.Description; +import org.eclipse.microprofile.graphql.GraphQLApi; +import org.eclipse.microprofile.graphql.NonNull; +import org.eclipse.microprofile.graphql.Query; +import org.eclipse.microprofile.graphql.Source; + +@GraphQLApi +public class TargetNodes { + + @Inject RecordingHelper recordingHelper; + @Inject TargetConnectionManager connectionManager; + + public GraphQLSchema.Builder registerRecordingStateEnum( + @Observes GraphQLSchema.Builder builder) { + return createEnumType( + builder, RecordingState.class, "Running state of an active Flight Recording"); + } + + private static GraphQLSchema.Builder createEnumType( + GraphQLSchema.Builder builder, Class> klazz, String description) { + return builder.additionalType( + GraphQLEnumType.newEnum() + .name(klazz.getSimpleName()) + .description(description) + .values( + Arrays.asList(klazz.getEnumConstants()).stream() + .map( + s -> + new GraphQLEnumValueDefinition.Builder() + .name(s.name()) + .value(s) + .description(s.name()) + .build()) + .toList()) + .build()); + } + + @Blocking + @Query("targetNodes") + @Description("Get the Target discovery nodes, i.e. the leaf nodes of the discovery tree") + public List getTargetNodes(DiscoveryNodeFilter filter) { + // TODO do this filtering at the database query level as much as possible. As is, this will + // load the entire discovery tree out of the database, then perform the filtering at the + // application level. + return Target.findAll().stream() + .filter(distinctWith(t -> t.jvmId)) + .map(t -> t.discoveryNode) + .filter(n -> filter == null ? true : filter.test(n)) + .toList(); + } + + private static Predicate distinctWith(Function fn) { + Set observed = ConcurrentHashMap.newKeySet(); + return t -> observed.add(fn.apply(t)); + } + + @Blocking + @Description("Get the active and archived recordings belonging to this target") + public Recordings recordings(@Source Target target, Context context) { + var dfe = context.unwrap(DataFetchingEnvironment.class); + var requestedFields = + dfe.getSelectionSet().getFields().stream().map(field -> field.getName()).toList(); + + var recordings = new Recordings(); + + if (requestedFields.contains("active")) { + recordings.active = new ActiveRecordings(); + recordings.active.data = target.activeRecordings; + recordings.active.aggregate = new AggregateInfo(); + recordings.active.aggregate.count = recordings.active.data.size(); + recordings.active.aggregate.size = 0; + } + + if (requestedFields.contains("archived")) { + recordings.archived = new ArchivedRecordings(); + recordings.archived.data = recordingHelper.listArchivedRecordings(target); + recordings.archived.aggregate = new AggregateInfo(); + recordings.archived.aggregate.count = recordings.archived.data.size(); + recordings.archived.aggregate.size = + recordings.archived.data.stream().mapToLong(ArchivedRecording::size).sum(); + } + + return recordings; + } + + public ActiveRecordings active(@Source Recordings recordings, ActiveRecordingsFilter filter) { + var out = new ActiveRecordings(); + out.data = new ArrayList<>(); + out.aggregate = new AggregateInfo(); + + var in = recordings.active; + if (in != null && in.data != null) { + out.data = + in.data.stream().filter(r -> filter == null ? true : filter.test(r)).toList(); + out.aggregate.size = 0; + out.aggregate.count = out.data.size(); + } + + return out; + } + + public ArchivedRecordings archived( + @Source Recordings recordings, ArchivedRecordingsFilter filter) { + var out = new ArchivedRecordings(); + out.data = new ArrayList<>(); + out.aggregate = new AggregateInfo(); + + var in = recordings.archived; + if (in != null && in.data != null) { + out.data = + in.data.stream().filter(r -> filter == null ? true : filter.test(r)).toList(); + out.aggregate.size = 0; + out.aggregate.count = out.data.size(); + } + + return out; + } + + @Blocking + @Description("Get live MBean metrics snapshot from the specified Target") + public Uni mbeanMetrics(@Source Target target) { + return connectionManager.executeConnectedTaskUni(target, JFRConnection::getMBeanMetrics); + } + + @Blocking + @Transactional + @Description("Start a new Flight Recording on the specified Target") + public Uni doStartRecording( + @Source Target target, @NonNull RecordingSettings settings) + throws QuantityConversionException { + var fTarget = Target.findById(target.id); + Template template = + recordingHelper.getPreferredTemplate( + fTarget, settings.template, settings.templateType); + return recordingHelper.startRecording( + fTarget, + RecordingReplace.STOPPED, + template, + settings.asOptions(), + settings.metadata.labels()); + } + + @Blocking + @Transactional + @Description("Create a new Flight Recorder Snapshot on the specified Target") + public Uni doSnapshot(@Source Target target) { + var fTarget = Target.findById(target.id); + return recordingHelper.createSnapshot(fTarget); + } + + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class Recordings { + public @NonNull ActiveRecordings active; + public @NonNull ArchivedRecordings archived; + } + + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class ActiveRecordings { + public @NonNull List data; + public @NonNull AggregateInfo aggregate; + } + + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class ArchivedRecordings { + public @NonNull List data; + public @NonNull AggregateInfo aggregate; + } + + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class AggregateInfo { + public @NonNull @Description("The number of elements in this collection") long count; + public @NonNull @Description( + "The sum of sizes of elements in this collection, or 0 if not applicable") long + size; + } + + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class ActiveRecordingsFilter implements Predicate { + public @Nullable String name; + public @Nullable List names; + public @Nullable List labels; + public @Nullable RecordingState state; + public @Nullable Boolean continuous; + public @Nullable Boolean toDisk; + public @Nullable Long durationMsGreaterThanEqual; + public @Nullable Long durationMsLessThanEqual; + public @Nullable Long startTimeMsAfterEqual; + public @Nullable Long startTimeMsBeforeEqual; + + @Override + public boolean test(ActiveRecording r) { + Predicate matchesName = + n -> name == null || Objects.equals(name, n.name); + Predicate matchesNames = n -> names == null || names.contains(n.name); + Predicate matchesLabels = + n -> + labels == null + || labels.stream() + .allMatch( + label -> + LabelSelectorMatcher.parse(label) + .test(n.metadata.labels())); + Predicate matchesState = n -> state == null || n.state.equals(state); + Predicate matchesContinuous = + n -> continuous == null || continuous.equals(n.continuous); + Predicate matchesToDisk = + n -> toDisk == null || toDisk.equals(n.toDisk); + Predicate matchesDurationGte = + n -> + durationMsGreaterThanEqual == null + || durationMsGreaterThanEqual >= n.duration; + Predicate matchesDurationLte = + n -> durationMsLessThanEqual == null || durationMsLessThanEqual <= n.duration; + Predicate matchesStartTimeAfter = + n -> startTimeMsAfterEqual == null || startTimeMsAfterEqual >= n.startTime; + Predicate matchesStartTimeBefore = + n -> startTimeMsBeforeEqual == null || startTimeMsBeforeEqual <= n.startTime; + + return matchesName + .and(matchesNames) + .and(matchesLabels) + .and(matchesState) + .and(matchesContinuous) + .and(matchesToDisk) + .and(matchesDurationGte) + .and(matchesDurationLte) + .and(matchesStartTimeBefore) + .and(matchesStartTimeAfter) + .test(r); + } + } + + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class ArchivedRecordingsFilter implements Predicate { + public @Nullable String name; + public @Nullable List names; + public @Nullable List labels; + public @Nullable Long sizeBytesGreaterThanEqual; + public @Nullable Long sizeBytesLessThanEqual; + public @Nullable Long archivedTimeAfterEqual; + public @Nullable Long archivedTimeBeforeEqual; + + @Override + public boolean test(ArchivedRecording r) { + Predicate matchesName = + n -> name == null || Objects.equals(name, n.name()); + Predicate matchesNames = + n -> names == null || names.contains(n.name()); + Predicate matchesLabels = + n -> + labels == null + || labels.stream() + .allMatch( + label -> + LabelSelectorMatcher.parse(label) + .test(n.metadata().labels())); + Predicate matchesSizeGte = + n -> sizeBytesGreaterThanEqual == null || sizeBytesGreaterThanEqual >= n.size(); + Predicate matchesSizeLte = + n -> sizeBytesLessThanEqual == null || sizeBytesLessThanEqual <= n.size(); + Predicate matchesArchivedTimeGte = + n -> + archivedTimeAfterEqual == null + || archivedTimeAfterEqual >= n.archivedTime(); + Predicate matchesArchivedTimeLte = + n -> + archivedTimeBeforeEqual == null + || archivedTimeBeforeEqual <= n.archivedTime(); + + return matchesName + .and(matchesNames) + .and(matchesLabels) + .and(matchesSizeGte) + .and(matchesSizeLte) + .and(matchesArchivedTimeGte) + .and(matchesArchivedTimeLte) + .test(r); + } + } + + @SuppressFBWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public static class RecordingSettings { + public @NonNull String name; + public @NonNull String template; + public @NonNull TemplateType templateType; + public @Nullable RecordingReplace replace; + public @Nullable Boolean continuous; + public @Nullable Boolean archiveOnStop; + public @Nullable Boolean toDisk; + public @Nullable Long duration; + public @Nullable Long maxSize; + public @Nullable Long maxAge; + public @Nullable Metadata metadata; + + public RecordingOptions asOptions() { + return new RecordingOptions( + name, + Optional.ofNullable(toDisk), + Optional.ofNullable(archiveOnStop), + Optional.ofNullable(duration), + Optional.ofNullable(maxSize), + Optional.ofNullable(maxAge)); + } + } +} diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index 07f219cce..b12a8fc81 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -39,11 +39,13 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.openjdk.jmc.common.unit.IConstrainedMap; +import org.openjdk.jmc.common.unit.QuantityConversionException; import org.openjdk.jmc.flightrecorder.configuration.events.EventOptionID; import org.openjdk.jmc.flightrecorder.configuration.recording.RecordingOptionsBuilder; import org.openjdk.jmc.rjmx.services.jfr.IEventTypeInfo; @@ -193,110 +195,140 @@ void onStart(@Observes StartupEvent evt) { } } - @Blocking - public ActiveRecording startRecording( + public Uni startRecording( Target target, - IConstrainedMap recordingOptions, - Template eventTemplate, - Metadata metadata, - boolean archiveOnStop, RecordingReplace replace, - JFRConnection connection) - throws Exception { - String recordingName = (String) recordingOptions.get(RecordingOptionsBuilder.KEY_NAME); - getDescriptorByName(connection, recordingName) - .ifPresent( - previous -> { - RecordingState previousState = mapState(previous); - boolean restart = - shouldRestartRecording(replace, previousState, recordingName); - if (!restart) { - throw new EntityExistsException("Recording", recordingName); - } - if (!ActiveRecording.deleteFromTarget(target, recordingName)) { - logger.warnf( - "Could not delete recording %s from target %s", - recordingName, target.alias); - } - }); - - IRecordingDescriptor desc = - connection - .getService() - .start(recordingOptions, enableEvents(target, eventTemplate)); - - Map labels = metadata.labels(); - labels.put("template.name", eventTemplate.getName()); - labels.put("template.type", eventTemplate.getType().toString()); - Metadata meta = new Metadata(labels); - - ActiveRecording recording = ActiveRecording.from(target, desc, meta); - recording.persist(); - - target.activeRecordings.add(recording); - target.persist(); - - logger.tracev("Started recording: {0} {1}", target.connectUrl, target.activeRecordings); - - return recording; + Template template, + RecordingOptions options, + Map rawLabels) + throws QuantityConversionException { + return connectionManager.executeConnectedTaskUni( + target, + conn -> { + RecordingOptionsBuilder optionsBuilder = + recordingOptionsBuilderFactory + .create(conn.getService()) + .name(options.name()); + if (options.duration().isPresent()) { + optionsBuilder = + optionsBuilder.duration( + TimeUnit.SECONDS.toMillis(options.duration().get())); + } + if (options.toDisk().isPresent()) { + optionsBuilder = optionsBuilder.toDisk(options.toDisk().get()); + } + if (options.maxAge().isPresent()) { + optionsBuilder = optionsBuilder.maxAge(options.maxAge().get()); + } + if (options.maxSize().isPresent()) { + optionsBuilder = optionsBuilder.maxSize(options.maxSize().get()); + } + IConstrainedMap recordingOptions = optionsBuilder.build(); + getDescriptorByName(conn, options.name()) + .ifPresent( + previous -> { + RecordingState previousState = mapState(previous); + boolean restart = + shouldRestartRecording( + replace, previousState, options.name()); + if (!restart) { + throw new EntityExistsException( + "Recording", options.name()); + } + if (!ActiveRecording.deleteFromTarget( + target, options.name())) { + logger.warnf( + "Could not delete recording %s from target %s", + options.name(), target.alias); + } + }); + + IRecordingDescriptor desc = + conn.getService() + .start(recordingOptions, enableEvents(target, template)); + + Map labels = new HashMap<>(rawLabels); + labels.put("template.name", template.getName()); + labels.put("template.type", template.getType().toString()); + Metadata meta = new Metadata(labels); + + ActiveRecording recording = ActiveRecording.from(target, desc, meta); + recording.persist(); + + target.activeRecordings.add(recording); + target.persist(); + + logger.tracev( + "Started recording: {0} {1}", + target.connectUrl, target.activeRecordings); + return recording; + }); } @Blocking - public ActiveRecording createSnapshot(Target target, JFRConnection connection) - throws Exception { - IRecordingDescriptor desc = connection.getService().getSnapshotRecording(); - - String rename = String.format("%s-%d", desc.getName().toLowerCase(), desc.getId()); - - RecordingOptionsBuilder recordingOptionsBuilder = - recordingOptionsBuilderFactory.create(connection.getService()); - recordingOptionsBuilder.name(rename); + public Uni createSnapshot(Target target) { + return connectionManager.executeConnectedTaskUni( + target, + connection -> { + IRecordingDescriptor desc = connection.getService().getSnapshotRecording(); - connection.getService().updateRecordingOptions(desc, recordingOptionsBuilder.build()); + String rename = + String.format("%s-%d", desc.getName().toLowerCase(), desc.getId()); - Optional updatedDescriptor = getDescriptorByName(connection, rename); + RecordingOptionsBuilder recordingOptionsBuilder = + recordingOptionsBuilderFactory.create(connection.getService()); + recordingOptionsBuilder.name(rename); - if (updatedDescriptor.isEmpty()) { - throw new IllegalStateException( - "The most recent snapshot of the recording cannot be" - + " found after renaming."); - } + connection + .getService() + .updateRecordingOptions(desc, recordingOptionsBuilder.build()); - desc = updatedDescriptor.get(); + Optional updatedDescriptor = + getDescriptorByName(connection, rename); - try (InputStream snapshot = remoteRecordingStreamFactory.open(connection, target, desc)) { - if (!snapshotIsReadable(target, snapshot)) { - connection.getService().close(desc); - throw new SnapshotCreationException( - "Snapshot was not readable - are there any source recordings?"); - } - } + if (updatedDescriptor.isEmpty()) { + throw new IllegalStateException( + "The most recent snapshot of the recording cannot be" + + " found after renaming."); + } - ActiveRecording recording = - ActiveRecording.from( - target, - desc, - new Metadata( - Map.of( - "jvmId", - target.jvmId, - "connectUrl", - target.connectUrl.toString()))); - recording.persist(); - - target.activeRecordings.add(recording); - target.persist(); + desc = updatedDescriptor.get(); - var event = - new ActiveRecordingEvent( - Recordings.RecordingEventCategory.SNAPSHOT_CREATED, - ActiveRecordingEvent.Payload.of(this, recording)); - bus.publish(event.category().category(), event.payload().recording()); - bus.publish( - MessagingServer.class.getName(), - new Notification(event.category().category(), event.payload())); + try (InputStream snapshot = + remoteRecordingStreamFactory.open(connection, target, desc)) { + if (!snapshotIsReadable(target, snapshot)) { + connection.getService().close(desc); + throw new SnapshotCreationException( + "Snapshot was not readable - are there any source recordings?"); + } + } - return recording; + ActiveRecording recording = + ActiveRecording.from( + target, + desc, + new Metadata( + Map.of( + "jvmId", + target.jvmId, + "connectUrl", + target.connectUrl.toString()))); + recording.persist(); + + target.activeRecordings.add(recording); + target.persist(); + + var event = + new ActiveRecordingEvent( + Recordings.RecordingEventCategory.SNAPSHOT_CREATED, + ActiveRecordingEvent.Payload.of(this, recording)); + bus.publish(event.category().category(), event.payload().recording()); + bus.publish( + MessagingServer.class.getName(), + new Notification(event.category().category(), event.payload())); + + return recording; + }); } @Blocking @@ -402,7 +434,7 @@ private IConstrainedMap enableEvents(Target target, Template even @Blocking public Template getPreferredTemplate( - Target target, String templateName, TemplateType templateType) throws Exception { + Target target, String templateName, TemplateType templateType) { Objects.requireNonNull(target); Objects.requireNonNull(templateName); if (templateName.equals(EventTemplates.ALL_EVENTS_TEMPLATE.getName())) { @@ -496,6 +528,29 @@ public List listArchivedRecordingObjects() { return listArchivedRecordingObjects(null); } + @Blocking + public List listArchivedRecordings() { + return listArchivedRecordingObjects().stream() + .map( + item -> { + String path = item.key().strip(); + String[] parts = path.split("/"); + String jvmId = parts[0]; + String filename = parts[1]; + Metadata metadata = + getArchivedRecordingMetadata(jvmId, filename) + .orElseGet(Metadata::empty); + return new ArchivedRecording( + filename, + downloadUrl(jvmId, filename), + reportUrl(jvmId, filename), + metadata, + item.size(), + item.lastModified().getEpochSecond()); + }) + .toList(); + } + @Blocking public List listArchivedRecordingObjects(String jvmId) { var builder = ListObjectsV2Request.builder().bucket(archiveBucket); @@ -512,6 +567,29 @@ public List listArchivedRecordingObjects(String jvmId) { .toList(); } + @Blocking + public List listArchivedRecordings(Target target) { + return listArchivedRecordingObjects(target.jvmId).stream() + .map( + item -> { + String path = item.key().strip(); + String[] parts = path.split("/"); + String jvmId = parts[0]; + String filename = parts[1]; + Metadata metadata = + getArchivedRecordingMetadata(jvmId, filename) + .orElseGet(Metadata::empty); + return new ArchivedRecording( + filename, + downloadUrl(jvmId, filename), + reportUrl(jvmId, filename), + metadata, + item.size(), + item.lastModified().getEpochSecond()); + }) + .toList(); + } + public String saveRecording(ActiveRecording recording) throws Exception { return saveRecording(recording, null); } @@ -946,6 +1024,14 @@ Optional getRecordingCopyPath( }); } + public record RecordingOptions( + String name, + Optional toDisk, + Optional archiveOnStop, + Optional duration, + Optional maxSize, + Optional maxAge) {} + public enum RecordingReplace { ALWAYS, NEVER, diff --git a/src/main/java/io/cryostat/recordings/Recordings.java b/src/main/java/io/cryostat/recordings/Recordings.java index 0bcea5e3c..23c529370 100644 --- a/src/main/java/io/cryostat/recordings/Recordings.java +++ b/src/main/java/io/cryostat/recordings/Recordings.java @@ -52,6 +52,7 @@ import io.cryostat.core.templates.Template; import io.cryostat.core.templates.TemplateType; import io.cryostat.recordings.ActiveRecording.Listener.ArchivedRecordingEvent; +import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.recordings.RecordingHelper.SnapshotCreationException; import io.cryostat.targets.Target; @@ -156,29 +157,7 @@ void onStart(@Observes StartupEvent evt) { @Path("/api/v1/recordings") @RolesAllowed("read") public List listArchivesV1() { - var result = new ArrayList(); - recordingHelper - .listArchivedRecordingObjects() - .forEach( - item -> { - String path = item.key().strip(); - String[] parts = path.split("/"); - String jvmId = parts[0]; - String filename = parts[1]; - Metadata metadata = - recordingHelper - .getArchivedRecordingMetadata(jvmId, filename) - .orElseGet(Metadata::empty); - result.add( - new ArchivedRecording( - filename, - recordingHelper.downloadUrl(jvmId, filename), - recordingHelper.reportUrl(jvmId, filename), - metadata, - item.size(), - item.lastModified().getEpochSecond())); - }); - return result; + return recordingHelper.listArchivedRecordings(); } @POST @@ -510,9 +489,8 @@ public Response patchV1(@RestPath URI connectUrl, @RestPath String recordingName @RolesAllowed("write") public Uni createSnapshotV1(@RestPath URI connectUrl) throws Exception { Target target = Target.getTargetByConnectUrl(connectUrl); - return connectionManager - .executeConnectedTaskUni( - target, connection -> recordingHelper.createSnapshot(target, connection)) + return recordingHelper + .createSnapshot(target) .onItem() .transform( recording -> @@ -527,9 +505,8 @@ public Uni createSnapshotV1(@RestPath URI connectUrl) throws Exception @RolesAllowed("write") public Uni createSnapshotV2(@RestPath URI connectUrl) throws Exception { Target target = Target.getTargetByConnectUrl(connectUrl); - return connectionManager - .executeConnectedTaskUni( - target, connection -> recordingHelper.createSnapshot(target, connection)) + return recordingHelper + .createSnapshot(target) .onItem() .transform( recording -> @@ -552,9 +529,8 @@ public Uni createSnapshotV2(@RestPath URI connectUrl) throws Exception @RolesAllowed("write") public Uni createSnapshot(@RestPath long id) throws Exception { Target target = Target.find("id", id).singleResult(); - return connectionManager - .executeConnectedTaskUni( - target, connection -> recordingHelper.createSnapshot(target, connection)) + return recordingHelper + .createSnapshot(target) .onItem() .transform( recording -> @@ -598,50 +574,32 @@ public Response createRecording( Template template = recordingHelper.getPreferredTemplate(target, pair.getKey(), pair.getValue()); + Map labels = new HashMap<>(); + if (rawMetadata.isPresent()) { + labels.putAll(mapper.readValue(rawMetadata.get(), Metadata.class).labels); + } + RecordingReplace replacement = RecordingReplace.NEVER; + if (replace.isPresent()) { + replacement = RecordingReplace.fromString(replace.get()); + } else if (restart.isPresent()) { + replacement = restart.get() ? RecordingReplace.ALWAYS : RecordingReplace.NEVER; + } ActiveRecording recording = - connectionManager.executeConnectedTask( - target, - connection -> { - RecordingOptionsBuilder optionsBuilder = - recordingOptionsBuilderFactory - .create(connection.getService()) - .name(recordingName); - if (duration.isPresent()) { - optionsBuilder.duration(TimeUnit.SECONDS.toMillis(duration.get())); - } - if (toDisk.isPresent()) { - optionsBuilder.toDisk(toDisk.get()); - } - if (maxAge.isPresent()) { - optionsBuilder.maxAge(maxAge.get()); - } - if (maxSize.isPresent()) { - optionsBuilder.maxSize(maxSize.get()); - } - Map labels = new HashMap<>(); - if (rawMetadata.isPresent()) { - labels.putAll( - mapper.readValue(rawMetadata.get(), Metadata.class).labels); - } - RecordingReplace replacement = RecordingReplace.NEVER; - if (replace.isPresent()) { - replacement = RecordingReplace.fromString(replace.get()); - } else if (restart.isPresent()) { - replacement = - restart.get() - ? RecordingReplace.ALWAYS - : RecordingReplace.NEVER; - } - IConstrainedMap recordingOptions = optionsBuilder.build(); - return recordingHelper.startRecording( - target, - recordingOptions, - template, - new Metadata(labels), - archiveOnStop.orElse(false), - replacement, - connection); - }); + recordingHelper + .startRecording( + target, + replacement, + template, + new RecordingOptions( + recordingName, + toDisk, + archiveOnStop, + duration, + maxSize, + maxAge), + labels) + .await() + .atMost(Duration.ofSeconds(10)); if (recording.duration > 0) { scheduler.schedule( diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index bf726e608..e24915ef1 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -15,32 +15,25 @@ */ package io.cryostat.rules; -import java.io.IOException; +import java.time.Duration; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.openjdk.jmc.common.unit.IConstrainedMap; -import org.openjdk.jmc.common.unit.QuantityConversionException; -import org.openjdk.jmc.flightrecorder.configuration.recording.RecordingOptionsBuilder; -import org.openjdk.jmc.rjmx.ConnectionException; -import org.openjdk.jmc.rjmx.ServiceNotAvailableException; - -import io.cryostat.core.net.JFRConnection; import io.cryostat.core.templates.Template; import io.cryostat.core.templates.TemplateType; import io.cryostat.expressions.MatchExpressionEvaluator; import io.cryostat.recordings.ActiveRecording; import io.cryostat.recordings.RecordingHelper; +import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.recordings.RecordingOptionsBuilderFactory; -import io.cryostat.recordings.Recordings.Metadata; import io.cryostat.rules.Rule.RuleEvent; import io.cryostat.targets.Target; import io.cryostat.targets.TargetConnectionManager; @@ -139,30 +132,22 @@ public void handleRuleRecordingCleanup(Rule rule) { @Transactional public void activate(Rule rule, Target target) throws Exception { - ActiveRecording recording = - connectionManager.executeConnectedTask( - target, - connection -> { - var recordingOptions = createRecordingOptions(rule, connection); + var options = createRecordingOptions(rule); - Pair pair = - recordingHelper.parseEventSpecifier(rule.eventSpecifier); - Template template = - recordingHelper.getPreferredTemplate( - target, pair.getKey(), pair.getValue()); + Pair pair = recordingHelper.parseEventSpecifier(rule.eventSpecifier); + Template template = + recordingHelper.getPreferredTemplate(target, pair.getKey(), pair.getValue()); - Map labels = new HashMap<>(); - labels.put("rule", rule.name); - Metadata meta = new Metadata(labels); - return recordingHelper.startRecording( - target, - recordingOptions, - template, - meta, - false, - RecordingReplace.ALWAYS, - connection); - }); + ActiveRecording recording = + recordingHelper + .startRecording( + target, + RecordingReplace.STOPPED, + template, + options, + Map.of("rule", rule.name)) + .await() + .atMost(Duration.ofSeconds(10)); Target attachedTarget = entityManager.merge(target); var relatedRecordings = ruleRecordingMap.get(rule.id); @@ -173,22 +158,14 @@ public void activate(Rule rule, Target target) throws Exception { } } - private IConstrainedMap createRecordingOptions(Rule rule, JFRConnection connection) - throws ConnectionException, - QuantityConversionException, - IOException, - ServiceNotAvailableException { - RecordingOptionsBuilder optionsBuilder = - recordingOptionsBuilderFactory - .create(connection.getService()) - .name(rule.getRecordingName()); - if (rule.maxAgeSeconds > 0) { - optionsBuilder.maxAge(rule.maxAgeSeconds); - } - if (rule.maxSizeBytes > 0) { - optionsBuilder.maxSize(rule.maxSizeBytes); - } - return optionsBuilder.build(); + private RecordingOptions createRecordingOptions(Rule rule) { + return new RecordingOptions( + rule.getRecordingName(), + Optional.of(true), + Optional.of(true), + Optional.empty(), + Optional.ofNullable((long) rule.maxSizeBytes), + Optional.ofNullable((long) rule.maxAgeSeconds)); } @Transactional diff --git a/src/main/java/io/cryostat/ws/MessagingServer.java b/src/main/java/io/cryostat/ws/MessagingServer.java index 8644dfe99..239ebc215 100644 --- a/src/main/java/io/cryostat/ws/MessagingServer.java +++ b/src/main/java/io/cryostat/ws/MessagingServer.java @@ -47,11 +47,11 @@ public class MessagingServer { private static final String CLIENT_ACTIVITY_CATEGORY = "WsClientActivity"; + @Inject ObjectMapper mapper; @Inject Logger logger; private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final BlockingQueue msgQ; private final Set sessions = new CopyOnWriteArraySet<>(); - private final ObjectMapper mapper = new ObjectMapper(); MessagingServer(@ConfigProperty(name = "cryostat.messaging.queue.size") int capacity) { this.msgQ = new ArrayBlockingQueue<>(capacity); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 4d0254252..ecbbd52c5 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -41,6 +41,7 @@ quarkus.smallrye-openapi.info-contact-url=https://cryostat.io quarkus.smallrye-openapi.info-license-name=Apache 2.0 quarkus.smallrye-openapi.info-license-url=https://github.com/cryostatio/cryostat3/blob/main/LICENSE +quarkus.smallrye-graphql.events.enabled=true quarkus.smallrye-graphql.root-path=/api/v3/graphql quarkus.smallrye-graphql.http.get.enabled=true quarkus.smallrye-graphql.print-data-fetcher-exception=true diff --git a/src/test/java/itest/CustomTargetsTest.java b/src/test/java/itest/CustomTargetsTest.java index ccfea9b51..f5865310d 100644 --- a/src/test/java/itest/CustomTargetsTest.java +++ b/src/test/java/itest/CustomTargetsTest.java @@ -16,6 +16,7 @@ package itest; import java.net.UnknownHostException; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -224,16 +225,21 @@ void shouldBeAbleToDefineTarget() MatcherAssert.assertThat(item.getString("jvmId"), Matchers.equalTo(itestJvmId)); MatcherAssert.assertThat(item.getString("alias"), Matchers.equalTo(alias)); MatcherAssert.assertThat(item.getString("connectUrl"), Matchers.equalTo(SELF_JMX_URL)); - MatcherAssert.assertThat(item.getJsonObject("labels"), Matchers.equalTo(new JsonObject())); + MatcherAssert.assertThat(item.getJsonArray("labels"), Matchers.equalTo(new JsonArray())); MatcherAssert.assertThat( item.getJsonObject("annotations"), Matchers.equalTo( new JsonObject( Map.of( "platform", - Map.of(), + List.of(), "cryostat", - Map.of("REALM", "Custom Targets"))))); + List.of( + Map.of( + "key", + "REALM", + "value", + "Custom Targets")))))); } @Test