Skip to content

Commit

Permalink
space-time A* skeleton almost finished
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinPHR committed Mar 11, 2020
1 parent be6da3a commit 6f1755c
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 43 deletions.
Empty file added LICENSE.txt
Empty file.
30 changes: 30 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

from distutils.core import setup

setup(
name = 'space-time-astar', # How you named your package folder (MyLib)
packages = ['space-time-astar'], # Chose the same as "name"
version = '0.1', # Start with a small number and increase it with every change you make
license='MIT', # Chose a license from here: https://help.github.com/articles/licensing-a-repository
description = 'A* search algorithm with an added time dimension to deal with dynamic obstacles.', # Give a short description about your library
author = 'Haoran Peng', # Type in your name
author_email = '[email protected]', # Type in your E-Mail
url = 'https://github.com/GavinPHR/Space-Time-AStar', # Provide either the link to your github or to your website
download_url = 'https://github.com/user/reponame/archive/v_01.tar.gz', # I explain this later on
keywords = ['astar-algorithm', 'obstacle-avoidance', 'time-dimension'], # Keywords that define your package best
install_requires=[ # I get to this in a second
'validators',
'beautifulsoup4',
],
classifiers=[
'Development Status :: 3 - Alpha', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package
'Intended Audience :: Developers', # Define that your audience are developers
'Topic :: Software Development :: Build Tools',
'License :: OSI Approved :: MIT License', # Again, pick a license
'Programming Language :: Python :: 3', #Specify which pyhton versions that you want to support
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
],
)
1 change: 1 addition & 0 deletions space-time-astar/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from space-time-astar.planner import Planner
4 changes: 2 additions & 2 deletions space-time-astar/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ def make_grid(grid_size: int, minx: int, maxx: int, miny: int, maxy: int) -> np.
x_size = (maxx - minx) // grid_size
y_size = (maxy - miny) // grid_size
# Initialize the grid, assuming grid is 2D
grid = np.zeros([y_size, x_size, 2], dtype=np.uint16)
grid = np.zeros([y_size, x_size, 2], dtype=np.int32)
# Fill the grid in
y = miny - grid_size / 2
for i in range(y_size):
y += grid_size
x = minx - grid_size / 2
for j in range(x_size):
x += grid_size
grid[i][j] = np.array([y, x])
grid[i][j] = np.array([x, y])
return grid

'''
Expand Down
19 changes: 16 additions & 3 deletions space-time-astar/neighbour_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@
import numpy as np

class NeighbourTable:
# Current Right Left Down Up
# directions = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]
# Uncomment the line below for 9 directions, this might slow things down a lot
directions = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (1, -1), (-1, 1), (-1, -1)]

def __init__(self, grid: np.ndarray):
dimy, dimx = len(grid), len(grid[0])
table = dict()
for i in range(dimy):
for j in range(dimx):
neighbours = []
for dx, dy in (1, 0), (-1, 0), (0, 1), (0, -1):
for dx, dy in self.directions:
y, x = i + dy, j + dx,
if x >= 0 and x < dimx and y >= 0 and y < dimy:
neighbours.append([y, x])
neighbours.append(grid[y][x])
table[self.hash(grid[i][j])] = np.array(neighbours)
self.table = table

Expand All @@ -26,4 +30,13 @@ def lookup(self, position: np.ndarray) -> np.ndarray:
@staticmethod
def hash(grid_pos: np.ndarray) -> int:
concat = str(grid_pos[0]) + str(grid_pos[1])
return int(concat)
return concat

if __name__ == '__main__':
grid = np.array([[[15,5],[15,6],[15,7],[15,8],[15,9]],
[[16,5],[16,6],[16,7],[16,8],[16,9]],
[[17,5],[17,6],[17,7],[17,8],[17,9]],
[[18,5],[18,6],[18,7],[18,8],[18,9]],
[[19,5],[19,6],[19,7],[19,8],[19,9]]])
nt = NeighbourTable(grid)
print(nt.lookup(np.array([16,7])))
107 changes: 69 additions & 38 deletions space-time-astar/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
Email: [email protected]
'''
from typing import Tuple, List, Dict
from collections import defaultdict
import heapq
from heapq import heappush, heappop
import numpy as np
from scipy.spatial import KDTree

from neighbour_table import NeighbourTable
from grid import Grid
from copy import deepcopy
from state import State


# static obstacles must include the boundary
Expand All @@ -24,11 +23,11 @@ def __init__(self, grid_size: int,
start: Tuple[int, int],
goal: Tuple[int, int],
static_obstacles: List[Tuple[int, int]],
dynamic_obstacles: Dict[int, List[Tuple[int, int]]]):
dynamic_obstacles: Dict[int, List[Tuple[int, int]]],
max_iter:int = 10000,
debug = False):
self.grid_size = grid_size
self.robot_radius = robot_radius
self.start = np.array(start)
self.goal = np.array(goal)
np_static_obstacles = np.array(static_obstacles)
self.static_obstacles = KDTree(np_static_obstacles)
self.dynamic_obstacles = dict((k, np.array(v)) for k, v in dynamic_obstacles.items())
Expand All @@ -37,64 +36,96 @@ def __init__(self, grid_size: int,
self.grid = Grid(grid_size, np_static_obstacles)
# Make a lookup table for looking up neighbours of a grid
self.neighbour_table = NeighbourTable(self.grid.grid)
# Function to hash a position
self.hash = NeighbourTable.hash

self.start = self.grid.snap_to_grid(np.array(start))
self.goal = self.grid.snap_to_grid(np.array(goal))
self.max_iter = max_iter
self.debug = debug

'''
Used to calculate distance between two points
Also an admissible and consistent heuristic for A*
An admissible and consistent heuristic for A*
'''
@staticmethod
def h(start: np.ndarray, goal: np.ndarray) -> int:
return int(np.linalg.norm(start-goal, 1)) # L2 norm
return int(np.linalg.norm(start-goal, 1)) # L1 norm

@staticmethod
def l2(start: np.ndarray, goal: np.ndarray) -> int:
return int(np.linalg.norm(start-goal, 2)) # L2 norm

'''
Check whether the nearest static obstacle is within radius
'''
def safe_static(self, grid_pos: np.ndarray) -> bool:
return self.h(grid_pos, self.static_obstacles.query(grid_pos)) > self.robot_radius
return self.l2(grid_pos, self.static_obstacles.query(grid_pos)) > self.robot_radius

'''
Assume dynamic obstacles are agents with same radius, distance needs to be 2*radius
'''
def safe_dynamic(self, grid_pos: np.ndarray, time: int) -> bool:
return all(self.h(grid_pos, obstacle) > 2*self.robot_radius
return all(self.l2(grid_pos, obstacle) > 2*self.robot_radius
for obstacle in self.dynamic_obstacles.setdefault(time, []))

'''
Reconstruct path from A* search result
'''
def reconstruct_path(self, came_from: Dict[int, np.ndarray], current: np.ndarray):
total_path = [current]
while self.hash(current) in came_from.keys():
current = came_from[self.hash(current)]
total_path.append(current)
return total_path[::-1]

'''
Space-Time A*
'''
def plan(self):
# Standard A* setup, no surprise here
'''
need to implement heap here
'''
open_set = []
def plan(self) -> np.ndarray:
# Initialize the start state
s = State(self.start, 0, 0, self.h(self.start, self.goal))

open_set = [s]
closed_set = set()

# Keep track of parent nodes for reconstruction
came_from = dict()
g_score = defaultdict(lambda: float('inf'))
g_score[self.hash(self.start)] = 0
f_score = defaultdict(lambda: float('inf'))
f_score[self.hash(self.start)] = self.h(self.start, self.goal)

while open_set:
iter_ = 0
while open_set and iter_ < self.max_iter:
iter_ += 1
current_state = open_set[0] # Smallest element in min-heap
if current_state.pos_equal_to(self.goal):
if self.debug:
print('Path found after {0} iterations'.format(iter_))
return self.reconstruct_path(came_from, current_state)

closed_set.add(heappop(open_set))
epoch = current_state.time + 1
for neighbour in self.neighbour_table.lookup(current_state.pos):
neighbour_state = State(neighbour, epoch, current_state.g_score + 1, self.h(neighbour, self.goal))
# Check if visited
if neighbour_state in closed_set:
continue

# Avoid obstacles
if not self.safe_static(neighbour) or not self.safe_dynamic(neighbour, epoch):
continue

# Add to open set
if neighbour_state not in open_set:
came_from[neighbour_state] = current_state
heappush(open_set, neighbour_state)

if self.debug:
print('Open set is empty, no path found.')
return None

'''
Reconstruct path from A* search result
'''
def reconstruct_path(self, came_from: Dict[State, State], current: State) -> np.ndarray:
total_path = [current.pos]
while current in came_from.keys():
current = came_from[current]
total_path.append(current.pos)
return np.array(total_path[::-1])


if __name__ == '__main__':
grid_size = 1
robot_radius = 2
start = (3, 10)
goal = (15, 20)
static_obstacles = [(5, 15), (10, 20)]
start = (2, 2)
goal = (50, 50)
static_obstacles = [(0, 0), (60, 70)]
dynamic_obstacles = {0: [(5, 16)], 1: [(5, 17)], 2: [(5, 18), (11, 20)]}
planner = Planner(grid_size, robot_radius, start, goal, static_obstacles, dynamic_obstacles)
print(planner.neighbour_table.lookup(planner.grid.snap_to_grid([10,15])))
print(planner.plan())
34 changes: 34 additions & 0 deletions space-time-astar/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env python3
'''
Author: Haoran Peng
Email: [email protected]
'''
import numpy as np

class State:

def __init__(self, pos: np.ndarray, time: int, g_score: int, h_score: int):
self.pos = pos
self.time = time
self.g_score = g_score
self.f_score = g_score + h_score

def __hash__(self) -> int:
concat = str(self.pos[0]) + str(self.pos[1]) + '0' + str(self.time)
return int(concat)

def pos_equal_to(self, pos: np.ndarray) -> bool:
return np.array_equal(self.pos, pos)

def __lt__(self, other: 'State') -> bool:
return self.f_score < other.f_score

def __eq__(self, other: 'State') -> bool:
return self.__hash__() == other.__hash__()

def __str__(self):
return 'State(pos=[' + str(self.pos[0]) + ', ' + str(self.pos[1]) + '], ' \
+ 'time=' + str(self.time) + ', fscore=' + str(self.f_score) + ')'

def __repr__(self):
return self.__str__()

0 comments on commit 6f1755c

Please sign in to comment.