From 51e72c4718e75b78e1ead1825b96d3087afcc719 Mon Sep 17 00:00:00 2001
From: Travis Grigsby
Date: Wed, 2 Jun 2021 16:40:45 -0700
Subject: [PATCH] Group high zoom metatile jobs by RAWR size threshold, add
 TileSpecifier to order jobs by RAWR size

---
 batch-setup/make_meta_tiles.py | 270 ++++++++++++++++++++++++++-------
 1 file changed, 218 insertions(+), 52 deletions(-)

diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py
index 323156a..49fecb4 100644
--- a/batch-setup/make_meta_tiles.py
+++ b/batch-setup/make_meta_tiles.py
@@ -1,3 +1,5 @@
+import queue
+
 from batch import Buckets
 from batch import run_go
 import yaml
@@ -19,7 +21,7 @@ from tilequeue.store import make_s3_tile_key_generator
 from ModestMaps.Core import Coordinate
 from multiprocessing import Pool
-
+import time

 MissingTiles = namedtuple('MissingTiles', 'low_zoom_file high_zoom_file')

@@ -129,15 +131,15 @@ def read_metas_to_file(self, filename, present=False, compress=False):
             stdout=filename)

     @contextmanager
-    def missing_tiles_split(self, split_zoom, zoom_max, big_jobs):
+    def missing_tiles_split(self, split_zoom, zoom_max, high_zoom_job_set):
         """
         To be used in a with-statement. Yields a MissingTiles object, giving
         information about the tiles which are missing.

-        High zoom jobs are output either at split_zoom (RAWR tile granularity)
-        or zoom_max (usually lower, e.g: 7) depending on whether big_jobs
-        contains a truthy value for the RAWR tile. The big_jobs are looked up
-        at zoom_max.
+        High zoom jobs are output at zooms between zoom_max (usually lower,
+        e.g. 7) and split_zoom (RAWR tile granularity), depending on whether
+        high_zoom_job_set contains a truthy value for an enclosing
+        coordinate. Jobs are looked up starting at zoom_max, then at
+        increasing zooms until split_zoom.
         """
         self.run_batch_job()
@@ -149,7 +151,7 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs):

             self.read_metas_to_file(missing_meta_file, compress=True)

-            print("Splitting into high and low zoom lists")
+            print("[%s] Splitting into high and low zoom lists" % (time.ctime()))

             # contains zooms 0 until group zoom. the jobs between the group
             # zoom and RAWR zoom are merged into the parent at group zoom.
@@ -162,25 +164,29 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs):
             with gzip.open(missing_meta_file, 'r') as fh:
                 for line in fh:
-                    c = deserialize_coord(line)
-                    if c.zoom < split_zoom:
+                    this_coord = deserialize_coord(line)
+                    if this_coord.zoom < split_zoom:
                         # in order to not have too many jobs in the queue, we
                         # group the low zoom jobs to the zoom_max (usually 7)
-                        if c.zoom > zoom_max:
-                            c = c.zoomTo(zoom_max).container()
+                        if this_coord.zoom > zoom_max:
+                            this_coord = this_coord.zoomTo(zoom_max).container()

-                        missing_low[c] = True
+                        missing_low[this_coord] = True

                     else:
-                        # if the group of jobs at zoom_max would be too big
-                        # (according to big_jobs[]) then enqueue the original
-                        # coordinate. this is to prevent a "long tail" of huge
-                        # job groups.
-                        job_coord = c.zoomTo(zoom_max).container()
-                        if not big_jobs[job_coord]:
-                            c = job_coord
-
-                        missing_high[c] = True
+                        # check the job set at each zoom, starting from
+                        # zoom_max; if no enclosing job coordinate is
+                        # registered at a coarser zoom, register the tile
+                        # itself at split_zoom. note the container() call:
+                        # zoomTo() can return fractional coordinates.
+                        job_registered = False
+                        for this_zoom in range(zoom_max, split_zoom):
+                            lower_zoom_job_coord = \
+                                this_coord.zoomTo(this_zoom).container()
+                            if high_zoom_job_set[lower_zoom_job_coord]:
+                                missing_high[lower_zoom_job_coord] = True
+                                job_registered = True
+                                break
+
+                        if not job_registered:
+                            missing_high[this_coord] = True

             with open(missing_low_file, 'w') as fh:
                 for coord in missing_low:
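The ascending-zoom lookup introduced above replaces the old single-level big_jobs check. Here is a minimal sketch of the same walk, using a plain set of (zoom, column, row) tuples in place of tilequeue's CoordSet (an assumption made purely for illustration):

    from ModestMaps.Core import Coordinate

    def find_job_coord(coord, job_set, zoom_max, split_zoom):
        # walk from the coarsest grouping zoom towards the RAWR zoom and
        # return the first enclosing coordinate registered as a job
        for zoom in range(zoom_max, split_zoom):
            candidate = coord.zoomTo(zoom).container()
            if (candidate.zoom, candidate.column, candidate.row) in job_set:
                return candidate
        # no enclosing job was registered: the tile is its own job
        return coord

    # a missing metatile at 10/340/600 falls inside the zoom-7 job 7/42/75,
    # since 340 >> 3 == 42 and 600 >> 3 == 75
    job_set = {(7, 42, 75)}
    found = find_job_coord(Coordinate(zoom=10, column=340, row=600),
                           job_set, zoom_max=7, split_zoom=10)
    assert (found.zoom, found.column, found.row) == (7, 42, 75)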
@@ -190,6 +196,7 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs):
                 for coord in missing_high:
                     fh.write(serialize_coord(coord) + "\n")

+            print("[%s] Done splitting into high and low zoom lists" % (time.ctime()))
             yield MissingTiles(missing_low_file, missing_high_file)

         finally:
@@ -235,7 +242,7 @@ def __call__(self, parent):
         assert dz >= 0
         width = 1 << dz

-        size = 0
+        sizes = {}
         for dx in range(width):
             for dy in range(width):
                 coord = Coordinate(
@@ -244,19 +251,85 @@ def __call__(self, parent):
                     row=((parent.row << dz) + dy))
                 key = gen(self.prefix, coord, 'zip')
                 response = s3.head_object(Bucket=self.bucket, Key=key)
-                size += response['ContentLength']
+                sizes[coord] = response['ContentLength']
+
+        # now sum the sizes of RAWR tile pyramids from the RAWR zoom up to
+        # the parent zoom. iterate over a snapshot of the leaf coordinates,
+        # since the dict grows as ancestors are added to it.
+        for coord in list(sizes.keys()):
+            for zoom in range(self.rawr_zoom, parent.zoom, -1):
+                parent_coord = coord.zoomTo(zoom - 1).container()
+                if parent_coord not in sizes:
+                    sizes[parent_coord] = 0
+                sizes[parent_coord] += sizes[coord]
+
+        return parent, sizes

-        return parent, size
+
+class TileSpecifier(object):
+    """
+    Provides the ability to sort tiles based on an ordering and to specify
+    per-tile memory requirements.
+    """
+
+    ORDER_KEY = "order"
+    MEM_GB_KEY = "mem_gb"
+
+    def __init__(self, default_mem_gb=8, spec_dict=None):
+        """
+        :param default_mem_gb: memory requirement used for coordinates that
+            have no entry in spec_dict.
+        :param spec_dict: keys are serialized coordinates of the form
+            "zoom/column/row", values are maps with keys "mem_gb" and
+            "order". e.g.
+            {"7/3/10": {"mem_gb": 2.5, "order": 12},
+             "10/12/3": {"mem_gb": 0.3, "order": 10}}
+        """
+        self.default_mem_gb = default_mem_gb
+        # default to None to avoid the shared mutable default argument pitfall
+        self.spec_dict = spec_dict if spec_dict is not None else {}
+
+    @staticmethod
+    def from_coord_list(coord_list, default_mem_gb):
+        # this currently does not allow mem specification, just ordering.
+        spec_dict = {}
+        for i, coord in enumerate(coord_list):
+            coord_str = serialize_coord(coord)
+            spec_dict[coord_str] = {TileSpecifier.ORDER_KEY: i,
+                                    TileSpecifier.MEM_GB_KEY: default_mem_gb}
+
+        return TileSpecifier(default_mem_gb, spec_dict)
+
+    def reorder(self, coord_list):
+        """
+        Sort the serialized coordinates in coord_list by this specifier's
+        ordering. Coords that have no ordering entry sort first (order 0).
+        """
+        return sorted(coord_list,
+                      key=lambda coord: self.get_ordering_val(coord))
+
+    def get_ordering_val(self, coord):
+        """
+        Return the ordering value for the serialized coordinate string;
+        lower values sort earlier. If no ordering is specified for this
+        coordinate, returns 0.
+        """
+        if coord in self.spec_dict:
+            return self.spec_dict[coord][self.ORDER_KEY]
+
+        return 0
+
+    def get_mem_reqs_mb(self, coord_str):
+        """
+        Return the specified memory requirement, in megabytes, for the
+        serialized coordinate. If none is specified, returns default_mem_gb
+        converted to megabytes.
+        """
+        if coord_str in self.spec_dict:
+            return self.spec_dict[coord_str][self.MEM_GB_KEY] * 1024
+
+        return self.default_mem_gb * 1024
+
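A short usage sketch of the class above, with hypothetical coordinates, orders, and sizes; from_coord_list assigns ascending order values by list position, so a biggest-first job list is replayed biggest-first by reorder():

    specifier = TileSpecifier(default_mem_gb=4, spec_dict={
        "7/3/10": {TileSpecifier.ORDER_KEY: 0, TileSpecifier.MEM_GB_KEY: 6},
        "10/12/3": {TileSpecifier.ORDER_KEY: 1, TileSpecifier.MEM_GB_KEY: 2},
    })

    # the zoom-7 job comes first in the ordering, so it is enqueued first
    assert specifier.reorder(["10/12/3", "7/3/10"]) == ["7/3/10", "10/12/3"]

    assert specifier.get_mem_reqs_mb("7/3/10") == 6 * 1024
    assert specifier.get_mem_reqs_mb("11/0/0") == 4 * 1024  # default fallback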
-def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom,
-              size_threshold, pool_size=30):
+def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type,
+                                      rawr_zoom, group_zoom, size_threshold,
+                                      pool_size=30):
     """
     Look up the RAWR tiles in the rawr_bucket under the prefix and with the
     given key format, group the RAWR tiles (usually at zoom 10) by the job
-    group zoom (usually 7) and sum their sizes. Return a map-like object
-    which has a truthy value for those Coordinates at group_zoom which sum
-    to size_threshold or more.
+    group zoom (usually 7) and sum their sizes. Return a list of job
+    coordinates ordered by descending RAWR size sum.

     A pool size of 30 seems to work well; the point of the pool is to hide
     the latency of S3 requests, so pretty quickly hits the law of diminishing
@@ -267,12 +340,15 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom,

     p = Pool(pool_size)
     job_sizer = _JobSizer(rawr_bucket, prefix, key_format_type, rawr_zoom)

-    big_jobs = CoordSet(group_zoom, min_zoom=group_zoom)
-
+    grouped_by_rawr_tile_size = []
+    print("[%s] Bucketizing jobs by zoom using size of RAWR tiles" % time.ctime())
     # loop over each column individually. this limits the number of concurrent
     # tasks, which means we don't waste memory maintaining a huge queue of
     # pending tasks. and when something goes wrong, the stacktrace isn't buried
     # in a million others.
+    all_sizes = {}
+    grouping_queue = queue.Queue()
+
     num_coords = 1 << group_zoom
     for x in range(num_coords):
@@ -281,33 +357,113 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom,
         for y in range(num_coords):
             coord = Coordinate(zoom=group_zoom, column=x, row=y)
             tasks.append(p.apply_async(job_sizer, (coord,)))
+            grouping_queue.put_nowait(coord)  # queue for future size counting

         # collect tasks and put them into the big jobs list.
         for task in tasks:
-            coord, size = task.get()
-            if size >= size_threshold:
-                big_jobs[coord] = True
+            coord, sizes = task.get()
+            all_sizes.update(sizes)
+
+    # now use all_sizes plus the size_threshold to find the lowest zoom that
+    # each coordinate can be grouped into
+    counts_at_zoom = {}
+    while not grouping_queue.empty():
+        this_coord = grouping_queue.get_nowait()
+        this_size = all_sizes[this_coord]
+
+        if this_size <= size_threshold or this_coord.zoom == rawr_zoom:
+            # we're good on size, or can't split any further because we're
+            # already at the RAWR zoom
+            grouped_by_rawr_tile_size.append(this_coord)
+            grouping_queue.task_done()
+
+            if this_coord.zoom not in counts_at_zoom:
+                counts_at_zoom[this_coord.zoom] = 0
+            counts_at_zoom[this_coord.zoom] += 1
+        else:
+            # too big for this zoom, queue the four children
+            top_left_child = this_coord.zoomBy(1)
+
+            grouping_queue.put_nowait(top_left_child)
+            grouping_queue.put_nowait(top_left_child.down(1))
+            grouping_queue.put_nowait(top_left_child.right(1))
+            grouping_queue.put_nowait(top_left_child.down(1).right(1))

-    return big_jobs
+    print("[%s] Done bucketizing jobs - count by zoom %s" %
+          (time.ctime(), counts_at_zoom))
+
+    # validate counts by zoom - expect the equivalent of 4**rawr_zoom jobs
+    # at the RAWR zoom (i.e. 4^10 zoom 10 jobs by default)
+    counts_at_zoom_sum = 0
+    for z, count_at_this_zoom in counts_at_zoom.items():
+        zoom_equiv_count = count_at_this_zoom * (4 ** (rawr_zoom - z))
+        counts_at_zoom_sum += zoom_equiv_count
+    if counts_at_zoom_sum == 4 ** rawr_zoom:
+        print("Count of jobs by zoom is correct")
+    else:
+        # TODO should we fail/exit here? Probably
+        print("Count of jobs by zoom is off by %d" %
+              (counts_at_zoom_sum - 4 ** rawr_zoom))
+
+    ordered_job_list = sorted(grouped_by_rawr_tile_size,
+                              key=lambda coord: all_sizes[coord],
+                              reverse=True)
+    return ordered_job_list
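The grouping loop above is a breadth-first quadtree split: any group over the threshold is replaced by its four children until every job fits or bottoms out at the RAWR zoom. A self-contained sketch of that policy, using hypothetical sizes keyed by (zoom, x, y) tuples in place of the S3-derived Coordinate sums:

    import queue

    def split_until_under(sizes, root, threshold, max_zoom):
        jobs, pending = [], queue.Queue()
        pending.put_nowait(root)
        while not pending.empty():
            zoom, x, y = pending.get_nowait()
            if sizes[(zoom, x, y)] <= threshold or zoom == max_zoom:
                jobs.append((zoom, x, y))  # small enough, or can't split further
            else:
                for dx in (0, 1):          # enqueue the four children
                    for dy in (0, 1):
                        pending.put_nowait((zoom + 1, 2 * x + dx, 2 * y + dy))
        return jobs

    # a 90 MB zoom-7 group over an 80 MB threshold splits once, and each of
    # its four zoom-8 children fits (sizes in MB, all hypothetical)
    sizes = {(7, 3, 10): 90,
             (8, 6, 20): 20, (8, 7, 20): 30, (8, 6, 21): 25, (8, 7, 21): 15}
    print(split_until_under(sizes, (7, 3, 10), threshold=80, max_zoom=10))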
+
+
+def viable_container_overrides(mem_mb):
+    """
+    Rounds a memory request up to the next whole GiB that AWS Batch will
+    accept, and computes the minimum number of vCPUs needed for that much
+    memory.
+    :param mem_mb: (int) the megabytes of memory you'd request in an ideal
+        world
+    :return: the amount of memory to request for AWS Batch to honor it, and
+        the number of vcpus you must request alongside it
+    """
+    mem_mb = int(mem_mb)
+    if mem_mb < 512:
+        return 512, 1
+
+    # round up to the next whole GiB
+    desired_mem_mb = ((mem_mb + 1023) // 1024) * 1024
+
+    # assume the compute environment offers at most 8 GiB per vCPU
+    max_mem_per_vcpu = 8 * 1024
+    vcpus = 1 + (desired_mem_mb - 1) // max_mem_per_vcpu
+
+    return desired_mem_mb, vcpus
+
+
-def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, mem_multiplier=1.0, mem_max=32 * 1024):
+def enqueue_tiles(config_file, tile_list_file, check_metatile_exists,
+                  tile_specifier=TileSpecifier(), mem_multiplier=1.0,
+                  mem_max=32 * 1024):
     from tilequeue.command import make_config_from_argparse
     from tilequeue.command import tilequeue_batch_enqueue
     from make_rawr_tiles import BatchEnqueueArgs
-    args = BatchEnqueueArgs(config_file, None, tile_list_file, None)
     os.environ['TILEQUEUE__BATCH__CHECK-METATILE-EXISTS'] = (
         str(check_metatile_exists).lower())
-    with open(args.config) as fh:
+    with open(config_file) as fh:
         cfg = make_config_from_argparse(fh)
-        update_memory_request(cfg, mem_multiplier, mem_max)
-        tilequeue_batch_enqueue(cfg, args)
+    with open(tile_list_file, 'r') as tile_list:
+        coord_lines = [line.strip() for line in tile_list.readlines()]
+
+    reordered_lines = tile_specifier.reorder(coord_lines)
+    print("[%s] Starting to enqueue %d tile batches" %
+          (time.ctime(), len(reordered_lines)))
+    for coord_line in reordered_lines:
+        # override memory requirements for this job with what the
+        # tile_specifier tells us
+        mem_mb = int(tile_specifier.get_mem_reqs_mb(coord_line))
+        adjusted_mem = mem_mb * mem_multiplier
+
+        # now that we know what we want, pick something AWS actually supports
+        viable_mem_request, required_min_cpus = \
+            viable_container_overrides(adjusted_mem)
+        print("[%s] Enqueueing %s with %d MB and %d vcpus" %
+              (time.ctime(), coord_line, viable_mem_request,
+               required_min_cpus))
+        update_container_overrides(cfg, viable_mem_request, mem_max,
+                                   required_min_cpus)
+
+        args = BatchEnqueueArgs(config_file, coord_line, None, None)
+        tilequeue_batch_enqueue(cfg, args)
+    print("[%s] Done enqueuing tile batches" % time.ctime())


-def update_memory_request(cfg, mem_multiplier, mem_max):
-    cfg.yml["batch"]["memory"] = int(min(cfg.yml["batch"]["memory"] * mem_multiplier, mem_max))
+def update_container_overrides(cfg, mem_mb, mem_max, cpus):
+    cfg.yml["batch"]["memory"] = int(min(mem_mb, mem_max))
+    cfg.yml["batch"]["vcpus"] = cpus


 # adaptor class for MissingTiles to see just the high zoom parts, this is used
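A few worked examples of the rounding above; note the 8 GiB-per-vCPU ratio is this script's assumption about its Batch compute environment, not an AWS constant:

    assert viable_container_overrides(300) == (512, 1)        # below the 512 MB floor
    assert viable_container_overrides(4608) == (5120, 1)      # 4.5 GiB -> 5 GiB
    assert viable_container_overrides(8 * 1024) == (8192, 1)  # exactly one vCPU's worth
    assert viable_container_overrides(9 * 1024) == (9216, 2)  # spills onto a second vCPU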
@@ -336,16 +492,17 @@ def missing_file(self, missing):
 # certain number of retries.
 class TileRenderer(object):

-    def __init__(self, tile_finder, big_jobs, split_zoom, zoom_max, allowed_missing_tiles=0):
+    def __init__(self, tile_finder, high_zoom_job_set, split_zoom, zoom_max, tile_specifier, allowed_missing_tiles=0):
         self.tile_finder = tile_finder
-        self.big_jobs = big_jobs
+        self.high_zoom_job_set = high_zoom_job_set
         self.split_zoom = split_zoom
         self.zoom_max = zoom_max
         self.allowed_missing_tiles = allowed_missing_tiles
+        self.tile_specifier = tile_specifier

     def _missing(self):
         return self.tile_finder.missing_tiles_split(
-            self.split_zoom, self.zoom_max, self.big_jobs)
+            self.split_zoom, self.zoom_max, self.high_zoom_job_set)

     def render(self, num_retries, lense):
         mem_max = 32 * 1024  # 32 GiB
@@ -372,7 +529,7 @@ def render(self, num_retries, lense):
                           (count, lense.description, ', '.join(sample)))

                 enqueue_tiles(lense.config, missing_tile_file,
-                              check_metatile_exists, mem_multiplier, mem_max)
+                              check_metatile_exists, self.tile_specifier, mem_multiplier, mem_max)

         else:
             with self._missing() as missing:
@@ -384,6 +541,12 @@ def render(self, num_retries, lense):
                        % (count, lense.description, num_retries,
                           ', '.join(sample)))


+def to_coord_set(coords, max_zoom, min_zoom):
+    # register every job coordinate in a CoordSet for fast truthy lookups
+    coord_set = CoordSet(max_zoom, min_zoom=min_zoom)
+    for coord in coords:
+        coord_set[coord] = True
+    return coord_set
+
+
 if __name__ == '__main__':
     import argparse
@@ -418,10 +581,9 @@ def render(self, num_retries, lense):
                         "prefixed with the date or hash first.")
     parser.add_argument('--metatile-size', default=8, type=int,
                         help='Metatile size (in 256px tiles).')
-    parser.add_argument('--size-threshold', default=350000000, type=int,
-                        help='If all the RAWR tiles grouped together are '
-                        'bigger than this, split the job up into individual '
-                        'RAWR tiles.')
+    parser.add_argument('--size-threshold', default=80000000, type=int,
+                        help='RAWR tiles are only grouped into a single job '
+                        'if the sum of their sizes, in bytes, is below this '
+                        'threshold.')
     parser.add_argument('--allowed-missing-tiles', default=2, type=int,
                         help='The maximum number of missing metatiles allowed '
                         'to continue the build process.')
@@ -437,6 +599,7 @@ def render(self, num_retries, lense):
     # TODO: split zoom and zoom max should come from config.
     split_zoom = 10
     zoom_max = 7
+    default_mem_gb = 4

     region = args.region or os.environ.get('AWS_DEFAULT_REGION')
     if region is None:
@@ -452,11 +615,14 @@ def render(self, num_retries, lense):
         buckets.missing, buckets.meta, date_prefix, missing_bucket_date_prefix,
         region, args.key_format_type, args.config, metatile_max_zoom)

-    big_jobs = _big_jobs(
+    jobs_list = _distribute_jobs_by_raw_tile_size(
         buckets.rawr, missing_bucket_date_prefix,
         args.key_format_type, split_zoom, zoom_max, args.size_threshold)

-    tile_renderer = TileRenderer(tile_finder, big_jobs, split_zoom, zoom_max, args.allowed_missing_tiles)
+    tile_specifier = TileSpecifier.from_coord_list(jobs_list, default_mem_gb)
+
+    tile_renderer = TileRenderer(tile_finder, to_coord_set(jobs_list, split_zoom, zoom_max), split_zoom, zoom_max,
+                                 tile_specifier, args.allowed_missing_tiles)

     tile_renderer.render(args.retries, LowZoomLense(args.low_zoom_config))
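Taken together, the per-job memory pipeline in enqueue_tiles works out as in this sketch (hypothetical numbers: a 4 GiB default scaled by a 1.5x retry multiplier rounds up to 6 GiB on a single vCPU):

    specifier = TileSpecifier(default_mem_gb=4)
    mem_mb = specifier.get_mem_reqs_mb("7/3/10")  # 4096, the default fallback
    adjusted_mem = mem_mb * 1.5                   # 6144.0 after the multiplier
    assert viable_container_overrides(adjusted_mem) == (6144, 1)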