Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit decoding of documents from binary to string #260

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/warc2zim/content_rewriting/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from jinja2.environment import Template
from warcio.recordloader import ArcWarcRecord

from warc2zim.constants import logger
from warc2zim.content_rewriting.css import CssRewriter
from warc2zim.content_rewriting.ds import get_ds_rules
from warc2zim.content_rewriting.html import HtmlRewriter
Expand Down Expand Up @@ -79,9 +80,20 @@ def __init__(
self.js_modules = js_modules

@property
def content_str(self):
def content_str(self) -> str:
try:
return to_string(self.content, self.encoding)
result = to_string(self.content, self.encoding)
if result.encoding and result.encoding != self.encoding:
logger.warning(
f"Encoding issue, '{result.encoding}' has been used instead of "
f"'{self.encoding}' to decode content of '{self.orig_url_str}'"
)
if result.chars_ignored:
logger.warning(
"Encoding issue, some chars had to be ignored to properly decode "
f"content of '{self.orig_url_str}' with '{result.encoding}'"
)
return result.value
except ValueError as e:
raise RuntimeError(f"Impossible to decode item {self.path}") from e

Expand Down
67 changes: 51 additions & 16 deletions src/warc2zim/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import re
from http import HTTPStatus
from typing import NamedTuple

import chardet
from bs4 import BeautifulSoup
Expand All @@ -18,6 +19,12 @@
)


class StringConversionResult(NamedTuple):
value: str
encoding: str | None
chars_ignored: bool


def get_version():
return __version__

Expand Down Expand Up @@ -125,55 +132,83 @@ def get_record_encoding(record: ArcWarcRecord) -> str | None:
return m.group("encoding")


def to_string(input_: str | bytes, encoding: str | None) -> str:
def to_string(input_: str | bytes, encoding: str | None) -> StringConversionResult:
"""
Decode content to string, trying to be the more tolerant possible to invalid
declared encoding.

This try decode the content using 3 methods:
This try to decode the content using 3 methods:
- From http headers in the warc record (given as `encoding` argument)
- From encoding declaration inside the content (hopping that content can be
losely decode using ascii to something usable)
- From statistical analysis of the content (made by chardet)

If all these methods fails, try again with the encoding passed via http headers but
ignore all unrecognized characters.

Returns the decoded content, the encoding used (or None if the input was already
decoded) and a boolean indicating wether unrecognized characters had to been ignored
or not.

"""
tried_encodings = set()
http_encoding = encoding

tried_encodings: set[str] = set()
if isinstance(input_, str):
return input_
return StringConversionResult(input_, None, False)

if not input_:
# Empty bytes are easy to decode
return ""
return StringConversionResult("", None, False)

if encoding:
try:
return input_.decode(encoding)
return StringConversionResult(input_.decode(encoding), encoding, False)
except (ValueError, LookupError):
tried_encodings.add(encoding)
pass

# Detect encoding from content.
# Search for encoding from content first bytes based on regexp
content_start = input_[:1024].decode("ascii", errors="replace")
if m := ENCODING_RE.search(content_start):
encoding = m.group("encoding")
if encoding and encoding not in tried_encodings:
try:
return input_.decode(encoding)
return StringConversionResult(input_.decode(encoding), encoding, False)
except (ValueError, LookupError):
tried_encodings.add(encoding)
pass

encodings = (
encoding
for e in chardet.detect_all(input_)
if (encoding := e["encoding"]) and encoding not in tried_encodings
)
# Try to detect the most probable encoding with chardet (and only most probable
# one, since otherwise we will likely find an encoding which pass but produces only
# garbage with most characters badly decoded just due to a wrongly encoded character
# see https://github.com/openzim/warc2zim/issues/221)
# Nota: we use the detect_all method of chardet even if we are interesting only in
# the most probable encoding, because (as-of chardet 5.2.0 at least) the detect
# chardet method seems to be more naive, and detect_all gives better results in our
# tests
chardet_encodings = chardet.detect_all(input_)
if len(chardet_encodings):
chardet_encoding = chardet_encodings[0]["encoding"]
benoit74 marked this conversation as resolved.
Show resolved Hide resolved
if chardet_encoding and chardet_encoding not in tried_encodings:
try:
return StringConversionResult(
input_.decode(chardet_encoding), chardet_encoding, False
)
except (ValueError, LookupError):
tried_encodings.add(chardet_encoding)
pass

for encoding in encodings:
# Try again encoding detected by chardet (most probable one), but this time ignore
# all bad chars
if http_encoding:
try:
return input_.decode(encoding)
except ValueError:
return StringConversionResult(
input_.decode(http_encoding, errors="ignore"), http_encoding, True
)
except (ValueError, LookupError):
pass

raise ValueError(f"Impossible to decode content {input_[:200]}")


Expand Down
42 changes: 30 additions & 12 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ def test_decode(simple_encoded_content):
if not simple_encoded_content.valid:
# Nothing to test
return
assert (
to_string(simple_encoded_content.encoded, None)
== simple_encoded_content.content
)
result = to_string(simple_encoded_content.encoded, None)
assert result.value == simple_encoded_content.content
assert result.encoding
assert not result.chars_ignored


@pytest.fixture(
Expand Down Expand Up @@ -152,9 +152,11 @@ def test_declared_decode(declared_encoded_content):
if not test_case.valid:
return

decoded = to_string(test_case.encoded, test_case.declared_encoding)
result = to_string(test_case.encoded, test_case.declared_encoding)
if test_case.correct:
assert decoded == test_case.content
assert result.value == test_case.content
assert result.encoding
assert not result.chars_ignored


# This is a set of content/encoding/decoding that we know to fail.
Expand Down Expand Up @@ -225,13 +227,18 @@ def test_declared_decode_html(declared_html_encoded_content):
if not test_case.valid:
return

html_decoded = to_string(test_case.encoded, None)
result = to_string(test_case.encoded, None)
if test_case.correct:
assert html_decoded == test_case.content
assert result.value == test_case.content
assert result.encoding
assert not result.chars_ignored


def test_decode_str(content, declared_encoding):
assert to_string(content, declared_encoding) == content
result = to_string(content, declared_encoding)
assert result.value == content
assert result.encoding is None
assert not result.chars_ignored


def test_binary_content():
Expand All @@ -243,6 +250,17 @@ def test_binary_content():
with pytest.raises(ValueError):
assert to_string(content, None)

with pytest.raises(ValueError):
# Make coverage pass on code avoiding us to try the same encoding twice
assert to_string(content, "UTF-8-SIG")
# Make coverage pass on code avoiding us to try the same encoding twice
result = to_string(content, "UTF-8-SIG")
assert result.encoding == "UTF-8-SIG"
assert result.chars_ignored


def test_single_bad_character():
content = bytes([0xEF, 0xBB, 0xBF]) + b"prem" + bytes([0xC3]) + "ière".encode()
# [0xEF, 0xBB, 0xBF] is a BOM marker for utf-8-sig
# 0xC3 is a bad character (nothing in utf-8-sig at this position)
result = to_string(content, "utf-8-sig")
assert result.value == "première"
assert result.encoding == "utf-8-sig"
assert result.chars_ignored
Loading