Skip to content

Commit

Permalink
Allow customizing log level in beam.LogElements
Browse files Browse the repository at this point in the history
  • Loading branch information
tvalentyn committed Sep 5, 2023
1 parent 3ff66d3 commit 6c41a40
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
38 changes: 34 additions & 4 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,13 +1155,24 @@ def Iterables(delimiter=None):
class LogElements(PTransform):
"""
PTransform for printing the elements of a PCollection.
Args:
label (str, optional): A custom label for the transform.
prefix (str, optional): A prefix string to prepend to each logged element.
with_timestamp (bool, optional): Whether to include element's timestamp.
with_window (bool, optional): Whether to include element's window.
level (optional): The logging level for the output (e.g. `logging.DEBUG`,
`logging.INFO`, `logging.WARNING`, `logging.ERROR`). If not specified,
the log is printed to stdout.
"""
class _LoggingFn(DoFn):
def __init__(self, prefix='', with_timestamp=False, with_window=False):
def __init__(
self, prefix='', with_timestamp=False, with_window=False, level=None):
super().__init__()
self.prefix = prefix
self.with_timestamp = with_timestamp
self.with_window = with_window
self.level = level

def process(
self,
Expand All @@ -1178,19 +1189,38 @@ def process(
log_line += ', window(start=' + window.start.to_rfc3339()
log_line += ', end=' + window.end.to_rfc3339() + ')'

print(log_line)
if self.level == logging.DEBUG:
logging.debug(log_line)
elif self.level == logging.INFO:
logging.info(log_line)
elif self.level == logging.WARNING:
logging.warning(log_line)
elif self.level == logging.ERROR:
logging.error(log_line)
elif self.level == logging.CRITICAL:
logging.critical(log_line)
else:
print(log_line)

yield element

def __init__(
self, label=None, prefix='', with_timestamp=False, with_window=False):
self,
label=None,
prefix='',
with_timestamp=False,
with_window=False,
level=None):
super().__init__(label)
self.prefix = prefix
self.with_timestamp = with_timestamp
self.with_window = with_window
self.level = level

def expand(self, input):
return input | ParDo(
self._LoggingFn(self.prefix, self.with_timestamp, self.with_window))
self._LoggingFn(
self.prefix, self.with_timestamp, self.with_window, self.level))


class Reify(object):
Expand Down
25 changes: 25 additions & 0 deletions sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,31 @@ def test_ptransform_output(self):
| util.LogElements(prefix='prefix_'))
assert_that(result, equal_to(['a', 'b', 'c']))

@pytest.fixture(scope="function")
def _capture_logs(request, caplog):
  """Build three LogElements branches and stash the captured log text.

  Each branch creates a single "element" and passes it through
  ``beam.LogElements`` with a distinct prefix and logging level (INFO,
  WARNING, ERROR). Capturing is enabled at INFO level via ``caplog``.

  The captured text is stored as ``request.captured_log``.
  NOTE(review): the companion test reads ``self.captured_log``, so
  ``request`` is presumably bound to the test instance here -- confirm.
  """
  # Capture at INFO and above for the whole lifetime of the pipeline block.
  with caplog.at_level(logging.INFO), TestPipeline() as pipeline:
    _ = (
        pipeline
        | "info" >> beam.Create(["element"])
        | "I" >> beam.LogElements(prefix='info_', level=logging.INFO))
    _ = (
        pipeline
        | "warning" >> beam.Create(["element"])
        | "W" >> beam.LogElements(prefix='warning_', level=logging.WARNING))
    _ = (
        pipeline
        | "error" >> beam.Create(["element"])
        | "E" >> beam.LogElements(prefix='error_', level=logging.ERROR))

  # Read the text only after the pipeline block has been left.
  request.captured_log = caplog.text

@pytest.mark.usefixtures("_capture_logs")
def test_setting_level_uses_appropriate_log_channel(self):
  """Check that each element was logged under the level it was given.

  Reads ``self.captured_log``, which the ``_capture_logs`` fixture is
  requested to populate, and looks for each level name followed by the
  matching prefixed element.
  """
  # assertRegex performs the same regex search as the previous
  # assertTrue(re.compile(...).search(...)) form, but on failure it reports
  # the pattern and the searched text instead of "None is not true".
  self.assertRegex(self.captured_log, r'INFO(.*)info_element')
  self.assertRegex(self.captured_log, r'WARNING(.*)warning_element')
  self.assertRegex(self.captured_log, r'ERROR(.*)error_element')


class ReifyTest(unittest.TestCase):
def test_timestamp(self):
Expand Down

0 comments on commit 6c41a40

Please sign in to comment.