From 0f102763b80c93b3672697e687073a44882fe7fb Mon Sep 17 00:00:00 2001 From: Maksim Andrianov Date: Fri, 13 Nov 2020 04:42:52 +0300 Subject: [PATCH] [generator][python] Fixed race condition bugs; Prepared a bit for launching into a cluster. --- tools/python/airmaps/dags/build_coastline.py | 23 +--- tools/python/airmaps/dags/build_maps.py | 112 +++++++++++++----- tools/python/airmaps/dags/update_planet.py | 28 +---- tools/python/airmaps/instruments/storage.py | 26 +++- tools/python/airmaps/instruments/utils.py | 19 +-- tools/python/airmaps/requirements.txt | 2 +- tools/python/airmaps/requirements_dev.txt | 3 +- .../airmaps/sandbox/airmaps/airmaps.ini | 2 +- .../python/airmaps/sandbox/create_storage.sh | 1 + tools/python/maps_generator/generator/env.py | 17 ++- .../maps_generator/generator/generation.py | 11 +- .../maps_generator/generator/settings.py | 26 ++-- .../generator/stages_declaration.py | 3 +- 13 files changed, 156 insertions(+), 117 deletions(-) diff --git a/tools/python/airmaps/dags/build_coastline.py b/tools/python/airmaps/dags/build_coastline.py index d2daebfd0ed..8209bbf8970 100644 --- a/tools/python/airmaps/dags/build_coastline.py +++ b/tools/python/airmaps/dags/build_coastline.py @@ -10,7 +10,6 @@ from airmaps.instruments import settings from airmaps.instruments import storage from airmaps.instruments.utils import get_latest_filename -from airmaps.instruments.utils import make_rm_build_task from airmaps.instruments.utils import put_current_date_in_filename from airmaps.instruments.utils import rm_build from maps_generator.generator import stages_declaration as sd @@ -40,9 +39,7 @@ COASTLINE_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/coasts" -def publish_coastline(**kwargs): - build_name = kwargs["ti"].xcom_pull(key="build_name") - env = Env(build_name=build_name) +def publish_coastline(env): for name in (f"{WORLD_COASTS_NAME}.geom", f"{WORLD_COASTS_NAME}.rawgeom"): coastline = put_current_date_in_filename(name) latest = get_latest_filename(name) @@ -58,7 +55,6 @@ def publish_coastline(**kwargs): def build_coastline(**kwargs): env = Env() kwargs["ti"].xcom_push(key="build_name", value=env.build_name) - run_generation( env, ( @@ -68,26 +64,13 @@ def build_coastline(**kwargs): ), ) env.finish() + publish_coastline(env) + rm_build(env) BUILD_COASTLINE_TASK = PythonOperator( task_id="Build_coastline_task", provide_context=True, python_callable=build_coastline, - on_failure_callback=lambda ctx: rm_build(**ctx), - dag=DAG, -) - - -PUBLISH_COASTLINE_TASK = PythonOperator( - task_id="Publish_coastline_task", - provide_context=True, - python_callable=publish_coastline, dag=DAG, ) - - -RM_BUILD_TASK = make_rm_build_task(DAG) - - -BUILD_COASTLINE_TASK >> PUBLISH_COASTLINE_TASK >> RM_BUILD_TASK diff --git a/tools/python/airmaps/dags/build_maps.py b/tools/python/airmaps/dags/build_maps.py index 38a08167edf..dc4c4ee9ea1 100644 --- a/tools/python/airmaps/dags/build_maps.py +++ b/tools/python/airmaps/dags/build_maps.py @@ -1,24 +1,27 @@ import logging +import os from datetime import timedelta +import filelock from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from airmaps.instruments import settings from airmaps.instruments import storage -from airmaps.instruments.utils import make_rm_build_task +from airmaps.instruments.utils import rm_build from airmaps.instruments.utils import run_generation_from_first_stage from maps_generator.generator import stages_declaration as sd from maps_generator.generator.env import Env from maps_generator.generator.env import PathProvider from maps_generator.generator.env import get_all_countries_list +from maps_generator.generator.status import Status from maps_generator.maps_generator import run_generation logger = logging.getLogger("airmaps") -MAPS_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/maps" +MAPS_STORAGE_PATH = os.path.join(settings.STORAGE_PREFIX, "maps") class MapsGenerationDAG(DAG): @@ -39,16 +42,6 @@ def __init__(self, *args, **kwargs): dag=self, ) - publish_maps_task = PythonOperator( - task_id="Publish_maps_task", - provide_context=True, - python_callable=MapsGenerationDAG.publish_maps, - dag=self, - ) - - rm_build_task = make_rm_build_task(self) - - build_epilog_task >> publish_maps_task >> rm_build_task for country in get_all_countries_list(PathProvider.borders_path()): build_prolog_task >> self.make_mwm_operator(country) >> build_epilog_task @@ -56,6 +49,57 @@ def __init__(self, *args, **kwargs): def get_params(namespace="env", **kwargs): return kwargs.get("params", {}).get(namespace, {}) + @staticmethod + def publish_maps_build(env, subdir="temp_dir", **kwargs): + subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir] + storage_path = os.path.join(MAPS_STORAGE_PATH, subdir) + storage.wd_publish( + env.paths.build_path, os.path.join(storage_path, env.build_name) + ) + + @staticmethod + def fetch_maps_build(env, subdir="temp_dir", **kwargs): + subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir] + storage_path = os.path.join(MAPS_STORAGE_PATH, subdir) + storage.wd_fetch( + os.path.join(storage_path, env.build_name), env.paths.build_path + ) + + @staticmethod + def publish_map(env, country, subdir="temp_dir", **kwargs): + subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir] + storage_path = os.path.join(MAPS_STORAGE_PATH, subdir) + + ignore_paths = { + os.path.normpath(env.paths.draft_path), + os.path.normpath(env.paths.status_path), + os.path.normpath(env.paths.generation_borders_path), + os.path.normpath(env.paths.intermediate_tmp_path), + os.path.normpath(env.paths.coastline_tmp_path), + } + + def publish(path): + rel_path = path.replace(env.paths.build_path, "")[1:] + dest = os.path.join(storage_path, env.build_name, rel_path) + storage.wd_publish(path, dest) + + def find_and_publish_files_for_country(path): + for root, dirs, files in os.walk(path): + if os.path.normpath(root) in ignore_paths: + continue + + for dir in dirs: + if dir.startswith(country): + publish(os.path.join(root, dir)) + else: + find_and_publish_files_for_country(os.path.join(root, dir)) + + for file in files: + if file.startswith(country): + publish(os.path.join(root, file)) + + find_and_publish_files_for_country(env.paths.build_path) + @staticmethod def build_prolog(**kwargs): params = MapsGenerationDAG.get_params(**kwargs) @@ -71,6 +115,9 @@ def build_prolog(**kwargs): sd.StageDownloadDescriptions(), ), ) + env.clean() + MapsGenerationDAG.publish_maps_build(env, **kwargs) + rm_build(env) @staticmethod def make_build_mwm_func(country): @@ -78,16 +125,26 @@ def build_mwm(**kwargs): build_name = kwargs["ti"].xcom_pull(key="build_name") params = MapsGenerationDAG.get_params(**kwargs) params.update({"build_name": build_name, "countries": [country,]}) - env = Env(**params) - # We need to check existing of mwm.tmp. It is needed if we want to - # build mwms from part of planet. - tmp_mwm_name = env.get_tmp_mwm_names() - assert len(tmp_mwm_name) <= 1 - if not tmp_mwm_name: - logger.warning(f"mwm.tmp does not exist for {country}.") - return + + lock_file = os.path.join(PathProvider.tmp_dir(), "download.lock") + status_name = os.path.join(PathProvider.tmp_dir(), "download.download") + with filelock.FileLock(lock_file): + env = Env(**params) + # We need to check existing of mwm.tmp. It is needed if we want to + # build mwms from part of planet. + tmp_mwm_name = env.get_tmp_mwm_names() + assert len(tmp_mwm_name) <= 1 + if not tmp_mwm_name: + logger.warning(f"mwm.tmp does not exist for {country}.") + return + + s = Status(status_name) + if not s.is_finished(): + MapsGenerationDAG.fetch_maps_build(env, **kwargs) + s.finish() run_generation_from_first_stage(env, (sd.StageMwm(),), build_lock=False) + MapsGenerationDAG.publish_map(env, country, **kwargs) return build_mwm @@ -97,6 +154,7 @@ def build_epilog(**kwargs): params = MapsGenerationDAG.get_params(**kwargs) params.update({"build_name": build_name}) env = Env(**params) + MapsGenerationDAG.fetch_maps_build(env, **kwargs) run_generation_from_first_stage( env, ( @@ -108,16 +166,8 @@ def build_epilog(**kwargs): ), ) env.finish() - - @staticmethod - def publish_maps(**kwargs): - build_name = kwargs["ti"].xcom_pull(key="build_name") - params = MapsGenerationDAG.get_params(**kwargs) - params.update({"build_name": build_name}) - env = Env(**params) - subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)["subdir"] - storage_path = f"{MAPS_STORAGE_PATH}/{subdir}" - storage.wd_publish(env.paths.mwm_path, f"{storage_path}/{env.mwm_version}/") + MapsGenerationDAG.publish_maps_build(env, subdir="mwm_dir", **kwargs) + rm_build(env) def make_mwm_operator(self, country): normalized_name = "__".join(country.lower().split()) @@ -129,7 +179,7 @@ def make_mwm_operator(self, country): ) -PARAMS = {"storage": {"subdir": "open_source"}} +PARAMS = {"storage": {"mwm_dir": "open_source", "temp_dir": "temp"}} if settings.DEBUG: PARAMS["env"] = { # The planet file in debug mode does not contain Russia_Moscow territory. diff --git a/tools/python/airmaps/dags/update_planet.py b/tools/python/airmaps/dags/update_planet.py index dcab15c9da0..428873bbda6 100644 --- a/tools/python/airmaps/dags/update_planet.py +++ b/tools/python/airmaps/dags/update_planet.py @@ -1,4 +1,5 @@ import logging +import os from datetime import timedelta from airflow import DAG @@ -7,7 +8,7 @@ from airmaps.instruments import settings from airmaps.instruments import storage -from airmaps.instruments.utils import make_rm_build_task +from airmaps.instruments.utils import rm_build from maps_generator.generator import stages_declaration as sd from maps_generator.generator.env import Env from maps_generator.maps_generator import run_generation @@ -33,13 +34,14 @@ ) -PLANET_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/planet_regular/planet-latest.o5m" +PLANET_STORAGE_PATH = os.path.join( + settings.STORAGE_PREFIX, "planet_regular", "planet-latest.o5m" +) def update_planet(**kwargs): env = Env() kwargs["ti"].xcom_push(key="build_name", value=env.build_name) - if settings.DEBUG: env.add_skipped_stage(sd.StageUpdatePlanet) @@ -52,13 +54,9 @@ def update_planet(**kwargs): ), ) env.finish() - - -def publish_planet(**kwargs): - build_name = kwargs["ti"].xcom_pull(key="build_name") - env = Env(build_name=build_name) storage.wd_publish(env.paths.planet_o5m, PLANET_STORAGE_PATH) storage.wd_publish(md5_ext(env.paths.planet_o5m), md5_ext(PLANET_STORAGE_PATH)) + rm_build(env) UPDATE_PLANET_TASK = PythonOperator( @@ -67,17 +65,3 @@ def publish_planet(**kwargs): python_callable=update_planet, dag=DAG, ) - - -PUBLISH_PLANET_TASK = PythonOperator( - task_id="Publish_planet_task", - provide_context=True, - python_callable=publish_planet, - dag=DAG, -) - - -RM_BUILD_TASK = make_rm_build_task(DAG) - - -UPDATE_PLANET_TASK >> PUBLISH_PLANET_TASK >> RM_BUILD_TASK diff --git a/tools/python/airmaps/instruments/storage.py b/tools/python/airmaps/instruments/storage.py index e62b3da68a4..c0bd7e9876c 100644 --- a/tools/python/airmaps/instruments/storage.py +++ b/tools/python/airmaps/instruments/storage.py @@ -1,6 +1,8 @@ import logging +import os -import webdav.client as wc +from webdav3.client import Client +from webdav3.urn import Urn from airmaps.instruments import settings @@ -15,13 +17,27 @@ def wd_fetch(src, dst): logger.info(f"Fetch form {src} to {dst} with options {WD_OPTIONS}.") - client = wc.Client(WD_OPTIONS) + client = Client(WD_OPTIONS) client.download_sync(src, dst) def wd_publish(src, dst): logger.info(f"Publish form {src} to {dst} with options {WD_OPTIONS}.") - client = wc.Client(WD_OPTIONS) - tmp = f"{dst[:-1]}__/" if dst[-1] == "/" else f"{dst}__" + if os.path.isdir(src): + dst += Urn.separate + + dst = Urn(dst) + tmp = f"{dst.path()}__" + if dst.is_dir(): + tmp = f"{dst.path()[:-1]}__{Urn.separate}" + + parent = dst.parent() + path = Urn.separate + client = Client(WD_OPTIONS) + for dir in str(parent).split(Urn.separate): + if not client.check(path): + client.mkdir(path) + path += f"{dir}{Urn.separate}" + client.upload_sync(local_path=src, remote_path=tmp) - client.move(remote_path_from=tmp, remote_path_to=dst) + client.move(remote_path_from=tmp, remote_path_to=dst.path(), overwrite=True) diff --git a/tools/python/airmaps/instruments/utils.py b/tools/python/airmaps/instruments/utils.py index 40eb42e8a90..6be82b5bcfc 100644 --- a/tools/python/airmaps/instruments/utils.py +++ b/tools/python/airmaps/instruments/utils.py @@ -1,15 +1,16 @@ +import logging import os import shutil from datetime import datetime from typing import Iterable -from airflow.operators.python_operator import PythonOperator - from maps_generator.generator.env import Env from maps_generator.generator.stages import Stage from maps_generator.generator.stages import get_stage_name from maps_generator.maps_generator import run_generation +logger = logging.getLogger("airmaps") + def put_current_date_in_filename(filename): path, name = os.path.split(filename) @@ -26,21 +27,11 @@ def get_latest_filename(filename, prefix=""): return os.path.join(path, ".".join(parts)) -def rm_build(**kwargs): - build_name = kwargs["ti"].xcom_pull(key="build_name") - env = Env(build_name=build_name) +def rm_build(env): + logger.info(f"Build {env.build_path} will be removed.") shutil.rmtree(env.build_path) -def make_rm_build_task(dag): - return PythonOperator( - task_id="Rm_build_task", - provide_context=True, - python_callable=rm_build, - dag=dag, - ) - - def run_generation_from_first_stage( env: Env, stages: Iterable[Stage], build_lock: bool = True ): diff --git a/tools/python/airmaps/requirements.txt b/tools/python/airmaps/requirements.txt index 20d0bc268b0..498fef39647 100644 --- a/tools/python/airmaps/requirements.txt +++ b/tools/python/airmaps/requirements.txt @@ -3,4 +3,4 @@ omim-maps_generator apache-airflow [postgres]==1.10.10 psycopg2-binary==2.8.4 cryptography==2.8 -webdavclient==1.0.8 \ No newline at end of file +git+https://github.com/maksimandrianov/webdav-client-python-3.git@andrianov \ No newline at end of file diff --git a/tools/python/airmaps/requirements_dev.txt b/tools/python/airmaps/requirements_dev.txt index 31bad3b6877..007b69c0185 100644 --- a/tools/python/airmaps/requirements_dev.txt +++ b/tools/python/airmaps/requirements_dev.txt @@ -2,4 +2,5 @@ apache-airflow [postgres]==1.10.10 psycopg2-binary==2.8.4 cryptography==2.8 -webdavclient==1.0.8 +git+https://github.com/maksimandrianov/webdav-client-python-3.git@andrianov + diff --git a/tools/python/airmaps/sandbox/airmaps/airmaps.ini b/tools/python/airmaps/sandbox/airmaps/airmaps.ini index a840687143d..45088bf4a67 100644 --- a/tools/python/airmaps/sandbox/airmaps/airmaps.ini +++ b/tools/python/airmaps/sandbox/airmaps/airmaps.ini @@ -14,7 +14,7 @@ OMIM_PATH: /omim [Storage] # Webdaw settings. -WD_HOST: webdav +WD_HOST: http://webdav WD_LOGIN: alice WD_PASSWORD: secret1234 diff --git a/tools/python/airmaps/sandbox/create_storage.sh b/tools/python/airmaps/sandbox/create_storage.sh index 9baa355bf66..573fd33552c 100755 --- a/tools/python/airmaps/sandbox/create_storage.sh +++ b/tools/python/airmaps/sandbox/create_storage.sh @@ -5,6 +5,7 @@ BUILD_PATH="$(dirname "$0")" echo "Creating storage.." mkdir -p "${BUILD_PATH}/storage/tests/coasts" mkdir -p "${BUILD_PATH}/storage/tests/maps/open_source" +mkdir -p "${BUILD_PATH}/storage/tests/maps/temp" mkdir -p "${BUILD_PATH}/storage/tests/planet_regular" chown -R www-data:www-data "${BUILD_PATH}/storage/tests" diff --git a/tools/python/maps_generator/generator/env.py b/tools/python/maps_generator/generator/env.py index 3f6aac6eb3f..e61b2fdb50f 100644 --- a/tools/python/maps_generator/generator/env.py +++ b/tools/python/maps_generator/generator/env.py @@ -118,7 +118,7 @@ class PathProvider: PathProvider is used for building paths for a maps generation. """ - def __init__(self, build_path: AnyStr, build_name:AnyStr, mwm_version: AnyStr): + def __init__(self, build_path: AnyStr, build_name: AnyStr, mwm_version: AnyStr): self.build_path = build_path self.build_name = build_name self.mwm_version = mwm_version @@ -555,5 +555,18 @@ def setup_borders(self): def setup_osm2ft(self): for x in os.listdir(self.paths.osm2ft_path): p = os.path.join(self.paths.osm2ft_path, x) - if os.path.isfile(p) and x.endswith(".mwm.osm2ft"): + if ( + os.path.isfile(p) + and x.endswith(".mwm.osm2ft") + and x.replace(".mwm.osm2ft", "") in self.countries + ): shutil.move(p, os.path.join(self.paths.mwm_path, x)) + + def clean(self): + for p in ( + self.paths.draft_path, + self.paths.status_path, + self.paths.generation_borders_path, + ): + logger.info(f"{p} will be removed.") + shutil.rmtree(p) diff --git a/tools/python/maps_generator/generator/generation.py b/tools/python/maps_generator/generator/generation.py index ff82023f392..91b82936aa7 100644 --- a/tools/python/maps_generator/generator/generation.py +++ b/tools/python/maps_generator/generator/generation.py @@ -1,3 +1,4 @@ +import logging import os from typing import AnyStr from typing import List @@ -15,6 +16,8 @@ from maps_generator.generator.status import Status from maps_generator.generator.status import without_stat_ext +logger = logging.getLogger("maps_generator") + class Generation: """ @@ -98,14 +101,6 @@ def reset_to_stage(self, stage_name: AnyStr): ): raise ContinueError(f"{stage_name} not in {', '.join(high_level_stages)}.") - if not os.path.exists(self.env.paths.status_path): - raise ContinueError(f"Status path {self.env.paths.status_path} not found.") - - if not os.path.exists(self.env.paths.main_status_path): - raise ContinueError( - f"Status file {self.env.paths.main_status_path} not found." - ) - countries_statuses_paths = [] countries = set(self.env.countries) for f in os.listdir(self.env.paths.status_path): diff --git a/tools/python/maps_generator/generator/settings.py b/tools/python/maps_generator/generator/settings.py index dcfc52b7012..491d9a21bcf 100644 --- a/tools/python/maps_generator/generator/settings.py +++ b/tools/python/maps_generator/generator/settings.py @@ -1,7 +1,7 @@ import argparse +import logging import multiprocessing import os -import site import sys from configparser import ConfigParser from configparser import ExtendedInterpolation @@ -12,6 +12,8 @@ from maps_generator.utils.md5 import md5_ext from maps_generator.utils.system import total_virtual_memory +logger = logging.getLogger("maps_generator") + ETC_DIR = os.path.join(os.path.dirname(__file__), "..", "var", "etc") parser = argparse.ArgumentParser(add_help=False) @@ -54,7 +56,8 @@ class CfgReader: def __init__(self, default_settings_path: AnyStr): self.config = ConfigParser(interpolation=ExtendedInterpolation()) - self.config.read([get_config_path(default_settings_path)]) + self.config_path = get_config_path(default_settings_path) + self.config.read([self.config_path, ]) def get_opt(self, s: AnyStr, v: AnyStr, default: Any = None): val = CfgReader._get_env_val(s, v) @@ -222,17 +225,20 @@ def init(default_settings_path: AnyStr): NODE_STORAGE = cfg.get_opt("Generator tool", "NODE_STORAGE", NODE_STORAGE) if not os.path.exists(USER_RESOURCE_PATH): - from data_files import find_data_files + try: + from data_files import find_data_files - USER_RESOURCE_PATH = find_data_files("omim-data") - assert USER_RESOURCE_PATH is not None + USER_RESOURCE_PATH = find_data_files("omim-data") + assert USER_RESOURCE_PATH is not None - import borders + import borders - # Issue: If maps_generator is installed in your system as a system - # package and borders.init() is called first time, call borders.init() - # might return False, because you need root permission. - assert borders.init() + # Issue: If maps_generator is installed in your system as a system + # package and borders.init() is called first time, call borders.init() + # might return False, because you need root permission. + assert borders.init() + except AssertionError as e: + logger.exception(f"Error while loading settings from {cfg.config_path}:") # Stages section: global NEED_PLANET_UPDATE diff --git a/tools/python/maps_generator/generator/stages_declaration.py b/tools/python/maps_generator/generator/stages_declaration.py index 570ef85f928..1cc4f22394c 100644 --- a/tools/python/maps_generator/generator/stages_declaration.py +++ b/tools/python/maps_generator/generator/stages_declaration.py @@ -412,8 +412,7 @@ def apply(self, env: Env): if os.path.isfile(p) and x.endswith(".mwm.osm2ft"): shutil.move(p, os.path.join(env.paths.osm2ft_path, x)) - logger.info(f"{env.paths.draft_path} will be removed.") - shutil.rmtree(env.paths.draft_path) + env.clean() stages.init()