Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 147 additions & 131 deletions graphs/a_star.py
Original file line number Diff line number Diff line change
@@ -1,138 +1,154 @@
"""
A* (A-Star) Pathfinding Algorithm Implementation.

A* is an informed search algorithm (heuristic search) that finds the shortest
path between a start node and a goal node in a weighted graph. It balances
the actual distance from the start (g-score) and the estimated distance to
the goal (h-score) using the formula: f(n) = g(n) + h(n).

Time Complexity: O(E log V) where E is the number of edges and V is the
number of vertices.
Space Complexity: O(V) to store the graph structures and priority queue.
"""

from __future__ import annotations

DIRECTIONS = [
[-1, 0], # left
[0, -1], # down
[1, 0], # right
[0, 1], # up
]


# function to search the path
def search(
grid: list[list[int]],
init: list[int],
goal: list[int],
cost: int,
heuristic: list[list[int]],
) -> tuple[list[list[int]], list[list[int]]]:
import heapq
import math
from collections.abc import Callable

# ==========================================
# 1. Heuristic Functions
# ==========================================


def manhattan_distance(a: tuple[float, float], b: tuple[float, float]) -> float:
"""Calculate the Manhattan distance between two 2D points."""
return abs(a[0] - b[0]) + abs(a[1] - b[1])


def euclidean_distance(a: tuple[float, float], b: tuple[float, float]) -> float:
"""Calculate the Euclidean distance between two 2D points."""
return math.sqrt((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2)


def chebyshev_distance(a: tuple[float, float], b: tuple[float, float]) -> float:
"""Calculate the Chebyshev distance between two 2D points."""
return max(abs(a[0] - b[0]), abs(a[1] - b[1]))


# ==========================================
# 2. Grid-Based Implementation
# ==========================================


def a_star_grid(
grid: list[list[float]],
start: tuple[int, int],
end: tuple[int, int],
heuristic_func: Callable[
[tuple[float, float], tuple[float, float]], float
] = manhattan_distance,
) -> list[tuple[int, int]] | None:
"""
Search for a path on a grid avoiding obstacles.
>>> grid = [[0, 1, 0, 0, 0, 0],
... [0, 1, 0, 0, 0, 0],
... [0, 1, 0, 0, 0, 0],
... [0, 1, 0, 0, 1, 0],
... [0, 0, 0, 0, 1, 0]]
>>> init = [0, 0]
>>> goal = [len(grid) - 1, len(grid[0]) - 1]
>>> cost = 1
>>> heuristic = [[0] * len(grid[0]) for _ in range(len(grid))]
>>> heuristic = [[0 for row in range(len(grid[0]))] for col in range(len(grid))]
>>> for i in range(len(grid)):
... for j in range(len(grid[0])):
... heuristic[i][j] = abs(i - goal[0]) + abs(j - goal[1])
... if grid[i][j] == 1:
... heuristic[i][j] = 99
>>> path, action = search(grid, init, goal, cost, heuristic)
>>> path # doctest: +NORMALIZE_WHITESPACE
[[0, 0], [1, 0], [2, 0], [3, 0], [4, 0], [4, 1], [4, 2], [4, 3], [3, 3],
[2, 3], [2, 4], [2, 5], [3, 5], [4, 5]]
>>> action # doctest: +NORMALIZE_WHITESPACE
[[0, 0, 0, 0, 0, 0], [2, 0, 0, 0, 0, 0], [2, 0, 0, 0, 3, 3],
[2, 0, 0, 0, 0, 2], [2, 3, 3, 3, 0, 2]]
Perform A* search on a 2D weighted grid using heapq.
Grid values represent the traversal cost. float('inf') represents an obstacle.

>>> grid = [[1.0, 1.0, 1.0], [1.0, float('inf'), 1.0], [1.0, 1.0, 1.0]]
>>> a_star_grid(grid, (0, 0), (2, 2), manhattan_distance)
[(0, 0), (0, 1), (0, 2), (1, 2), (2, 2)]
"""
closed = [
[0 for col in range(len(grid[0]))] for row in range(len(grid))
] # the reference grid
closed[init[0]][init[1]] = 1
action = [
[0 for col in range(len(grid[0]))] for row in range(len(grid))
] # the action grid

x = init[0]
y = init[1]
g = 0
f = g + heuristic[x][y] # cost from starting cell to destination cell
cell = [[f, g, x, y]]

found = False # flag that is set when search is complete
resign = False # flag set if we can't find expand

while not found and not resign:
if len(cell) == 0:
raise ValueError("Algorithm is unable to find solution")
else: # to choose the least costliest action so as to move closer to the goal
cell.sort()
cell.reverse()
next_cell = cell.pop()
x = next_cell[2]
y = next_cell[3]
g = next_cell[1]

if x == goal[0] and y == goal[1]:
found = True
else:
for i in range(len(DIRECTIONS)): # to try out different valid actions
x2 = x + DIRECTIONS[i][0]
y2 = y + DIRECTIONS[i][1]
if (
x2 >= 0
and x2 < len(grid)
and y2 >= 0
and y2 < len(grid[0])
and closed[x2][y2] == 0
and grid[x2][y2] == 0
):
g2 = g + cost
f2 = g2 + heuristic[x2][y2]
cell.append([f2, g2, x2, y2])
closed[x2][y2] = 1
action[x2][y2] = i
invpath = []
x = goal[0]
y = goal[1]
invpath.append([x, y]) # we get the reverse path from here
while x != init[0] or y != init[1]:
x2 = x - DIRECTIONS[action[x][y]][0]
y2 = y - DIRECTIONS[action[x][y]][1]
x = x2
y = y2
invpath.append([x, y])

path = []
for i in range(len(invpath)):
path.append(invpath[len(invpath) - 1 - i])
return path, action
rows, cols = len(grid), len(grid[0])
open_set: list[tuple[float, tuple[int, int]]] = []
heapq.heappush(open_set, (0.0, start))

came_from: dict[tuple[int, int], tuple[int, int]] = {}
g_score = {start: 0.0}

while open_set:
_, current = heapq.heappop(open_set)

if current == end:
path = []
while current in came_from:
path.append(current)
current = came_from[current]
path.append(start)
return path[::-1]

# 4-directional movement
for move_r, move_c in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
neighbor = (current[0] + move_r, current[1] + move_c)

if 0 <= neighbor[0] < rows and 0 <= neighbor[1] < cols:
cost = grid[neighbor[0]][neighbor[1]]
if cost == float("inf"):
continue

tentative_g = g_score[current] + cost
if tentative_g < g_score.get(neighbor, float("inf")):
came_from[neighbor] = current
g_score[neighbor] = tentative_g
f_score = tentative_g + heuristic_func(neighbor, end)
if neighbor not in [item[1] for item in open_set]:
heapq.heappush(open_set, (f_score, neighbor))
return None


# ==========================================
# 3. Adjacency List Implementation
# ==========================================


def a_star_adjacency_list(
graph: dict[str, list[tuple[str, float]]],
start: str,
end: str,
heuristic_dict: dict[str, float],
) -> list[str] | None:
"""
Perform A* search on a graph represented as an adjacency list.
heuristic_dict provides pre-calculated h-scores from each node to the goal.

>>> graph = {
... 'A': [('B', 1.0), ('C', 4.0)],
... 'B': [('A', 1.0), ('D', 5.0)],
... 'C': [('A', 4.0), ('D', 1.0)],
... 'D': [('B', 5.0), ('C', 1.0)]
... }
>>> h_dict = {'A': 3.0, 'B': 2.0, 'C': 1.0, 'D': 0.0}
>>> a_star_adjacency_list(graph, 'A', 'D', h_dict)
['A', 'C', 'D']
"""
open_set: list[tuple[float, str]] = []
heapq.heappush(open_set, (0.0, start))

came_from: dict[str, str] = {}
g_score = {start: 0.0}

while open_set:
_, current = heapq.heappop(open_set)

if current == end:
path = []
while current in came_from:
path.append(current)
current = came_from[current]
path.append(start)
return path[::-1]

for neighbor, weight in graph.get(current, []):
tentative_g = g_score[current] + weight
if tentative_g < g_score.get(neighbor, float("inf")):
came_from[neighbor] = current
g_score[neighbor] = tentative_g
f_score = tentative_g + heuristic_dict.get(neighbor, 0.0)
if neighbor not in [item[1] for item in open_set]:
heapq.heappush(open_set, (f_score, neighbor))
return None


if __name__ == "__main__":
grid = [
[0, 1, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0], # 0 are free path whereas 1's are obstacles
[0, 1, 0, 0, 0, 0],
[0, 1, 0, 0, 1, 0],
[0, 0, 0, 0, 1, 0],
]

init = [0, 0]
# all coordinates are given in format [y,x]
goal = [len(grid) - 1, len(grid[0]) - 1]
cost = 1

# the cost map which pushes the path closer to the goal
heuristic = [[0 for row in range(len(grid[0]))] for col in range(len(grid))]
for i in range(len(grid)):
for j in range(len(grid[0])):
heuristic[i][j] = abs(i - goal[0]) + abs(j - goal[1])
if grid[i][j] == 1:
# added extra penalty in the heuristic map
heuristic[i][j] = 99

path, action = search(grid, init, goal, cost, heuristic)

print("ACTION MAP")
for i in range(len(action)):
print(action[i])

for i in range(len(path)):
print(path[i])
import doctest

doctest.testmod()