-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharchivatix.py
200 lines (152 loc) Β· 5.79 KB
/
archivatix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import datetime
import posixpath
import re
import sys
import time
from ftplib import FTP

import hydra
import progressbar
from dateutil import parser
from omegaconf import DictConfig
# Startup banner printed when the module is loaded.
# The literal is a raw string so every backslash of the ASCII art is emitted
# verbatim: in a plain string the backslash that ends the last art line is a
# line continuation (it and the newline vanish) and the other backslashes are
# invalid escape sequences that trigger warnings.
print(r'''
_ _ _ _
/\ | | (_) | | (_)
/ \ _ __ ___| |__ ___ ____ _| |_ ___ __
/ /\ \ | '__/ __| '_ \| \ \ / / _` | __| \ \/ /
/ ____ \| | | (__| | | | |\ V / (_| | |_| |> <
/_/ \_\_| \___|_| |_|_| \_/ \__,_|\__|_/_/\_\
''')
# @todo : Add errors on empty config files
@hydra.main(config_path="config.yaml")
class Archivatix:
    """
    Run the archiving task against every FTP server listed in the config.

    For each entry under ``cfg.ftp`` a connection is opened, the remote tree is
    walked from the root and the archiving rules are applied to every file.
    """

    def __init__(self, cfg: DictConfig) -> None:
        """
        Scan all configured FTP servers.

        :param cfg: configuration object; ``cfg.ftp`` is iterated as a sequence
            of mappings whose values provide ``host``, ``username`` and
            ``password``.
        """
        self.cfg = cfg

        # Rules for the archiving task: files larger than 10000 bytes
        # (0.01 MB) and last modified before 2020-01-01.
        origin_path = "/"
        rules = [MinSizeRule(10000), DateRule(datetime.datetime(2020, 1, 1))]

        # For each FTP entry in the config, connect and walk with the rules.
        for credentials in cfg.ftp:
            for _label, settings in credentials.items():
                ftp = FTPUtils(settings["host"], settings["username"], settings["password"])
                try:
                    ftp.connect()
                    ftp.walk(origin_path, rules)
                finally:
                    # Always release the connection, even when connecting or
                    # walking fails part-way through.
                    ftp.close()
class FTPUtils:
    """
    Helper around ``ftplib.FTP`` that scans a server and archives files.

    It stores the credentials for one server, opens the session in
    ``connect()`` and walks the remote tree, applying a list of rules to every
    file found.
    """

    def __init__(self, host, user, password):
        """
        Remember the connection settings; the session is opened by ``connect()``.

        :param host: FTP server host name
        :param user: login name
        :param password: login password
        """
        self.host = host
        self.user = user
        self.password = password
        self.ftp = FTP()

    def connect(self):
        """Open a fresh FTP session to ``self.host`` and log in."""
        print("")
        print(
            "FTP connecting to {} with user={} ".format(
                self.host, self.user
            )
        )
        print("------------------------------------------------------------")
        self.ftp = FTP()
        self.ftp.set_debuglevel(0)
        self.ftp.connect(self.host)
        self.ftp.login(self.user, self.password)

    def walk(self, dir="/", rules=None):
        """
        Performs a recursive search of folders for files meeting a defined
        criteria and archives every file that passes all rules.

        :param dir: string The origin path
        :type rules: [Rule] A list of rules to be applied
        :return: None
        """
        if rules is None:
            rules = []

        # Archive yearly: skip any path containing four consecutive digits so
        # year folders created by archive() are not scanned again.
        # NOTE(review): this also skips names such as "report12345" - confirm
        # that is intended before narrowing the pattern.
        if re.search(r"[0-9]{4}", dir):
            return None

        # Default mode = archive
        archive_mode = True
        destroy_mode = False
        rules_passed = []

        bar = progressbar.ProgressBar(max_value=progressbar.UnknownLength, redirect_stdout=True)
        print("β£ Scanning {} for directories and files".format(dir), file=sys.stdout)

        for count, item in enumerate(self.ftp.mlsd(dir), start=1):
            name, facts = item
            time.sleep(0.001)
            bar.update(count)
            if facts['type'] == 'dir':
                # Recurse into the sub-folder; posixpath.join avoids the
                # doubled slash that plain formatting produces under "/".
                self.walk(posixpath.join(dir, name), rules)
            elif facts['type'] == 'file':
                # A file is selected only when every rule accepts it.
                if not all(rule.apply(facts) for rule in rules):
                    continue
                rules_passed.append(name)
                if archive_mode:
                    self.archive(dir, item)
                if destroy_mode:
                    self.destroy(dir, item)
            # Other entry types (e.g. "cdir" / "pdir") are ignored.

        # Display the selected files of the current directory
        if rules_passed:
            # @ToDo: improve the display of big list
            print("")
            print("Files moved / archived")
            print('\n'.join(sorted(rules_passed)))

    def archive(self, dir, file):
        """
        Move a file into a sub-folder of ``dir`` named after its modification year.

        The year folder is created when it is not listed in ``dir`` yet.

        :param dir: remote directory that currently holds the file
        :param file: ``(name, facts)`` pair as yielded by ``FTP.mlsd``; the
            ``modify`` fact supplies the year
        """
        name, facts = file
        year = str(parser.parse(facts["modify"]).year)
        target_dir = posixpath.join(dir, year)

        # nlst() may return bare names or path-prefixed names depending on the
        # server, so compare on the last path component only.
        existing = {posixpath.basename(entry.rstrip("/")) for entry in self.ftp.nlst(dir)}
        if year not in existing:
            self.ftp.mkd(target_dir)

        self.ftp.rename(posixpath.join(dir, name), posixpath.join(target_dir, name))

    def destroy(self, curr_dir, file):
        """
        Delete a file from the server.

        :param curr_dir: remote directory that holds the file
        :param file: ``(name, facts)`` pair as yielded by ``FTP.mlsd``
        """
        self.ftp.delete(posixpath.join(curr_dir, file[0]))

    def close(self):
        """Close the underlying FTP connection."""
        self.ftp.close()

    def dir(self):
        """Ask the underlying FTP client for a directory listing."""
        self.ftp.dir()
class Rule:
    """
    Base class describing the common behavior of an archiving rule.

    Subclasses override ``apply`` to decide whether a file satisfies the rule.
    """

    def apply(self, file):
        """Print a trace message; the base rule gives no verdict and returns None."""
        print("Applying rule")
        return None
class MinSizeRule(Rule):
    """Rule accepting files whose ``size`` entry is strictly above a threshold."""

    def __init__(self, min_size):
        """
        :param min_size: threshold the file size is compared against
        """
        self.min_size = min_size

    # Overrides Rule.apply
    def apply(self, file) -> bool:
        """Return True when ``int(file["size"])`` exceeds ``min_size``."""
        size_value = int(file["size"])
        return self.min_size < size_value
class DateRule(Rule):
    """Rule accepting files whose ``modify`` entry is earlier than a cut-off date."""

    def __init__(self, max_date):
        """
        :param max_date: cut-off the parsed modification time is compared against
        """
        self.max_date = max_date

    # Overrides Rule.apply
    def apply(self, file) -> bool:
        """Return True when the parsed ``file['modify']`` is before ``max_date``."""
        modified_at = parser.parse(file['modify'])
        return self.max_date > modified_at
# Entry point: calling the hydra.main-decorated Archivatix starts the run as
# soon as this module is executed.
# NOTE(review): no cfg argument is passed here although __init__ requires one;
# presumably the decorator supplies it - confirm.
f = Archivatix()