diff --git a/src/Stream/Stream.php b/src/Stream/Stream.php index df9c191..63fe538 100644 --- a/src/Stream/Stream.php +++ b/src/Stream/Stream.php @@ -42,6 +42,15 @@ public function delete(): self return $this; } + public function purge(): self + { + if ($this->exists()) { + $this->client->api("STREAM.PURGE." . $this->getName()); + } + + return $this; + } + public function exists(): bool { return in_array($this->getName(), $this->client->getApi()->getStreamNames()); diff --git a/tests/Functional/StreamTest.php b/tests/Functional/StreamTest.php index 9de10ce..8a4d471 100644 --- a/tests/Functional/StreamTest.php +++ b/tests/Functional/StreamTest.php @@ -61,6 +61,32 @@ public function testNack() $this->assertSame(0, $consumer->info()->num_pending); } + public function testPurge() + { + $client = $this->createClient(); + $stream = $client->getApi()->getStream('purge'); + $stream->getConfiguration()->setSubjects(['purge'])->setRetentionPolicy(RetentionPolicy::WORK_QUEUE); + $stream->create(); + + $consumer = $stream->getConsumer('purge'); + $consumer->setExpires(5); + $consumer->getConfiguration() + ->setSubjectFilter('purge') + ->setReplayPolicy(ReplayPolicy::INSTANT) + ->setAckPolicy(AckPolicy::EXPLICIT); + + $consumer->create(); + + $stream->publish('purge', 'first'); + $stream->publish('purge', 'second'); + + $this->assertSame(2, $consumer->info()->num_pending); + + $stream->purge(); + + $this->assertSame(0, $consumer->info()->num_pending); + } + public function testConsumerExpiration() { $client = $this->createClient(['timeout' => 0.2, 'delay' => 0.1]);