Skip to content

Commit

Permalink
[4.x] 6037 AQ connector @ConnectorAttribute (#6038)
Browse files Browse the repository at this point in the history
* 6037 AQ connector @ConnectorAttribute
  • Loading branch information
danielkec authored Feb 2, 2023
1 parent c97e77c commit 3a6123f
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,12 +26,113 @@
import io.helidon.common.Builder;
import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.messaging.connectors.jms.JmsConnector;

import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttribute;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;

/**
* Reactive Messaging Oracle AQ connector.
*/
@ConnectorAttribute(name = AqConnector.DATASOURCE_ATTRIBUTE,
description = "name of the datasource bean used to connect Oracle DB with AQ",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = AqConnector.URL_ATTRIBUTE,
description = "jdbc connection string used to connect Oracle DB with AQ (forbidden when datasource is specified)",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.USERNAME_ATTRIBUTE,
description = "User name used to connect JMS session",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.PASSWORD_ATTRIBUTE,
description = "Password to connect JMS session",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.TYPE_ATTRIBUTE,
description = "Possible values are: queue, topic",
defaultValue = "queue",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.DESTINATION_ATTRIBUTE,
description = "Queue or topic name",
mandatory = true,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.ACK_MODE_ATTRIBUTE,
description = "Possible values are: "
+ "AUTO_ACKNOWLEDGE- session automatically acknowledges a client’s receipt of a message, "
+ "CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually, "
+ "DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages.",
defaultValue = "AUTO_ACKNOWLEDGE",
direction = ConnectorAttribute.Direction.INCOMING,
type = "io.helidon.messaging.connectors.jms.AcknowledgeMode")
@ConnectorAttribute(name = JmsConnector.TRANSACTED_ATTRIBUTE,
description = "Indicates whether the session will use a local transaction.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.AWAIT_ACK_ATTRIBUTE,
description = "Wait for the acknowledgement of previous message before pulling next one.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.MESSAGE_SELECTOR_ATTRIBUTE,
description = "JMS API message selector expression based on a subset of the SQL92. "
+ "Expression can only access headers and properties, not the payload.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING,
type = "string")
@ConnectorAttribute(name = JmsConnector.CLIENT_ID_ATTRIBUTE,
description = "Client identifier for JMS connection.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.DURABLE_ATTRIBUTE,
description = "True for creating durable consumer (only for topic).",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.SUBSCRIBER_NAME_ATTRIBUTE,
description = "Subscriber name for durable consumer used to identify subscription.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING,
type = "string")
@ConnectorAttribute(name = JmsConnector.NON_LOCAL_ATTRIBUTE,
description = "If true then any messages published to the topic using this session’s connection, "
+ "or any other connection with the same client identifier, "
+ "will not be added to the durable subscription.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.NAMED_FACTORY_ATTRIBUTE,
description = "Select in case factory is injected as a named bean or configured with name.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.POLL_TIMEOUT_ATTRIBUTE,
description = "Timeout for polling for next message in every poll cycle in millis. Default value: 50",
mandatory = false,
defaultValue = "50",
direction = ConnectorAttribute.Direction.INCOMING,
type = "long")
@ConnectorAttribute(name = JmsConnector.PERIOD_EXECUTIONS_ATTRIBUTE,
description = "Period for executing poll cycles in millis.",
mandatory = false,
defaultValue = "100",
direction = ConnectorAttribute.Direction.INCOMING,
type = "long")
@ConnectorAttribute(name = JmsConnector.SESSION_GROUP_ID_ATTRIBUTE,
description = "When multiple channels share same session-group-id, "
+ "they share same JMS session and same JDBC connection as well.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
public interface AqConnector extends ConnectorFactory {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.AWAIT_ACK_ATTRIBUTE,
description = "Wait for the acknowledgement of previous message before pulling next one.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.MESSAGE_SELECTOR_ATTRIBUTE,
description = "JMS API message selector expression based on a subset of the SQL92. "
+ "Expression can only access headers and properties, not the payload.",
Expand Down Expand Up @@ -186,43 +192,93 @@ public class JmsConnector implements IncomingConnectorFactory, OutgoingConnector
/**
* Select in case factory is injected as a named bean or configured with name.
*/
protected static final String NAMED_FACTORY_ATTRIBUTE = "named-factory";
public static final String NAMED_FACTORY_ATTRIBUTE = "named-factory";

/**
* User name used with ConnectionFactory.
* Username used with ConnectionFactory.
*/
protected static final String USERNAME_ATTRIBUTE = "username";
public static final String USERNAME_ATTRIBUTE = "username";

/**
* Password used with ConnectionFactory.
*/
protected static final String PASSWORD_ATTRIBUTE = "password";
public static final String PASSWORD_ATTRIBUTE = "password";

/**
* Client identifier for JMS connection.
*/
protected static final String CLIENT_ID_ATTRIBUTE = "client-id";
public static final String CLIENT_ID_ATTRIBUTE = "client-id";

/**
* True for creating durable consumer (only for topic).
*/
protected static final String DURABLE_ATTRIBUTE = "durable";
public static final String DURABLE_ATTRIBUTE = "durable";

/**
* Subscriber name for durable consumer used to identify subscription.
*/
protected static final String SUBSCRIBER_NAME_ATTRIBUTE = "subscriber-name";
public static final String SUBSCRIBER_NAME_ATTRIBUTE = "subscriber-name";

/**
* If true then any messages published to the topic using this session's connection,
* or any other connection with the same client identifier,
* will not be added to the durable subscription.
*/
protected static final String NON_LOCAL_ATTRIBUTE = "non-local";

static final String ACK_MODE_ATTRIBUTE = "acknowledge-mode";
static final String TRANSACTED_ATTRIBUTE = "transacted";
static final String AWAIT_ACK_ATTRIBUTE = "await-ack";
static final String MESSAGE_SELECTOR_ATTRIBUTE = "message-selector";
static final String POLL_TIMEOUT_ATTRIBUTE = "poll-timeout";
static final String PERIOD_EXECUTIONS_ATTRIBUTE = "period-executions";
static final String TYPE_ATTRIBUTE = "type";
static final String DESTINATION_ATTRIBUTE = "destination";
static final String SESSION_GROUP_ID_ATTRIBUTE = "session-group-id";
public static final String NON_LOCAL_ATTRIBUTE = "non-local";

/**
* JMS acknowledge mode.
* <p>
* Possible values are:
* </p>
* <ul>
* <li>AUTO_ACKNOWLEDGE - session automatically acknowledges a client’s receipt of a message,
* <li>CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually,
* <li>DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages.
* </ul>
*/
public static final String ACK_MODE_ATTRIBUTE = "acknowledge-mode";

/**
* Indicates whether the session will use a local transaction.
*/
public static final String TRANSACTED_ATTRIBUTE = "transacted";

/**
* Wait for the acknowledgement of previous message before pulling next one.
*/
public static final String AWAIT_ACK_ATTRIBUTE = "await-ack";

/**
* JMS API message selector expression based on a subset of the SQL92.
* Expression can only access headers and properties, not the payload.
*/
public static final String MESSAGE_SELECTOR_ATTRIBUTE = "message-selector";

/**
* Timeout for polling for next message in every poll cycle in millis.
*/
public static final String POLL_TIMEOUT_ATTRIBUTE = "poll-timeout";

/**
* Period for executing poll cycles in millis.
*/
public static final String PERIOD_EXECUTIONS_ATTRIBUTE = "period-executions";

/**
* Possible values are: queue, topic.
*/
public static final String TYPE_ATTRIBUTE = "type";

/**
* Queue or topic name.
*/
public static final String DESTINATION_ATTRIBUTE = "destination";

/**
* When multiple channels share same session-group-id, they share same JMS session and same JDBC connection as well.
*/
public static final String SESSION_GROUP_ID_ATTRIBUTE = "session-group-id";
static final String JNDI_ATTRIBUTE = "jndi";
static final String JNDI_PROPS_ATTRIBUTE = "env-properties";
static final String JNDI_JMS_FACTORY_ATTRIBUTE = "jms-factory";
Expand Down

0 comments on commit 3a6123f

Please sign in to comment.