Skip to content

Commit af93ec6

Browse files
authored
Merge pull request #181 from php-enqueue/fs-edge-case-with-message-size
[fs] fix bug that happens with specific message length.
2 parents 1bc46cc + 1e2b7ba commit af93ec6

File tree

2 files changed

+33
-5
lines changed

2 files changed

+33
-5
lines changed

pkg/fs/FsConsumer.php

+7-2
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ private function readFrame($file, $frameNumber)
164164

165165
fseek($file, -$offset, SEEK_END);
166166
$frame = fread($file, $frameSize);
167-
168167
if ('' == $frame) {
169168
return '';
170169
}
@@ -173,6 +172,12 @@ private function readFrame($file, $frameNumber)
173172
return $frame;
174173
}
175174

176-
return $this->readFrame($file, $frameNumber + 1).$frame;
175+
$previousFrame = $this->readFrame($file, $frameNumber + 1);
176+
177+
if ('|' === substr($previousFrame, -1) && '{' === $frame[0]) {
178+
return '|'.$frame;
179+
}
180+
181+
return $previousFrame.$frame;
177182
}
178183
}

pkg/fs/Tests/Functional/FsConsumerTest.php

+26-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
use Enqueue\Fs\FsConnectionFactory;
66
use Enqueue\Fs\FsContext;
77
use Enqueue\Fs\FsMessage;
8-
use Makasim\File\TempFile;
98
use PHPUnit\Framework\TestCase;
109

1110
class FsConsumerTest extends TestCase
@@ -19,8 +18,7 @@ public function setUp()
1918
{
2019
$this->fsContext = (new FsConnectionFactory(['path' => sys_get_temp_dir()]))->createContext();
2120

22-
new TempFile(sys_get_temp_dir().'/fs_test_queue');
23-
file_put_contents(sys_get_temp_dir().'/fs_test_queue', '');
21+
$this->fsContext->purge($this->fsContext->createQueue('fs_test_queue'));
2422
}
2523

2624
public function tearDown()
@@ -68,4 +66,29 @@ public function testShouldConsumeMessagesFromFileOneByOne()
6866

6967
$this->assertEmpty(file_get_contents(sys_get_temp_dir().'/fs_test_queue'));
7068
}
69+
70+
/**
71+
* @group bug
72+
*/
73+
public function testShouldNotFailOnSpecificMessageSize()
74+
{
75+
$context = $this->fsContext;
76+
$queue = $context->createQueue('fs_test_queue');
77+
$context->purge($queue);
78+
79+
$consumer = $context->createConsumer($queue);
80+
$producer = $context->createProducer();
81+
82+
$producer->send($queue, $context->createMessage(str_repeat('a', 23)));
83+
$producer->send($queue, $context->createMessage(str_repeat('b', 24)));
84+
85+
$message = $consumer->receiveNoWait();
86+
$this->assertSame(str_repeat('b', 24), $message->getBody());
87+
88+
$message = $consumer->receiveNoWait();
89+
$this->assertSame(str_repeat('a', 23), $message->getBody());
90+
91+
$message = $consumer->receiveNoWait();
92+
$this->assertNull($message);
93+
}
7194
}

0 commit comments

Comments
 (0)