Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from apache_beam.transforms import environments
from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils import timestamp
from apache_beam.utils import windowed_value

Expand Down Expand Up @@ -768,6 +769,97 @@ def process_clear_timer(self):
expected = [('fired', ts) for ts in (20, 200)]
assert_that(actual, equal_to(expected))

def _run_pardo_et_timer_test(
self, n, timer_delay, reset_count=True, clear_timer=True, expected=None):
class EventTimeTimerDoFn(beam.DoFn):
COUNT = userstate.ReadModifyWriteStateSpec(
'count', coders.VarInt32Coder())
# event-time timer
TIMER = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

def __init__(self):
self._n = n
self._timer_delay = timer_delay
self._reset_count = reset_count
self._clear_timer = clear_timer

def process(
self,
element_pair,
t=beam.DoFn.TimestampParam,
count=beam.DoFn.StateParam(COUNT),
timer=beam.DoFn.TimerParam(TIMER)):
local_count = count.read() or 0
local_count += 1

_LOGGER.info(
"get element %s, count=%d", str(element_pair[1]), local_count)
if local_count == 1:
_LOGGER.info("set timer to %s", str(t + self._timer_delay))
timer.set(t + self._timer_delay)

if local_count == self._n:
if self._reset_count:
_LOGGER.info("reset count")
local_count = 0

# don't need the timer now
if self._clear_timer:
_LOGGER.info("clear timer")
timer.clear()

count.write(local_count)

@userstate.on_timer(TIMER)
def timer_callback(self, t=beam.DoFn.TimestampParam):
_LOGGER.error("Timer should not fire here")
_LOGGER.info("timer callback start (timestamp=%s)", str(t))
yield "fired"

with self.create_pipeline() as p:
actual = (
p | PeriodicImpulse(
start_timestamp=timestamp.Timestamp.now(),
stop_timestamp=timestamp.Timestamp.now() + 14,
fire_interval=1)
| beam.WithKeys(0)
| beam.ParDo(EventTimeTimerDoFn()))
assert_that(actual, equal_to(expected))

def test_pardo_et_timer_with_no_firing(self):
if type(self) in [FnApiRunnerTest,
FnApiRunnerTestWithGrpc,
FnApiRunnerTestWithGrpcAndMultiWorkers,
FnApiRunnerTestWithDisabledCaching,
FnApiRunnerTestWithMultiWorkers,
FnApiRunnerTestWithBundleRepeat,
FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")

# The timer will not fire. It is initially set to T + 10, but then it is
# cleared at T + 4 (count == 5), and reset to T + 5 + 10
# (count is reset every 5 seconds).
self._run_pardo_et_timer_test(5, 10, True, True, [])

def test_pardo_et_timer_with_no_reset(self):
if type(self) in [FnApiRunnerTest,
FnApiRunnerTestWithGrpc,
FnApiRunnerTestWithGrpcAndMultiWorkers,
FnApiRunnerTestWithDisabledCaching,
FnApiRunnerTestWithMultiWorkers,
FnApiRunnerTestWithBundleRepeat,
FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")

# The timer will not fire. It is initially set to T + 10, and then it is
# cleared at T + 4 and never set again (count is not reset).
self._run_pardo_et_timer_test(5, 10, False, True, [])

def test_pardo_et_timer_with_no_reset_and_no_clear(self):
# The timer will fire at T + 10. After the timer is set, it is never
# cleared or set again.
self._run_pardo_et_timer_test(5, 10, False, False, ["fired"])

def test_pardo_state_timers(self):
self._run_pardo_state_timers(windowed=False)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,26 @@ def test_sdf_with_truncate(self):
def test_draining_sdf_with_sdf_initiated_checkpointing(self):
raise unittest.SkipTest("Portable runners don't support drain yet.")

def test_pardo_et_timer_with_no_firing(self):
if type(self) in [PortableRunnerTest,
PortableRunnerTestWithSubprocesses,
PortableRunnerTestWithSubprocessesAndMultiWorkers,
PortableRunnerTestWithExternalEnv,
PortableRunnerTestWithLocalDocker,
PortableRunnerOptimizedWithoutFusion]:
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
super().test_pardo_et_timer_with_no_firing()

def test_pardo_et_timer_with_no_reset(self):
if type(self) in [PortableRunnerTest,
PortableRunnerTestWithSubprocesses,
PortableRunnerTestWithSubprocessesAndMultiWorkers,
PortableRunnerTestWithExternalEnv,
PortableRunnerTestWithLocalDocker,
PortableRunnerOptimizedWithoutFusion]:
raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
super().test_pardo_et_timer_with_no_reset()


@unittest.skip("https://github.com/apache/beam/issues/19422")
class PortableRunnerOptimized(PortableRunnerTest):
Expand Down
Loading