Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.a2a.extras.pushnotificationconfigstore.database.jpa;

import io.a2a.server.config.A2AConfigProvider;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.persistence.TypedQuery;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -29,18 +32,42 @@ public class JpaDatabasePushNotificationConfigStore implements PushNotificationC
private static final Logger LOGGER = LoggerFactory.getLogger(JpaDatabasePushNotificationConfigStore.class);

private static final Instant NULL_TIMESTAMP_SENTINEL = Instant.EPOCH;
private static final String A2A_PUSH_NOTIFICATION_MAX_PAGE_SIZE_CONFIG = "a2a.push-notification-config.max-page-size";
private static final int A2A_PUSH_NOTIFICATION_DEFAULT_MAX_PAGE_SIZE = 100;

@PersistenceContext(unitName = "a2a-java")
EntityManager em;

@Inject
A2AConfigProvider configProvider;

/**
* Maximum page size when listing push notification configurations for a task.
* Requested page sizes exceeding this value will be capped to this limit.
* <p>
* Property: {@code a2a.push-notification-config.max-page-size}<br>
* Default: 100<br>
* Note: Property override requires a configurable {@link A2AConfigProvider} on the classpath.
*/
int maxPageSize;

@PostConstruct
void initConfig() {
try {
maxPageSize = Integer.parseInt(configProvider.getValue(A2A_PUSH_NOTIFICATION_MAX_PAGE_SIZE_CONFIG));
} catch (Exception e) {
LOGGER.warn("Failed to read '{}' configuration, falling back to default page size of {}.",
A2A_PUSH_NOTIFICATION_MAX_PAGE_SIZE_CONFIG, A2A_PUSH_NOTIFICATION_DEFAULT_MAX_PAGE_SIZE, e);
maxPageSize = A2A_PUSH_NOTIFICATION_DEFAULT_MAX_PAGE_SIZE;
}
Comment on lines +58 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching the generic Exception is overly broad and can hide other unexpected issues. It's better to catch the specific exceptions you expect, such as IllegalArgumentException (if the config value is missing from the provider) and NumberFormatException (if the value is not a valid integer). This makes the error handling more precise and robust.

Suggested change
} catch (Exception e) {
LOGGER.warn("Failed to read '{}' configuration, falling back to default page size of {}.",
A2A_PUSH_NOTIFICATION_MAX_PAGE_SIZE_CONFIG, A2A_PUSH_NOTIFICATION_DEFAULT_MAX_PAGE_SIZE, e);
maxPageSize = A2A_PUSH_NOTIFICATION_DEFAULT_MAX_PAGE_SIZE;
}
} catch (IllegalArgumentException | NumberFormatException e) {
LOGGER.warn("Failed to read or parse '{}' configuration, falling back to default page size of {}.",
A2A_PUSH_NOTIFICATION_MAX_PAGE_SIZE_CONFIG, A2A_PUSH_NOTIFICATION_DEFAULT_MAX_PAGE_SIZE, e);
maxPageSize = A2A_PUSH_NOTIFICATION_DEFAULT_MAX_PAGE_SIZE;
}

}

@Transactional
@Override
public PushNotificationConfig setInfo(String taskId, PushNotificationConfig notificationConfig) {
// Ensure config has an ID - default to taskId if not provided (mirroring InMemoryPushNotificationConfigStore behavior)
PushNotificationConfig.Builder builder = PushNotificationConfig.builder(notificationConfig);
if (notificationConfig.id() == null || notificationConfig.id().isEmpty()) {
// This means the taskId and configId are same. This will not allow having multiple configs for a single Task.
// The configId is a required field in the spec and should not be empty
builder.id(taskId);
}
notificationConfig = builder.build();
Expand Down Expand Up @@ -80,44 +107,48 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf
LOGGER.debug("Retrieving PushNotificationConfigs for Task '{}' with params: pageSize={}, pageToken={}",
taskId, params.pageSize(), params.pageToken());
try {
StringBuilder queryBuilder = new StringBuilder("SELECT c FROM JpaPushNotificationConfig c WHERE c.id.taskId = :taskId");
// Parse pageToken once upfront
Instant tokenTimestamp = null;
String tokenId = null;

if (params.pageToken() != null && !params.pageToken().isEmpty()) {
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
// Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length != 2) {
throw new io.a2a.spec.InvalidParamsError(null,
"Invalid pageToken format: pageToken must be in 'timestamp_millis:configId' format", null);
}

try {
long timestampMillis = Long.parseLong(tokenParts[0]);
tokenTimestamp = Instant.ofEpochMilli(timestampMillis);
tokenId = tokenParts[1];
} catch (NumberFormatException e) {
throw new io.a2a.spec.InvalidParamsError(null,
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
}
}

// Build query using the parsed values
StringBuilder queryBuilder = new StringBuilder("SELECT c FROM JpaPushNotificationConfig c WHERE c.id.taskId = :taskId");

if (tokenTimestamp != null) {
// Keyset pagination: get notifications where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
queryBuilder.append(" AND (COALESCE(c.createdAt, :nullSentinel) < :tokenTimestamp OR (COALESCE(c.createdAt, :nullSentinel) = :tokenTimestamp AND c.id.configId > :tokenId))");
} else {
// Based on the comments in the test case, if the pageToken is invalid start from the beginning.
}
}

queryBuilder.append(" ORDER BY COALESCE(c.createdAt, :nullSentinel) DESC, c.id.configId ASC");

// Create query and set parameters
TypedQuery<JpaPushNotificationConfig> query = em.createQuery(queryBuilder.toString(), JpaPushNotificationConfig.class);
query.setParameter("taskId", taskId);
query.setParameter("nullSentinel", NULL_TIMESTAMP_SENTINEL);

if (params.pageToken() != null && !params.pageToken().isEmpty()) {
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
try {
long timestampMillis = Long.parseLong(tokenParts[0]);
String tokenId = tokenParts[1];

Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis);
query.setParameter("tokenTimestamp", tokenTimestamp);
query.setParameter("tokenId", tokenId);
} catch (NumberFormatException e) {
// Malformed timestamp in pageToken
throw new io.a2a.spec.InvalidParamsError(null,
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
}
}
if (tokenTimestamp != null) {
query.setParameter("tokenTimestamp", tokenTimestamp);
query.setParameter("tokenId", tokenId);
}

int pageSize = params.getEffectivePageSize();
int pageSize = params.getEffectivePageSize(maxPageSize);
query.setMaxResults(pageSize + 1);
List<JpaPushNotificationConfig> jpaConfigsPage = query.getResultList();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# A2A JPA Database Push Notification Config Store Default Configuration

# Maximum page size when listing push notification configurations for a task
# Requested page sizes exceeding this value will be capped to this limit
# Used as default when pageSize parameter is not specified or is invalid
a2a.push-notification-config.max-page-size=100

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.a2a.spec.InvalidParamsError;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -293,7 +294,7 @@ public void testPaginationWithZeroPageSize() {
ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params);

assertNotNull(result);
assertEquals(5, result.configs().size(), "Should return all 5 configs when pageSize=0");
assertEquals(5, result.configs().size(), "Should return all default capped 5 configs when pageSize=0");
assertNull(result.nextPageToken(), "Should not have nextPageToken when returning all");
}

Expand Down Expand Up @@ -355,12 +356,10 @@ public void testPaginationWithInvalidToken() {
// Request with invalid pageToken - JPA implementation behavior is to start from beginning
ListTaskPushNotificationConfigParams params = new ListTaskPushNotificationConfigParams(
taskId, 2, "invalid_token_that_does_not_exist", "");
ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params);

assertNotNull(result);
// When token is not found, implementation starts from beginning
assertEquals(2, result.configs().size(), "Should return first page when token is not found");
assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist");
assertThrows(InvalidParamsError.class, () ->
pushNotificationConfigStore.getInfo(params),
"Invalid pageToken format: pageToken must be in 'timestamp_millis:configId' format");
}

@Test
Expand Down Expand Up @@ -428,12 +427,10 @@ public void testPageTokenWithMissingColon() {

ListTaskPushNotificationConfigParams params =
new ListTaskPushNotificationConfigParams(taskId, 2, "123456789cfg1", "");
ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params);

assertNotNull(result);
assertEquals(2, result.configs().size(),
"Should return first page when pageToken format is invalid (missing colon)");
assertNotNull(result.nextPageToken(), "Should have nextPageToken since more items exist");
assertThrows(InvalidParamsError.class, () ->
pushNotificationConfigStore.getInfo(params),
"Invalid pageToken format: pageToken must be in 'timestamp_millis:configId' format");
}

@Test
Expand Down Expand Up @@ -552,6 +549,47 @@ public void testPaginationOrderingConsistency() {
assertEquals("cfg0", allConfigIds.get(14),
"Last config should be oldest created");
}

@Test
@Transactional
public void testPageSizeExceedingConfiguredMaxLimit() {
String taskId = "task_max_page_size_" + System.currentTimeMillis();
// Create 5 configs (more than test max page size of 5)
createSamples(taskId, 7);

// Request with pageSize=7 (exceeds configured max of 5 in test application.properties)
// Should be capped to maxPageSize (5) from config
ListTaskPushNotificationConfigParams params =
new ListTaskPushNotificationConfigParams(taskId, 7, "", "");
ListTaskPushNotificationConfigResult result = pushNotificationConfigStore.getInfo(params);

assertNotNull(result);
// Should return 5 configs (capped to maxPageSize from test config), not 7
assertEquals(5, result.configs().size(),
"Page size should be capped to configured maxPageSize (5 in tests) when requested size exceeds limit");
assertNotNull(result.nextPageToken(),
"Should have nextPageToken since more configs remain");

// Verify we can iterate through all pages and get all 7 configs
List<String> allConfigIds = new java.util.ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The fully qualified name java.util.ArrayList is used here. It would be more conventional to import java.util.ArrayList at the top of the file and use the short name new ArrayList<>(). This improves readability. A similar simplification can be applied to new java.util.HashSet<>() on line 589 by importing java.util.HashSet.

Suggested change
List<String> allConfigIds = new java.util.ArrayList<>();
List<String> allConfigIds = new ArrayList<>();

result.configs().forEach(c -> allConfigIds.add(c.pushNotificationConfig().id()));

String nextToken = result.nextPageToken();
while (nextToken != null) {
ListTaskPushNotificationConfigParams nextParams =
new ListTaskPushNotificationConfigParams(taskId, 7, nextToken, "");
ListTaskPushNotificationConfigResult nextResult = pushNotificationConfigStore.getInfo(nextParams);

nextResult.configs().forEach(c -> allConfigIds.add(c.pushNotificationConfig().id()));
nextToken = nextResult.nextPageToken();
}

assertEquals(7, allConfigIds.size(),
"Should retrieve all 7 configs across multiple pages");
assertEquals(7, new java.util.HashSet<>(allConfigIds).size(),
"All config IDs should be unique - no duplicates");
}

private void createSamples(String taskId, int size) {
// Create configs with slight delays to ensure unique timestamps for deterministic ordering
for (int i = 0; i < size; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# A2A SDK Default Configuration Values
# These values are used when no other configuration source provides them

# DefaultRequestHandler - Blocking call timeouts
# Timeout for agent execution to complete (seconds)
# Increase for slow agents: LLM-based, data processing, external APIs
a2a.blocking.agent.timeout.seconds=30

# Timeout for event consumption/persistence to complete (seconds)
# Ensures TaskStore is fully updated before returning to client
a2a.blocking.consumption.timeout.seconds=5

# AsyncExecutorProducer - Thread pool configuration
# Core pool size for async agent execution
a2a.executor.core-pool-size=5

# Maximum pool size for async agent execution
a2a.executor.max-pool-size=50

# Keep-alive time for idle threads (seconds)
a2a.executor.keep-alive-seconds=60

# A2A JPA Database Push Notification Config Store Default Configuration

# A2A Configuration - Override max page size for testing
# Set to a lower value (2) to make tests faster and verify capping behavior
a2a.push-notification-config.max-page-size=5

Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ quarkus.hibernate-orm.log.format-sql=true

# Transaction timeout (set to 30 minutes for debugging - 1800 seconds)
# quarkus.transaction-manager.default-transaction-timeout=1800s
a2a.defaults.resource=META-INF/a2a-test-defaults.properties
1 change: 0 additions & 1 deletion server-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,6 +32,9 @@ public class DefaultValuesConfigProvider implements A2AConfigProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultValuesConfigProvider.class);
private static final String DEFAULTS_RESOURCE = "META-INF/a2a-defaults.properties";

@ConfigProperty(name = "a2a.defaults.resource", defaultValue = DEFAULTS_RESOURCE)
String defaultsResource = DEFAULTS_RESOURCE;

private final Map<String, String> defaults = new HashMap<>();

@PostConstruct
Expand All @@ -42,7 +46,7 @@ private void loadDefaultsFromClasspath() {
try {
Enumeration<URL> resources = Thread.currentThread()
.getContextClassLoader()
.getResources(DEFAULTS_RESOURCE);
.getResources(defaultsResource);

Map<String, String> sourceTracker = new HashMap<>(); // Track which file each key came from

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ public ListTaskPushNotificationConfigParams(String id) {
}

/**
* Validates and returns the effective page size (between 1 and 100, defaults to 100).
* Validates and returns the effective page size.
* If the requested pageSize is invalid (≤ 0 or > maxPageSize), returns maxPageSize.
*
* @return the effective page size
* @param maxPageSize the maximum allowed page size
* @return the effective page size (between 1 and maxPageSize)
*/
public int getEffectivePageSize() {
if (pageSize <= 0 || pageSize > 100) {
return 100;
public int getEffectivePageSize(int maxPageSize) {
if (pageSize <= 0 || pageSize > maxPageSize) {
return maxPageSize;
}
return pageSize;
}
Expand Down