# First Party
88from smdebug .core .access_layer .s3handler import ListRequest , ReadObjectRequest , S3Handler , is_s3
9- from smdebug .core .logger import get_logger
10- from smdebug .core .utils import (
11- get_node_id_from_tracefilename ,
12- get_timestamp_from_tracefilename ,
13- list_files_in_directory ,
14- )
9+ from smdebug .core .utils import get_node_id_from_tracefilename , get_timestamp_from_tracefilename
10+ from smdebug .profiler .MetricsReaderBase import MetricsReaderBase
1511from smdebug .profiler .profiler_constants import (
1612 DEFAULT_PREFIX ,
1713 ENV_TIME_BUFFER ,
18- ENV_TRAIILING_DURATION ,
1914 HOROVODTIMELINE_PREFIX ,
2015 MODELTIMELINE_SUFFIX ,
2116 PYTHONTIMELINE_SUFFIX ,
2217 TENSORBOARDTIMELINE_SUFFIX ,
2318 TIME_BUFFER_DEFAULT ,
24- TRAILING_DURATION_DEFAULT ,
2519)
2620from smdebug .profiler .tf_profiler_parser import (
2721 HorovodProfilerEvents ,
2822 SMProfilerEvents ,
2923 TensorboardProfilerEvents ,
3024)
31- from smdebug .profiler .utils import TimeUnits , convert_utc_timestamp_to_microseconds
3225
3326
class AlgorithmMetricsReader(MetricsReaderBase):
    """Reads algorithm (framework) profiler trace events.

    Concrete subclasses provide event-file discovery (local directory or S3);
    this base class wires up one parser per trace-file flavor and hands the
    full parser list to MetricsReaderBase for event merging.
    """

    def __init__(self):
        super().__init__()
        # All algorithm trace files live under this common prefix.
        self.prefix = DEFAULT_PREFIX
        # One parser per trace-file flavor (smdebug / TensorBoard / Horovod).
        # NOTE(review): "_HorovordEventsParser" spelling kept as-is — renaming
        # would break subclasses/callers that reference the attribute.
        self._SMEventsParser = SMProfilerEvents()
        self._TBEventsParser = TensorboardProfilerEvents()
        self._HorovordEventsParser = HorovodProfilerEvents()

    def _get_all_event_parsers(self):
        # Used by the base class to collect and merge events from every source.
        return [self._SMEventsParser, self._TBEventsParser, self._HorovordEventsParser]
6337
6438 """
6539 The following function returns the time range for which the tracefiles are currently available in S3 or local
@@ -81,7 +55,7 @@ def get_current_time_range_for_event_query(self):
8155 Those files might contain 'B' type events that had started prior to 'start'
8256 """
8357
84- def _get_trace_files_in_the_range (
58+ def _get_event_files_in_the_range (
8559 self , start_time_microseconds , end_time_microseconds , use_buffer = True
8660 ):
8761 # increase the time range using TIME_BUFFER_DEFAULT
@@ -117,15 +91,15 @@ def _get_trace_files_in_the_range(
11791
11892 # Find the timestamp that is greater than or equal start_time_microseconds. The tracefile corresponding to
11993 # that timestamp will contain events that are active during start_time_microseconds
120- lower_bound_timestamp = bisect .bisect_left (timestamps , start_time_microseconds )
94+ lower_bound_timestamp_index = bisect .bisect_left (timestamps , start_time_microseconds )
12195
12296 # Find the timestamp that is immediate right to the end_time_microseconds. The tracefile corresponding to
12397 # that timestamp will contain events that are active during end_time_microseconds.
124- upper_bound_timestamp = bisect .bisect_left (timestamps , end_time_microseconds )
98+ upper_bound_timestamp_index = bisect .bisect_left (timestamps , end_time_microseconds )
12599
126100 event_files = list ()
127- for index in timestamps [lower_bound_timestamp : upper_bound_timestamp + 1 ]:
128- event_files .append (self ._timestamp_to_filename [index ])
101+ for index in timestamps [lower_bound_timestamp_index : upper_bound_timestamp_index + 1 ]:
102+ event_files .extend (self ._timestamp_to_filename [index ])
129103 return event_files
130104
131105 """
@@ -143,52 +117,21 @@ def _get_event_parser(self, filename):
143117 return self ._SMEventsParser
144118 if TENSORBOARDTIMELINE_SUFFIX in filename :
145119 return self ._TBEventsParser
146- if HorovodProfilerEvents in filename :
120+ if HOROVODTIMELINE_PREFIX in filename :
147121 return self ._HorovordEventsParser
148122
149- """
150- This function queries the files that are currently available in the directory (for local mode) or in S3 for download.
151- It rebuilds the map of timestamp to filename.
152- """
123+ def _get_timestamp_from_filename (self , event_file ):
124+ return get_timestamp_from_tracefilename (event_file )
125+
126+ def _get_event_file_regex (self ):
127+ return r"(.+)\.(json|csv)$"
153128
154- def refresh_event_file_list (self ):
155- pass
156129
130+ class LocalAlgorithmMetricsReader (AlgorithmMetricsReader ):
157131 """
158- The function returns the events that have recorded within the given time range.
159- The function will download (or parse) the tracefiles that are available
160- for the given time range. It is possible that events are recorded during training but are not available for
161- download.
162- TODO: Implement blocking call to wait for files to be available for download.
132+ The metrics reader is created with root folder in which the tracefiles are stored.
163133 """
164134
165- def get_events (self , start_time , end_time , unit = TimeUnits .MICROSECONDS ):
166- start_time = convert_utc_timestamp_to_microseconds (start_time , unit )
167- end_time = convert_utc_timestamp_to_microseconds (end_time , unit )
168-
169- event_files = self ._get_trace_files_in_the_range (start_time , end_time )
170-
171- # Download files and parse the events
172- self .parse_event_files (event_files )
173-
174- """
175- We might have recorded events from different sources within this timerange.
176- we will get the events from the relevant event parsers and merge them before returning.
177- """
178- result = []
179- for eventParser in [self ._SMEventsParser , self ._TBEventsParser , self ._HorovordEventsParser ]:
180- range_events = eventParser .get_events_within_time_range (
181- start_time , end_time , unit = TimeUnits .MICROSECONDS
182- )
183- result .extend (range_events )
184-
185- return result
186-
187- def parse_event_files (self , event_files ):
188- pass
189-
190-
191- class LocalMetricsReader (MetricsReader ):
192135 def __init__ (self , trace_root_folder ):
193136 self .trace_root_folder = trace_root_folder
194137 super ().__init__ ()
@@ -200,27 +143,13 @@ def __init__(self, trace_root_folder):
200143 """
201144
202145 def refresh_event_file_list (self ):
203- path = os .path .expanduser (self .trace_root_folder )
204- event_dir = os .path .join (path , DEFAULT_PREFIX , "" )
205- event_regex = r"(.+)\.(json|csv)$"
206- event_files = list_files_in_directory (event_dir , file_regex = event_regex )
207- for event_file in event_files :
208- timestamp = get_timestamp_from_tracefilename (event_file )
209- self ._timestamp_to_filename [timestamp ] = event_file
210-
211- """
212- The function opens and reads the event files if they are not already parsed.
213- For local metrics reader, we are currently assuming that the downloaded event file is a complete file.
214- """
146+ self ._refresh_event_file_list_local_mode (self .trace_root_folder )
215147
216148 def parse_event_files (self , event_files ):
217- for event_file in event_files :
218- if event_file not in self ._parsed_files :
219- self ._get_event_parser (event_file ).read_events_from_file (event_file )
220- self ._parsed_files .add (event_file )
149+ self ._parse_event_files_local_mode (event_files )
221150
222151
223- class S3MetricsReader ( MetricsReader ):
152+ class S3AlgorithmMetricsReader ( AlgorithmMetricsReader ):
224153 """
225154 The s3_trial_path points to a s3 folder in which the tracefiles are stored. e.g.
226155 s3://my_bucket/experiment_base_folder
@@ -273,28 +202,4 @@ def refresh_event_file_list(self):
273202 Prefix = self .prefix ,
274203 StartAfter = self ._startAfter_prefix if self ._startAfter_prefix else self .prefix ,
275204 )
276- event_files = [x for x in S3Handler .list_prefix (list_dir ) if "json" in x ]
277- for event_file in event_files :
278- timestamp = get_timestamp_from_tracefilename (event_file )
279- self ._timestamp_to_filename [timestamp ] = f"s3://{ self .bucket_name } /{ event_file } "
280- self .update_start_after_prefix ()
281-
282- """
283- It is possible that tracefiles from different nodes to arrive in S3 in different order. For example, Even if t1
284- > t2, a tracefile with timestamp "t1" can arrive in S3 before the tracefile with timestamp "t2". If we list the
285- prefix only on the basis of last arrived file (i.e. t1) we will miss the file for t2. Therefore, we will set the
286- start prefix to a timestamp that is trailing behind the last timestamp by 'trailing duration'. This will ensure
287- that we will attempt to get tracefiles with older timestamp even if they arrive late.
288- """
289-
290- def update_start_after_prefix (self ):
291- trailiing_duration = os .getenv (ENV_TRAIILING_DURATION , TRAILING_DURATION_DEFAULT )
292- sorted_timestamps = sorted (self ._timestamp_to_filename .keys ())
293- last_timestamp_available = sorted_timestamps [- 1 ]
294- trailing_timestamp = last_timestamp_available - trailiing_duration
295- # Get the timestamp that is closely matching the trailing_timestamp
296- trailing_timestamp = sorted_timestamps [
297- bisect .bisect_left (sorted_timestamps , trailing_timestamp )
298- ]
299- self ._startAfter_prefix = self ._timestamp_to_filename [trailing_timestamp ]
300- s3 , bucket_name , self ._startAfter_prefix = is_s3 (self ._startAfter_prefix )
205+ self ._refresh_event_file_list_s3_mode (list_dir )