Skip to content

Move ConversantMedia disruptor blocking queue to new module #2914

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions log4j-conversant/.log4j-plugin-processing-activator
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This file is here to activate the `plugin-processing` Maven profile.
90 changes: 90 additions & 0 deletions log4j-conversant/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to you under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j</artifactId>
<version>${revision}</version>
<relativePath>../log4j-parent</relativePath>
</parent>

<artifactId>log4j-conversant</artifactId>
<name>Apache Log4j Conversant Disruptor-based supplements</name>
<description>Provides ConversantMedia Disruptor-based data structure implementations for the Apache Log4j.</description>

<properties>
<conversant.disruptor.version>1.2.21</conversant.disruptor.version>
</properties>

<dependencyManagement>
<dependencies>

<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<version>${conversant.disruptor.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
<groupId>org.jspecify</groupId>
<artifactId>jspecify</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-plugins</artifactId>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.core.async;
package org.apache.logging.log4j.conversant;

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import java.util.concurrent.BlockingQueue;
import org.apache.logging.log4j.core.async.BlockingQueueFactory;
import org.apache.logging.log4j.plugins.Configurable;
import org.apache.logging.log4j.plugins.Plugin;
import org.apache.logging.log4j.plugins.PluginAttribute;
import org.apache.logging.log4j.plugins.PluginFactory;

/**
* Factory for creating instances of {@link DisruptorBlockingQueue}.
* A {@link BlockingQueueFactory} based on <a href="https://github.com/conversant/disruptor">Conversant Disruptor BlockingQueue</a>.
*
* @since 2.7
* @since 3.0.0
*/
@Configurable(elementType = BlockingQueueFactory.ELEMENT_TYPE, printObject = true)
@Plugin("DisruptorBlockingQueue")
public class DisruptorBlockingQueueFactory implements BlockingQueueFactory {
public final class DisruptorBlockingQueueFactory implements BlockingQueueFactory {

private final SpinPolicy spinPolicy;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.conversant;

import static java.util.Objects.requireNonNull;

import aQute.bnd.annotation.spi.ServiceProvider;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Integers;
import org.apache.logging.log4j.kit.env.PropertyEnvironment;
import org.apache.logging.log4j.kit.recycler.Recycler;
import org.apache.logging.log4j.kit.recycler.RecyclerFactory;
import org.apache.logging.log4j.kit.recycler.RecyclerFactoryProvider;
import org.apache.logging.log4j.kit.recycler.RecyclerProperties;
import org.apache.logging.log4j.kit.recycler.support.AbstractRecycler;
import org.apache.logging.log4j.status.StatusLogger;

/**
* A {@link Recycler} factory provider implementation based on
* <a href="https://github.com/conversant/disruptor>Conversant's Disruptor BlockingQueue</a>.
*
* @since 3.0.0
*/
@ServiceProvider(value = RecyclerFactoryProvider.class)
public final class DisruptorRecyclerFactoryProvider implements RecyclerFactoryProvider {

private static final Logger LOGGER = StatusLogger.getLogger();

@Override
public int getOrder() {
return 600;
}

@Override
public String getName() {
return "conversant-disruptor";
}

@Override
public RecyclerFactory createForEnvironment(final PropertyEnvironment environment) {
requireNonNull(environment, "environment");
return new DisruptorRecyclerFactory(
environment.getProperty(RecyclerProperties.class),
environment.getProperty(DisruptorRecyclerProperties.class));
}

private static final class DisruptorRecyclerFactory implements RecyclerFactory {

/**
* Minimum capacity of the disruptor
*/
private static final int MIN_CAPACITY = 8;

private final int capacity;
private final SpinPolicy spinPolicy;

private DisruptorRecyclerFactory(
final RecyclerProperties recyclerProperties, final DisruptorRecyclerProperties disruptorProperties) {
this.capacity = validateCapacity(recyclerProperties.capacity());
this.spinPolicy = disruptorProperties.spinPolicy();
}

@Override
public <V> Recycler<V> create(final Supplier<V> supplier, final Consumer<V> cleaner) {
requireNonNull(supplier, "supplier");
requireNonNull(cleaner, "cleaner");
final DisruptorBlockingQueue<V> queue = new DisruptorBlockingQueue<>(capacity, spinPolicy);
return new DisruptorRecycler<>(supplier, cleaner, queue);
}

private static Integer validateCapacity(final int capacity) {
if (capacity < MIN_CAPACITY) {
LOGGER.warn(
"Invalid DisruptorBlockingQueue capacity {}, using minimum size {}.", capacity, MIN_CAPACITY);
return MIN_CAPACITY;
}
final int roundedCapacity = Integers.ceilingNextPowerOfTwo(capacity);
if (capacity != roundedCapacity) {
LOGGER.warn(
"Invalid DisruptorBlockingQueue size {}, using rounded size {}.", capacity, roundedCapacity);
}
return roundedCapacity;
}

private static final class DisruptorRecycler<V> extends AbstractRecycler<V> {

private final Consumer<V> cleaner;

private final Queue<V> queue;

private DisruptorRecycler(final Supplier<V> supplier, final Consumer<V> cleaner, final Queue<V> queue) {
super(supplier);
this.cleaner = cleaner;
this.queue = queue;
}

@Override
public V acquire() {
final V value = queue.poll();
return value != null ? value : createInstance();
}

@Override
public void release(final V value) {
requireNonNull(value, "value");
cleaner.accept(value);
queue.offer(value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.conversant;

import com.conversantmedia.util.concurrent.SpinPolicy;
import org.apache.logging.log4j.kit.env.Log4jProperty;
import org.jspecify.annotations.NullMarked;

@NullMarked
@Log4jProperty(name = "recycler.conversant")
public record DisruptorRecyclerProperties(@Log4jProperty(defaultValue = "WAITING") SpinPolicy spinPolicy) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@Export
@Version("3.0.0")
package org.apache.logging.log4j.conversant;

import org.osgi.annotation.bundle.Export;
import org.osgi.annotation.versioning.Version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.conversant.test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.AsyncAppender;
import org.apache.logging.log4j.core.test.appender.ListAppender;
import org.apache.logging.log4j.core.test.junit.LoggerContextSource;
import org.apache.logging.log4j.spi.ExtendedLogger;
import org.junit.jupiter.api.Test;

class DisruptorBlockingQueueFactoryTest {

private static void exceptionTest(final LoggerContext context) throws InterruptedException {
final ExtendedLogger logger = context.getLogger(AsyncAppender.class);
final Exception parent = new IllegalStateException("Test");
final Throwable child = new LoggingException("This is a test", parent);
logger.error("This is a test", child);
final ListAppender appender = context.getConfiguration().getAppender("LIST");
final List<String> messages;
try {
messages = appender.getMessages(1, 2, TimeUnit.SECONDS);
} finally {
appender.clear();
}
assertNotNull(messages);
assertEquals(1, messages.size());
assertTrue(messages.get(0).contains(parent.getClass().getName()));
}

private static void rewriteTest(final LoggerContext context) throws InterruptedException {
final ExtendedLogger logger = context.getLogger(AsyncAppender.class);
logger.error("This is a test");
logger.warn("Hello world!");
final ListAppender appender = context.getConfiguration().getAppender("LIST");
final List<String> messages;
try {
messages = appender.getMessages(2, 2, TimeUnit.SECONDS);
} finally {
appender.clear();
}
assertNotNull(messages);
assertEquals(2, messages.size());
assertEquals("This is a test", messages.get(0));
assertEquals("Hello world!", messages.get(1));
}

private static void assertConversantDisruptorIsUsed(final LoggerContext context) {
final AsyncAppender appender = context.getConfiguration().getAppender("ASYNC");
assertThat(appender).isNotNull();
final BlockingQueue<?> queue = (BlockingQueue<?>) assertDoesNotThrow(() -> {
Field queueField = AsyncAppender.class.getDeclaredField("queue");
queueField.setAccessible(true);
return queueField.get(appender);
});
assertThat(queue).isInstanceOf(DisruptorBlockingQueue.class);
}

@Test
@LoggerContextSource("DisruptorBlockingQueueFactoryTest.xml")
public void testJcToolsBlockingQueue(final LoggerContext context) throws InterruptedException {
assertConversantDisruptorIsUsed(context);
rewriteTest(context);
exceptionTest(context);
}
}
Loading
Loading