Skip to content

Commit

Permalink
some decoupling and some progress on A*
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinPHR committed Mar 11, 2020
1 parent 81f560e commit be6da3a
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion space-time-astar/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Email: [email protected]
'''
from typing import Tuple, List, Dict
from collections import defaultdict
import heapq
import numpy as np
from scipy.spatial import KDTree

Expand Down Expand Up @@ -35,8 +37,56 @@ def __init__(self, grid_size: int,
self.grid = Grid(grid_size, np_static_obstacles)
# Make a lookup table for looking up neighbours of a grid
self.neighbour_table = NeighbourTable(self.grid.grid)
# Function to hash a position
self.hash = NeighbourTable.hash


'''
Used to calculate distance between two points
Also an admissible and consistent heuristic for A*
'''
@staticmethod
def h(start: np.ndarray, goal: np.ndarray) -> int:
return int(np.linalg.norm(start-goal, 1)) # L2 norm

'''
Check whether the nearest static obstacle is within radius
'''
def safe_static(self, grid_pos: np.ndarray) -> bool:
return self.h(grid_pos, self.static_obstacles.query(grid_pos)) > self.robot_radius

'''
Assume dynamic obstacles are agents with same radius, distance needs to be 2*radius
'''
def safe_dynamic(self, grid_pos: np.ndarray, time: int) -> bool:
return all(self.h(grid_pos, obstacle) > 2*self.robot_radius
for obstacle in self.dynamic_obstacles.setdefault(time, []))

'''
Reconstruct path from A* search result
'''
def reconstruct_path(self, came_from: Dict[int, np.ndarray], current: np.ndarray):
total_path = [current]
while self.hash(current) in came_from.keys():
current = came_from[self.hash(current)]
total_path.append(current)
return total_path[::-1]

'''
Space-Time A*
'''
def plan(self):
# Standard A* setup, no surprise here
'''
need to implement heap here
'''
open_set = []
came_from = dict()
g_score = defaultdict(lambda: float('inf'))
g_score[self.hash(self.start)] = 0
f_score = defaultdict(lambda: float('inf'))
f_score[self.hash(self.start)] = self.h(self.start, self.goal)

while open_set:


if __name__ == '__main__':
Expand Down

0 comments on commit be6da3a

Please sign in to comment.