Skip to content

Commit

Permalink
Refactor PullOption
Browse files Browse the repository at this point in the history
- Make maxMessages a method parameter rather than an optional option
- Move MessageConsumer.PullOption to PubSub
- Remove MessageConsumer.start/stop methods in favor of close()
  • Loading branch information
mziccard committed May 10, 2016
1 parent 57d613b commit f72a623
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ enum OptionType implements Option.OptionType {
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}

String getString(Map<Option.OptionType, ?> options) {
return get(options);
}

Integer getInteger(Map<Option.OptionType, ?> options) {
return get(options);
}
}

private ListOption(OptionType option, Object value) {
Expand All @@ -73,27 +81,31 @@ public static ListOption pageToken(String pageToken) {
*/
final class PullOption extends Option {

private static final long serialVersionUID = -5220474819637439937L;
private static final long serialVersionUID = 4792164134340316582L;

enum OptionType implements Option.OptionType {
MAX_MESSAGES;
MAX_CONCURRENT_CALLBACKS;

@SuppressWarnings("unchecked")
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}

Integer getInteger(Map<Option.OptionType, ?> options) {
return get(options);
}
}

private PullOption(OptionType option, Object value) {
private PullOption(Option.OptionType option, Object value) {
super(option, value);
}

/**
* Returns an option to specify the maximum number of messages that can be returned by the pull
* operation.
* Returns an option to specify the maximum number of messages that can be executed
* concurrently at any time.
*/
public static PullOption maxMessages(int maxMessages) {
return new PullOption(OptionType.MAX_MESSAGES, maxMessages);
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
}
}

Expand All @@ -110,38 +122,6 @@ interface MessageProcessor {
*/
interface MessageConsumer extends AutoCloseable {

/**
* Class for specifying options to pull messages through a {@code MessageConsumer}.
*/
final class PullOption extends Option {

private static final long serialVersionUID = 4792164134340316582L;

enum OptionType implements Option.OptionType {
MAX_CONCURRENT_CALLBACKS;

@SuppressWarnings("unchecked")
<T> T get(Map<OptionType, ?> options) {
return (T) options.get(this);
}
}

private PullOption(OptionType option, Object value) {
super(option, value);
}

/**
* Returns an option to specify the maximum number of messages that can be executed
* concurrently at any time.
*/
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
}
}

void start(MessageConsumer.PullOption... options);

void stop();
}

Topic create(TopicInfo topic);
Expand Down Expand Up @@ -200,11 +180,11 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {

Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);

Iterator<ReceivedMessage> pull(String subscription, PullOption... options);
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);

Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options);
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);

MessageConsumer pullAsync(String subscription, MessageProcessor callback);
MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);

void ack(String subscription, String ackId, String... ackIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
}

@Override
public Iterator<ReceivedMessage> pull(String subscription, PullOption... options) {
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
// this should set return_immediately to true
return null;
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options) {
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
// though this method can set return_immediately to false (as future can be canceled) I
// suggest to keep it false so sync could delegate to asyc and use the same options
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
Expand All @@ -211,7 +211,8 @@ public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOpti
}

@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback) {
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
PullOption... options) {
// this method should use the VTKIT thread-pool (maybe getting it should be part of the spi)
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {
return pubsub.replacePushConfigAsync(name(), pushConfig);
}

public Iterator<ReceivedMessage> pull(PullOption... options) {
return pubsub.pull(name(), options);
public Iterator<ReceivedMessage> pull(int maxMessages) {
return pubsub.pull(name(), maxMessages);
}

public Future<Iterator<ReceivedMessage>> pullAsync(PullOption... options) {
return pubsub.pullAsync(name(), options);
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
return pubsub.pullAsync(name(), maxMessages);
}

public MessageConsumer pullAsync(MessageProcessor callback) {
return pubsub.pullAsync(name(), callback);
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
return pubsub.pullAsync(name(), callback, options);
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class PubSubTest {

private static final int PAGE_SIZE = 42;
private static final String PAGE_TOKEN = "page token";
private static final int MAX_MESSAGES = 42;
private static final int MAX_CONCURRENT_CALLBACKS = 42;

@Test
Expand All @@ -45,17 +44,8 @@ public void testListOption() {

@Test
public void testPullOptions() {
PullOption pullOption = PullOption.maxMessages(MAX_MESSAGES);
assertEquals(MAX_MESSAGES, pullOption.value());
assertEquals(PullOption.OptionType.MAX_MESSAGES, pullOption.optionType());
}

@Test
public void testMessageConsumerPullOptions() {
MessageConsumer.PullOption pullOption =
MessageConsumer.PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS);
PullOption pullOption = PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS);
assertEquals(MAX_CONCURRENT_CALLBACKS, pullOption.value());
assertEquals(MessageConsumer.PullOption.OptionType.MAX_CONCURRENT_CALLBACKS,
pullOption.optionType());
assertEquals(PullOption.OptionType.MAX_CONCURRENT_CALLBACKS, pullOption.optionType());
}
}

0 comments on commit f72a623

Please sign in to comment.