-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathspg.py
645 lines (516 loc) · 31 KB
/
spg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
import os
from pathlib import Path
from tqdm import tqdm
from itertools import product
import numpy as np
import utm
from pyproj import Transformer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
from load_map import load_map, extract_waypoints
from openai_models import get_embed
from utils import load_from_file, save_to_file
KNOWN_RELATIONS = [
"left", "right",
"in front of", "opposite to", "behind",
"near", "next to", "adjacent to", "close to", "by",
"between",
"north of", "south of", "east of", "west of", "northeast of", "northwest of", "southeast of", "southwest of"
]
MAX_RANGE = 50 # target within this radius of anchor. providence: 60; auckland: 40; boston: 60; san_francisco: 80; indoor: 2; outdoor: 50
DIST_TO_ANCHOR = 2.0 # distance to robot when compute a target location for SRE with only an anchor
def plot_landmarks(landmarks=None, osm_fpth=None):
    """
    Plot landmarks in the shared world space local to the Spot's map.

    :param landmarks: dict mapping landmark ID -> {"x": ..., "y": ...};
                      must contain a "robot" entry when provided.
    :param osm_fpth: path to the OSM landmark JSON file; when given, the figure
                     is titled after the file and saved next to it as a PNG.
                     When None, the plot is only shown (blocking), not saved.
    """
    plt.figure()
    plt.rcParams.update({'font.size': 5})
    if landmarks:
        plt.scatter(x=[landmarks[L]["x"] for L in landmarks], y=[landmarks[L]["y"] for L in landmarks], c="green", label="landmarks")
        for L in landmarks:
            # Label only landmarks without an OSM name, excluding the robot (labeled below)
            if "osm_name" not in landmarks[L] and L != "robot":
                plt.text(landmarks[L]["x"], landmarks[L]["y"], L)
        plt.scatter(x=landmarks["robot"]["x"],
                    y=landmarks["robot"]["y"], c="orange", label="robot")
        plt.text(landmarks["robot"]["x"],
                 landmarks["robot"]["y"], "robot")
        plt.legend()
    if osm_fpth:
        location_name = os.path.splitext(os.path.basename(osm_fpth))[0]
        plt.title(f"Landmark Map: {location_name}")
        plt.show(block=False)
        # BUG FIX: savefig was previously unconditional, crashing with
        # TypeError/NameError when osm_fpth is None (no dirname, no location_name).
        # Saving is only meaningful when an OSM path is provided.
        plt.savefig(os.path.join(os.path.dirname(osm_fpth), f"{location_name}_landmarks.png"), dpi=300)
    else:
        plt.title(f"Landmark Map")
        plt.show(block=True)
    plt.rcdefaults()  # reset font size to default
def rotate(vec, angle):
    """Rotate a 2D vector counter-clockwise by `angle` radians.

    Reference: https://motion.cs.illinois.edu/RoboticSystems/CoordinateTransformations.html
    """
    cos_a, sin_a = np.cos(angle), np.sin(angle)
    rotation_matrix = np.array([[cos_a, -sin_a],
                                [sin_a, cos_a]])
    return rotation_matrix @ np.asarray(vec)
def align_coordinates(graph_dpath, waypoints, osm_landmarks, coord_alignment, crs):
    """
    Express Spot waypoints and OSM landmarks in one shared Cartesian frame.

    :param graph_dpath: Spot GraphNav graph directory (contains an "images" subdir).
    :param waypoints: dict of waypoint ID -> {"name", "position": {"x", "y"}}.
    :param osm_landmarks: dict of OSM landmark name -> description with "lat"/"long"
                          (and optionally "wid" when visited by Spot).
    :param coord_alignment: two alignment records, each with "lat", "long" and
                            "waypoint" keys; falsy when no Spot graph is used.
    :param crs: pyproj Transformer from geographic (long/lat) to Cartesian coords.
    :return: dict of landmark ID -> {"x", "y"}; the robot start is keyed "robot".
    """
    # rotation and translation to align Spot to world Cartesian frame (default: 0, not needed if no Spot graph)
    rotation, translation = 0, 0
    if coord_alignment:
        # If use Spot graph, alignment landmark value is not None, then compute rotation and translation for alignment
        print(" >> Computing alignment from robot to world frame...")
        # World-frame (Cartesian) positions of the two known landmarks
        known_landmark_1 = np.array(crs.transform(coord_alignment[0]["long"], coord_alignment[0]["lat"], 0, radians=False)[:-1])
        known_landmark_2 = np.array(crs.transform(coord_alignment[1]["long"], coord_alignment[1]["lat"], 0, radians=False)[:-1])
        # Spot-frame positions of the same two landmarks (via their waypoints)
        known_waypoint_1 = np.array([waypoints[coord_alignment[0]["waypoint"]]["position"]["x"],
                                     waypoints[coord_alignment[0]["waypoint"]]["position"]["y"]])
        known_waypoint_2 = np.array([waypoints[coord_alignment[1]["waypoint"]]["position"]["x"],
                                     waypoints[coord_alignment[1]["waypoint"]]["position"]["y"]])
        # Use the vector from known landmarks to compute rotation
        vec_lmk_1_to_2 = known_landmark_2 - known_landmark_1
        vec_wp_1_to_2 = known_waypoint_2 - known_waypoint_1
        # Compute the rotation between the known landmark and waypoint
        dir_world = np.arctan2(vec_lmk_1_to_2[1], vec_lmk_1_to_2[0])  # i.e., world coordinate
        dir_robot = np.arctan2(vec_wp_1_to_2[1], vec_wp_1_to_2[0])  # i.e., Spot coordinate
        rotation = dir_world - dir_robot
    landmarks = {}
    if graph_dpath and waypoints:
        # Each image is named after the Spot waypoint ID (auto-generated by GraphNav)
        waypoint_ids = [Path(image_fpath).stem for image_fpath in os.listdir(os.path.join(graph_dpath, "images"))]
        for wid, wp_desc in waypoints.items():
            # NOTE: all landmarks are either one of the following:
            # 1. waypoint_0: robot start location when using GraphNav
            # 2. landmarks whose waypoints created by GraphNav; they have images
            is_landmark = True if wid in waypoint_ids or wp_desc["name"] == "waypoint_0" else False
            if is_landmark:
                cartesian_coords = np.array([wp_desc["position"]["x"], wp_desc["position"]["y"]])
                # Align the Spot's cartesian coordinates to the world frame:
                cartesian_coords = rotate(cartesian_coords, rotation)
                if coord_alignment:
                    # Use the newly rotated point to figure out the translation
                    if wid == coord_alignment[0]["waypoint"]:
                        known_waypoint_1 = cartesian_coords
                    elif wid == coord_alignment[1]["waypoint"]:
                        known_waypoint_2 = cartesian_coords
                lmk_id = "robot" if wp_desc["name"] == "waypoint_0" else wid
                landmarks[lmk_id] = {"x": cartesian_coords[0], "y": cartesian_coords[1]}
        if coord_alignment:
            # Compute translation to align the known landmark from world to Spot space AFTER rotation
            # (average over both alignment pairs to reduce measurement error)
            translation = ((known_waypoint_1 - known_landmark_1) + (known_waypoint_2 - known_landmark_2)) / 2.0
    else:
        # This means we are only working with OSM landmarks
        print(" >> WARNING: not using Spot graph")
    # Process then add OSM landmarks if provided
    if osm_landmarks:
        for lmk, lmk_desc in osm_landmarks.items():
            # lmk_id = lmk.lower().replace(' ', '_')
            if "wid" in lmk_desc:
                # OSM landmarks visited by Spot GraphNav have waypoint IDs, just use Spot graph coordinates
                wid = lmk_desc["wid"]
                lmk_cartesian = np.array([waypoints[wid]["position"]["x"], waypoints[wid]["position"]["y"]])
                # lmk_cartesian = np.array([landmarks[wid]["x"], landmarks[wid]["y"]])
                # landmarks[wid]["osm_name"] = lmk
            else:
                # Convert landmark location to Cartesian coordinate then add computed translation
                # (translation is only applied to world-frame coords; Spot-frame coords above are already aligned)
                lmk_cartesian = np.array(crs.transform(lmk_desc["long"], lmk_desc["lat"], 0, radians=False)[:-1])
                lmk_cartesian += translation
            landmarks[lmk] = {"x": lmk_cartesian[0], "y": lmk_cartesian[1]}
    return landmarks
def create_waypoints(obj_fpath, crs):
    """
    Create waypoints of objects similar to that created by Spot GraphNav.

    :param obj_fpath: path to a JSON file of object locations; must contain a
                      "waypoint_0" entry for the robot's start location.
    :param crs: optional pyproj Transformer (long/lat -> Cartesian); built from
                the robot's UTM zone when None and locations are geographic.
    :return: (waypoints, crs) where waypoints maps object name ->
             {"name", "position": {"x", "y"}} mirroring GraphNav's structure.
    :raises KeyError: if "waypoint_0" is missing from the object file.
    """
    objects = load_from_file(obj_fpath)
    # Set the origin of Cartesian coordinates to the location of the robot
    robot = None
    if "waypoint_0" in objects:
        robot = objects["waypoint_0"]
    else:
        raise KeyError(" >> ERROR: missing robot coordinates. Check obj_fpath file")
    # Assumes all entries share the same coordinate format as the first one
    if "lat" in list(objects.values())[0]:
        # Convert locations of objects from geographic to Cartesian coordinates if not provided as Cartesian
        if not crs:
            _, _, zone, _ = utm.from_latlon(robot["lat"], robot["long"])
            # NOTE(review): the "+south" flag is always set regardless of hemisphere —
            # confirm this is intended for northern-hemisphere maps
            crs = Transformer.from_crs(crs_from="+proj=latlong +ellps=WGS84 +datum=WGS84",
                                       crs_to=f"+proj=utm +ellps=WGS84 +datum=WGS84 +south +units=m +zone={zone}")
        # NOTE: a 2D map actually is projected to the X-Z Cartesian plane, NOT X-Y
        # thus we only take the x and z coordinates, where the z will be used as Spot's y-axis
        for loc in objects.values():
            loc["x"], loc["y"] = crs.transform(loc["long"], loc["lat"], 0, radians=False)[:-1]
    # Convert to same data structure output by Spot GraphNav
    waypoints = {obj: {"name": obj, "position": {"x": loc["x"], "y": loc["y"]}} for obj, loc in objects.items()}
    return waypoints, crs
def load_lmks(graph_dpath=None, osm_fpath=None, ignore_graph=False):
    """
    Load landmarks from OSM or Spot graph or both then convert their locations to Cartesian coordinates.

    :param graph_dpath: Spot GraphNav graph directory; when no graph file is
                        found, falls back to "obj_locs.json" in the same directory.
    :param osm_fpath: path to the OSM landmark JSON file (may not exist).
    :param ignore_graph: NOTE(review): currently unused — confirm whether it
                         should skip the graph load entirely.
    :return: dict of landmark ID -> {"x", "y"} in a shared Cartesian frame.
    """
    # Load waypoints from Spot graph if exists
    waypoints, transformer = None, None
    try:
        graph, _, _, _, _, _ = load_map(graph_dpath)
    except Exception:
        print(" >> WARNING: no Spot graph file found in provided directory path\nCreate waypoints from object locations")
        waypoints, transformer = create_waypoints(os.path.join(graph_dpath, "obj_locs.json"), crs=None)
    else:
        # Get important details from Spot graph and create a dict instead of using their data structure
        waypoints = extract_waypoints(graph)
    # Load text description of OSM landmarks if exists
    osm_landmarks = []
    if os.path.isfile(osm_fpath):
        osm_landmarks = load_from_file(osm_fpath)
        # Use geographic coordinates of first landmark to get a zone number for UTM conversion
        lmk_desc = list(osm_landmarks.values())[0]
        _, _, zone, _ = utm.from_latlon(lmk_desc["lat"], lmk_desc["long"])
        # NOTE(review): this overwrites any transformer built by create_waypoints above — confirm intended
        transformer = Transformer.from_crs(crs_from="+proj=latlong +ellps=WGS84 +datum=WGS84",
                                           crs_to=f"+proj=utm +ellps=WGS84 +datum=WGS84 +south +units=m +zone={zone}")
    else:
        print(" >> WARNING: no OSM landmarks loaded")
    # Load Spot waypoints in both geographic and Cartesian coordinates for transformation
    alignment_lmks = []
    alignment_fpath = os.path.join(graph_dpath, "alignment.json")
    if os.path.isfile(alignment_fpath):
        alignment_lmks = load_from_file(alignment_fpath)
    # Put Spot waypoints and OSM landmarks in Cartesian coordinates
    landmarks = align_coordinates(graph_dpath, waypoints, osm_landmarks, alignment_lmks, transformer)
    # Visualize landmarks
    plot_landmarks(landmarks, osm_fpath)
    return landmarks
def sort_combs(lmk_grounds):
    """
    Sort all combinations of target and anchor landmarks by their joint cosine similarity scores.

    :param lmk_grounds: list of candidate lists; each candidate is a
                        (score, landmark_name) pair. The first list holds
                        target candidates, the remaining lists (0-2) anchors.
    :return: list of {"score", "target", "anchor"} dicts, best score first.
    """
    ranked = []
    # Cartesian product pairs every target candidate with every anchor combination
    for comb in product(*lmk_grounds):
        joint_score = 1
        for score, _ in comb:
            joint_score *= score
        # The first entry of a combination is the target; the rest are anchors
        ranked.append({
            "score": joint_score,
            "target": [name for _, name in comb[:1]],
            "anchor": [name for _, name in comb[1:]],
        })
    return sorted(ranked, key=lambda entry: entry["score"], reverse=True)
def find_match_rel(rel_unseen, known_rel_embeds_fpath):
    """
    Use cosine similarity between text embeddings to find the best matching
    known spatial relation for an unseen input relation.

    :param rel_unseen: spatial relation string not in KNOWN_RELATIONS.
    :param known_rel_embeds_fpath: path to cached embeddings of the known
                                   relations; created on first use. Unseen
                                   relations are cached at the same path with
                                   "known" replaced by "unknown".
    :return: the known relation name with the highest cosine similarity.
    """
    # Load (or build once and cache) embeddings for all known relations
    if os.path.isfile(known_rel_embeds_fpath):
        known_rel_embeds = load_from_file(known_rel_embeds_fpath)
    else:
        known_rel_embeds = {known_rel: get_embed(known_rel) for known_rel in KNOWN_RELATIONS}
        save_to_file(known_rel_embeds, known_rel_embeds_fpath)
    # Cache embeddings of unseen relations to avoid repeated embedding API calls
    unknown_rel_embeds_fpath = known_rel_embeds_fpath.replace("known", "unknown")
    unknown_rel_embeds = load_from_file(unknown_rel_embeds_fpath) if os.path.isfile(unknown_rel_embeds_fpath) else {}
    if rel_unseen in unknown_rel_embeds:
        unseen_rel_embed = unknown_rel_embeds[rel_unseen]
    else:
        unseen_rel_embed = get_embed(rel_unseen)
        unknown_rel_embeds[rel_unseen] = unseen_rel_embed
        save_to_file(unknown_rel_embeds, unknown_rel_embeds_fpath)
        print(f"SAVED UNSEEN SPATIAL RELATION: {rel_unseen}")
    # BUG FIX: pair scores with the keys of the loaded embedding dict rather
    # than KNOWN_RELATIONS — scores are computed from known_rel_embeds.values(),
    # so a cache file with a different key order would misalign score <-> name.
    known_rels = list(known_rel_embeds.keys())
    scores = cosine_similarity(np.array(unseen_rel_embed).reshape(1, -1), np.array(list(known_rel_embeds.values())))[0]
    rel_match = known_rels[int(np.argmax(scores))]
    return rel_match
def get_target_loc(landmarks, spatial_rel, anchor_candidate, sre=None, plot=False):
    """
    Ground spatial referring expression with only an anchor landmark: left, right, cardinal directions
    by finding a location relative to the given anchor landmark.
    e.g., go to the left side of the bakery, go to the north of the bakery

    :param landmarks: dict of landmark ID -> {"x", "y"}; must contain "robot".
    :param spatial_rel: spatial relation (e.g. "left", "north of").
    :param anchor_candidate: landmark ID of the anchor.
    :param sre: original spatial referring expression, used only for the plot title.
    :param plot: if True, visualize the computed target location and ranges.
    :return: {"x", "y"} of the candidate location (DIST_TO_ANCHOR from the
             anchor) closest to the robot, or None if the anchor is unknown.
    """
    robot = landmarks["robot"]
    try:
        anchor = landmarks[anchor_candidate]
    except KeyError:
        return None  # candidate may be a waypoint ID not present in the landmark map
    # Compute the valid range vector(s) (potentially only one) for the anchoring landmark
    range_vecs = compute_area(spatial_rel, robot, anchor)
    # Compute robot location that is at given distance to the anchor
    # (initialized from range_vecs[0]; the loop below re-evaluates it, harmlessly)
    loc_min = {"x": (range_vecs[0]["mean"][0] * DIST_TO_ANCHOR) + anchor["x"],
               "y": (range_vecs[0]["mean"][1] * DIST_TO_ANCHOR) + anchor["y"]}
    dist_min = np.linalg.norm(np.array([loc_min["x"], loc_min["y"]]) - np.array([robot["x"], robot["y"]]))
    range_vec_closest = range_vecs[0]
    # Keep the candidate location closest to the robot's current position
    for range_vec in range_vecs:
        loc_new = {"x": (range_vec["mean"][0] * DIST_TO_ANCHOR) + anchor["x"],
                   "y": (range_vec["mean"][1] * DIST_TO_ANCHOR) + anchor["y"]}
        dist_new = np.linalg.norm(np.array([loc_new["x"], loc_new["y"]]) - np.array([robot["x"], robot["y"]]))
        if dist_new < dist_min:
            loc_min = loc_new
            dist_min = dist_new
            range_vec_closest = range_vec
    if plot:
        plt.figure()
        plt.scatter(x=[robot["x"]], y=[robot["y"]], marker="o", label="robot")
        plt.scatter(x=[loc_min["x"]], y=[loc_min["y"]], marker="x", c="g", s=15, label="new robot loc")
        # Plot all target and anchor landmarks
        for lmk in landmarks:
            plt.scatter(x=landmarks[lmk]["x"], y=landmarks[lmk]["y"], marker="o", c="darkorange", label=f"anchor: {lmk}")
            plt.text(landmarks[lmk]["x"], landmarks[lmk]["y"], lmk)
        # Plot the range bounds of the selected (closest) range vector
        plt.plot([anchor["x"], (range_vec_closest["min"][0] * DIST_TO_ANCHOR) + anchor["x"]],
                 [anchor["y"], (range_vec_closest["min"][1] * DIST_TO_ANCHOR) + anchor["y"]],
                 linestyle="dotted", c="r")
        plt.plot([anchor["x"], (range_vec_closest["max"][0] * DIST_TO_ANCHOR) + anchor["x"]],
                 [anchor["y"], (range_vec_closest["max"][1] * DIST_TO_ANCHOR) + anchor["y"]],
                 linestyle="dotted", c="b")
        plt.title(f"Computed Target Position: {sre}" if sre else f"Computed Target Position: {spatial_rel}")
        plt.axis("square")
        plt.legend()
        plt.show(block=False)
    return loc_min
def compute_area(spatial_rel, robot, anchor, do_360_search=False, anchor_name=None, plot=False):
    """
    Compute a vector from anchor to robot as a normal vector pointing outside of anchor
    and a range within which the vector from anchor to target can lie.

    :param spatial_rel: spatial relation (e.g. "left", "behind", "north of", "near").
    :param robot: dict with "x", "y" of the robot position.
    :param anchor: dict with "x", "y" of the anchor landmark.
    :param do_360_search: sweep the normal around the anchor in fov-sized steps
                          (forced for proximity relations; disabled for cardinal
                          directions since those are absolute).
    :param anchor_name: label used only for the plot annotation.
    :param plot: if True, visualize the computed ranges and save a PNG.
    :return: list of dicts {"mean", "min", "max"}; each value is a unit vector
             bounding/centering the valid anchor-to-target direction range.
    """
    range_vecs = []
    # Compute unit vector from anchor to robot
    vec_a2r = [robot["x"] - anchor["x"], robot["y"] - anchor["y"]]
    unit_vec_a2r = np.array(vec_a2r) / np.linalg.norm(vec_a2r)
    fov = 180  # robot's field-of-view (degrees); narrowed to 90 for diagonal cardinal directions
    # mean_angle is the rotation (degrees) from the anchor-to-robot vector to
    # the center of the valid direction range
    if spatial_rel in ["in front of", "opposite to"]:
        mean_angle = 0
    elif spatial_rel in ["behind"]:
        mean_angle = 180
    elif spatial_rel in ["left"]:
        mean_angle = -90
    elif spatial_rel in ["right"]:
        mean_angle = 90
    elif spatial_rel in ["north of", "north", "south of", "south", "east of", "east", "west of", "west",
                         "northeast of", "northeast", "northwest of", "northwest",
                         "southeast of", "southeast", "southwest of", "southwest"]:
        # Find the difference between each cardinal direction and the current anchor-to-robot vector to figure out how much to rotate it
        if spatial_rel in ["north", "north of"]:
            mean_angle = np.rad2deg(np.arctan2(1, 0) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
        elif spatial_rel in ["south", "south of"]:
            mean_angle = np.rad2deg(np.arctan2(-1, 0) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
        elif spatial_rel in ["east", "east of"]:
            mean_angle = np.rad2deg(np.arctan2(0, 1) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
        elif spatial_rel in ["west", "west of"]:
            mean_angle = np.rad2deg(np.arctan2(0, -1) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
        elif spatial_rel in ["northeast", "northeast of"]:
            mean_angle = np.rad2deg(np.arctan2(1, 1) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
            fov = 90
        elif spatial_rel in ["northwest", "northwest of"]:
            mean_angle = np.rad2deg(np.arctan2(1, -1) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
            fov = 90
        elif spatial_rel in ["southeast", "southeast of"]:
            mean_angle = np.rad2deg(np.arctan2(-1, 1) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
            fov = 90
        elif spatial_rel in ["southwest", "southwest of"]:
            mean_angle = np.rad2deg(np.arctan2(-1, -1) - np.arctan2(unit_vec_a2r[1], unit_vec_a2r[0]))
            fov = 90
        do_360_search = False  # NOTE: since cardinal directions are absolute, we should not do any 360-sweep
    # Check for sweep condition: this means we will consider different normal vectors representing the "front" of the object
    rots_a2r = [0]
    if spatial_rel in ["near", "next to", "adjacent to", "close to", "by"] or do_360_search:
        mean_angle = 0
        rots_a2r = [rot for rot in range(0, 360, fov)]
    for rot in rots_a2r:
        # Compute the mean vector and vectors representing min and max range
        vec_a2t_mean = rotate(unit_vec_a2r, np.deg2rad(mean_angle + rot))
        vec_a2t_min = rotate(vec_a2t_mean, np.deg2rad(- fov / 2))
        vec_a2t_max = rotate(vec_a2t_mean, np.deg2rad(fov / 2))
        range_vecs.append({"mean": vec_a2t_mean, "min": vec_a2t_min, "max": vec_a2t_max})
    if plot:
        plt.figure()
        # Plot robot and anchor location
        plt.scatter(x=[robot["x"]], y=[robot["y"]], marker="o", color="yellow", label="robot")
        plt.scatter(x=[anchor["x"]], y=[anchor["y"]], marker="o", color="orange", label="anchor")
        plt.text(anchor["x"], anchor["y"], s=anchor_name)
        # Plot the normal vector from the robot to the anchor:
        plt.plot([robot["x"], anchor["x"]], [robot["y"], anchor["y"]], color="black")
        plt.arrow(x=robot["x"], y=robot["y"], dx=-vec_a2r[0]/2.0, dy=-vec_a2r[1]/2.0, shape="full",
                  width=0.01, head_width=0.1, color="black", label="normal")
        for idx, range_vec in enumerate(range_vecs):
            # Scale each unit vector by MAX_RANGE for a visible extent
            mean_pose = [(range_vec["mean"][0] * MAX_RANGE) + anchor["x"],
                         (range_vec["mean"][1] * MAX_RANGE) + anchor["y"]]
            plt.scatter(x=[mean_pose[0]], y=[mean_pose[1]], c="g", marker="o", label=f"mean_{idx}")
            min_pose = [(range_vec["min"][0] * MAX_RANGE) + anchor["x"],
                        (range_vec["min"][1] * MAX_RANGE) + anchor["y"]]
            plt.scatter(x=[min_pose[0]], y=[min_pose[1]], c="r", marker="x", label=f"min_{idx}")
            max_pose = [(range_vec["max"][0] * MAX_RANGE) + anchor["x"],
                        (range_vec["max"][1] * MAX_RANGE) + anchor["y"]]
            plt.scatter(x=[max_pose[0]], y=[max_pose[1]], c="b", marker="x", label=f"max_{idx}")
            plt.plot([anchor["x"], mean_pose[0]], [anchor["y"], mean_pose[1]], linestyle="dashed", c="g")
            plt.plot([anchor["x"], min_pose[0]], [anchor["y"], min_pose[1]], linestyle="dotted", c="r")
            plt.plot([anchor["x"], max_pose[0]], [anchor["y"], max_pose[1]], linestyle="dotted", c="b")
        plt.title(f"Evaluated range for spatial relation: {spatial_rel}")
        plt.legend()
        plt.axis("square")
        plt.show(block=False)
        plt.savefig(f"compute-area_{'_'.join(spatial_rel.split(' '))}.png")
    return range_vecs
def eval_spatial_pred(landmarks, spatial_rel, target_candidate, anchor_candidates, sre=None, plot=False):
    """
    Evaluate if a spatial relation is valid given candidate target landmark and anchor landmark(s).

    :param landmarks: dict of landmark ID -> {"x", "y"}; must contain "robot"
                      and target_candidate.
    :param spatial_rel: spatial relation; "between" takes two anchors, all
                        others take one.
    :param target_candidate: landmark ID of the candidate target.
    :param anchor_candidates: list of one or two anchor landmark IDs.
    :param sre: original spatial referring expression, used only for plot titles.
    :param plot: if True, visualize the evaluation and save a PNG.
    :return: True if the spatial predicate holds, else False.
    """
    robot, target = landmarks["robot"], landmarks[target_candidate]
    # A target that IS one of the anchors cannot satisfy the predicate
    if target_candidate in anchor_candidates:
        return False
    # Check if target has same location as any anchor
    # (an OSM landmark name and a Spot waypoint ID may refer to the same location)
    for lmk_id in anchor_candidates:
        if target["x"] == landmarks[lmk_id]["x"] and target["y"] == landmarks[lmk_id]["y"]:
            return False
    if spatial_rel in ["between"]:
        # assert len(anchor_candidates) == 2, f"between but candidate anchors less than two: {anchor_candidates}"
        try:
            anchor_1 = landmarks[anchor_candidates[0]]
            anchor_2 = landmarks[anchor_candidates[1]]
        except (KeyError, IndexError):
            return False  # anchor may instead be a waypoint in the Spot space
        # Avoid evaluating a target ''between'' the same anchor
        if anchor_candidates[0] == anchor_candidates[1]:
            return False
        if anchor_1["x"] == anchor_2["x"] and anchor_1["y"] == anchor_2["y"]:
            return False
        target = np.array([target["x"], target["y"]])
        anchor_1 = np.array([anchor_1["x"], anchor_1["y"]])
        anchor_2 = np.array([anchor_2["x"], anchor_2["y"]])
        # Find two lines perpendicular to the vector defined by two anchors and passing through them
        vec_a1_to_a2 = anchor_2 - anchor_1
        # NOTE(review): divides by vec_a1_to_a2[1]; if the anchors share the same
        # y-coordinate this produces inf/nan (numpy warns rather than raises) — confirm intended
        slope = - vec_a1_to_a2[0] / vec_a1_to_a2[1]  # line slope is negative reciprocal of vector slope
        # slope = np.tan([- vec_a1_to_a2[1], vec_a1_to_a2[0]]) # line slope is negative recipical of vector slope
        offset_1 = - slope * anchor_1[0] + anchor_1[1]  # -m.x + y = c
        offset_2 = - slope * anchor_2[0] + anchor_2[1]  # -m.x + y = d
        # Check target between two lines: a.x_tar + b.y_tar between c and d
        offset_tar = -slope * target[0] + target[1]
        is_tar_between = (offset_tar >= offset_1 and offset_tar <= offset_2) or (offset_tar >= offset_2 and offset_tar <= offset_1)
        # Check target within max distance to two anchors
        dist_anchor1_to_tar = np.linalg.norm(target - anchor_1)
        dist_anchor2_to_tar = np.linalg.norm(target - anchor_2)
        is_pred_true = is_tar_between and dist_anchor1_to_tar <= MAX_RANGE and dist_anchor2_to_tar <= MAX_RANGE
        # if is_pred_true:
        #     print(f' - VALID LANDMARKS:\ttarget:{target_candidate}\tanchor:{anchor_candidates}')
        if plot:
            # Draw the corridor between the two anchors (perpendicular bounds at each anchor)
            vec_a1_to_a2 = anchor_2 - anchor_1; vec_a1_to_a2 /= np.linalg.norm(vec_a1_to_a2)
            A, B = rotate(vec_a1_to_a2 * MAX_RANGE, np.deg2rad(-90)) + anchor_1, rotate(vec_a1_to_a2 * MAX_RANGE, np.deg2rad(90)) + anchor_1
            C, D = rotate(vec_a1_to_a2 * MAX_RANGE, np.deg2rad(-90)) + anchor_2, rotate(vec_a1_to_a2 * MAX_RANGE, np.deg2rad(90)) + anchor_2
            plt.figure(figsize=(10,6))
            plt.title(f"Grounding SRE: {sre}\n(Target:{target_candidate}, Anchors:{anchor_candidates})")
            plt.scatter(x=[target[0]], y=[target[1]], marker='o', color='green', label='target')
            plt.scatter(x=[anchor_1[0]], y=[anchor_1[1]], marker='o', color='orange', label='anchor_1')
            plt.scatter(x=[anchor_2[0]], y=[anchor_2[1]], marker='o', color='orange', label='anchor_2')
            plt.plot([A[0], anchor_1[0]], [A[1], anchor_1[1]], linestyle='dotted', c='r')
            plt.plot([C[0], anchor_2[0]], [C[1], anchor_2[1]], linestyle='dotted', c='b')
            plt.plot([B[0], anchor_1[0]], [B[1], anchor_1[1]], linestyle='dotted', c='r')
            plt.plot([D[0], anchor_2[0]], [D[1], anchor_2[1]], linestyle='dotted', c='b')
            plt.plot([anchor_1[0], anchor_2[0]], [anchor_1[1], anchor_2[1]], linestyle='dotted', c='black')
            plt.text(x=target[0], y=target[1], s=target_candidate)
            plt.text(x=anchor_1[0], y=anchor_1[1], s=anchor_candidates[0])
            plt.text(x=anchor_2[0], y=anchor_2[1], s=anchor_candidates[1])
            plt.axis('square')
            plt.show(block=False)
            plt.savefig(f"eval-spatial-pred-{spatial_rel}-{target_candidate}-{'-'.join(anchor_candidates)}.png")
        return is_pred_true
    else:
        try:
            anchor = landmarks[anchor_candidates[0]]
        except KeyError:
            return False  # anchor may instead be a waypoint in the Spot space
        is_pred_true = False
        range_vecs = compute_area(spatial_rel, robot, anchor, anchor_name=anchor_candidates[0], plot=False)
        target = np.array([target["x"], target["y"]])
        anchor = np.array([anchor["x"], anchor["y"]])
        # Predicate holds if the anchor-to-target vector falls inside ANY computed range
        for range_vec in range_vecs:
            vec_anc2tar = target - anchor
            vec_mean = np.array([range_vec["mean"][0], range_vec["mean"][1]])
            vec_min = np.array([range_vec["min"][0], range_vec["min"][1]])
            vec_max = np.array([range_vec["max"][0], range_vec["max"][1]])
            # NOTE: np.cross on 2-D vectors yields the scalar z-component (sign = turn direction)
            is_same_dir_mean = np.dot(vec_anc2tar, vec_mean) >= 0  # angle between target and mean vectors [-90, 90]
            is_between_min_max = np.cross(vec_min, vec_anc2tar) >= 0 and np.cross(vec_max, vec_anc2tar) <= 0  # angle between target and min vectors [0, 180], between target and max vectors [-180, 0)
            is_within_dist = np.linalg.norm(vec_anc2tar) <= MAX_RANGE
            if is_same_dir_mean and is_between_min_max and is_within_dist:
                is_pred_true = True
                break
        if plot:
            # Plot the computed vector range
            plt.figure(figsize=(10,6))
            plt.title(f"Grounding SRE: {sre}\n(Target:{target_candidate}, Anchor:{anchor_candidates})")
            plt.scatter(x=[robot["x"]], y=[robot["y"]], marker="o", color="yellow", label="robot")
            plt.scatter(x=[anchor[0]], y=[anchor[1]], marker="o", color="orange", label="anchor")
            plt.scatter(x=[target[0]], y=[target[1]], marker="o", color="green", label="target")
            plt.plot([robot["x"], anchor[0]], [robot["y"], anchor[1]], linestyle="dotted", c="k", label="normal")
            plt.text(anchor[0], anchor[1], s=anchor_candidates[0])
            plt.text(target[0], target[1], s=target_candidate)
            for vec_idx, range_vec in enumerate(range_vecs):
                mean_pose = np.array([(range_vec["mean"][0] * MAX_RANGE) + anchor[0],
                                      (range_vec["mean"][1] * MAX_RANGE) + anchor[1]])
                plt.scatter(x=[mean_pose[0]], y=[mean_pose[1]], c="grey", marker="x", label="mean")
                min_pose = np.array([(range_vec["min"][0] * MAX_RANGE) + anchor[0],
                                     (range_vec["min"][1] * MAX_RANGE) + anchor[1]])
                plt.scatter(x=[min_pose[0]], y=[min_pose[1]], c="r", marker="x", label="min")
                max_pose = np.array([(range_vec["max"][0] * MAX_RANGE) + anchor[0],
                                     (range_vec["max"][1] * MAX_RANGE) + anchor[1]])
                plt.scatter(x=[max_pose[0]], y=[max_pose[1]], c="b", marker="x", label="max")
                # Only label the last set of range lines to avoid duplicate legend entries
                if vec_idx == (len(range_vecs) - 1):
                    plt.plot([anchor[0], mean_pose[0]], [anchor[1], mean_pose[1]], linestyle="dotted", c="grey", label="mean_range" )
                    plt.plot([anchor[0], min_pose[0]], [anchor[1], min_pose[1]], linestyle="dotted", c="r", label="min_range")
                    plt.plot([anchor[0], max_pose[0]], [anchor[1], max_pose[1]], linestyle="dotted", c="b", label="max_range")
                else:
                    plt.plot([anchor[0], mean_pose[0]], [anchor[1], mean_pose[1]], linestyle="dotted", c="grey")
                    plt.plot([anchor[0], min_pose[0]], [anchor[1], min_pose[1]], linestyle="dotted", c="r")
                    plt.plot([anchor[0], max_pose[0]], [anchor[1], max_pose[1]], linestyle="dotted", c="b")
            plt.legend()
            plt.axis("square")
            plt.show(block=False)
            plt.savefig(f"eval-spatial-pred-{spatial_rel}-{target_candidate}-{'-'.join(anchor_candidates)}.png")
        return is_pred_true
def spg(landmarks, reg_out, topk, rel_embeds_fpath, max_range=None):
    """
    Spatial predicate grounding: resolve each spatial referring expression (SRE)
    in a REG output to its top-k landmark groundings.

    :param landmarks: dict of landmark ID -> {"x", "y"} including "robot".
    :param reg_out: REG module output; reads reg_out["grounded_sre_to_preds"],
                    a dict mapping SRE -> {relation: landmark grounding lists}.
    :param topk: max number of groundings to keep per SRE.
    :param rel_embeds_fpath: cache path for known-relation embeddings
                             (used by find_match_rel for unseen relations).
    :param max_range: if truthy, overwrites the module-level MAX_RANGE global
                      (note: this affects all later calls in the process).
    :return: dict mapping each SRE to a list of groundings: {"target", "anchor"}
             dicts, target-only dicts (no relation), or target locations from
             get_target_loc (which may be None for unknown anchors).
    """
    # print(f"***** SPG Command: {reg_out['utt']}")
    if max_range:
        global MAX_RANGE
        MAX_RANGE = max_range
        # print(f" -> MAX_RANGE = {MAX_RANGE}\n")
    spg_out = {}
    for sre, grounded_spatial_preds in reg_out["grounded_sre_to_preds"].items():
        # print(f"Grounding SRE: {sre}")
        rel_query, lmk_grounds = list(grounded_spatial_preds.items())[0]
        # Rank all combinations of target and anchor landmarks
        lmk_grounds_sorted = sort_combs(lmk_grounds)
        if rel_query == "None":
            # Referring expression without spatial relation
            groundings = [{"target": lmk_ground["target"][0]} for lmk_ground in lmk_grounds_sorted[:topk]]
        else:
            groundings = []
            rel_match = rel_query
            if rel_query not in KNOWN_RELATIONS:
                # Find best match for unseen spatial relation in set of known spatial relations
                rel_match = find_match_rel(rel_query, rel_embeds_fpath)
                # print(f"UNSEEN SPATIAL RELATION:\t'{rel_query}' matched to '{rel_match}'")
            if len(lmk_grounds) == 1:
                # Spatial referring expression contains only an anchor landmark
                for lmk_ground in lmk_grounds_sorted[:topk]:
                    groundings.append(get_target_loc(landmarks, rel_match, lmk_ground["target"][0], sre))
            else:
                # Spatial referring expression contains a target landmark and one or two anchor landmarks
                # one anchor, e.g., <tar> left of <anc>
                # two anchors, e.g., <tar> between <anc1> and <anc2>
                for lmk_ground in lmk_grounds_sorted:
                    target_name = lmk_ground["target"][0]
                    anchor_names = lmk_ground["anchor"]
                    is_valid = eval_spatial_pred(landmarks, rel_match, target_name, anchor_names, sre)
                    if is_valid:
                        groundings.append({"target": target_name, "anchor": anchor_names})
                        if len(groundings) == topk:
                            break
        spg_out[sre] = groundings
    # Close any figures opened during grounding to avoid accumulating windows
    plt.close("all")
    # print("\n")
    return spg_out
def run_exp_spg(reg_out_fpath, graph_dpath, osm_fpath, topk, rel_embeds_fpath, spg_out_fpath):
    """Run the SPG module over saved REG outputs and cache results at spg_out_fpath (skips if it exists)."""
    if os.path.isfile(spg_out_fpath):
        return  # results already computed for this experiment
    reg_outs = load_from_file(reg_out_fpath)
    landmarks = load_lmks(graph_dpath, osm_fpath)
    for reg_out in tqdm(reg_outs, desc="Running spatial predicate grounding (SPG) module"):
        reg_out["grounded_sps"] = spg(landmarks, reg_out, topk, rel_embeds_fpath)
    save_to_file(reg_outs, spg_out_fpath)
if __name__ == "__main__":
    # Demo driver: ground REG outputs for a single location.
    location = "blackstone"
    data_dpath = os.path.join(os.path.expanduser("~"), "ground", "data")
    graph_dpath = os.path.join(data_dpath, "maps", "downloaded_graph_2024-01-27_07-48-53")
    osm_fpath = os.path.join(data_dpath, "osm", f"{location}.json")
    reg_outs_fpath = os.path.join(os.path.expanduser("~"), "ground", "results", f"reg_outs_{location}.json")
    # Cache file for known spatial-relation embeddings used by find_match_rel()
    # (the "unknown" counterpart path is derived from it by name substitution)
    rel_embeds_fpath = os.path.join(data_dpath, "known_rel_embeds.json")
    reg_outputs = load_from_file(reg_outs_fpath)
    landmarks = load_lmks(graph_dpath, osm_fpath)
    for reg_output in reg_outputs:
        # BUG FIX: spg() requires rel_embeds_fpath; the original call omitted it,
        # raising TypeError at runtime.
        spg(landmarks, reg_output, topk=5, rel_embeds_fpath=rel_embeds_fpath)