"""
Management command for transforming tracking log files.
"""
import json
import os
from io import BytesIO
from textwrap import dedent
from django.core.management.base import BaseCommand
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider
from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
# Number of bytes to download at a time, this is 2 MB
CHUNK_SIZE = 1024 * 1024 * 2
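
# A minimal example invocation, sending transformed events to the configured LRS
# (illustrative only; the path, container name, and prefix are assumptions, not
# defaults). With the LOCAL provider, 'key' is the base directory, 'container'
# is a subdirectory of it, and 'prefix' is the start of the filenames to read:
#
#   python manage.py transform_tracking_logs \
#       --transformer_type xapi \
#       --source_provider LOCAL \
#       --source_config '{"key": "/edx/var/log", "container": "tracking", "prefix": "tracking.log"}'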

def transform_tracking_logs(
    source,
    source_container,
    source_prefix,
    sender
):
    """
    Transform one or more tracking log files from the given source to the given destination.
    """
    # Containers are effectively directories; this recursively tries to find files
    # matching the given prefix in the given source.
    container = source.get_container(container_name=source_container)

    display_path = os.path.join(source_container, source_prefix.lstrip("/"))
    print(f"Looking for log files in {display_path}*")

    for file in source.iterate_container_objects(container, source_prefix):
        # Download the file as a stream of bytes to save on memory
        print(f"Streaming file {file}...")

        last_successful_byte = 0
        line = ""

        while last_successful_byte < int(file.size):
            end_byte = last_successful_byte + CHUNK_SIZE

            if end_byte > file.size:
                end_byte = file.size

            chunks = source.download_object_range_as_stream(
                file,
                start_bytes=last_successful_byte,
                end_bytes=end_byte
            )
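            # Each chunk is decoded independently below, which assumes no
            # multi-byte UTF-8 character straddles a chunk boundary (a split
            # inside one would raise UnicodeDecodeError); tracking logs are
            # typically ASCII-heavy JSON lines, so this is rarely an issue.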
            for chunk in chunks:
                chunk = chunk.decode('utf-8')

                # Loop through this chunk; if we find a newline it's time to process,
                # otherwise just keep appending.
                for char in chunk:
                    if char == "\n" and line:
                        sender.transform_and_queue(line)
                        line = ""
                    else:
                        line += char

            last_successful_byte = end_byte

        # Sometimes the file doesn't end with a newline; we try to use
        # any remaining bytes as a final line.
        if line:
            sender.transform_and_queue(line)  # pragma: no cover

    # Give the queue a chance to send any remaining events left in the queue
    sender.finalize()

def get_source_config_from_options(source_config_options):
    """
    Prepare our source configuration from the configuration JSON.
    """
    source_config = json.loads(source_config_options)
    try:
        source_prefix = source_config.pop("prefix")
        source_container = source_config.pop("container")
        return source_config, source_container, source_prefix
    except KeyError as e:
        print("The following keys must be defined in source_config: 'prefix', 'container'")
        raise e
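
# For illustration, a source_config for an S3-style source might look like the
# following (values are assumptions): 'prefix' and 'container' are popped off,
# and every remaining key becomes a constructor kwarg for the libcloud driver.
#
#   {"key": "<access key>", "secret": "<secret key>",
#    "container": "my-log-bucket", "prefix": "tracking.log"}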

def get_dest_config_from_options(destination_provider, dest_config_options):
    """
    Prepare our destination configuration.

    All None's if these are being sent to an LRS, or use values from the destination_config JSON option.
    """
    if destination_provider != "LRS":
        dest_config = json.loads(dest_config_options)
        try:
            dest_container = dest_config.pop("container")
            dest_prefix = dest_config.pop("prefix")
        except KeyError as e:
            print("If not using the 'LRS' destination, the following keys must be defined in "
                  "destination_config: 'prefix', 'container'")
            raise e
    else:
        dest_config = dest_container = dest_prefix = None

    return dest_config, dest_container, dest_prefix
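
# A destination_config follows the same shape as source_config. An illustrative
# example for the LOCAL provider (assumed values; 'key' is a base directory
# that should already exist on disk):
#
#   {"key": "/tmp/transformed_logs", "container": "output", "prefix": "xapi"}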

def validate_source_and_files(driver, container_name, prefix):
    """
    Validate that the given libcloud source exists and has files in it to read.
    """
    container = driver.get_container(container_name)
    objects = list(driver.iterate_container_objects(container, prefix))
    if not objects:
        raise FileNotFoundError(f"No files found in {container_name}/{prefix}*")
    print(f"Found {len(objects)} files in {container_name}/{prefix}*")
    return [f"{obj.name} - {obj.size} bytes" for obj in objects]

def validate_destination(driver, container_name, prefix, source_objects):
    """
    Validate that the given libcloud destination exists and can be written to.
    """
    container = driver.get_container(container_name)
    full_path = f"{prefix}/manifest.log"
    file_list = "\n".join(source_objects)
    driver.upload_object_via_stream(
        iterator=BytesIO(file_list.encode()),
        container=container,
        object_name=full_path
    )
    print(f"Wrote source file list to '{container_name}/{full_path}'")

def get_libcloud_drivers(source_provider, source_config, destination_provider, destination_config):
    """
    Attempt to configure the libcloud drivers for source and destination.
    """
    try:
        source_provider = getattr(Provider, source_provider)
        source_cls = get_driver(source_provider)
        source_driver = source_cls(**source_config)
    except AttributeError:
        print(f"{source_provider} is not a valid source Libcloud provider.")
        raise

    # There is no driver for LRS
    destination_driver = "LRS"
    if destination_provider != "LRS":
        try:
            destination_provider = getattr(Provider, destination_provider)
            destination_cls = get_driver(destination_provider)
            destination_driver = destination_cls(**destination_config)
        except AttributeError:
            print(f"{destination_provider} is not a valid destination Libcloud provider.")
            raise

    return source_driver, destination_driver
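
# To sketch the resolution chain with an assumed provider name: "S3" resolves
# via getattr(Provider, "S3") to a libcloud provider constant, get_driver()
# maps that constant to the matching storage driver class, and the config dict
# (after 'container' and 'prefix' are popped) supplies its constructor kwargs.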

class Command(BaseCommand):
    """
    Transform tracking logs to an LRS or other output destination.
    """
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        parser.add_argument(
            '--source_provider',
            type=str,
            help="An Apache Libcloud 'provider constant' from: "
                 "https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . "
                 "Ex: LOCAL for local storage or S3 for AWS S3.",
            required=True,
        )
        parser.add_argument(
            '--source_config',
            type=str,
            help="A JSON dictionary of configuration for the source provider. See the Libcloud docs for the "
                 "necessary options for your source. If your source (S3, MinIO, etc.) needs a 'bucket' or "
                 "'container', add it to the config here under the key 'container'. If your source needs a "
                 "prefix (ex: directory path, or wildcard beginning of a filename), add it here under the key "
                 "'prefix'. If no prefix is given, all files in the given location will be attempted!",
            required=True,
        )
        parser.add_argument(
            '--destination_provider',
            type=str,
            default="LRS",
            help="Either 'LRS' to use the default configured xAPI and/or Caliper servers, "
                 "or an Apache Libcloud 'provider constant' from this list: "
                 "https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . "
                 "Ex: LOCAL for local storage or S3 for AWS S3.",
        )
        parser.add_argument(
            '--destination_config',
            type=str,
            help="A JSON dictionary of configuration for the destination provider. Not needed for the 'LRS' "
                 "destination_provider. See the Libcloud docs for the necessary options for your destination. "
                 "If your destination (S3, MinIO, etc.) needs a 'bucket' or 'container', add it to the config "
                 "here under the key 'container'. If your destination needs a prefix (ex: directory path), add "
                 "it here under the key 'prefix'. If no prefix is given, the output file(s) will be written to "
                 "the base path.",
        )
        parser.add_argument(
            '--transformer_type',
            choices=["xapi", "caliper"],
            required=True,
            help="The type of transformation to do; only one can be done at a time.",
        )
        parser.add_argument(
            '--batch_size',
            type=int,
            default=10000,
            help="How many events to send at a time. For the LRS destination this will be one POST per this many "
                 "events; for all other destinations a new file will be created containing up to this many events. "
                 "This helps reduce memory usage in the script and helps with LRS performance.",
        )
        parser.add_argument(
            '--sleep_between_batches_secs',
            type=float,
            default=10.0,
            help="Fractional seconds to sleep between sending batches to a destination, used to reduce load on "
                 "the LMS and LRSs when performing large operations.",
        )
        parser.add_argument(
            '--dry_run',
            action="store_true",
            help="Attempt to transform all lines from all files, but do not send to the destination.",
        )
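
    # A fuller, file-to-file example invocation (all values below are
    # assumptions chosen for illustration, not defaults):
    #
    #   python manage.py transform_tracking_logs \
    #       --transformer_type caliper \
    #       --source_provider S3 \
    #       --source_config '{"key": "<access key>", "secret": "<secret>", "container": "log-bucket", "prefix": "tracking.log"}' \
    #       --destination_provider S3 \
    #       --destination_config '{"key": "<access key>", "secret": "<secret>", "container": "out-bucket", "prefix": "transformed/caliper"}' \
    #       --batch_size 5000 \
    #       --dry_run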
    def handle(self, *args, **options):
        """
        Configure the command and start the transform process.
        """
        source_config, source_container, source_prefix = get_source_config_from_options(options["source_config"])
        dest_config, dest_container, dest_prefix = get_dest_config_from_options(
            options["destination_provider"],
            options["destination_config"]
        )

        source_driver, dest_driver = get_libcloud_drivers(
            options["source_provider"],
            source_config,
            options["destination_provider"],
            dest_config
        )

        source_file_list = validate_source_and_files(source_driver, source_container, source_prefix)

        if dest_driver != "LRS":
            validate_destination(dest_driver, dest_container, dest_prefix, source_file_list)
        else:
            print(f"Found {len(source_file_list)} source files: ", *source_file_list, sep="\n")

        sender = QueuedSender(
            dest_driver,
            dest_container,
            dest_prefix,
            options["transformer_type"],
            max_queue_size=options["batch_size"],
            sleep_between_batches_secs=options["sleep_between_batches_secs"],
            dry_run=options["dry_run"]
        )

        transform_tracking_logs(
            source_driver,
            source_container,
            source_prefix,
            sender
        )