-
Notifications
You must be signed in to change notification settings - Fork 373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[APPSEC-10967] Compress and encode schema information #3177
Changes from 5 commits
8c52213
f9af157
b6b77a4
a686d5b
733d010
5b10018
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
require 'json' | ||
require 'zlib' | ||
require 'base64' | ||
|
||
require_relative 'rate_limiter' | ||
|
||
|
@@ -34,86 +36,120 @@ module Event | |
Content-Language | ||
].map!(&:downcase).freeze | ||
|
||
MAX_ENCODED_SCHEMA_SIZE = 25000 | ||
|
||
# Record events for a trace | ||
# | ||
# This is expected to be called only once per trace for the rate limiter | ||
# to properly apply | ||
def self.record(span, *events) | ||
# ensure rate limiter is called only when there are events to record | ||
return if events.empty? || span.nil? | ||
|
||
Datadog::AppSec::RateLimiter.limit(:traces) do | ||
record_via_span(span, *events) | ||
end | ||
end | ||
class << self | ||
def record(span, *events) | ||
# ensure rate limiter is called only when there are events to record | ||
return if events.empty? || span.nil? | ||
|
||
def self.record_via_span(span, *events) | ||
events.group_by { |e| e[:trace] }.each do |trace, event_group| | ||
unless trace | ||
Datadog.logger.debug { "{ error: 'no trace: cannot record', event_group: #{event_group.inspect}}" } | ||
next | ||
Datadog::AppSec::RateLimiter.limit(:traces) do | ||
record_via_span(span, *events) | ||
end | ||
end | ||
|
||
trace.keep! | ||
trace.set_tag( | ||
Datadog::Tracing::Metadata::Ext::Distributed::TAG_DECISION_MAKER, | ||
Datadog::Tracing::Sampling::Ext::Decision::ASM | ||
) | ||
def record_via_span(span, *events) | ||
events.group_by { |e| e[:trace] }.each do |trace, event_group| | ||
unless trace | ||
Datadog.logger.debug { "{ error: 'no trace: cannot record', event_group: #{event_group.inspect}}" } | ||
next | ||
end | ||
|
||
# prepare and gather tags to apply | ||
service_entry_tags = build_service_entry_tags(event_group) | ||
trace.keep! | ||
trace.set_tag( | ||
Datadog::Tracing::Metadata::Ext::Distributed::TAG_DECISION_MAKER, | ||
Datadog::Tracing::Sampling::Ext::Decision::ASM | ||
) | ||
|
||
# complex types are unsupported, we need to serialize to a string | ||
triggers = service_entry_tags.delete('_dd.appsec.triggers') | ||
span.set_tag('_dd.appsec.json', JSON.dump({ triggers: triggers })) | ||
# prepare and gather tags to apply | ||
service_entry_tags = build_service_entry_tags(event_group) | ||
|
||
# apply tags to service entry span | ||
service_entry_tags.each do |key, value| | ||
span.set_tag(key, value) | ||
# apply tags to service entry span | ||
service_entry_tags.each do |key, value| | ||
span.set_tag(key, value) | ||
end | ||
end | ||
end | ||
end | ||
|
||
def self.build_service_entry_tags(event_group) | ||
event_group.each_with_object({}) do |event, tags| | ||
# TODO: assume HTTP request context for now | ||
|
||
if (request = event[:request]) | ||
request_headers = request.headers.select do |k, _| | ||
ALLOWED_REQUEST_HEADERS.include?(k.downcase) | ||
# rubocop: disable Metrics/MethodLength | ||
def build_service_entry_tags(event_group) | ||
waf_events = [] | ||
entry_tags = event_group.each_with_object({ '_dd.origin' => 'appsec' }) do |event, tags| | ||
# TODO: assume HTTP request context for now | ||
if (request = event[:request]) | ||
request.headers.each do |header, value| | ||
tags["http.request.headers.#{header}"] = value if ALLOWED_REQUEST_HEADERS.include?(header.downcase) | ||
end | ||
|
||
tags['http.host'] = request.host | ||
tags['http.useragent'] = request.user_agent | ||
tags['network.client.ip'] = request.remote_addr | ||
end | ||
|
||
request_headers.each do |header, value| | ||
tags["http.request.headers.#{header}"] = value | ||
if (response = event[:response]) | ||
response.headers.each do |header, value| | ||
tags["http.response.headers.#{header}"] = value if ALLOWED_RESPONSE_HEADERS.include?(header.downcase) | ||
end | ||
Comment on lines +97 to +99
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reduce the number of times we iterate over `request.headers`. |
||
end | ||
|
||
tags['http.host'] = request.host | ||
tags['http.useragent'] = request.user_agent | ||
tags['network.client.ip'] = request.remote_addr | ||
end | ||
waf_result = event[:waf_result] | ||
# accumulate triggers | ||
waf_events += waf_result.events | ||
|
||
if (response = event[:response]) | ||
response_headers = response.headers.select do |k, _| | ||
ALLOWED_RESPONSE_HEADERS.include?(k.downcase) | ||
end | ||
waf_result.derivatives.each do |key, value| | ||
parsed_value = json_parse(value) | ||
next unless parsed_value | ||
|
||
response_headers.each do |header, value| | ||
tags["http.response.headers.#{header}"] = value | ||
parsed_value_size = parsed_value.size | ||
|
||
compressed_data = compressed_and_base64_encoded(parsed_value) | ||
compressed_data_size = compressed_data.size | ||
|
||
if compressed_data_size >= MAX_ENCODED_SCHEMA_SIZE && parsed_value_size >= MAX_ENCODED_SCHEMA_SIZE | ||
Datadog.logger.debug do | ||
"Schema key: #{key} exceeds the max size value. It will not be included as part of the span tags" | ||
end | ||
next | ||
end | ||
|
||
derivative_value = parsed_value_size > compressed_data_size ? compressed_data : parsed_value | ||
|
||
tags[key] = derivative_value | ||
end | ||
|
||
tags | ||
end | ||
|
||
tags['_dd.origin'] = 'appsec' | ||
appsec_events = json_parse({ triggers: waf_events }) | ||
entry_tags['_dd.appsec.json'] = appsec_events if appsec_events | ||
Comment on lines +132 to +133
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Move from |
||
entry_tags | ||
end | ||
# rubocop: enable Metrics/MethodLength | ||
|
||
# accumulate triggers | ||
waf_result = event[:waf_result] | ||
tags['_dd.appsec.triggers'] ||= [] | ||
tags['_dd.appsec.triggers'] += waf_result.events | ||
private | ||
|
||
waf_result.derivatives.each do |key, value| | ||
tags[key] = JSON.dump(value) | ||
end | ||
# Gzips +value+ and returns the compressed bytes Base64-encoded on a single line.
#
# Returns nil when a TypeError is raised while compressing or encoding.
def compressed_and_base64_encoded(value)
  # strict_encode64 emits no line feeds. encode64 inserts one after every 60
  # encoded characters, which inflates the size that callers compare against
  # the schema size limit and embeds newlines in the resulting tag value.
  Base64.strict_encode64(gzip(value))
rescue TypeError
  nil
end
|
||
# Serializes +value+ to a JSON string.
#
# Despite its name, this method generates JSON (via JSON.dump); it does not parse.
# Returns nil instead of raising when the value cannot be serialized, so callers
# can skip the offending value.
def json_parse(value)
  JSON.dump(value)
rescue ArgumentError, JSON::GeneratorError
  # ArgumentError: raised by JSON.dump when the nesting limit is exceeded.
  # JSON::GeneratorError: raised for data that cannot be encoded as JSON,
  # e.g. strings containing invalid UTF-8 byte sequences.
  nil
end
|
||
tags | ||
def gzip(value) | ||
sio = StringIO.new | ||
gz = Zlib::GzipWriter.new(sio, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could I trouble you to run a very simple benchmark on your machine with a sample |
||
gz.write(value) | ||
gz.close | ||
sio.string | ||
end | ||
end | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduce the number of times we iterate over `request.headers`.