added generate_app_user function into scheduled task
nrjadkry committed Nov 9, 2023
1 parent b54e651 commit 4661a6b
Showing 2 changed files with 157 additions and 162 deletions.
296 changes: 154 additions & 142 deletions src/backend/app/projects/project_crud.py
@@ -1415,7 +1415,6 @@ def generate_task_files_wrapper(project_id, task, xlsform, form_type, odk_creden


 def generate_appuser_files(
-    db: Session,
     project_id: int,
     extract_polygon: bool,
     upload: str,

@@ -1436,166 +1435,179 @@ def generate_appuser_files(
     - form_type: weather the form is xls, xlsx or xml
     - background_task_id: the task_id of the background task running this function.
     """

The previous body, which ran the same logic directly against the injected db session inside a single try/except, is removed; the logic is re-indented one level inside a for db in database.get_db(): loop, so the scheduled job obtains and releases its own database session instead of receiving one from the route. The updated body:

    for db in database.get_db():
        try:
            project_log = log.bind(task="create_project", project_id=project_id)

            project_log.info(
                f"Starting generate_appuser_files for project {project_id}"
            )

            # Get the project table contents.
            project = table(
                "projects",
                column("project_name_prefix"),
                column("xform_title"),
                column("id"),
                column("odk_central_url"),
                column("odk_central_user"),
                column("odk_central_password"),
                column("outline"),
            )

            where = f"id={project_id}"
            sql = select(
                project.c.project_name_prefix,
                project.c.xform_title,
                project.c.id,
                project.c.odk_central_url,
                project.c.odk_central_user,
                project.c.odk_central_password,
                geoalchemy2.functions.ST_AsGeoJSON(project.c.outline).label("outline"),
            ).where(text(where))
            result = db.execute(sql)

            # There should only be one match
            if result.rowcount != 1:
                log.warning(str(sql))
                if result.rowcount < 1:
                    raise HTTPException(status_code=400, detail="Project not found")
                else:
                    raise HTTPException(
                        status_code=400, detail="Multiple projects found"
                    )

            one = result.first()

            if one:
                # Get odk credentials from project.
                odk_credentials = {
                    "odk_central_url": one.odk_central_url,
                    "odk_central_user": one.odk_central_user,
                    "odk_central_password": one.odk_central_password,
                }

                odk_credentials = project_schemas.ODKCentral(**odk_credentials)

                xform_title = one.xform_title if one.xform_title else None

                category = xform_title
                if upload:
                    xlsform = f"/tmp/{category}.{form_type}"
                    contents = upload
                    with open(xlsform, "wb") as f:
                        f.write(contents)
                else:
                    xlsform = f"{xlsforms_path}/{xform_title}.xls"

                # Data Extracts
                if extracts_contents is not None:
                    project_log.info("Uploading data extracts")
                    upload_custom_data_extracts(db, project_id, extracts_contents)

                else:
                    project = (
                        db.query(db_models.DbProject)
                        .filter(db_models.DbProject.id == project_id)
                        .first()
                    )
                    config_file_contents = project.form_config_file

                    project_log.info("Extracting Data from OSM")

                    config_path = "/tmp/config.yaml"
                    if config_file_contents:
                        with open(
                            config_path, "w", encoding="utf-8"
                        ) as config_file_handle:
                            config_file_handle.write(
                                config_file_contents.decode("utf-8")
                            )
                    else:
                        config_path = f"{data_models_path}/{category}.yaml"

                    # # OSM Extracts for whole project
                    pg = PostgresClient("underpass", config_path)
                    outline = json.loads(one.outline)
                    boundary = {
                        "type": "Feature",
                        "properties": {},
                        "geometry": outline,
                    }
                    data_extract = pg.execQuery(boundary)
                    filter = FilterData(xlsform)

                    updated_data_extract = {"type": "FeatureCollection", "features": []}
                    filtered_data_extract = (
                        filter.cleanData(data_extract)
                        if data_extract
                        else updated_data_extract
                    )

                    # Collect feature mappings for bulk insert
                    feature_mappings = []

                    for feature in filtered_data_extract["features"]:
                        # If the osm extracts contents do not have a title, provide an empty text for that.
                        feature["properties"]["title"] = ""

                        feature_shape = shape(feature["geometry"])

                        # If the centroid of the Polygon is not inside the outline, skip the feature.
                        if extract_polygon and (
                            not shape(outline).contains(shape(feature_shape.centroid))
                        ):
                            continue

                        wkb_element = from_shape(feature_shape, srid=4326)
                        feature_mapping = {
                            "project_id": project_id,
                            "category_title": category,
                            "geometry": wkb_element,
                            "properties": feature["properties"],
                        }
                        updated_data_extract["features"].append(feature)
                        feature_mappings.append(feature_mapping)
                    # Bulk insert the osm extracts into the db.
                    db.bulk_insert_mappings(db_models.DbFeatures, feature_mappings)

                # Generating QR Code, XForm and uploading OSM Extracts to the form.
                # Creating app users and updating the role of that user.
                tasks_list = tasks_crud.get_task_lists(db, project_id)

                # info = get_cpu_info()
                # cores = info["count"]
                # with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
                #     futures = {executor.submit(generate_task_files_wrapper, project_id, task, xlsform, form_type, odk_credentials): task for task in tasks_list}

                #     for future in concurrent.futures.as_completed(futures):
                #         log.debug(f"Waiting for thread to complete..")

                for task in tasks_list:
                    try:
                        generate_task_files(
                            db,
                            project_id,
                            task,
                            xlsform,
                            form_type,
                            odk_credentials,
                        )
                    except Exception as e:
                        log.warning(str(e))
                        continue
            # # Update background task status to COMPLETED
            update_background_task_status_in_database(
                db, background_task_id, 4
            )  # 4 is COMPLETED

        except Exception as e:
            log.warning(str(e))

            # Update background task status to FAILED
            update_background_task_status_in_database(
                db, background_task_id, 2, str(e)
            )  # 2 is FAILED


def create_qrcode(
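The rewritten function iterates database.get_db() rather than taking a session from its caller. That helper is not part of this commit; the following is only a minimal sketch of the usual SQLAlchemy/FastAPI-style session generator such a loop assumes, with a placeholder connection URL:

    # Sketch only: a typical get_db() generator; the real app.db.database module
    # (engine URL, pooling, session options) may differ.
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    engine = create_engine("postgresql://fmtm:fmtm@db/fmtm")  # placeholder URL
    SessionLocal = sessionmaker(bind=engine)


    def get_db():
        """Yield a session and always close it, even if the caller raises."""
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()

Iterating the generator with for db in database.get_db(): runs the body once with a fresh session and guarantees db.close() executes when the job finishes or fails, which matters because the scheduled job runs outside FastAPI's request scope where Depends(get_db) would normally manage the session lifetime.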
23 changes: 3 additions & 20 deletions src/backend/app/projects/project_routes.py
@@ -22,7 +22,6 @@
 from pathlib import Path
 from typing import List, Optional

-from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from fastapi import (
     APIRouter,
     BackgroundTasks,
@@ -40,6 +39,8 @@
 from osm_fieldwork.xlsforms import xlsforms_path
 from sqlalchemy.orm import Session

+from app.scheduler import scheduler
+
 from ..central import central_crud
 from ..db import database, db_models
 from ..models.enums import TILES_FORMATS, TILES_SOURCE
@@ -633,15 +634,11 @@ async def generate_files(
         db, task_id=background_task_id, project_id=project_id
     )

-    sched = AsyncIOScheduler()
-    sched.start()
-
-    job = sched.add_job(
+    job = scheduler.add_job(
         project_crud.generate_appuser_files,
         "date",
         run_date=datetime.now(),
         args=[
-            db,
             project_id,
             extract_polygon,
             contents,
@@ -653,23 +650,9 @@ async def generate_files(
         id=str(background_task_id),
     )

-    # log.debug(f"Submitting {background_task_id} to background tasks stack")
-    # background_tasks.add_task(
-    #     project_crud.generate_appuser_files,
-    #     db,
-    #     project_id,
-    #     extract_polygon,
-    #     contents,
-    #     extracts_contents if data_extracts else None,
-    #     xform_title,
-    #     file_ext[1:] if upload else "xls",
-    #     background_task_id,
-    # )
-
     return {
         "Message": f"{project_id}",
         "task_id": f"{background_task_id}",
-        # "job":job
     }
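The route now imports a shared scheduler from app.scheduler, a module that is not shown in this commit. A hypothetical sketch of what it presumably contains — one module-level AsyncIOScheduler reused by every request, instead of constructing and starting a new scheduler inside generate_files:

    # app/scheduler.py — hypothetical sketch; the real module is not part of this diff.
    from apscheduler.schedulers.asyncio import AsyncIOScheduler

    # One scheduler shared by the whole application.
    scheduler = AsyncIOScheduler()

    # AsyncIOScheduler.start() needs a running asyncio event loop, so the app
    # presumably starts it once at startup rather than at import time, e.g.:
    #
    #     @app.on_event("startup")
    #     async def start_scheduler():
    #         scheduler.start()

With the one-shot "date" trigger and run_date=datetime.now(), add_job still executes generate_appuser_files immediately, but the job is now owned by a scheduler whose lifetime matches the application rather than a single request.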
