-
Notifications
You must be signed in to change notification settings - Fork 192
/
Copy patharchive.py
135 lines (104 loc) · 4.04 KB
/
archive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import hashlib
import json
import os
import zipfile
import io
from os.path import basename, splitext
import slackviewer
from slackviewer.constants import SLACKVIEWER_TEMP_PATH
from slackviewer.utils.six import to_unicode, to_bytes
def SHA1_file(filepath, extra=b''):
    """
    Returns hex digest of SHA1 hash of file at filepath

    The file is streamed in fixed-size chunks, so it is never held in memory
    as a whole.

    :param str filepath: File to hash
    :param bytes extra: Extra content fed to the hash after the raw file content
    :return: hex digest of hash
    :rtype: str
    """
    # The digest does not depend on how the input is sliced, so read 64 KiB at
    # a time instead of h.block_size (64 bytes for SHA1); the latter costs one
    # Python-level read()/update() round trip per 64 bytes of the archive.
    chunk_size = 64 * 1024
    h = hashlib.sha1()
    with io.open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)
    # Extra bytes are appended after the file content, as before
    h.update(extra)
    return h.hexdigest()
def _decode_zip_member_name(info):
    """
    Returns the filename to use when extracting a zip member

    :param zipfile.ZipInfo info: Member whose name should be repaired
    :return: the name re-read as UTF-8 where that is possible, otherwise the
        name exactly as zipfile reported it
    """
    # Bit 11 (0x800) of the general purpose flags marks a name stored as UTF-8;
    # zipfile has already decoded those, and re-encoding them as cp437 would
    # raise for any character outside that code page.
    if info.flag_bits & 0x800:
        return info.filename
    try:
        # Unflagged names are decoded by zipfile as cp437, but many tools write
        # raw UTF-8 bytes without setting the flag: undo the cp437 decoding and
        # read the bytes as UTF-8 instead.
        return info.filename.encode("cp437").decode("utf-8")
    except UnicodeError:
        # Not UTF-8 after all (or not re-encodable): keep the name as reported
        # rather than aborting the extraction half-way through.
        return info.filename


def extract_archive(filepath):
    """
    Returns the path of the archive

    A directory is used in place. A zipfile is extracted into a directory under
    SLACKVIEWER_TEMP_PATH named after the SHA1 of the zip content plus the
    slackviewer version; if that directory already exists it is reused without
    extracting again.

    :param str filepath: Path to file to extract or read
    :return: path of the archive
    :rtype: str
    :raises TypeError: if filepath is neither a directory nor a zipfile
    """
    # Checks if file path is a directory
    if os.path.isdir(filepath):
        path = os.path.abspath(filepath)
        print("Archive already extracted. Viewing from {}...".format(path))
        return path

    # Anything that is not a directory has to be a zipfile
    if not zipfile.is_zipfile(filepath):
        # TypeError is kept so that existing callers catching it still work
        raise TypeError("{} is not a zipfile".format(filepath))

    archive_sha = SHA1_file(
        filepath=filepath,
        # Add version of slackviewer to hash as well so we can invalidate the cached copy
        # if there are new features added
        extra=to_bytes(slackviewer.__version__)
    )

    extracted_path = os.path.join(SLACKVIEWER_TEMP_PATH, archive_sha)

    if os.path.exists(extracted_path):
        print("{} already exists".format(extracted_path))
    else:
        # Extract zip
        with zipfile.ZipFile(filepath) as archive:
            print("{} extracting to {}...".format(filepath, extracted_path))
            for info in archive.infolist():
                print(info.filename)
                info.filename = _decode_zip_member_name(info)
                print(info.filename)
                archive.extract(info, path=extracted_path)

        print("{} extracted to {}".format(filepath, extracted_path))

        # Add additional file with archive info
        create_archive_info(filepath, extracted_path, archive_sha)

    return extracted_path
# Saves archive info.
# When loading empty DMs and there is no info file, this is called to
# create a new archive info file.
def create_archive_info(filepath, extracted_path, archive_sha=None):
    """
    Writes the archive info json file into the extracted archive directory

    The file is named ".slackviewer_archive_info.json" and holds the keys
    "sha1" and "filename".

    :param str filepath: Path the archive was given as; only its last path
        component is recorded
    :param str extracted_path: Directory the info file is written into
    :param str archive_sha: SHA string created when archive was extracted from zip
    """
    info_file = os.path.join(extracted_path, ".slackviewer_archive_info.json")
    with io.open(info_file, 'w+', encoding="utf-8") as out:
        payload = json.dumps(
            {
                "sha1": archive_sha,
                "filename": os.path.basename(filepath),
            },
            ensure_ascii=False,
        )
        # json.dumps may hand back a byte string on Python 2; io.open in text
        # mode only accepts unicode
        out.write(to_unicode(payload))
def get_export_info(archive_name):
    """
    Given a file or directory, extract it and return information that will be used in
    an export printout: the basename of the file, the name stripped of its extension, and
    our best guess (based on Slack's current naming convention) of the name of the
    workspace that this is an export of.

    :param str archive_name: Path to the export zipfile or extracted directory
    :return: dict with the keys "readable_path", "basename", "stripped_name"
        and "workspace_name"
    :rtype: dict
    """
    extracted_path = extract_archive(archive_name)
    base_filename = basename(archive_name)
    noext_filename = splitext(base_filename)[0]

    # Typical extract name: "My Friends and Family Slack export Jul 21 2018 - Sep 06 2018"
    # If that's not the format, fall back to the extension-free filename.
    # partition() returns the whole string as its first element when the
    # separator is missing, whereas unpacking split(..., 1) into two names
    # raised ValueError in that case.
    workspace_name = noext_filename.partition(" Slack export ")[0]

    return {
        "readable_path": extracted_path,
        "basename": base_filename,
        "stripped_name": noext_filename,
        "workspace_name": workspace_name,
    }