Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace usages of Atomic* for Atomic*FieldUpdater #195

Merged
merged 1 commit into from
Feb 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public void safeRun() {
OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
checkArgument(this == firstInQueue);

ml.numberOfEntries.incrementAndGet();
ml.totalSize.addAndGet(dataLength);
ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
if (ml.hasActiveCursors()) {
// Avoid caching entries if no cursor has been created
ml.entryCache.insert(new EntryImpl(ledger.getId(), entryId, data));
Expand All @@ -148,7 +148,7 @@ public void safeRun() {
data.release();

PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
ml.entriesAddedCounter.incrementAndGet();
ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml);
ml.lastConfirmedEntry = lastEntry;

if (closeWhenDone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.yahoo.pulsar.broker.namespace;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
Expand All @@ -35,7 +35,11 @@ public class OwnedBundle {
* based on {@link #active} flag
*/
private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock();
private final AtomicBoolean isActive = new AtomicBoolean(true);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<OwnedBundle> IS_ACTIVE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(OwnedBundle.class, "isActive");
private volatile int isActive = TRUE;

/**
* constructor
Expand All @@ -44,6 +48,7 @@ public class OwnedBundle {
*/
public OwnedBundle(NamespaceBundle suName) {
this.bundle = suName;
IS_ACTIVE_UPDATER.set(this, TRUE);
};

/**
Expand All @@ -55,7 +60,7 @@ public OwnedBundle(NamespaceBundle suName) {
*/
public OwnedBundle(NamespaceBundle suName, boolean active) {
this.bundle = suName;
this.isActive.set(active);
IS_ACTIVE_UPDATER.set(this, active ? TRUE : FALSE);
}

/**
Expand Down Expand Up @@ -90,11 +95,11 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception {

try {
// set the flag locally s.t. no more producer/consumer to this namespace is allowed
if (!this.isActive.compareAndSet(true, false)) {
if (!IS_ACTIVE_UPDATER.compareAndSet(this, TRUE, FALSE)) {
// An exception is thrown when the namespace is not in active state (i.e. another thread is
// removing/have removed it)
throw new IllegalStateException(
"Namespace is not active. ns:" + this.bundle + "; state:" + this.isActive.get());
"Namespace is not active. ns:" + this.bundle + "; state:" + IS_ACTIVE_UPDATER.get(this));
}
} finally {
// no matter success or not, unlock
Expand Down Expand Up @@ -137,10 +142,10 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception {
* @return boolean value indicate that the namespace is active or not.
*/
public boolean isActive() {
return this.isActive.get();
return IS_ACTIVE_UPDATER.get(this) == TRUE;
}

public void setActive(boolean active) {
isActive.set(active);
IS_ACTIVE_UPDATER.set(this, active ? TRUE : FALSE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -67,18 +67,24 @@ public class Consumer {
// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
// increase its availability
private final AtomicInteger messagePermits = new AtomicInteger(0);
private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
private volatile int messagePermits = 0;
// It starts keep tracking of messagePermits once consumer gets blocked, as consumer needs two separate counts:
// messagePermits (1) before and (2) after being blocked: to dispatch only blockedPermit number of messages at the
// time of redelivery
private final AtomicInteger permitsReceivedWhileConsumerBlocked = new AtomicInteger(0);
private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
private volatile int permitsReceivedWhileConsumerBlocked = 0;

private final ConcurrentOpenHashMap<PositionImpl, Integer> pendingAcks;

private final ConsumerStats stats;

private final int maxUnackedMessages;
private AtomicInteger unackedMessages = new AtomicInteger(0);
private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false;

public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName,
Expand All @@ -93,6 +99,9 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.appId = appId;
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);

stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString();
Expand Down Expand Up @@ -181,7 +190,7 @@ public Pair<ChannelPromise, Integer> sendMessages(final List<Entry> entries) {
}

private void incrementUnackedMessages(int ackedMessages) {
if (unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
if (UNACKED_MESSAGES_UPDATER.addAndGet(this, ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
blockedConsumerOnUnackedMsgs = true;
}
}
Expand Down Expand Up @@ -226,7 +235,7 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) {
permitsToReduce += batchSize;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = messagePermits.addAndGet(-permitsToReduce);
int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (permits < 0) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -305,15 +314,15 @@ void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.get() >= maxUnackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && UNACKED_MESSAGES_UPDATER.get(this) >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
oldPermits = messagePermits.getAndAdd(additionalNumberOfMessages);
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
subscription.consumerFlow(this, additionalNumberOfMessages);
} else {
oldPermits = permitsReceivedWhileConsumerBlocked.getAndAdd(additionalNumberOfMessages);
oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
}

if (log.isDebugEnabled()) {
Expand All @@ -331,15 +340,15 @@ void flowPermits(int additionalNumberOfMessages) {
* Consumer whose blockedPermits needs to be dispatched
*/
void flowConsumerBlockedPermits(Consumer consumer) {
int additionalNumberOfPermits = consumer.permitsReceivedWhileConsumerBlocked.getAndSet(0);
int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
// add newly flow permits to actual consumer.messagePermits
consumer.messagePermits.getAndAdd(additionalNumberOfPermits);
MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
// dispatch pending permits to flow more messages: it will add more permits to dispatcher and consumer
subscription.consumerFlow(consumer, additionalNumberOfPermits);
}

public int getAvailablePermits() {
return messagePermits.get();
return MESSAGE_PERMITS_UPDATER.get(this);
}

public boolean isBlocked() {
Expand Down Expand Up @@ -367,7 +376,7 @@ public void updateRates() {

public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages.get();
stats.unackedMessages = UNACKED_MESSAGES_UPDATER.get(this);
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
return stats;
}
Expand Down Expand Up @@ -433,7 +442,7 @@ private void removePendingAcks(PositionImpl position) {
int totalAckedMsgs = ackOwnedConsumer.getPendingAcks().remove(position);
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) <= (maxUnackedMessages / 2))
if (((UNACKED_MESSAGES_UPDATER.addAndGet(ackOwnedConsumer, -totalAckedMsgs) <= (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
Expand All @@ -450,7 +459,7 @@ public ConcurrentOpenHashMap<PositionImpl, Integer> getPendingAcks() {

public void redeliverUnacknowledgedMessages() {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
unackedMessages.set(0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
blockedConsumerOnUnackedMsgs = false;
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
Expand Down Expand Up @@ -479,21 +488,20 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
}
}

unackedMessages.addAndGet(-totalRedeliveryMessages);
UNACKED_MESSAGES_UPDATER.addAndGet(this, -totalRedeliveryMessages);
blockedConsumerOnUnackedMsgs = false;

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);

int numberOfBlockedPermits = Math.min(totalRedeliveryMessages,
permitsReceivedWhileConsumerBlocked.get());
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.get(this));

// if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
if (numberOfBlockedPermits > 0) {
permitsReceivedWhileConsumerBlocked.getAndAdd(-numberOfBlockedPermits);
messagePermits.getAndAdd(numberOfBlockedPermits);
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, -numberOfBlockedPermits);
MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
subscription.consumerFlow(this, numberOfBlockedPermits);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -46,7 +46,9 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche

private final PersistentTopic topic;
private final ManagedCursor cursor;
private final AtomicReference<Consumer> activeConsumer = new AtomicReference<Consumer>();
private static final AtomicReferenceFieldUpdater<PersistentDispatcherSingleActiveConsumer, Consumer> ACTIVE_CONSUMER_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(PersistentDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer");
private volatile Consumer activeConsumer = null;
private final CopyOnWriteArrayList<Consumer> consumers;
private boolean havePendingRead = false;
private CompletableFuture<Void> closeFuture = null;
Expand All @@ -67,6 +69,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
this.partitionIndex = partitionIndex;
this.subscriptionType = subscriptionType;
this.readBatchSize = MaxReadBatchSize;
ACTIVE_CONSUMER_UPDATER.set(this, null);
}

private void pickAndScheduleActiveConsumer() {
Expand All @@ -75,9 +78,9 @@ private void pickAndScheduleActiveConsumer() {
consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName()));

int index = partitionIndex % consumers.size();
Consumer prevConsumer = activeConsumer.getAndSet(consumers.get(index));
Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));

if (prevConsumer == activeConsumer.get()) {
if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) {
// Active consumer did not change. Do nothing at this point
return;
}
Expand All @@ -90,7 +93,7 @@ private void pickAndScheduleActiveConsumer() {
// let it finish and then rewind
if (!havePendingRead) {
cursor.rewind();
readMoreEntries(activeConsumer.get());
readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
}
}

Expand All @@ -115,7 +118,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
}

if (consumers.isEmpty()) {
activeConsumer.set(null);
ACTIVE_CONSUMER_UPDATER.set(this, null);
}

if (closeFuture == null && !consumers.isEmpty()) {
Expand Down Expand Up @@ -143,7 +146,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
*/
@Override
public synchronized boolean canUnsubscribe(Consumer consumer) {
return (consumers.size() == 1) && Objects.equals(consumer, activeConsumer.get());
return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
}

/**
Expand Down Expand Up @@ -189,7 +192,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o

readFailureBackoff.reduceToHalf();

Consumer currentConsumer = activeConsumer.get();
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer == null || readConsumer != currentConsumer) {
// Active consumer has changed since the read request has been issued. We need to rewind the cursor and
// re-issue the read request for the new consumer
Expand All @@ -203,7 +206,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
if (future.isSuccess()) {
// Schedule a new read batch operation only after the previous batch has been written to the socket
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = activeConsumer.get();
Consumer newConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
} else {
Expand All @@ -222,7 +225,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
@Override
public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
if (!havePendingRead) {
if (activeConsumer.get() == consumer) {
if (ACTIVE_CONSUMER_UPDATER.get(this) == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Trigger new read after receiving flow control message", consumer);
}
Expand All @@ -242,7 +245,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
if (consumer != activeConsumer.get()) {
if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
log.info("[{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
consumer);
return;
Expand Down Expand Up @@ -320,7 +323,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj

topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer = activeConsumer.get();
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer and there is no pending read
if (currentConsumer != null && !havePendingRead) {
if (log.isDebugEnabled()) {
Expand All @@ -340,7 +343,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj

@Override
public boolean isConsumerConnected() {
return activeConsumer.get() != null;
return ACTIVE_CONSUMER_UPDATER.get(this) != null;
}

@Override
Expand All @@ -354,7 +357,7 @@ public SubType getType() {
}

public Consumer getActiveConsumer() {
return activeConsumer.get();
return ACTIVE_CONSUMER_UPDATER.get(this);
}

}
Loading