-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_grid.py
418 lines (367 loc) · 15.4 KB
/
generate_grid.py
1
#!/usr/bin/env pythonu"""Written by Enrico Ciraci'January 2024Compute a regular grid along the provided satellite track. 1. Merge the frames polygons into a single track polygon. 2. If multiple segments of the same track are present compute the extreme corners of a rectangle covering all the segments. 2. Compute the centroid of the track polygon. 3. Project the track polygon to 3857 Web Mercator Projection. 4. Rotate the track polygon to align it with the North-South direction. 5. Compute the trapezoid corners coordinates. 6. Compute the trapezoid diagonals equations. 7. Extend diagonals using a user define buffer. 8. Split the vertical and horizontal dimensions into a number of segments defined by az_res and n_c parameters. 9. Compute the grid cells corners coordinates. 10. Generate output shapefile.positional arguments: input_file Input file.options: -h, --help show this help message and exit --out_dir OUT_DIR, -O OUT_DIR Output directory. --buffer_dist BUFFER_DIST, -B BUFFER_DIST Buffer distance. --az_res AZ_RES, -R AZ_RES Cross track grid resolution (m). --n_c N_C, -C N_C Number of columns in the grid. --plot, -P Plot intermediate results.Python Dependenciesgeopandas: Open source project to make working with geospatial data in python easier: https://geopandas.orgpyproj: Python interface to PROJ (cartographic projections and coordinate transformations library): https://pyproj4.github.io/pyproj/stable/index.htmlshapely: Python package for manipulation and analy_sis of planar geometric objects: https://shapely.readthedocs.io/en/stable/matplotlib: Comprehensive library for creating static, animated, and interactive visualizations in Python: https://matplotlib.org"""# - Python Dependenciesfrom __future__ import print_functionimport osimport argparsefrom datetime import datetimeimport numpy as npimport geopandas as gpdfrom shapely.geometry import Polygon, Pointfrom shapely.affinity import rotateimport matplotlib.pyplot as pltfrom mita_csk_frame_grid_utils import (reproject_geodataframe, rotate_polygon_to_north_up)from PtsLine import PtsLine, PtsLineIntersectfrom reproject_geometry import reproject_geometryfrom rm_z_coord import rm_z_coorddef find_polygon_corners(geom: Polygon) -> dict: """ Find the corners of a squared polygon. Args: geom: shapefile geometry Polygon Returns: dictionary containing the coordinates of the southernmost, northernmost, easternmost, and westernmost corners. """ exterior_coords_list = geom.exterior.coords[:-1] # - Find the southernmost corner p_south = min(exterior_coords_list, key=lambda t: t[1]) # - Find the northernmost corner p_north = max(exterior_coords_list, key=lambda t: t[1]) # - Find the easternmost corner p_east = max(exterior_coords_list, key=lambda t: t[0]) # - Find the westernmost corner p_west = min(exterior_coords_list, key=lambda t: t[0]) return {'south': p_south, 'north': p_north, 'east': p_east, 'west': p_west}def generate_grid(gdf_t: gpd.GeoDataFrame, n_c: int, az_res: float, buffer_dist: float, plot: bool = False) -> gpd.GeoDataFrame: """ Compute a regular grid along the provided satellite track. If different polygons are present for the same track, a single grid covering all the polygons is generated. Args: gdf_t: geopandas GeoDataFrame containing the satellite track. n_c: number of columns of the output grid az_res: grid azimuth resolution (m) buffer_dist: grid buffer distance (m) plot: (Default value = False) Returns: gpd.GeoDataFrame: grid GeoDataFrame """ # - Compute Track Centroid - Need to reproject the track geometry # - to minimize distortion in the calculation. # - 1. Project to WGS 84 Web Mercator Projection EPSG:3857 # - 2. Compute Centroid # - get input data crs source_crs = gdf_t.crs.to_epsg() # - input data crs if gdf_t.shape[0] > 1: # - The input data contains multiple polygons s_corns = [] n_corns = [] e_corns = [] w_corns = [] for index, row in gdf_t.iterrows(): corners = find_polygon_corners(row['geometry']) s_corns.append(corners['south']) n_corns.append(corners['north']) e_corns.append(corners['east']) w_corns.append(corners['west']) # - Find the southernmost corner p_south = min(s_corns, key=lambda t: t[1]) # - Find the northernmost corner p_north = max(n_corns, key=lambda t: t[1]) # - Find the easternmost corner p_east = max(e_corns, key=lambda t: t[0]) # - Find the westernmost corner p_west = min(w_corns, key=lambda t: t[0]) # - Create a single polygon r_geom = Polygon([p_south, p_east, p_north, p_west, p_south]) # - assign the new geometry to the first entry of the GeoDataFrame gdf_t['geometry'].loc[0] = r_geom else: # - The input dataframe contains a single polygon r_geom = gdf_t['geometry'].loc[0] r_geom \ = reproject_geometry(r_geom, source_crs, 3857) # - Compute the centroid of the track polygon proj_centroid = Point(r_geom.centroid.x, r_geom.centroid.y) ll_centroid = reproject_geometry(proj_centroid, 3857, source_crs) # - Longitude of the centroid - convert to radians lat_cent = ll_centroid.y * np.pi / 180 # - Estimate an average distortion factor associated to the # - usage of Web Mercator Projection (EPSG:3857) # - Reference: https://en.wikipedia.org/wiki/Mercator_projection d_scale = np.cos(lat_cent) # - reproject to WGS 84 Web Mercator Projection EPSG:3857 gdf = reproject_geodataframe(gdf_t, 3857) # - If still present remove z coordinate d3_coord = gdf['geometry'].loc[0].exterior.coords[:-1] d2_coords = Polygon([(coord[0], coord[1]) for coord in d3_coord]) # - rotate geometries rotated_geometry, alpha \ = rotate_polygon_to_north_up(d2_coords) rotated_gdf = gdf.copy() rotated_gdf['geometry'] = rotated_geometry # - Extract Polygon centroid centroid = (rotated_geometry.centroid.x, rotated_geometry.centroid.y) # - Points to the left of the centroid left_points = [point for point in rotated_geometry.exterior.coords[:-1] if point[0] < centroid[0]] # - Points to the right of the centroid right_points = [point for point in rotated_geometry.exterior.coords[:-1] if point[0] > centroid[0]] # - Find trapezoid corners coordinates x_c, y_c = zip(*list(rotated_geometry.exterior.coords)) x_lpc, y_lpc = zip(*list(left_points)) x_rpc, y_rpc = zip(*list(right_points)) # - Corner 1 - Upper Left ind_ul = np.argmax(np.array(y_lpc)) pt_ul = (x_lpc[ind_ul], y_lpc[ind_ul]) # - Corner 2 - Upper Right ind_ur = np.argmax(np.array(y_rpc)) pt_ur = (x_rpc[ind_ur], y_rpc[ind_ur]) # - Corner 3 - Lower Right ind_lr = np.argmin(np.array(y_rpc)) pt_lr = (x_rpc[ind_lr], y_rpc[ind_lr]) # - Corner 4 - Lower Left ind_ll = np.argmin(np.array(y_lpc)) pt_ll = (x_lpc[ind_ll], y_lpc[ind_ll]) # - Compute trapezoid diagonals equations # - Diagonal 1 ln_1 = PtsLine(pt_ul[0], pt_ul[1], pt_lr[0], pt_lr[1]) # - Diagonal 2 ln_2 = PtsLine(pt_ur[0], pt_ur[1], pt_ll[0], pt_ll[1]) # - Extend diagonals using a user define buffer x_s = [] y_s = [] # - Corner 1 - Upper Left x_1 = pt_ul[0] - buffer_dist y_1 = ln_1.y_val(x_1) ul_ext = (x_1, y_1) x_s.append(x_1) y_s.append(y_1) # - Corner 2 x_2 = pt_ur[0] + buffer_dist y_2 = ln_2.y_val(x_2) ur_ext = (x_2, y_2) x_s.append(x_2) y_s.append(y_2) # - Corner 3 x_3 = pt_lr[0] + buffer_dist y_3 = ln_1.y_val(x_3) lr_ext = (x_3, y_3) x_s.append(x_3) y_s.append(y_3) # - Corner 4 x_4 = pt_ll[0] - buffer_dist y_4 = ln_2.y_val(x_4) ll_ext = (x_4, y_4) x_s.append(x_4) y_s.append(y_4) x_s.append(x_1) y_s.append(y_1) # - Trapezoid major axis equation ln_3 = PtsLine(ul_ext[0], ul_ext[1], ur_ext[0], ur_ext[1]) # - Trapezoid minor axis equation ln_4 = PtsLine(ll_ext[0], ll_ext[1], lr_ext[0], lr_ext[1]) # - Trapezoid left side equation ln5 = PtsLine(ul_ext[0], ul_ext[1], ll_ext[0], ll_ext[1]) # - Trapezoid right side equation ln6 = PtsLine(ur_ext[0], ur_ext[1], lr_ext[0], lr_ext[1]) # - Compute grid number of rows and # - Generate coordinates of reference points for the # - grid vertical lines x_north = np.linspace(pt_ul[0], pt_ur[0], n_c+1) x_south = np.linspace(pt_ll[0], pt_lr[0], n_c+1) # - Replace the first and last points with the corners coordinates # - with the corners of the buffered trapezoid x_north[0] = ul_ext[0] x_north[-1] = ur_ext[0] x_south[0] = ll_ext[0] x_south[-1] = lr_ext[0] # - Evaluate the y coordinates of the grid vertical lines y_north = ln_3.y_val(x_north) y_south = ln_4.y_val(x_south) # - Compute grid number of rows n_r = int(np.ceil(((max(y_north) - min(y_south)) * d_scale) / az_res)) # - Generate coordinates of reference points for the # - grid horizontal lines y_vert = np.linspace(min(lr_ext[1], ur_ext[1]), max(ll_ext[1], ul_ext[1]), n_r+1) # - Evaluate the x coordinates of the grid horizontal lines x_vert_l = [] x_vert_r = [] for y_p in y_vert: x_vert_l.append(ln5.x_val(y_p)) x_vert_r.append(ln6.x_val(y_p)) # - Generate list of vertical and horizontal lines horiz_lines = [PtsLine(x_vert_l[i], float(y_vert[i]), x_vert_r[i], float(y_vert[i])) for i in range(len(x_vert_l))] vert_lines = [PtsLine(float(x_north[i]), float(y_north[i]), float(x_south[i]), float(y_south[i])) for i in range(len(x_north))] # - Initialize Grid Corner Matrix corners_matrix = [] for h_r in horiz_lines: corners_horiz = [] for v_r in vert_lines: corners_horiz.append(PtsLineIntersect(h_r, v_r).intersection) corners_matrix.append(corners_horiz) # - Convert to numpy array corners_matrix = np.array(corners_matrix) # - Matrix shape n_rows, n_cols, _ = corners_matrix.shape # - Compute grid cells corners coordinates & generate output dataframe grid_corners = [] grid_geometry = [] grid_rows = [] grid_cols = [] for i in range(n_rows - 1): for j in range(n_cols - 1): grid_rows.append(i) grid_cols.append(j) grid_corners.append([corners_matrix[i, j], corners_matrix[i, j + 1], corners_matrix[i + 1, j + 1], corners_matrix[i + 1, j], corners_matrix[i, j]]) grid_geometry.append(rotate(Polygon(grid_corners[-1]), alpha, origin=centroid)) # - Create GeoDataFrame and convert to original CRS d = {'index': np.arange(len(grid_rows)), 'row': grid_rows, 'col': grid_cols, 'geometry': grid_geometry} grid_gdf = gpd.GeoDataFrame(d, crs='EPSG:3857').set_index('index') grid_gdf = reproject_geodataframe(grid_gdf, source_crs) if plot: # - Plot rotated geometry _, ax = plt.subplots(figsize=(5, 7)) ax.set_title('Rotated Geometry') ax.set_xlabel('Easting') ax.set_ylabel('Northing') ax.scatter(x_c, y_c, color='blue', zorder=0) ax.scatter(pt_ul[0], pt_ul[1], color='yellow') ax.scatter(pt_ur[0], pt_ur[1], color='yellow') ax.scatter(pt_lr[0], pt_lr[1], color='yellow') ax.scatter(pt_ll[0], pt_ll[1], color='yellow') ax.plot(*zip(*rotated_geometry.exterior.coords), color='red') ax.scatter(ul_ext[0], ul_ext[1], color='red') ax.scatter(ur_ext[0], ur_ext[1], color='red') ax.scatter(lr_ext[0], lr_ext[1], color='red') ax.scatter(ll_ext[0], ll_ext[1], color='red') ax.scatter(x_s, y_s, color='orange', marker='x') ax.plot([lr_ext[0], ul_ext[0]], [lr_ext[1], ul_ext[1]], color='cyan') ax.plot([ll_ext[0], ur_ext[0]], [ll_ext[1], ur_ext[1]], color='cyan') ax.scatter(x_north, y_north, color='green') ax.scatter(x_south, y_south, color='green') ax.plot(x_vert_l, y_vert, color='blue') ax.plot(x_vert_r, y_vert, color='blue') ax.plot(*zip(*grid_corners[6]), color='magenta') ax.plot(*zip(*grid_corners[-1]), color='magenta') ax.grid() plt.show() plt.close() return grid_gdfdef main() -> None: """ Generate a regular grid along the provided satellite track. """ parser = argparse.ArgumentParser( description="""Generate a regular grid along the provided satellite track.""" ) parser.add_argument('input_file', type=str, help='Input file.') # - Output directory - default is current working directory parser.add_argument('--out_dir', '-O', type=str, help='Output directory.', default=os.getcwd()) # - Buffer distance parser.add_argument('--buffer_dist', '-B', type=float, help='Buffer distance.', default=2e3) # - Number of Cells # - Along Track parser.add_argument('--az_res', '-R', type=float, help='Cross track grid resolution (m) [def. 5e3m].', default=5e3) # - Cross Track Resolution parser.add_argument('--n_c', '-C', type=float, help='Number of columns in the grid.', default=3) # - Plot Intermediate Results parser.add_argument('--plot', '-P', action='store_true', help='Plot intermediate results.') # - Parse arguments args = parser.parse_args() # - Number of Cells n_c = args.n_c # - Along Track - Number of Columns az_res = args.az_res # - Cross Track Resolution (km) - Azimuth Resolution # - set path to input shapefile input_shapefile = args.input_file output_f_name \ = os.path.basename(input_shapefile).replace('.shp', '_grid.shp') # - set path to output shapefile out_dir = args.out_dir os.makedirs(out_dir, exist_ok=True) # - import input data gdf = gpd.read_file(input_shapefile) # - get input data crs source_crs = gdf.crs.to_epsg() # - remove z coordinate gdf = rm_z_coord(gdf) # - Merge frames polygons into a single track polygon gdf_t \ = (gpd.GeoDataFrame(geometry=[gdf.unary_union], crs=source_crs) .explode(index_parts=False).reset_index(drop=True)) # - Compute a regular grid along the provided satellite track grid_gdf = generate_grid(gdf_t, n_c, az_res, args.buffer_dist, args.plot) # - Save grid to shapefile grid_gdf.to_file(os.path.join(out_dir, str(output_f_name))) print(f"# - Grid saved to: {os.path.join(out_dir, str(output_f_name))}")# - run main programif __name__ == '__main__': start_time = datetime.now() main() end_time = datetime.now() print(f"# - Computation Time: {end_time - start_time}")