diff --git a/apis/pom.xml b/apis/pom.xml new file mode 100644 index 000000000000..200a1c0e127e --- /dev/null +++ b/apis/pom.xml @@ -0,0 +1,22 @@ + + + + rocketmq-all + org.apache.rocketmq + 5.0.0-BETA-SNAPSHOT + + 4.0.0 + jar + rocketmq-apis + rocketmq-apis ${project.version} + + + + com.google.guava + guava + provided + + + \ No newline at end of file diff --git a/apis/src/main/java/org/apache/rocketmq/apis/ClientConfiguration.java b/apis/src/main/java/org/apache/rocketmq/apis/ClientConfiguration.java new file mode 100644 index 000000000000..3aab733a2646 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/ClientConfiguration.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.time.Duration; + +/** + * Common client configuration. + */ +public class ClientConfiguration { + private final String endpoints; + private final SessionCredentialsProvider sessionCredentialsProvider; + private final Duration requestTimeout; + private final boolean enableTracing; + + public static ClientConfigurationBuilder newBuilder() { + return new ClientConfigurationBuilder(); + } + + public ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider, + Duration requestTimeout, boolean enableTracing) { + this.endpoints = checkNotNull(endpoints, "endpoints should not be null"); + this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider, "credentialsProvider should not be" + + " null"); + this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should be not null"); + this.enableTracing = enableTracing; + } + + public String getEndpoints() { + return endpoints; + } + + public SessionCredentialsProvider getCredentialsProvider() { + return sessionCredentialsProvider; + } + + public Duration getRequestTimeout() { + return requestTimeout; + } + + public boolean isEnableTracing() { + return enableTracing; + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/ClientConfigurationBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/ClientConfigurationBuilder.java new file mode 100644 index 000000000000..52c3f60a996f --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/ClientConfigurationBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.time.Duration; + +/** + * Builder to set {@link ClientConfiguration}. + */ +public class ClientConfigurationBuilder { + private String endpoints; + private SessionCredentialsProvider sessionCredentialsProvider; + private Duration requestTimeout; + private boolean enableTracing; + + /** + * Configure the endpoints with which the SDK should communicate. + * + *

Endpoints here means address of service, complying with the following scheme(part square brackets is + * optional). + *

1. DNS scheme(default): dns:host[:port], host is the host to resolve via DNS, port is the port to return + * for each address. If not specified, 443 is used. + *

2. ipv4 scheme: ipv4:address:port[,address:port,...] + *

3. ipv6 scheme: ipv6:address:port[,address:port,...] + *

4. http/https scheme: http|https://host[:port], similar to DNS scheme, if port not specified, 443 is used. + * + * @param endpoints address of service. + * @return the client configuration builder instance. + */ + public ClientConfigurationBuilder setEndpoints(String endpoints) { + checkNotNull(endpoints, "endpoints should not be not null"); + this.endpoints = endpoints; + return this; + } + + public ClientConfigurationBuilder setCredentialProvider(SessionCredentialsProvider sessionCredentialsProvider) { + this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider, "credentialsProvider should not be " + + "null"); + return this; + } + + public ClientConfigurationBuilder setRequestTimeout(Duration requestTimeout) { + this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should not be not null"); + return this; + } + + public ClientConfigurationBuilder enableTracing(boolean enableTracing) { + this.enableTracing = enableTracing; + return this; + } + + public ClientConfiguration build() { + return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, enableTracing); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java b/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java new file mode 100644 index 000000000000..075f9ea9bfd0 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis; + +import java.util.Iterator; +import java.util.ServiceLoader; +import org.apache.rocketmq.apis.message.MessageBuilder; +import org.apache.rocketmq.apis.producer.ProducerBuilder; + +/** + * Service provider to seek client, which load client according to + * Java SPI mechanism. + */ +public interface ClientServiceProvider { + static ClientServiceProvider loadService() { + final ServiceLoader loaders = ServiceLoader.load(ClientServiceProvider.class); + final Iterator iterators = loaders.iterator(); + if (iterators.hasNext()) { + return iterators.next(); + } + throw new UnsupportedOperationException("Client service provider not found"); + } + + /** + * Get the producer builder by current provider. + * + * @return the producer builder instance. + */ + ProducerBuilder newProducerBuilder(); + + /** + * Get the message builder by current provider. + * + * @return the message builder instance. + */ + MessageBuilder newMessageBuilder(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/MessageQueue.java b/apis/src/main/java/org/apache/rocketmq/apis/MessageQueue.java new file mode 100644 index 000000000000..93df5866238d --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/MessageQueue.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis; + +public interface MessageQueue { + String getTopic(); + + String getId(); +} \ No newline at end of file diff --git a/apis/src/main/java/org/apache/rocketmq/apis/SessionCredentials.java b/apis/src/main/java/org/apache/rocketmq/apis/SessionCredentials.java new file mode 100644 index 000000000000..3161e88ab160 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/SessionCredentials.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Optional; + +/** + * Session credentials used in service authentications. + */ +public class SessionCredentials { + private final String accessKey; + private final String accessSecret; + private final String securityToken; + + public SessionCredentials(String accessKey, String accessSecret, String securityToken) { + this.accessKey = checkNotNull(accessKey, "accessKey should not be null"); + this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null"); + this.securityToken = checkNotNull(securityToken, "securityToken should not be null"); + } + + public SessionCredentials(String accessKey, String accessSecret) { + this.accessKey = checkNotNull(accessKey, "accessKey should not be null"); + this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null"); + this.securityToken = null; + } + + public String getAccessKey() { + return accessKey; + } + + public String getAccessSecret() { + return accessSecret; + } + + public Optional getSecurityToken() { + if (null == securityToken) { + return Optional.empty(); + } + return Optional.of(securityToken); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/SessionCredentialsProvider.java b/apis/src/main/java/org/apache/rocketmq/apis/SessionCredentialsProvider.java new file mode 100644 index 000000000000..c23ee278dd04 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/SessionCredentialsProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis; + +/** + * Abstract provider to provide {@link SessionCredentials}. + */ +public interface SessionCredentialsProvider { + /** + * Get the provided credentials. + * + * @return provided credentials. + */ + SessionCredentials getCredentials(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/StaticSessionCredentialsProvider.java b/apis/src/main/java/org/apache/rocketmq/apis/StaticSessionCredentialsProvider.java new file mode 100644 index 000000000000..c38982ca3b88 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/StaticSessionCredentialsProvider.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis; + +public class StaticSessionCredentialsProvider implements SessionCredentialsProvider { + private final SessionCredentials credentials; + + public StaticSessionCredentialsProvider(String accessKey, String accessSecret) { + this.credentials = new SessionCredentials(accessKey, accessSecret); + } + + public StaticSessionCredentialsProvider(String accessKey, String accessSecret, String securityToken) { + this.credentials = new SessionCredentials(accessKey, accessSecret, securityToken); + } + + @Override + public SessionCredentials getCredentials() { + return credentials; + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/AuthenticationException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/AuthenticationException.java new file mode 100644 index 000000000000..d39be1b9432e --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/AuthenticationException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +/** + * The difference between {@link AuthorisationException} and {@link AuthenticationException} is that + * {@link AuthenticationException} here means current user's identity could not be recognized. + * + *

For example, {@link AuthenticationException} will be thrown if access key is invalid. + */ +public class AuthenticationException extends ClientException { + public AuthenticationException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/AuthorisationException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/AuthorisationException.java new file mode 100644 index 000000000000..58c9cf1873d0 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/AuthorisationException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +import org.apache.rocketmq.apis.message.Message; +import org.apache.rocketmq.apis.producer.Producer; + +/** + * The difference between {@link AuthenticationException} and {@link AuthorisationException} is that + * {@link AuthorisationException} here means current users don't have permission to do current operation. + * + *

For example, current user is forbidden to send message to this topic, {@link AuthorisationException} will be + * thrown in {@link Producer#send(Message)}. + */ +public class AuthorisationException extends ClientException { + public AuthorisationException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/ClientException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/ClientException.java new file mode 100644 index 000000000000..cb34dec494c2 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/ClientException.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +import com.google.common.base.MoreObjects; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Base exception for all exception raised in client, each exception should derive from current class. + * It should throw exception which is derived from {@link ClientException} rather than {@link ClientException} itself. + */ +public abstract class ClientException extends Exception { + /** + * For those {@link ClientException} along with a remote procedure call, request id could be used to track the + * request. + */ + protected static final String REQUEST_ID_KEY = "request-id"; + + private final ErrorCode errorCode; + private final Map context; + + ClientException(ErrorCode errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + this.context = new HashMap<>(); + } + + ClientException(ErrorCode errorCode, String message) { + super(message); + this.errorCode = errorCode; + this.context = new HashMap<>(); + } + + @SuppressWarnings("SameParameterValue") + protected void putMetadata(String key, String value) { + context.put(key, value); + } + + public Optional getRequestId() { + final String requestId = context.get(REQUEST_ID_KEY); + return null == requestId ? Optional.empty() : Optional.of(requestId); + } + + public ErrorCode getErrorCode() { + return errorCode; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(super.toString()) + .add("errorCode", errorCode) + .add("context", context) + .toString(); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/ErrorCode.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/ErrorCode.java new file mode 100644 index 000000000000..327e3634b637 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/ErrorCode.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +/** + * Indicates the reason why the exception is thrown, it can be easily divided into the following categories. + * + *

+ * + * + * + * + * + * + * + *
Error Categories and Exceptions
Category + * Exception + * Code range + *
Illegal client argument + * {@link RemoteIllegalArgumentException} + *

{@link IllegalArgumentException} + *

{@code [101..199]} + *
Authorisation failure + * {@link AuthorisationException} + * {@code [201..299]} + *
Resource not found + * {@link ResourceNotFoundException} + * {@code [301..399]} + *
+ * + *
+ */ +public enum ErrorCode { + /** + * Format of topic is illegal. + */ + INVALID_TOPIC(101), + /** + * Format of consumer group is illegal. + */ + INVALID_CONSUMER_GROUP(102), + /** + * Message is forbidden to publish. + */ + MESSAGE_PUBLISH_FORBIDDEN(201), + /** + * Topic does not exist. + */ + TOPIC_DOES_NOT_EXIST(301), + /** + * Consumer group does not exist. + */ + CONSUMER_GROUP_DOES_NOT_EXIST(302); + + private final int code; + + ErrorCode(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + @Override + public String toString() { + return String.valueOf(code); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/FlowControlException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/FlowControlException.java new file mode 100644 index 000000000000..98001f67b735 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/FlowControlException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +public class FlowControlException extends ClientException { + public FlowControlException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/NetworkException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/NetworkException.java new file mode 100644 index 000000000000..ad9c5ee05ec8 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/NetworkException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +public class NetworkException extends ClientException { + public NetworkException(ErrorCode code, String message, Throwable cause) { + super(code, message, cause); + } + + public NetworkException(ErrorCode code, String message) { + super(code, message); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/RemoteIllegalArgumentException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/RemoteIllegalArgumentException.java new file mode 100644 index 000000000000..4ae6ddae0a36 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/RemoteIllegalArgumentException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +public class RemoteIllegalArgumentException extends ClientException { + public RemoteIllegalArgumentException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} \ No newline at end of file diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/ResourceNotFoundException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/ResourceNotFoundException.java new file mode 100644 index 000000000000..95860c8d9030 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/ResourceNotFoundException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +public class ResourceNotFoundException extends ClientException { + public ResourceNotFoundException(ErrorCode code, String message, String requestId) { + super(code, message); + putMetadata(REQUEST_ID_KEY, requestId); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/ResourceNotMatchException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/ResourceNotMatchException.java new file mode 100644 index 000000000000..2f80de1489c0 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/ResourceNotMatchException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +public class ResourceNotMatchException extends ClientException { + public ResourceNotMatchException(ErrorCode code, String message) { + super(code, message); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/exception/TimeoutException.java b/apis/src/main/java/org/apache/rocketmq/apis/exception/TimeoutException.java new file mode 100644 index 000000000000..11c77440a346 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/exception/TimeoutException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.exception; + +public class TimeoutException extends ClientException { + public TimeoutException(ErrorCode code, String message, Throwable cause) { + super(code, message, cause); + } + + public TimeoutException(ErrorCode code, String message) { + super(code, message); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/message/Message.java b/apis/src/main/java/org/apache/rocketmq/apis/message/Message.java new file mode 100644 index 000000000000..0503253d41ae --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/message/Message.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.message; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.rocketmq.apis.producer.Producer; + +/** + * Abstract message only used for {@link Producer}. + */ +public interface Message { + /** + * Get the topic of message, which is the first classifier for message. + * + * @return topic of message. + */ + String getTopic(); + + /** + * Get the deep copy of message body. + * + * @return the deep copy of message body. + */ + byte[] getBody(); + + /** + * Get the deep copy of message properties. + * + * @return the deep copy of message properties. + */ + Map getProperties(); + + /** + * Get the tag of message, which is the second classifier besides topic. + * + * @return the tag of message. + */ + Optional getTag(); + + /** + * Get the key collection of message. + * + * @return the key collection of message. + */ + Collection getKeys(); + + /** + * Get the message group, which make sense only when topic type is fifo. + * + * @return message group, which is optional. + */ + Optional getMessageGroup(); + + /** + * Get the expected delivery timestamp, which make sense only when topic type is delay. + * + * @return message expected delivery timestamp, which is optional. + */ + Optional getDeliveryTimestamp(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/message/MessageBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageBuilder.java new file mode 100644 index 000000000000..dfe6e3267df7 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageBuilder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.message; + +import java.util.Collection; + +/** + * Builder to config {@link Message}. + */ +public interface MessageBuilder { + /** + * Set the topic for message. + * + * @param topic topic for the message. + * @return the message builder instance. + */ + MessageBuilder setTopic(String topic); + + /** + * Set the body for message. + * + * @param body body for the message. + * @return the message builder instance. + */ + MessageBuilder setBody(byte[] body); + + /** + * Set the tag for message. + * + * @param tag tag for the message. + * @return the message builder instance. + */ + MessageBuilder setTag(String tag); + + /** + * Set the key for message. + * + * @param key key for the message. + * @return the message builder instance. + */ + MessageBuilder setKey(String key); + + /** + * Set the key collection for message. + * + * @param keys key collection for the message. + * @return the message builder instance. + */ + MessageBuilder setKeys(Collection keys); + + /** + * Set the group for message. + * + * @param messageGroup group for the message. + * @return the message builder instance. + */ + MessageBuilder setMessageGroup(String messageGroup); + + /** + * Add user property for message. + * + * @param key single property key. + * @param value single property value. + * @return the message builder instance. + */ + MessageBuilder addProperty(String key, String value); + + /** + * Finalize the build of the {@link Message} instance. + * + *

Unique {@link MessageId} is generated after message building.

+ * + * @return the message instance. + */ + Message build(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/message/MessageId.java b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageId.java new file mode 100644 index 000000000000..01aa304b0b76 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageId.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.message; + +/** + * Abstract message id, the implement must override {@link Object#toString()}, which indicates the message id using + * string form. + */ +public interface MessageId { + /** + * Get the version of message id. + * + * @return the version of message id. + */ + MessageIdVersion getVersion(); + + /** + * The implementation must override this method, which indicates the message id using string form. + * + * @return string-formed string id. + */ + String toString(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/message/MessageIdVersion.java b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageIdVersion.java new file mode 100644 index 000000000000..1ef3683fe0fb --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageIdVersion.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.message; + +public enum MessageIdVersion { + /** + * V0 version, whose length is 32. + */ + V0, + /** + * V1 version, whose length is 34 and begins with "01". + */ + V1 +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/message/MessageView.java b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageView.java new file mode 100644 index 000000000000..8bd6e1045814 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/message/MessageView.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.message; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.rocketmq.apis.MessageQueue; + +/** + * {@link MessageView} provides a read-only view for message, that's why setters do not exist here. In addition, + * it only makes sense when {@link Message} is sent successfully, or it could be considered as a return receipt + * for producer/consumer. + */ +public interface MessageView { + /** + * Get the unique id of message. + * + * @return unique id. + */ + MessageId getMessageId(); + + /** + * Get the topic of message. + * + * @return topic of message. + */ + String getTopic(); + + /** + * Get the deep copy of message body, which makes the modification of return value does not + * affect the message itself. + * + * @return the deep copy of message body. + */ + byte[] getBody(); + + /** + * Get the deep copy of message properties, which makes the modification of return value does + * not affect the message itself. + * + * @return the deep copy of message properties. + */ + Map getProperties(); + + /** + * Get the tag of message, which is optional. + * + * @return the tag of message, which is optional. + */ + Optional getTag(); + + /** + * Get the key collection of message. + * + * @return the key collection of message. + */ + Collection getKeys(); + + /** + * Get the message group, which is optional and only make sense only when topic type is fifo. + * + * @return message group, which is optional. + */ + Optional getMessageGroup(); + + /** + * Get the expected delivery timestamp, which make sense only when topic type is delay. + * + * @return message expected delivery timestamp, which is optional. + */ + Optional getDeliveryTimestamp(); + + /** + * Get the born host of message. + * + * @return born host of message. + */ + String getBornHost(); + + /** + * Get the born timestamp of message. + * + * @return born timestamp of message. + */ + long getBornTimestamp(); + + /** + * Get the delivery attempt for message. + * + * @return delivery attempt. + */ + int getDeliveryAttempt(); + + /** + * Get the {@link MessageQueue} of message. + * + * @return message queue. + */ + MessageQueue getMessageQueue(); + + /** + * Get the position of message in {@link MessageQueue}. + */ + long getOffset(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/producer/Producer.java b/apis/src/main/java/org/apache/rocketmq/apis/producer/Producer.java new file mode 100644 index 000000000000..5a8c13130776 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/producer/Producer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.producer; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.apis.exception.ClientException; +import org.apache.rocketmq.apis.message.Message; + +/** + * Producer is a thread-safe rocketmq client which is used to publish messages. + * + *

On account of network timeout or other reasons, rocketmq producer only promised the at-least-once semantics. + * For producer, at-least-once semantics means potentially attempts are made at sending it, messages may be + * duplicated but not lost. + */ +public interface Producer extends Closeable { + /** + * Sends a message synchronously. + * + *

This method does not return until it gets the definitive result. + * + * @param message message to send. + */ + SendReceipt send(Message message) throws ClientException; + + /** + * Sends a transactional message synchronously. + * + * @param message message to send. + * @param transaction transaction to bind. + * @return the message id assigned to the appointed message. + */ + SendReceipt send(Message message, Transaction transaction) throws ClientException; + + /** + * Sends a message asynchronously. + * + *

This method returns immediately, the result is included in the {@link CompletableFuture}; + * + * @param message message to send. + * @return a future that indicates the result. + */ + CompletableFuture sendAsync(Message message); + + /** + * Sends batch messages synchronously. + * + *

This method does not return until it gets the definitive result. + * + *

All messages to send should have the same topic. + * + * @param messages batch messages to send. + * @return collection indicates the message id assigned to the appointed message, which keep the same order + * messages collection. + */ + List send(List messages) throws ClientException; + + /** + * Begins a transaction. + * + *

For example: + * + *

{@code
+     * Transaction transaction = producer.beginTransaction();
+     * SendReceipt receipt1 = producer.send(message1, transaction);
+     * SendReceipt receipt2 = producer.send(message2, transaction);
+     * transaction.commit();
+     * }
+ * + * @return a transaction entity to execute commit/rollback operation. + */ + Transaction beginTransaction() throws ClientException; + + /** + * Close the producer and release all related resources. + * + *

This method does not return until all related resource is released. Once producer is closed, it could + * not be started once again. we maintained an FSM (finite-state machine) to record the different states + * for each producer. + */ + @Override + void close(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/producer/ProducerBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/producer/ProducerBuilder.java new file mode 100644 index 000000000000..d4f8f196498f --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/producer/ProducerBuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.producer; + +import org.apache.rocketmq.apis.ClientConfiguration; +import org.apache.rocketmq.apis.exception.ClientException; +import org.apache.rocketmq.apis.message.Message; +import org.apache.rocketmq.apis.retry.BackoffRetryPolicy; + +/** + * Builder to config and start {@link Producer}. + */ +public interface ProducerBuilder { + /** + * Set the client configuration for producer. + * + * @param clientConfiguration client's configuration. + * @return the producer builder instance. + */ + ProducerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); + + /** + * Declare topics ahead of message sending/preparation. + * + *

Even though the declaration is not essential, we highly recommend to declare the topics in + * advance, which could help to discover potential mistakes. + * + * @param topics topics to send/prepare. + * @return the producer builder instance. + */ + ProducerBuilder setTopics(String... topics); + + /** + * Set the threads count for {@link Producer#sendAsync(Message)}. + * + * @return the producer builder instance. + */ + ProducerBuilder setAsyncThreadCount(int count); + + /** + * Set the retry policy to send message. + * + * @param retryPolicy policy to re-send message when failure encountered. + * @return the producer builder instance. + */ + ProducerBuilder setRetryPolicy(BackoffRetryPolicy retryPolicy); + + /** + * Set the transaction checker for producer. + * + * @param checker transaction checker. + * @return the produce builder instance. + */ + ProducerBuilder setTransactionChecker(TransactionChecker checker); + + /** + * Finalize the build of {@link Producer} instance and start. + * + *

The producer does a series of preparatory work during startup, which could help to identify more unexpected + * error earlier. + * + *

Especially, if this method is invoked more than once, different producer will be created and started. + * + * @return the producer instance. + */ + Producer build() throws ClientException; +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/producer/SendReceipt.java b/apis/src/main/java/org/apache/rocketmq/apis/producer/SendReceipt.java new file mode 100644 index 000000000000..7d6b899578f8 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/producer/SendReceipt.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.producer; + +import org.apache.rocketmq.apis.MessageQueue; +import org.apache.rocketmq.apis.message.MessageId; + +public interface SendReceipt { + MessageId getMessageId(); + + MessageQueue getMessageQueue(); + + long getOffset(); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/producer/Transaction.java b/apis/src/main/java/org/apache/rocketmq/apis/producer/Transaction.java new file mode 100644 index 000000000000..4ad720495793 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/producer/Transaction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.producer; + +import org.apache.rocketmq.apis.exception.ClientException; + +/** + * An entity to describe an independent transaction. + * + *

Once request of commit of roll-back reached server, subsequent arrived commit or roll-back request in + * {@link Transaction} would be ignored by server. + * + *

If transaction is not commit/roll-back in time, it is suspended until it is solved by {@link TransactionChecker} + * or reach the end of life. + */ +public interface Transaction { + /** + * Try to commit the transaction, which would expose the message before the transaction is closed if no exception + * thrown. + * + *

What you should pay more attention is that commit may be successful even exception thrown. + */ + void commit() throws ClientException; + + /** + * Try to roll back the transaction, which would expose the message before the transaction is closed if no exception + * thrown. + * + *

What you should pay more attention is that roll-back may be successful even exception thrown. + */ + void rollback() throws ClientException; +} \ No newline at end of file diff --git a/apis/src/main/java/org/apache/rocketmq/apis/producer/TransactionChecker.java b/apis/src/main/java/org/apache/rocketmq/apis/producer/TransactionChecker.java new file mode 100644 index 000000000000..e20eb4376a68 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/producer/TransactionChecker.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.producer; + +import org.apache.rocketmq.apis.message.MessageView; + +/** + * Used to determine {@link TransactionResolution} when {@link Transaction} is not committed or roll-backed in time. + * {@link Transaction#commit()} and {@link Transaction#rollback()} does not promise that it would be applied + * successfully, so that checker here is necessary. + * + *

If {@link TransactionChecker#check(MessageView)} returns {@link TransactionResolution#UNKNOWN} or exception + * raised during the invocation of {@link TransactionChecker#check(MessageView)}, the examination from server will be + * performed periodically. + */ +public interface TransactionChecker { + /** + * Server will solve the suspended transactional message by this method. + * + *

If exception was thrown in this method, which equals {@link TransactionResolution#UNKNOWN} is returned. + * + * @param messageView message to determine {@link TransactionResolution}. + * @return the transaction resolution. + */ + TransactionResolution check(MessageView messageView); +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/producer/TransactionResolution.java b/apis/src/main/java/org/apache/rocketmq/apis/producer/TransactionResolution.java new file mode 100644 index 000000000000..084e609ce1e6 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/producer/TransactionResolution.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.producer; + +public enum TransactionResolution { + /** + * Notify server that current transaction should be committed. + */ + COMMIT, + /** + * Notify server that current transaction should be roll-backed. + */ + ROLLBACK, + /** + * Notify server that the state of this transaction is not sure. You should be cautions before return unknown + * because the examination from server will be performed periodically. + */ + UNKNOWN; +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/retry/BackOffRetryPolicyBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/retry/BackOffRetryPolicyBuilder.java new file mode 100644 index 000000000000..8addf9337e23 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/retry/BackOffRetryPolicyBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.retry; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.time.Duration; + +public class BackOffRetryPolicyBuilder { + private int maxAttempts = 3; + private Duration initialBackoff = Duration.ofMillis(100); + private Duration maxBackoff = Duration.ofSeconds(1); + private int backoffMultiplier = 2; + + public BackOffRetryPolicyBuilder() { + } + + BackOffRetryPolicyBuilder setMaxAttempts(int maxAttempts) { + checkArgument(maxAttempts > 0, "maxAttempts must be positive"); + this.maxAttempts = maxAttempts; + return this; + } + + BackOffRetryPolicyBuilder setInitialBackoff(Duration initialBackoff) { + this.initialBackoff = checkNotNull(initialBackoff, "initialBackoff should not be null"); + return this; + } + + BackOffRetryPolicyBuilder setMaxBackoff(Duration maxBackoff) { + this.maxBackoff = checkNotNull(maxBackoff, "maxBackoff should not be null"); + return this; + } + + BackOffRetryPolicyBuilder setBackoffMultiplier(int backoffMultiplier) { + checkArgument(backoffMultiplier > 0, "backoffMultiplier must be positive"); + this.backoffMultiplier = backoffMultiplier; + return this; + } + + BackoffRetryPolicy build() { + return new BackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/retry/BackoffRetryPolicy.java b/apis/src/main/java/org/apache/rocketmq/apis/retry/BackoffRetryPolicy.java new file mode 100644 index 000000000000..db71ba767d02 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/retry/BackoffRetryPolicy.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.retry; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import java.time.Duration; +import java.util.Random; + +/** + * The {@link BackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly refer to + * gRPC Retry Design. + */ +public class BackoffRetryPolicy implements RetryPolicy { + public static BackOffRetryPolicyBuilder newBuilder() { + return new BackOffRetryPolicyBuilder(); + } + + private final Random random; + private final int maxAttempts; + private final Duration initialBackoff; + private final Duration maxBackoff; + private final int backoffMultiplier; + + public BackoffRetryPolicy(int maxAttempts, Duration initialBackoff, Duration maxBackoff, int backoffMultiplier) { + checkArgument(maxBackoff.compareTo(initialBackoff) <= 0, "initialBackoff should not be minor than maxBackoff"); + checkArgument(maxAttempts > 0, "maxAttempts must be positive"); + this.random = new Random(); + this.maxAttempts = maxAttempts; + this.initialBackoff = checkNotNull(initialBackoff, "initialBackoff should not be null"); + this.maxBackoff = maxBackoff; + this.backoffMultiplier = backoffMultiplier; + } + + @Override + public int getMaxAttempts() { + return maxAttempts; + } + + @Override + public Duration getNextAttemptDelay(int attempt) { + checkArgument(attempt > 0, "attempt must be positive"); + int randomNumberBound = Math.min(initialBackoff.getNano() * (backoffMultiplier ^ (attempt - 1)), + maxBackoff.getNano()); + return Duration.ofNanos(random.nextInt(randomNumberBound)); + } + + public Duration getInitialBackoff() { + return initialBackoff; + } + + public Duration getMaxBackoff() { + return maxBackoff; + } + + public int getBackoffMultiplier() { + return backoffMultiplier; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxAttempts", maxAttempts) + .add("initialBackoff", initialBackoff) + .add("maxBackoff", maxBackoff) + .add("backoffMultiplier", backoffMultiplier) + .toString(); + } +} diff --git a/apis/src/main/java/org/apache/rocketmq/apis/retry/RetryPolicy.java b/apis/src/main/java/org/apache/rocketmq/apis/retry/RetryPolicy.java new file mode 100644 index 000000000000..53b079935003 --- /dev/null +++ b/apis/src/main/java/org/apache/rocketmq/apis/retry/RetryPolicy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.apis.retry; + +import java.time.Duration; + +/** + * Internal interface for retry policy. + */ +interface RetryPolicy { + /** + * Get the max attempt times for retry. + * + * @return max attempt times. + */ + int getMaxAttempts(); + + /** + * Get await time after current attempts, the attempt index starts at 1. + * + * @param attempt current attempt. + * @return await time. + */ + Duration getNextAttemptDelay(int attempt); +} diff --git a/pom.xml b/pom.xml index 880df2db6e6a..b4a7c9dd4598 100644 --- a/pom.xml +++ b/pom.xml @@ -106,6 +106,7 @@ + apis client common broker @@ -277,6 +278,9 @@ prepare-agent-integration + + **/apache/rocketmq/apis/* + ${project.build.directory}/jacoco-it.exec failsafeArgLine