Skip to content

Commit 575703b

Browse files
committed
[fs] fix bugs intrdoced in #181.
The #181 introduce a bug that corrupt the file structure and make it impossible to read.
1 parent 95ca902 commit 575703b

File tree

2 files changed

+95
-1
lines changed

2 files changed

+95
-1
lines changed

pkg/fs/FsConsumer.php

+15-1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ public function receiveNoWait()
119119
$count = $this->preFetchCount;
120120
while ($count) {
121121
$frame = $this->readFrame($file, 1);
122+
123+
//guards
124+
if ($frame && false == ('|' == $frame[0] || ' ' == $frame[0])) {
125+
throw new \LogicException(sprintf('The frame could start from either " " or "|". The malformed frame starts with "%s".', $frame[0]));
126+
}
127+
if (0 !== $reminder = strlen($frame) % 64) {
128+
throw new \LogicException(sprintf('The frame size is "%d" and it must divide exactly to 64 but it leaves a reminder "%d".', strlen($frame), $reminder));
129+
}
130+
122131
ftruncate($file, fstat($file)['size'] - strlen($frame));
123132
rewind($file);
124133

@@ -212,7 +221,12 @@ private function readFrame($file, $frameNumber)
212221
$previousFrame = $this->readFrame($file, $frameNumber + 1);
213222

214223
if ('|' === substr($previousFrame, -1) && '{' === $frame[0]) {
215-
return '|'.$frame;
224+
$matched = [];
225+
if (false === preg_match('/\ *?\|$/', $previousFrame, $matched)) {
226+
throw new \LogicException('Something went completely wrong.');
227+
}
228+
229+
return $matched[0].$frame;
216230
}
217231

218232
return $previousFrame.$frame;

pkg/fs/Tests/Functional/FsConsumerTest.php

+80
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Fs\FsConnectionFactory;
66
use Enqueue\Fs\FsContext;
7+
use Enqueue\Fs\FsDestination;
78
use Enqueue\Fs\FsMessage;
89
use PHPUnit\Framework\TestCase;
910

@@ -69,6 +70,7 @@ public function testShouldConsumeMessagesFromFileOneByOne()
6970

7071
/**
7172
* @group bug
73+
* @group bug170
7274
*/
7375
public function testShouldNotFailOnSpecificMessageSize()
7476
{
@@ -91,4 +93,82 @@ public function testShouldNotFailOnSpecificMessageSize()
9193
$message = $consumer->receiveNoWait();
9294
$this->assertNull($message);
9395
}
96+
97+
/**
98+
* @group bug
99+
* @group bug170
100+
*/
101+
public function testShouldNotCorruptFrameSize()
102+
{
103+
$context = $this->fsContext;
104+
$queue = $context->createQueue('fs_test_queue');
105+
$context->purge($queue);
106+
107+
$consumer = $context->createConsumer($queue);
108+
$producer = $context->createProducer();
109+
110+
$producer->send($queue, $context->createMessage(str_repeat('a', 23)));
111+
$producer->send($queue, $context->createMessage(str_repeat('b', 24)));
112+
113+
$message = $consumer->receiveNoWait();
114+
$this->assertNotNull($message);
115+
$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
116+
$this->assertSame(0, fstat($file)['size'] % 64);
117+
});
118+
119+
$message = $consumer->receiveNoWait();
120+
$this->assertNotNull($message);
121+
$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
122+
$this->assertSame(0, fstat($file)['size'] % 64);
123+
});
124+
125+
$message = $consumer->receiveNoWait();
126+
$this->assertNull($message);
127+
}
128+
129+
/**
130+
* @group bug
131+
* @group bug202
132+
*/
133+
public function testShouldThrowExceptionForTheCorruptedQueueFile()
134+
{
135+
$context = $this->fsContext;
136+
$queue = $context->createQueue('fs_test_queue');
137+
$context->purge($queue);
138+
139+
$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
140+
fwrite($file, '|{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_red_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"46fdc345-5d0c-426e-95ac-227c7e657839","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_black_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"c4d60e39-3a8c-42df-b536-c8b7c13e006d","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_green_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"3a6aa176-c879-4435-9626-c48e0643defa","timestamp":1505379216,"reply_to":null,"correlation_id":""}}');
141+
});
142+
143+
$consumer = $context->createConsumer($queue);
144+
145+
$this->expectException(\LogicException::class);
146+
$this->expectExceptionMessage('The frame could start from either " " or "|". The malformed frame starts with """.');
147+
$consumer->receiveNoWait();
148+
}
149+
150+
/**
151+
* @group bug
152+
* @group bug202
153+
*/
154+
public function testShouldThrowExceptionWhenFrameSizeNotDivideExactly()
155+
{
156+
$context = $this->fsContext;
157+
$queue = $context->createQueue('fs_test_queue');
158+
$context->purge($queue);
159+
160+
$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
161+
$msg = '|{"body":""}';
162+
//guard
163+
$this->assertNotSame(0, strlen($msg) % 64);
164+
165+
fwrite($file, $msg);
166+
});
167+
168+
$consumer = $context->createConsumer($queue);
169+
170+
$this->expectException(\LogicException::class);
171+
$this->expectExceptionMessage('The frame size is "12" and it must divide exactly to 64 but it leaves a reminder "12".');
172+
$consumer->receiveNoWait();
173+
}
94174
}

0 commit comments

Comments
 (0)