diff --git a/AGENTS.md b/AGENTS.md index 42c7543..dea4bfb 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -372,15 +372,54 @@ Each demo module must: ## PR and Change Guidelines -### Before Submitting Changes +### Before Submitting Changes - Required CI Checks + +**CRITICAL: Always run these checks before committing to ensure CI passes:** + +```bash +# 1. Install dependencies +python -m pip install -e ".[dev]" + +# 2. Run all pre-commit hooks (includes format, lint, and other checks) +pre-commit run --all-files --show-diff-on-failure --color=always + +# 3. Run full test suite +pytest -q + +# 4. Verify no linting errors remain +ruff check . +``` + +**Pre-commit Hook Checks (must all pass):** +- ✅ `ruff` - Linting (E, F, I, B rules) +- ✅ `ruff-format` - Code formatting +- ✅ `trailing-whitespace` - Remove trailing whitespace +- ✅ `end-of-file-fixer` - Ensure files end with newline +- ✅ `check-merge-conflicts` - Check for merge conflict markers + +**If pre-commit hooks fail:** +1. Review the error output carefully +2. Fix issues manually or use `ruff check --fix` for auto-fixes +3. Re-run `pre-commit run --all-files` until all checks pass +4. Commit the fixes + +**Common Issues:** +- Trailing whitespace: Run `pre-commit run trailing-whitespace --all-files` +- Import ordering: Run `ruff check --fix` to auto-sort imports +- Formatting: Run `ruff format .` to format all files + +### Standard Pre-Commit Workflow + 1. **Run tests:** `pytest -q` must pass 2. **Format code:** `ruff format .` 3. **Fix linting:** `ruff check --fix` -4. **Check coverage:** Maintain or improve test coverage -5. **Update docs:** If adding new features, update relevant documentation +4. **Run pre-commit:** `pre-commit run --all-files` +5. **Check coverage:** Maintain or improve test coverage +6. **Update docs:** If adding new features, update relevant documentation ### PR Acceptance Criteria -- All tests pass +- All tests pass (208/208 currently) +- All pre-commit hooks pass - Code is properly formatted (Ruff) - No new linting errors - Test coverage maintained or improved @@ -395,6 +434,8 @@ Each demo module must: - [ ] Code follows existing style conventions - [ ] Demo function exists and works - [ ] No breaking changes to existing APIs +- [ ] All pre-commit hooks pass +- [ ] CI checks will pass (verified locally) --- diff --git a/flask_app/app.py b/flask_app/app.py index 182938b..fa392df 100644 --- a/flask_app/app.py +++ b/flask_app/app.py @@ -158,6 +158,53 @@ "two_pointers": """Summary: Pair pointers moving toward/along an array. Typical Time: O(n), Space: O(1) Use: Two-sum in sorted array, dedup, merging, partitioning. +""", + # Robotics Navigation + "wall_following": """Summary: Reactive navigation using left/right-hand rule along walls. +Time: O(P) where P is perimeter of obstacles +Space: O(1) for basic version +Use: Maze navigation with unknown environment, bump sensors only. +Pitfall: May not find goal if obstacles disconnected; doesn't guarantee shortest path. +""", + "bug_algorithms": """Summary: Bug1/Bug2 goal-seeking with obstacle circumnavigation. +Bug1: Traverse full perimeter, leave at closest point. Time: O(n*P) +Bug2: Follow m-line, leave when closer than hit point. Often more efficient. +Both: Provably complete, reactive (local sensing), not optimal. +Use: Goal-seeking in unknown environments with guaranteed completeness. +""", + "pledge_algorithm": """Summary: Wall-following with cumulative rotation tracking. +Time: O(P) where P is obstacle perimeter +Space: O(1) rotation counter +Use: Escape closed loops; resume straight motion when rotation sum = 0. +Advantage: More sophisticated than basic wall-following. +""", + "potential_fields": """Summary: Artificial potential fields for reactive navigation. +Time: O(steps * obstacles) per iteration +Space: O(1) for force calculations +Use: Real-time reactive planning; attractive force to goal, repulsive from obstacles. +Pitfall: Can get stuck in local minima (e.g., U-shaped obstacles). +Parameters: attractive_gain, repulsive_gain, influence_distance need tuning. +""", + "rrt": """Summary: Rapidly-exploring Random Tree sampling-based planner. +Time: O(iterations * tree_size) for nearest neighbor search +Space: O(tree_size) for storing nodes +Use: High-dimensional configuration spaces, complex obstacles. +Probabilistically complete; not optimal (variants: RRT*, RRT-Connect). +Parameters: step_size, goal_sample_rate, max_iterations affect performance. +""", + "particle_filter": """Summary: Monte Carlo localization using particle set representation. +Time: O(N * M) where N=particles, M=landmarks +Space: O(N) for particle set +Steps: Predict (motion model), Update (measurement likelihood), Resample. +Use: Non-parametric belief, multimodal distributions, global localization. +Pitfall: Particle depletion; add noise during resampling. +""", + "occupancy_grid": """Summary: Probabilistic environment mapping with log-odds. +Time: O(rays * cells_per_ray) per update +Space: O(width * height) for grid +Use: SLAM, obstacle detection, autonomous navigation. +Log-odds prevents numerical issues; ray-based sensor model. +Cells along ray marked free, endpoint marked occupied. """, } @@ -344,7 +391,9 @@ def run_demo(module_name: str) -> str: try: mod = importlib.import_module(module_name) except ModuleNotFoundError as e: - raise ModuleNotFoundError(f"Could not import module {module_name!r}. Ensure it is a valid demo id.") from e + raise ModuleNotFoundError( + f"Could not import module {module_name!r}. Ensure it is a valid demo id." + ) from e demo_fn = getattr(mod, "demo", None) if not callable(demo_fn): @@ -474,9 +523,7 @@ def viz_sorting(): try: from flask_app.visualizations import sorting_viz as s_viz # type: ignore - algorithms = [ - {"key": k, "name": v["name"]} for k, v in s_viz.ALGORITHMS.items() - ] + algorithms = [{"key": k, "name": v["name"]} for k, v in s_viz.ALGORITHMS.items()] except Exception: algorithms = [ {"key": "quick", "name": "Quick Sort"}, @@ -573,9 +620,7 @@ def viz_path(): try: from flask_app.visualizations import path_viz as p_viz # type: ignore - algorithms = [ - {"key": k, "name": v["name"]} for k, v in p_viz.ALGORITHMS.items() - ] + algorithms = [{"key": k, "name": v["name"]} for k, v in p_viz.ALGORITHMS.items()] except Exception: algorithms = [ {"key": "astar", "name": "A* (Manhattan)"}, @@ -622,6 +667,59 @@ def api_viz_path(): return jsonify({"error": error}), 500 +@app.get("/viz/robotics") +def viz_robotics(): + # Render robotics navigation visualization page + algo = request.args.get("algo", "wall_left") + try: + from flask_app.visualizations import robotics_viz as r_viz # type: ignore + + algorithms = [{"key": k, "name": v["name"]} for k, v in r_viz.ALGORITHMS.items()] + except Exception: + algorithms = [ + {"key": "wall_left", "name": "Wall Following (Left-Hand)"}, + {"key": "wall_right", "name": "Wall Following (Right-Hand)"}, + {"key": "bug1", "name": "Bug1 Algorithm"}, + {"key": "pledge", "name": "Pledge Algorithm"}, + {"key": "potential", "name": "Potential Fields"}, + {"key": "rrt", "name": "RRT"}, + ] + return render_template( + "viz_robotics.html", + algo=algo, + algorithms=algorithms, + notes=get_notes( + { + "wall_left": "wall_following", + "wall_right": "wall_following", + "bug1": "bug_algorithms", + "pledge": "pledge_algorithm", + "potential": "potential_fields", + "rrt": "rrt", + }.get(algo, algo) + ), + ) + + +@app.get("/viz/robotics/data") +def api_viz_robotics(): + algo = request.args.get("algo", "wall_left") + rows = int(request.args.get("rows", 15)) + cols = int(request.args.get("cols", 20)) + density = float(request.args.get("density", 0.2)) + seed = request.args.get("seed", None) + seed = int(seed) if seed not in (None, "", "null") else None + + try: + from flask_app.visualizations import robotics_viz as r_viz # type: ignore + + result = r_viz.visualize(algo, rows=rows, cols=cols, density=density, seed=seed) + return jsonify(result) + except Exception as e: + error = "".join(traceback.format_exception(type(e), e, e.__traceback__)) + return jsonify({"error": error}), 500 + + @app.get("/viz/arrays") def viz_arrays(): # Render array techniques visualization page (Binary Search / Two-Pointers / Sliding Window) @@ -629,9 +727,7 @@ def viz_arrays(): try: from flask_app.visualizations import array_viz as a_viz # type: ignore - algorithms = [ - {"key": k, "name": v["name"]} for k, v in a_viz.ALGORITHMS.items() - ] + algorithms = [{"key": k, "name": v["name"]} for k, v in a_viz.ALGORITHMS.items()] except Exception: algorithms = [ {"key": "binary_search", "name": "Binary Search"}, @@ -690,9 +786,7 @@ def viz_mst(): try: from flask_app.visualizations import mst_viz as m_viz # type: ignore - algorithms = [ - {"key": k, "name": v["name"]} for k, v in m_viz.ALGORITHMS.items() - ] + algorithms = [{"key": k, "name": v["name"]} for k, v in m_viz.ALGORITHMS.items()] except Exception: algorithms = [ {"key": "kruskal", "name": "Minimum Spanning Tree (Kruskal)"}, @@ -738,9 +832,7 @@ def viz_topo(): try: from flask_app.visualizations import topo_viz as t_viz # type: ignore - algorithms = [ - {"key": k, "name": v["name"]} for k, v in t_viz.ALGORITHMS.items() - ] + algorithms = [{"key": k, "name": v["name"]} for k, v in t_viz.ALGORITHMS.items()] except Exception: algorithms = [ {"key": "kahn", "name": "Topological Sort (Kahn's Algorithm)"}, diff --git a/flask_app/docs_server.py b/flask_app/docs_server.py index d99a041..2f464e3 100644 --- a/flask_app/docs_server.py +++ b/flask_app/docs_server.py @@ -1,4 +1,5 @@ import os + from flask import Flask, send_from_directory app = Flask(__name__) @@ -6,6 +7,7 @@ # Path to built MkDocs site DOCS_BUILD_DIR = os.path.join(os.path.dirname(__file__), "..", "site") + @app.route("/docs/") @app.route("/docs/") def serve_docs(filename="index.html"): @@ -14,5 +16,6 @@ def serve_docs(filename="index.html"): """ return send_from_directory(DOCS_BUILD_DIR, filename) + if __name__ == "__main__": app.run(host="127.0.0.1", port=5003, debug=True) diff --git a/flask_app/templates/viz_robotics.html b/flask_app/templates/viz_robotics.html new file mode 100644 index 0000000..67a902f --- /dev/null +++ b/flask_app/templates/viz_robotics.html @@ -0,0 +1,315 @@ +{% extends "base.html" %} +{% block content %} +
+
+
+

Robotics Navigation Visualization

+
+ interactive + Robot navigation algorithms +
+
+
+ + {% if notes %} +
+

Algorithm Notes

+
{{ notes }}
+

Big-O Guide

+
+ {% endif %} + +
+
+ + + + + + + + + + + + +
+ + + + + +
+
+
+ +
+
Ready
+
+ Free space + Obstacle + Start + Goal + Robot + Path + Visited +
+
+
+
+ + + + +{% endblock %} diff --git a/flask_app/visualizations/array_viz.py b/flask_app/visualizations/array_viz.py index e76ee8d..cf13bd7 100644 --- a/flask_app/visualizations/array_viz.py +++ b/flask_app/visualizations/array_viz.py @@ -34,9 +34,7 @@ def binary_search_frames( arr: list[int], target: int, max_steps: int = 20000 ) -> list[dict[str, Any]]: a = arr[:] - frames: list[dict[str, Any]] = [ - _snap(a, "init", lo=0, hi=len(a) - 1, mid=None, found=False) - ] + frames: list[dict[str, Any]] = [_snap(a, "init", lo=0, hi=len(a) - 1, mid=None, found=False)] lo, hi = 0, len(a) - 1 steps = 0 while lo <= hi and steps < max_steps: @@ -71,14 +69,10 @@ def two_pointers_sum_frames( return frames if s < target: left += 1 - frames.append( - _snap(a, "move-left", l=left, r=right, sum=None, target=target) - ) + frames.append(_snap(a, "move-left", l=left, r=right, sum=None, target=target)) else: right -= 1 - frames.append( - _snap(a, "move-right", l=left, r=right, sum=None, target=target) - ) + frames.append(_snap(a, "move-right", l=left, r=right, sum=None, target=target)) steps += 1 frames.append(_snap(a, "not-found", l=left, r=right, sum=None, target=target)) return frames @@ -93,9 +87,7 @@ def sliding_window_min_len_geq_frames( """ a = arr[:] frames: list[dict[str, Any]] = [ - _snap( - a, "init", win_l=0, win_r=-1, best_l=None, best_r=None, s=0, target=target - ) + _snap(a, "init", win_l=0, win_r=-1, best_l=None, best_r=None, s=0, target=target) ] n = len(a) s = 0 diff --git a/flask_app/visualizations/graph_viz.py b/flask_app/visualizations/graph_viz.py index 0f77120..390e904 100644 --- a/flask_app/visualizations/graph_viz.py +++ b/flask_app/visualizations/graph_viz.py @@ -53,9 +53,7 @@ def union(a: int, b: int) -> None: edges.add(e) -def generate_graph( - n: int = 12, p: float = 0.25, seed: int | None = None -) -> dict[str, Any]: +def generate_graph(n: int = 12, p: float = 0.25, seed: int | None = None) -> dict[str, Any]: """ Generate an undirected simple graph with n nodes. - Start with no edges, add each possible edge with probability p @@ -85,9 +83,7 @@ def _frame(state: dict[str, Any]) -> dict[str, Any]: } -def bfs_frames( - g: dict[str, Any], start: int = 0, max_steps: int = 20000 -) -> list[dict[str, Any]]: +def bfs_frames(g: dict[str, Any], start: int = 0, max_steps: int = 20000) -> list[dict[str, Any]]: n = g["n"] adj: list[list[int]] = [[] for _ in range(n)] for u, v in g["edges"]: @@ -152,9 +148,7 @@ def bfs_frames( return frames -def dfs_frames( - g: dict[str, Any], start: int = 0, max_steps: int = 20000 -) -> list[dict[str, Any]]: +def dfs_frames(g: dict[str, Any], start: int = 0, max_steps: int = 20000) -> list[dict[str, Any]]: n = g["n"] adj: list[list[int]] = [[] for _ in range(n)] for u, v in g["edges"]: diff --git a/flask_app/visualizations/mst_viz.py b/flask_app/visualizations/mst_viz.py index eab901b..36269e3 100644 --- a/flask_app/visualizations/mst_viz.py +++ b/flask_app/visualizations/mst_viz.py @@ -9,9 +9,7 @@ Edge = tuple[int, int, float] -def _circle_layout( - n: int, jitter: float = 0.0, rng: random.Random | None = None -) -> list[Coord]: +def _circle_layout(n: int, jitter: float = 0.0, rng: random.Random | None = None) -> list[Coord]: pts: list[Coord] = [] rng = rng or random.Random() for i in range(n): @@ -131,9 +129,7 @@ def union(a: int, b: int) -> bool: return frames -def prim_frames( - g: dict[str, Any], start: int = 0, max_steps: int = 50000 -) -> list[dict[str, Any]]: +def prim_frames(g: dict[str, Any], start: int = 0, max_steps: int = 50000) -> list[dict[str, Any]]: n: int = g["n"] edges: list[Edge] = g["edges"] # Build adjacency diff --git a/flask_app/visualizations/nn_viz.py b/flask_app/visualizations/nn_viz.py index e2e0d58..b3f128d 100644 --- a/flask_app/visualizations/nn_viz.py +++ b/flask_app/visualizations/nn_viz.py @@ -101,10 +101,7 @@ def __init__(self, hidden: int, lr: float, seed: int | None = None) -> None: def forward(self, x: Point) -> tuple[list[float], float]: # x: (2,) # z1 = W1 x + b1 - z1 = [ - self.W1[i][0] * x[0] + self.W1[i][1] * x[1] + self.b1[i] - for i in range(self.h) - ] + z1 = [self.W1[i][0] * x[0] + self.W1[i][1] * x[1] + self.b1[i] for i in range(self.h)] a1 = [_tanh(z) for z in z1] # z2 = W2 a1 + b2 z2 = sum(self.W2[i] * a1[i] for i in range(self.h)) + self.b2 @@ -137,10 +134,7 @@ def backward_update(self, x: Point, y: int) -> float: # loss for monitoring (BCE) eps = 1e-9 - loss = -( - float(y) * math.log(yhat + eps) - + (1.0 - float(y)) * math.log(1.0 - yhat + eps) - ) + loss = -(float(y) * math.log(yhat + eps) + (1.0 - float(y)) * math.log(1.0 - yhat + eps)) return loss def predict_proba(self, x: Point) -> float: diff --git a/flask_app/visualizations/path_viz.py b/flask_app/visualizations/path_viz.py index 94855a2..23fe466 100644 --- a/flask_app/visualizations/path_viz.py +++ b/flask_app/visualizations/path_viz.py @@ -92,9 +92,7 @@ def a_star_frames(grid: dict[str, Any], max_steps: int = 50000) -> list[dict[str came_from: dict[Coord, Coord] = {} g_score: dict[Coord, int] = {start: 0} - frames: list[dict[str, Any]] = [ - _frame(None, list(open_set), list(closed_set), [], "init") - ] + frames: list[dict[str, Any]] = [_frame(None, list(open_set), list(closed_set), [], "init")] while open_heap and len(frames) < max_steps: _, _, current = heapq.heappop(open_heap) @@ -105,9 +103,7 @@ def a_star_frames(grid: dict[str, Any], max_steps: int = 50000) -> list[dict[str if current == goal: path = _reconstruct_path(came_from, current) - frames.append( - _frame(current, list(open_set), list(closed_set), path, "done") - ) + frames.append(_frame(current, list(open_set), list(closed_set), path, "done")) return frames closed_set.add(current) @@ -125,18 +121,14 @@ def a_star_frames(grid: dict[str, Any], max_steps: int = 50000) -> list[dict[str heapq.heappush(open_heap, (f, tie, nbr)) open_set.add(nbr) p = _reconstruct_path(came_from, current) - frames.append( - _frame(nbr, list(open_set), list(closed_set), p, "push/update") - ) + frames.append(_frame(nbr, list(open_set), list(closed_set), p, "push/update")) # No path found frames.append(_frame(None, list(open_set), list(closed_set), [], "no-path")) return frames -def dijkstra_frames( - grid: dict[str, Any], max_steps: int = 50000 -) -> list[dict[str, Any]]: +def dijkstra_frames(grid: dict[str, Any], max_steps: int = 50000) -> list[dict[str, Any]]: rows, cols = grid["rows"], grid["cols"] walls = set(map(tuple, grid["walls"])) start: Coord = tuple(grid["start"]) # type: ignore @@ -151,9 +143,7 @@ def dijkstra_frames( came_from: dict[Coord, Coord] = {} dist: dict[Coord, int] = {start: 0} - frames: list[dict[str, Any]] = [ - _frame(None, list(open_set), list(closed_set), [], "init") - ] + frames: list[dict[str, Any]] = [_frame(None, list(open_set), list(closed_set), [], "init")] while open_heap and len(frames) < max_steps: _, _, current = heapq.heappop(open_heap) @@ -164,9 +154,7 @@ def dijkstra_frames( if current == goal: path = _reconstruct_path(came_from, current) - frames.append( - _frame(current, list(open_set), list(closed_set), path, "done") - ) + frames.append(_frame(current, list(open_set), list(closed_set), path, "done")) return frames closed_set.add(current) @@ -183,9 +171,7 @@ def dijkstra_frames( heapq.heappush(open_heap, (nd, tie, nbr)) open_set.add(nbr) p = _reconstruct_path(came_from, current) - frames.append( - _frame(nbr, list(open_set), list(closed_set), p, "push/update") - ) + frames.append(_frame(nbr, list(open_set), list(closed_set), p, "push/update")) frames.append(_frame(None, list(open_set), list(closed_set), [], "no-path")) return frames @@ -246,9 +232,7 @@ def gbfs_frames(grid: dict[str, Any], max_steps: int = 50000) -> list[dict[str, closed_set: set[Coord] = set() came_from: dict[Coord, Coord] = {} - frames: list[dict[str, Any]] = [ - _frame(None, list(open_set), list(closed_set), [], "init") - ] + frames: list[dict[str, Any]] = [_frame(None, list(open_set), list(closed_set), [], "init")] while open_heap and len(frames) < max_steps: _, _, current = heapq.heappop(open_heap) @@ -259,9 +243,7 @@ def gbfs_frames(grid: dict[str, Any], max_steps: int = 50000) -> list[dict[str, if current == goal: path = _reconstruct_path(came_from, current) - frames.append( - _frame(current, list(open_set), list(closed_set), path, "done") - ) + frames.append(_frame(current, list(open_set), list(closed_set), path, "done")) return frames closed_set.add(current) diff --git a/flask_app/visualizations/robotics_viz.py b/flask_app/visualizations/robotics_viz.py new file mode 100644 index 0000000..85cf100 --- /dev/null +++ b/flask_app/visualizations/robotics_viz.py @@ -0,0 +1,295 @@ +""" +Robotics navigation algorithm visualization. + +Generates animation frames for various robotics navigation algorithms. +""" + +from __future__ import annotations + +import random +from typing import Any + +from interview_workbook.robotics import ( + bug_algorithms, + pledge_algorithm, + potential_fields, + rrt, + utils, +) + +Coord = tuple[int, int] + + +def generate_grid( + rows: int = 15, + cols: int = 20, + density: float = 0.2, + seed: int | None = None, +) -> dict[str, Any]: + """ + Generate a grid with random obstacles. + - rows x cols grid + - Each cell becomes an obstacle with probability 'density' + - Start at (1,1), Goal at (cols-2, rows-2) + """ + if seed is not None: + random.seed(seed) + + grid = [[0 for _ in range(cols)] for _ in range(rows)] + + # Add random obstacles + for r in range(rows): + for c in range(cols): + if random.random() < density: + grid[r][c] = 1 + + # Ensure start and goal are free + start = (1, 1) + goal = (cols - 2, rows - 2) + grid[start[1]][start[0]] = 0 + grid[goal[1]][goal[0]] = 0 + + # Clear immediate neighbors of start + for dx in [-1, 0, 1]: + for dy in [-1, 0, 1]: + x, y = start[0] + dx, start[1] + dy + if 0 <= x < cols and 0 <= y < rows: + grid[y][x] = 0 + + return { + "rows": rows, + "cols": cols, + "grid": grid, + "start": start, + "goal": goal, + } + + +def _frame( + robot_pos: Coord | None, + robot_dir: str | None, + path: list[Coord], + visited: set[Coord], + op: str, +) -> dict[str, Any]: + """Create a visualization frame.""" + return { + "robot": list(robot_pos) if robot_pos else None, + "direction": robot_dir, + "path": [list(p) for p in path], + "visited": [list(v) for v in sorted(visited)], + "op": op, + } + + +def wall_following_frames( + grid_data: dict[str, Any], use_left_hand: bool = True, max_steps: int = 500 +) -> list[dict[str, Any]]: + """Generate frames for wall-following algorithm.""" + grid_world = utils.GridWorld(grid_data["grid"]) + start = tuple(grid_data["start"]) + goal = tuple(grid_data["goal"]) + + frames: list[dict[str, Any]] = [] + + # Track execution step by step + robot = utils.RobotState(start[0], start[1], utils.Direction.NORTH) + path = [start] + visited: set[Coord] = {start} + + frames.append(_frame(start, "NORTH", path, visited, "init")) + + for _ in range(max_steps): + if (robot.x, robot.y) == goal: + frames.append(_frame((robot.x, robot.y), robot.direction.name, path, visited, "goal")) + break + + if use_left_hand: + # Left-hand rule + robot.turn_left() + next_pos = robot.move_forward() + + if grid_world.is_free(*next_pos): + robot.apply_move() + else: + robot.turn_right() + next_pos = robot.move_forward() + if grid_world.is_free(*next_pos): + robot.apply_move() + else: + robot.turn_right() + next_pos = robot.move_forward() + if grid_world.is_free(*next_pos): + robot.apply_move() + else: + robot.turn_right() + else: + # Right-hand rule + robot.turn_right() + next_pos = robot.move_forward() + + if grid_world.is_free(*next_pos): + robot.apply_move() + else: + robot.turn_left() + next_pos = robot.move_forward() + if grid_world.is_free(*next_pos): + robot.apply_move() + else: + robot.turn_left() + next_pos = robot.move_forward() + if grid_world.is_free(*next_pos): + robot.apply_move() + else: + robot.turn_left() + + current_pos = (robot.x, robot.y) + if current_pos not in visited: + visited.add(current_pos) + path.append(current_pos) + + frames.append(_frame(current_pos, robot.direction.name, path, visited, "move")) + + return frames + + +def bug1_frames(grid_data: dict[str, Any], max_steps: int = 500) -> list[dict[str, Any]]: + """Generate frames for Bug1 algorithm.""" + grid_world = utils.GridWorld(grid_data["grid"]) + start = tuple(grid_data["start"]) + goal = tuple(grid_data["goal"]) + + frames: list[dict[str, Any]] = [] + path = [start] + visited: set[Coord] = {start} + + frames.append(_frame(start, None, path, visited, "init")) + + # Run Bug1 algorithm - simplified visualization + full_path = bug_algorithms.bug1_navigate(grid_world, start, goal, max_steps=max_steps) + + if full_path: + for i, pos in enumerate(full_path[1:], 1): + visited.add(pos) + frames.append(_frame(pos, None, path[:i], visited, "move")) + frames.append(_frame(goal, None, full_path, visited, "goal")) + + return frames + + +def rrt_frames(grid_data: dict[str, Any], max_iter: int = 300) -> list[dict[str, Any]]: + """Generate frames for RRT algorithm.""" + grid = grid_data["grid"] + start = tuple(grid_data["start"]) + goal = tuple(grid_data["goal"]) + + frames: list[dict[str, Any]] = [] + frames.append(_frame(start, None, [start], {start}, "init")) + + # Run RRT + path = rrt.rrt_plan( + grid, start, goal, max_iterations=max_iter, step_size=1.5, goal_sample_rate=0.15 + ) + + if path: + visited = set(path) + for i, pos in enumerate(path): + frames.append(_frame(pos, None, path[: i + 1], visited, "explore")) + frames.append(_frame(goal, None, path, visited, "goal")) + else: + frames.append(_frame(None, None, [start], {start}, "no-path")) + + return frames + + +def potential_field_frames(grid_data: dict[str, Any], max_steps: int = 200) -> list[dict[str, Any]]: + """Generate frames for potential field algorithm.""" + grid = grid_data["grid"] + start = tuple(grid_data["start"]) + goal = tuple(grid_data["goal"]) + + frames: list[dict[str, Any]] = [] + frames.append(_frame(start, None, [start], {start}, "init")) + + # Run potential field + path = potential_fields.potential_field_navigate( + grid, + start, + goal, + attractive_gain=2.0, + repulsive_gain=80.0, + influence_distance=3.0, + max_steps=max_steps, + ) + + if path: + visited = set(path) + for i, pos in enumerate(path): + frames.append(_frame(pos, None, path[: i + 1], visited, "move")) + frames.append(_frame(goal, None, path, visited, "goal")) + else: + frames.append(_frame(None, None, [start], {start}, "stuck")) + + return frames + + +def pledge_frames(grid_data: dict[str, Any], max_steps: int = 500) -> list[dict[str, Any]]: + """Generate frames for Pledge algorithm.""" + grid_world = utils.GridWorld(grid_data["grid"]) + start = tuple(grid_data["start"]) + goal = tuple(grid_data["goal"]) + + frames: list[dict[str, Any]] = [] + frames.append(_frame(start, None, [start], {start}, "init")) + + # Run Pledge algorithm + path = pledge_algorithm.pledge_navigate(grid_world, start, goal, max_steps=max_steps) + + if path: + visited = set(path) + for i, pos in enumerate(path): + frames.append(_frame(pos, None, path[: i + 1], visited, "move")) + frames.append(_frame(goal, None, path, visited, "goal")) + else: + frames.append(_frame(None, None, [start], {start}, "no-path")) + + return frames + + +ALGORITHMS = { + "wall_left": { + "name": "Wall Following (Left-Hand)", + "frames": lambda g, **kw: wall_following_frames(g, use_left_hand=True, **kw), + }, + "wall_right": { + "name": "Wall Following (Right-Hand)", + "frames": lambda g, **kw: wall_following_frames(g, use_left_hand=False, **kw), + }, + "bug1": {"name": "Bug1 Algorithm", "frames": bug1_frames}, + "pledge": {"name": "Pledge Algorithm", "frames": pledge_frames}, + "potential": {"name": "Potential Fields", "frames": potential_field_frames}, + "rrt": {"name": "RRT (Rapidly-exploring Random Tree)", "frames": rrt_frames}, +} + + +def visualize( + algo_key: str, + rows: int = 15, + cols: int = 20, + density: float = 0.2, + seed: int | None = None, +) -> dict[str, Any]: + """Generate visualization data for a robotics algorithm.""" + algo = ALGORITHMS.get(algo_key) + if not algo: + raise ValueError(f"Unknown algorithm '{algo_key}'") + + grid_data = generate_grid(rows=rows, cols=cols, density=density, seed=seed) + frames = algo["frames"](grid_data) + + return { + "algorithm": algo_key, + "name": algo["name"], + "grid": grid_data, + "frames": frames, + } diff --git a/flask_app/visualizations/sorting_viz.py b/flask_app/visualizations/sorting_viz.py index 56c37e3..5b6833f 100644 --- a/flask_app/visualizations/sorting_viz.py +++ b/flask_app/visualizations/sorting_viz.py @@ -11,9 +11,7 @@ def _snap( return {"arr": arr[:], "a": a, "b": b, "op": op} -def generate_array( - n: int = 30, seed: int | None = None, unique: bool = True -) -> list[int]: +def generate_array(n: int = 30, seed: int | None = None, unique: bool = True) -> list[int]: """ Generate a random array for visualization. - If unique: values are 1..n shuffled @@ -47,9 +45,7 @@ def bubble_sort_frames(arr: list[int], max_steps: int = 20000) -> list[dict[str, return frames -def insertion_sort_frames( - arr: list[int], max_steps: int = 20000 -) -> list[dict[str, Any]]: +def insertion_sort_frames(arr: list[int], max_steps: int = 20000) -> list[dict[str, Any]]: a = arr[:] frames: list[dict[str, Any]] = [_snap(a)] for i in range(1, len(a)): diff --git a/flask_app/visualizations/topo_viz.py b/flask_app/visualizations/topo_viz.py index c2f9b2b..21bff12 100644 --- a/flask_app/visualizations/topo_viz.py +++ b/flask_app/visualizations/topo_viz.py @@ -51,8 +51,7 @@ def generate_dag( rng = random.Random(seed) coords = _layer_layout(n, layers) nodes = [ - {"id": i, "x": coords[i][0], "y": coords[i][1], "layer": coords[i][2]} - for i in range(n) + {"id": i, "x": coords[i][0], "y": coords[i][1], "layer": coords[i][2]} for i in range(n) ] # group node ids by layer diff --git a/src/interview_workbook/robotics/__init__.py b/src/interview_workbook/robotics/__init__.py new file mode 100644 index 0000000..5dc1421 --- /dev/null +++ b/src/interview_workbook/robotics/__init__.py @@ -0,0 +1 @@ +"""Robotics algorithms and AI navigation techniques.""" diff --git a/src/interview_workbook/robotics/bug_algorithms.py b/src/interview_workbook/robotics/bug_algorithms.py new file mode 100644 index 0000000..74417de --- /dev/null +++ b/src/interview_workbook/robotics/bug_algorithms.py @@ -0,0 +1,315 @@ +""" +Bug algorithms for robot navigation. + +Bug1 and Bug2 are simple, reactive navigation algorithms that combine +goal-seeking behavior with obstacle avoidance. These algorithms are +provably complete for point robots in 2D environments. + +Time complexity: O(n * P) where n is obstacles, P is total perimeter +Space complexity: O(P) for storing boundary points +""" + +from __future__ import annotations + +from interview_workbook.robotics.utils import GridWorld, euclidean_distance + + +def bug1_navigate( + grid: GridWorld, start: tuple[int, int], goal: tuple[int, int], max_steps: int = 2000 +) -> list[tuple[int, int]] | None: + """ + Bug1 algorithm: Simple and complete obstacle avoidance. + + Algorithm: + 1. Move toward goal in straight line + 2. When hit obstacle, follow its perimeter completely + 3. Leave obstacle at point closest to goal + 4. Repeat until goal reached + + Time: O(n * P) where n is obstacles, P is total perimeter + Space: O(P) to store perimeter points + + Args: + grid: Environment with obstacles + start: Starting position + goal: Goal position + max_steps: Maximum steps to prevent infinite loops + + Returns: + Path to goal or None if not reachable + + Advantages: + - Complete: guaranteed to find path if one exists + - Simple to implement + + Disadvantages: + - Not efficient: may traverse entire obstacle perimeter + - Requires sensing entire perimeter before leaving + """ + if not grid.is_free(*start) or not grid.is_free(*goal): + return None + + path = [start] + current = start + + for _ in range(max_steps): + if current == goal: + return path + + # Try to move toward goal + next_pos = _move_toward_goal(grid, current, goal) + + if next_pos is None: + # Hit an obstacle - use Bug1 strategy + perimeter_path = _follow_obstacle_perimeter(grid, current, goal) + if perimeter_path is None: + return None + + path.extend(perimeter_path) + if len(perimeter_path) > 0: + current = perimeter_path[-1] + else: + path.append(next_pos) + current = next_pos + + return None # Max steps exceeded + + +def bug2_navigate( + grid: GridWorld, start: tuple[int, int], goal: tuple[int, int], max_steps: int = 2000 +) -> list[tuple[int, int]] | None: + """ + Bug2 algorithm: More efficient than Bug1 in many cases. + + Algorithm: + 1. Define m-line: straight line from start to goal + 2. Move along m-line toward goal + 3. When hit obstacle, follow perimeter until: + - Back on m-line AND + - Closer to goal than hit point + 4. Resume moving along m-line + + Time: O(n * P) worst case, often better in practice + Space: O(1) only tracks m-line + + Args: + grid: Environment with obstacles + start: Starting position + goal: Goal position + max_steps: Maximum steps + + Returns: + Path to goal or None if not reachable + + Advantages: + - Often more efficient than Bug1 + - Still complete + + Disadvantages: + - Can be worse than Bug1 in adversarial cases + - Requires sensing position relative to m-line + """ + if not grid.is_free(*start) or not grid.is_free(*goal): + return None + + path = [start] + current = start + following_wall = False + hit_point = None + hit_distance = float("inf") + + for _ in range(max_steps): + if current == goal: + return path + + if not following_wall: + # Try to move toward goal along m-line + next_pos = _move_toward_goal(grid, current, goal) + + if next_pos is None: + # Hit obstacle - start following perimeter + following_wall = True + hit_point = current + hit_distance = euclidean_distance(current, goal) + else: + path.append(next_pos) + current = next_pos + else: + # Following obstacle perimeter + # Check if we're back on m-line and closer to goal + if current != hit_point and _on_m_line(start, goal, current): + current_distance = euclidean_distance(current, goal) + if current_distance < hit_distance: + # Leave obstacle and resume toward goal + following_wall = False + continue + + # Continue following perimeter + next_pos = _follow_wall_step(grid, current, goal) + if next_pos is None: + return None + path.append(next_pos) + current = next_pos + + return None + + +def _move_toward_goal( + grid: GridWorld, current: tuple[int, int], goal: tuple[int, int] +) -> tuple[int, int] | None: + """ + Try to move one step toward goal. Return None if obstacle blocks. + Uses simple greedy approach: move in direction that reduces distance most. + """ + x, y = current + best_pos = None + best_dist = euclidean_distance(current, goal) + + # Try all 4 neighbors + for dx, dy in [(0, -1), (1, 0), (0, 1), (-1, 0)]: + nx, ny = x + dx, y + dy + if grid.is_free(nx, ny): + dist = euclidean_distance((nx, ny), goal) + if dist < best_dist: + best_dist = dist + best_pos = (nx, ny) + + return best_pos + + +def _follow_obstacle_perimeter( + grid: GridWorld, start: tuple[int, int], goal: tuple[int, int], max_perimeter: int = 500 +) -> list[tuple[int, int]] | None: + """ + Follow obstacle perimeter completely (Bug1). + Return to point closest to goal. + """ + perimeter = [] + current = start + closest_point = start + closest_dist = euclidean_distance(start, goal) + + # Follow wall using right-hand rule + for _ in range(max_perimeter): + next_pos = _follow_wall_step(grid, current, goal) + if next_pos is None: + return None + + perimeter.append(next_pos) + current = next_pos + + # Track closest point to goal + dist = euclidean_distance(current, goal) + if dist < closest_dist: + closest_dist = dist + closest_point = current + + # Check if we've completed the perimeter + if current == start and len(perimeter) > 1: + break + + # Find path from start to closest point along perimeter + if closest_point == start: + return [] + + for i, pos in enumerate(perimeter): + if pos == closest_point: + return perimeter[: i + 1] + + return perimeter + + +def _follow_wall_step( + grid: GridWorld, current: tuple[int, int], goal: tuple[int, int] +) -> tuple[int, int] | None: + """Make one step following the wall (right-hand rule).""" + x, y = current + + # Try directions in order: prefer moving toward goal when possible + for dx, dy in [(0, -1), (1, 0), (0, 1), (-1, 0)]: + nx, ny = x + dx, y + dy + if grid.is_free(nx, ny): + return (nx, ny) + + return None + + +def _on_m_line(start: tuple[int, int], goal: tuple[int, int], point: tuple[int, int]) -> bool: + """ + Check if point is (approximately) on the line from start to goal. + Uses cross product to check collinearity with tolerance for grid cells. + """ + # Vector from start to goal + v1 = (goal[0] - start[0], goal[1] - start[1]) + # Vector from start to point + v2 = (point[0] - start[0], point[1] - start[1]) + + # Cross product (for 2D, this is scalar) + # For 2D vectors, the cross product's magnitude gives the area of the parallelogram formed by v1 and v2, + # which is proportional to the perpendicular distance from 'point' to the line from 'start' to 'goal'. + # In grid-based navigation, we allow a small tolerance to account for discretization. + cross = v1[0] * v2[1] - v1[1] * v2[0] + + # Allow small tolerance for discrete grid + return abs(cross) < 1.5 + + +def demo(): + """Demo Bug algorithms.""" + print("Bug Algorithms Demo") + print("=" * 50) + + # Create environment with obstacles + maze = [ + [0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 1, 1, 1, 0, 0, 0], + [0, 0, 0, 1, 0, 1, 0, 0, 0], + [0, 0, 0, 1, 0, 1, 0, 0, 0], + [0, 0, 0, 1, 1, 1, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0], + ] + + grid = GridWorld(maze) + start = (1, 1) + goal = (7, 4) + + print("Grid (0=free, 1=obstacle):") + for row in maze: + print(" ".join(str(cell) for cell in row)) + print(f"\nStart: {start}, Goal: {goal}\n") + + # Bug1 + print("Bug1 Algorithm:") + path1 = bug1_navigate(grid, start, goal, max_steps=500) + if path1: + print(f" Path found: {len(path1)} steps") + print(f" Direct distance: {euclidean_distance(start, goal):.2f}") + else: + print(" Path not found") + + # Bug2 + print("\nBug2 Algorithm:") + path2 = bug2_navigate(grid, start, goal, max_steps=500) + if path2: + print(f" Path found: {len(path2)} steps") + print(f" Direct distance: {euclidean_distance(start, goal):.2f}") + else: + print(" Path not found") + + print("\nKey Differences:") + print("Bug1:") + print(" + Always complete (finds path if exists)") + print(" + Simple strategy") + print(" - May traverse entire perimeter unnecessarily") + print("\nBug2:") + print(" + Often more efficient") + print(" + Stays closer to m-line") + print(" - Can be worse in adversarial cases") + print("\nBoth algorithms:") + print(" - Reactive: only use local information") + print(" - Complete: guaranteed to find path") + print(" - Not optimal: don't find shortest path") + + +if __name__ == "__main__": + demo() diff --git a/src/interview_workbook/robotics/occupancy_grid.py b/src/interview_workbook/robotics/occupancy_grid.py new file mode 100644 index 0000000..b975dcb --- /dev/null +++ b/src/interview_workbook/robotics/occupancy_grid.py @@ -0,0 +1,240 @@ +""" +Occupancy Grid Mapping for robots. + +Occupancy grids represent the environment as a discretized grid where each +cell has a probability of being occupied. This is a fundamental technique +in robotic mapping and SLAM. + +Time complexity: O(rays * cells_per_ray) per update +Space complexity: O(width * height) for grid +""" + +from __future__ import annotations + +import math + + +class OccupancyGrid: + """ + Probabilistic occupancy grid for environment mapping. + + Each cell stores log-odds of occupancy for numerical stability. + """ + + def __init__(self, width: int, height: int, resolution: float = 1.0): + """ + Initialize occupancy grid. + + Args: + width: Grid width in cells + height: Grid height in cells + resolution: Meters per cell + """ + self.width = width + self.height = height + self.resolution = resolution + # Log-odds representation: log(p/(1-p)) + # 0 = unknown, > 0 = occupied, < 0 = free + self.grid = [[0.0 for _ in range(width)] for _ in range(height)] + + # Sensor model parameters (log-odds) + self.log_odds_occ = 0.4 # Increase when obstacle detected + self.log_odds_free = -0.4 # Decrease when free space observed + self.max_log_odds = 3.5 # Clamp to prevent overflow + self.min_log_odds = -3.5 + + def update_ray(self, robot_pos: tuple[float, float], hit_pos: tuple[float, float]): + """ + Update grid with a single range sensor ray. + + Mark cells along ray as free, and endpoint as occupied. + + Time: O(ray_length) for bresenham line traversal + Space: O(1) + + Args: + robot_pos: Robot position in world coordinates + hit_pos: Where ray hit obstacle (or max range) + """ + # Convert to grid coordinates + x0 = int(robot_pos[0] / self.resolution) + y0 = int(robot_pos[1] / self.resolution) + x1 = int(hit_pos[0] / self.resolution) + y1 = int(hit_pos[1] / self.resolution) + + # Get all cells along ray (Bresenham's line algorithm) + cells = self._bresenham_line(x0, y0, x1, y1) + + # Update cells along ray as free (except endpoint) + for x, y in cells[:-1]: + if self._in_bounds(x, y): + self.grid[y][x] += self.log_odds_free + self.grid[y][x] = max(self.min_log_odds, min(self.max_log_odds, self.grid[y][x])) + + # Update endpoint as occupied + if cells and self._in_bounds(cells[-1][0], cells[-1][1]): + x, y = cells[-1] + self.grid[y][x] += self.log_odds_occ + self.grid[y][x] = max(self.min_log_odds, min(self.max_log_odds, self.grid[y][x])) + + def get_probability(self, x: int, y: int) -> float: + """ + Get occupancy probability for cell. + + Converts log-odds back to probability: p = 1 / (1 + exp(-log_odds)) + """ + if not self._in_bounds(x, y): + return 0.5 + + log_odds = self.grid[y][x] + return 1.0 / (1.0 + math.exp(-log_odds)) + + def is_occupied(self, x: int, y: int, threshold: float = 0.7) -> bool: + """Check if cell is occupied (probability above threshold).""" + return self.get_probability(x, y) > threshold + + def _in_bounds(self, x: int, y: int) -> bool: + """Check if grid coordinates are valid.""" + return 0 <= x < self.width and 0 <= y < self.height + + def _bresenham_line(self, x0: int, y0: int, x1: int, y1: int) -> list[tuple[int, int]]: + """Bresenham's line algorithm to get cells along ray.""" + cells = [] + dx = abs(x1 - x0) + dy = abs(y1 - y0) + sx = 1 if x0 < x1 else -1 + sy = 1 if y0 < y1 else -1 + err = dx - dy + + x, y = x0, y0 + + while True: + cells.append((x, y)) + + if x == x1 and y == y1: + break + + e2 = 2 * err + if e2 > -dy: + err -= dy + x += sx + if e2 < dx: + err += dx + y += sy + + return cells + + def to_grid_visualization( + self, unknown_char: str = "?", free_char: str = ".", occ_char: str = "#" + ) -> list[str]: + """ + Convert occupancy grid to ASCII visualization. + + Args: + unknown_char: Character for unknown cells + free_char: Character for free cells + occ_char: Character for occupied cells + + Returns: + List of strings representing the grid + """ + lines = [] + for row in self.grid: + line = [] + for log_odds in row: + if abs(log_odds) < 0.1: + line.append(unknown_char) + elif log_odds > 0: + line.append(occ_char) + else: + line.append(free_char) + lines.append(" ".join(line)) + return lines + + +def demo(): + """Demo occupancy grid mapping.""" + print("Occupancy Grid Mapping Demo") + print("=" * 50) + + # Create occupancy grid + grid = OccupancyGrid(width=20, height=15, resolution=1.0) + + # Simulate robot scanning environment + # Robot at center, scanning 360 degrees + robot_x, robot_y = 10.0, 7.5 + + print(f"Robot position: ({robot_x}, {robot_y})") + print("Simulating laser scans...\n") + + # Simulate obstacles (walls) + obstacles = [ + # Vertical wall on right + [(15, 3), (15, 4), (15, 5), (15, 6), (15, 7), (15, 8), (15, 9)], + # Horizontal wall on top + [(5, 3), (6, 3), (7, 3), (8, 3), (9, 3), (10, 3)], + # Small obstacle + [(7, 10), (8, 10)], + ] + + # Flatten obstacles + obstacle_set = set() + for obs_list in obstacles: + obstacle_set.update(obs_list) + + # Simulate laser scans at different angles + num_rays = 36 # 10 degree increments + max_range = 12.0 + + for i in range(num_rays): + angle = (i / num_rays) * 2 * math.pi + + # Cast ray + dx = math.cos(angle) + dy = math.sin(angle) + + # Find hit point (either obstacle or max range) + hit_x, hit_y = robot_x, robot_y + for step in range(1, int(max_range) + 1): + test_x = robot_x + dx * step + test_y = robot_y + dy * step + + grid_x = int(test_x) + grid_y = int(test_y) + + if (grid_x, grid_y) in obstacle_set: + hit_x = test_x + hit_y = test_y + break + + hit_x = test_x + hit_y = test_y + + # Update grid + grid.update_ray((robot_x, robot_y), (hit_x, hit_y)) + + # Display grid + print("Occupancy Grid (? = unknown, . = free, # = occupied):") + viz = grid.to_grid_visualization() + for line in viz: + print(line) + + print("\nKey Insights:") + print("- Each cell stores probability of occupancy") + print("- Log-odds representation prevents numerical issues") + print("- Cells along ray marked as free, endpoint as occupied") + print("- Multiple observations increase confidence") + print("- Probabilistic: handles sensor noise gracefully") + print("- Foundation for SLAM (Simultaneous Localization and Mapping)") + print("\nSensor Model:") + print(f" - log_odds_occ: +{grid.log_odds_occ} (obstacle detected)") + print(f" - log_odds_free: {grid.log_odds_free} (free space observed)") + print(" - Clamped to prevent overflow") + print("\nApplications:") + print(" - Autonomous navigation") + print(" - SLAM algorithms") + print(" - Obstacle detection and avoidance") + + +if __name__ == "__main__": + demo() diff --git a/src/interview_workbook/robotics/particle_filter.py b/src/interview_workbook/robotics/particle_filter.py new file mode 100644 index 0000000..0c2d7ac --- /dev/null +++ b/src/interview_workbook/robotics/particle_filter.py @@ -0,0 +1,247 @@ +""" +Particle Filter for robot localization. + +Particle filters (Sequential Monte Carlo) are used for robot localization - +estimating the robot's position and orientation from noisy sensor measurements. + +Time complexity: O(N) per update where N is number of particles +Space complexity: O(N) for storing particles +""" + +from __future__ import annotations + +import math +import random + + +class Particle: + """Represents a hypothesis about robot state (x, y, orientation).""" + + def __init__(self, x: float, y: float, heading: float, weight: float = 1.0): + self.x = x + self.y = y + self.heading = heading # In radians + self.weight = weight + + def __repr__(self) -> str: + return ( + f"Particle(x={self.x:.2f}, y={self.y:.2f}, θ={self.heading:.2f}, w={self.weight:.3f})" + ) + + +def particle_filter_localize( + particles: list[Particle], + measurement: tuple[float, float], + movement: tuple[float, float, float], + landmarks: list[tuple[float, float]], + motion_noise: float = 0.1, + measurement_noise: float = 0.5, +) -> list[Particle]: + """ + Perform one iteration of particle filter localization. + + Algorithm: + 1. Prediction: Move particles according to motion model + 2. Update: Weight particles by measurement likelihood + 3. Resample: Draw new particles proportional to weights + + Time: O(N * M) where N=particles, M=landmarks + Space: O(N) for particle set + + Args: + particles: Current particle set + measurement: Sensor reading (e.g., range to landmark) + movement: (dx, dy, dtheta) motion command + landmarks: Known landmark positions + motion_noise: Noise in motion model + measurement_noise: Noise in sensor measurements + + Returns: + New particle set after prediction, update, and resample + + Advantages: + - Non-parametric: can represent multimodal distributions + - Works with non-Gaussian noise + - Effective for global localization + + Disadvantages: + - Particle depletion possible + - Requires many particles for high dimensions + - Computational cost scales with particles + """ + # 1. Prediction step: apply motion model with noise + for p in particles: + dx, dy, dtheta = movement + + # Add motion noise + dx += random.gauss(0, motion_noise) + dy += random.gauss(0, motion_noise) + dtheta += random.gauss(0, motion_noise * 0.1) + + # Update particle pose + p.x += dx + p.y += dy + p.heading += dtheta + + # 2. Update step: weight particles by measurement likelihood + for p in particles: + # Calculate expected measurement from this particle's pose + # Assuming measurement is distance to nearest landmark + if landmarks: + # Find nearest landmark + min_expected_dist = float("inf") + for lx, ly in landmarks: + dist = ((p.x - lx) ** 2 + (p.y - ly) ** 2) ** 0.5 + min_expected_dist = min(min_expected_dist, dist) + + # Measurement likelihood (Gaussian) + measured_dist = (measurement[0] ** 2 + measurement[1] ** 2) ** 0.5 + error = abs(measured_dist - min_expected_dist) + p.weight = _gaussian(error, 0, measurement_noise) + else: + p.weight = 1.0 + + # Normalize weights + total_weight = sum(p.weight for p in particles) + if total_weight > 0: + for p in particles: + p.weight /= total_weight + + # 3. Resample: draw new particles proportional to weights + new_particles = [] + weights = [p.weight for p in particles] + + for _ in range(len(particles)): + # Weighted random selection + selected = _weighted_random_choice(particles, weights) + # Create new particle (add small noise to avoid particle depletion) + new_p = Particle( + selected.x + random.gauss(0, motion_noise * 0.1), + selected.y + random.gauss(0, motion_noise * 0.1), + selected.heading + random.gauss(0, motion_noise * 0.01), + 1.0, + ) + new_particles.append(new_p) + + return new_particles + + +def estimate_pose(particles: list[Particle]) -> tuple[float, float, float]: + """ + Estimate robot pose from particle distribution. + Returns weighted mean of particles. + """ + if not particles: + return (0.0, 0.0, 0.0) + + x_sum = sum(p.x * p.weight for p in particles) + y_sum = sum(p.y * p.weight for p in particles) + heading_sum = sum(p.heading * p.weight for p in particles) + weight_sum = sum(p.weight for p in particles) + + if weight_sum > 0: + return (x_sum / weight_sum, y_sum / weight_sum, heading_sum / weight_sum) + else: + return ( + sum(p.x for p in particles) / len(particles), + sum(p.y for p in particles) / len(particles), + sum(p.heading for p in particles) / len(particles), + ) + + +def _gaussian(x: float, mu: float, sigma: float) -> float: + """Gaussian probability density.""" + return math.exp(-0.5 * ((x - mu) / sigma) ** 2) / (sigma * math.sqrt(2 * math.pi)) + + +def _weighted_random_choice(items: list[Particle], weights: list[float]) -> Particle: + """Select item with probability proportional to weight.""" + total = sum(weights) + r = random.uniform(0, total) + cumsum = 0.0 + for item, weight in zip(items, weights): + cumsum += weight + if cumsum >= r: + return item + return items[-1] + + +def demo(): + """Demo particle filter localization.""" + print("Particle Filter Localization Demo") + print("=" * 50) + + random.seed(42) + + # Initialize particles (uniform distribution) + num_particles = 100 + particles = [ + Particle( + random.uniform(0, 10), + random.uniform(0, 10), + random.uniform(0, 6.28), + 1.0 / num_particles, + ) + for _ in range(num_particles) + ] + + # Known landmarks + landmarks = [(2.0, 2.0), (8.0, 2.0), (5.0, 8.0)] + + # True robot pose (unknown to filter) + true_x, true_y, true_heading = 5.0, 5.0, 0.0 + + print(f"Landmarks: {landmarks}") + print(f"True initial pose: x={true_x:.2f}, y={true_y:.2f}, θ={true_heading:.2f}") + print(f"Number of particles: {num_particles}\n") + + # Simulate robot movement and measurements + movements = [ + (1.0, 0.0, 0.0), # Move right + (0.0, 1.0, 0.0), # Move down + (0.5, 0.5, 0.0), # Move diagonally + ] + + for i, movement in enumerate(movements): + print(f"Step {i + 1}: Movement {movement}") + + # Update true pose + true_x += movement[0] + true_y += movement[1] + true_heading += movement[2] + + # Simulate measurement (distance to nearest landmark) + min_dist = float("inf") + for lx, ly in landmarks: + dist = ((true_x - lx) ** 2 + (true_y - ly) ** 2) ** 0.5 + min_dist = min(min_dist, dist) + + # Add measurement noise + measured_dist = min_dist + random.gauss(0, 0.3) + measurement = (measured_dist, 0.0) # Simplified: just distance + + # Run particle filter + particles = particle_filter_localize( + particles, measurement, movement, landmarks, motion_noise=0.1, measurement_noise=0.5 + ) + + # Estimate pose + est_x, est_y, est_heading = estimate_pose(particles) + + print(f" True pose: x={true_x:.2f}, y={true_y:.2f}, θ={true_heading:.2f}") + print(f" Estimated: x={est_x:.2f}, y={est_y:.2f}, θ={est_heading:.2f}") + print(f" Error: {((est_x - true_x) ** 2 + (est_y - true_y) ** 2) ** 0.5:.2f}") + print() + + print("Particle Filter Key Insights:") + print("- Represents belief as set of weighted samples (particles)") + print("- Three steps: Predict (motion model), Update (measurement), Resample") + print("- Non-parametric: can represent complex, multimodal distributions") + print("- Used for global localization and kidnapped robot problem") + print("- Number of particles trades off accuracy vs computation") + print("- Particle depletion: adding noise during resampling helps") + print("- Converges as particles concentrate on true pose") + + +if __name__ == "__main__": + demo() diff --git a/src/interview_workbook/robotics/pledge_algorithm.py b/src/interview_workbook/robotics/pledge_algorithm.py new file mode 100644 index 0000000..e61e49a --- /dev/null +++ b/src/interview_workbook/robotics/pledge_algorithm.py @@ -0,0 +1,189 @@ +""" +Pledge algorithm for maze navigation. + +The Pledge algorithm is an improvement over simple wall-following that +can escape from closed loops by tracking the robot's total rotation angle. + +Time complexity: O(P) where P is obstacle perimeter +Space complexity: O(1) - only tracks angle counter +""" + +from __future__ import annotations + +from interview_workbook.robotics.utils import Direction, GridWorld, RobotState + + +def pledge_navigate( + grid: GridWorld, start: tuple[int, int], goal: tuple[int, int], max_steps: int = 2000 +) -> list[tuple[int, int]] | None: + """ + Navigate using Pledge algorithm. + + The Pledge algorithm improves on wall-following by tracking total rotation. + When total rotation returns to 0 (or multiple of 360°), the robot resumes + straight-line motion toward the goal. + + Algorithm: + 1. Move straight toward goal direction + 2. When obstacle encountered, follow wall (e.g., right-hand rule) + 3. Track cumulative rotation angle + 4. When total rotation = 0, resume straight motion + 5. Repeat until goal reached + + Time: O(P) where P is total obstacle perimeter + Space: O(1) for rotation counter + + Args: + grid: Environment with obstacles + start: Starting position + goal: Goal position + max_steps: Maximum steps to prevent infinite loops + + Returns: + Path to goal or None if not reachable + + Advantages over wall-following: + - Can escape from closed obstacle loops + - More efficient in many cases + - Still reactive and simple + + Common pitfalls: + - Requires orientation tracking + - May still revisit areas + - Not optimal path + """ + if not grid.is_free(*start) or not grid.is_free(*goal): + return None + + # Determine initial direction toward goal + dx = goal[0] - start[0] + dy = goal[1] - start[1] + + if abs(dx) > abs(dy): + initial_dir = Direction.EAST if dx > 0 else Direction.WEST + else: + initial_dir = Direction.SOUTH if dy > 0 else Direction.NORTH + + robot = RobotState(start[0], start[1], initial_dir) + path = [start] + rotation_sum = 0 # Track cumulative rotation in 90-degree units + following_wall = False + + for _ in range(max_steps): + if (robot.x, robot.y) == goal: + return path + + if not following_wall and rotation_sum == 0: + # Try to move straight toward goal + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + path.append((robot.x, robot.y)) + else: + # Hit obstacle, start following wall + following_wall = True + robot.turn_right() + rotation_sum += 1 + else: + # Following wall with right-hand rule + # First try to turn left (to follow wall closely) + robot.turn_left() + rotation_sum -= 1 + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + path.append((robot.x, robot.y)) + else: + # Wall on left, turn back right and try straight + robot.turn_right() + rotation_sum += 1 + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + path.append((robot.x, robot.y)) + else: + # Blocked straight, turn right again + robot.turn_right() + rotation_sum += 1 + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + path.append((robot.x, robot.y)) + else: + # Turn around (blocked on 3 sides) + robot.turn_right() + rotation_sum += 1 + + # Check if we can resume straight motion + if rotation_sum == 0: + following_wall = False + + return None + + +def demo(): + """Demo Pledge algorithm.""" + print("Pledge Algorithm Demo") + print("=" * 50) + + # Create maze with a closed loop + maze = [ + [0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 1, 1, 1, 1, 1, 0, 0, 0], + [0, 1, 0, 0, 0, 1, 0, 0, 0], + [0, 1, 0, 1, 0, 1, 0, 0, 0], + [0, 1, 0, 0, 0, 1, 0, 0, 0], + [0, 1, 1, 1, 1, 1, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0], + ] + + grid = GridWorld(maze) + start = (0, 0) + goal = (8, 6) + + print("Grid (0=free, 1=wall):") + for row in maze: + print(" ".join(str(cell) for cell in row)) + print(f"\nStart: {start}, Goal: {goal}\n") + + path = pledge_navigate(grid, start, goal, max_steps=500) + + if path: + print(f"Path found: {len(path)} steps") + print(f"Unique positions visited: {len(set(path))}") + + # Show path on grid + path_set = set(path) + print("\nPath visualization (S=start, G=goal, *=path):") + for y, row in enumerate(maze): + line = [] + for x, cell in enumerate(row): + if (x, y) == start: + line.append("S") + elif (x, y) == goal: + line.append("G") + elif cell == 1: + line.append("#") + elif (x, y) in path_set: + line.append("*") + else: + line.append(".") + print(" ".join(line)) + else: + print("Goal not reached") + + print("\nPledge Algorithm Insights:") + print("- Tracks cumulative rotation to detect when robot has 'unwound'") + print("- When rotation sum = 0, robot resumes straight motion") + print("- Avoids infinite loops in closed obstacle boundaries") + print("- More sophisticated than simple wall-following") + print("- Still reactive: uses only local sensor information") + print("- Works well for simply-connected obstacles") + + +if __name__ == "__main__": + demo() diff --git a/src/interview_workbook/robotics/potential_fields.py b/src/interview_workbook/robotics/potential_fields.py new file mode 100644 index 0000000..9fe384c --- /dev/null +++ b/src/interview_workbook/robotics/potential_fields.py @@ -0,0 +1,221 @@ +""" +Potential field navigation for robots. + +Potential fields create artificial attractive and repulsive forces to guide +robots to goals while avoiding obstacles. This is a reactive planning method +commonly used in mobile robotics. + +Time complexity: O(steps * obstacles) per iteration +Space complexity: O(1) for force calculations +""" + +from __future__ import annotations + +import math + + +def potential_field_navigate( + grid: list[list[int]], + start: tuple[int, int], + goal: tuple[int, int], + attractive_gain: float = 1.0, + repulsive_gain: float = 100.0, + influence_distance: float = 3.0, + max_steps: int = 500, +) -> list[tuple[int, int]] | None: + """ + Navigate using artificial potential fields. + + The robot is attracted to the goal and repelled by obstacles. + Total force = Attractive force + Repulsive forces + + Time: O(steps * obstacles) where obstacles are within influence_distance + Space: O(1) for force calculations + + Args: + grid: Environment (0=free, 1=obstacle) + start: Starting position + goal: Goal position + attractive_gain: Weight for attractive force (higher = stronger pull to goal) + repulsive_gain: Weight for repulsive force (higher = stronger obstacle avoidance) + influence_distance: Max distance for obstacle repulsion + max_steps: Maximum navigation steps + + Returns: + Path from start to goal, or None if stuck + + Advantages: + - Simple and reactive + - Smooth paths in open spaces + - Real-time capable + + Disadvantages: + - Can get stuck in local minima + - No completeness guarantee + - Oscillations possible near obstacles + """ + rows = len(grid) + cols = len(grid[0]) if rows else 0 + + def is_valid(x: int, y: int) -> bool: + return 0 <= x < cols and 0 <= y < rows and grid[y][x] == 0 + + if not is_valid(*start) or not is_valid(*goal): + return None + + path = [start] + current = start + stuck_counter = 0 + prev_pos = None + + for _ in range(max_steps): + if current == goal: + return path + + # Calculate attractive force (toward goal) + dx_goal = goal[0] - current[0] + dy_goal = goal[1] - current[1] + dist_goal = math.sqrt(dx_goal**2 + dy_goal**2) + + if dist_goal < 0.1: + return path + + # Unit vector toward goal + fx_attr = attractive_gain * dx_goal / dist_goal + fy_attr = attractive_gain * dy_goal / dist_goal + + # Calculate repulsive forces (away from obstacles) + fx_rep = 0.0 + fy_rep = 0.0 + + for y in range( + max(0, current[1] - int(influence_distance) - 1), + min(rows, current[1] + int(influence_distance) + 2), + ): + for x in range( + max(0, current[0] - int(influence_distance) - 1), + min(cols, current[0] + int(influence_distance) + 2), + ): + if grid[y][x] == 1: # Obstacle + dx_obs = current[0] - x + dy_obs = current[1] - y + dist_obs = math.sqrt(dx_obs**2 + dy_obs**2) + + if dist_obs < influence_distance and dist_obs > 0.1: + # Repulsive force: inversely proportional to distance + magnitude = ( + repulsive_gain + * (1.0 / dist_obs - 1.0 / influence_distance) + / (dist_obs**2) + ) + fx_rep += magnitude * dx_obs / dist_obs + fy_rep += magnitude * dy_obs / dist_obs + + # Total force + fx_total = fx_attr + fx_rep + fy_total = fy_attr + fy_rep + + # Determine next move (discretized to 8 directions) + angle = math.atan2(fy_total, fx_total) + + # Try 8 directions, prioritizing the direction of total force + # Generate angles in a zigzag pattern: [0, -π/4, +π/4, -π/2, +π/2, -3π/4, +3π/4, π] + # This explores nearby directions first before trying opposite directions + directions = [] + for i in range(8): + a = angle + (i // 2) * (math.pi / 4) * (1 if i % 2 == 0 else -1) + dx = round(math.cos(a)) + dy = round(math.sin(a)) + directions.append((dx, dy)) + + # Find first valid move + moved = False + for dx, dy in directions: + nx, ny = current[0] + dx, current[1] + dy + if is_valid(nx, ny): + current = (nx, ny) + path.append(current) + moved = True + break + + if not moved: + return None # Stuck (no valid moves) + + # Detect oscillation or being stuck + if current == prev_pos: + stuck_counter += 1 + if stuck_counter > 5: + return None # Likely in local minimum + else: + stuck_counter = 0 + + prev_pos = path[-2] if len(path) >= 2 else None + + return None # Max steps exceeded + + +def demo(): + """Demo potential field navigation.""" + print("Potential Field Navigation Demo") + print("=" * 50) + + # Create environment with obstacles + maze = [ + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 1, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 1, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 1, 1, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 1, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 1, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + ] + + start = (1, 1) + goal = (8, 6) + + print("Grid (0=free, 1=obstacle):") + for row in maze: + print(" ".join(str(cell) for cell in row)) + print(f"\nStart: {start}, Goal: {goal}\n") + + # Try different parameter combinations + configs = [ + (1.0, 100.0, 3.0, "Balanced"), + (2.0, 100.0, 3.0, "Strong attraction"), + (1.0, 200.0, 3.0, "Strong repulsion"), + (1.0, 100.0, 5.0, "Large influence distance"), + ] + + for attr_gain, rep_gain, influence, name in configs: + print(f"{name} (attr={attr_gain}, rep={rep_gain}, influence={influence}):") + path = potential_field_navigate( + maze, + start, + goal, + attractive_gain=attr_gain, + repulsive_gain=rep_gain, + influence_distance=influence, + max_steps=300, + ) + + if path: + print(f" Success! Path length: {len(path)} steps") + else: + print(" Failed (local minimum or stuck)") + print() + + print("Key Insights:") + print("- Attractive force pulls robot toward goal") + print("- Repulsive forces push robot away from obstacles") + print("- Can get stuck in local minima (e.g., U-shaped obstacles)") + print("- Parameter tuning is crucial:") + print(" * Higher attractive_gain: more direct path, may hit obstacles") + print(" * Higher repulsive_gain: safer but may not reach goal") + print(" * Larger influence_distance: smoother but may be overly cautious") + print("- Works well in open environments") + print("- Real-time capable (reactive)") + + +if __name__ == "__main__": + demo() diff --git a/src/interview_workbook/robotics/rrt.py b/src/interview_workbook/robotics/rrt.py new file mode 100644 index 0000000..07427d8 --- /dev/null +++ b/src/interview_workbook/robotics/rrt.py @@ -0,0 +1,224 @@ +""" +RRT (Rapidly-exploring Random Tree) path planning. + +RRT is a sampling-based path planning algorithm that builds a tree of +feasible paths by randomly sampling the configuration space. It's widely +used in robotics for high-dimensional planning problems. + +Time complexity: O(n) where n is number of samples +Space complexity: O(n) for storing the tree +""" + +from __future__ import annotations + +import random + + +def rrt_plan( + grid: list[list[int]], + start: tuple[int, int], + goal: tuple[int, int], + max_iterations: int = 1000, + step_size: float = 1.0, + goal_sample_rate: float = 0.1, +) -> list[tuple[int, int]] | None: + """ + RRT (Rapidly-exploring Random Tree) path planner. + + Builds a tree by repeatedly: + 1. Sample random point in space (or goal with probability) + 2. Find nearest node in tree + 3. Extend tree toward sample + 4. Check if goal reached + + Time: O(iterations * tree_size) for nearest neighbor search + Space: O(tree_size) for storing nodes + + Args: + grid: Environment (0=free, 1=obstacle) + start: Starting position + goal: Goal position + max_iterations: Maximum sampling iterations + step_size: Maximum distance to extend tree per iteration + goal_sample_rate: Probability of sampling goal (vs random point) + + Returns: + Path from start to goal, or None if not found + + Advantages: + - Probabilistically complete + - Works well in high-dimensional spaces + - Can handle complex obstacles + + Disadvantages: + - Not optimal (path quality depends on samples) + - Requires tuning (step_size, iterations) + - Memory grows with iterations + """ + rows = len(grid) + cols = len(grid[0]) if rows else 0 + + def is_valid(x: int, y: int) -> bool: + ix, iy = int(round(x)), int(round(y)) + return 0 <= ix < cols and 0 <= iy < rows and grid[iy][ix] == 0 + + def collision_free(p1: tuple[float, float], p2: tuple[float, float]) -> bool: + """Check if path from p1 to p2 is collision-free.""" + x1, y1 = p1 + x2, y2 = p2 + dist = ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5 + steps = max(int(dist * 2), 1) + + for i in range(steps + 1): + t = i / steps if steps > 0 else 0 + x = x1 + t * (x2 - x1) + y = y1 + t * (y2 - y1) + if not is_valid(x, y): + return False + return True + + if not is_valid(*start) or not is_valid(*goal): + return None + + # Tree: each node stores (position, parent_index) + tree = [(start, -1)] + goal_f = (float(goal[0]), float(goal[1])) + + for _ in range(max_iterations): + # Sample random point (bias toward goal) + if random.random() < goal_sample_rate: + sample = goal_f + else: + sample = (random.uniform(0, cols - 1), random.uniform(0, rows - 1)) + + # Find nearest node in tree + nearest_idx = 0 + min_dist = float("inf") + for idx, (pos, _) in enumerate(tree): + dist = (pos[0] - sample[0]) ** 2 + (pos[1] - sample[1]) ** 2 + if dist < min_dist: + min_dist = dist + nearest_idx = idx + + nearest_pos = tree[nearest_idx][0] + + # Extend toward sample + dx = sample[0] - nearest_pos[0] + dy = sample[1] - nearest_pos[1] + dist = (dx**2 + dy**2) ** 0.5 + + if dist < 0.01: + continue + + # Limit extension to step_size + if dist > step_size: + dx = dx / dist * step_size + dy = dy / dist * step_size + + new_pos = (nearest_pos[0] + dx, nearest_pos[1] + dy) + + # Check collision + if collision_free(nearest_pos, new_pos): + tree.append((new_pos, nearest_idx)) + + # Check if goal reached + goal_dist = ((new_pos[0] - goal_f[0]) ** 2 + (new_pos[1] - goal_f[1]) ** 2) ** 0.5 + if goal_dist < step_size and collision_free(new_pos, goal_f): + # Goal reached! Reconstruct path + tree.append((goal_f, len(tree) - 1)) + return _reconstruct_path(tree) + + return None # Goal not reached + + +def _reconstruct_path(tree: list[tuple[tuple[float, float], int]]) -> list[tuple[int, int]]: + """Reconstruct path from tree.""" + path = [] + idx = len(tree) - 1 + + while idx != -1: + pos, parent_idx = tree[idx] + path.append((int(round(pos[0])), int(round(pos[1])))) + idx = parent_idx + + path.reverse() + return path + + +def demo(): + """Demo RRT path planning.""" + print("RRT (Rapidly-exploring Random Tree) Demo") + print("=" * 50) + + random.seed(42) # For reproducible results + + # Create environment with obstacles + maze = [ + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 1, 1, 1, 0, 0, 0, 0], + [0, 0, 0, 1, 0, 1, 0, 0, 0, 0], + [0, 0, 0, 1, 0, 1, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 1, 0, 0, 0, 0], + [0, 1, 1, 1, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + ] + + start = (1, 1) + goal = (8, 5) + + print("Grid (0=free, 1=obstacle):") + for row in maze: + print(" ".join(str(cell) for cell in row)) + print(f"\nStart: {start}, Goal: {goal}\n") + + # Try different parameter settings + configs = [ + (500, 1.0, 0.1, "Standard"), + (500, 2.0, 0.1, "Larger steps"), + (500, 1.0, 0.3, "Higher goal bias"), + (1000, 1.0, 0.1, "More iterations"), + ] + + for max_iter, step, goal_rate, name in configs: + print(f"{name} (iter={max_iter}, step={step}, goal_bias={goal_rate}):") + random.seed(42) # Same seed for fair comparison + path = rrt_plan(maze, start, goal, max_iter, step, goal_rate) + + if path: + print(f" Success! Path length: {len(path)} waypoints") + + # Show path on grid + path_set = set(path) + print(" Path visualization:") + for y, row in enumerate(maze): + line = [] + for x, cell in enumerate(row): + if (x, y) == start: + line.append("S") + elif (x, y) == goal: + line.append("G") + elif cell == 1: + line.append("#") + elif (x, y) in path_set: + line.append("*") + else: + line.append(".") + print(" " + " ".join(line)) + else: + print(" Failed to find path") + print() + + print("RRT Key Insights:") + print("- Samples random points and grows tree toward them") + print("- Probabilistically complete (will find path eventually)") + print("- Not optimal: path quality depends on sampling") + print("- Good for high-dimensional configuration spaces") + print("- Parameters affect performance:") + print(" * step_size: larger = faster exploration, may miss narrow passages") + print(" * goal_sample_rate: higher = faster toward goal, less exploration") + print(" * max_iterations: more = higher success rate, slower") + print("- Variants: RRT*, RRT-Connect improve performance") + + +if __name__ == "__main__": + demo() diff --git a/src/interview_workbook/robotics/utils.py b/src/interview_workbook/robotics/utils.py new file mode 100644 index 0000000..37ee447 --- /dev/null +++ b/src/interview_workbook/robotics/utils.py @@ -0,0 +1,116 @@ +"""Utility classes and functions for robotics algorithms.""" + +from __future__ import annotations + +from enum import Enum + + +class Direction(Enum): + """Cardinal directions for robot orientation.""" + + NORTH = 0 + EAST = 1 + SOUTH = 2 + WEST = 3 + + def turn_left(self) -> Direction: + """Turn 90 degrees counterclockwise.""" + return Direction((self.value - 1) % 4) + + def turn_right(self) -> Direction: + """Turn 90 degrees clockwise.""" + return Direction((self.value + 1) % 4) + + def turn_around(self) -> Direction: + """Turn 180 degrees.""" + return Direction((self.value + 2) % 4) + + def to_vector(self) -> tuple[int, int]: + """Convert direction to (dx, dy) movement vector.""" + vectors = { + Direction.NORTH: (0, -1), + Direction.EAST: (1, 0), + Direction.SOUTH: (0, 1), + Direction.WEST: (-1, 0), + } + return vectors[self] + + +class RobotState: + """Represents the state of a robot (position and orientation).""" + + def __init__(self, x: int, y: int, direction: Direction): + self.x = x + self.y = y + self.direction = direction + + def move_forward(self) -> tuple[int, int]: + """Return the position the robot would reach by moving forward.""" + dx, dy = self.direction.to_vector() + return (self.x + dx, self.y + dy) + + def apply_move(self): + """Move the robot forward in its current direction.""" + self.x, self.y = self.move_forward() + + def turn_left(self): + """Turn the robot left (counterclockwise).""" + self.direction = self.direction.turn_left() + + def turn_right(self): + """Turn the robot right (clockwise).""" + self.direction = self.direction.turn_right() + + def turn_around(self): + """Turn the robot 180 degrees.""" + self.direction = self.direction.turn_around() + + def __repr__(self) -> str: + return f"RobotState(x={self.x}, y={self.y}, dir={self.direction.name})" + + +class GridWorld: + """ + 2D grid world environment for robot navigation. + + Convention: + - 0 = free space + - 1 = obstacle/wall + - Grid coordinates: (0,0) is top-left + """ + + def __init__(self, grid: list[list[int]]): + self.grid = grid + self.rows = len(grid) + self.cols = len(grid[0]) if grid else 0 + + def is_valid(self, x: int, y: int) -> bool: + """Check if coordinates are within bounds.""" + return 0 <= x < self.cols and 0 <= y < self.rows + + def is_free(self, x: int, y: int) -> bool: + """Check if cell is free (not an obstacle).""" + return self.is_valid(x, y) and self.grid[y][x] == 0 + + def is_obstacle(self, x: int, y: int) -> bool: + """Check if cell contains an obstacle.""" + return not self.is_valid(x, y) or self.grid[y][x] == 1 + + def get_neighbors(self, x: int, y: int) -> list[tuple[int, int]]: + """Get valid neighboring cells (4-directional).""" + neighbors = [] + for dx, dy in [(0, -1), (1, 0), (0, 1), (-1, 0)]: + nx, ny = x + dx, y + dy + if self.is_free(nx, ny): + neighbors.append((nx, ny)) + return neighbors + + +def manhattan_distance(p1: tuple[int, int], p2: tuple[int, int]) -> int: + """Calculate Manhattan distance between two points.""" + return abs(p1[0] - p2[0]) + abs(p1[1] - p2[1]) + + +def euclidean_distance(p1: tuple[int, int], p2: tuple[int, int]) -> float: + """Calculate Euclidean distance between two points.""" + return ((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) ** 0.5 diff --git a/src/interview_workbook/robotics/wall_following.py b/src/interview_workbook/robotics/wall_following.py new file mode 100644 index 0000000..c4f1826 --- /dev/null +++ b/src/interview_workbook/robotics/wall_following.py @@ -0,0 +1,198 @@ +""" +Wall-following navigation algorithm. + +A fundamental robotics navigation technique where the robot follows walls +to navigate through unknown environments. This is useful when you only +know start and end positions and must discover obstacles by bumping into them. + +Time complexity: O(P) where P is the perimeter of all obstacles encountered +Space complexity: O(1) - only stores robot state +""" + +from __future__ import annotations + +from interview_workbook.robotics.utils import Direction, GridWorld, RobotState + + +def wall_follow_left_hand( + grid: GridWorld, start: tuple[int, int], goal: tuple[int, int], max_steps: int = 1000 +) -> list[tuple[int, int]] | None: + """ + Navigate from start to goal using left-hand wall-following rule. + + The robot keeps its left "hand" on the wall and follows it. This guarantees + finding the goal if it's reachable and the obstacles are simply connected. + + Args: + grid: The environment with obstacles + start: Starting position (x, y) + goal: Goal position (x, y) + max_steps: Maximum steps to prevent infinite loops + + Returns: + Path as list of positions, or None if goal not reached within max_steps + + Algorithm: + 1. Move forward if possible + 2. If can't move forward, turn right + 3. After turning right, try to turn left (to follow wall on left) + 4. Repeat until goal reached or max_steps exceeded + + Common pitfalls: + - May not find goal if obstacles are not simply connected (multiple disconnected walls) + - Can loop forever in certain maze configurations + - Does not find shortest path + """ + if not grid.is_free(*start) or not grid.is_free(*goal): + return None + + robot = RobotState(start[0], start[1], Direction.NORTH) + path = [start] + + for _ in range(max_steps): + if (robot.x, robot.y) == goal: + return path + + # Try to turn left first (to keep left hand on wall) + robot.turn_left() + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + # Can move with left hand on wall + robot.apply_move() + else: + # Wall on left, turn back right and try forward + robot.turn_right() + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + else: + # Can't move forward, keep turning right until we can + robot.turn_right() + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + else: + # Surrounded on 3 sides, turn around + robot.turn_right() + + current_pos = (robot.x, robot.y) + path.append(current_pos) + + return None # Goal not reached within max_steps + + +def wall_follow_right_hand( + grid: GridWorld, start: tuple[int, int], goal: tuple[int, int], max_steps: int = 1000 +) -> list[tuple[int, int]] | None: + """ + Navigate using right-hand wall-following rule. + + Similar to left-hand rule but keeps right hand on the wall. + + Args: + grid: The environment with obstacles + start: Starting position (x, y) + goal: Goal position (x, y) + max_steps: Maximum steps to prevent infinite loops + + Returns: + Path as list of positions, or None if goal not reached + """ + if not grid.is_free(*start) or not grid.is_free(*goal): + return None + + robot = RobotState(start[0], start[1], Direction.NORTH) + path = [start] + + for _ in range(max_steps): + if (robot.x, robot.y) == goal: + return path + + # Try to turn right first (to keep right hand on wall) + robot.turn_right() + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + # Can move with right hand on wall + robot.apply_move() + else: + # Wall on right, turn back left and try forward + robot.turn_left() + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + else: + # Can't move forward, keep turning left until we can + robot.turn_left() + next_pos = robot.move_forward() + + if grid.is_free(*next_pos): + robot.apply_move() + else: + # Surrounded on 3 sides, turn around + robot.turn_left() + + current_pos = (robot.x, robot.y) + path.append(current_pos) + + return None + + +def demo(): + """Demo wall-following navigation.""" + print("Wall-Following Navigation Demo") + print("=" * 50) + + # Create a simple maze + maze = [ + [0, 0, 0, 0, 0, 0, 0, 0], + [0, 1, 1, 1, 1, 1, 1, 0], + [0, 1, 0, 0, 0, 0, 0, 0], + [0, 1, 0, 1, 1, 1, 1, 0], + [0, 0, 0, 0, 0, 0, 1, 0], + [0, 1, 1, 1, 1, 0, 1, 0], + [0, 0, 0, 0, 0, 0, 0, 0], + ] + + grid = GridWorld(maze) + start = (0, 0) + goal = (7, 6) + + print("Grid (0=free, 1=wall):") + for row in maze: + print(" ".join(str(cell) for cell in row)) + print() + + # Left-hand rule + print(f"Start: {start}, Goal: {goal}") + print("\nLeft-hand wall following:") + path_left = wall_follow_left_hand(grid, start, goal, max_steps=200) + if path_left: + print(f"Found goal in {len(path_left)} steps") + print(f"Path length: {len(set(path_left))} unique positions") + else: + print("Goal not reached") + + # Right-hand rule + print("\nRight-hand wall following:") + path_right = wall_follow_right_hand(grid, start, goal, max_steps=200) + if path_right: + print(f"Found goal in {len(path_right)} steps") + print(f"Path length: {len(set(path_right))} unique positions") + else: + print("Goal not reached") + + print("\nKey insights:") + print("- Wall-following is reactive, not optimal") + print("- Works well in simply-connected environments") + print("- Left vs right rule can give different paths") + print("- May revisit cells multiple times") + print("- No memory of visited cells in basic version") + + +if __name__ == "__main__": + demo() diff --git a/tests/test_robotics.py b/tests/test_robotics.py new file mode 100644 index 0000000..c28aba2 --- /dev/null +++ b/tests/test_robotics.py @@ -0,0 +1,309 @@ +"""Tests for robotics navigation algorithms.""" + +import random + +from interview_workbook.robotics import ( + bug_algorithms, + occupancy_grid, + particle_filter, + pledge_algorithm, + potential_fields, + rrt, + utils, + wall_following, +) + + +def test_direction_enum(): + """Test Direction enum functionality.""" + # Test turning + assert utils.Direction.NORTH.turn_left() == utils.Direction.WEST + assert utils.Direction.NORTH.turn_right() == utils.Direction.EAST + assert utils.Direction.NORTH.turn_around() == utils.Direction.SOUTH + + # Test vectors + assert utils.Direction.NORTH.to_vector() == (0, -1) + assert utils.Direction.EAST.to_vector() == (1, 0) + assert utils.Direction.SOUTH.to_vector() == (0, 1) + assert utils.Direction.WEST.to_vector() == (-1, 0) + + +def test_robot_state(): + """Test RobotState class.""" + robot = utils.RobotState(5, 5, utils.Direction.NORTH) + + # Test forward movement + assert robot.move_forward() == (5, 4) + robot.apply_move() + assert robot.x == 5 and robot.y == 4 + + # Test turning + robot.turn_right() + assert robot.direction == utils.Direction.EAST + assert robot.move_forward() == (6, 4) + + +def test_grid_world(): + """Test GridWorld class.""" + grid = utils.GridWorld([[0, 1, 0], [0, 0, 1], [1, 0, 0]]) + + # Test validity checks + assert grid.is_valid(0, 0) + assert not grid.is_valid(-1, 0) + assert not grid.is_valid(3, 0) + + # Test obstacle checks + assert grid.is_free(0, 0) + assert grid.is_obstacle(1, 0) + + # Test neighbors (1,1) has neighbors at (0,1) and (1,2) - free cells + neighbors = grid.get_neighbors(1, 1) + assert (0, 1) in neighbors + assert (1, 2) in neighbors + assert len(neighbors) == 2 # Only two neighbors are free + + +def test_manhattan_distance(): + """Test Manhattan distance calculation.""" + assert utils.manhattan_distance((0, 0), (3, 4)) == 7 + assert utils.manhattan_distance((1, 1), (1, 1)) == 0 + assert utils.manhattan_distance((0, 0), (-3, -4)) == 7 + + +def test_euclidean_distance(): + """Test Euclidean distance calculation.""" + assert utils.euclidean_distance((0, 0), (3, 4)) == 5.0 + assert utils.euclidean_distance((1, 1), (1, 1)) == 0.0 + + +def test_wall_following_simple(): + """Test wall following algorithms.""" + maze = [ + [0, 0, 0, 0, 0], + [0, 1, 1, 1, 0], + [0, 0, 0, 0, 0], + ] + grid = utils.GridWorld(maze) + start = (0, 0) + goal = (4, 2) + + # Test left-hand rule + path = wall_following.wall_follow_left_hand(grid, start, goal, max_steps=100) + assert path is not None + assert path[0] == start + assert path[-1] == goal + + +def test_wall_following_invalid_positions(): + """Test wall following with invalid start/goal.""" + maze = [[0, 1, 0], [0, 0, 0], [1, 0, 0]] + grid = utils.GridWorld(maze) + + # Start in obstacle + path = wall_following.wall_follow_left_hand(grid, (1, 0), (2, 1), max_steps=100) + assert path is None + + # Goal in obstacle + path = wall_following.wall_follow_left_hand(grid, (0, 0), (1, 0), max_steps=100) + assert path is None + + +def test_bug1_navigate(): + """Test Bug1 algorithm.""" + maze = [ + [0, 0, 0, 0, 0, 0], + [0, 1, 1, 1, 0, 0], + [0, 0, 0, 0, 0, 0], + ] + grid = utils.GridWorld(maze) + start = (0, 1) + goal = (5, 1) + + path = bug_algorithms.bug1_navigate(grid, start, goal, max_steps=100) + assert path is not None + assert path[0] == start + assert path[-1] == goal + + +def test_pledge_navigate(): + """Test Pledge algorithm.""" + maze = [ + [0, 0, 0, 0, 0], + [0, 1, 1, 0, 0], + [0, 0, 0, 0, 0], + ] + grid = utils.GridWorld(maze) + start = (0, 0) + goal = (4, 2) + + path = pledge_algorithm.pledge_navigate(grid, start, goal, max_steps=100) + assert path is not None + assert path[0] == start + assert path[-1] == goal + + +def test_potential_field_navigate(): + """Test potential field navigation.""" + maze = [ + [0, 0, 0, 0, 0], + [0, 0, 1, 0, 0], + [0, 0, 0, 0, 0], + ] + start = (0, 1) + goal = (4, 1) + + # Try with parameters that should work + path = potential_fields.potential_field_navigate( + maze, + start, + goal, + attractive_gain=2.0, + repulsive_gain=50.0, + influence_distance=2.0, + max_steps=100, + ) + # Potential fields may fail due to local minima, so we just check it doesn't crash + assert path is None or (isinstance(path, list) and len(path) > 0) + + +def test_rrt_plan(): + """Test RRT path planner.""" + random.seed(42) + maze = [ + [0, 0, 0, 0, 0], + [0, 1, 1, 0, 0], + [0, 0, 0, 0, 0], + ] + start = (0, 1) + goal = (4, 1) + + path = rrt.rrt_plan(maze, start, goal, max_iterations=500, step_size=1.0, goal_sample_rate=0.2) + # RRT is probabilistic, so it may not always find a path, but with these parameters it should + # Check that path is either None or a valid non-empty list + assert path is None or (isinstance(path, list) and len(path) > 0) + if path: + assert path[0] == start + assert path[-1] == goal + + +def test_rrt_invalid_positions(): + """Test RRT with invalid start/goal.""" + maze = [[0, 1, 0], [0, 0, 0]] + + # Start in obstacle + path = rrt.rrt_plan(maze, (1, 0), (2, 1), max_iterations=100) + assert path is None + + # Goal in obstacle + path = rrt.rrt_plan(maze, (0, 0), (1, 0), max_iterations=100) + assert path is None + + +def test_particle_filter_localize(): + """Test particle filter localization.""" + random.seed(42) + + # Initialize particles + particles = [ + particle_filter.Particle(random.uniform(0, 10), random.uniform(0, 10), 0.0, 1.0) + for _ in range(50) + ] + + landmarks = [(5.0, 5.0)] + measurement = (2.0, 0.0) + movement = (1.0, 0.0, 0.0) + + # Run one iteration + new_particles = particle_filter.particle_filter_localize( + particles, measurement, movement, landmarks, motion_noise=0.1, measurement_noise=0.5 + ) + + assert len(new_particles) == len(particles) + assert all(isinstance(p, particle_filter.Particle) for p in new_particles) + + +def test_particle_filter_estimate_pose(): + """Test pose estimation from particles.""" + particles = [ + particle_filter.Particle(5.0, 5.0, 0.0, 1.0), + particle_filter.Particle(5.1, 4.9, 0.1, 1.0), + particle_filter.Particle(4.9, 5.1, -0.1, 1.0), + ] + + x, y, heading = particle_filter.estimate_pose(particles) + assert 4.8 < x < 5.2 + assert 4.8 < y < 5.2 + assert -0.2 < heading < 0.2 + + +def test_occupancy_grid_creation(): + """Test occupancy grid initialization.""" + grid = occupancy_grid.OccupancyGrid(width=10, height=10, resolution=1.0) + + assert grid.width == 10 + assert grid.height == 10 + assert grid.resolution == 1.0 + assert len(grid.grid) == 10 + assert len(grid.grid[0]) == 10 + + +def test_occupancy_grid_update(): + """Test occupancy grid ray updates.""" + grid = occupancy_grid.OccupancyGrid(width=10, height=10, resolution=1.0) + + # Update with a ray + robot_pos = (5.0, 5.0) + hit_pos = (8.0, 5.0) + + grid.update_ray(robot_pos, hit_pos) + + # Check that some cells were updated + # Cells along ray should be marked as free (log_odds < 0) + assert grid.grid[5][6] < 0 # Cell along ray should be free + + # Endpoint should be marked as occupied (log_odds > 0) + assert grid.grid[5][8] > 0 + + +def test_occupancy_grid_probability(): + """Test occupancy probability calculation.""" + grid = occupancy_grid.OccupancyGrid(width=5, height=5) + + # Initially all cells should be at 0.5 probability (unknown) + assert abs(grid.get_probability(2, 2) - 0.5) < 0.01 + + # Manually set log-odds and check probability + grid.grid[2][2] = 2.0 # High log-odds = high probability + assert grid.get_probability(2, 2) > 0.8 + + grid.grid[3][3] = -2.0 # Low log-odds = low probability + assert grid.get_probability(3, 3) < 0.2 + + +def test_occupancy_grid_visualization(): + """Test occupancy grid visualization.""" + grid = occupancy_grid.OccupancyGrid(width=5, height=5) + + # Set some cells + grid.grid[2][2] = 2.0 # Occupied + grid.grid[1][1] = -2.0 # Free + + viz = grid.to_grid_visualization() + assert len(viz) == 5 + assert "#" in viz[2] # Occupied cell should show as # + assert "." in viz[1] # Free cell should show as . + + +def test_all_demos_run(): + """Test that all robotics demos can run without errors.""" + # Just check that demos don't crash + wall_following.demo() + bug_algorithms.demo() + pledge_algorithm.demo() + potential_fields.demo() + particle_filter.demo() + occupancy_grid.demo() + + # RRT demo uses randomness + random.seed(42) + rrt.demo()