forked from tomaszdudek7/airflow_project
-
Notifications
You must be signed in to change notification settings - Fork 0
/
build_images.py
105 lines (87 loc) · 3.67 KB
/
build_images.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from json import JSONDecodeError
from typing import List
import docker
import argparse
import logging
import sys
import os.path
import json
import shutil, errno
# Shared library directories (under ./python/libraries) that are copied into
# every task's docker build context before the image is built.
LIBRARIES_TO_COPY = [
    'papermill_runner',
    'result_saver',
]

# Command-line interface of the script.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-t", "--task",
    dest="taskname",
    required=False,
    help="If you wish to build only specific task, specify its catalog name",
)
parser.add_argument(
    "-l", "--loud",
    action='store_true',  # flag is False unless given on the command line
    help="Log docker build process.",
)
class ImagesBuilder:
    """Build one docker image per task directory found under ``./docker``.

    Before each build the libraries listed in ``LIBRARIES_TO_COPY`` are copied
    from ``./python/libraries`` into the task's build context; they are removed
    again once the build has finished or failed.
    """

    def __init__(self, parser):
        """Parse the command line and set up logging and the docker client.

        Args:
            parser: Object whose ``parse_args()`` result exposes the
                ``taskname`` and ``loud`` attributes.
        """
        self.args = parser.parse_args()
        self.loud = self.args.loud
        self.log = self.configure_and_get_logger()
        self.cli = docker.from_env()

    def build_images(self):
        """Build an image for every directory selected by the command line."""
        directories = self.get_directories_to_browse()
        self.log.info(f"Browsing {directories}")
        for directory in directories:
            self.build_task(directory)

    def build_task(self, directory_name):
        """Build the image for ``./docker/<directory_name>``.

        The image is tagged with ``directory_name``. Any exception raised while
        copying libraries or building propagates to the caller; the copied
        libraries are cleaned up in either case.

        Args:
            directory_name: Name of the task directory inside ``./docker``.
        """
        self.log.info(f"Handling {directory_name}")
        try:
            self.copy_libraries(directory_name)
            self.log.info("Building image. (run script with -l to see docker logs)")
            _image, build_logs = self.cli.images.build(
                path=f'./docker/{directory_name}', tag=directory_name, rm=True)
            for entry in build_logs:
                output = self.parse_output(entry)
                if self.loud:
                    self.log.info(output)
            self.log.info("Image built.")
        finally:
            # Always strip the copied libraries out of the build context, even
            # when the copy or the build itself failed.
            self.remove_libraries(directory_name)

    def parse_output(self, raw) -> str:
        """Turn one build-log entry into a printable line.

        Args:
            raw: A build-log entry, normally a dict.

        Returns:
            The entry's ``'stream'`` text without leading/trailing newlines, or
            ``str(raw)`` when the entry carries no textual ``'stream'`` value.
        """
        stream = raw.get('stream') if isinstance(raw, dict) else None
        if isinstance(stream, str):
            return stream.strip('\n')
        # Entries without a 'stream' key are still worth showing, but the
        # annotated contract is a string, so render them explicitly.
        return str(raw)

    def copy_libraries(self, directory_name):
        """Copy every shared library into the task's build context.

        Args:
            directory_name: Name of the task directory inside ``./docker``.
        """
        for library in LIBRARIES_TO_COPY:
            src = f'./python/libraries/{library}'
            dest = f'./docker/{directory_name}/{library}'
            self.log.info(f"Copying {src} to {dest}")
            self.copy_dirs(src, dest)

    def remove_libraries(self, directory_name):
        """Remove the shared libraries from the task's build context.

        Runs as cleanup after a build, so it must not fail merely because a
        library was never copied (an earlier step raised) or because
        ``copy_dirs`` fell back to copying a single file.

        Args:
            directory_name: Name of the task directory inside ``./docker``.
        """
        self.log.info("Cleaning up.")
        for library in LIBRARIES_TO_COPY:
            dest = f'./docker/{directory_name}/{library}'
            if not os.path.lexists(dest):
                # Nothing was copied here; skipping keeps the original build
                # error from being masked by a cleanup failure.
                continue
            self.log.info(f"Removing {dest}")
            if os.path.isdir(dest) and not os.path.islink(dest):
                shutil.rmtree(dest)
            else:
                # Plain file (single-file copy fallback) or a symlink.
                os.remove(dest)

    def get_directories_to_browse(self) -> List[str]:
        """Return the names of the task directories to build.

        Returns:
            ``[taskname]`` when a task was given on the command line, otherwise
            the sorted names of all non-hidden directories in ``./docker``.

        Raises:
            Exception: If a task name was given but ``./docker/<taskname>`` is
                not a directory.
        """
        taskname = self.args.taskname
        if taskname is not None:
            self.log.info(f"Taskname specified as {taskname}. Will build only that docker image.")
            if not os.path.isdir(f'./docker/{taskname}'):
                raise Exception(f'''Directory /docker/{taskname} does not exists.''')
            return [taskname]

        self.log.info("No particular task name specified. Will build every image in /docker/.")
        # os.listdir() order is arbitrary; sort so the build order is stable.
        return sorted(
            entry for entry in os.listdir('./docker')
            if not entry.startswith('.') and os.path.isdir(f'./docker/{entry}')
        )

    def copy_dirs(self, src, dst):
        """Copy ``src`` to ``dst``, whether it is a directory tree or a file.

        Args:
            src: Source path.
            dst: Destination path.

        Raises:
            OSError: Any copy error other than ``src`` not being a directory.
        """
        try:
            shutil.copytree(src, dst)
        except OSError as exc:
            if exc.errno == errno.ENOTDIR:
                # src is a regular file, not a directory: copy it as a file.
                shutil.copy(src, dst)
            else:
                raise

    def configure_and_get_logger(self) -> logging.Logger:
        """Send INFO-level log records to stdout and return the script logger.

        Note: each call attaches another stream handler to the root logger.

        Returns:
            The logger named ``"build_images"``.
        """
        root = logging.getLogger()
        root.setLevel(logging.INFO)
        handler = logging.StreamHandler(sys.stdout)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        root.addHandler(handler)
        return logging.getLogger("build_images")
# Script entry: parse the command line, then build the selected images.
builder = ImagesBuilder(parser)
builder.build_images()