diff --git a/log4j-flume-ng/.log4j-plugin-processing-activator b/log4j-flume-ng/.log4j-plugin-processing-activator deleted file mode 100644 index ba133f36961..00000000000 --- a/log4j-flume-ng/.log4j-plugin-processing-activator +++ /dev/null @@ -1 +0,0 @@ -This file is here to activate the `plugin-processing` Maven profile. diff --git a/log4j-flume-ng/pom.xml b/log4j-flume-ng/pom.xml deleted file mode 100644 index 505f5969a68..00000000000 --- a/log4j-flume-ng/pom.xml +++ /dev/null @@ -1,122 +0,0 @@ - - - - - 4.0.0 - - - org.apache.logging.log4j - log4j - ${revision} - ../log4j-parent - - - log4j-flume-ng - - Apache Log4j Flume Bridge - The Apache Log4j Flume Appender - - - false - - - org.apache.logging.log4j.flume - - - je;transitive=false;static=true - - org.apache.logging.log4j.core - - - - - org.apache.logging.log4j - log4j-api - - - org.apache.logging.log4j - log4j-core - - - org.apache.flume - flume-ng-core - true - - - org.apache.flume - flume-ng-embedded-agent - true - - - org.apache.flume - flume-ng-sdk - - - com.sleepycat - je - - - org.apache.logging.log4j - log4j-1.2-api - test - - - org.apache.logging.log4j - log4j-core-test - test - - - org.apache.logging.log4j - log4j-jcl - test - - - org.apache.logging.log4j - log4j-slf4j-impl - test - - - org.apache.flume.flume-ng-channels - flume-file-channel - test - - - org.apache.hadoop - hadoop-core - test - - - org.junit.jupiter - junit-jupiter-engine - test - - - org.junit.vintage - junit-vintage-engine - test - - - io.netty - netty-all - test - - - - diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java deleted file mode 100644 index 75b4fc1ae05..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/AbstractFlumeManager.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import org.apache.flume.Event; -import org.apache.logging.log4j.core.appender.AbstractManager; - -/** - * - */ -public abstract class AbstractFlumeManager extends AbstractManager { - - public AbstractFlumeManager(final String name) { - super(null, name); - } - - public abstract void send(Event event); -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java deleted file mode 100644 index ae4c616a651..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Agent.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.config.plugins.Plugin; -import org.apache.logging.log4j.core.config.plugins.PluginAttribute; -import org.apache.logging.log4j.core.config.plugins.PluginFactory; -import org.apache.logging.log4j.core.util.Integers; -import org.apache.logging.log4j.status.StatusLogger; - -/** - * Agent Specification for FlumeAvroAppender. - */ -@Plugin(name = "Agent", category = "Core", printObject = true) -public final class Agent { - - private static final String DEFAULT_HOST = "localhost"; - - private static final int DEFAULT_PORT = 35853; - - private static final Logger LOGGER = StatusLogger.getLogger(); - - private final String host; - - private final int port; - - private Agent(final String host, final int port) { - this.host = host; - this.port = port; - } - - /** - * Retrieve the host name. - * @return The name of the host. - */ - public String getHost() { - return host; - } - - /** - * Retrieve the port number. - * @return The port number. - */ - public int getPort() { - return port; - } - - @Override - public String toString() { - return "host=" + host + " port=" + port; - } - - /** - * Create an Agent. - * @param host The host name. - * @param port The port number. - * @return The Agent. - */ - @PluginFactory - public static Agent createAgent(@PluginAttribute("host") String host, @PluginAttribute("port") final String port) { - if (host == null) { - host = DEFAULT_HOST; - } - - int portNum; - try { - portNum = Integers.parseInt(port, DEFAULT_PORT); - } catch (final Exception ex) { - LOGGER.error("Error parsing port number " + port, ex); - return null; - } - return new Agent(host, portNum); - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java deleted file mode 100644 index 70423ebb505..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/BatchEvent.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import java.util.ArrayList; -import java.util.List; -import org.apache.flume.Event; - -/** - * - */ -public class BatchEvent { - - private final List events = new ArrayList<>(); - - public void addEvent(final Event event) { - events.add(event); - } - - public List getEvents() { - return events; - } - - public int size() { - return events.size(); - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java deleted file mode 100644 index 2acc0e442ed..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import static org.apache.logging.log4j.util.Strings.toRootUpperCase; - -import java.io.Serializable; -import java.util.concurrent.TimeUnit; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.Filter; -import org.apache.logging.log4j.core.Layout; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.appender.AbstractAppender; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.config.plugins.Plugin; -import org.apache.logging.log4j.core.config.plugins.PluginAliases; -import org.apache.logging.log4j.core.config.plugins.PluginAttribute; -import org.apache.logging.log4j.core.config.plugins.PluginElement; -import org.apache.logging.log4j.core.config.plugins.PluginFactory; -import org.apache.logging.log4j.core.layout.Rfc5424Layout; -import org.apache.logging.log4j.core.net.Facility; -import org.apache.logging.log4j.core.util.Booleans; -import org.apache.logging.log4j.core.util.Integers; -import org.apache.logging.log4j.util.Timer; - -/** - * An Appender that uses the Avro protocol to route events to Flume. - */ -@Plugin(name = "Flume", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true) -public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { - - private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"}; - private static final int DEFAULT_MAX_DELAY = 60000; - - private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5; - - private final AbstractFlumeManager manager; - - private final String mdcIncludes; - private final String mdcExcludes; - private final String mdcRequired; - - private final String eventPrefix; - - private final String mdcPrefix; - - private final boolean compressBody; - - private final FlumeEventFactory factory; - - private Timer timer = new Timer("FlumeEvent", 5000); - private volatile long count; - - /** - * Which Manager will be used by the appender instance. - */ - private enum ManagerType { - AVRO, - EMBEDDED, - PERSISTENT; - - public static ManagerType getType(final String type) { - return valueOf(toRootUpperCase(type)); - } - } - - private FlumeAppender( - final String name, - final Filter filter, - final Layout layout, - final boolean ignoreExceptions, - final String includes, - final String excludes, - final String required, - final String mdcPrefix, - final String eventPrefix, - final boolean compress, - final FlumeEventFactory factory, - final Property[] properties, - final AbstractFlumeManager manager) { - super(name, filter, layout, ignoreExceptions, properties); - this.manager = manager; - this.mdcIncludes = includes; - this.mdcExcludes = excludes; - this.mdcRequired = required; - this.eventPrefix = eventPrefix; - this.mdcPrefix = mdcPrefix; - this.compressBody = compress; - this.factory = factory == null ? this : factory; - } - - /** - * Publish the event. - * @param event The LogEvent. - */ - @Override - public void append(final LogEvent event) { - final String name = event.getLoggerName(); - if (name != null) { - for (final String pkg : EXCLUDED_PACKAGES) { - if (name.startsWith(pkg)) { - return; - } - } - } - timer.startOrResume(); - final FlumeEvent flumeEvent = - factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, eventPrefix, compressBody); - flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); - if (update()) { - final String msg = timer.stop(); - LOGGER.debug(msg); - } else { - timer.pause(); - } - manager.send(flumeEvent); - } - - private synchronized boolean update() { - if (++count == 5000) { - count = 0; - return true; - } - return false; - } - - @Override - public boolean stop(final long timeout, final TimeUnit timeUnit) { - setStopping(); - boolean stopped = super.stop(timeout, timeUnit, false); - stopped &= manager.stop(timeout, timeUnit); - setStopped(); - return stopped; - } - - /** - * Create a Flume event. - * @param event The Log4j LogEvent. - * @param includes comma separated list of mdc elements to include. - * @param excludes comma separated list of mdc elements to exclude. - * @param required comma separated list of mdc elements that must be present with a value. - * @param mdcPrefix The prefix to add to MDC key names. - * @param eventPrefix The prefix to add to event fields. - * @param compress If true the body will be compressed. - * @return A Flume Event. - */ - @Override - public FlumeEvent createEvent( - final LogEvent event, - final String includes, - final String excludes, - final String required, - final String mdcPrefix, - final String eventPrefix, - final boolean compress) { - return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, eventPrefix, compressBody); - } - - /** - * Create a Flume Avro Appender. - * @param agents An array of Agents. - * @param properties Properties to pass to the embedded agent. - * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used. - * Note: The embedded attribute is deprecated in favor of specifying the type attribute. - * @param type Avro (default), Embedded, or Persistent. - * @param dataDir The directory where the Flume FileChannel should write its data. - * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is - * 1000. - * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000. - * @param agentRetries The number of times to retry an agent before failing to the next agent. - * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch. - * @param name The name of the Appender. - * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise - * they are propagated to the caller. - * @param excludes A comma separated list of MDC elements to exclude. - * @param includes A comma separated list of MDC elements to include. - * @param required A comma separated list of MDC elements that are required. - * @param mdcPrefix The prefix to add to MDC key names. - * @param eventPrefix The prefix to add to event key names. - * @param compressBody If true the event body will be compressed. - * @param batchSize Number of events to include in a batch. Defaults to 1. - * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB. - * @param factory The factory to use to create Flume events. - * @param layout The layout to format the event. - * @param filter A Filter to filter events. - * - * @return A Flume Avro Appender. - */ - @PluginFactory - public static FlumeAppender createAppender( - @PluginElement("Agents") final Agent[] agents, - @PluginElement("Properties") final Property[] properties, - @PluginAttribute("hosts") final String hosts, - @PluginAttribute("embedded") final String embedded, - @PluginAttribute("type") final String type, - @PluginAttribute("dataDir") final String dataDir, - @PluginAliases("connectTimeout") @PluginAttribute("connectTimeoutMillis") - final String connectionTimeoutMillis, - @PluginAliases("requestTimeout") @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis, - @PluginAttribute("agentRetries") final String agentRetries, - @PluginAliases("maxDelay") // deprecated - @PluginAttribute("maxDelayMillis") - final String maxDelayMillis, - @PluginAttribute("name") final String name, - @PluginAttribute("ignoreExceptions") final String ignore, - @PluginAttribute("mdcExcludes") final String excludes, - @PluginAttribute("mdcIncludes") final String includes, - @PluginAttribute("mdcRequired") final String required, - @PluginAttribute("mdcPrefix") final String mdcPrefix, - @PluginAttribute("eventPrefix") final String eventPrefix, - @PluginAttribute("compress") final String compressBody, - @PluginAttribute("batchSize") final String batchSize, - @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries, - @PluginElement("FlumeEventFactory") final FlumeEventFactory factory, - @PluginElement("Layout") Layout layout, - @PluginElement("Filter") final Filter filter) { - - final boolean embed = embedded != null - ? Boolean.parseBoolean(embedded) - : (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) - && properties != null - && properties.length > 0; - final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true); - final boolean compress = Booleans.parseBoolean(compressBody, true); - ManagerType managerType; - if (type != null) { - if (embed && embedded != null) { - try { - managerType = ManagerType.getType(type); - LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type); - } catch (final Exception ex) { - LOGGER.warn( - "Embedded and type attributes are mutually exclusive and type " + type + " is invalid."); - managerType = ManagerType.EMBEDDED; - } - } else { - try { - managerType = ManagerType.getType(type); - } catch (final Exception ex) { - LOGGER.warn("Type " + type + " is invalid."); - managerType = ManagerType.EMBEDDED; - } - } - } else if (embed) { - managerType = ManagerType.EMBEDDED; - } else { - managerType = ManagerType.AVRO; - } - - final int batchCount = Integers.parseInt(batchSize, 1); - final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0); - final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0); - final int retries = Integers.parseInt(agentRetries, 0); - final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); - final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY); - - if (layout == null) { - final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER; - layout = new Rfc5424Layout.Rfc5424LayoutBuilder() - .setFacility(Facility.LOCAL0) - .setEin(String.valueOf(enterpriseNumber)) - .setIncludeMDC(true) - .setMdcId(Rfc5424Layout.DEFAULT_MDCID) - .setMdcPrefix(mdcPrefix) - .setEventPrefix(eventPrefix) - .setIncludeNL(false) - .setExcludes(excludes) - .setIncludes(includes) - .setRequired(required) - .setUseTLSMessageFormat(false) - .build(); - } - - if (name == null) { - LOGGER.error("No name provided for Appender"); - return null; - } - - AbstractFlumeManager manager; - - switch (managerType) { - case EMBEDDED: - manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); - break; - case AVRO: - manager = FlumeAvroManager.getManager( - name, - getAgents(agents, hosts), - batchCount, - delayMillis, - retries, - connectTimeoutMillis, - reqTimeoutMillis); - break; - case PERSISTENT: - manager = FlumePersistentManager.getManager( - name, - getAgents(agents, hosts), - properties, - batchCount, - retries, - connectTimeoutMillis, - reqTimeoutMillis, - delayMillis, - lockTimeoutRetryCount, - dataDir); - break; - default: - LOGGER.debug("No manager type specified. Defaulting to AVRO"); - manager = FlumeAvroManager.getManager( - name, - getAgents(agents, hosts), - batchCount, - delayMillis, - retries, - connectTimeoutMillis, - reqTimeoutMillis); - } - - if (manager == null) { - return null; - } - - return new FlumeAppender( - name, - filter, - layout, - ignoreExceptions, - includes, - excludes, - required, - mdcPrefix, - eventPrefix, - compress, - factory, - Property.EMPTY_ARRAY, - manager); - } - - private static Agent[] getAgents(Agent[] agents, final String hosts) { - if (agents == null || agents.length == 0) { - if (hosts != null && !hosts.isEmpty()) { - LOGGER.debug("Parsing agents from hosts parameter"); - final String[] hostports = hosts.split(","); - agents = new Agent[hostports.length]; - for (int i = 0; i < hostports.length; ++i) { - final String[] h = hostports[i].split(":"); - agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null); - } - } else { - LOGGER.debug("No agents provided, using defaults"); - agents = new Agent[] {Agent.createAgent(null, null)}; - } - } - - LOGGER.debug("Using agents {}", agents); - return agents; - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java deleted file mode 100644 index b0e0f177f97..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.apache.flume.Event; -import org.apache.flume.api.RpcClient; -import org.apache.flume.api.RpcClientFactory; -import org.apache.logging.log4j.core.appender.AppenderLoggingException; -import org.apache.logging.log4j.core.appender.ManagerFactory; - -/** - * Manager for FlumeAvroAppenders. - */ -public class FlumeAvroManager extends AbstractFlumeManager { - - private static final int MAX_RECONNECTS = 3; - private static final int MINIMUM_TIMEOUT = 1000; - - private static final AvroManagerFactory factory = new AvroManagerFactory(); - - private final Agent[] agents; - - private final int batchSize; - - private final long delayNanos; - private final int delayMillis; - - private final int retries; - - private final int connectTimeoutMillis; - - private final int requestTimeoutMillis; - - private final int current = 0; - - private volatile RpcClient rpcClient; - - private BatchEvent batchEvent = new BatchEvent(); - private long nextSend = 0; - - /** - * Constructor - * @param name The unique name of this manager. - * @param agents An array of Agents. - * @param batchSize The number of events to include in a batch. - * @param retries The number of times to retry connecting before giving up. - * @param connectTimeout The connection timeout in ms. - * @param requestTimeout The request timeout in ms. - * - */ - protected FlumeAvroManager( - final String name, - final String shortName, - final Agent[] agents, - final int batchSize, - final int delayMillis, - final int retries, - final int connectTimeout, - final int requestTimeout) { - super(name); - this.agents = agents; - this.batchSize = batchSize; - this.delayMillis = delayMillis; - this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis); - this.retries = retries; - this.connectTimeoutMillis = connectTimeout; - this.requestTimeoutMillis = requestTimeout; - this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout); - } - - /** - * Returns a FlumeAvroManager. - * @param name The name of the manager. - * @param agents The agents to use. - * @param batchSize The number of events to include in a batch. - * @param delayMillis The number of milliseconds to wait before sending an incomplete batch. - * @param retries The number of times to retry connecting before giving up. - * @param connectTimeoutMillis The connection timeout in ms. - * @param requestTimeoutMillis The request timeout in ms. - * @return A FlumeAvroManager. - */ - public static FlumeAvroManager getManager( - final String name, - final Agent[] agents, - int batchSize, - final int delayMillis, - final int retries, - final int connectTimeoutMillis, - final int requestTimeoutMillis) { - if (agents == null || agents.length == 0) { - throw new IllegalArgumentException("At least one agent is required"); - } - - if (batchSize <= 0) { - batchSize = 1; - } - final StringBuilder sb = new StringBuilder(name); - sb.append(" FlumeAvro["); - boolean first = true; - for (final Agent agent : agents) { - if (!first) { - sb.append(','); - } - sb.append(agent.getHost()).append(':').append(agent.getPort()); - first = false; - } - sb.append(']'); - return getManager( - sb.toString(), - factory, - new FactoryData( - name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis)); - } - - /** - * Returns the agents. - * @return The agent array. - */ - public Agent[] getAgents() { - return agents; - } - - /** - * Returns the index of the current agent. - * @return The index for the current agent. - */ - public int getCurrent() { - return current; - } - - public int getRetries() { - return retries; - } - - public int getConnectTimeoutMillis() { - return connectTimeoutMillis; - } - - public int getRequestTimeoutMillis() { - return requestTimeoutMillis; - } - - public int getBatchSize() { - return batchSize; - } - - public int getDelayMillis() { - return delayMillis; - } - - public void send(final BatchEvent events) { - if (rpcClient == null) { - synchronized (this) { - if (rpcClient == null) { - rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); - } - } - } - - if (rpcClient != null) { - try { - LOGGER.trace("Sending batch of {} events", events.getEvents().size()); - rpcClient.appendBatch(events.getEvents()); - } catch (final Exception ex) { - rpcClient.close(); - rpcClient = null; - final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' - + agents[current].getPort(); - LOGGER.warn(msg, ex); - throw new AppenderLoggingException("No Flume agents are available"); - } - } else { - final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' - + agents[current].getPort(); - LOGGER.warn(msg); - throw new AppenderLoggingException("No Flume agents are available"); - } - } - - @Override - public void send(final Event event) { - if (batchSize == 1) { - if (rpcClient == null) { - rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis); - } - - if (rpcClient != null) { - try { - rpcClient.append(event); - } catch (final Exception ex) { - rpcClient.close(); - rpcClient = null; - final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' - + agents[current].getPort(); - LOGGER.warn(msg, ex); - throw new AppenderLoggingException("No Flume agents are available"); - } - } else { - final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' - + agents[current].getPort(); - LOGGER.warn(msg); - throw new AppenderLoggingException("No Flume agents are available"); - } - } else { - int eventCount; - BatchEvent batch = null; - synchronized (this) { - batchEvent.addEvent(event); - eventCount = batchEvent.size(); - final long now = System.nanoTime(); - if (eventCount == 1) { - nextSend = now + delayNanos; - } - if (eventCount >= batchSize || now >= nextSend) { - batch = batchEvent; - batchEvent = new BatchEvent(); - } - } - if (batch != null) { - send(batch); - } - } - } - - /** - * There is a very good chance that this will always return the first agent even if it isn't available. - * @param agents The list of agents to choose from - * @return The FlumeEventAvroServer. - */ - private RpcClient connect( - final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) { - try { - final Properties props = new Properties(); - - props.put("client.type", "default_failover"); - - int agentCount = 1; - final StringBuilder sb = new StringBuilder(); - for (final Agent agent : agents) { - if (sb.length() > 0) { - sb.append(' '); - } - final String hostName = "host" + agentCount++; - props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort()); - sb.append(hostName); - } - props.put("hosts", sb.toString()); - if (batchSize > 0) { - props.put("batch-size", Integer.toString(batchSize)); - } - if (retries > 1) { - if (retries > MAX_RECONNECTS) { - retries = MAX_RECONNECTS; - } - props.put("max-attempts", Integer.toString(retries * agents.length)); - } - if (requestTimeoutMillis >= MINIMUM_TIMEOUT) { - props.put("request-timeout", Integer.toString(requestTimeoutMillis)); - } - if (connectTimeoutMillis >= MINIMUM_TIMEOUT) { - props.put("connect-timeout", Integer.toString(connectTimeoutMillis)); - } - return RpcClientFactory.getInstance(props); - } catch (final Exception ex) { - LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage()); - return null; - } - } - - @Override - protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { - boolean closed = true; - if (rpcClient != null) { - try { - synchronized (this) { - try { - if (batchSize > 1 && batchEvent.getEvents().size() > 0) { - send(batchEvent); - } - } catch (final Exception ex) { - LOGGER.error("Error sending final batch: {}", ex.getMessage()); - closed = false; - } - } - rpcClient.close(); - } catch (final Exception ex) { - LOGGER.error("Attempt to close RPC client failed", ex); - closed = false; - } - } - rpcClient = null; - return closed; - } - - /** - * Factory data. - */ - private static class FactoryData { - private final String name; - private final Agent[] agents; - private final int batchSize; - private final int delayMillis; - private final int retries; - private final int conntectTimeoutMillis; - private final int requestTimeoutMillis; - - /** - * Constructor. - * @param name The name of the Appender. - * @param agents The agents. - * @param batchSize The number of events to include in a batch. - */ - public FactoryData( - final String name, - final Agent[] agents, - final int batchSize, - final int delayMillis, - final int retries, - final int connectTimeoutMillis, - final int requestTimeoutMillis) { - this.name = name; - this.agents = agents; - this.batchSize = batchSize; - this.delayMillis = delayMillis; - this.retries = retries; - this.conntectTimeoutMillis = connectTimeoutMillis; - this.requestTimeoutMillis = requestTimeoutMillis; - } - } - - /** - * Avro Manager Factory. - */ - private static class AvroManagerFactory implements ManagerFactory { - - /** - * Create the FlumeAvroManager. - * @param name The name of the entity to manage. - * @param data The data required to create the entity. - * @return The FlumeAvroManager. - */ - @Override - public FlumeAvroManager createManager(final String name, final FactoryData data) { - try { - - return new FlumeAvroManager( - name, - data.name, - data.agents, - data.batchSize, - data.delayMillis, - data.retries, - data.conntectTimeoutMillis, - data.requestTimeoutMillis); - } catch (final Exception ex) { - LOGGER.error("Could not create FlumeAvroManager", ex); - } - return null; - } - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java deleted file mode 100644 index 9f24f8cee4e..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedManager.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import static org.apache.logging.log4j.util.Strings.toRootUpperCase; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.agent.embedded.EmbeddedAgent; -import org.apache.logging.log4j.LoggingException; -import org.apache.logging.log4j.core.appender.ManagerFactory; -import org.apache.logging.log4j.core.config.ConfigurationException; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.util.NameUtil; -import org.apache.logging.log4j.util.PropertiesUtil; -import org.apache.logging.log4j.util.Strings; - -public class FlumeEmbeddedManager extends AbstractFlumeManager { - - private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator"); - - private static final String IN_MEMORY = "InMemory"; - - private static final FlumeManagerFactory FACTORY = new FlumeManagerFactory(); - - private final EmbeddedAgent agent; - - private final String shortName; - - /** - * Constructor - * @param name The unique name of this manager. - * @param shortName The short version of the agent name. - * @param agent The embedded agent. - */ - protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) { - super(name); - this.agent = agent; - this.shortName = shortName; - } - - /** - * Returns a FlumeEmbeddedManager. - * @param name The name of the manager. - * @param agents The agents to use. - * @param properties Properties for the embedded manager. - * @param batchSize The number of events to include in a batch. - * @param dataDir The directory where the Flume FileChannel should write to. - * @return A FlumeAvroManager. - */ - public static FlumeEmbeddedManager getManager( - final String name, final Agent[] agents, final Property[] properties, int batchSize, final String dataDir) { - - if (batchSize <= 0) { - batchSize = 1; - } - - if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { - throw new IllegalArgumentException("Either an Agent or properties are required"); - } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { - throw new IllegalArgumentException("Cannot configure both Agents and Properties."); - } - - final String extendedName = extendManagerName(name, agents, properties); - return getManager(extendedName, FACTORY, new FactoryData(name, agents, properties, batchSize, dataDir)); - } - - private static String extendManagerName(final String name, final Agent[] agents, final Property[] properties) { - - final StringBuilder sb = new StringBuilder(); - boolean first = true; - - if (agents != null && agents.length > 0) { - sb.append(name).append('['); - for (final Agent agent : agents) { - if (!first) { - sb.append('_'); - } - sb.append(agent.getHost()).append('-').append(agent.getPort()); - first = false; - } - sb.append(']'); - } else { - String sep = Strings.EMPTY; - sb.append(name).append('-'); - final StringBuilder props = new StringBuilder(); - for (final Property prop : properties) { - props.append(sep); - props.append(prop.getName()).append('=').append(prop.getValue()); - sep = "_"; - } - sb.append(NameUtil.md5(props.toString())); - } - - return sb.toString(); - } - - @Override - public void send(final Event event) { - try { - agent.put(event); - } catch (final EventDeliveryException ex) { - throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex); - } - } - - @Override - protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { - agent.stop(); - return true; - } - - /** - * Factory data. - */ - private static class FactoryData { - private final Agent[] agents; - private final Property[] properties; - private final int batchSize; - private final String dataDir; - private final String name; - - /** - * Constructor. - * @param name The name of the Appender. - * @param agents The agents. - * @param properties The Flume configuration properties. - * @param batchSize The number of events to include in a batch. - * @param dataDir The directory where Flume should write to. - */ - public FactoryData( - final String name, - final Agent[] agents, - final Property[] properties, - final int batchSize, - final String dataDir) { - this.name = name; - this.agents = agents; - this.batchSize = batchSize; - this.properties = properties; - this.dataDir = dataDir; - } - } - - /** - * Avro Manager Factory. - */ - private static class FlumeManagerFactory implements ManagerFactory { - - /** - * Create the FlumeAvroManager. - * @param name The name of the entity to manage. - * @param data The data required to create the entity. - * @return The FlumeAvroManager. - */ - @Override - public FlumeEmbeddedManager createManager(final String name, final FactoryData data) { - try { - final Map props = - createProperties(data.name, data.agents, data.properties, data.batchSize, data.dataDir); - final EmbeddedAgent agent = new EmbeddedAgent(name); - agent.configure(props); - agent.start(); - LOGGER.debug("Created Agent " + name); - return new FlumeEmbeddedManager(name, data.name, agent); - } catch (final Exception ex) { - LOGGER.error("Could not create FlumeEmbeddedManager", ex); - } - return null; - } - - private Map createProperties( - final String name, - final Agent[] agents, - final Property[] properties, - final int batchSize, - String dataDir) { - final Map props = new HashMap<>(); - - if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { - LOGGER.error("No Flume configuration provided"); - throw new ConfigurationException("No Flume configuration provided"); - } - - if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { - LOGGER.error("Agents and Flume configuration cannot both be specified"); - throw new ConfigurationException("Agents and Flume configuration cannot both be specified"); - } - - if (agents != null && agents.length > 0) { - - if (Strings.isNotEmpty(dataDir)) { - if (dataDir.equals(IN_MEMORY)) { - props.put("channel.type", "memory"); - } else { - props.put("channel.type", "file"); - - if (!dataDir.endsWith(FILE_SEP)) { - dataDir = dataDir + FILE_SEP; - } - - props.put("channel.checkpointDir", dataDir + "checkpoint"); - props.put("channel.dataDirs", dataDir + "data"); - } - - } else { - props.put("channel.type", "file"); - } - - final StringBuilder sb = new StringBuilder(); - String leading = Strings.EMPTY; - final int priority = agents.length; - for (int i = 0; i < priority; ++i) { - sb.append(leading).append("agent").append(i); - leading = " "; - final String prefix = "agent" + i; - props.put(prefix + ".type", "avro"); - props.put(prefix + ".hostname", agents[i].getHost()); - props.put(prefix + ".port", Integer.toString(agents[i].getPort())); - props.put(prefix + ".batch-size", Integer.toString(batchSize)); - props.put("processor.priority." + prefix, Integer.toString(agents.length - i)); - } - props.put("sinks", sb.toString()); - props.put("processor.type", "failover"); - } else { - String[] sinks = null; - - for (final Property property : properties) { - final String key = property.getName(); - - if (Strings.isEmpty(key)) { - final String msg = "A property name must be provided"; - LOGGER.error(msg); - throw new ConfigurationException(msg); - } - - final String upperKey = toRootUpperCase(key); - - if (upperKey.startsWith(toRootUpperCase(name))) { - final String msg = - "Specification of the agent name is not allowed in Flume Appender configuration: " - + key; - LOGGER.error(msg); - throw new ConfigurationException(msg); - } - - final String value = property.getValue(); - if (Strings.isEmpty(value)) { - final String msg = "A value for property " + key + " must be provided"; - LOGGER.error(msg); - throw new ConfigurationException(msg); - } - - if (upperKey.equals("SINKS")) { - sinks = value.trim().split(" "); - } - - props.put(key, value); - } - - if (sinks == null || sinks.length == 0) { - final String msg = "At least one Sink must be specified"; - LOGGER.error(msg); - throw new ConfigurationException(msg); - } - } - return props; - } - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java deleted file mode 100644 index 37fd627382f..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEvent.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.zip.GZIPOutputStream; -import org.apache.flume.event.SimpleEvent; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LoggingException; -import org.apache.logging.log4j.Marker; -import org.apache.logging.log4j.ThreadContext; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.impl.Log4jLogEvent; -import org.apache.logging.log4j.core.impl.ThrowableProxy; -import org.apache.logging.log4j.core.time.Instant; -import org.apache.logging.log4j.core.util.Patterns; -import org.apache.logging.log4j.core.util.UuidUtil; -import org.apache.logging.log4j.message.MapMessage; -import org.apache.logging.log4j.message.Message; -import org.apache.logging.log4j.message.StructuredDataId; -import org.apache.logging.log4j.message.StructuredDataMessage; -import org.apache.logging.log4j.util.Constants; -import org.apache.logging.log4j.util.ReadOnlyStringMap; -import org.apache.logging.log4j.util.Strings; - -/** - * Class that is both a Flume and Log4j Event. - */ -public class FlumeEvent extends SimpleEvent implements LogEvent { - - static final String GUID = "guId"; - /** - * Generated serial version ID. - */ - private static final long serialVersionUID = -8988674608627854140L; - - private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY; - - private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY; - - private static final String EVENT_TYPE = "eventType"; - - private static final String EVENT_ID = "eventId"; - - private static final String TIMESTAMP = "timeStamp"; - - private final LogEvent event; - - private final Map contextMap = new HashMap<>(); - - private final boolean compress; - - /** - * Construct the FlumeEvent. - * @param event The Log4j LogEvent. - * @param includes A comma separated list of MDC elements to include. - * @param excludes A comma separated list of MDC elements to exclude. - * @param required A comma separated list of MDC elements that are required to be defined. - * @param mdcPrefix The value to prefix to MDC keys. - * @param eventPrefix The value to prefix to event keys. - * @param compress If true the event body should be compressed. - */ - public FlumeEvent( - final LogEvent event, - final String includes, - final String excludes, - final String required, - String mdcPrefix, - String eventPrefix, - final boolean compress) { - this.event = event; - this.compress = compress; - final Map headers = getHeaders(); - headers.put(TIMESTAMP, Long.toString(event.getTimeMillis())); - if (mdcPrefix == null) { - mdcPrefix = DEFAULT_MDC_PREFIX; - } - if (eventPrefix == null) { - eventPrefix = DEFAULT_EVENT_PREFIX; - } - final Map mdc = event.getContextData().toMap(); - if (includes != null) { - final String[] array = includes.split(Patterns.COMMA_SEPARATOR); - if (array.length > 0) { - for (String str : array) { - str = str.trim(); - if (mdc.containsKey(str)) { - contextMap.put(str, mdc.get(str)); - } - } - } - } else if (excludes != null) { - final String[] array = excludes.split(Patterns.COMMA_SEPARATOR); - if (array.length > 0) { - final List list = new ArrayList<>(array.length); - for (final String value : array) { - list.add(value.trim()); - } - for (final Map.Entry entry : mdc.entrySet()) { - if (!list.contains(entry.getKey())) { - contextMap.put(entry.getKey(), entry.getValue()); - } - } - } - } else { - contextMap.putAll(mdc); - } - - if (required != null) { - final String[] array = required.split(Patterns.COMMA_SEPARATOR); - if (array.length > 0) { - for (String str : array) { - str = str.trim(); - if (!mdc.containsKey(str)) { - throw new LoggingException("Required key " + str + " is missing from the MDC"); - } - } - } - } - final String guid = UuidUtil.getTimeBasedUuid().toString(); - final Message message = event.getMessage(); - if (message instanceof MapMessage) { - // Add the guid to the Map so that it can be included in the Layout. - @SuppressWarnings("unchecked") - final MapMessage stringMapMessage = (MapMessage) message; - stringMapMessage.put(GUID, guid); - if (message instanceof StructuredDataMessage) { - addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); - } - addMapData(eventPrefix, headers, stringMapMessage); - } else { - headers.put(GUID, guid); - } - - addContextData(mdcPrefix, headers, contextMap); - } - - protected void addStructuredData( - final String prefix, final Map fields, final StructuredDataMessage msg) { - fields.put(prefix + EVENT_TYPE, msg.getType()); - final StructuredDataId id = msg.getId(); - fields.put(prefix + EVENT_ID, id.getName()); - } - - protected void addMapData(final String prefix, final Map fields, final MapMessage msg) { - final Map data = msg.getData(); - for (final Map.Entry entry : data.entrySet()) { - fields.put(prefix + entry.getKey(), entry.getValue()); - } - } - - protected void addContextData( - final String prefix, final Map fields, final Map context) { - final Map map = new HashMap<>(); - for (final Map.Entry entry : context.entrySet()) { - if (entry.getKey() != null && entry.getValue() != null) { - fields.put(prefix + entry.getKey(), entry.getValue()); - map.put(prefix + entry.getKey(), entry.getValue()); - } - } - context.clear(); - context.putAll(map); - } - - @Override - public LogEvent toImmutable() { - return Log4jLogEvent.createMemento(this); - } - - /** - * Set the body in the event. - * @param body The body to add to the event. - */ - @Override - public void setBody(final byte[] body) { - if (body == null || body.length == 0) { - super.setBody(Constants.EMPTY_BYTE_ARRAY); - return; - } - if (compress) { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (final GZIPOutputStream os = new GZIPOutputStream(baos)) { - os.write(body); - } catch (final IOException ioe) { - throw new LoggingException("Unable to compress message", ioe); - } - super.setBody(baos.toByteArray()); - } else { - super.setBody(body); - } - } - - /** - * Get the Frequently Qualified Class Name. - * @return the FQCN String. - */ - @Override - public String getLoggerFqcn() { - return event.getLoggerFqcn(); - } - - /** - * Returns the logging Level. - * @return the Level. - */ - @Override - public Level getLevel() { - return event.getLevel(); - } - - /** - * Returns the logger name. - * @return the logger name. - */ - @Override - public String getLoggerName() { - return event.getLoggerName(); - } - - /** - * Returns the StackTraceElement for the caller of the logging API. - * @return the StackTraceElement of the caller. - */ - @Override - public StackTraceElement getSource() { - return event.getSource(); - } - - /** - * Returns the Message. - * @return the Message. - */ - @Override - public Message getMessage() { - return event.getMessage(); - } - - /** - * Returns the Marker. - * @return the Marker. - */ - @Override - public Marker getMarker() { - return event.getMarker(); - } - - /** - * Returns the ID of the Thread. - * @return the ID of the Thread. - */ - @Override - public long getThreadId() { - return event.getThreadId(); - } - - /** - * Returns the priority of the Thread. - * @return the priority of the Thread. - */ - @Override - public int getThreadPriority() { - return event.getThreadPriority(); - } - - /** - * Returns the name of the Thread. - * @return the name of the Thread. - */ - @Override - public String getThreadName() { - return event.getThreadName(); - } - - /** - * Returns the event timestamp. - * @return the event timestamp. - */ - @Override - public long getTimeMillis() { - return event.getTimeMillis(); - } - - /** - * {@inheritDoc} - * @since 2.11 - */ - @Override - public Instant getInstant() { - return event.getInstant(); - } - - /** - * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created, - * or a dummy value if it is known that this value will not be used downstream. - * @return the event nanosecond timestamp. - */ - @Override - public long getNanoTime() { - return event.getNanoTime(); - } - - /** - * Returns the Throwable associated with the event, if any. - * @return the Throwable. - */ - @Override - public Throwable getThrown() { - return event.getThrown(); - } - - /** - * Returns the Throwable associated with the event, if any. - * @return the Throwable. - */ - @Override - public ThrowableProxy getThrownProxy() { - return event.getThrownProxy(); - } - - /** - * Returns a copy of the context Map. - * @return a copy of the context Map. - */ - @Override - public Map getContextMap() { - return contextMap; - } - - /** - * Returns the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with. - * @return the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with. - */ - @Override - public ReadOnlyStringMap getContextData() { - return event.getContextData(); - } - - /** - * Returns a copy of the context stack. - * @return a copy of the context stack. - */ - @Override - public ThreadContext.ContextStack getContextStack() { - return event.getContextStack(); - } - - @Override - public boolean isIncludeLocation() { - return event.isIncludeLocation(); - } - - @Override - public void setIncludeLocation(final boolean includeLocation) { - event.setIncludeLocation(includeLocation); - } - - @Override - public boolean isEndOfBatch() { - return event.isEndOfBatch(); - } - - @Override - public void setEndOfBatch(final boolean endOfBatch) { - event.setEndOfBatch(endOfBatch); - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java deleted file mode 100644 index e67a7001bc3..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeEventFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import org.apache.logging.log4j.core.LogEvent; - -/** - * Factory to create Flume events. - */ -public interface FlumeEventFactory { - - /** - * Creates a Flume event. - * @param event The Log4j LogEvent. - * @param includes A comma separated list of MDC elements to include. - * @param excludes A comma separated list of MDC elements to exclude. - * @param required A comma separated list of MDC elements that are required. - * @param mdcPrefix The value to prefix to MDC keys. - * @param eventPrefix The value to prefix to event keys. - * @param compress If true the event body should be compressed. - * @return A FlumeEvent. - */ - FlumeEvent createEvent( - LogEvent event, - String includes, - String excludes, - String required, - String mdcPrefix, - String eventPrefix, - boolean compress); -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java deleted file mode 100644 index 3d8e15818f9..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java +++ /dev/null @@ -1,911 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import com.sleepycat.je.Cursor; -import com.sleepycat.je.CursorConfig; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.LockConflictException; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.StatsConfig; -import com.sleepycat.je.Transaction; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.crypto.Cipher; -import javax.crypto.SecretKey; -import org.apache.flume.Event; -import org.apache.flume.event.SimpleEvent; -import org.apache.logging.log4j.LoggingException; -import org.apache.logging.log4j.core.appender.ManagerFactory; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.config.plugins.util.PluginManager; -import org.apache.logging.log4j.core.config.plugins.util.PluginType; -import org.apache.logging.log4j.core.util.ExecutorServices; -import org.apache.logging.log4j.core.util.FileUtils; -import org.apache.logging.log4j.core.util.Log4jThread; -import org.apache.logging.log4j.core.util.Log4jThreadFactory; -import org.apache.logging.log4j.core.util.SecretKeyProvider; -import org.apache.logging.log4j.util.LoaderUtil; -import org.apache.logging.log4j.util.Strings; - -/** - * Manager that persists data to Berkeley DB before passing it on to Flume. - */ -public class FlumePersistentManager extends FlumeAvroManager { - - /** Attribute name for the key provider. */ - public static final String KEY_PROVIDER = "keyProvider"; - - private static final Charset UTF8 = StandardCharsets.UTF_8; - - private static final String DEFAULT_DATA_DIR = ".log4j/flumeData"; - - private static final long SHUTDOWN_WAIT_MILLIS = 60000; - - private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500; - - private static final BDBManagerFactory factory = new BDBManagerFactory(); - - private final Database database; - - private final Environment environment; - - private final WriterThread worker; - - private final Gate gate = new Gate(); - - private final SecretKey secretKey; - - private final int lockTimeoutRetryCount; - - private final ExecutorService threadPool; - - private final AtomicLong dbCount = new AtomicLong(); - - /** - * Constructor - * @param name The unique name of this manager. - * @param shortName Original name for the Manager. - * @param agents An array of Agents. - * @param batchSize The number of events to include in a batch. - * @param retries The number of times to retry connecting before giving up. - * @param connectionTimeout The amount of time to wait for a connection to be established. - * @param requestTimeout The amount of time to wair for a response to a request. - * @param delay The amount of time to wait between retries. - * @param database The database to write to. - * @param environment The database environment. - * @param secretKey The SecretKey to use for encryption. - * @param lockTimeoutRetryCount The number of times to retry a lock timeout. - */ - protected FlumePersistentManager( - final String name, - final String shortName, - final Agent[] agents, - final int batchSize, - final int retries, - final int connectionTimeout, - final int requestTimeout, - final int delay, - final Database database, - final Environment environment, - final SecretKey secretKey, - final int lockTimeoutRetryCount) { - super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout); - this.database = database; - this.environment = environment; - dbCount.set(database.count()); - this.worker = new WriterThread( - database, environment, this, gate, batchSize, secretKey, dbCount, lockTimeoutRetryCount); - this.worker.start(); - this.secretKey = secretKey; - this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume")); - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - } - - /** - * Returns a FlumeAvroManager. - * @param name The name of the manager. - * @param agents The agents to use. - * @param properties Properties to pass to the Manager. - * @param batchSize The number of events to include in a batch. - * @param retries The number of times to retry connecting before giving up. - * @param connectionTimeout The amount of time to wait to establish a connection. - * @param requestTimeout The amount of time to wait for a response to a request. - * @param delayMillis Amount of time to delay before delivering a batch. - * @param lockTimeoutRetryCount The number of times to retry after a lock timeout. - * @param dataDir The location of the Berkeley database. - * @return A FlumeAvroManager. - */ - public static FlumePersistentManager getManager( - final String name, - final Agent[] agents, - final Property[] properties, - int batchSize, - final int retries, - final int connectionTimeout, - final int requestTimeout, - final int delayMillis, - final int lockTimeoutRetryCount, - final String dataDir) { - if (agents == null || agents.length == 0) { - throw new IllegalArgumentException("At least one agent is required"); - } - - if (batchSize <= 0) { - batchSize = 1; - } - final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir; - - final StringBuilder sb = new StringBuilder("FlumePersistent["); - boolean first = true; - for (final Agent agent : agents) { - if (!first) { - sb.append(','); - } - sb.append(agent.getHost()).append(':').append(agent.getPort()); - first = false; - } - sb.append(']'); - sb.append(' ').append(dataDirectory); - return getManager( - sb.toString(), - factory, - new FactoryData( - name, - agents, - batchSize, - retries, - connectionTimeout, - requestTimeout, - delayMillis, - lockTimeoutRetryCount, - dataDir, - properties)); - } - - @Override - @SuppressFBWarnings( - value = {"CIPHER_INTEGRITY", "ECB_MODE"}, - justification = "Work-in-progress: https://github.com/apache/logging-log4j2/issues/1947") - public void send(final Event event) { - if (worker.isShutdown()) { - throw new LoggingException("Unable to record event"); - } - - final Map headers = event.getHeaders(); - final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); - try { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final DataOutputStream daos = new DataOutputStream(baos); - daos.writeInt(event.getBody().length); - daos.write(event.getBody(), 0, event.getBody().length); - daos.writeInt(event.getHeaders().size()); - for (final Map.Entry entry : headers.entrySet()) { - daos.writeUTF(entry.getKey()); - daos.writeUTF(entry.getValue()); - } - byte[] eventData = baos.toByteArray(); - if (secretKey != null) { - final Cipher cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.ENCRYPT_MODE, secretKey); - eventData = cipher.doFinal(eventData); - } - final Future future = threadPool.submit(new BDBWriter( - keyData, eventData, environment, database, gate, dbCount, getBatchSize(), lockTimeoutRetryCount)); - try { - future.get(); - } catch (final InterruptedException ie) { - // preserve interruption status - Thread.currentThread().interrupt(); - } - } catch (final Exception ex) { - throw new LoggingException("Exception occurred writing log event", ex); - } - } - - @Override - protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { - boolean closed = true; - LOGGER.debug("Shutting down FlumePersistentManager"); - worker.shutdown(); - final long requestedTimeoutMillis = timeUnit.toMillis(timeout); - final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS; - try { - worker.join(shutdownWaitMillis); - } catch (final InterruptedException ie) { - // Ignore the exception and shutdown. - } - ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString()); - try { - worker.join(); - } catch (final InterruptedException ex) { - logDebug("interrupted while waiting for worker to complete", ex); - } - try { - LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig())); - database.close(); - } catch (final Exception ex) { - logWarn("Failed to close database", ex); - closed = false; - } - try { - environment.cleanLog(); - environment.close(); - } catch (final Exception ex) { - logWarn("Failed to close environment", ex); - closed = false; - } - return closed && super.releaseSub(timeout, timeUnit); - } - - private void doSend(final SimpleEvent event) { - LOGGER.debug("Sending event to Flume"); - super.send(event); - } - - /** - * Thread for writing to Berkeley DB to avoid having interrupts close the database. - */ - private static class BDBWriter implements Callable { - private final byte[] eventData; - private final byte[] keyData; - private final Environment environment; - private final Database database; - private final Gate gate; - private final AtomicLong dbCount; - private final long batchSize; - private final int lockTimeoutRetryCount; - - public BDBWriter( - final byte[] keyData, - final byte[] eventData, - final Environment environment, - final Database database, - final Gate gate, - final AtomicLong dbCount, - final long batchSize, - final int lockTimeoutRetryCount) { - this.keyData = keyData; - this.eventData = eventData; - this.environment = environment; - this.database = database; - this.gate = gate; - this.dbCount = dbCount; - this.batchSize = batchSize; - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - } - - @Override - public Integer call() throws Exception { - final DatabaseEntry key = new DatabaseEntry(keyData); - final DatabaseEntry data = new DatabaseEntry(eventData); - Exception exception = null; - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - Transaction txn = null; - try { - txn = environment.beginTransaction(null, null); - try { - database.put(txn, key, data); - txn.commit(); - txn = null; - if (dbCount.incrementAndGet() >= batchSize) { - gate.open(); - } - exception = null; - break; - } catch (final LockConflictException lce) { - exception = lce; - // Fall through and retry. - } catch (final Exception ex) { - if (txn != null) { - txn.abort(); - } - throw ex; - } finally { - if (txn != null) { - txn.abort(); - txn = null; - } - } - } catch (final LockConflictException lce) { - exception = lce; - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignoring exception while aborting transaction during lock conflict."); - } - } - } - try { - Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); - } catch (final InterruptedException ie) { - // Ignore the error - } - } - if (exception != null) { - throw exception; - } - return eventData.length; - } - } - - /** - * Factory data. - */ - private static class FactoryData { - private final String name; - private final Agent[] agents; - private final int batchSize; - private final String dataDir; - private final int retries; - private final int connectionTimeout; - private final int requestTimeout; - private final int delayMillis; - private final int lockTimeoutRetryCount; - private final Property[] properties; - - /** - * Constructor. - * @param name The name of the Appender. - * @param agents The agents. - * @param batchSize The number of events to include in a batch. - * @param dataDir The directory for data. - */ - public FactoryData( - final String name, - final Agent[] agents, - final int batchSize, - final int retries, - final int connectionTimeout, - final int requestTimeout, - final int delayMillis, - final int lockTimeoutRetryCount, - final String dataDir, - final Property[] properties) { - this.name = name; - this.agents = agents; - this.batchSize = batchSize; - this.dataDir = dataDir; - this.retries = retries; - this.connectionTimeout = connectionTimeout; - this.requestTimeout = requestTimeout; - this.delayMillis = delayMillis; - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - this.properties = properties; - } - } - - /** - * Avro Manager Factory. - */ - private static class BDBManagerFactory implements ManagerFactory { - - /** - * Create the FlumeKratiManager. - * @param name The name of the entity to manage. - * @param data The data required to create the entity. - * @return The FlumeKratiManager. - */ - @Override - @SuppressFBWarnings( - value = "PATH_TRAVERSAL_IN", - justification = "The name of the directory is provided in a configuration file.") - public FlumePersistentManager createManager(final String name, final FactoryData data) { - SecretKey secretKey = null; - Database database = null; - Environment environment = null; - - final Map properties = new HashMap<>(); - if (data.properties != null) { - for (final Property property : data.properties) { - properties.put(property.getName(), property.getValue()); - } - } - - try { - final File dir = new File(data.dataDir); - FileUtils.mkdir(dir, true); - final EnvironmentConfig dbEnvConfig = new EnvironmentConfig(); - dbEnvConfig.setTransactional(true); - dbEnvConfig.setAllowCreate(true); - dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS); - environment = new Environment(dir, dbEnvConfig); - final DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - database = environment.openDatabase(null, name, dbConfig); - } catch (final Exception ex) { - LOGGER.error("Could not create FlumePersistentManager", ex); - // For consistency, close database as well as environment even though it should never happen since the - // database is that last thing in the block above, but this does guard against a future line being - // inserted at the end that would bomb (like some debug logging). - if (database != null) { - database.close(); - database = null; - } - if (environment != null) { - environment.close(); - environment = null; - } - return null; - } - - try { - String key = null; - for (final Map.Entry entry : properties.entrySet()) { - if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) { - key = entry.getValue(); - break; - } - } - if (key != null) { - final PluginManager manager = new PluginManager("KeyProvider"); - manager.collectPlugins(); - final Map> plugins = manager.getPlugins(); - if (plugins != null) { - boolean found = false; - for (final Map.Entry> entry : plugins.entrySet()) { - if (entry.getKey().equalsIgnoreCase(key)) { - found = true; - final Class cl = entry.getValue().getPluginClass(); - try { - final SecretKeyProvider provider = (SecretKeyProvider) LoaderUtil.newInstanceOf(cl); - secretKey = provider.getSecretKey(); - LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName()); - } catch (final Exception ex) { - LOGGER.error( - "Unable to create SecretKeyProvider {}, encryption will be disabled", - cl.getName()); - } - break; - } - } - if (!found) { - LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); - } - } else { - LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); - } - } - } catch (final Exception ex) { - LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); - } - return new FlumePersistentManager( - name, - data.name, - data.agents, - data.batchSize, - data.retries, - data.connectionTimeout, - data.requestTimeout, - data.delayMillis, - database, - environment, - secretKey, - data.lockTimeoutRetryCount); - } - } - - /** - * Thread that sends data to Flume and pulls it from Berkeley DB. - */ - private static class WriterThread extends Log4jThread { - private volatile boolean shutdown; - private final Database database; - private final Environment environment; - private final FlumePersistentManager manager; - private final Gate gate; - private final SecretKey secretKey; - private final int batchSize; - private final AtomicLong dbCounter; - private final int lockTimeoutRetryCount; - - public WriterThread( - final Database database, - final Environment environment, - final FlumePersistentManager manager, - final Gate gate, - final int batchsize, - final SecretKey secretKey, - final AtomicLong dbCount, - final int lockTimeoutRetryCount) { - super("FlumePersistentManager-Writer"); - this.database = database; - this.environment = environment; - this.manager = manager; - this.gate = gate; - this.batchSize = batchsize; - this.secretKey = secretKey; - this.setDaemon(true); - this.dbCounter = dbCount; - this.lockTimeoutRetryCount = lockTimeoutRetryCount; - } - - public void shutdown() { - LOGGER.debug("Writer thread shutting down"); - this.shutdown = true; - gate.open(); - } - - public boolean isShutdown() { - return shutdown; - } - - @Override - public void run() { - LOGGER.trace( - "WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis()); - long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis(); - while (!shutdown) { - final long nowMillis = System.currentTimeMillis(); - final long dbCount = database.count(); - dbCounter.set(dbCount); - if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) { - nextBatchMillis = nowMillis + manager.getDelayMillis(); - try { - boolean errors = false; - final DatabaseEntry key = new DatabaseEntry(); - final DatabaseEntry data = new DatabaseEntry(); - - gate.close(); - OperationStatus status; - if (batchSize > 1) { - try { - errors = sendBatch(key, data); - } catch (final Exception ex) { - break; - } - } else { - Exception exception = null; - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - exception = null; - Transaction txn = null; - Cursor cursor = null; - try { - txn = environment.beginTransaction(null, null); - cursor = database.openCursor(txn, null); - try { - status = cursor.getFirst(key, data, LockMode.RMW); - while (status == OperationStatus.SUCCESS) { - final SimpleEvent event = createEvent(data); - if (event != null) { - try { - manager.doSend(event); - } catch (final Exception ioe) { - errors = true; - LOGGER.error("Error sending event", ioe); - break; - } - try { - cursor.delete(); - } catch (final Exception ex) { - LOGGER.error("Unable to delete event", ex); - } - } - status = cursor.getNext(key, data, LockMode.RMW); - } - if (cursor != null) { - cursor.close(); - cursor = null; - } - txn.commit(); - txn = null; - dbCounter.decrementAndGet(); - exception = null; - break; - } catch (final LockConflictException lce) { - exception = lce; - // Fall through and retry. - } catch (final Exception ex) { - LOGGER.error("Error reading or writing to database", ex); - shutdown = true; - break; - } finally { - if (cursor != null) { - cursor.close(); - cursor = null; - } - if (txn != null) { - txn.abort(); - txn = null; - } - } - } catch (final LockConflictException lce) { - exception = lce; - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception aborting tx during lock conflict."); - } - } - } - try { - Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); - } catch (final InterruptedException ie) { - // Ignore the error - } - } - if (exception != null) { - LOGGER.error("Unable to read or update data base", exception); - } - } - if (errors) { - Thread.sleep(manager.getDelayMillis()); - continue; - } - } catch (final Exception ex) { - LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); - } - } else { - if (nextBatchMillis <= nowMillis) { - nextBatchMillis = nowMillis + manager.getDelayMillis(); - } - try { - final long interval = nextBatchMillis - nowMillis; - gate.waitForOpen(interval); - } catch (final InterruptedException ie) { - LOGGER.warn("WriterThread interrupted, continuing"); - } catch (final Exception ex) { - LOGGER.error("WriterThread encountered an exception waiting for work", ex); - break; - } - } - } - - if (batchSize > 1 && database.count() > 0) { - final DatabaseEntry key = new DatabaseEntry(); - final DatabaseEntry data = new DatabaseEntry(); - try { - sendBatch(key, data); - } catch (final Exception ex) { - LOGGER.warn("Unable to write final batch"); - } - } - LOGGER.trace("WriterThread exiting"); - } - - private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception { - boolean errors = false; - OperationStatus status; - Cursor cursor = null; - try { - final BatchEvent batch = new BatchEvent(); - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - try { - cursor = database.openCursor(null, CursorConfig.DEFAULT); - status = cursor.getFirst(key, data, null); - - for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { - final SimpleEvent event = createEvent(data); - if (event != null) { - batch.addEvent(event); - } - status = cursor.getNext(key, data, null); - } - break; - } catch (final LockConflictException lce) { - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - } - } - - try { - manager.send(batch); - } catch (final Exception ioe) { - LOGGER.error("Error sending events", ioe); - errors = true; - } - if (!errors) { - if (cursor != null) { - cursor.close(); - cursor = null; - } - Transaction txn = null; - Exception exception = null; - for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { - try { - txn = environment.beginTransaction(null, null); - try { - for (final Event event : batch.getEvents()) { - try { - final Map headers = event.getHeaders(); - key = new DatabaseEntry( - headers.get(FlumeEvent.GUID).getBytes(UTF8)); - database.delete(txn, key); - } catch (final Exception ex) { - LOGGER.error("Error deleting key from database", ex); - } - } - txn.commit(); - long count = dbCounter.get(); - while (!dbCounter.compareAndSet( - count, count - batch.getEvents().size())) { - count = dbCounter.get(); - } - exception = null; - break; - } catch (final LockConflictException lce) { - exception = lce; - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception aborting transaction during lock conflict."); - } - } - } catch (final Exception ex) { - LOGGER.error("Unable to commit transaction", ex); - if (txn != null) { - txn.abort(); - } - } - } catch (final LockConflictException lce) { - exception = lce; - if (cursor != null) { - try { - cursor.close(); - cursor = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception closing cursor during lock conflict."); - } - } - if (txn != null) { - try { - txn.abort(); - txn = null; - } catch (final Exception ex) { - LOGGER.trace("Ignored exception aborting transaction during lock conflict."); - } - } - } finally { - if (cursor != null) { - cursor.close(); - cursor = null; - } - if (txn != null) { - txn.abort(); - txn = null; - } - } - try { - Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); - } catch (final InterruptedException ie) { - // Ignore the error - } - } - if (exception != null) { - LOGGER.error("Unable to delete events from data base", exception); - } - } - } catch (final Exception ex) { - LOGGER.error("Error reading database", ex); - shutdown = true; - throw ex; - } finally { - if (cursor != null) { - cursor.close(); - } - } - - return errors; - } - - @SuppressFBWarnings( - value = {"CIPHER_INTEGRITY", "ECB_MODE"}, - justification = "Work-in-progress: https://github.com/apache/logging-log4j2/issues/1947") - private SimpleEvent createEvent(final DatabaseEntry data) { - final SimpleEvent event = new SimpleEvent(); - try { - byte[] eventData = data.getData(); - if (secretKey != null) { - final Cipher cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.DECRYPT_MODE, secretKey); - eventData = cipher.doFinal(eventData); - } - final ByteArrayInputStream bais = new ByteArrayInputStream(eventData); - final DataInputStream dais = new DataInputStream(bais); - int length = dais.readInt(); - final byte[] bytes = new byte[length]; - dais.read(bytes, 0, length); - event.setBody(bytes); - length = dais.readInt(); - final Map map = new HashMap<>(length); - for (int i = 0; i < length; ++i) { - final String headerKey = dais.readUTF(); - final String value = dais.readUTF(); - map.put(headerKey, value); - } - event.setHeaders(map); - return event; - } catch (final Exception ex) { - LOGGER.error("Error retrieving event", ex); - return null; - } - } - } - - /** - * An internal class. - */ - private static class Gate { - - private boolean isOpen = false; - - public boolean isOpen() { - return isOpen; - } - - public synchronized void open() { - isOpen = true; - notifyAll(); - } - - public synchronized void close() { - isOpen = false; - } - - public synchronized void waitForOpen(final long timeout) throws InterruptedException { - wait(timeout); - } - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java deleted file mode 100644 index 33233303ce3..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/Log4jEventSource.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import org.apache.flume.ChannelException; -import org.apache.flume.Event; -import org.apache.flume.EventDrivenSource; -import org.apache.flume.instrumentation.SourceCounter; -import org.apache.flume.source.AbstractSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class Log4jEventSource extends AbstractSource implements EventDrivenSource { - - private static final Logger LOGGER = LoggerFactory.getLogger(Log4jEventSource.class); - - private final SourceCounter sourceCounter = new SourceCounter("log4j"); - - public Log4jEventSource() { - setName("Log4jEvent"); - } - - @Override - public synchronized void start() { - super.start(); - - LOGGER.info("Log4j Source started"); - } - - @Override - public synchronized void stop() { - super.stop(); - - LOGGER.info("Log4j Source stopped. Metrics {}", sourceCounter); - } - - public void send(final Event event) { - sourceCounter.incrementAppendReceivedCount(); - sourceCounter.incrementEventReceivedCount(); - try { - getChannelProcessor().processEvent(event); - } catch (final ChannelException ex) { - LOGGER.warn("Unable to process event {}", event, ex); - throw ex; - } - sourceCounter.incrementAppendAcceptedCount(); - sourceCounter.incrementEventAcceptedCount(); - } -} diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java deleted file mode 100644 index 77dac0c5d45..00000000000 --- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/package-info.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ -/** - * Apache Flume Appender. Requires the user specifically include Flume and its dependencies. - */ -@Export -@Open("org.apache.logging.log4j.core") -@Version("2.20.2") -package org.apache.logging.log4j.flume.appender; - -import aQute.bnd.annotation.jpms.Open; -import org.osgi.annotation.bundle.Export; -import org.osgi.annotation.versioning.Version; diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java deleted file mode 100644 index 0e917efef5e..00000000000 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java +++ /dev/null @@ -1,491 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.zip.GZIPInputStream; -import org.apache.flume.Channel; -import org.apache.flume.ChannelException; -import org.apache.flume.ChannelSelector; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.Transaction; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.channel.ReplicatingChannelSelector; -import org.apache.flume.conf.Configurables; -import org.apache.flume.lifecycle.LifecycleController; -import org.apache.flume.lifecycle.LifecycleState; -import org.apache.flume.source.AvroSource; -import org.apache.logging.log4j.EventLogger; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.ThreadContext; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.Logger; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.apache.logging.log4j.core.util.Integers; -import org.apache.logging.log4j.message.StructuredDataMessage; -import org.apache.logging.log4j.status.StatusLogger; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - */ -public class FlumeAppenderTest { - - private AvroSource eventSource; - private Channel channel; - private Logger avroLogger; - private String testPort; - - @BeforeClass - public static void setupClass() { - StatusLogger.getLogger().setLevel(Level.OFF); - } - - @Before - public void setUp() throws Exception { - eventSource = new AvroSource(); - channel = new MemoryChannel(); - - Configurables.configure(channel, new Context()); - - avroLogger = (Logger) LogManager.getLogger("avrologger"); - /* - * Clear out all other appenders associated with this logger to ensure - * we're only hitting the Avro appender. - */ - removeAppenders(avroLogger); - final Context context = new Context(); - testPort = String.valueOf(AvailablePortFinder.getNextAvailable()); - context.put("port", testPort); - context.put("bind", "0.0.0.0"); - Configurables.configure(eventSource, context); - - final List channels = new ArrayList<>(); - channels.add(channel); - - final ChannelSelector cs = new ReplicatingChannelSelector(); - cs.setChannels(channels); - - eventSource.setChannelProcessor(new ChannelProcessor(cs)); - - eventSource.start(); - - Assert.assertTrue( - "Reached start or error", LifecycleController.waitForOneOf(eventSource, LifecycleState.START_OR_ERROR)); - Assert.assertEquals("Server is started", LifecycleState.START, eventSource.getLifecycleState()); - } - - @After - public void teardown() throws Exception { - removeAppenders(avroLogger); - eventSource.stop(); - Assert.assertTrue( - "Reached stop or error", LifecycleController.waitForOneOf(eventSource, LifecycleState.STOP_OR_ERROR)); - Assert.assertEquals("Server is stopped", LifecycleState.STOP, eventSource.getLifecycleState()); - } - - @Test - public void testLog4jAvroAppender() throws IOException { - final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, null, null, "true", "1", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - Assert.assertNotNull(avroLogger); - - avroLogger.info("Test message"); - - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - final Event event = channel.take(); - Assert.assertNotNull(event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Test message")); - transaction.commit(); - transaction.close(); - - eventSource.stop(); - } - - @Test - public void testLog4jAvroAppenderWithHostsParam() throws IOException { - final String hosts = String.format("localhost:%s", testPort); - final FlumeAppender avroAppender = FlumeAppender.createAppender( - null, null, hosts, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, null, null, "true", "1", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - Assert.assertNotNull(avroLogger); - - avroLogger.info("Test message"); - - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - final Event event = channel.take(); - Assert.assertNotNull(event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Test message")); - transaction.commit(); - transaction.close(); - - eventSource.stop(); - } - - @Test - public void testStructured() throws IOException { - final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, "ReqCtx_", null, "true", "1", null, null, null, null); - avroAppender.start(); - final Logger eventLogger = (Logger) LogManager.getLogger("EventLogger"); - Assert.assertNotNull(eventLogger); - eventLogger.addAppender(avroAppender); - eventLogger.setLevel(Level.ALL); - - final StructuredDataMessage msg = new StructuredDataMessage("Transfer", "Success", "Audit"); - msg.put("memo", "This is a memo"); - msg.put("acct", "12345"); - msg.put("amount", "100.00"); - ThreadContext.put("id", UUID.randomUUID().toString()); - ThreadContext.put("memo", null); - ThreadContext.put("test", "123"); - - EventLogger.logEvent(msg); - - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - final Event event = channel.take(); - Assert.assertNotNull(event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Success")); - transaction.commit(); - transaction.close(); - - eventSource.stop(); - eventLogger.removeAppender(avroAppender); - avroAppender.stop(); - } - - @Test - public void testMultiple() throws IOException { - final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, null, null, "true", "1", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - Assert.assertNotNull(avroLogger); - - for (int i = 0; i < 10; ++i) { - avroLogger.info("Test message " + i); - } - - for (int i = 0; i < 10; ++i) { - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - final Event event = channel.take(); - Assert.assertNotNull(event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Test message " + i)); - transaction.commit(); - transaction.close(); - } - - eventSource.stop(); - } - - // @Ignore //(Remko: this test hangs my build...) - @Test - public void testIncompleteBatch() throws IOException { - final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "500", "avro", "false", null, null, - null, null, null, "true", "10", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - Assert.assertNotNull(avroLogger); - - avroLogger.info("Test message 0"); - - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - Event event = channel.take(); - Assert.assertNull("Received event", event); - - try { - Thread.sleep(500); - } catch (final InterruptedException ie) { - } - - avroLogger.info("Test message 1"); - for (int i = 0; i < 2; ++i) { - event = channel.take(); - Assert.assertNotNull("No event for item " + i, event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Test message " + i)); - } - transaction.commit(); - transaction.close(); - - eventSource.stop(); - } - - @Test - public void testIncompleteBatch2() throws IOException { - final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "500", "avro", "false", null, null, - null, null, null, "true", "10", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - Assert.assertNotNull(avroLogger); - - avroLogger.info("Test message 0"); - - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - avroLogger.info("Test message 1"); - avroLogger.info("Test message 2"); - avroAppender.stop(); - for (int i = 0; i < 3; ++i) { - final Event event = channel.take(); - Assert.assertNotNull("No event for item " + i, event); - Assert.assertTrue( - "Channel contained event, but not expected message. Received : " + getBody(event), - getBody(event).endsWith("Test message " + i)); - } - transaction.commit(); - transaction.close(); - - eventSource.stop(); - } - - @Test - public void testBatch() throws IOException { - final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, null, null, "true", "10", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - Assert.assertNotNull(avroLogger); - - for (int i = 0; i < 10; ++i) { - avroLogger.info("Test message " + i); - } - - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - for (int i = 0; i < 10; ++i) { - final Event event = channel.take(); - Assert.assertNotNull("No event for item " + i, event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Test message " + i)); - } - transaction.commit(); - transaction.close(); - - eventSource.stop(); - } - - @Test - public void testConnectionRefused() { - final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, null, null, "true", "1", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - eventSource.stop(); - - boolean caughtException = false; - - try { - avroLogger.info("message 1"); - } catch (final Throwable t) { - // logger.debug("Logging to a non-existent server failed (as expected)", - // t); - - caughtException = true; - } - - Assert.assertTrue(caughtException); - } - - @Test - public void testNotConnected() throws Exception { - eventSource.stop(); - final String altPort = Integer.toString(Integers.parseInt(testPort) + 1); - final Agent[] agents = - new Agent[] {Agent.createAgent("localhost", testPort), Agent.createAgent("localhost", altPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, null, null, "true", "1", null, null, null, null); - avroAppender.start(); - Assert.assertTrue("Appender Not started", avroAppender.isStarted()); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - try { - avroLogger.info("Test message"); - Assert.fail("Exception should have been thrown"); - } catch (final Exception ex) { - - } - - try { - final Context context = new Context(); - context.put("port", altPort); - context.put("bind", "0.0.0.0"); - - Configurables.configure(eventSource, context); - - eventSource.start(); - } catch (final ChannelException e) { - Assert.fail("Caught exception while resetting port to " + altPort + " : " + e.getMessage()); - } - - avroLogger.info("Test message 2"); - - final Transaction transaction = channel.getTransaction(); - transaction.begin(); - - final Event event = channel.take(); - Assert.assertNotNull(event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Test message 2")); - transaction.commit(); - transaction.close(); - } - - @Test - public void testReconnect() throws Exception { - final String altPort = Integer.toString(Integers.parseInt(testPort) + 1); - final Agent[] agents = - new Agent[] {Agent.createAgent("localhost", testPort), Agent.createAgent("localhost", altPort)}; - final FlumeAppender avroAppender = FlumeAppender.createAppender( - agents, null, null, "false", "Avro", null, "1000", "1000", "1", "1000", "avro", "false", null, null, - null, null, null, "true", "1", null, null, null, null); - avroAppender.start(); - avroLogger.addAppender(avroAppender); - avroLogger.setLevel(Level.ALL); - - avroLogger.info("Test message"); - - Transaction transaction = channel.getTransaction(); - transaction.begin(); - - Event event = channel.take(); - Assert.assertNotNull(event); - Assert.assertTrue( - "Channel contained event, but not expected message. Received : " + getBody(event), - getBody(event).endsWith("Test message")); - transaction.commit(); - transaction.close(); - - eventSource.stop(); - try { - final Context context = new Context(); - context.put("port", altPort); - context.put("bind", "0.0.0.0"); - - Configurables.configure(eventSource, context); - - eventSource.start(); - } catch (final ChannelException e) { - Assert.fail("Caught exception while resetting port to " + altPort + " : " + e.getMessage()); - } - - avroLogger.info("Test message 2"); - - transaction = channel.getTransaction(); - transaction.begin(); - - event = channel.take(); - Assert.assertNotNull(event); - Assert.assertTrue( - "Channel contained event, but not expected message", - getBody(event).endsWith("Test message 2")); - transaction.commit(); - transaction.close(); - } - - private void removeAppenders(final Logger logger) { - final Map map = logger.getAppenders(); - for (final Map.Entry entry : map.entrySet()) { - final Appender app = entry.getValue(); - avroLogger.removeAppender(app); - app.stop(); - } - } - - private String getBody(final Event event) throws IOException { - if (event == null) { - return ""; - } - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody())); - int n = 0; - while (-1 != (n = is.read())) { - baos.write(n); - } - return new String(baos.toByteArray()); - } -} diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java deleted file mode 100644 index bbc1fd9dfcb..00000000000 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import static org.junit.Assert.fail; - -import com.google.common.base.Preconditions; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPInputStream; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import org.apache.avro.ipc.Server; -import org.apache.avro.ipc.netty.NettyServer; -import org.apache.avro.ipc.specific.SpecificResponder; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.avro.AvroFlumeEvent; -import org.apache.flume.source.avro.AvroSourceProtocol; -import org.apache.flume.source.avro.Status; -import org.apache.logging.log4j.EventLogger; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.ConfigurationFactory; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.apache.logging.log4j.message.StructuredDataMessage; -import org.apache.logging.log4j.status.StatusLogger; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - */ -public class FlumeEmbeddedAgentTest { - private static final String CONFIG = "default_embedded.xml"; - private static final String HOSTNAME = "localhost"; - private static LoggerContext ctx; - - private EventCollector primary; - private EventCollector alternate; - - @BeforeClass - public static void setupClass() { - // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString()); - final File file = new File("target/file-channel"); - if (!deleteFiles(file)) { - System.err.println("Warning - unable to delete target/file-channel. Test errors may occur"); - } - } - - @AfterClass - public static void cleanupClass() { - StatusLogger.getLogger().reset(); - } - - @Before - public void setUp() throws Exception { - - final File file = new File("target/file-channel"); - deleteFiles(file); - - /* - * Clear out all other appenders associated with this logger to ensure we're - * only hitting the Avro appender. - */ - final int primaryPort = AvailablePortFinder.getNextAvailable(); - final int altPort = AvailablePortFinder.getNextAvailable(); - System.setProperty("primaryPort", Integer.toString(primaryPort)); - System.setProperty("alternatePort", Integer.toString(altPort)); - primary = new EventCollector(primaryPort); - alternate = new EventCollector(altPort); - System.setProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG); - ctx = LoggerContext.getContext(false); - ctx.reconfigure(); - } - - @After - public void teardown() throws Exception { - System.clearProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY); - ctx.reconfigure(); - primary.stop(); - alternate.stop(); - final File file = new File("target/file-channel"); - deleteFiles(file); - final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - final Set names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null); - for (final ObjectName name : names) { - try { - server.unregisterMBean(name); - } catch (final Exception ex) { - System.out.println("Unable to unregister " + name.toString()); - } - } - } - - @Test - public void testLog4Event() throws IOException { - - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test"); - EventLogger.logEvent(msg); - - final Event event = primary.poll(); - Assert.assertNotNull(event); - final String body = getBody(event); - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, body.endsWith("Test Log4j")); - } - - @Test - public void testMultiple() throws IOException { - - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test"); - EventLogger.logEvent(msg); - } - for (int i = 0; i < 10; ++i) { - final Event event = primary.poll(); - Assert.assertNotNull(event); - final String body = getBody(event); - final String expected = "Test Multiple " + i; - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, body.endsWith(expected)); - } - } - - @Test - public void testFailover() throws InterruptedException, IOException { - final Logger logger = LogManager.getLogger("testFailover"); - logger.debug("Starting testFailover"); - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test"); - EventLogger.logEvent(msg); - } - for (int i = 0; i < 10; ++i) { - final Event event = primary.poll(); - Assert.assertNotNull(event); - final String body = getBody(event); - final String expected = "Test Primary " + i; - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, body.endsWith(expected)); - } - - // Give the AvroSink time to receive notification and notify the channel. - Thread.sleep(500); - - primary.stop(); - - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test"); - EventLogger.logEvent(msg); - } - for (int i = 0; i < 10; ++i) { - final Event event = alternate.poll(); - Assert.assertNotNull(event); - final String body = getBody(event); - final String expected = "Test Alternate " + i; - /* When running in Gump Flume consistently returns the last event from the primary channel after - the failover, which fails this test */ - Assert.assertTrue( - "Channel contained event, but not expected message. Expected: " + expected + " Received: " + body, - body.endsWith(expected)); - } - } - - private String getBody(final Event event) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody())); - int n = 0; - while (-1 != (n = is.read())) { - baos.write(n); - } - return new String(baos.toByteArray()); - } - - private static boolean deleteFiles(final File file) { - boolean result = true; - if (file.isDirectory()) { - - final File[] files = file.listFiles(); - for (final File child : files) { - result &= deleteFiles(child); - } - - } else if (!file.exists()) { - return true; - } - - return result && file.delete(); - } - - private static class EventCollector implements AvroSourceProtocol { - private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - - private Server server; - - public EventCollector(final int port) { - try { - server = createServer(this, port); - } catch (InterruptedException ex) { - fail("Server creation was interrrupted"); - } - server.start(); - } - - private Server createServer(final AvroSourceProtocol protocol, final int port) throws InterruptedException { - - server = new NettyServer( - new SpecificResponder(AvroSourceProtocol.class, protocol), new InetSocketAddress(HOSTNAME, port)); - - return server; - } - - public void stop() { - server.close(); - } - - public Event poll() { - - AvroFlumeEvent avroEvent = null; - try { - avroEvent = eventQueue.poll(30000, TimeUnit.MILLISECONDS); - } catch (final InterruptedException ie) { - // Ignore the exception. - } - if (avroEvent != null) { - return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders())); - } - System.out.println("No Event returned"); - return null; - } - - @Override - public Status append(final AvroFlumeEvent event) { - eventQueue.add(event); - return Status.OK; - } - - @Override - public Status appendBatch(final List events) { - Preconditions.checkState(eventQueue.addAll(events)); - return Status.OK; - } - } - - private static Map toStringMap(final Map charSeqMap) { - final Map stringMap = new HashMap<>(); - for (final Map.Entry entry : charSeqMap.entrySet()) { - stringMap.put(entry.getKey().toString(), entry.getValue().toString()); - } - return stringMap; - } -} diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java deleted file mode 100644 index f42d403f8e9..00000000000 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import static org.junit.Assert.fail; - -import com.google.common.base.Preconditions; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPInputStream; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import org.apache.avro.ipc.Server; -import org.apache.avro.ipc.netty.NettyServer; -import org.apache.avro.ipc.specific.SpecificResponder; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.avro.AvroFlumeEvent; -import org.apache.flume.source.avro.AvroSourceProtocol; -import org.apache.flume.source.avro.Status; -import org.apache.logging.log4j.EventLogger; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.ConfigurationFactory; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.apache.logging.log4j.message.StructuredDataMessage; -import org.apache.logging.log4j.status.StatusLogger; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - */ -public class FlumeEmbeddedAppenderTest { - private static final String CONFIG = "embedded.xml"; - private static final String HOSTNAME = "localhost"; - private static LoggerContext ctx; - - private EventCollector primary; - private EventCollector alternate; - - @BeforeClass - public static void setupClass() { - // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString()); - final File file = new File("target/file-channel"); - if (!deleteFiles(file)) { - System.err.println("Warning - unable to delete target/file-channel. Test errors may occur"); - } - } - - @AfterClass - public static void cleanupClass() { - StatusLogger.getLogger().reset(); - } - - @Before - public void setUp() throws Exception { - - final File file = new File("target/file-channel"); - deleteFiles(file); - - /* - * Clear out all other appenders associated with this logger to ensure we're - * only hitting the Avro appender. - */ - final int primaryPort = AvailablePortFinder.getNextAvailable(); - final int altPort = AvailablePortFinder.getNextAvailable(); - System.setProperty("primaryPort", Integer.toString(primaryPort)); - System.setProperty("alternatePort", Integer.toString(altPort)); - primary = new EventCollector(primaryPort); - alternate = new EventCollector(altPort); - System.setProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG); - ctx = LoggerContext.getContext(false); - ctx.reconfigure(); - } - - @After - public void teardown() throws Exception { - System.clearProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY); - ctx.reconfigure(); - primary.stop(); - alternate.stop(); - final File file = new File("target/file-channel"); - deleteFiles(file); - final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - final Set names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null); - for (final ObjectName name : names) { - try { - server.unregisterMBean(name); - } catch (final Exception ex) { - System.out.println("Unable to unregister " + name.toString()); - } - } - } - - @Test - public void testLog4Event() throws IOException { - - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test"); - EventLogger.logEvent(msg); - - final Event event = primary.poll(); - Assert.assertNotNull("Event should not be null", event); - final String body = getBody(event); - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, body.endsWith("Test Log4j")); - } - - @Test - public void testMultiple() throws IOException { - - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test"); - EventLogger.logEvent(msg); - } - for (int i = 0; i < 10; ++i) { - final Event event = primary.poll(); - Assert.assertNotNull("Event should not be null", event); - final String body = getBody(event); - final String expected = "Test Multiple " + i; - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, body.endsWith(expected)); - } - } - - @Test - public void testFailover() throws InterruptedException, IOException { - final Logger logger = LogManager.getLogger("testFailover"); - logger.debug("Starting testFailover"); - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test"); - EventLogger.logEvent(msg); - } - for (int i = 0; i < 10; ++i) { - final Event event = primary.poll(); - Assert.assertNotNull("Event should not be null", event); - final String body = getBody(event); - final String expected = "Test Primary " + i; - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, body.endsWith(expected)); - } - - // Give the AvroSink time to receive notification and notify the channel. - Thread.sleep(500); - - primary.stop(); - - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test"); - EventLogger.logEvent(msg); - } - for (int i = 0; i < 10; ++i) { - final Event event = alternate.poll(); - Assert.assertNotNull("Event should not be null", event); - final String body = getBody(event); - final String expected = "Test Alternate " + i; - /* When running in Gump Flume consistently returns the last event from the primary channel after - the failover, which fails this test */ - Assert.assertTrue( - "Channel contained event, but not expected message. Expected: " + expected + " Received: " + body, - body.endsWith(expected)); - } - } - /* Flume 1.4.0 does not support interceptors on the embedded agent - @Test */ - private void testHeaderAddedByInterceptor() { - - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test"); - EventLogger.logEvent(msg); - - final Event event = primary.poll(); - Assert.assertNotNull("Event should not be null", event); - final String environmentHeader = event.getHeaders().get("environment"); - Assert.assertEquals("local", environmentHeader); - } - - /* @Test */ - private void testPerformance() throws Exception { - final long start = System.currentTimeMillis(); - final int count = 10000; - for (int i = 0; i < count; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test"); - msg.put("counter", Integer.toString(i)); - EventLogger.logEvent(msg); - } - final long elapsed = System.currentTimeMillis() - start; - System.out.println("Time to log " + count + " events " + elapsed + "ms"); - } - - private String getBody(final Event event) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody())); - int n = 0; - while (-1 != (n = is.read())) { - baos.write(n); - } - return new String(baos.toByteArray()); - } - - private static boolean deleteFiles(final File file) { - boolean result = true; - if (file.isDirectory()) { - - final File[] files = file.listFiles(); - if (files != null) { - for (final File child : files) { - result &= deleteFiles(child); - } - } - - } else if (!file.exists()) { - return true; - } - - return result && file.delete(); - } - - private static class EventCollector implements AvroSourceProtocol { - private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - - private Server server; - - public EventCollector(final int port) { - try { - server = createServer(this, port); - } catch (InterruptedException ex) { - fail("Server creation was interrrupted"); - } - server.start(); - } - - private Server createServer(final AvroSourceProtocol protocol, final int port) throws InterruptedException { - - server = new NettyServer( - new SpecificResponder(AvroSourceProtocol.class, protocol), new InetSocketAddress(HOSTNAME, port)); - - return server; - } - - public void stop() { - server.close(); - } - - public Event poll() { - - AvroFlumeEvent avroEvent = null; - try { - avroEvent = eventQueue.poll(30000, TimeUnit.MILLISECONDS); - } catch (final InterruptedException ie) { - // Ignore the exception. - } - if (avroEvent != null) { - return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders())); - } - System.out.println("No Event returned"); - return null; - } - - @Override - public Status append(final AvroFlumeEvent event) { - eventQueue.add(event); - return Status.OK; - } - - @Override - public Status appendBatch(final List events) { - Preconditions.checkState(eventQueue.addAll(events)); - return Status.OK; - } - } - - private static Map toStringMap(final Map charSeqMap) { - final Map stringMap = new HashMap<>(); - for (final Map.Entry entry : charSeqMap.entrySet()) { - stringMap.put(entry.getKey().toString(), entry.getValue().toString()); - } - return stringMap; - } -} diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java deleted file mode 100644 index 6d16f0b16cd..00000000000 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentAppenderTest.java +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import static org.junit.Assert.fail; - -import com.google.common.base.Preconditions; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPInputStream; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import org.apache.avro.ipc.Server; -import org.apache.avro.ipc.netty.NettyServer; -import org.apache.avro.ipc.specific.SpecificResponder; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.avro.AvroFlumeEvent; -import org.apache.flume.source.avro.AvroSourceProtocol; -import org.apache.flume.source.avro.Status; -import org.apache.logging.log4j.EventLogger; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.Marker; -import org.apache.logging.log4j.MarkerManager; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.ConfigurationFactory; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.apache.logging.log4j.core.util.Integers; -import org.apache.logging.log4j.message.StructuredDataMessage; -import org.apache.logging.log4j.status.StatusLogger; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - */ -public class FlumePersistentAppenderTest { - private static final String CONFIG = "persistent.xml"; - private static final String HOSTNAME = "localhost"; - private static LoggerContext ctx; - - private EventCollector primary; - private EventCollector alternate; - - @BeforeClass - public static void setupClass() { - // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString()); - final File file = new File("target/file-channel"); - if (!deleteFiles(file)) { - System.err.println("Warning - unable to delete target/file-channel. Test errors may occur"); - } - } - - @AfterClass - public static void cleanupClass() { - StatusLogger.getLogger().reset(); - } - - @Before - public void setUp() throws Exception { - - final File file = new File("target/persistent"); - deleteFiles(file); - - /* - * Clear out all other appenders associated with this logger to ensure we're - * only hitting the Avro appender. - */ - final int primaryPort = AvailablePortFinder.getNextAvailable(); - final int altPort = AvailablePortFinder.getNextAvailable(); - System.setProperty("primaryPort", Integer.toString(primaryPort)); - System.setProperty("alternatePort", Integer.toString(altPort)); - primary = new EventCollector(primaryPort); - alternate = new EventCollector(altPort); - System.setProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG); - ctx = LoggerContext.getContext(false); - ctx.reconfigure(); - } - - @After - public void teardown() throws Exception { - System.clearProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY); - ctx.reconfigure(); - primary.stop(); - alternate.stop(); - final File file = new File("target/file-channel"); - deleteFiles(file); - final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - final Set names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null); - for (final ObjectName name : names) { - try { - server.unregisterMBean(name); - } catch (final Exception ex) { - System.out.println("Unable to unregister " + name.toString()); - } - } - } - - @Test - public void testLog4Event() throws IOException { - - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test"); - EventLogger.logEvent(msg); - - final Event event = primary.poll(); - Assert.assertNotNull(event); - final String body = getBody(event); - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, body.endsWith("Test Log4j")); - } - - @Test - public void testMultiple() { - - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test"); - msg.put("counter", Integer.toString(i)); - EventLogger.logEvent(msg); - } - final boolean[] fields = new boolean[10]; - for (int i = 0; i < 10; ++i) { - final Event event = primary.poll(); - Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event); - final String value = event.getHeaders().get("counter"); - Assert.assertNotNull("Missing 'counter' in map " + event.getHeaders() + ", i = " + i, value); - final int counter = Integers.parseInt(value); - if (fields[counter]) { - Assert.fail("Duplicate event"); - } else { - fields[counter] = true; - } - } - for (int i = 0; i < 10; ++i) { - Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]); - } - } - - @Test - public void testFailover() throws InterruptedException { - final Logger logger = LogManager.getLogger("testFailover"); - logger.debug("Starting testFailover"); - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test"); - msg.put("counter", Integer.toString(i)); - EventLogger.logEvent(msg); - } - boolean[] fields = new boolean[10]; - for (int i = 0; i < 10; ++i) { - final Event event = primary.poll(); - Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event); - final String value = event.getHeaders().get("counter"); - Assert.assertNotNull("Missing counter", value); - final int counter = Integers.parseInt(value); - if (fields[counter]) { - Assert.fail("Duplicate event"); - } else { - fields[counter] = true; - } - } - for (int i = 0; i < 10; ++i) { - Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]); - } - - // Give the AvroSink time to receive notification and notify the channel. - Thread.sleep(500); - - primary.stop(); - - for (int i = 0; i < 10; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test"); - msg.put("cntr", Integer.toString(i)); - EventLogger.logEvent(msg); - } - fields = new boolean[10]; - for (int i = 0; i < 10; ++i) { - final Event event = alternate.poll(); - Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event); - final String value = event.getHeaders().get("cntr"); - Assert.assertNotNull("Missing counter", value); - final int counter = Integer.parseInt(value); - if (fields[counter]) { - Assert.fail("Duplicate event"); - } else { - fields[counter] = true; - } - } - for (int i = 0; i < 10; ++i) { - Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]); - } - } - - @Test - public void testSingle() throws IOException { - - final Logger logger = LogManager.getLogger("EventLogger"); - final Marker marker = MarkerManager.getMarker("EVENT"); - logger.info(marker, "This is a test message"); - - final Event event = primary.poll(); - Assert.assertNotNull(event); - final String body = getBody(event); - Assert.assertTrue( - "Channel contained event, but not expected message. Received: " + body, - body.endsWith("This is a test message")); - } - - @Test - public void testMultipleConcurrent() throws InterruptedException { - - final int eventsCount = 10000; - - final Thread writer1 = new WriterThread(0, eventsCount / 4); - final Thread writer2 = new WriterThread(eventsCount / 4, eventsCount / 2); - final Thread writer3 = new WriterThread(eventsCount / 2, (3 * eventsCount) / 4); - final Thread writer4 = new WriterThread((3 * eventsCount) / 4, eventsCount); - writer1.start(); - writer2.start(); - writer3.start(); - writer4.start(); - - final boolean[] fields = new boolean[eventsCount]; - final Thread reader1 = new ReaderThread(0, eventsCount / 4, fields); - final Thread reader2 = new ReaderThread(eventsCount / 4, eventsCount / 2, fields); - final Thread reader3 = new ReaderThread(eventsCount / 2, (eventsCount * 3) / 4, fields); - final Thread reader4 = new ReaderThread((eventsCount * 3) / 4, eventsCount, fields); - - reader1.start(); - reader2.start(); - reader3.start(); - reader4.start(); - - writer1.join(); - writer2.join(); - writer3.join(); - writer4.join(); - reader1.join(); - reader2.join(); - reader3.join(); - reader4.join(); - - for (int i = 0; i < eventsCount; ++i) { - Assert.assertTrue("Channel contained event, but not expected message " + i, fields[i]); - } - } - - @Test - public void testRFC5424Layout() throws IOException { - - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test"); - EventLogger.logEvent(msg); - - final Event event = primary.poll(); - Assert.assertNotNull(event); - final String body = getBody(event); - Assert.assertTrue("Structured message does not contain @EID: " + body, body.contains("Test@18060")); - } - - private class WriterThread extends Thread { - - private final int start; - private final int stop; - - public WriterThread(final int start, final int stop) { - this.start = start; - this.stop = stop; - } - - @Override - public void run() { - for (int i = start; i < stop; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i, "Test"); - msg.put("counter", Integer.toString(i)); - EventLogger.logEvent(msg); - } - } - } - - private final class ReaderThread extends Thread { - private final int start; - private final int stop; - private final boolean[] fields; - - private ReaderThread(final int start, final int stop, final boolean[] fields) { - this.start = start; - this.stop = stop; - this.fields = fields; - } - - @Override - public void run() { - - for (int i = start; i < stop; ++i) { - Event event = primary.poll(); - while (event == null) { - event = primary.poll(); - } - - Assert.assertNotNull("Received " + i + " events. Event " + (i + 1) + " is null", event); - final String value = event.getHeaders().get("counter"); - Assert.assertNotNull("Missing counter", value); - final int counter = Integers.parseInt(value); - if (fields[counter]) { - Assert.fail("Duplicate event"); - } else { - fields[counter] = true; - } - } - } - } - - @Test - public void testLogInterrupted() { - final ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.execute(() -> { - executor.shutdownNow(); - final Logger logger = LogManager.getLogger("EventLogger"); - final Marker marker = MarkerManager.getMarker("EVENT"); - logger.info(marker, "This is a test message"); - Assert.assertTrue( - "Interruption status not preserved", Thread.currentThread().isInterrupted()); - }); - } - - /* - @Test - public void testPerformance() throws Exception { - long start = System.currentTimeMillis(); - int count = 1000; - for (int i = 0; i < count; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test"); - msg.put("counter", Integer.toString(i)); - EventLogger.logEvent(msg); - } - long elapsed = System.currentTimeMillis() - start; - System.out.println("Time to log " + count + " events " + elapsed + "ms"); - } */ - - private String getBody(final Event event) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody())); - int n = 0; - while (-1 != (n = is.read())) { - baos.write(n); - } - return new String(baos.toByteArray()); - } - - private static boolean deleteFiles(final File file) { - boolean result = true; - if (file.isDirectory()) { - - final File[] files = file.listFiles(); - if (files != null) { - for (final File child : files) { - result &= deleteFiles(child); - } - } - } else if (!file.exists()) { - return true; - } - - return result && file.delete(); - } - - private static class EventCollector implements AvroSourceProtocol { - private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - - private Server server; - - public EventCollector(final int port) { - try { - server = createServer(this, port); - } catch (InterruptedException ex) { - fail("Server creation was interrrupted"); - } - server.start(); - } - - private Server createServer(final AvroSourceProtocol protocol, final int port) throws InterruptedException { - - server = new NettyServer( - new SpecificResponder(AvroSourceProtocol.class, protocol), new InetSocketAddress(HOSTNAME, port)); - - return server; - } - - public void stop() { - server.close(); - } - - public Event poll() { - - AvroFlumeEvent avroEvent = null; - try { - avroEvent = eventQueue.poll(30000, TimeUnit.MILLISECONDS); - } catch (final InterruptedException ie) { - // Ignore the exception. - } - if (avroEvent != null) { - return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders())); - } - System.out.println("No Event returned"); - return null; - } - - @Override - public Status append(final AvroFlumeEvent event) { - eventQueue.add(event); - // System.out.println("Received event " + event.getHeaders().get(new - // org.apache.avro.util.Utf8(FlumeEvent.GUID))); - return Status.OK; - } - - @Override - public Status appendBatch(final List events) { - Preconditions.checkState(eventQueue.addAll(events)); - for (final AvroFlumeEvent event : events) { - // System.out.println("Received event " + event.getHeaders().get(new - // org.apache.avro.util.Utf8(FlumeEvent.GUID))); - } - return Status.OK; - } - } - - private static Map toStringMap(final Map charSeqMap) { - final Map stringMap = new HashMap<>(); - for (final Map.Entry entry : charSeqMap.entrySet()) { - stringMap.put(entry.getKey().toString(), entry.getValue().toString()); - } - return stringMap; - } -} diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java deleted file mode 100644 index 08f166939ae..00000000000 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumePersistentPerf.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.appender; - -import static org.junit.Assert.fail; - -import com.google.common.base.Preconditions; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPInputStream; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import org.apache.avro.ipc.Server; -import org.apache.avro.ipc.netty.NettyServer; -import org.apache.avro.ipc.specific.SpecificResponder; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.avro.AvroFlumeEvent; -import org.apache.flume.source.avro.AvroSourceProtocol; -import org.apache.flume.source.avro.Status; -import org.apache.logging.log4j.EventLogger; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.ConfigurationFactory; -import org.apache.logging.log4j.core.test.AvailablePortFinder; -import org.apache.logging.log4j.message.StructuredDataMessage; -import org.apache.logging.log4j.status.StatusLogger; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - */ -public class FlumePersistentPerf { - private static final String CONFIG = "persistent.xml"; - private static final String HOSTNAME = "localhost"; - private static LoggerContext ctx; - - private EventCollector primary; - private EventCollector alternate; - - @BeforeClass - public static void setupClass() { - // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString()); - final File file = new File("target/file-channel"); - if (!deleteFiles(file)) { - System.err.println("Warning - unable to delete target/file-channel. Test errors may occur"); - } - } - - @AfterClass - public static void cleanupClass() { - StatusLogger.getLogger().reset(); - } - - @Before - public void setUp() throws Exception { - - final File file = new File("target/persistent"); - deleteFiles(file); - - /* - * Clear out all other appenders associated with this logger to ensure we're - * only hitting the Avro appender. - */ - final int primaryPort = AvailablePortFinder.getNextAvailable(); - final int altPort = AvailablePortFinder.getNextAvailable(); - System.setProperty("primaryPort", Integer.toString(primaryPort)); - System.setProperty("alternatePort", Integer.toString(altPort)); - primary = new EventCollector(primaryPort); - alternate = new EventCollector(altPort); - System.setProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY, CONFIG); - ctx = LoggerContext.getContext(false); - ctx.reconfigure(); - } - - @After - public void teardown() throws Exception { - System.clearProperty(ConfigurationFactory.CONFIGURATION_FILE_PROPERTY); - ctx.reconfigure(); - primary.stop(); - alternate.stop(); - final File file = new File("target/file-channel"); - deleteFiles(file); - final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - final Set names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null); - for (final ObjectName name : names) { - try { - server.unregisterMBean(name); - } catch (final Exception ex) { - System.out.println("Unable to unregister " + name.toString()); - } - } - } - - @Test - public void testPerformance() throws Exception { - final long start = System.currentTimeMillis(); - final int count = 10000; - for (int i = 0; i < count; ++i) { - final StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test"); - msg.put("counter", Integer.toString(i)); - EventLogger.logEvent(msg); - } - final long elapsed = System.currentTimeMillis() - start; - System.out.println("Time to log " + count + " events " + elapsed + "ms"); - } - - private String getBody(final Event event) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody())); - int n = 0; - while (-1 != (n = is.read())) { - baos.write(n); - } - return new String(baos.toByteArray()); - } - - private static boolean deleteFiles(final File file) { - boolean result = true; - if (file.isDirectory()) { - - final File[] files = file.listFiles(); - if (files != null) { - for (final File child : files) { - result &= deleteFiles(child); - } - } - } else if (!file.exists()) { - return true; - } - - return result && file.delete(); - } - - private static class EventCollector implements AvroSourceProtocol { - private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - - private Server server; - - public EventCollector(final int port) { - try { - server = createServer(this, port); - } catch (InterruptedException ex) { - fail("Server creation was interrrupted"); - } - server.start(); - } - - private Server createServer(final AvroSourceProtocol protocol, final int port) throws InterruptedException { - - server = new NettyServer( - new SpecificResponder(AvroSourceProtocol.class, protocol), new InetSocketAddress(HOSTNAME, port)); - - return server; - } - - public void stop() { - server.close(); - } - - public Event poll() { - - AvroFlumeEvent avroEvent = null; - try { - avroEvent = eventQueue.poll(30000, TimeUnit.MILLISECONDS); - } catch (final InterruptedException ie) { - // Ignore the exception. - } - if (avroEvent != null) { - return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders())); - } - System.out.println("No Event returned"); - return null; - } - - @Override - public Status append(final AvroFlumeEvent event) { - eventQueue.add(event); - return Status.OK; - } - - @Override - public Status appendBatch(final List events) { - Preconditions.checkState(eventQueue.addAll(events)); - return Status.OK; - } - } - - private static Map toStringMap(final Map charSeqMap) { - final Map stringMap = new HashMap<>(); - for (final Map.Entry entry : charSeqMap.entrySet()) { - stringMap.put(entry.getKey().toString(), entry.getValue().toString()); - } - return stringMap; - } -} diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/test/FlumeKeyProvider.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/test/FlumeKeyProvider.java deleted file mode 100644 index c96c4f336c9..00000000000 --- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/test/FlumeKeyProvider.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.logging.log4j.flume.test; - -import javax.crypto.SecretKey; -import javax.crypto.spec.SecretKeySpec; -import org.apache.logging.log4j.core.config.plugins.Plugin; -import org.apache.logging.log4j.core.util.SecretKeyProvider; - -/** - * - */ -@Plugin(name = "FlumeKeyProvider", category = "KeyProvider", elementType = "SecretKeyProvider", printObject = true) -public class FlumeKeyProvider implements SecretKeyProvider { - - private static final byte[] key = new byte[] { - -7, -21, -118, -25, -79, 73, 72, -64, 0, 127, -93, -13, -38, 3, -73, -31, -2, -74, 3, 28, 113, -55, -105, 9, - -103, 97, -5, -54, 88, -110, 97, -4 - }; - - @Override - public SecretKey getSecretKey() { - return new SecretKeySpec(key, "AES"); - } -} diff --git a/log4j-flume-ng/src/test/resources/default_embedded.xml b/log4j-flume-ng/src/test/resources/default_embedded.xml deleted file mode 100644 index d67a68203c2..00000000000 --- a/log4j-flume-ng/src/test/resources/default_embedded.xml +++ /dev/null @@ -1,37 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - diff --git a/log4j-flume-ng/src/test/resources/embedded.xml b/log4j-flume-ng/src/test/resources/embedded.xml deleted file mode 100644 index d0e94043d35..00000000000 --- a/log4j-flume-ng/src/test/resources/embedded.xml +++ /dev/null @@ -1,53 +0,0 @@ - - - - - - memory - - agent1 agent2 - avro - localhost - ${sys:primaryPort} - 1 - avro - localhost - ${sys:alternatePort} - 1 - failover - 10 - 5 - - - - - - - - - - - - - - - diff --git a/log4j-flume-ng/src/test/resources/persistent.xml b/log4j-flume-ng/src/test/resources/persistent.xml deleted file mode 100644 index ca0dc45a801..00000000000 --- a/log4j-flume-ng/src/test/resources/persistent.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - diff --git a/log4j-parent/pom.xml b/log4j-parent/pom.xml index 573b2fb27aa..673ce052649 100644 --- a/log4j-parent/pom.xml +++ b/log4j-parent/pom.xml @@ -83,11 +83,9 @@ 7.0.5 4.13.1 1.8.0 - 1.11.0 3.0.22 33.3.0-jre 2.2.224 - 1.2.1 3.0 2.2.2 2.7.3 @@ -109,7 +107,6 @@ 3.3.4 1.7.0 4.0.5 - 18.3.12 0.6.0 9.4.55.v20240627 3.5.12 @@ -127,7 +124,6 @@ 3.9.9 4.11.0 15.4 - 4.1.112.Final 3.20.0 2.7.15 2.0.8 @@ -228,14 +224,6 @@ import - - io.netty - netty-bom - ${netty.version} - pom - import - - org.springframework spring-framework-bom @@ -425,114 +413,6 @@ ${embedded-ldap.version} - - org.apache.flume.flume-ng-channels - flume-file-channel - ${flume.version} - - - junit - junit - - - log4j - log4j - - - org.mortbay.jetty - servlet-api - - - org.mortbay.jetty - servlet-api-2.5 - - - org.slf4j - slf4j-log4j12 - - - - - - org.apache.flume - flume-ng-core - ${flume.version} - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - - - - org.apache.flume - flume-ng-embedded-agent - ${flume.version} - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - - - - org.apache.flume - flume-ng-node - ${flume.version} - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - - - - org.apache.flume - flume-ng-sdk - ${flume.version} - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - - com.google.guava @@ -553,30 +433,6 @@ ${h2.version} - - org.apache.hadoop - hadoop-core - ${hadoop.version} - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - junit - junit - - - org.mortbay.jetty - servlet-api - - - - org.hamcrest hamcrest @@ -711,12 +567,6 @@ ${jctools.version} - - com.sleepycat - je - ${je.version} - - org.zeromq jeromq diff --git a/pom.xml b/pom.xml index aadfc0f438e..bfb7ff9fa30 100644 --- a/pom.xml +++ b/pom.xml @@ -246,7 +246,6 @@ log4j-core-test log4j-couchdb log4j-docker - log4j-flume-ng log4j-iostreams log4j-jakarta-smtp log4j-jakarta-web @@ -428,7 +427,7 @@ org.apache.logging.log4j log4j-flume-ng - ${project.version} + 2.23.1 diff --git a/src/changelog/.2.x.x/move_flume_appender.xml b/src/changelog/.2.x.x/move_flume_appender.xml new file mode 100644 index 00000000000..d2c184d6625 --- /dev/null +++ b/src/changelog/.2.x.x/move_flume_appender.xml @@ -0,0 +1,8 @@ + + + + Move Flume Appender to its own release lifecycle. +