From 6f1755c8257d70a25e95aed4843b15b49a053ef5 Mon Sep 17 00:00:00 2001 From: Gavin Peng Date: Wed, 11 Mar 2020 16:12:28 +0000 Subject: [PATCH] space-time A* skeleton almost finished --- LICENSE.txt | 0 setup.py | 30 ++++++++ space-time-astar/__init__.py | 1 + space-time-astar/grid.py | 4 +- space-time-astar/neighbour_table.py | 19 ++++- space-time-astar/planner.py | 107 ++++++++++++++++++---------- space-time-astar/state.py | 34 +++++++++ 7 files changed, 152 insertions(+), 43 deletions(-) create mode 100644 LICENSE.txt create mode 100644 space-time-astar/state.py diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py index e69de29..d7066c5 100644 --- a/setup.py +++ b/setup.py @@ -0,0 +1,30 @@ + +from distutils.core import setup + +setup( + name = 'space-time-astar', # How you named your package folder (MyLib) + packages = ['space-time-astar'], # Chose the same as "name" + version = '0.1', # Start with a small number and increase it with every change you make + license='MIT', # Chose a license from here: https://help.github.com/articles/licensing-a-repository + description = 'A* search algorithm with an added time dimension to deal with dynamic obstacles.', # Give a short description about your library + author = 'Haoran Peng', # Type in your name + author_email = 'gavinsweden@gmail.com', # Type in your E-Mail + url = 'https://github.com/GavinPHR/Space-Time-AStar', # Provide either the link to your github or to your website + download_url = 'https://github.com/user/reponame/archive/v_01.tar.gz', # I explain this later on + keywords = ['astar-algorithm', 'obstacle-avoidance', 'time-dimension'], # Keywords that define your package best + install_requires=[ # I get to this in a second + 'validators', + 'beautifulsoup4', + ], + classifiers=[ + 'Development Status :: 3 - Alpha', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package + 'Intended Audience :: Developers', # Define that your audience are developers + 'Topic :: Software Development :: Build Tools', + 'License :: OSI Approved :: MIT License', # Again, pick a license + 'Programming Language :: Python :: 3', #Specify which pyhton versions that you want to support + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + ], +) diff --git a/space-time-astar/__init__.py b/space-time-astar/__init__.py index e69de29..c7cee50 100644 --- a/space-time-astar/__init__.py +++ b/space-time-astar/__init__.py @@ -0,0 +1 @@ +from space-time-astar.planner import Planner \ No newline at end of file diff --git a/space-time-astar/grid.py b/space-time-astar/grid.py index 4331632..00bd921 100644 --- a/space-time-astar/grid.py +++ b/space-time-astar/grid.py @@ -25,7 +25,7 @@ def make_grid(grid_size: int, minx: int, maxx: int, miny: int, maxy: int) -> np. x_size = (maxx - minx) // grid_size y_size = (maxy - miny) // grid_size # Initialize the grid, assuming grid is 2D - grid = np.zeros([y_size, x_size, 2], dtype=np.uint16) + grid = np.zeros([y_size, x_size, 2], dtype=np.int32) # Fill the grid in y = miny - grid_size / 2 for i in range(y_size): @@ -33,7 +33,7 @@ def make_grid(grid_size: int, minx: int, maxx: int, miny: int, maxy: int) -> np. x = minx - grid_size / 2 for j in range(x_size): x += grid_size - grid[i][j] = np.array([y, x]) + grid[i][j] = np.array([x, y]) return grid ''' diff --git a/space-time-astar/neighbour_table.py b/space-time-astar/neighbour_table.py index 211195e..6b38db9 100644 --- a/space-time-astar/neighbour_table.py +++ b/space-time-astar/neighbour_table.py @@ -6,6 +6,10 @@ import numpy as np class NeighbourTable: + # Current Right Left Down Up + # directions = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)] + # Uncomment the line below for 9 directions, this might slow things down a lot + directions = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (1, -1), (-1, 1), (-1, -1)] def __init__(self, grid: np.ndarray): dimy, dimx = len(grid), len(grid[0]) @@ -13,10 +17,10 @@ def __init__(self, grid: np.ndarray): for i in range(dimy): for j in range(dimx): neighbours = [] - for dx, dy in (1, 0), (-1, 0), (0, 1), (0, -1): + for dx, dy in self.directions: y, x = i + dy, j + dx, if x >= 0 and x < dimx and y >= 0 and y < dimy: - neighbours.append([y, x]) + neighbours.append(grid[y][x]) table[self.hash(grid[i][j])] = np.array(neighbours) self.table = table @@ -26,4 +30,13 @@ def lookup(self, position: np.ndarray) -> np.ndarray: @staticmethod def hash(grid_pos: np.ndarray) -> int: concat = str(grid_pos[0]) + str(grid_pos[1]) - return int(concat) \ No newline at end of file + return concat + +if __name__ == '__main__': + grid = np.array([[[15,5],[15,6],[15,7],[15,8],[15,9]], + [[16,5],[16,6],[16,7],[16,8],[16,9]], + [[17,5],[17,6],[17,7],[17,8],[17,9]], + [[18,5],[18,6],[18,7],[18,8],[18,9]], + [[19,5],[19,6],[19,7],[19,8],[19,9]]]) + nt = NeighbourTable(grid) + print(nt.lookup(np.array([16,7]))) \ No newline at end of file diff --git a/space-time-astar/planner.py b/space-time-astar/planner.py index 1279256..611aabc 100644 --- a/space-time-astar/planner.py +++ b/space-time-astar/planner.py @@ -4,14 +4,13 @@ Email: gavinsweden@gmail.com ''' from typing import Tuple, List, Dict -from collections import defaultdict -import heapq +from heapq import heappush, heappop import numpy as np from scipy.spatial import KDTree from neighbour_table import NeighbourTable from grid import Grid -from copy import deepcopy +from state import State # static obstacles must include the boundary @@ -24,11 +23,11 @@ def __init__(self, grid_size: int, start: Tuple[int, int], goal: Tuple[int, int], static_obstacles: List[Tuple[int, int]], - dynamic_obstacles: Dict[int, List[Tuple[int, int]]]): + dynamic_obstacles: Dict[int, List[Tuple[int, int]]], + max_iter:int = 10000, + debug = False): self.grid_size = grid_size self.robot_radius = robot_radius - self.start = np.array(start) - self.goal = np.array(goal) np_static_obstacles = np.array(static_obstacles) self.static_obstacles = KDTree(np_static_obstacles) self.dynamic_obstacles = dict((k, np.array(v)) for k, v in dynamic_obstacles.items()) @@ -37,64 +36,96 @@ def __init__(self, grid_size: int, self.grid = Grid(grid_size, np_static_obstacles) # Make a lookup table for looking up neighbours of a grid self.neighbour_table = NeighbourTable(self.grid.grid) - # Function to hash a position - self.hash = NeighbourTable.hash + + self.start = self.grid.snap_to_grid(np.array(start)) + self.goal = self.grid.snap_to_grid(np.array(goal)) + self.max_iter = max_iter + self.debug = debug ''' - Used to calculate distance between two points - Also an admissible and consistent heuristic for A* + An admissible and consistent heuristic for A* ''' @staticmethod def h(start: np.ndarray, goal: np.ndarray) -> int: - return int(np.linalg.norm(start-goal, 1)) # L2 norm + return int(np.linalg.norm(start-goal, 1)) # L1 norm + + @staticmethod + def l2(start: np.ndarray, goal: np.ndarray) -> int: + return int(np.linalg.norm(start-goal, 2)) # L2 norm ''' Check whether the nearest static obstacle is within radius ''' def safe_static(self, grid_pos: np.ndarray) -> bool: - return self.h(grid_pos, self.static_obstacles.query(grid_pos)) > self.robot_radius + return self.l2(grid_pos, self.static_obstacles.query(grid_pos)) > self.robot_radius ''' Assume dynamic obstacles are agents with same radius, distance needs to be 2*radius ''' def safe_dynamic(self, grid_pos: np.ndarray, time: int) -> bool: - return all(self.h(grid_pos, obstacle) > 2*self.robot_radius + return all(self.l2(grid_pos, obstacle) > 2*self.robot_radius for obstacle in self.dynamic_obstacles.setdefault(time, [])) - ''' - Reconstruct path from A* search result - ''' - def reconstruct_path(self, came_from: Dict[int, np.ndarray], current: np.ndarray): - total_path = [current] - while self.hash(current) in came_from.keys(): - current = came_from[self.hash(current)] - total_path.append(current) - return total_path[::-1] - ''' Space-Time A* ''' - def plan(self): - # Standard A* setup, no surprise here - ''' - need to implement heap here - ''' - open_set = [] + def plan(self) -> np.ndarray: + # Initialize the start state + s = State(self.start, 0, 0, self.h(self.start, self.goal)) + + open_set = [s] + closed_set = set() + + # Keep track of parent nodes for reconstruction came_from = dict() - g_score = defaultdict(lambda: float('inf')) - g_score[self.hash(self.start)] = 0 - f_score = defaultdict(lambda: float('inf')) - f_score[self.hash(self.start)] = self.h(self.start, self.goal) - while open_set: + iter_ = 0 + while open_set and iter_ < self.max_iter: + iter_ += 1 + current_state = open_set[0] # Smallest element in min-heap + if current_state.pos_equal_to(self.goal): + if self.debug: + print('Path found after {0} iterations'.format(iter_)) + return self.reconstruct_path(came_from, current_state) + + closed_set.add(heappop(open_set)) + epoch = current_state.time + 1 + for neighbour in self.neighbour_table.lookup(current_state.pos): + neighbour_state = State(neighbour, epoch, current_state.g_score + 1, self.h(neighbour, self.goal)) + # Check if visited + if neighbour_state in closed_set: + continue + + # Avoid obstacles + if not self.safe_static(neighbour) or not self.safe_dynamic(neighbour, epoch): + continue + + # Add to open set + if neighbour_state not in open_set: + came_from[neighbour_state] = current_state + heappush(open_set, neighbour_state) + + if self.debug: + print('Open set is empty, no path found.') + return None + + ''' + Reconstruct path from A* search result + ''' + def reconstruct_path(self, came_from: Dict[State, State], current: State) -> np.ndarray: + total_path = [current.pos] + while current in came_from.keys(): + current = came_from[current] + total_path.append(current.pos) + return np.array(total_path[::-1]) if __name__ == '__main__': grid_size = 1 robot_radius = 2 - start = (3, 10) - goal = (15, 20) - static_obstacles = [(5, 15), (10, 20)] + start = (2, 2) + goal = (50, 50) + static_obstacles = [(0, 0), (60, 70)] dynamic_obstacles = {0: [(5, 16)], 1: [(5, 17)], 2: [(5, 18), (11, 20)]} planner = Planner(grid_size, robot_radius, start, goal, static_obstacles, dynamic_obstacles) - print(planner.neighbour_table.lookup(planner.grid.snap_to_grid([10,15]))) \ No newline at end of file + print(planner.plan()) diff --git a/space-time-astar/state.py b/space-time-astar/state.py new file mode 100644 index 0000000..902cd72 --- /dev/null +++ b/space-time-astar/state.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +''' +Author: Haoran Peng +Email: gavinsweden@gmail.com +''' +import numpy as np + +class State: + + def __init__(self, pos: np.ndarray, time: int, g_score: int, h_score: int): + self.pos = pos + self.time = time + self.g_score = g_score + self.f_score = g_score + h_score + + def __hash__(self) -> int: + concat = str(self.pos[0]) + str(self.pos[1]) + '0' + str(self.time) + return int(concat) + + def pos_equal_to(self, pos: np.ndarray) -> bool: + return np.array_equal(self.pos, pos) + + def __lt__(self, other: 'State') -> bool: + return self.f_score < other.f_score + + def __eq__(self, other: 'State') -> bool: + return self.__hash__() == other.__hash__() + + def __str__(self): + return 'State(pos=[' + str(self.pos[0]) + ', ' + str(self.pos[1]) + '], ' \ + + 'time=' + str(self.time) + ', fscore=' + str(self.f_score) + ')' + + def __repr__(self): + return self.__str__()