Skip to content

Commit

Permalink
Fix parallelization select
Browse files Browse the repository at this point in the history
- Set a log until parallel develop is finished.
  • Loading branch information
mjanez committed Sep 13, 2023
1 parent cb7f6e7 commit 6ca448d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
6 changes: 3 additions & 3 deletions ogc2ckan/config/ckan_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ def __init__(self):
self.default_license = os.environ.get('DEFAULT_LICENSE', OGC2CKAN_CKANINFO_CONFIG['default_license'])
self.default_license_id = os.environ.get('DEFAULT_LICENSE_ID', OGC2CKAN_CKANINFO_CONFIG['default_license_id'])
self.ckan_harvester = OGC2CKAN_HARVESTER_CONFIG
self.ssl_unverified_mode = os.environ.get('SSL_UNVERIFIED_MODE', OGC2CKAN_CKANINFO_CONFIG['ssl_unverified_mode'])
self.metadata_distributions = os.environ.get('METADATA_DISTRIBUTIONS', OGC2CKAN_CKANINFO_CONFIG['metadata_distributions'])
self.parallelization = os.environ.get('PARALLELIZATION', OGC2CKAN_CKANINFO_CONFIG['parallelization'])
self.ssl_unverified_mode = True if os.environ.get('SSL_UNVERIFIED_MODE') == 'True' else OGC2CKAN_CKANINFO_CONFIG['ssl_unverified_mode']
self.metadata_distributions = True if os.environ.get('METADATA_DISTRIBUTIONS') == 'True' else OGC2CKAN_CKANINFO_CONFIG['metadata_distributions']
self.parallelization = True if os.environ.get('PARALLELIZATION') == 'True' else OGC2CKAN_CKANINFO_CONFIG['parallelization']
self.dir3_soup = self.get_dir3_soup()
self.ckan_dataset_schema = os.environ.get('CKAN_DATASET_SCHEMA', OGC2CKAN_CKANINFO_CONFIG['ckan_dataset_schema'])

Expand Down
20 changes: 11 additions & 9 deletions ogc2ckan/ogc2ckan.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,17 @@ def start_harvesting(config_file):
logging.info(f"{log_module}:CKAN_URL: {ckan_info.ckan_site_url}")

try:
if harvest_servers is not None:
if ckan_info.parallelization is True:
#TODO: Fix multicore parallel processing
parallel_count = Parallel(n_jobs=processes)(delayed(launch_harvest)(harvest_server=endpoint, ckan_info=ckan_info) for endpoint in harvest_servers)
new_records.append(sum(i[0] for i in parallel_count))
else:
for endpoint in harvest_servers:
harvester = launch_harvest(harvest_server=endpoint, ckan_info=ckan_info)
new_records.append(harvester.ckan_dataset_count)
if harvest_servers is not None and ckan_info.parallelization is True:
#TODO: Fix multicore parallel processing
logging.warning(f'{log_module}:Parallel processing is not implemented yet.')
'''
parallel_count = Parallel(n_jobs=processes)(delayed(launch_harvest)(harvest_server=endpoint, ckan_info=ckan_info) for endpoint in harvest_servers)
new_records.append(sum(i[0] for i in parallel_count))
'''
elif harvest_servers and ckan_info.parallelization is False:
for endpoint in harvest_servers:
harvester = launch_harvest(harvest_server=endpoint, ckan_info=ckan_info)
new_records.append(harvester.ckan_dataset_count)
except Exception as e:
logging.error(f"{log_module}:Check invalid 'type' and 'active: True' in 'harvest_servers/{{my-harvest-server}}'at {config_file} Error: {e}")
new_records = 0
Expand Down

0 comments on commit 6ca448d

Please sign in to comment.