diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 492c488114e1..b831751b6e34 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -67,9 +67,12 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.nio.file.Path; +import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -99,12 +102,17 @@ @WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"), @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because " + "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile"), - @WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the unpacked file was last modified (tar only)."), - @WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. This attribute holds always the same value as file.lastModifiedTime (tar only)."), - @WritesAttribute(attribute = "file.owner", description = "The owner of the unpacked file (tar only)"), - @WritesAttribute(attribute = "file.group", description = "The group owner of the unpacked file (tar only)"), - @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the unpacked file (tar only)"), - @WritesAttribute(attribute = "file.encryptionMethod", description = "The encryption method for entries in Zip archives")}) + @WritesAttribute(attribute = UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, description = "The date and time that the unpacked file was last modified (tar and zip only)."), + @WritesAttribute(attribute = UnpackContent.FILE_CREATION_TIME_ATTRIBUTE, description = "The date and time that the file was created. For encrypted zip files this attribute" + + " always holds the same value as " + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + ". For tar and unencrypted zip files if available it will be returned otherwise" + + " this will be the same value as" + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + "."), + @WritesAttribute(attribute = UnpackContent.FILE_LAST_METADATA_CHANGE_ATTRIBUTE, description = "The date and time the file's metadata changed (tar only)."), + @WritesAttribute(attribute = UnpackContent.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description = "The date and time the file was last accessed (tar and unencrypted zip files only)"), + @WritesAttribute(attribute = UnpackContent.FILE_OWNER_ATTRIBUTE, description = "The owner of the unpacked file (tar only)"), + @WritesAttribute(attribute = UnpackContent.FILE_GROUP_ATTRIBUTE, description = "The group owner of the unpacked file (tar only)"), + @WritesAttribute(attribute = UnpackContent.FILE_SIZE_ATTRIBUTE, description = "The uncompressed size of the unpacked file (tar and zip only)"), + @WritesAttribute(attribute = UnpackContent.FILE_PERMISSIONS_ATTRIBUTE, description = "The read/write/execute permissions of the unpacked file (tar and unencrypted zip files only)"), + @WritesAttribute(attribute = UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, description = "The encryption method for entries in Zip archives")}) @SeeAlso(MergeContent.class) @UseCase( description = "Unpack Zip containing filenames with special characters, created on Windows with filename charset 'Cp437' or 'IBM437'.", @@ -131,8 +139,11 @@ public class UnpackContent extends AbstractProcessor { public static final String FILE_LAST_MODIFIED_TIME_ATTRIBUTE = "file.lastModifiedTime"; public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime"; + public static final String FILE_LAST_METADATA_CHANGE_ATTRIBUTE = "file.lastMetadataChange"; + public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime"; public static final String FILE_OWNER_ATTRIBUTE = "file.owner"; public static final String FILE_GROUP_ATTRIBUTE = "file.group"; + public static final String FILE_SIZE_ATTRIBUTE = "file.size"; public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions"; public static final String FILE_ENCRYPTION_METHOD_ATTRIBUTE = "file.encryptionMethod"; @@ -373,6 +384,7 @@ public TarUnpacker(Pattern fileFilter) { @Override public void unpack(final ProcessSession session, final FlowFile source, final List unpacked) { final String fragmentId = UUID.randomUUID().toString(); + final Map attributes = new HashMap<>(); session.read(source, inputStream -> { int fragmentCount = 0; try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inputStream))) { @@ -387,26 +399,41 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li FlowFile unpackedFile = session.create(source); try { - final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant()); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), filePathString); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode())); + attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName())); + attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName())); + attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(tarEntry.getRealSize())); + String lastModified = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant()); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, lastModified); + + if (tarEntry.getCreationTime() != null) { + final String creationTime = DATE_TIME_FORMATTER.format(tarEntry.getCreationTime().toInstant()); + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime); + } else { + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, lastModified); + } - unpackedFile = session.putAllAttributes(unpackedFile, Map.of( - CoreAttributes.FILENAME.key(), file.getName(), - CoreAttributes.PATH.key(), filePathString, - CoreAttributes.MIME_TYPE.key(), OCTET_STREAM, + if (tarEntry.getStatusChangeTime() != null) { + final String metadataChangeTime = DATE_TIME_FORMATTER.format(tarEntry.getStatusChangeTime().toInstant()); + attributes.put(FILE_LAST_METADATA_CHANGE_ATTRIBUTE, metadataChangeTime); + } - FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()), - FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()), - FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()), + if (tarEntry.getLastAccessTime() != null) { + final String lastAccesTime = DATE_TIME_FORMATTER.format(tarEntry.getLastAccessTime().toInstant()); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccesTime); + } - FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString, - FILE_CREATION_TIME_ATTRIBUTE, timeAsString, + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); - FRAGMENT_ID, fragmentId, - FRAGMENT_INDEX, String.valueOf(++fragmentCount) - )); + unpackedFile = session.putAllAttributes(unpackedFile, attributes); final long fileSize = tarEntry.getSize(); unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize)); + attributes.clear(); } finally { unpacked.add(unpackedFile); } @@ -470,28 +497,59 @@ protected boolean isFileEntryMatched(final boolean directory, final String fileN return !directory && (fileFilter == null || fileFilter.matcher(fileName).find()); } - protected void processEntry(final InputStream zipInputStream, final boolean directory, final String zipEntryName, final EncryptionMethod encryptionMethod) { + protected void processEntry(final InputStream zipInputStream, boolean directory, String zipEntryName, Map attributes) { if (isFileEntryMatched(directory, zipEntryName)) { final File file = new File(zipEntryName); final String parentDirectory = (file.getParent() == null) ? PATH_SEPARATOR : file.getParent(); FlowFile unpackedFile = session.create(sourceFlowFile); try { - unpackedFile = session.putAllAttributes(unpackedFile, Map.of( - CoreAttributes.FILENAME.key(), file.getName(), - CoreAttributes.PATH.key(), parentDirectory, - CoreAttributes.MIME_TYPE.key(), OCTET_STREAM, - FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString(), - - FRAGMENT_ID, fragmentId, - FRAGMENT_INDEX, String.valueOf(++fragmentIndex) - )); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), parentDirectory); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentIndex)); + unpackedFile = session.putAllAttributes(unpackedFile, attributes); unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(zipInputStream, outputStream)); } finally { unpacked.add(unpackedFile); } } } + + protected void addFileSizeAttribute(long fileSize, Map attributes) { + attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(fileSize)); + } + + protected void addEncryptionMethodAttribute(EncryptionMethod encryptionMethod, Map attributes) { + attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString()); + } + + protected void addFilePermissionsAttribute(int mode, Map attributes) { + if (mode > -1) { + attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(mode)); + } + } + + protected void addZipEntryTimeAttributes(Instant lastModified, Instant creation, Instant lastAccess, Map attributes) { + String lastModifiedDate = null; + if (lastModified != null) { + lastModifiedDate = DATE_TIME_FORMATTER.format(lastModified); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, lastModifiedDate); + } + + if (creation != null) { + final String creationTime = DATE_TIME_FORMATTER.format(creation); + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime); + } else if (lastModifiedDate != null) { + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, lastModifiedDate); + } + + if (lastAccess != null) { + final String lastAccessDate = DATE_TIME_FORMATTER.format(lastAccess); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccessDate); + } + } } private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback { @@ -518,8 +576,19 @@ public void process(final InputStream inputStream) throws IOException { try (final ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(new BufferedInputStream(inputStream), filenameEncoding.toString(), true, allowStoredEntriesWithDataDescriptor)) { ZipArchiveEntry zipEntry; + final Map attributes = new HashMap<>(); while ((zipEntry = zipInputStream.getNextEntry()) != null) { - processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), EncryptionMethod.NONE); + addEncryptionMethodAttribute(EncryptionMethod.NONE, attributes); + addFileSizeAttribute(zipEntry.getSize(), attributes); + addFilePermissionsAttribute(zipEntry.getUnixMode(), attributes); + // NOTE: Per javadocs, ZipArchiveEntry can return -1 for getTime() if its not specified + // and getLastAccessTime() can return null if it is not specified. + Instant lastModified = zipEntry.getLastModifiedDate().toInstant(); + Instant creation = zipEntry.getTime() > 0 ? new Date(zipEntry.getTime()).toInstant() : null; + Instant lastAccess = zipEntry.getLastAccessTime() != null ? zipEntry.getLastAccessTime().toInstant() : null; + addZipEntryTimeAttributes(lastModified, creation, lastAccess, attributes); + processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), attributes); + attributes.clear(); } } } @@ -547,8 +616,15 @@ private EncryptedZipInputStreamCallback( public void process(final InputStream inputStream) throws IOException { try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password, filenameEncoding)) { LocalFileHeader zipEntry; + final Map attributes = new HashMap<>(); while ((zipEntry = zipInputStream.getNextEntry()) != null) { - processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), zipEntry.getEncryptionMethod()); + //NOTE: LocalFileHeader has no methods to return creation time and the mode. + addEncryptionMethodAttribute(zipEntry.getEncryptionMethod(), attributes); + addFileSizeAttribute(zipEntry.getUncompressedSize(), attributes); + Instant lastModified = zipEntry.getLastModifiedTime() > 0 ? new Date(zipEntry.getLastModifiedTime()).toInstant() : null; + addZipEntryTimeAttributes(lastModified, null, null, attributes); + processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), attributes); + attributes.clear(); } } } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index fc2b0226a7e4..95b4eb76e4d6 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -41,6 +41,7 @@ import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT; import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -82,19 +83,21 @@ public void testTar() throws IOException { final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); for (final MockFlowFile flowFile : unpacked) { + assertTrue(flowFile.getAttributes().keySet().containsAll(List.of(UnpackContent.FRAGMENT_ID, UnpackContent.FRAGMENT_INDEX, + UnpackContent.FRAGMENT_COUNT, UnpackContent.SEGMENT_ORIGINAL_FILENAME, UnpackContent.FILE_SIZE_ATTRIBUTE))); + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); final Path path = dataPath.resolve(folder).resolve(filename); - assertEquals("rw-r--r--", flowFile.getAttribute("file.permissions")); - assertEquals("jmcarey", flowFile.getAttribute("file.owner")); - assertEquals("mkpasswd", flowFile.getAttribute("file.group")); - String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime"); - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString); + assertEquals("rw-r--r--", flowFile.getAttribute(UnpackContent.FILE_PERMISSIONS_ATTRIBUTE)); + assertEquals("jmcarey", flowFile.getAttribute(UnpackContent.FILE_OWNER_ATTRIBUTE)); + assertEquals("mkpasswd", flowFile.getAttribute(UnpackContent.FILE_GROUP_ATTRIBUTE)); + String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime"); + assertDoesNotThrow(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString)); String creationTimeAsString = flowFile.getAttribute("file.creationTime"); - - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString); + assertDoesNotThrow(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString)); assertTrue(Files.exists(path)); @@ -183,6 +186,10 @@ public void testZip() throws IOException { final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); for (final MockFlowFile flowFile : unpacked) { + assertTrue(flowFile.getAttributes().keySet().containsAll(List.of(CoreAttributes.FILENAME.key(), CoreAttributes.PATH.key(), + UnpackContent.FRAGMENT_ID, UnpackContent.FRAGMENT_INDEX, UnpackContent.FRAGMENT_COUNT, + UnpackContent.SEGMENT_ORIGINAL_FILENAME, UnpackContent.FILE_SIZE_ATTRIBUTE, UnpackContent.FILE_CREATION_TIME_ATTRIBUTE, + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, UnpackContent.FILE_PERMISSIONS_ATTRIBUTE))); final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); final Path path = dataPath.resolve(folder).resolve(filename); @@ -218,7 +225,6 @@ public void testInvalidZip() throws IOException { final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_FAILURE); for (final MockFlowFile flowFile : unpacked) { final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - // final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); final Path path = dataPath.resolve(filename); assertTrue(Files.exists(path));