From 90515a2d088846bd095f918d28cee91de083addd Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Fri, 26 Jul 2024 17:10:22 +0000 Subject: [PATCH 1/8] NIFI-12709 Added ability to get more attributes for zip files as well as created new attributes to get for both tar and zip files. --- .../processors/standard/UnpackContent.java | 125 +++++++++++++----- .../standard/TestUnpackContent.java | 9 +- 2 files changed, 96 insertions(+), 38 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 492c488114e1..a588efa8e9d6 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -67,9 +67,12 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.nio.file.Path; +import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -99,12 +102,17 @@ @WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"), @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. 
Extensions of .tar, .zip or .pkg are removed because " + "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile"), - @WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the unpacked file was last modified (tar only)."), - @WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. This attribute holds always the same value as file.lastModifiedTime (tar only)."), - @WritesAttribute(attribute = "file.owner", description = "The owner of the unpacked file (tar only)"), - @WritesAttribute(attribute = "file.group", description = "The group owner of the unpacked file (tar only)"), - @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the unpacked file (tar only)"), - @WritesAttribute(attribute = "file.encryptionMethod", description = "The encryption method for entries in Zip archives")}) + @WritesAttribute(attribute = UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, description = "The date and time that the unpacked file was last modified (tar and zip only)."), + @WritesAttribute(attribute = UnpackContent.FILE_CREATION_TIME_ATTRIBUTE, description = "The date and time that the file was created. For encrypted zip files this attribute" + + " always holds the same value as " + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + ". 
For tar and unencrypted zip files if available it will be returned otherwise" + + " this will be the same value as" + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + "."), + @WritesAttribute(attribute = UnpackContent.FILE_LAST_METADATA_CHANGE_ATTRIBUTE, description = "The date and time the file's metadata changed (tar only)."), + @WritesAttribute(attribute = UnpackContent.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description = "The date and time the file was last accessed (tar and unencrypted zip only)"), + @WritesAttribute(attribute = UnpackContent.FILE_OWNER_ATTRIBUTE, description = "The owner of the unpacked file (tar only)"), + @WritesAttribute(attribute = UnpackContent.FILE_GROUP_ATTRIBUTE, description = "The group owner of the unpacked file (tar only)"), + @WritesAttribute(attribute = UnpackContent.FILE_SIZE_ATTRIBUTE, description = "The uncompressed size of the unpacked file (tar and zip only)"), + @WritesAttribute(attribute = UnpackContent.FILE_PERMISSIONS_ATTRIBUTE, description = "The read/write/execute permissions of the unpacked file (tar and unencrypted zip files)"), + @WritesAttribute(attribute = UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, description = "The encryption method for entries in Zip archives")}) @SeeAlso(MergeContent.class) @UseCase( description = "Unpack Zip containing filenames with special characters, created on Windows with filename charset 'Cp437' or 'IBM437'.", @@ -131,8 +139,11 @@ public class UnpackContent extends AbstractProcessor { public static final String FILE_LAST_MODIFIED_TIME_ATTRIBUTE = "file.lastModifiedTime"; public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime"; + public static final String FILE_LAST_METADATA_CHANGE_ATTRIBUTE = "file.lastMetadataChange"; + public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime"; public static final String FILE_OWNER_ATTRIBUTE = "file.owner"; public static final String FILE_GROUP_ATTRIBUTE = "file.group"; + public static final String 
FILE_SIZE_ATTRIBUTE = "file.size"; public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions"; public static final String FILE_ENCRYPTION_METHOD_ATTRIBUTE = "file.encryptionMethod"; @@ -387,23 +398,37 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li FlowFile unpackedFile = session.create(source); try { - final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant()); + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), filePathString); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + + attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode())); + attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName())); + attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName())); + attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(tarEntry.getRealSize())); + String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant()); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString); + + if (tarEntry.getCreationTime() != null) { + timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getCreationTime().toInstant()); + } + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString); - unpackedFile = session.putAllAttributes(unpackedFile, Map.of( - CoreAttributes.FILENAME.key(), file.getName(), - CoreAttributes.PATH.key(), filePathString, - CoreAttributes.MIME_TYPE.key(), OCTET_STREAM, + if (tarEntry.getStatusChangeTime() != null) { + timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getStatusChangeTime().toInstant()); + attributes.put(FILE_LAST_METADATA_CHANGE_ATTRIBUTE, timeAsString); + } - FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()), - FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()), - FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()), + if 
(tarEntry.getLastAccessTime() != null) { + timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getLastAccessTime().toInstant()); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, timeAsString); + } - FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString, - FILE_CREATION_TIME_ATTRIBUTE, timeAsString, + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); - FRAGMENT_ID, fragmentId, - FRAGMENT_INDEX, String.valueOf(++fragmentCount) - )); + unpackedFile = session.putAllAttributes(unpackedFile, attributes); final long fileSize = tarEntry.getSize(); unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize)); @@ -437,6 +462,11 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li } } + private record ZipInputStreamMetadata(boolean directory, String zipEntryName, EncryptionMethod encryptionMethod, + Instant creationTime, Instant lastModifiedDate, Instant lastAccessDate, int mode, + long uncompressedSize) { + } + private abstract static class ZipInputStreamCallback implements InputStreamCallback { private static final String PATH_SEPARATOR = "/"; @@ -470,22 +500,40 @@ protected boolean isFileEntryMatched(final boolean directory, final String fileN return !directory && (fileFilter == null || fileFilter.matcher(fileName).find()); } - protected void processEntry(final InputStream zipInputStream, final boolean directory, final String zipEntryName, final EncryptionMethod encryptionMethod) { - if (isFileEntryMatched(directory, zipEntryName)) { - final File file = new File(zipEntryName); + protected void processEntry(final InputStream zipInputStream, ZipInputStreamMetadata metadata) { + if (isFileEntryMatched(metadata.directory(), metadata.zipEntryName())) { + final File file = new File(metadata.zipEntryName()); final String parentDirectory = (file.getParent() == null) ? 
PATH_SEPARATOR : file.getParent(); FlowFile unpackedFile = session.create(sourceFlowFile); try { - unpackedFile = session.putAllAttributes(unpackedFile, Map.of( - CoreAttributes.FILENAME.key(), file.getName(), - CoreAttributes.PATH.key(), parentDirectory, - CoreAttributes.MIME_TYPE.key(), OCTET_STREAM, - FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString(), - - FRAGMENT_ID, fragmentId, - FRAGMENT_INDEX, String.valueOf(++fragmentIndex) - )); + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), file.getName()); + attributes.put(CoreAttributes.PATH.key(), parentDirectory); + attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, metadata.encryptionMethod().toString()); + attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(metadata.uncompressedSize())); + String timeAsString = DATE_TIME_FORMATTER.format(metadata.lastModifiedDate()); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString); + + if (metadata.creationTime() != null) { + timeAsString = DATE_TIME_FORMATTER.format(metadata.creationTime()); + } + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString); + + if (metadata.lastAccessDate() != null) { + timeAsString = DATE_TIME_FORMATTER.format(metadata.lastAccessDate()); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, timeAsString); + } + + if (metadata.mode() > -1) { + attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(metadata.mode())); + } + + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentIndex)); + + unpackedFile = session.putAllAttributes(unpackedFile, attributes); unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(zipInputStream, outputStream)); } finally { unpacked.add(unpackedFile); @@ -519,7 +567,14 @@ public void process(final InputStream inputStream) throws IOException { filenameEncoding.toString(), true, 
allowStoredEntriesWithDataDescriptor)) { ZipArchiveEntry zipEntry; while ((zipEntry = zipInputStream.getNextEntry()) != null) { - processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), EncryptionMethod.NONE); + // NOTE: Per javadocs, ZipArchiveEntry can return -1 for getTime() if its not specified + // and getLastAccessTime() can return null if it is not specified. + Instant creationTime = zipEntry.getTime() > 0 ? new Date(zipEntry.getTime()).toInstant() : null; + Instant lastModifiedDate = zipEntry.getLastModifiedDate().toInstant(); + Instant lastAccessTime = zipEntry.getLastAccessTime() != null ? zipEntry.getLastAccessTime().toInstant() : null; + ZipInputStreamMetadata zipInputStreamMetadata = new ZipInputStreamMetadata(zipEntry.isDirectory(), zipEntry.getName(), + EncryptionMethod.NONE, creationTime, lastModifiedDate, lastAccessTime, zipEntry.getUnixMode(), zipEntry.getSize()); + processEntry(zipInputStream, zipInputStreamMetadata); } } } @@ -548,7 +603,11 @@ public void process(final InputStream inputStream) throws IOException { try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password, filenameEncoding)) { LocalFileHeader zipEntry; while ((zipEntry = zipInputStream.getNextEntry()) != null) { - processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), zipEntry.getEncryptionMethod()); + //NOTE: LocalFileHeader has no methods to return creation time and the mode. + Instant lastModifiedDate = zipEntry.getLastModifiedTime() > 0 ? 
new Date(zipEntry.getLastModifiedTime()).toInstant() : null; + ZipInputStreamMetadata zipInputStreamMetadata = new ZipInputStreamMetadata(zipEntry.isDirectory(), zipEntry.getFileName(), + zipEntry.getEncryptionMethod(), null, lastModifiedDate, null, -1, zipEntry.getUncompressedSize()); + processEntry(zipInputStream, zipInputStreamMetadata); } } } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index fc2b0226a7e4..770b54250c54 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -41,6 +41,7 @@ import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT; import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -88,13 +89,11 @@ public void testTar() throws IOException { assertEquals("rw-r--r--", flowFile.getAttribute("file.permissions")); assertEquals("jmcarey", flowFile.getAttribute("file.owner")); assertEquals("mkpasswd", flowFile.getAttribute("file.group")); - String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime"); - - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString); + String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime"); + assertDoesNotThrow(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString)); String creationTimeAsString = 
flowFile.getAttribute("file.creationTime"); - - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString); + assertDoesNotThrow(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString)); assertTrue(Files.exists(path)); From 146d8215473d9a9fa264c044e184d15cb1156ce3 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Mon, 29 Jul 2024 21:29:30 +0000 Subject: [PATCH 2/8] NIFI-12709 Added ability to retrieve last modified date for the creation time of an encrypted zip. --- .../nifi/processors/standard/UnpackContent.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index a588efa8e9d6..4c8b868b7e77 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -513,13 +513,20 @@ protected void processEntry(final InputStream zipInputStream, ZipInputStreamMeta attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, metadata.encryptionMethod().toString()); attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(metadata.uncompressedSize())); - String timeAsString = DATE_TIME_FORMATTER.format(metadata.lastModifiedDate()); - attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString); + + String timeAsString = null; + if (metadata.lastModifiedDate() != null) { + timeAsString = DATE_TIME_FORMATTER.format(metadata.lastModifiedDate()); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString); + } if (metadata.creationTime() != 
null) { timeAsString = DATE_TIME_FORMATTER.format(metadata.creationTime()); } - attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString); + + if (timeAsString != null) { + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString); + } if (metadata.lastAccessDate() != null) { timeAsString = DATE_TIME_FORMATTER.format(metadata.lastAccessDate()); @@ -606,7 +613,7 @@ public void process(final InputStream inputStream) throws IOException { //NOTE: LocalFileHeader has no methods to return creation time and the mode. Instant lastModifiedDate = zipEntry.getLastModifiedTime() > 0 ? new Date(zipEntry.getLastModifiedTime()).toInstant() : null; ZipInputStreamMetadata zipInputStreamMetadata = new ZipInputStreamMetadata(zipEntry.isDirectory(), zipEntry.getFileName(), - zipEntry.getEncryptionMethod(), null, lastModifiedDate, null, -1, zipEntry.getUncompressedSize()); + zipEntry.getEncryptionMethod(), lastModifiedDate, lastModifiedDate, null, -1, zipEntry.getUncompressedSize()); processEntry(zipInputStream, zipInputStreamMetadata); } } From 82acded223c2e72387645546fdbcf8cc83743396 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Mon, 29 Jul 2024 21:56:46 +0000 Subject: [PATCH 3/8] NIFI-12709 Modified write attributes wording. 
--- .../org/apache/nifi/processors/standard/UnpackContent.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 4c8b868b7e77..eb8f8dc66a7d 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -107,11 +107,11 @@ " always holds the same value as " + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + ". For tar and unencrypted zip files if available it will be returned otherwise" + " this will be the same value as" + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + "."), @WritesAttribute(attribute = UnpackContent.FILE_LAST_METADATA_CHANGE_ATTRIBUTE, description = "The date and time the file's metadata changed (tar only)."), - @WritesAttribute(attribute = UnpackContent.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description = "The date and time the file was last accessed (tar and unencrypted zip only)"), + @WritesAttribute(attribute = UnpackContent.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description = "The date and time the file was last accessed (tar and unencrypted zip files only)"), @WritesAttribute(attribute = UnpackContent.FILE_OWNER_ATTRIBUTE, description = "The owner of the unpacked file (tar only)"), @WritesAttribute(attribute = UnpackContent.FILE_GROUP_ATTRIBUTE, description = "The group owner of the unpacked file (tar only)"), @WritesAttribute(attribute = UnpackContent.FILE_SIZE_ATTRIBUTE, description = "The uncompressed size of the unpacked file (tar and zip only)"), - @WritesAttribute(attribute = 
UnpackContent.FILE_PERMISSIONS_ATTRIBUTE, description = "The read/write/execute permissions of the unpacked file (tar and unencrypted zip files)"), + @WritesAttribute(attribute = UnpackContent.FILE_PERMISSIONS_ATTRIBUTE, description = "The read/write/execute permissions of the unpacked file (tar and unencrypted zip files only)"), @WritesAttribute(attribute = UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, description = "The encryption method for entries in Zip archives")}) @SeeAlso(MergeContent.class) @UseCase( From 6b8f23f0bb3438d08817a4b1adec9d5f0b6c3e10 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Fri, 9 Aug 2024 16:53:50 +0000 Subject: [PATCH 4/8] NIFI-12709 Added new String variables for formatting of various dates and added more testing to ensure the presence of added attributes. --- .../processors/standard/UnpackContent.java | 19 +++++++++---------- .../standard/TestUnpackContent.java | 16 ++++++++++++---- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index eb8f8dc66a7d..5baf3334f0d2 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -514,23 +514,22 @@ protected void processEntry(final InputStream zipInputStream, ZipInputStreamMeta attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, metadata.encryptionMethod().toString()); attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(metadata.uncompressedSize())); - String timeAsString = null; + String lastModifiedDate = null; if (metadata.lastModifiedDate() != null) { - timeAsString =
DATE_TIME_FORMATTER.format(metadata.lastModifiedDate()); - attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString); + lastModifiedDate = DATE_TIME_FORMATTER.format(metadata.lastModifiedDate()); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, lastModifiedDate); } if (metadata.creationTime() != null) { - timeAsString = DATE_TIME_FORMATTER.format(metadata.creationTime()); - } - - if (timeAsString != null) { - attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString); + final String creationTime = DATE_TIME_FORMATTER.format(metadata.creationTime()); + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime); + } else if (lastModifiedDate != null) { + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, lastModifiedDate); } if (metadata.lastAccessDate() != null) { - timeAsString = DATE_TIME_FORMATTER.format(metadata.lastAccessDate()); - attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, timeAsString); + final String lastAccessDate = DATE_TIME_FORMATTER.format(metadata.lastAccessDate()); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccessDate); } if (metadata.mode() > -1) { diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index 770b54250c54..9dfb4d0b66d1 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -83,12 +83,17 @@ public void testTar() throws IOException { final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); for (final MockFlowFile flowFile : unpacked) { + System.err.println(flowFile.getAttributes()); + 
assertTrue(flowFile.getAttributes().keySet().containsAll(List.of(UnpackContent.FRAGMENT_ID, UnpackContent.FRAGMENT_INDEX, + UnpackContent.FRAGMENT_COUNT, UnpackContent.SEGMENT_ORIGINAL_FILENAME, UnpackContent.FILE_SIZE_ATTRIBUTE))); + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); final Path path = dataPath.resolve(folder).resolve(filename); - assertEquals("rw-r--r--", flowFile.getAttribute("file.permissions")); - assertEquals("jmcarey", flowFile.getAttribute("file.owner")); - assertEquals("mkpasswd", flowFile.getAttribute("file.group")); + + assertEquals("rw-r--r--", flowFile.getAttribute(UnpackContent.FILE_PERMISSIONS_ATTRIBUTE)); + assertEquals("jmcarey", flowFile.getAttribute(UnpackContent.FILE_OWNER_ATTRIBUTE)); + assertEquals("mkpasswd", flowFile.getAttribute(UnpackContent.FILE_GROUP_ATTRIBUTE)); String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime"); assertDoesNotThrow(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString)); @@ -182,6 +187,10 @@ public void testZip() throws IOException { final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); for (final MockFlowFile flowFile : unpacked) { + assertTrue(flowFile.getAttributes().keySet().containsAll(List.of(CoreAttributes.FILENAME.key(), CoreAttributes.PATH.key(), + UnpackContent.FRAGMENT_ID, UnpackContent.FRAGMENT_INDEX, UnpackContent.FRAGMENT_COUNT, + UnpackContent.SEGMENT_ORIGINAL_FILENAME, UnpackContent.FILE_SIZE_ATTRIBUTE, UnpackContent.FILE_CREATION_TIME_ATTRIBUTE, + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, UnpackContent.FILE_PERMISSIONS_ATTRIBUTE))); final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); final Path path = dataPath.resolve(folder).resolve(filename); @@ -217,7 +226,6 @@ public void testInvalidZip() 
throws IOException { final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_FAILURE); for (final MockFlowFile flowFile : unpacked) { final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - // final String folder = flowFile.getAttribute(CoreAttributes.PATH.key()); final Path path = dataPath.resolve(filename); assertTrue(Files.exists(path)); From 4f3b7e9941ee4bae4d2fe7551faceec456bfa366 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Fri, 9 Aug 2024 16:55:58 +0000 Subject: [PATCH 5/8] NIFI-12709 Removed System.err.println statement --- .../org/apache/nifi/processors/standard/TestUnpackContent.java | 1 - 1 file changed, 1 deletion(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index 9dfb4d0b66d1..95b4eb76e4d6 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -83,7 +83,6 @@ public void testTar() throws IOException { final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); for (final MockFlowFile flowFile : unpacked) { - System.err.println(flowFile.getAttributes()); assertTrue(flowFile.getAttributes().keySet().containsAll(List.of(UnpackContent.FRAGMENT_ID, UnpackContent.FRAGMENT_INDEX, UnpackContent.FRAGMENT_COUNT, UnpackContent.SEGMENT_ORIGINAL_FILENAME, UnpackContent.FILE_SIZE_ATTRIBUTE))); From 0b6b5d458d3653cb7dd0b2abc66e0e8b7b939ea4 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Fri, 9 Aug 2024 17:00:42 +0000 Subject: [PATCH 6/8] NIFI-12709 Removed extra space.
--- .../java/org/apache/nifi/processors/standard/UnpackContent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 5baf3334f0d2..739b34f3db7b 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -528,7 +528,7 @@ protected void processEntry(final InputStream zipInputStream, ZipInputStreamMeta } if (metadata.lastAccessDate() != null) { - final String lastAccessDate = DATE_TIME_FORMATTER.format(metadata.lastAccessDate()); + final String lastAccessDate = DATE_TIME_FORMATTER.format(metadata.lastAccessDate()); attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccessDate); } From a4e85818952743beed42b67915014717c8680c21 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Fri, 9 Aug 2024 17:19:10 +0000 Subject: [PATCH 7/8] NIFI-12709 Removed extra lines and added string variables for formatting tar file times. 
--- .../processors/standard/UnpackContent.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 739b34f3db7b..89b66faf193f 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -402,27 +402,28 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li attributes.put(CoreAttributes.FILENAME.key(), file.getName()); attributes.put(CoreAttributes.PATH.key(), filePathString); attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); - attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode())); attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName())); attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName())); attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(tarEntry.getRealSize())); - String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant()); - attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString); + String lastModified = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant()); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, lastModified); if (tarEntry.getCreationTime() != null) { - timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getCreationTime().toInstant()); + final String creationTime = DATE_TIME_FORMATTER.format(tarEntry.getCreationTime().toInstant()); + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime); + } else { + 
attributes.put(FILE_CREATION_TIME_ATTRIBUTE, lastModified); } - attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString); if (tarEntry.getStatusChangeTime() != null) { - timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getStatusChangeTime().toInstant()); - attributes.put(FILE_LAST_METADATA_CHANGE_ATTRIBUTE, timeAsString); + final String metadataChangeTime = DATE_TIME_FORMATTER.format(tarEntry.getStatusChangeTime().toInstant()); + attributes.put(FILE_LAST_METADATA_CHANGE_ATTRIBUTE, metadataChangeTime); } if (tarEntry.getLastAccessTime() != null) { - timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getLastAccessTime().toInstant()); - attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, timeAsString); + final String lastAccesTime = DATE_TIME_FORMATTER.format(tarEntry.getLastAccessTime().toInstant()); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccesTime); } attributes.put(FRAGMENT_ID, fragmentId); From 369bacc8975e9183ad87d8ee15c56f78b4576f32 Mon Sep 17 00:00:00 2001 From: dan-s1 Date: Fri, 16 Aug 2024 17:04:24 +0000 Subject: [PATCH 8/8] NIFI-12709 Removed ZipInputStreamMetadata record and replaced it with methods to add zip entry attributes. 
--- .../processors/standard/UnpackContent.java | 102 ++++++++++-------- 1 file changed, 56 insertions(+), 46 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 89b66faf193f..b831751b6e34 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -384,6 +384,7 @@ public TarUnpacker(Pattern fileFilter) { @Override public void unpack(final ProcessSession session, final FlowFile source, final List unpacked) { final String fragmentId = UUID.randomUUID().toString(); + final Map attributes = new HashMap<>(); session.read(source, inputStream -> { int fragmentCount = 0; try (final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inputStream))) { @@ -398,7 +399,6 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li FlowFile unpackedFile = session.create(source); try { - final Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), file.getName()); attributes.put(CoreAttributes.PATH.key(), filePathString); attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); @@ -433,6 +433,7 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li final long fileSize = tarEntry.getSize(); unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize)); + attributes.clear(); } finally { unpacked.add(unpackedFile); } @@ -463,11 +464,6 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li } } - private record 
ZipInputStreamMetadata(boolean directory, String zipEntryName, EncryptionMethod encryptionMethod, - Instant creationTime, Instant lastModifiedDate, Instant lastAccessDate, int mode, - long uncompressedSize) { - } - private abstract static class ZipInputStreamCallback implements InputStreamCallback { private static final String PATH_SEPARATOR = "/"; @@ -501,45 +497,18 @@ protected boolean isFileEntryMatched(final boolean directory, final String fileN return !directory && (fileFilter == null || fileFilter.matcher(fileName).find()); } - protected void processEntry(final InputStream zipInputStream, ZipInputStreamMetadata metadata) { - if (isFileEntryMatched(metadata.directory(), metadata.zipEntryName())) { - final File file = new File(metadata.zipEntryName()); + protected void processEntry(final InputStream zipInputStream, boolean directory, String zipEntryName, Map attributes) { + if (isFileEntryMatched(directory, zipEntryName)) { + final File file = new File(zipEntryName); final String parentDirectory = (file.getParent() == null) ? 
PATH_SEPARATOR : file.getParent(); FlowFile unpackedFile = session.create(sourceFlowFile); try { - final Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), file.getName()); attributes.put(CoreAttributes.PATH.key(), parentDirectory); attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); - attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, metadata.encryptionMethod().toString()); - attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(metadata.uncompressedSize())); - - String lastModifiedDate = null; - if (metadata.lastModifiedDate() != null) { - lastModifiedDate = DATE_TIME_FORMATTER.format(metadata.lastModifiedDate()); - attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, lastModifiedDate); - } - - if (metadata.creationTime() != null) { - final String creationTime = DATE_TIME_FORMATTER.format(metadata.creationTime()); - attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime); - } else if (lastModifiedDate != null) { - attributes.put(FILE_CREATION_TIME_ATTRIBUTE, lastModifiedDate); - } - - if (metadata.lastAccessDate() != null) { - final String lastAccessDate = DATE_TIME_FORMATTER.format(metadata.lastAccessDate()); - attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccessDate); - } - - if (metadata.mode() > -1) { - attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(metadata.mode())); - } - attributes.put(FRAGMENT_ID, fragmentId); attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentIndex)); - unpackedFile = session.putAllAttributes(unpackedFile, attributes); unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(zipInputStream, outputStream)); } finally { @@ -547,6 +516,40 @@ protected void processEntry(final InputStream zipInputStream, ZipInputStreamMeta } } } + + protected void addFileSizeAttribute(long fileSize, Map attributes) { + attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(fileSize)); + } + + protected void addEncryptionMethodAttribute(EncryptionMethod encryptionMethod, Map 
attributes) { + attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString()); + } + + protected void addFilePermissionsAttribute(int mode, Map attributes) { + if (mode > -1) { + attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(mode)); + } + } + + protected void addZipEntryTimeAttributes(Instant lastModified, Instant creation, Instant lastAccess, Map attributes) { + String lastModifiedDate = null; + if (lastModified != null) { + lastModifiedDate = DATE_TIME_FORMATTER.format(lastModified); + attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, lastModifiedDate); + } + + if (creation != null) { + final String creationTime = DATE_TIME_FORMATTER.format(creation); + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime); + } else if (lastModifiedDate != null) { + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, lastModifiedDate); + } + + if (lastAccess != null) { + final String lastAccessDate = DATE_TIME_FORMATTER.format(lastAccess); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccessDate); + } + } } private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback { @@ -573,15 +576,19 @@ public void process(final InputStream inputStream) throws IOException { try (final ZipArchiveInputStream zipInputStream = new ZipArchiveInputStream(new BufferedInputStream(inputStream), filenameEncoding.toString(), true, allowStoredEntriesWithDataDescriptor)) { ZipArchiveEntry zipEntry; + final Map attributes = new HashMap<>(); while ((zipEntry = zipInputStream.getNextEntry()) != null) { + addEncryptionMethodAttribute(EncryptionMethod.NONE, attributes); + addFileSizeAttribute(zipEntry.getSize(), attributes); + addFilePermissionsAttribute(zipEntry.getUnixMode(), attributes); // NOTE: Per javadocs, ZipArchiveEntry can return -1 for getTime() if its not specified // and getLastAccessTime() can return null if it is not specified. - Instant creationTime = zipEntry.getTime() > 0 ? 
new Date(zipEntry.getTime()).toInstant() : null; - Instant lastModifiedDate = zipEntry.getLastModifiedDate().toInstant(); - Instant lastAccessTime = zipEntry.getLastAccessTime() != null ? zipEntry.getLastAccessTime().toInstant() : null; - ZipInputStreamMetadata zipInputStreamMetadata = new ZipInputStreamMetadata(zipEntry.isDirectory(), zipEntry.getName(), - EncryptionMethod.NONE, creationTime, lastModifiedDate, lastAccessTime, zipEntry.getUnixMode(), zipEntry.getSize()); - processEntry(zipInputStream, zipInputStreamMetadata); + Instant lastModified = zipEntry.getLastModifiedDate().toInstant(); + Instant creation = zipEntry.getTime() > 0 ? new Date(zipEntry.getTime()).toInstant() : null; + Instant lastAccess = zipEntry.getLastAccessTime() != null ? zipEntry.getLastAccessTime().toInstant() : null; + addZipEntryTimeAttributes(lastModified, creation, lastAccess, attributes); + processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), attributes); + attributes.clear(); } } } @@ -609,12 +616,15 @@ private EncryptedZipInputStreamCallback( public void process(final InputStream inputStream) throws IOException { try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password, filenameEncoding)) { LocalFileHeader zipEntry; + final Map attributes = new HashMap<>(); while ((zipEntry = zipInputStream.getNextEntry()) != null) { //NOTE: LocalFileHeader has no methods to return creation time and the mode. - Instant lastModifiedDate = zipEntry.getLastModifiedTime() > 0 ? 
new Date(zipEntry.getLastModifiedTime()).toInstant() : null; - ZipInputStreamMetadata zipInputStreamMetadata = new ZipInputStreamMetadata(zipEntry.isDirectory(), zipEntry.getFileName(), - zipEntry.getEncryptionMethod(), lastModifiedDate, lastModifiedDate, null, -1, zipEntry.getUncompressedSize()); - processEntry(zipInputStream, zipInputStreamMetadata); + addEncryptionMethodAttribute(zipEntry.getEncryptionMethod(), attributes); + addFileSizeAttribute(zipEntry.getUncompressedSize(), attributes); + Instant lastModified = zipEntry.getLastModifiedTime() > 0 ? new Date(zipEntry.getLastModifiedTime()).toInstant() : null; + addZipEntryTimeAttributes(lastModified, null, null, attributes); + processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), attributes); + attributes.clear(); } } }