-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
mqtt_messages_handler.rb
137 lines (120 loc) · 4.06 KB
/
mqtt_messages_handler.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# Routes incoming MQTT messages to the matching handler (inventory reports,
# sensor readings, raw readings, hardware info) and records per-device
# ingest errors when a handler fails.
class MqttMessagesHandler
  # Values of SEND_DEVICE_ERROR_WARNINGS (after strip + downcase) that switch
  # warning e-mails off. Any other non-empty value switches them on.
  DISABLED_FLAG_VALUES = ["", "0", "f", "false", "n", "no", "off"].freeze
  private_constant :DISABLED_FLAG_VALUES

  # Entry point: dispatches one MQTT message according to its topic.
  #
  # Routing is substring based on the whole topic string ("inventory", "raw",
  # "readings", "info"), checked in that order.
  # NOTE(review): because the whole topic is searched, a device token that
  # itself contains one of those words would affect routing - confirm tokens
  # cannot contain them.
  #
  # @param topic [String, nil] MQTT topic, e.g. "device/sck/<token>/readings"
  # @param message [String] raw payload; characters that cannot be represented
  #   in US-ASCII are dropped before processing
  # @param retry_on_nil_device [Boolean] whether an "info" message for a device
  #   that is not registered yet (but has an OrphanDevice) may be re-enqueued
  # @return [true, nil] true for inventory messages and for successfully
  #   handled/ignored device messages; nil when the topic is nil, no device
  #   matches the topic, or readings could not be stored
  # @raise [StandardError] re-raises handler errors after recording them
  def handle_topic(topic, message, retry_on_nil_device = true)
    return if topic.nil?

    message = message.encode("US-ASCII", invalid: :replace, undef: :replace, replace: "")
    log_message_to_sentry(topic, message)
    handshake_device(topic)

    topic_name = topic.to_s
    return handle_inventory(message) if topic_name.include?("inventory")

    device = find_device_for_topic(topic, message, retry_on_nil_device)
    return nil if device.nil?

    with_device_error_handling(device, topic, message) do
      if topic_name.include?("raw")
        handle_readings(device, parse_raw_readings(message))
      elsif topic_name.include?("readings")
        handle_readings(device, message)
      elsif topic_name.include?("info")
        handle_info(device, message)
      else
        true
      end
    end
  end

  private

  # Persists an inventory report verbatim.
  #
  # @param message [String] the report payload
  # @return [true]
  def handle_inventory(message)
    DeviceInventory.create(report: message)
    true
  end

  # Stores every entry of the payload's "data" list for the device.
  #
  # Failures are best-effort: they are reported to Sentry and swallowed
  # (re-raised only in the test environment). Only StandardError is rescued so
  # that signals and process exits are never swallowed here.
  #
  # @param device [Device] device the readings belong to
  # @param message [String, nil] JSON document with a "data" key
  # @return [true, nil] true when readings were stored; nil when there was
  #   nothing to store or storing failed
  def handle_readings(device, message)
    parsed = JSON.parse(message) if message
    data = parsed["data"] if parsed
    return nil if data.nil? || data.empty?

    data.each { |reading| storer.store(device, reading) }
    true
  rescue StandardError => e
    Sentry.capture_exception(e)
    raise e if Rails.env.test?
  end

  # Replaces the device's hardware info with the parsed JSON payload.
  #
  # @param device [Device]
  # @param message [String] JSON document
  # @return [true]
  # @raise [JSON::ParserError] when the payload is not valid JSON
  def handle_info(device, message)
    device.update_column(:hardware_info, JSON.parse(message))
    true
  end

  # Converts a raw readings payload into the JSON form handle_readings expects.
  #
  # @param message [String] raw payload
  # @return [Object] result of passing the parser output through JSON[]
  def parse_raw_readings(message)
    JSON[raw_readings_parser.parse(message)]
  end

  # Marks the orphan device addressed by the topic as having completed its
  # handshake and publishes its onboarding session on "token-received".
  #
  # @param topic [String]
  # @return [void]
  def handshake_device(topic)
    token = device_token(topic)
    # A topic without a device segment yields an empty token; never use it as
    # a lookup key (it could only match a record with a blank token).
    return if token.empty?

    orphan_device = OrphanDevice.find_by(device_token: token)
    return if orphan_device.nil?

    orphan_device.update!(device_handshake: true)
    payload = { onboarding_session: orphan_device.onboarding_session }.to_json
    Redis.current.publish("token-received", payload)
  end

  # Tags the Sentry scope with the topic and leaves a breadcrumb containing
  # the topic and message.
  #
  # @param topic [String]
  # @param message [String]
  # @return [void]
  def log_message_to_sentry(topic, message)
    Sentry.set_tags("mqtt-topic": topic)
    crumb = Sentry::Breadcrumb.new(
      category: "MqttMessagesHandler.handle_topic",
      message: "Handling topic #{topic}",
      data: { topic: topic, message: message }
    )
    Sentry.add_breadcrumb(crumb)
  end

  # Looks up the device addressed by the topic; when none exists, gives
  # handle_nil_device the chance to schedule a retry.
  #
  # @param topic [String]
  # @param message [String]
  # @param retry_on_nil_device [Boolean]
  # @return [Device, nil]
  def find_device_for_topic(topic, message, retry_on_nil_device)
    token = device_token(topic)
    # See handshake_device: an empty token must not be used as a lookup key.
    return nil if token.empty?

    device = Device.find_by(device_token: token)
    handle_nil_device(topic, message, retry_on_nil_device) if device.nil?
    device
  end

  # Re-enqueues a non-bridge "info" message whose device is not registered
  # yet but for which an orphan device exists.
  #
  # The cheap checks run first so the OrphanDevice query only happens when a
  # retry is actually possible.
  #
  # @param topic [String]
  # @param message [String]
  # @param retry_on_nil_device [Boolean]
  # @return [void]
  def handle_nil_device(topic, message, retry_on_nil_device)
    return unless retry_on_nil_device

    topic_name = topic.to_s
    return unless topic_name.include?("info") && !topic_name.include?("bridge")
    return unless OrphanDevice.find_by(device_token: device_token(topic))

    RetryMQTTMessageJob.perform_later(topic, message)
  end

  # Runs the given block; on failure reports the error to Sentry with device
  # tags, stores an ingest error on the device and, if enabled, e-mails a
  # warning.
  #
  # Only StandardError is handled: signals, exits and other non-standard
  # exceptions propagate untouched and are not recorded as device errors.
  #
  # @param device [Device]
  # @param topic [String]
  # @param message [String]
  # @param reraise [Boolean] whether to re-raise the error after recording it
  # @yield the work to protect
  # @return [Object, nil] the block's value, or nil when an error was recorded
  #   and reraise is false
  def with_device_error_handling(device, topic, message, reraise = true)
    yield
  rescue StandardError => e
    hardware_info = device.hardware_info
    Sentry.set_tags({
      "device-id": device.id,
      "device-hardware-version": hardware_info&.[]("hw_ver"),
      "device-esp-version": hardware_info&.[]("esp_ver"),
      "device-sam-version": hardware_info&.[]("sam_ver")
    })
    Sentry.capture_exception(e)

    # Must be read before the new error is created, otherwise the error being
    # recorded now would always count as the most recent one.
    last_error = device.ingest_errors.order(created_at: :desc).first
    device.ingest_errors.create({
      topic: topic,
      message: message,
      error_class: e.class.name,
      error_message: e.message,
      error_trace: e.full_message
    })

    UserMailer.device_ingest_errors(device.id).deliver_later if warn_about_ingest_error?(last_error)
    raise e if reraise
  end

  # Whether a warning e-mail is due: warnings are enabled and the previous
  # ingest error (if any) is older than the warning threshold.
  #
  # @param last_error [#created_at, nil] most recent earlier ingest error
  # @return [Boolean]
  def warn_about_ingest_error?(last_error)
    return false unless send_device_error_warnings

    last_error.nil? || last_error.created_at < device_error_warning_threshold
  end

  # Whether warning e-mails are enabled via SEND_DEVICE_ERROR_WARNINGS.
  #
  # Environment values are strings, so "false"/"0"/"" must be recognised
  # explicitly instead of relying on Ruby truthiness.
  #
  # @return [Boolean]
  def send_device_error_warnings
    flag = ENV.fetch("SEND_DEVICE_ERROR_WARNINGS", "").to_s.strip.downcase
    !DISABLED_FLAG_VALUES.include?(flag)
  end

  # Point in time before which a previous ingest error no longer suppresses a
  # new warning. Configured in hours via DEVICE_ERROR_WARNING_THRESHOLD_HOURS
  # (default "6").
  #
  # @return [Object] result of Integer#hours.ago
  def device_error_warning_threshold
    ENV.fetch("DEVICE_ERROR_WARNING_THRESHOLD_HOURS", "6").to_i.hours.ago
  end

  # Extracts the device token from a "device/sck/<token>/..." topic.
  #
  # @param topic [String]
  # @return [String] the token, or "" when the topic has no such segment
  def device_token(topic)
    topic[%r{device/sck/(.*?)/}m, 1].to_s
  end

  # @return [Storer] memoized reading storer
  def storer
    @storer ||= Storer.new
  end

  # @return [RawMqttMessageParser] memoized raw payload parser
  def raw_readings_parser
    @raw_readings_parser ||= RawMqttMessageParser.new
  end
end