diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index 39a9e51ac8125..02f89e38ef815 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -86,4 +86,9 @@
+  <!-- The generated Avro record class is excluded from analysis. -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.audit.AvroS3LogEntryRecord"/>
+  </Match>
+
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 6ebf1c71f0d5b..76e8e32070496 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -451,6 +451,7 @@
           <exclusion>org.apache.hadoop.fs.s3a.commit.impl.*</exclusion>
           <exclusion>org.apache.hadoop.fs.s3a.commit.magic.*</exclusion>
           <exclusion>org.apache.hadoop.fs.s3a.commit.staging.*</exclusion>
+          <exclusion>org.apache.hadoop.fs.s3a.audit.mapreduce.*</exclusion>
         </exclusions>
         <bannedImports>
           <bannedImport>org.apache.hadoop.mapreduce.**</bannedImport>
@@ -462,6 +463,23 @@
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>generate-avro-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <sourceDirectory>src/main/java/org/apache/hadoop/fs/s3a/audit/avro</sourceDirectory>
+          <outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
+        </configuration>
+      </plugin>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java
new file mode 100644
index 0000000000000..9dfd698b6687d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.audit.mapreduce.S3AAuditLogMergerAndParser;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR;
+import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_FAIL;
+import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS;
+
+/**
+ * AuditTool is a command line interface which merges and
+ * parses audit log files and converts them into an avro file
+ * for better visualization.
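+ * <p>
+ * Example invocation, where the bucket paths are illustrative:
+ * <pre>
+ *   hadoop org.apache.hadoop.fs.s3a.audit.AuditTool \
+ *       s3a://dest-bucket/dest-dir s3a://source-bucket/audit-logs
+ * </pre>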
+ */
+public class AuditTool extends Configured implements Tool, Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class);
+
+ private final S3AAuditLogMergerAndParser s3AAuditLogMergerAndParser =
+ new S3AAuditLogMergerAndParser();
+
+ /**
+ * Name of this tool: {@value}.
+ */
+ public static final String AUDITTOOL =
+ "org.apache.hadoop.fs.s3a.audit.AuditTool";
+
+ /**
+ * Purpose of this tool: {@value}.
+ */
+ public static final String PURPOSE =
+ "\n\nUSAGE:\nMerge, parse audit log files and convert into avro file "
+ + "for "
+ + "better "
+ + "visualization";
+
+ // Exit codes
+ private static final int SUCCESS = EXIT_SUCCESS;
+ private static final int FAILURE = EXIT_FAIL;
+ private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR;
+
+ private static final String USAGE =
+ "bin/hadoop " + "Class" + " DestinationPath" + " SourcePath" + "\n" +
+ "bin/hadoop " + AUDITTOOL + " s3a://BUCKET" + " s3a://BUCKET" + "\n";
+
+ private PrintWriter out;
+
+ public AuditTool() {
+ super();
+ }
+
+ /**
+ * Returns the usage string of this tool.
+ *
+ * @return the usage and purpose text
+ */
+ public String getUsage() {
+ return USAGE + PURPOSE;
+ }
+
+ public String getName() {
+ return AUDITTOOL;
+ }
+
+ /**
+ * The run method takes the destination path as the first argument and
+ * the source path of the audit logs as the second, verifies that both
+ * are directories, and passes them on to merge and parse the audit
+ * log files.
+ *
+ * @param args argument list
+ * @return SUCCESS, i.e. '0', the exit code on success
+ * @throws Exception on any failure.
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ List<String> paths = Arrays.asList(args);
+ if (paths.size() == 2) {
+ // Path of audit log files
+ Path logsPath = new Path(paths.get(1));
+ // Path of destination directory
+ Path destPath = new Path(paths.get(0));
+
+ // Setting the file system, using the tool's configuration
+ URI fsURI = new URI(logsPath.toString());
+ FileSystem fileSystem = FileSystem.get(fsURI, getConf());
+
+ FileStatus fileStatus = fileSystem.getFileStatus(logsPath);
+ if (fileStatus.isFile()) {
+ errorln("Expecting a directory, but " + logsPath.getName() + " is a"
+ + " file which was passed as an argument");
+ throw invalidArgs(
+ "Expecting a directory, but " + logsPath.getName() + " is a"
+ + " file which was passed as an argument");
+ }
+ FileStatus fileStatus1 = fileSystem.getFileStatus(destPath);
+ if (fileStatus1.isFile()) {
+ errorln("Expecting a directory, but " + destPath.getName() + " is a"
+ + " file which was passed as an argument");
+ throw invalidArgs(
+ "Expecting a directory, but " + destPath.getName() + " is a"
+ + " file which was passed as an argument");
+ }
+
+ // Calls S3AAuditLogMergerAndParser to merge and parse the
+ // audit log files and convert them into an avro file
+ boolean mergeAndParseResult =
+ s3AAuditLogMergerAndParser.mergeAndParseAuditLogFiles(
+ fileSystem, logsPath, destPath);
+ if (!mergeAndParseResult) {
+ return FAILURE;
+ }
+ } else {
+ errorln(getUsage());
+ throw invalidArgs("Invalid number of arguments, please specify audit "
+ + "log files directory as 1st argument and destination directory "
+ + "as 2nd argument");
+ }
+ return SUCCESS;
+ }
+
+ protected static void errorln(String x) {
+ System.err.println(x);
+ }
+
+ /**
+ * Build the exception to raise on invalid arguments.
+ *
+ * @param format string format
+ * @param args optional arguments for the string
+ * @return a new exception to throw
+ */
+ protected static ExitUtil.ExitException invalidArgs(
+ String format, Object... args) {
+ return exitException(INVALID_ARGUMENT, format, args);
+ }
+
+ /**
+ * Build an exception to throw with a formatted message.
+ *
+ * @param exitCode exit code to use
+ * @param format string format
+ * @param args optional arguments for the string
+ * @return a new exception to throw
+ */
+ protected static ExitUtil.ExitException exitException(
+ final int exitCode,
+ final String format,
+ final Object... args) {
+ return new ExitUtil.ExitException(exitCode,
+ String.format(format, args));
+ }
+
+ /**
+ * Convert a path to a URI, catching any {@code URISyntaxException}
+ * and converting to an invalid args exception.
+ *
+ * @param s3Path path to convert to a URI
+ * @return a URI of the path
+ * @throws ExitUtil.ExitException INVALID_ARGUMENT if the URI is invalid
+ */
+ protected static URI toUri(String s3Path) {
+ URI uri;
+ try {
+ uri = new URI(s3Path);
+ } catch (URISyntaxException e) {
+ throw invalidArgs("Not a valid fileystem path: %s", s3Path);
+ }
+ return uri;
+ }
+
+ /**
+ * Flush all active output channels, including {@code System.err},
+ * so as to stay in sync with any JRE log messages.
+ */
+ private void flush() {
+ if (out != null) {
+ out.flush();
+ } else {
+ System.out.flush();
+ }
+ System.err.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ if (out != null) {
+ out.close();
+ }
+ }
+
+ /**
+ * Inner entry point, with no logging or system exits.
+ *
+ * @param conf configuration
+ * @param argv argument list
+ * @return the exit code of the tool
+ * @throws Exception on any failure.
+ */
+ public static int exec(Configuration conf, String... argv) throws Exception {
+ try (AuditTool auditTool = new AuditTool()) {
+ return ToolRunner.run(conf, auditTool, argv);
+ }
+ }
+
+ /**
+ * Main entry point.
+ *
+ * @param argv args list
+ */
+ public static void main(String[] argv) {
+ try {
+ ExitUtil.terminate(exec(new Configuration(), argv));
+ } catch (ExitUtil.ExitException e) {
+ LOG.error(e.toString());
+ System.exit(e.status);
+ } catch (Exception e) {
+ LOG.error(e.toString(), e);
+ ExitUtil.halt(-1, e);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/avro/AvroDataSchema.avsc b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/avro/AvroDataSchema.avsc
new file mode 100644
index 0000000000000..747f8ab2e9a62
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/avro/AvroDataSchema.avsc
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
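+// Note: the numeric fields (turnaroundtime, bytessent, objectsize,
+// totaltime) are ["long", "null"] unions: S3 access logs report "-" when
+// no value is available, and such entries are stored as null.
+// The referrerMap field holds the key-value pairs parsed out of the
+// referrer header.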
+{
+ "type" : "record", "name" : "AvroS3LogEntryRecord",
+ "namespace" : "org.apache.hadoop.fs.s3a.audit",
+ "fields" : [
+ { "name" : "turnaroundtime" , "type" : ["long", "null"] },
+ { "name" : "remoteip", "type" : "string" },
+ { "name" : "auth", "type" : "string" },
+ { "name" : "useragent", "type" : "string" },
+ { "name" : "hostid", "type" : "string" },
+ { "name" : "requesturi", "type" : "string" },
+ { "name" : "endpoint", "type" : "string" },
+ { "name" : "bytessent", "type" : ["long", "null"] },
+ { "name" : "cypher", "type" : "string" },
+ { "name" : "key", "type" : "string" },
+ { "name" : "timestamp", "type" : "string" },
+ { "name" : "awserrorcode", "type" : "string" },
+ { "name" : "owner", "type" : "string" },
+ { "name" : "requester", "type" : "string" },
+ { "name" : "objectsize", "type" : ["long", "null"] },
+ { "name" : "tail", "type" : "string" },
+ { "name" : "verb", "type" : "string" },
+ { "name" : "version", "type" : "string" },
+ { "name" : "bucket", "type" : "string" },
+ { "name" : "sigv", "type" : "string" },
+ { "name" : "referrer", "type" : "string" },
+ { "name" : "totaltime", "type" : ["long", "null"] },
+ { "name" : "requestid", "type" : "string" },
+ { "name" : "http", "type" : "string" },
+ { "name" : "tls", "type" : "string" },
+ { "name" : "referrerMap", "type" : {"type": "map", "values": "string"} }
+ ]
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java
new file mode 100644
index 0000000000000..1ac0282208794
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroS3LogEntryRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.BYTESSENT_GROUP;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.OBJECTSIZE_GROUP;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.TOTALTIME_GROUP;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.TURNAROUNDTIME_GROUP;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
+/**
+ * Merges all the audit log files present under a directory
+ * into a single audit log file, and parses them into an avro file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ private long auditLogsParsed = 0;
+
+ /**
+ * The parseAuditLog method parses a single audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog a single audit log from the merged audit log file
+ * @return a map of the key-value pairs of a single audit log
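+ * <p>For example, the sample log entry used in the tests, which names
+ * the bucket {@code bucket-london} and the remote IP
+ * {@code 109.157.171.174}, parses to entries such as
+ * {@code bucket=bucket-london} and {@code remoteip=109.157.171.174}.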
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.info(
+ "Received an empty or null audit log, expected a valid string to parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
+ }
+ }
+ }
+ return auditLogMap;
+ }
+
+ /**
+ * The parseReferrerHeader method parses the HTTP referrer header,
+ * which is one of the fields of an audit log.
+ *
+ * @param referrerHeader the HTTP referrer header of a particular audit log
+ * @return a map of the key-value pairs found in the
+ * referrer header
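+ * <p>For example, a referrer header of the form
+ * {@code "https://audit.example.org/hadoop/1/op_create/...?op=op_create&pr=alice"}
+ * parses to the entries {@code op=op_create} and {@code pr=alice}.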
+ */
+ public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+ HashMap<String, String> referrerHeaderMap = new HashMap<>();
+ if (referrerHeader == null || referrerHeader.length() == 0) {
+ LOG.info(
+ "Received an empty or null referrer header, expected a valid string to parse");
+ return referrerHeaderMap;
+ }
+ int indexOfQuestionMark = referrerHeader.indexOf("?");
+ String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+ referrerHeader.length() - 1);
+ int lengthOfReferrer = httpReferrer.length();
+ int start = 0;
+ while (start < lengthOfReferrer) {
+ int equals = httpReferrer.indexOf("=", start);
+ // no match : break
+ if (equals == -1) {
+ break;
+ }
+ String key = httpReferrer.substring(start, equals);
+ int end = httpReferrer.indexOf("&", equals);
+ // or end of string
+ if (end == -1) {
+ end = lengthOfReferrer;
+ }
+ String value = httpReferrer.substring(equals + 1, end);
+ referrerHeaderMap.put(key, value);
+ start = end + 1;
+ }
+ return referrerHeaderMap;
+ }
+
+ /**
+ * Merge and parse all the audit log files and convert data into avro file.
+ *
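+ * <p>Two files are written under the destination path: the merged log
+ * text as {@code AuditLogFile} and the parsed records as
+ * {@code AvroData.avro}.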
+ * @param fileSystem filesystem
+ * @param logsPath source path of logs
+ * @param destPath destination path of merged log file
+ * @return true if the merge and parse completed successfully
+ * @throws IOException on any failure
+ */
+ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+ Path logsPath,
+ Path destPath) throws IOException {
+
+ // Listing file in given path
+ RemoteIterator<LocatedFileStatus> listOfLogFiles =
+ fileSystem.listFiles(logsPath, true);
+
+ Path destFile = new Path(destPath, "AuditLogFile");
+
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+ // Iterating over the list of files to merge and parse
+ while (listOfLogFiles.hasNext()) {
+ FileStatus fileStatus = listOfLogFiles.next();
+ int fileLength = (int) fileStatus.getLen();
+ byte[] byteBuffer = new byte[fileLength];
+
+ try (FSDataInputStream fsDataInputStream =
+ awaitFuture(fileSystem.openFile(fileStatus.getPath())
+ .withFileStatus(fileStatus)
+ .build())) {
+
+ // Fill the buffer with the whole log file through a positioned
+ // read, which does not move the stream offset used by the line
+ // record reader below, so that the merged log file is written
+ // with the real file contents.
+ fsDataInputStream.readFully(0, byteBuffer);
+
+ // Instantiating the generated AvroS3LogEntryRecord class
+ AvroS3LogEntryRecord avroDataRecord = new AvroS3LogEntryRecord();
+
+ // Instantiate DatumWriter class
+ DatumWriter<AvroS3LogEntryRecord> datumWriter =
+ new SpecificDatumWriter<>(AvroS3LogEntryRecord.class);
+ DataFileWriter<AvroS3LogEntryRecord> dataFileWriter =
+ new DataFileWriter<>(datumWriter);
+
+ List<String> longValues =
+ Arrays.asList(TURNAROUNDTIME_GROUP, BYTESSENT_GROUP,
+ OBJECTSIZE_GROUP, TOTALTIME_GROUP);
+
+ // Write avro data into a file in bucket destination path
+ Path avroFile = new Path(destPath, "AvroData.avro");
+
+ // Reading the file data using LineRecordReader
+ LineRecordReader lineRecordReader =
+ new LineRecordReader(fsDataInputStream, 0L, fileLength,
+ MAX_LINE_LENGTH);
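+ // Each audit log entry is one line of the file; the record reader
+ // will not return lines longer than MAX_LINE_LENGTH.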
+ LongWritable k = new LongWritable();
+ Text singleAuditLog = new Text();
+
+ try (FSDataOutputStream fsDataOutputStreamAvro = fileSystem.create(
+ avroFile)) {
+ // adding schema, output stream to DataFileWriter
+ dataFileWriter.create(AvroS3LogEntryRecord.getClassSchema(),
+ fsDataOutputStreamAvro);
+
+ // Parse each and every audit log from list of logs
+ while (lineRecordReader.next(k, singleAuditLog)) {
+ // Parse audit log except referrer header
+ HashMap<String, String> auditLogMap =
+ parseAuditLog(singleAuditLog.toString());
+
+ String referrerHeader = auditLogMap.get("referrer");
+ if (referrerHeader == null || referrerHeader.equals("-")) {
+ LOG.debug("Skipping audit log entry with no referrer header: {}",
+ singleAuditLog);
+ continue;
+ }
+
+ // Parse only referrer header
+ HashMap<String, String> referrerHeaderMap =
+ parseReferrerHeader(referrerHeader);
+
+ if (referrerHeaderMap.size() > 0) {
+ auditLogsParsed++;
+ }
+
+ // Insert data according to schema
+ for (Map.Entry<String, String> entry : auditLogMap.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue().trim();
+
+ // Keys in the longValues list are numeric columns stored as
+ // long; an S3 log reports '-' when no value is available, and
+ // such entries are stored as null. If parsing a long fails,
+ // log the exception and store null.
+ try {
+ if (longValues.contains(key)) {
+ if (value.equals("-")) {
+ avroDataRecord.put(key, null);
+ } else {
+ avroDataRecord.put(key, Long.parseLong(value));
+ }
+ } else {
+ avroDataRecord.put(key, value);
+ }
+ } catch (Exception e) {
+ LOG.debug("Exception while parsing the value of {}: {}",
+ key, value, e);
+ avroDataRecord.put(key, null);
+ }
+ }
+ avroDataRecord.put("referrerMap", referrerHeaderMap);
+ dataFileWriter.append(avroDataRecord);
+ }
+ dataFileWriter.flush();
+ }
+ }
+ // Append this log file's bytes to the merged log file in the
+ // destination path.
+ fsDataOutputStream.write(byteBuffer);
+ }
+ LOG.info("Successfully generated avro data");
+ }
+ return true;
+ }
+
+ public long getAuditLogsParsed() {
+ return auditLogsParsed;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/package-info.java
new file mode 100644
index 0000000000000..cbe907beaf7ca
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes to merge and parse S3A audit logs.
+ * This package requires mapreduce on the class path.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditTool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditTool.java
new file mode 100644
index 0000000000000..38c8c2c319d0e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditTool.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.file.Files;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+
+import static org.apache.hadoop.fs.s3a.audit.TestS3AAuditLogMergerAndParser.SAMPLE_LOG_ENTRY;
+
+/**
+ * Tests for the AuditTool class.
+ */
+public class TestAuditTool extends AbstractS3ATestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestAuditTool.class);
+
+ private final AuditTool auditTool = new AuditTool();
+
+ /**
+ * Sample directories and files to test.
+ */
+ private File sampleFile;
+ private File sampleDir;
+ private File sampleDestDir;
+
+ /**
+ * Testing run method in AuditTool class by passing source and destination
+ * paths.
+ */
+ @Test
+ public void testRun() throws Exception {
+ sampleDir = Files.createTempDirectory("sampleDir").toFile();
+ sampleFile = File.createTempFile("sampleFile", ".txt", sampleDir);
+ try (FileWriter fw = new FileWriter(sampleFile)) {
+ fw.write(SAMPLE_LOG_ENTRY);
+ fw.flush();
+ }
+ sampleDestDir = Files.createTempDirectory("sampleDestDir").toFile();
+ Path logsPath = new Path(sampleDir.toURI());
+ Path destPath = new Path(sampleDestDir.toURI());
+ String[] args = {destPath.toString(), logsPath.toString()};
+ auditTool.setConf(getConfiguration());
+ auditTool.run(args);
+ FileSystem fileSystem = destPath.getFileSystem(getConfiguration());
+ RemoteIterator<LocatedFileStatus> listOfDestFiles =
+ fileSystem.listFiles(destPath, true);
+ Path expectedPath = new Path(destPath, "AvroData.avro");
+ fileSystem.open(expectedPath);
+
+ File avroFile = new File(expectedPath.toUri());
+
+ // Deserializing the records
+ DatumReader<AvroS3LogEntryRecord> datumReader =
+ new SpecificDatumReader<>(AvroS3LogEntryRecord.class);
+
+ // Instantiating DataFileReader; closed after use to avoid a leak
+ try (DataFileReader<AvroS3LogEntryRecord> dataFileReader =
+ new DataFileReader<>(avroFile, datumReader)) {
+ AvroS3LogEntryRecord avroS3LogEntryRecord = null;
+
+ while (dataFileReader.hasNext()) {
+ avroS3LogEntryRecord = dataFileReader.next(avroS3LogEntryRecord);
+ // verifying the bucket from generated avro data
+ assertEquals("the expected and actual results should be same",
+ "bucket-london", avroS3LogEntryRecord.get("bucket").toString());
+ // verifying the remoteip from generated avro data
+ assertEquals("the expected and actual results should be same",
+ "109.157.171.174", avroS3LogEntryRecord.get("remoteip").toString());
+ }
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestS3AAuditLogMergerAndParser.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestS3AAuditLogMergerAndParser.java
new file mode 100644
index 0000000000000..ed6aba7469e37
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestS3AAuditLogMergerAndParser.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.audit.mapreduce.S3AAuditLogMergerAndParser;
+
+/**
+ * Tests for the S3AAuditLogMergerAndParser class.
+ */
+public class TestS3AAuditLogMergerAndParser extends AbstractS3ATestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestS3AAuditLogMergerAndParser.class);
+
+ /**
+ * A real log entry.
+ * This is derived from a real log entry on a test run.
+ * If this needs to be updated, please do it from a real log.
+ * Splitting this up across lines has a tendency to break things, so
+ * be careful making changes.
+ */
+ static final String SAMPLE_LOG_ENTRY =
+ "183c9826b45486e485693808f38e2c4071004bf5dfd4c3ab210f0a21a4000000"
+ + " bucket-london"
+ + " [13/May/2021:11:26:06 +0000]"
+ + " 109.157.171.174"
+ + " arn:aws:iam::152813717700:user/dev"
+ + " M7ZB7C4RTKXJKTM9"
+ + " REST.PUT.OBJECT"
+ + " fork-0001/test/testParseBrokenCSVFile"
+ + " \"PUT /fork-0001/test/testParseBrokenCSVFile HTTP/1.1\""
+ + " 200"
+ + " -"
+ + " -"
+ + " 794"
+ + " 55"
+ + " 17"
+ + " \"https://audit.example.org/hadoop/1/op_create/"
+ + "e8ede3c7-8506-4a43-8268-fe8fcbb510a4-00000278/"
+ + "?op=op_create"
+ + "&p1=fork-0001/test/testParseBrokenCSVFile"
+ + "&pr=alice"
+ + "&ps=2eac5a04-2153-48db-896a-09bc9a2fd132"
+ + "&id=e8ede3c7-8506-4a43-8268-fe8fcbb510a4-00000278&t0=154"
+ + "&fs=e8ede3c7-8506-4a43-8268-fe8fcbb510a4&t1=156&"
+ + "ts=1620905165700\""
+ + " \"Hadoop 3.4.0-SNAPSHOT, java/1.8.0_282 vendor/AdoptOpenJDK\""
+ + " -"
+ + " TrIqtEYGWAwvu0h1N9WJKyoqM0TyHUaY+ZZBwP2yNf2qQp1Z/0="
+ + " SigV4"
+ + " ECDHE-RSA-AES128-GCM-SHA256"
+ + " AuthHeader"
+ + " bucket-london.s3.eu-west-2.amazonaws.com"
+ + " TLSv1.2" + "\n";
+
+ static final String SAMPLE_LOG_ENTRY_1 =
+ "01234567890123456789"
+ + " bucket-london1"
+ + " [13/May/2021:11:26:06 +0000]"
+ + " 109.157.171.174"
+ + " arn:aws:iam::152813717700:user/dev"
+ + " M7ZB7C4RTKXJKTM9"
+ + " REST.PUT.OBJECT"
+ + " fork-0001/test/testParseBrokenCSVFile"
+ + " \"PUT /fork-0001/test/testParseBrokenCSVFile HTTP/1.1\""
+ + " 200"
+ + " -"
+ + " -"
+ + " 794"
+ + " 55"
+ + " 17"
+ + " \"https://audit.example.org/hadoop/1/op_create/"
+ + "e8ede3c7-8506-4a43-8268-fe8fcbb510a4-00000278/"
+ + "?op=op_create"
+ + "&p1=fork-0001/test/testParseBrokenCSVFile"
+ + "&pr=alice"
+ + "&ps=2eac5a04-2153-48db-896a-09bc9a2fd132"
+ + "&id=e8ede3c7-8506-4a43-8268-fe8fcbb510a4-00000278&t0=154"
+ + "&fs=e8ede3c7-8506-4a43-8268-fe8fcbb510a4&t1=156&"
+ + "ts=1620905165700\""
+ + " \"Hadoop 3.4.0-SNAPSHOT, java/1.8.0_282 vendor/AdoptOpenJDK\""
+ + " -"
+ + " TrIqtEYGWAwvu0h1N9WJKyoqM0TyHUaY+ZZBwP2yNf2qQp1Z/0="
+ + " SigV4"
+ + " ECDHE-RSA-AES128-GCM-SHA256"
+ + " AuthHeader"
+ + " bucket-london.s3.eu-west-2.amazonaws.com"
+ + " TLSv1.2" + "\n";
+
+ /**
+ * A real referrer header entry.
+ * This is derived from a real log entry on a test run.
+ * If this needs to be updated, please do it from a real log.
+ * Splitting this up across lines has a tendency to break things, so
+ * be careful making changes.
+ */
+ private final String sampleReferrerHeader =
+ "\"https://audit.example.org/hadoop/1/op_create/"
+ + "e8ede3c7-8506-4a43-8268-fe8fcbb510a4-00000278/?"
+ + "op=op_create"
+ + "&p1=fork-0001/test/testParseBrokenCSVFile"
+ + "&pr=alice"
+ + "&ps=2eac5a04-2153-48db-896a-09bc9a2fd132"
+ + "&id=e8ede3c7-8506-4a43-8268-fe8fcbb510a4-00000278&t0=154"
+ + "&fs=e8ede3c7-8506-4a43-8268-fe8fcbb510a4&t1=156"
+ + "&ts=1620905165700\"";
+
+ /**
+ * Sample directories and files to test.
+ */
+ private File sampleFile;
+ private File sampleDir;
+ private File sampleDestDir;
+
+ private final S3AAuditLogMergerAndParser s3AAuditLogMergerAndParser =
+ new S3AAuditLogMergerAndParser();
+
+ /**
+ * Testing the parseAuditLog method by passing a sample audit log
+ * entry and checking that the log is parsed correctly.
+ */
+ @Test
+ public void testParseAuditLog() {
+ Map<String, String> parseAuditLogResult =
+ s3AAuditLogMergerAndParser.parseAuditLog(SAMPLE_LOG_ENTRY);
+ assertNotNull("the result of parseAuditLog should not be null",
+ parseAuditLogResult);
+ //verifying the bucket from parsed audit log
+ assertEquals("the expected and actual results should be same",
+ "bucket-london", parseAuditLogResult.get("bucket"));
+ //verifying the remoteip from parsed audit log
+ assertEquals("the expected and actual results should be same",
+ "109.157.171.174", parseAuditLogResult.get("remoteip"));
+ }
+
+ /**
+ * Testing the parseAuditLog method by passing an empty string and
+ * null, and checking that the result is empty.
+ */
+ @Test
+ public void testParseAuditLogEmptyAndNull() {
+ Map<String, String> parseAuditLogResultEmpty =
+ s3AAuditLogMergerAndParser.parseAuditLog("");
+ assertTrue("the returned map should be empty for this test",
+ parseAuditLogResultEmpty.isEmpty());
+ Map<String, String> parseAuditLogResultNull =
+ s3AAuditLogMergerAndParser.parseAuditLog(null);
+ assertTrue("the returned map should be empty for this test",
+ parseAuditLogResultNull.isEmpty());
+ }
+
+ /**
+ * Testing the parseReferrerHeader method by passing a sample referrer
+ * header taken from a sample audit log and checking that the
+ * referrer header is parsed correctly.
+ */
+ @Test
+ public void testParseReferrerHeader() {
+ Map<String, String> parseReferrerHeaderResult =
+ s3AAuditLogMergerAndParser.parseReferrerHeader(sampleReferrerHeader);
+ assertNotNull("the result of parseReferrerHeader should not be null",
+ parseReferrerHeaderResult);
+ //verifying the path 'p1' from parsed referrer header
+ assertEquals("the expected and actual results should be same",
+ "fork-0001/test/testParseBrokenCSVFile",
+ parseReferrerHeaderResult.get("p1"));
+ //verifying the principal 'pr' from parsed referrer header
+ assertEquals("the expected and actual results should be same", "alice",
+ parseReferrerHeaderResult.get("pr"));
+ }
+
+ /**
+ * Testing the parseReferrerHeader method by passing an empty string
+ * and a null string, and checking that the result is empty.
+ */
+ @Test
+ public void testParseReferrerHeaderEmptyAndNull() {
+ Map<String, String> parseReferrerHeaderResultEmpty =
+ s3AAuditLogMergerAndParser.parseReferrerHeader("");
+ assertTrue("the returned map should be empty for this test",
+ parseReferrerHeaderResultEmpty.isEmpty());
+ Map<String, String> parseReferrerHeaderResultNull =
+ s3AAuditLogMergerAndParser.parseReferrerHeader(null);
+ assertTrue("the returned map should be empty for this test",
+ parseReferrerHeaderResultNull.isEmpty());
+ }
+
+ /**
+ * Testing mergeAndParseAuditLogFiles method by passing filesystem, source
+ * and destination paths.
+ */
+ @Test
+ public void testMergeAndParseAuditLogFiles() throws IOException {
+ sampleDir = Files.createTempDirectory("sampleDir").toFile();
+ sampleFile = File.createTempFile("sampleFile", ".txt", sampleDir);
+ try (FileWriter fw = new FileWriter(sampleFile)) {
+ fw.write(SAMPLE_LOG_ENTRY);
+ fw.write(SAMPLE_LOG_ENTRY_1);
+ fw.flush();
+ }
+ sampleDestDir = Files.createTempDirectory("sampleDestDir").toFile();
+ Path logsPath = new Path(sampleDir.toURI());
+ Path destPath = new Path(sampleDestDir.toURI());
+ FileSystem fileSystem = logsPath.getFileSystem(getConfiguration());
+ boolean mergeAndParseResult =
+ s3AAuditLogMergerAndParser.mergeAndParseAuditLogFiles(fileSystem,
+ logsPath, destPath);
+ assertTrue("the result should be true", mergeAndParseResult);
+ }
+
+ /**
+ * Testing the counter of parsed audit logs by passing the filesystem
+ * and the source and destination paths of several sample files.
+ */
+ @Test
+ public void testMergeAndParseAuditLogCounter() throws IOException {
+ sampleDir = Files.createTempDirectory("sampleDir").toFile();
+ File firstSampleFile =
+ File.createTempFile("sampleFile1", ".txt", sampleDir);
+ File secondSampleFile =
+ File.createTempFile("sampleFile2", ".txt", sampleDir);
+ File thirdSampleFile =
+ File.createTempFile("sampleFile3", ".txt", sampleDir);
+ try (FileWriter fw = new FileWriter(firstSampleFile);
+ FileWriter fw1 = new FileWriter(secondSampleFile);
+ FileWriter fw2 = new FileWriter(thirdSampleFile)) {
+ fw.write(SAMPLE_LOG_ENTRY);
+ fw1.write(SAMPLE_LOG_ENTRY);
+ fw2.write(SAMPLE_LOG_ENTRY_1);
+ }
+ sampleDestDir = Files.createTempDirectory("sampleDestDir").toFile();
+ Path logsPath = new Path(sampleDir.toURI());
+ Path destPath = new Path(sampleDestDir.toURI());
+ FileSystem fileSystem = logsPath.getFileSystem(getConfiguration());
+ boolean mergeAndParseResult =
+ s3AAuditLogMergerAndParser.mergeAndParseAuditLogFiles(fileSystem,
+ logsPath, destPath);
+ assertTrue("the result should be true", mergeAndParseResult);
+
+ long noOfAuditLogsParsed = s3AAuditLogMergerAndParser.getAuditLogsParsed();
+ assertEquals("the expected and actual results should be same",
+ 3, noOfAuditLogsParsed);
+ }
+}