Skip to content

Commit

Permalink
Merge pull request #401 from Shopify/streaming_bug
Browse files Browse the repository at this point in the history
Logs from different streams can arrive out of order
  • Loading branch information
KnVerey authored Dec 14, 2018
2 parents 01dc26f + f9a0193 commit 0311955
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 12 deletions.
24 changes: 15 additions & 9 deletions lib/kubernetes-deploy/container_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(parent_id:, container_name:, namespace:, context:, logger:)
def sync
new_logs = fetch_latest
return unless new_logs.present?
@lines += deduplicate(new_logs)
@lines += sort_and_deduplicate(new_logs)
end

def empty?
Expand Down Expand Up @@ -60,18 +60,24 @@ def rfc3339_timestamp(time)
time.strftime("%FT%T.%N%:z")
end

def deduplicate(logs)
deduped = []
check_for_duplicate = true
def sort_and_deduplicate(logs)
parsed_lines = logs.map { |line| split_timestamped_line(line) }
sorted_lines = parsed_lines.sort do |(timestamp1, _msg1), (timestamp2, _msg2)|
if timestamp1.nil?
-1
elsif timestamp2.nil?
1
else
timestamp1 <=> timestamp2
end
end

logs.each do |line|
timestamp, msg = split_timestamped_line(line)
next if check_for_duplicate && likely_duplicate?(timestamp)
check_for_duplicate = false # logs are ordered, so once we've seen a new one, assume all subsequent logs are new
deduped = []
sorted_lines.each do |timestamp, msg|
next if likely_duplicate?(timestamp)
@last_timestamp = timestamp if timestamp
deduped << msg
end

deduped
end

Expand Down
50 changes: 47 additions & 3 deletions test/unit/container_logs_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,23 @@ def test_print_latest_supports_prefixing
end

def test_logs_without_timestamps_are_not_deduped
logs_response_1_with_anomaly = logs_response_1 + "Line 3.5"
logs_response_2_with_anomaly = "Line 3.5\n" + logs_response_2
logs_response_1_with_anomaly = logs_response_1 + "No timestamp"
logs_response_2_with_anomaly = "No timestamp 2\n" + logs_response_2
KubernetesDeploy::Kubectl.any_instance.stubs(:run)
.returns([logs_response_1_with_anomaly, "", ""])
.then.returns([logs_response_2_with_anomaly, "", ""])

@logs.sync
@logs.sync
@logs.print_all
assert_logs_match("Line 3.5", 2)
assert_logs_match_all([
"No timestamp", # moved to start of batch 1
"Line 1",
"Line 2",
"Line 3",
"No timestamp 2", # moved to start of batch 2
"Line 4"
], in_order: true)
end

def test_deduplication_works_when_exact_same_batch_is_returned_more_than_once
Expand All @@ -108,6 +115,43 @@ def test_deduplication_works_when_exact_same_batch_is_returned_more_than_once
assert_logs_match("Line 2", 1)
end

def test_deduplication_works_when_last_line_is_out_of_order
regression_data = <<~STRING
2018-12-13T12:17:23.727605598Z Line 1
2018-12-13T12:17:23.727696012Z Line 2
2018-12-13T12:17:23.728538913Z Line 3
2018-12-13T12:17:23.7287293Z Line 4
2018-12-13T12:17:23.729694842Z Line 5
2018-12-13T12:17:23.731259592Z Line 7
2018-12-13T12:17:23.73127007Z Line 8
2018-12-13T12:17:23.731273672Z Line 9
2018-12-13T12:17:23.731276862Z Line 10
2018-12-13T12:17:23.731284069Z Line 11
2018-12-13T12:17:23.731287054Z Line 12
2018-12-13T12:17:23.731289959Z Line 13
2018-12-13T12:17:23.731292814Z Line 14
2018-12-13T12:17:23.731295298Z Line 15
2018-12-13T12:17:23.731297747Z Line 16
2018-12-13T12:17:23.731297748Z Line 17
2018-12-13T12:17:23.729851532Z Line 6
STRING

KubernetesDeploy::Kubectl.any_instance.stubs(:run)
.returns([regression_data, "", ""]).times(12)

12.times do
@logs.sync
@logs.print_latest
end

expected_lines = generate_log_messages(1..17)

expected_lines.each do |line|
assert_logs_match(/#{line}$/, 1) # no duplicates
end
assert_logs_match_all(expected_lines, in_order: true) # sorted correctly
end

private

def generate_log_messages(range)
Expand Down

0 comments on commit 0311955

Please sign in to comment.