Skip to content

Commit

Permalink
Polish code
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Jun 13, 2023
1 parent 57eb6d6 commit 20ba65e
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@

package org.apache.rocketmq.client.java.example;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
Expand All @@ -39,29 +35,11 @@ public class AsyncProducerExample {
private AsyncProducerExample() {
}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {
public static void main(String[] args) throws ClientException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
String accessKey = "yourAccessKey";
String secretKey = "yourSecretKey";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);

String endpoints = "foobar.com:8080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String topic = "yourTopic";
// In most case, you don't need to create too many producers, singleton pattern is recommended.
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic
// route before message publishing.
.setTopics(topic)
// May throw {@link ClientException} if the producer is not initialized.
.build();
final Producer producer = ProducerSingleton.getInstance(topic);
// Define your message body.
byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
Expand All @@ -88,6 +66,7 @@ public static void main(String[] args) throws ClientException, IOException, Inte
// Block to avoid exist of background threads.
Thread.sleep(Long.MAX_VALUE);
// Close the producer when you don't need it anymore.
producer.close();
// You could close it manually or add this into the JVM shutdown hook.
// producer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void main(String[] args) throws ClientException {
}), receiveCallbackExecutor);
} while (true);
// Close the simple consumer when you don't need it anymore.
// You could close it manually or add this into the JVM shutdown hook.
// consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.client.java.example;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.rocketmq.client.apis.ClientException;
Expand All @@ -34,7 +33,7 @@ public class ProducerDelayMessageExample {
private ProducerDelayMessageExample() {
}

public static void main(String[] args) throws ClientException, IOException {
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

String topic = "yourDelayTopic";
Expand All @@ -61,6 +60,7 @@ public static void main(String[] args) throws ClientException, IOException {
log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
producer.close();
// You could close it manually or add this into the JVM shutdown hook.
// producer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.client.java.example;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
Expand All @@ -33,7 +32,7 @@ public class ProducerFifoMessageExample {
private ProducerFifoMessageExample() {
}

public static void main(String[] args) throws ClientException, IOException {
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

String topic = "yourFifoTopic";
Expand All @@ -59,6 +58,7 @@ public static void main(String[] args) throws ClientException, IOException {
log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
producer.close();
// You could close it manually or add this into the JVM shutdown hook.
// producer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ public static void main(String[] args) throws ClientException {
} catch (Throwable t) {
log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
// You could close it manually or add this into the JVM shutdown hook.
// producer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.client.java.example;

import java.io.IOException;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
Expand All @@ -29,47 +28,57 @@

public class ProducerSingleton {
private static volatile Producer PRODUCER;
private static volatile Producer TRANSACTIONAL_PRODUCER;
private static final String ACCESS_KEY = "yourAccessKey";
private static final String SECRET_KEY = "yourSecretKey";
private static final String ENDPOINTS = "foobar.com:8080";

private ProducerSingleton() {
}

public static Producer getInstance(String... topics) throws ClientException {
return getInstance(null, topics);
private static Producer buildProducer(TransactionChecker checker, String... topics) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
// This parameter is necessary only when the server ACL is enabled. Otherwise,
// it does not need to be set by default.
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(ENDPOINTS)
.setCredentialProvider(sessionCredentialsProvider)
.build();
final ProducerBuilder builder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch
// the topic route before message publishing.
.setTopics(topics);
if (checker != null) {
// Set the transaction checker.
builder.setTransactionChecker(checker);
}
return builder.build();
}

public static Producer getInstance(TransactionChecker checker, String... topics) throws ClientException {
public static Producer getInstance(String... topics) throws ClientException {
if (null == PRODUCER) {
synchronized (ProducerSingleton.class) {
if (null == PRODUCER) {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(ENDPOINTS)
.setCredentialProvider(sessionCredentialsProvider)
.build();
final ProducerBuilder builder = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch
// the topic route before message publishing.
.setTopics(topics);
if (checker != null) {
builder.setTransactionChecker(checker);
}
PRODUCER = builder.build();
PRODUCER = buildProducer(null, topics);
}
}
}
return PRODUCER;
}

public static void shutdown() throws IOException {
if (null != PRODUCER) {
PRODUCER.close();
public static Producer getTransactionalInstance(TransactionChecker checker,
String... topics) throws ClientException {
if (null == TRANSACTIONAL_PRODUCER) {
synchronized (ProducerSingleton.class) {
if (null == TRANSACTIONAL_PRODUCER) {
TRANSACTIONAL_PRODUCER = buildProducer(checker, topics);
}
}
}
return TRANSACTIONAL_PRODUCER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void main(String[] args) throws ClientException {
return TransactionResolution.COMMIT;
};
// Get producer using singleton pattern.
final Producer producer = ProducerSingleton.getInstance(checker, topic);
final Producer producer = ProducerSingleton.getTransactionalInstance(checker, topic);
final Transaction transaction = producer.beginTransaction();
// Define your message body.
byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
Expand All @@ -70,5 +70,9 @@ public static void main(String[] args) throws ClientException {
transaction.commit();
// Or rollback the transaction.
// transaction.rollback();

// Close the producer when you don't need it anymore.
// You could close it manually or add this into the JVM shutdown hook.
// producer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class PushConsumerExample {
private PushConsumerExample() {
}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {
public static void main(String[] args) throws ClientException, InterruptedException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
Expand Down Expand Up @@ -71,6 +71,7 @@ public static void main(String[] args) throws ClientException, IOException, Inte
// Block the main thread, no need for production environment.
Thread.sleep(Long.MAX_VALUE);
// Close the push consumer when you don't need it anymore.
pushConsumer.close();
// You could close it manually or add this into the JVM shutdown hook.
pushConsumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public static void main(String[] args) throws ClientException {
}
} while (true);
// Close the simple consumer when you don't need it anymore.
// You could close it manually or add this into the JVM shutdown hook.
// consumer.close();
}
}

0 comments on commit 20ba65e

Please sign in to comment.