From 1e2b7ba6c924c5251bd4ce4a40ee416585736ad7 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 28 Aug 2017 12:02:28 +0300 Subject: [PATCH] [fs] fix bug that happens with specific message length. --- pkg/fs/FsConsumer.php | 9 +++++-- pkg/fs/Tests/Functional/FsConsumerTest.php | 29 +++++++++++++++++++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pkg/fs/FsConsumer.php b/pkg/fs/FsConsumer.php index 6dca254f2..c9ef71552 100644 --- a/pkg/fs/FsConsumer.php +++ b/pkg/fs/FsConsumer.php @@ -164,7 +164,6 @@ private function readFrame($file, $frameNumber) fseek($file, -$offset, SEEK_END); $frame = fread($file, $frameSize); - if ('' == $frame) { return ''; } @@ -173,6 +172,12 @@ private function readFrame($file, $frameNumber) return $frame; } - return $this->readFrame($file, $frameNumber + 1).$frame; + $previousFrame = $this->readFrame($file, $frameNumber + 1); + + if ('|' === substr($previousFrame, -1) && '{' === $frame[0]) { + return '|'.$frame; + } + + return $previousFrame.$frame; } } diff --git a/pkg/fs/Tests/Functional/FsConsumerTest.php b/pkg/fs/Tests/Functional/FsConsumerTest.php index 2c8f50c4c..33e676ae4 100644 --- a/pkg/fs/Tests/Functional/FsConsumerTest.php +++ b/pkg/fs/Tests/Functional/FsConsumerTest.php @@ -5,7 +5,6 @@ use Enqueue\Fs\FsConnectionFactory; use Enqueue\Fs\FsContext; use Enqueue\Fs\FsMessage; -use Makasim\File\TempFile; use PHPUnit\Framework\TestCase; class FsConsumerTest extends TestCase @@ -19,8 +18,7 @@ public function setUp() { $this->fsContext = (new FsConnectionFactory(['path' => sys_get_temp_dir()]))->createContext(); - new TempFile(sys_get_temp_dir().'/fs_test_queue'); - file_put_contents(sys_get_temp_dir().'/fs_test_queue', ''); + $this->fsContext->purge($this->fsContext->createQueue('fs_test_queue')); } public function tearDown() @@ -68,4 +66,29 @@ public function testShouldConsumeMessagesFromFileOneByOne() $this->assertEmpty(file_get_contents(sys_get_temp_dir().'/fs_test_queue')); } + + /** + * @group bug + */ + public function testShouldNotFailOnSpecificMessageSize() + { + $context = $this->fsContext; + $queue = $context->createQueue('fs_test_queue'); + $context->purge($queue); + + $consumer = $context->createConsumer($queue); + $producer = $context->createProducer(); + + $producer->send($queue, $context->createMessage(str_repeat('a', 23))); + $producer->send($queue, $context->createMessage(str_repeat('b', 24))); + + $message = $consumer->receiveNoWait(); + $this->assertSame(str_repeat('b', 24), $message->getBody()); + + $message = $consumer->receiveNoWait(); + $this->assertSame(str_repeat('a', 23), $message->getBody()); + + $message = $consumer->receiveNoWait(); + $this->assertNull($message); + } }