-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
101 lines (88 loc) · 3.73 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import sys
import time
import cv2
import ntcore
from calibration.CalibrationCommandSource import (
CalibrationCommandSource,
NTCalibrationCommandSource,
)
from calibration.CalibrationSession import CalibrationSession
from config.config import ConfigStore, LocalConfig, RemoteConfig
from config.ConfigSource import ConfigSource, FileConfigSource, NTConfigSource
from output.OutputPublisher import NTOutputPublisher, OutputPublisher
from output.overlay_util import overlay_image_observation
from output.StreamServer import MjpegServer
from pipeline.CameraPoseEstimator import MultiTargetCameraPoseEstimator
from pipeline.FiducialPoseEstimator import SquareTargetPoseEstimator
from pipeline.Capture import GStreamerCapture
from pipeline.FiducialDetector import ArucoFiducialDetector
def main() -> None:
    """Run the capture loop: calibrate, or detect fiducials and publish poses.

    Setup loads the local configuration, connects the default NetworkTables
    instance as a client using ``config.local_config.server_ip`` and
    ``config.local_config.device_id``, and starts the MJPEG stream server.

    Each loop iteration refreshes the remote configuration, grabs a frame and
    then does exactly one of:

    * calibration mode -- feed the frame to the calibration session;
    * calibration just ended -- finish the session and exit the process;
    * normal mode (calibration available) -- detect fiducials, draw them on
      the frame, solve the camera pose and publish every resulting pose;
    * no calibration -- report it and back off for half a second.

    The (possibly annotated) frame is handed to the stream server afterwards.
    The loop ends on ``KeyboardInterrupt``; the capture is stopped on every
    exit path, including ``sys.exit`` and unexpected exceptions.
    """
    config = ConfigStore(LocalConfig(), RemoteConfig())
    local_config_source: ConfigSource = FileConfigSource()
    remote_config_source: ConfigSource = NTConfigSource()
    calibration_command_source: CalibrationCommandSource = (
        NTCalibrationCommandSource()
    )

    capture = GStreamerCapture()
    fiducial_detector = ArucoFiducialDetector(cv2.aruco.DICT_APRILTAG_36H11)
    camera_pose_estimator = MultiTargetCameraPoseEstimator()
    # NOTE(review): not referenced by the loop below; the construction is kept
    # in case it has side effects -- confirm whether it can be dropped.
    fiducial_pose_estimator = SquareTargetPoseEstimator()  # noqa: F841
    output_publisher: OutputPublisher = NTOutputPublisher()
    stream_server = MjpegServer()
    calibration_session = CalibrationSession()

    # The local config must be loaded first: it supplies the server address
    # and client identity used to connect to NetworkTables.
    local_config_source.update(config)
    nt_instance = ntcore.NetworkTableInstance.getDefault()
    nt_instance.setServer(config.local_config.server_ip)
    nt_instance.startClient4(config.local_config.device_id)
    stream_server.start(config)

    frame_count = 0
    last_print = 0
    was_calibrating = False

    try:
        while True:
            remote_config_source.update(config)
            timestamp = time.time()
            success, image = capture.get_frame(config)
            if not success:
                # No frame available; back off before retrying.
                time.sleep(0.5)
                continue

            # `fps` is only set on the frame that closes a one-second window;
            # on every other frame None is passed to the publisher.
            fps = None
            frame_count += 1
            now = time.time()
            if now - last_print > 1:
                last_print = now
                fps = frame_count
                print("Running at", frame_count, "fps")
                frame_count = 0

            if calibration_command_source.get_calibrating(config):
                # Calibration mode
                was_calibrating = True
                calibration_session.process_frame(
                    image, calibration_command_source.get_capture_flag(config)
                )
            elif was_calibrating:
                # Finish calibration
                calibration_session.finish()
                # NOTE(review): exits with status 1 even though calibration
                # completed; presumably relied on by a process supervisor to
                # restart the program -- confirm before changing.
                sys.exit(1)
            elif config.local_config.has_calibration:
                # Normal mode
                try:
                    image_observations = fiducial_detector.detect_fiducials(
                        image, config
                    )
                    # Plain loop: the overlay call is used for its side
                    # effect on `image`, so no list should be built.
                    for observation in image_observations:
                        overlay_image_observation(image, observation)
                    pose_observation = camera_pose_estimator.solve_camera_pose(
                        image_observations, config
                    )
                    if pose_observation is not None:
                        for pose in pose_observation:
                            output_publisher.send(config, timestamp, pose, fps)
                except Exception as e:
                    # Best effort: report the failure and keep the loop alive
                    # rather than crashing on a single bad frame.
                    print(e)
                    output_publisher.error(str(e))
                    time.sleep(0.5)
            else:
                # No calibration
                print("No calibration found")
                time.sleep(0.5)

            stream_server.set_frame(image)
    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        # Release the capture on every exit path, not only on Ctrl-C.
        capture.stop()


if __name__ == "__main__":
    main()