Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-12709 Added ability to get more attributes for zip files as well as created new attributes to get for both tar and zip files. #9122

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -99,12 +102,17 @@
@WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because "
+ "the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile"),
@WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the unpacked file was last modified (tar only)."),
@WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. This attribute holds always the same value as file.lastModifiedTime (tar only)."),
@WritesAttribute(attribute = "file.owner", description = "The owner of the unpacked file (tar only)"),
@WritesAttribute(attribute = "file.group", description = "The group owner of the unpacked file (tar only)"),
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the unpacked file (tar only)"),
@WritesAttribute(attribute = "file.encryptionMethod", description = "The encryption method for entries in Zip archives")})
@WritesAttribute(attribute = UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, description = "The date and time that the unpacked file was last modified (tar and zip only)."),
@WritesAttribute(attribute = UnpackContent.FILE_CREATION_TIME_ATTRIBUTE, description = "The date and time that the file was created. For encrypted zip files this attribute" +
" always holds the same value as " + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + ". For tar and unencrypted zip files if available it will be returned otherwise" +
" this will be the same value as" + UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + "."),
@WritesAttribute(attribute = UnpackContent.FILE_LAST_METADATA_CHANGE_ATTRIBUTE, description = "The date and time the file's metadata changed (tar only)."),
@WritesAttribute(attribute = UnpackContent.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description = "The date and time the file was last accessed (tar and unencrypted zip files only)"),
@WritesAttribute(attribute = UnpackContent.FILE_OWNER_ATTRIBUTE, description = "The owner of the unpacked file (tar only)"),
@WritesAttribute(attribute = UnpackContent.FILE_GROUP_ATTRIBUTE, description = "The group owner of the unpacked file (tar only)"),
@WritesAttribute(attribute = UnpackContent.FILE_SIZE_ATTRIBUTE, description = "The uncompressed size of the unpacked file (tar and zip only)"),
@WritesAttribute(attribute = UnpackContent.FILE_PERMISSIONS_ATTRIBUTE, description = "The read/write/execute permissions of the unpacked file (tar and unencrypted zip files only)"),
@WritesAttribute(attribute = UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, description = "The encryption method for entries in Zip archives")})
@SeeAlso(MergeContent.class)
@UseCase(
description = "Unpack Zip containing filenames with special characters, created on Windows with filename charset 'Cp437' or 'IBM437'.",
Expand All @@ -131,8 +139,11 @@ public class UnpackContent extends AbstractProcessor {

public static final String FILE_LAST_MODIFIED_TIME_ATTRIBUTE = "file.lastModifiedTime";
public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
public static final String FILE_LAST_METADATA_CHANGE_ATTRIBUTE = "file.lastMetadataChange";
public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
public static final String FILE_GROUP_ATTRIBUTE = "file.group";
public static final String FILE_SIZE_ATTRIBUTE = "file.size";
public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
public static final String FILE_ENCRYPTION_METHOD_ATTRIBUTE = "file.encryptionMethod";

Expand Down Expand Up @@ -387,23 +398,37 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li

FlowFile unpackedFile = session.create(source);
try {
final String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
attributes.put(CoreAttributes.PATH.key(), filePathString);
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);

attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
attributes.put(FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()));
attributes.put(FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()));
attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(tarEntry.getRealSize()));
String timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);

if (tarEntry.getCreationTime() != null) {
timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getCreationTime().toInstant());
}
attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);

unpackedFile = session.putAllAttributes(unpackedFile, Map.of(
CoreAttributes.FILENAME.key(), file.getName(),
CoreAttributes.PATH.key(), filePathString,
CoreAttributes.MIME_TYPE.key(), OCTET_STREAM,
if (tarEntry.getStatusChangeTime() != null) {
timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getStatusChangeTime().toInstant());
attributes.put(FILE_LAST_METADATA_CHANGE_ATTRIBUTE, timeAsString);
}

FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()),
FILE_OWNER_ATTRIBUTE, String.valueOf(tarEntry.getUserName()),
FILE_GROUP_ATTRIBUTE, String.valueOf(tarEntry.getGroupName()),
if (tarEntry.getLastAccessTime() != null) {
timeAsString = DATE_TIME_FORMATTER.format(tarEntry.getLastAccessTime().toInstant());
attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, timeAsString);
}

FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString,
FILE_CREATION_TIME_ATTRIBUTE, timeAsString,
attributes.put(FRAGMENT_ID, fragmentId);
attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));

FRAGMENT_ID, fragmentId,
FRAGMENT_INDEX, String.valueOf(++fragmentCount)
));
unpackedFile = session.putAllAttributes(unpackedFile, attributes);

final long fileSize = tarEntry.getSize();
unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize));
Expand Down Expand Up @@ -437,6 +462,11 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li
}
}

/**
 * Per-entry zip metadata gathered by the zip stream callbacks and passed to
 * {@code processEntry}, which turns it into FlowFile attributes.
 *
 * @param directory        whether the entry is a directory
 * @param zipEntryName     the entry name as stored in the archive
 * @param encryptionMethod the encryption method reported for the entry
 * @param creationTime     creation timestamp; may be {@code null} when the caller has none
 * @param lastModifiedDate last-modified timestamp; may be {@code null} when the caller has none
 * @param lastAccessDate   last-access timestamp; may be {@code null} when the caller has none
 * @param mode             permission bits; callers pass {@code -1} when no mode is available,
 *                         in which case no permissions attribute is written
 * @param uncompressedSize the uncompressed size reported for the entry
 */
private record ZipInputStreamMetadata(
        boolean directory,
        String zipEntryName,
        EncryptionMethod encryptionMethod,
        Instant creationTime,
        Instant lastModifiedDate,
        Instant lastAccessDate,
        int mode,
        long uncompressedSize
) {
}
dan-s1 marked this conversation as resolved.
Show resolved Hide resolved

private abstract static class ZipInputStreamCallback implements InputStreamCallback {
private static final String PATH_SEPARATOR = "/";

Expand Down Expand Up @@ -470,22 +500,47 @@ protected boolean isFileEntryMatched(final boolean directory, final String fileN
return !directory && (fileFilter == null || fileFilter.matcher(fileName).find());
}

protected void processEntry(final InputStream zipInputStream, final boolean directory, final String zipEntryName, final EncryptionMethod encryptionMethod) {
if (isFileEntryMatched(directory, zipEntryName)) {
final File file = new File(zipEntryName);
protected void processEntry(final InputStream zipInputStream, ZipInputStreamMetadata metadata) {
if (isFileEntryMatched(metadata.directory(), metadata.zipEntryName())) {
final File file = new File(metadata.zipEntryName());
final String parentDirectory = (file.getParent() == null) ? PATH_SEPARATOR : file.getParent();

FlowFile unpackedFile = session.create(sourceFlowFile);
try {
unpackedFile = session.putAllAttributes(unpackedFile, Map.of(
CoreAttributes.FILENAME.key(), file.getName(),
CoreAttributes.PATH.key(), parentDirectory,
CoreAttributes.MIME_TYPE.key(), OCTET_STREAM,
FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString(),

FRAGMENT_ID, fragmentId,
FRAGMENT_INDEX, String.valueOf(++fragmentIndex)
));
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
attributes.put(CoreAttributes.PATH.key(), parentDirectory);
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, metadata.encryptionMethod().toString());
attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(metadata.uncompressedSize()));

String timeAsString = null;
dan-s1 marked this conversation as resolved.
Show resolved Hide resolved
if (metadata.lastModifiedDate() != null) {
timeAsString = DATE_TIME_FORMATTER.format(metadata.lastModifiedDate());
attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, timeAsString);
}

if (metadata.creationTime() != null) {
timeAsString = DATE_TIME_FORMATTER.format(metadata.creationTime());
}

if (timeAsString != null) {
attributes.put(FILE_CREATION_TIME_ATTRIBUTE, timeAsString);
}

if (metadata.lastAccessDate() != null) {
timeAsString = DATE_TIME_FORMATTER.format(metadata.lastAccessDate());
attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, timeAsString);
}

if (metadata.mode() > -1) {
attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(metadata.mode()));
}

attributes.put(FRAGMENT_ID, fragmentId);
attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentIndex));

unpackedFile = session.putAllAttributes(unpackedFile, attributes);
unpackedFile = session.write(unpackedFile, outputStream -> StreamUtils.copy(zipInputStream, outputStream));
} finally {
unpacked.add(unpackedFile);
Expand Down Expand Up @@ -519,7 +574,14 @@ public void process(final InputStream inputStream) throws IOException {
filenameEncoding.toString(), true, allowStoredEntriesWithDataDescriptor)) {
ZipArchiveEntry zipEntry;
while ((zipEntry = zipInputStream.getNextEntry()) != null) {
processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getName(), EncryptionMethod.NONE);
// NOTE: Per javadocs, ZipArchiveEntry can return -1 for getTime() if it is not specified
// and getLastAccessTime() can return null if it is not specified.
Instant creationTime = zipEntry.getTime() > 0 ? new Date(zipEntry.getTime()).toInstant() : null;
Instant lastModifiedDate = zipEntry.getLastModifiedDate().toInstant();
Instant lastAccessTime = zipEntry.getLastAccessTime() != null ? zipEntry.getLastAccessTime().toInstant() : null;
ZipInputStreamMetadata zipInputStreamMetadata = new ZipInputStreamMetadata(zipEntry.isDirectory(), zipEntry.getName(),
EncryptionMethod.NONE, creationTime, lastModifiedDate, lastAccessTime, zipEntry.getUnixMode(), zipEntry.getSize());
processEntry(zipInputStream, zipInputStreamMetadata);
}
}
}
Expand Down Expand Up @@ -548,7 +610,11 @@ public void process(final InputStream inputStream) throws IOException {
try (final ZipInputStream zipInputStream = new ZipInputStream(new BufferedInputStream(inputStream), password, filenameEncoding)) {
LocalFileHeader zipEntry;
while ((zipEntry = zipInputStream.getNextEntry()) != null) {
processEntry(zipInputStream, zipEntry.isDirectory(), zipEntry.getFileName(), zipEntry.getEncryptionMethod());
// NOTE: LocalFileHeader has no methods to return the creation time or the mode.
Instant lastModifiedDate = zipEntry.getLastModifiedTime() > 0 ? new Date(zipEntry.getLastModifiedTime()).toInstant() : null;
ZipInputStreamMetadata zipInputStreamMetadata = new ZipInputStreamMetadata(zipEntry.isDirectory(), zipEntry.getFileName(),
zipEntry.getEncryptionMethod(), lastModifiedDate, lastModifiedDate, null, -1, zipEntry.getUncompressedSize());
processEntry(zipInputStream, zipInputStreamMetadata);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -88,13 +89,11 @@ public void testTar() throws IOException {
assertEquals("rw-r--r--", flowFile.getAttribute("file.permissions"));
assertEquals("jmcarey", flowFile.getAttribute("file.owner"));
assertEquals("mkpasswd", flowFile.getAttribute("file.group"));
String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime");

DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString);

String modifiedTimeAsString = flowFile.getAttribute("file.lastModifiedTime");
assertDoesNotThrow(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString));
String creationTimeAsString = flowFile.getAttribute("file.creationTime");

DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString);
assertDoesNotThrow(() -> DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString));

assertTrue(Files.exists(path));

Expand Down