Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent blocking event loop thread by replacing ArrayDeque with HashIndexedQueue #2953

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class ClientOptions implements Serializable {

public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.enabled();

public static final boolean DEFAULT_USE_HASH_INDEX_QUEUE = true;

private final boolean autoReconnect;

private final boolean cancelCommandsOnReconnectFailure;
Expand Down Expand Up @@ -115,6 +117,8 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final boolean useHashIndexedQueue;

protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
Expand All @@ -131,6 +135,7 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.timeoutOptions = builder.timeoutOptions;
this.useHashIndexedQueue = builder.useHashIndexedQueue;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -149,6 +154,7 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.timeoutOptions = original.getTimeoutOptions();
this.useHashIndexedQueue = original.isUseHashIndexedQueue();
}

/**
Expand Down Expand Up @@ -214,6 +220,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;

protected Builder() {
}

Expand Down Expand Up @@ -269,8 +277,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) {
*
* @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler}
* @return {@code this}
* @since 6.0
* @see DecodeBufferPolicies
* @since 6.0
*/
public Builder decodeBufferPolicy(DecodeBufferPolicy policy) {

Expand Down Expand Up @@ -317,8 +325,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection
*
* @param protocolVersion version to use.
* @return {@code this}
* @since 6.0
* @see ProtocolVersion#newestSupported()
* @since 6.0
*/
public Builder protocolVersion(ProtocolVersion protocolVersion) {

Expand All @@ -337,9 +345,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) {
*
* @param publishOnScheduler true/false
* @return {@code this}
* @since 5.2
* @see org.reactivestreams.Subscriber#onNext(Object)
* @see ClientResources#eventExecutorGroup()
* @since 5.2
*/
public Builder publishOnScheduler(boolean publishOnScheduler) {
this.publishOnScheduler = publishOnScheduler;
Expand Down Expand Up @@ -459,6 +467,20 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Use hash indexed queue, which provides O(1) remove(Object) thus won't cause blocking issues.
*
* @param useHashIndexedQueue true/false
* @return {@code this}
* @see io.lettuce.core.protocol.CommandHandler.AddToStack
* @since 6.6
*/
@SuppressWarnings("JavadocReference")
public Builder useHashIndexQueue(boolean useHashIndexedQueue) {
this.useHashIndexedQueue = useHashIndexedQueue;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -476,7 +498,6 @@ public ClientOptions build() {
*
* @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the
* current {@link ClientOptions}.
*
* @since 5.1
*/
public ClientOptions.Builder mutate() {
Expand Down Expand Up @@ -535,7 +556,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() {
*
* @return zero.
* @since 5.2
*
* @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}.
*/
@Deprecated
Expand Down Expand Up @@ -684,6 +704,15 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
*
* @return if hash indexed queue should be used
*/
public boolean isUseHashIndexedQueue() {
return useHashIndexedQueue;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Copyright 2011-Present, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*/
package io.lettuce.core.datastructure.queue;

import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import io.lettuce.core.internal.LettuceAssert;
import org.jetbrains.annotations.NotNull;

/**
* A queue implementation that supports O(1) removal of elements. The queue is backed by a hash map and a doubly linked list.
*
* @author chenxiaofan
*/
@SuppressWarnings("unchecked")
public class HashIndexedQueue<E> extends AbstractQueue<E> {

private final Map<E, Object> map; // Object can be Node<E> or List<Node<E>>

private Node<E> head;

private Node<E> tail;

private int size;

private static class Node<E> {

E value;

Node<E> next;

Node<E> prev;

Node(E value) {
this.value = value;
}

}

/**
* Create a new instance of the {@link HashIndexedQueue}.
*/
public HashIndexedQueue() {
map = new HashMap<>();
size = 0;
}

@Override
public boolean add(E e) {
return offer(e);
}

@Override
public boolean offer(E e) {
final Node<E> newNode = new Node<>(e);
if (tail == null) {
head = tail = newNode;
} else {
tail.next = newNode;
newNode.prev = tail;
tail = newNode;
}

if (!map.containsKey(e)) {
map.put(e, newNode);
} else {
Object current = map.get(e);
if (current instanceof Node) {
List<Node<E>> nodes = new ArrayList<>();
nodes.add((Node<E>) current);
nodes.add(newNode);
map.put(e, nodes);
} else {
((List<Node<E>>) current).add(newNode);
}
}
size++;
return true;
}

@Override
public E poll() {
if (head == null) {
return null;
}
E value = head.value;
removeNodeFromMap(head);
head = head.next;
if (head == null) {
tail = null;
} else {
head.prev = null;
}
size--;
return value;
}

@Override
public E peek() {
if (head == null) {
return null;
}
return head.value;
}

@Override
public boolean remove(Object o) {
return removeFirstOccurrence(o);
}

@Override
public int size() {
return size;
}

@Override
public boolean contains(Object o) {
return map.containsKey(o);
}

public class Iterator implements java.util.Iterator<E> {

private Node<E> current;

private Node<E> prev;

private Iterator() {
current = HashIndexedQueue.this.head;
prev = null;
}

@Override
public boolean hasNext() {
return current != null;
}

@Override
public E next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
E value = current.value;
prev = current;
current = current.next;
return value;
}

@Override
public void remove() {
if (prev != null) {
removeNodeFromMap(prev);
removeNode(prev);
size--;
// remove once
prev = null;
}
}

}

@NotNull
@Override
public Iterator iterator() {
return new Iterator();
}

@Override
public boolean removeAll(Collection<?> c) {
boolean modified = false;
for (Object e : c) {
if (removeAllOccurrences(e)) {
modified = true;
}
}
return modified;
}

@Override
public void clear() {
head = null;
tail = null;
map.clear();
size = 0;
}

private boolean removeFirstOccurrence(Object element) {
Object current = map.get(element);
if (current == null) {
return false;
}
if (current instanceof Node) {
Node<E> node = (Node<E>) current;
removeNode(node);
map.remove(element);
} else {
List<Node<E>> nodes = (List<Node<E>>) current;
Node<E> node = nodes.remove(0);
if (nodes.isEmpty()) {
map.remove(element);
}
removeNode(node);
}
size--;
return true;
}

private boolean removeAllOccurrences(Object element) {
Object current = map.get(element);
if (current == null) {
return false;
}
if (current instanceof Node) {
final Node<E> node = (Node<E>) current;
removeNode(node);
size--;
} else {
final List<Node<E>> nodes = (List<Node<E>>) current;
for (Node<E> node : nodes) {
removeNode(node);
size--;
}
}
map.remove(element);
return true;
}

private void removeNode(Node<E> node) {
if (node.prev != null) {
node.prev.next = node.next;
} else {
head = node.next;
}
if (node.next != null) {
node.next.prev = node.prev;
} else {
tail = node.prev;
}
}

private void removeNodeFromMap(Node<E> node) {
E value = node.value;
Object current = map.get(value);
if (current instanceof Node) {
LettuceAssert.assertState(current == node, "current != node");
map.remove(value);
} else {
List<Node<E>> nodes = (List<Node<E>>) current;
final boolean removed = nodes.remove(node);
LettuceAssert.assertState(removed, "!nodes.remove(node)");
if (nodes.isEmpty()) {
map.remove(value);
}
}
}

}
Loading
Loading