diff --git a/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java b/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java index 76d921303fb..075f9ea9bfd 100644 --- a/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java +++ b/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java @@ -19,9 +19,6 @@ import java.util.Iterator; import java.util.ServiceLoader; - -import org.apache.rocketmq.apis.consumer.PushConsumerBuilder; -import org.apache.rocketmq.apis.consumer.SimpleConsumerBuilder; import org.apache.rocketmq.apis.message.MessageBuilder; import org.apache.rocketmq.apis.producer.ProducerBuilder; @@ -46,20 +43,6 @@ static ClientServiceProvider loadService() { */ ProducerBuilder newProducerBuilder(); - /** - * Get the simple consumer builder by current provider. - * - * @return the simple consumer builder instance. - */ - SimpleConsumerBuilder newSimpleConsumerBuilder(); - - /** - * Get the push consumer builder by current provider. - * - * @return the push consumer builder instance. - */ - PushConsumerBuilder newPushConsumerBuilder(); - /** * Get the message builder by current provider. * diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/ConsumeResult.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/ConsumeResult.java deleted file mode 100644 index 8dde6f6a276..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/ConsumeResult.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.rocketmq.apis.consumer; - -public enum ConsumeResult { - /** - * Consume message success and need commit this message. - */ - SUCCESS, - - /** - * Failed to consume the message, expecting potential delivery after configured backoff. - */ - FAILURE -} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpression.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpression.java deleted file mode 100644 index b5cf27429fd..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpression.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.rocketmq.apis.consumer; - -import java.util.Objects; - -import static com.google.common.base.Preconditions.checkNotNull; - -public class FilterExpression { - public static final String TAG_EXPRESSION_SUB_ALL = "*"; - public static final String TAG_EXPRESSION_SPLITTER = "\\|\\|"; - private final String expression; - private final FilterExpressionType filterExpressionType; - - public FilterExpression(String expression, FilterExpressionType filterExpressionType) { - this.expression = checkNotNull(expression, "expression should not be null"); - this.filterExpressionType = checkNotNull(filterExpressionType, "filterExpressionType should not be null"); - } - - public String getExpression() { - return expression; - } - - public FilterExpressionType getFilterExpressionType() { - return filterExpressionType; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FilterExpression that = (FilterExpression) o; - return expression.equals(that.expression) && filterExpressionType == that.filterExpressionType; - } - - @Override - public int hashCode() { - return Objects.hash(expression, filterExpressionType); - } -} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpressionType.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpressionType.java deleted file mode 100644 index 99bc4871691..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpressionType.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.rocketmq.apis.consumer; - -public enum FilterExpressionType { - /** - * Follows SQL92 standard. - */ - SQL92, - /** - * Only support or operation such as - * "tag1 || tag2 || tag3",
- * If null or * expression,meaning subscribe all. - */ - TAG -} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java deleted file mode 100644 index a56e677816c..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.apis.consumer; - -import org.apache.rocketmq.apis.message.MessageView; - -/** - *

MessageListener is used only by PushConsumer to process messages - * synchronously. - * - *

PushConsumer will fetch messages from brokers and dispatch them to an - * embedded thread pool in form of Runnable tasks to achieve desirable processing concurrency. - * - *

Refer to {@link PushConsumer} for more further specs. - * - *

- * Thread Safety - * This class may be called concurrently by multiple threads. Implementation should be thread safe. - *

- */ -public interface MessageListener { - - /** - * Callback interface to handle incoming messages. - * - * Application developers are expected to implement this interface to fulfill business requirements through - * processing message and return - * ConsumeResult accordingly. - * - * PushConsumer will, on behalf of its group, acknowledge the message to broker on success; In case of failure or - * unexpected exceptions were raised, it will negatively acknowledge message, which would potentially - * get re-delivered after the configured back off period. - * - * @param message The message passed to the listener. - * @return {@link ConsumeResult#SUCCESS} if message is properly processed; {@link - * ConsumeResult#FAILURE} otherwise. - */ - ConsumeResult onMessage(MessageView message); -} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java deleted file mode 100644 index dc59ff69b80..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.apis.consumer; - -import org.apache.rocketmq.apis.exception.ClientException; - -import java.io.Closeable; -import java.util.Map; - - -/** - * PushConsumer is a managed client which delivers messages to application through {@link MessageListener}. - * - *

Consumers of the same group are designed to share messages from broker servers. As a result, consumers of the same - * group must have exactly identical subscription expressions, otherwise the behavior is undefined. - * - *

For a brand-new group, consumers consume messages from head of underlying queues, ignoring existing messages - * completely. In addition to delivering messages to clients, broker servers also maintain progress in perspective of - * group. Thus, consumers can safely restart and resume their progress automatically.

- * - *

There are scenarios where fan-out is preferred, - * recommended solution is to use dedicated group of each client. - * - *

To mitigate latency, PushConsumer adopts - * reactive streams pattern. Namely, - * messages received from broker servers are first cached locally, amount of which is controlled by - * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and - * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)}, and then dispatched to thread pool to achieve - * desirable concurrency. - */ -public interface PushConsumer extends Closeable { - /** - * Get the load balancing group for consumer. - * - * @return consumer load balancing group. - */ - String getConsumerGroup(); - - /** - * List the existed subscription expressions in push consumer. - * - * @return map of topic to filter expression. - */ - Map subscriptionExpressions(); - - /** - * Add subscription expression dynamically. - * - *

If first subscriptionExpression that contains topicA and tag1 is exists already in consumer, then - * second subscriptionExpression which contains topicA and tag2, the result is that the second one - * replaces the first one instead of integrating them. - * - * @param topic new topic that need to add or update. - * @param filterExpression new filter expression to add or update. - * @return push consumer instance. - */ - PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException; - - /** - * Remove subscription expression dynamically by topic. - * - *

It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic - * was removed before would not be delivered to {@link MessageListener} anymore. - * - *

Nothing occurs if the specified topic does not exist in subscription expressions of push consumer. - * - * @param topic the topic to remove subscription. - * @return push consumer instance. - */ - PushConsumer unsubscribe(String topic) throws ClientException; - - /** - * Close the push consumer and release all related resources. - * - *

Once push consumer is closed, it could not be started once again. we maintained an FSM - * (finite-state machine) to record the different states for each producer, which is similar to - */ - @Override - void close(); -} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumerBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumerBuilder.java deleted file mode 100644 index 166d48f8e37..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumerBuilder.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.apis.consumer; - -import org.apache.rocketmq.apis.ClientConfiguration; -import org.apache.rocketmq.apis.exception.ClientException; - -import java.util.Map; - -public interface PushConsumerBuilder { - /** - * Set the client configuration for consumer. - * - * @param clientConfiguration client's configuration. - * @return the consumer builder instance. - */ - PushConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); - - /** - * Set the load balancing group for consumer. - * - * @param consumerGroup consumer load balancing group. - * @return the consumer builder instance. - */ - PushConsumerBuilder setConsumerGroup(String consumerGroup); - - /** - * Add subscriptionExpressions for consumer. - * - * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression. - * @return the consumer builder instance. - */ - PushConsumerBuilder setSubscriptionExpressions(Map subscriptionExpressions); - - /** - * Register message listener, all messages meet the subscription expression would across listener here. - * - * @param listener message listener. - * @return the consumer builder instance. - */ - PushConsumerBuilder setMessageListener(MessageListener listener); - - /** - * Set the maximum number of messages cached locally. - * - * @param count message count. - * @return the consumer builder instance. - */ - PushConsumerBuilder setMaxCacheMessageCount(int count); - - /** - * Set the maximum bytes of messages cached locally. - * - * @param bytes message size. - * @return the consumer builder instance. - */ - PushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes); - - /** - * Set the consumption thread count in parallel. - * - * @param count thread count. - * @return the consumer builder instance. - */ - PushConsumerBuilder setThreadCount(int count); - - /** - * Finalize the build of {@link PushConsumer}. - * - * @return the push consumer instance. - */ - PushConsumer build() throws ClientException; -} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumer.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumer.java deleted file mode 100644 index 0fd5d51c1a4..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumer.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.apis.consumer; - -import java.io.Closeable; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -import org.apache.rocketmq.apis.exception.ClientException; -import org.apache.rocketmq.apis.message.MessageView; - -/** - * SimpleConsumer is a thread-safe rocketmq client which is used to consume message by group. - * - *

Simple consumer is lightweight consumer , if you want fully control the message consumption operation by yourself, - * simple consumer should be your first consideration. - * - *

Consumers belong to the same consumer group share messages from server, - * so consumer in the same group must have the same subscriptionExpressions, otherwise the behavior is - * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once - * consumer is started, server records its consumption progress and derives it in subsequent startup. - * - *

You may intend to maintain different consumption progress for different consumer, different consumer group - * should be set in this case. - * - *

Simple consumer divide message consumption to 3 parts. - * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api to commit this message. - * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration parameter. - * Also, you can change the invisibleDuration by call changeInvisibleDuration api. - */ -public interface SimpleConsumer extends Closeable { - /** - * Get the load balancing group for simple consumer. - * - * @return consumer load balancing group. - */ - String getConsumerGroup(); - - /** - * Add subscription expression dynamically. - * - *

If first subscriptionExpression that contains topicA and tag1 is exists already in consumer, then - * second subscriptionExpression which contains topicA and tag2, the result is that the second one - * replaces the first one instead of integrating them. - * - * @param topic new topic that need to add or update. - * @param filterExpression new filter expression to add or update. - * @return simple consumer instance. - */ - SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException; - - /** - * Remove subscription expression dynamically by topic. - * - *

It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic - * was removed before would not be delivered to {@link MessageListener} anymore. - * - *

Nothing occurs if the specified topic does not exist in subscription expressions of push consumer. - * - * @param topic the topic to remove subscription. - * @return simple consumer instance. - */ - SimpleConsumer unsubscribe(String topic) throws ClientException; - - /** - * List the existed subscription expressions in simple consumer. - * - * @return map of topic to filter expression. - */ - Map subscriptionExpressions(); - - /** - * Fetch messages from server synchronously. - *

This method returns immediately if there are messages available. - * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned. - * @param maxMessageNum max message num when server returns. - * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout. - * @return list of messageView - */ - List receive(int maxMessageNum, Duration invisibleDuration) throws ClientException; - - /** - * Fetch messages from server asynchronously. - *

This method returns immediately if there are messages available. - * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned. - * @param maxMessageNum max message num when server returns. - * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout. - * @return list of messageView - */ - CompletableFuture> receiveAsync(int maxMessageNum, Duration invisibleDuration) throws ClientException; - - /** - * Ack message to server synchronously, server commit this message. - * - *

Duplicate ack request does not take effect and throw exception. - * @param messageView special messageView with handle want to ack. - */ - void ack(MessageView messageView) throws ClientException; - - /** - * Ack message to server asynchronously, server commit this message. - * - *

Duplicate ack request does not take effect and throw exception. - * @param messageView special messageView with handle want to ack. - * @return CompletableFuture of this request. - */ - CompletableFuture ackAsync(MessageView messageView); - - /** - * Changes the invisible duration of a specified message synchronously. - * - *

The origin invisible duration for a message decide by ack request. - * - *

You must call change request before the origin invisible duration timeout. - * If called change request later than the origin invisible duration, this request does not take effect and throw exception. - * Duplicate change request will refresh the next visible time of this message to other consumers. - * @param messageView special messageView with handle want to change. - * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time. - */ - void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException; - - /** - * Changes the invisible duration of a specified message asynchronously. - * - *

The origin invisible duration for a message decide by ack request. - * - *

You must call change request before the origin invisible duration timeout. - * If called change request later than the origin invisible duration, this request does not take effect and throw exception. - * Duplicate change request will refresh the next visible time of this message to other consumers. - * @param messageView special messageView with handle want to change. - * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time. - * @return CompletableFuture of this request. - */ - CompletableFuture changeInvisibleDurationAsync(MessageView messageView, Duration invisibleDuration); - - @Override - void close(); -} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumerBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumerBuilder.java deleted file mode 100644 index e1ff6c76fee..00000000000 --- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumerBuilder.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.apis.consumer; - -import java.time.Duration; -import java.util.Map; - -import org.apache.rocketmq.apis.ClientConfiguration; -import org.apache.rocketmq.apis.exception.ClientException; - -public interface SimpleConsumerBuilder { - /** - * Set the client configuration for simple consumer. - * - * @param clientConfiguration client's configuration. - * @return the simple consumer builder instance. - */ - SimpleConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); - - /** - * Set the load balancing group for simple consumer. - * - * @param consumerGroup consumer load balancing group. - * @return the consumer builder instance. - */ - SimpleConsumerBuilder setConsumerGroup(String consumerGroup); - - /** - * Add subscriptionExpressions for simple consumer. - * - * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression. - * @return the consumer builder instance. - */ - SimpleConsumerBuilder setSubscriptionExpressions(Map subscriptionExpressions); - - /** - * Set the max await time when receive message from server. - * The simple consumer will hold this long-polling receive requests until a message is returned or a timeout occurs. - * @param awaitDuration The maximum time to block when no message available. - * @return the consumer builder instance. - */ - SimpleConsumerBuilder setAwaitDuration(Duration awaitDuration); - - /** - * Finalize the build of the {@link SimpleConsumer} instance and start. - * - *

This method will block until simple consumer starts successfully. - * - * @return the simple consumer instance. - */ - SimpleConsumer build() throws ClientException; -}