diff --git a/pkg/fs/FsConsumer.php b/pkg/fs/FsConsumer.php index c42ea4dfc..f0d8463fe 100644 --- a/pkg/fs/FsConsumer.php +++ b/pkg/fs/FsConsumer.php @@ -131,7 +131,8 @@ public function receiveNoWait() ftruncate($file, fstat($file)['size'] - strlen($frame)); rewind($file); - $rawMessage = substr(trim($frame), 1); + $rawMessage = str_replace('\|\{', '|{', $frame); + $rawMessage = substr(trim($rawMessage), 1); if ($rawMessage) { try { diff --git a/pkg/fs/FsProducer.php b/pkg/fs/FsProducer.php index a8dbac925..21f890575 100644 --- a/pkg/fs/FsProducer.php +++ b/pkg/fs/FsProducer.php @@ -52,7 +52,9 @@ public function send(PsrDestination $destination, PsrMessage $message) $message->setHeader('x-expire-at', microtime(true) + ($this->timeToLive / 1000)); } - $rawMessage = '|'.json_encode($message); + $rawMessage = json_encode($message); + $rawMessage = str_replace('|{', '\|\{', $rawMessage); + $rawMessage = '|'.$rawMessage; if (JSON_ERROR_NONE !== json_last_error()) { throw new \InvalidArgumentException(sprintf( diff --git a/pkg/fs/Tests/Functional/FsConsumerTest.php b/pkg/fs/Tests/Functional/FsConsumerTest.php index f3a641159..d04090d18 100644 --- a/pkg/fs/Tests/Functional/FsConsumerTest.php +++ b/pkg/fs/Tests/Functional/FsConsumerTest.php @@ -171,4 +171,31 @@ public function testShouldThrowExceptionWhenFrameSizeNotDivideExactly() $this->expectExceptionMessage('The frame size is "12" and it must divide exactly to 64 but it leaves a reminder "12".'); $consumer->receiveNoWait(); } + + /** + * @group bug + * @group bug390 + */ + public function testShouldUnEscapeDelimiterSymbolsInMessageBody() + { + $context = $this->fsContext; + $queue = $context->createQueue('fs_test_queue'); + $context->purge($queue); + + $message = $this->fsContext->createMessage(' |{"body":"aMessageData","properties":{"enqueue.topic_name":"user_updated"},"headers":{"content_type":"text\/plain","message_id":"90979b6c-d9ff-4b39-9938-878b83a95360","timestamp":1519899428,"reply_to":null,"correlation_id":""}}'); + + $this->fsContext->createProducer()->send($queue, $message); + + $this->assertSame(0, strlen(file_get_contents(sys_get_temp_dir().'/fs_test_queue')) % 64); + $this->assertSame( + ' |{"body":" \|\{\"body\":\"aMessageData\",\"properties\":{\"enqueue.topic_name\":\"user_updated\"},\"headers\":{\"content_type\":\"text\\\\\/plain\",\"message_id\":\"90979b6c-d9ff-4b39-9938-878b83a95360\",\"timestamp\":1519899428,\"reply_to\":null,\"correlation_id\":\"\"}}","properties":[],"headers":[]}', + file_get_contents(sys_get_temp_dir().'/fs_test_queue') + ); + + $consumer = $context->createConsumer($queue); + + $message = $consumer->receiveNoWait(); + + $this->assertSame(' |{"body":"aMessageData","properties":{"enqueue.topic_name":"user_updated"},"headers":{"content_type":"text\/plain","message_id":"90979b6c-d9ff-4b39-9938-878b83a95360","timestamp":1519899428,"reply_to":null,"correlation_id":""}}', $message->getBody()); + } }