Skip to content

Commit

Permalink
feat: include notification into polling
Browse files Browse the repository at this point in the history
  • Loading branch information
levinkerschberger committed Nov 12, 2024
1 parent c1dcf05 commit e498c16
Show file tree
Hide file tree
Showing 18 changed files with 363 additions and 174 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* (C) 2024 */
package rocks.inspectit.gepard.agentmanager.application.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class EventConfig {

@Bean
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();

eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import io.swagger.v3.oas.annotations.Operation;
import jakarta.validation.Valid;
import java.util.Map;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
Expand All @@ -17,10 +18,15 @@ public class ConfigurationController {

private final ConfigurationService configurationService;

@GetMapping
@Operation(summary = "Get the agent configuration.")
public ResponseEntity<InspectitConfiguration> getAgentConfiguration() {
InspectitConfiguration configuration = configurationService.getConfiguration();
@GetMapping("/{agentId}")
@Operation(
summary =
"Get the agent configuration and register the agent with the given id and agent info in the configuration server.")
public ResponseEntity<InspectitConfiguration> getAgentConfiguration(
@PathVariable String agentId, @RequestHeader Map<String, String> headers) {

InspectitConfiguration configuration =
configurationService.handleConfigurationRequest(agentId, headers);

// No config available
if (Objects.isNull(configuration)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* (C) 2024 */
package rocks.inspectit.gepard.agentmanager.configuration.events;

import java.util.Map;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;

@Getter
public class ConfigurationRequestEvent extends ApplicationEvent {

private final String agentId;
private final Map<String, String> headers;

public ConfigurationRequestEvent(Object source, String agentId, Map<String, String> headers) {
super(source);
this.agentId = agentId;
this.headers = headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.AllArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import rocks.inspectit.gepard.agentmanager.configuration.events.ConfigurationRequestEvent;
import rocks.inspectit.gepard.agentmanager.exception.JsonParseException;
import rocks.inspectit.gepard.config.model.InspectitConfiguration;

Expand All @@ -16,6 +19,8 @@ public class ConfigurationService {

private final ObjectMapper objectMapper;

private final ApplicationEventPublisher applicationEventPublisher;

/**
* Retrieves the current configuration from the local Git repository.
*
Expand Down Expand Up @@ -44,4 +49,23 @@ public void updateConfiguration(InspectitConfiguration configuration) {
throw new JsonParseException("Failed to serialize InspectitConfiguration to JSON", e);
}
}

/**
* Handles a configuration request from an agent. If the agent is not connected, it will be
* connected. The last fetch time of the agent will be updated. The configuration will be
* returned.
*
* @param agentId the id of the agent requesting the configuration
* @param headers the request headers, which should contain the agent information
* @return the inspectit configuration for this agent
*/
public InspectitConfiguration handleConfigurationRequest(
String agentId, Map<String, String> headers) {

// Event Emitter instead
ConfigurationRequestEvent event = new ConfigurationRequestEvent(this, agentId, headers);
applicationEventPublisher.publishEvent(event);

return getConfiguration();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* (C) 2024 */
package rocks.inspectit.gepard.agentmanager.configuration.validation;

import java.util.Map;
import rocks.inspectit.gepard.agentmanager.exception.MissingHeaderException;

public class ConfigurationRequestHeaderValidator {
public static void validateConfigurationRequestHeaders(Map<String, String> headers) {
validateHeader(headers, "x-gepard-service-name");
validateHeader(headers, "x-gepard-vm-id");
validateHeader(headers, "x-gepard-gepard-version");
validateHeader(headers, "x-gepard-otel-version");
validateHeader(headers, "x-gepard-java-version");
validateHeader(headers, "x-gepard-start-time");
}

private static void validateHeader(Map<String, String> headers, String headerName) {
String value = headers.get(headerName);
if (value == null) {
throw new MissingHeaderException(headerName + " header is required");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
import rocks.inspectit.gepard.agentmanager.connection.model.Connection;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.ConnectionDto;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.CreateConnectionRequest;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.QueryConnectionRequest;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.UpdateConnectionRequest;
import rocks.inspectit.gepard.agentmanager.connection.service.ConnectionService;
Expand All @@ -32,15 +29,6 @@ public class ConnectionController {

private final ConnectionService connectionService;

@PostMapping("/{id}")
@Operation(summary = "Connect an agent to the agent manager.")
public ResponseEntity<Void> connect(
@PathVariable String id, @Valid @RequestBody CreateConnectionRequest connectRequest) {
Connection connection = connectionService.handleConnectRequest(id, connectRequest);
return ResponseEntity.created(ServletUriComponentsBuilder.fromCurrentRequest().build().toUri())
.build();
}

@PatchMapping("/{id}")
@Operation(summary = "Update the agent connection.")
public ResponseEntity<ConnectionDto> update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ public class Connection {
/** The registration time. * */
private Instant registrationTime;

/** The time of the last communication. */
private Instant lastFetch;

/** The status of the connection. */
private ConnectionStatus connectionStatus;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package rocks.inspectit.gepard.agentmanager.connection.model.dto;

import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import rocks.inspectit.gepard.agentmanager.connection.model.Connection;
Expand All @@ -11,6 +12,7 @@
public record ConnectionDto(
@NotNull(message = "Registration Time missing.") Instant registrationTime,
@NotNull(message = "Connection status is missing") ConnectionStatus connectionStatus,
@NotNull(message = "Time since last fetch is missing.") Duration timeSinceLastFetch,
@NotNull(message = "Service Name missing.") String serviceName,
@NotNull(message = "Gepard Version missing.") String gepardVersion,
@NotNull(message = "OpenTelemetry Version missing.") String otelVersion,
Expand All @@ -23,6 +25,7 @@ public static ConnectionDto fromConnection(Connection connection) {
return new ConnectionDto(
connection.getRegistrationTime(),
connection.getConnectionStatus(),
Duration.between(connection.getLastFetch(), Instant.now()),
connection.getAgent().getServiceName(),
connection.getAgent().getGepardVersion(),
connection.getAgent().getOtelVersion(),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
/* (C) 2024 */
package rocks.inspectit.gepard.agentmanager.connection.service;

import static rocks.inspectit.gepard.agentmanager.configuration.validation.ConfigurationRequestHeaderValidator.validateConfigurationRequestHeaders;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import rocks.inspectit.gepard.agentmanager.configuration.events.ConfigurationRequestEvent;
import rocks.inspectit.gepard.agentmanager.connection.model.Connection;
import rocks.inspectit.gepard.agentmanager.connection.model.ConnectionStatus;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.ConnectionDto;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.CreateConnectionRequest;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.QueryConnectionRequest;
import rocks.inspectit.gepard.agentmanager.connection.model.dto.UpdateConnectionRequest;
import rocks.inspectit.gepard.agentmanager.connection.validation.RegexQueryService;
Expand All @@ -26,18 +31,18 @@ public class ConnectionService {
private final RegexQueryService regexQueryService;

/**
* Handles a connection request from an agent.
* Handles a ConfigurationRequestEvent. If the agent is not connected, it will be connected. If it
* is already connected, the last fetch time of the agent will be updated.
*
* @param connectionId The id for the created connection.
* @param connectRequest The request for the new connection to be created.
* @return Connection The response containing all saved information.
* @param event The configuration request event.
*/
public Connection handleConnectRequest(
String connectionId, CreateConnectionRequest connectRequest) {
Connection connection = CreateConnectionRequest.toConnection(connectRequest);
connectionCache.put(connectionId, connection);

return connection;
@EventListener
public void handleConfigurationRequestEvent(ConfigurationRequestEvent event) {
if (!isAgentConnected(event.getAgentId())) {
connectAgent(event.getAgentId(), event.getHeaders());
} else {
updateConnectionLastFetch(event.getAgentId());
}
}

/**
Expand Down Expand Up @@ -95,6 +100,29 @@ public ConnectionDto getConnection(String id) {
return ConnectionDto.fromConnection(connection);
}

/**
* Determines if an agent is connected.
*
* @param agentId The id of the agent to be searched for.
* @return true if the agent was found in the cache, false otherwise.
*/
private boolean isAgentConnected(String agentId) {
return connectionCache.get(agentId) != null;
}

/**
* Updates the last fetch time of an agent connection.
*
* @param agentId The id of the agent to be updated.
*/
private void updateConnectionLastFetch(String agentId) {
Connection connection = connectionCache.get(agentId);
if (connection != null) connection.setLastFetch(Instant.now());
else
throw new NoSuchElementException(
"No connection for agent id " + agentId + " found in cache.");
}

/**
* Checks if a connection matches the given query.
*
Expand Down Expand Up @@ -165,4 +193,48 @@ private boolean matchesAttributes(
&& regexQueryService.matches(actualValue, queryEntry.getValue());
});
}

/**
* Handles a connection request from an agent.
*
* @param connectionId The id for the created connection.
* @param headers The request headers, which should contain the agent information.
*/
private void connectAgent(String connectionId, Map<String, String> headers) {

validateConfigurationRequestHeaders(headers);

String serviceName = headers.get("x-gepard-service-name");
String vmId = headers.get("x-gepard-vm-id");
String gepardVersion = headers.get("x-gepard-gepard-version");
String otelVersion = headers.get("x-gepard-otel-version");
String javaVersion = headers.get("x-gepard-java-version");
String startTime = headers.get("x-gepard-start-time");

Map<String, String> attributes =
headers.entrySet().stream()
.filter(entry -> entry.getKey().startsWith("x-gepard-attribute-"))
.collect(
Collectors.toMap(
entry ->
entry
.getKey()
.substring("x-gepard-attribute-".length()), // remove the prefix
Map.Entry::getValue));

Agent agent =
new Agent(
serviceName,
vmId,
gepardVersion,
otelVersion,
Instant.parse(startTime),
javaVersion,
attributes);

Connection connection =
new Connection(Instant.now(), Instant.now(), ConnectionStatus.CONNECTED, agent);

connectionCache.put(connectionId, connection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,17 @@ public ResponseEntity<ApiError> handleInvalidPatternSyntax(

return new ResponseEntity<>(apiError, HttpStatus.BAD_REQUEST);
}

@ExceptionHandler(MissingHeaderException.class)
public ResponseEntity<ApiError> handleMissingHeaderException(
MissingHeaderException ex, HttpServletRequest request) {
ApiError apiError =
new ApiError(
request.getRequestURI(),
List.of(ex.getMessage()),
HttpStatus.BAD_REQUEST.value(),
LocalDateTime.now());

return new ResponseEntity<>(apiError, HttpStatus.BAD_REQUEST);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* (C) 2024 */
package rocks.inspectit.gepard.agentmanager.exception;

public class MissingHeaderException extends RuntimeException {
public MissingHeaderException(String message) {
super(message);
}
}
Loading

0 comments on commit e498c16

Please sign in to comment.