Skip to content

Commit

Permalink
[generator][python] Fixed race condition bugs; Prepared a bit for lau…
Browse files Browse the repository at this point in the history
…nching into a cluster.
  • Loading branch information
maksimandrianov committed Nov 24, 2020
1 parent 1e2f350 commit 0f10276
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 117 deletions.
23 changes: 3 additions & 20 deletions tools/python/airmaps/dags/build_coastline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from airmaps.instruments import settings
from airmaps.instruments import storage
from airmaps.instruments.utils import get_latest_filename
from airmaps.instruments.utils import make_rm_build_task
from airmaps.instruments.utils import put_current_date_in_filename
from airmaps.instruments.utils import rm_build
from maps_generator.generator import stages_declaration as sd
Expand Down Expand Up @@ -40,9 +39,7 @@
COASTLINE_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/coasts"


def publish_coastline(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
env = Env(build_name=build_name)
def publish_coastline(env):
for name in (f"{WORLD_COASTS_NAME}.geom", f"{WORLD_COASTS_NAME}.rawgeom"):
coastline = put_current_date_in_filename(name)
latest = get_latest_filename(name)
Expand All @@ -58,7 +55,6 @@ def publish_coastline(**kwargs):
def build_coastline(**kwargs):
env = Env()
kwargs["ti"].xcom_push(key="build_name", value=env.build_name)

run_generation(
env,
(
Expand All @@ -68,26 +64,13 @@ def build_coastline(**kwargs):
),
)
env.finish()
publish_coastline(env)
rm_build(env)


BUILD_COASTLINE_TASK = PythonOperator(
task_id="Build_coastline_task",
provide_context=True,
python_callable=build_coastline,
on_failure_callback=lambda ctx: rm_build(**ctx),
dag=DAG,
)


PUBLISH_COASTLINE_TASK = PythonOperator(
task_id="Publish_coastline_task",
provide_context=True,
python_callable=publish_coastline,
dag=DAG,
)


RM_BUILD_TASK = make_rm_build_task(DAG)


BUILD_COASTLINE_TASK >> PUBLISH_COASTLINE_TASK >> RM_BUILD_TASK
112 changes: 81 additions & 31 deletions tools/python/airmaps/dags/build_maps.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import logging
import os
from datetime import timedelta

import filelock
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from airmaps.instruments import settings
from airmaps.instruments import storage
from airmaps.instruments.utils import make_rm_build_task
from airmaps.instruments.utils import rm_build
from airmaps.instruments.utils import run_generation_from_first_stage
from maps_generator.generator import stages_declaration as sd
from maps_generator.generator.env import Env
from maps_generator.generator.env import PathProvider
from maps_generator.generator.env import get_all_countries_list
from maps_generator.generator.status import Status
from maps_generator.maps_generator import run_generation

logger = logging.getLogger("airmaps")


MAPS_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/maps"
MAPS_STORAGE_PATH = os.path.join(settings.STORAGE_PREFIX, "maps")


class MapsGenerationDAG(DAG):
Expand All @@ -39,23 +42,64 @@ def __init__(self, *args, **kwargs):
dag=self,
)

publish_maps_task = PythonOperator(
task_id="Publish_maps_task",
provide_context=True,
python_callable=MapsGenerationDAG.publish_maps,
dag=self,
)

rm_build_task = make_rm_build_task(self)

build_epilog_task >> publish_maps_task >> rm_build_task
for country in get_all_countries_list(PathProvider.borders_path()):
build_prolog_task >> self.make_mwm_operator(country) >> build_epilog_task

@staticmethod
def get_params(namespace="env", **kwargs):
return kwargs.get("params", {}).get(namespace, {})

@staticmethod
def publish_maps_build(env, subdir="temp_dir", **kwargs):
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir]
storage_path = os.path.join(MAPS_STORAGE_PATH, subdir)
storage.wd_publish(
env.paths.build_path, os.path.join(storage_path, env.build_name)
)

@staticmethod
def fetch_maps_build(env, subdir="temp_dir", **kwargs):
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir]
storage_path = os.path.join(MAPS_STORAGE_PATH, subdir)
storage.wd_fetch(
os.path.join(storage_path, env.build_name), env.paths.build_path
)

@staticmethod
def publish_map(env, country, subdir="temp_dir", **kwargs):
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir]
storage_path = os.path.join(MAPS_STORAGE_PATH, subdir)

ignore_paths = {
os.path.normpath(env.paths.draft_path),
os.path.normpath(env.paths.status_path),
os.path.normpath(env.paths.generation_borders_path),
os.path.normpath(env.paths.intermediate_tmp_path),
os.path.normpath(env.paths.coastline_tmp_path),
}

def publish(path):
rel_path = path.replace(env.paths.build_path, "")[1:]
dest = os.path.join(storage_path, env.build_name, rel_path)
storage.wd_publish(path, dest)

def find_and_publish_files_for_country(path):
for root, dirs, files in os.walk(path):
if os.path.normpath(root) in ignore_paths:
continue

for dir in dirs:
if dir.startswith(country):
publish(os.path.join(root, dir))
else:
find_and_publish_files_for_country(os.path.join(root, dir))

for file in files:
if file.startswith(country):
publish(os.path.join(root, file))

find_and_publish_files_for_country(env.paths.build_path)

@staticmethod
def build_prolog(**kwargs):
params = MapsGenerationDAG.get_params(**kwargs)
Expand All @@ -71,23 +115,36 @@ def build_prolog(**kwargs):
sd.StageDownloadDescriptions(),
),
)
env.clean()
MapsGenerationDAG.publish_maps_build(env, **kwargs)
rm_build(env)

@staticmethod
def make_build_mwm_func(country):
def build_mwm(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
params = MapsGenerationDAG.get_params(**kwargs)
params.update({"build_name": build_name, "countries": [country,]})
env = Env(**params)
# We need to check existing of mwm.tmp. It is needed if we want to
# build mwms from part of planet.
tmp_mwm_name = env.get_tmp_mwm_names()
assert len(tmp_mwm_name) <= 1
if not tmp_mwm_name:
logger.warning(f"mwm.tmp does not exist for {country}.")
return

lock_file = os.path.join(PathProvider.tmp_dir(), "download.lock")
status_name = os.path.join(PathProvider.tmp_dir(), "download.download")
with filelock.FileLock(lock_file):
env = Env(**params)
# We need to check existing of mwm.tmp. It is needed if we want to
# build mwms from part of planet.
tmp_mwm_name = env.get_tmp_mwm_names()
assert len(tmp_mwm_name) <= 1
if not tmp_mwm_name:
logger.warning(f"mwm.tmp does not exist for {country}.")
return

s = Status(status_name)
if not s.is_finished():
MapsGenerationDAG.fetch_maps_build(env, **kwargs)
s.finish()

run_generation_from_first_stage(env, (sd.StageMwm(),), build_lock=False)
MapsGenerationDAG.publish_map(env, country, **kwargs)

return build_mwm

Expand All @@ -97,6 +154,7 @@ def build_epilog(**kwargs):
params = MapsGenerationDAG.get_params(**kwargs)
params.update({"build_name": build_name})
env = Env(**params)
MapsGenerationDAG.fetch_maps_build(env, **kwargs)
run_generation_from_first_stage(
env,
(
Expand All @@ -108,16 +166,8 @@ def build_epilog(**kwargs):
),
)
env.finish()

@staticmethod
def publish_maps(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
params = MapsGenerationDAG.get_params(**kwargs)
params.update({"build_name": build_name})
env = Env(**params)
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)["subdir"]
storage_path = f"{MAPS_STORAGE_PATH}/{subdir}"
storage.wd_publish(env.paths.mwm_path, f"{storage_path}/{env.mwm_version}/")
MapsGenerationDAG.publish_maps_build(env, subdir="mwm_dir", **kwargs)
rm_build(env)

def make_mwm_operator(self, country):
normalized_name = "__".join(country.lower().split())
Expand All @@ -129,7 +179,7 @@ def make_mwm_operator(self, country):
)


PARAMS = {"storage": {"subdir": "open_source"}}
PARAMS = {"storage": {"mwm_dir": "open_source", "temp_dir": "temp"}}
if settings.DEBUG:
PARAMS["env"] = {
# The planet file in debug mode does not contain Russia_Moscow territory.
Expand Down
28 changes: 6 additions & 22 deletions tools/python/airmaps/dags/update_planet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from datetime import timedelta

from airflow import DAG
Expand All @@ -7,7 +8,7 @@

from airmaps.instruments import settings
from airmaps.instruments import storage
from airmaps.instruments.utils import make_rm_build_task
from airmaps.instruments.utils import rm_build
from maps_generator.generator import stages_declaration as sd
from maps_generator.generator.env import Env
from maps_generator.maps_generator import run_generation
Expand All @@ -33,13 +34,14 @@
)


PLANET_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/planet_regular/planet-latest.o5m"
PLANET_STORAGE_PATH = os.path.join(
settings.STORAGE_PREFIX, "planet_regular", "planet-latest.o5m"
)


def update_planet(**kwargs):
env = Env()
kwargs["ti"].xcom_push(key="build_name", value=env.build_name)

if settings.DEBUG:
env.add_skipped_stage(sd.StageUpdatePlanet)

Expand All @@ -52,13 +54,9 @@ def update_planet(**kwargs):
),
)
env.finish()


def publish_planet(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
env = Env(build_name=build_name)
storage.wd_publish(env.paths.planet_o5m, PLANET_STORAGE_PATH)
storage.wd_publish(md5_ext(env.paths.planet_o5m), md5_ext(PLANET_STORAGE_PATH))
rm_build(env)


UPDATE_PLANET_TASK = PythonOperator(
Expand All @@ -67,17 +65,3 @@ def publish_planet(**kwargs):
python_callable=update_planet,
dag=DAG,
)


PUBLISH_PLANET_TASK = PythonOperator(
task_id="Publish_planet_task",
provide_context=True,
python_callable=publish_planet,
dag=DAG,
)


RM_BUILD_TASK = make_rm_build_task(DAG)


UPDATE_PLANET_TASK >> PUBLISH_PLANET_TASK >> RM_BUILD_TASK
26 changes: 21 additions & 5 deletions tools/python/airmaps/instruments/storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import os

import webdav.client as wc
from webdav3.client import Client
from webdav3.urn import Urn

from airmaps.instruments import settings

Expand All @@ -15,13 +17,27 @@

def wd_fetch(src, dst):
logger.info(f"Fetch form {src} to {dst} with options {WD_OPTIONS}.")
client = wc.Client(WD_OPTIONS)
client = Client(WD_OPTIONS)
client.download_sync(src, dst)


def wd_publish(src, dst):
logger.info(f"Publish form {src} to {dst} with options {WD_OPTIONS}.")
client = wc.Client(WD_OPTIONS)
tmp = f"{dst[:-1]}__/" if dst[-1] == "/" else f"{dst}__"
if os.path.isdir(src):
dst += Urn.separate

dst = Urn(dst)
tmp = f"{dst.path()}__"
if dst.is_dir():
tmp = f"{dst.path()[:-1]}__{Urn.separate}"

parent = dst.parent()
path = Urn.separate
client = Client(WD_OPTIONS)
for dir in str(parent).split(Urn.separate):
if not client.check(path):
client.mkdir(path)
path += f"{dir}{Urn.separate}"

client.upload_sync(local_path=src, remote_path=tmp)
client.move(remote_path_from=tmp, remote_path_to=dst)
client.move(remote_path_from=tmp, remote_path_to=dst.path(), overwrite=True)
19 changes: 5 additions & 14 deletions tools/python/airmaps/instruments/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import logging
import os
import shutil
from datetime import datetime
from typing import Iterable

from airflow.operators.python_operator import PythonOperator

from maps_generator.generator.env import Env
from maps_generator.generator.stages import Stage
from maps_generator.generator.stages import get_stage_name
from maps_generator.maps_generator import run_generation

logger = logging.getLogger("airmaps")


def put_current_date_in_filename(filename):
path, name = os.path.split(filename)
Expand All @@ -26,21 +27,11 @@ def get_latest_filename(filename, prefix=""):
return os.path.join(path, ".".join(parts))


def rm_build(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
env = Env(build_name=build_name)
def rm_build(env):
logger.info(f"Build {env.build_path} will be removed.")
shutil.rmtree(env.build_path)


def make_rm_build_task(dag):
return PythonOperator(
task_id="Rm_build_task",
provide_context=True,
python_callable=rm_build,
dag=dag,
)


def run_generation_from_first_stage(
env: Env, stages: Iterable[Stage], build_lock: bool = True
):
Expand Down
2 changes: 1 addition & 1 deletion tools/python/airmaps/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ omim-maps_generator
apache-airflow [postgres]==1.10.10
psycopg2-binary==2.8.4
cryptography==2.8
webdavclient==1.0.8
git+https://github.com/maksimandrianov/webdav-client-python-3.git@andrianov
Loading

0 comments on commit 0f10276

Please sign in to comment.