diff --git a/bin/dev b/bin/dev
index 22aa8e2f5..ce28e36f5 100755
--- a/bin/dev
+++ b/bin/dev
@@ -21,7 +21,7 @@ while getopts "bustefc" OPTION; do
./bin/php-cs-fixer fix
;;
t)
- COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test
+ COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$2"
;;
c)
COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm generate-changelog github_changelog_generator --future-release "$2" --simple-list
diff --git a/bin/subtree-split b/bin/subtree-split
index 0d030e9e3..678753169 100755
--- a/bin/subtree-split
+++ b/bin/subtree-split
@@ -47,6 +47,7 @@ remote psr-queue git@github.com:php-enqueue/psr-queue.git
remote enqueue git@github.com:php-enqueue/enqueue.git
remote stomp git@github.com:php-enqueue/stomp.git
remote amqp-ext git@github.com:php-enqueue/amqp-ext.git
+remote fs git@github.com:php-enqueue/fs.git
remote enqueue-bundle git@github.com:php-enqueue/enqueue-bundle.git
remote job-queue git@github.com:php-enqueue/job-queue.git
remote test git@github.com:php-enqueue/test.git
@@ -55,6 +56,7 @@ split 'pkg/psr-queue' psr-queue
split 'pkg/enqueue' enqueue
split 'pkg/stomp' stomp
split 'pkg/amqp-ext' amqp-ext
+split 'pkg/fs' fs
split 'pkg/enqueue-bundle' enqueue-bundle
split 'pkg/job-queue' job-queue
split 'pkg/test' test
diff --git a/composer.json b/composer.json
index 1a6941dd4..2f5381b1d 100644
--- a/composer.json
+++ b/composer.json
@@ -8,6 +8,7 @@
"enqueue/enqueue": "*@dev",
"enqueue/stomp": "*@dev",
"enqueue/amqp-ext": "*@dev",
+ "enqueue/fs": "*@dev",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
"enqueue/test": "*@dev"
@@ -51,6 +52,10 @@
{
"type": "path",
"url": "pkg/job-queue"
+ },
+ {
+ "type": "path",
+ "url": "pkg/fs"
}
]
}
diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md
index 8caf72fe1..2c5a9fe0f 100644
--- a/docs/bundle/config_reference.md
+++ b/docs/bundle/config_reference.md
@@ -1,7 +1,8 @@
# Config reference
+You can get this info by running `./bin/console config:dump-reference enqueue` command.
+
```yaml
-# Default configuration for extension with alias: "enqueue"
enqueue:
transport: # Required
default:
@@ -91,6 +92,16 @@ enqueue:
# The option tells whether RabbitMQ broker has delay plugin installed or not
delay_plugin_installed: false
+ fs:
+
+ # The store directory where all queue\topics files will be created and messages are stored
+ store_dir: ~ # Required
+
+ # The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose.
+ pre_fetch_count: 1
+
+ # The queue files are created with this given permissions if not exist.
+ chmod: 384
client:
traceable_producer: false
prefix: enqueue
diff --git a/docs/client/supported_brokers.md b/docs/client/supported_brokers.md
index 39c92c215..aff2f5bff 100644
--- a/docs/client/supported_brokers.md
+++ b/docs/client/supported_brokers.md
@@ -8,7 +8,7 @@ Here's the list of protocols and Client features supported by them
| RabbitMQ AMQP | Yes | Yes* | Yes | Yes | Yes |
| STOMP | No | No | Yes | No | Yes** |
| RabbitMQ STOMP | Yes | Yes* | Yes | Yes*** | Yes** |
-
+| Filesystem | No | No | No | Yes | No |
* \* Possible if a RabbitMQ delay plugin is installed.
* \*\* Possible if topics (exchanges) are configured on broker side manually.
diff --git a/docs/filesystem_transport.md b/docs/filesystem_transport.md
new file mode 100644
index 000000000..db3ebb450
--- /dev/null
+++ b/docs/filesystem_transport.md
@@ -0,0 +1,82 @@
+# Filesystem transport
+
+Use files on local filesystem as queues.
+It creates a file per queue\topic.
+A message is a line inside the file.
+**Limitations** It works only in auto ack mode. Local by nature therefor messages are not visible on other servers.
+
+* [Create context](#create-context)
+* [Declare topic](#declare-topic)
+* [Declare queue](#decalre-queue)
+* [Bind queue to topic](#bind-queue-to-topic)
+* [Send message to topic](#send-message-to-topic)
+* [Send message to queue](#send-message-to-queue)
+* [Consume message](#consume-message)
+* [Purge queue messages](#purge-queue-messages)
+
+## Create context
+
+```php
+ '/tmp'
+]);
+
+$psrContext = $connectionFactory->createContext();
+```
+
+## Send message to topic
+
+```php
+createTopic('aTopic');
+$message = $psrContext->createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooTopic, $message);
+```
+
+## Send message to queue
+
+```php
+createQueue('aQueue');
+$message = $psrContext->createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooQueue, $message);
+```
+
+## Consume message:
+
+```php
+createQueue('aQueue');
+$consumer = $psrContext->createConsumer($fooQueue);
+
+$message = $consumer->receive();
+
+// process a message
+
+$consumer->acknowledge($message);
+// $consumer->reject($message);
+```
+
+## Purge queue messages:
+
+```php
+createQueue('aQueue');
+
+$psrContext->purge($fooQueue);
+```
+
+[back to index](index.md)
\ No newline at end of file
diff --git a/docs/index.md b/docs/index.md
index 6dfcaebc9..ada57c0ab 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -2,9 +2,10 @@
* [Quick tour](quick_tour.md)
* Transports
- - [AMQP](amqp_transport.md)
- - [STOMP](stomp_transport.md)
- - [NULL](null_transport.md)
+ - [Amqp](amqp_transport.md)
+ - [Stomp](stomp_transport.md)
+ - [Filesystem](filesystem_transport.md)
+ - [Null](null_transport.md)
* Consumption
- [Extensions](consumption/extensions.md)
* Client
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 21c4f77fe..fb3be97c3 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -29,6 +29,10 @@
pkg/amqp-ext/Tests
+
+ pkg/fs/Tests
+
+
pkg/enqueue-bundle/Tests
diff --git a/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php b/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php
index 57064a396..ba4dee55c 100644
--- a/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php
+++ b/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php
@@ -121,5 +121,14 @@ public function testShouldCreateDriver()
$driver = $container->getDefinition($serviceId);
$this->assertSame(AmqpDriver::class, $driver->getClass());
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(0));
+ $this->assertEquals('enqueue.transport.amqp.context', (string) $driver->getArgument(0));
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(1));
+ $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1));
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(2));
+ $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2));
}
}
diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php
index 44b14cab1..f8e75cebc 100644
--- a/pkg/enqueue-bundle/EnqueueBundle.php
+++ b/pkg/enqueue-bundle/EnqueueBundle.php
@@ -11,6 +11,8 @@
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
+use Enqueue\Fs\FsContext;
+use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Stomp\StompContext;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
use Enqueue\Stomp\Symfony\StompTransportFactory;
@@ -46,5 +48,9 @@ public function build(ContainerBuilder $container)
$extension->addTransportFactory(new AmqpTransportFactory());
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory());
}
+
+ if (class_exists(FsContext::class)) {
+ $extension->addTransportFactory(new FsTransportFactory());
+ }
}
}
diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php
index 2e9f6589f..3f390fa78 100644
--- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php
+++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php
@@ -11,6 +11,7 @@
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
use Enqueue\Bundle\EnqueueBundle;
+use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
use Enqueue\Stomp\Symfony\StompTransportFactory;
use Enqueue\Symfony\DefaultTransportFactory;
@@ -139,6 +140,23 @@ public function testShouldRegisterAmqpAndRabbitMqAmqpTransportFactories()
$bundle->build($container);
}
+ public function testShouldRegisterFSTransportFactory()
+ {
+ $extensionMock = $this->createEnqueueExtensionMock();
+
+ $container = new ContainerBuilder();
+ $container->registerExtension($extensionMock);
+
+ $extensionMock
+ ->expects($this->at(6))
+ ->method('addTransportFactory')
+ ->with($this->isInstanceOf(FsTransportFactory::class))
+ ;
+
+ $bundle = new EnqueueBundle();
+ $bundle->build($container);
+ }
+
/**
* @return \PHPUnit_Framework_MockObject_MockObject|EnqueueExtension
*/
diff --git a/pkg/fs/.gitignore b/pkg/fs/.gitignore
new file mode 100644
index 000000000..a770439e5
--- /dev/null
+++ b/pkg/fs/.gitignore
@@ -0,0 +1,6 @@
+*~
+/composer.lock
+/composer.phar
+/phpunit.xml
+/vendor/
+/.idea/
diff --git a/pkg/fs/.travis.yml b/pkg/fs/.travis.yml
new file mode 100644
index 000000000..42374ddc7
--- /dev/null
+++ b/pkg/fs/.travis.yml
@@ -0,0 +1,21 @@
+sudo: false
+
+git:
+ depth: 1
+
+language: php
+
+php:
+ - '5.6'
+ - '7.0'
+
+cache:
+ directories:
+ - $HOME/.composer/cache
+
+install:
+ - composer self-update
+ - composer install --prefer-source
+
+script:
+ - vendor/bin/phpunit --exclude-group=functional
diff --git a/pkg/fs/Client/FsDriver.php b/pkg/fs/Client/FsDriver.php
new file mode 100644
index 000000000..a8cd5d1dd
--- /dev/null
+++ b/pkg/fs/Client/FsDriver.php
@@ -0,0 +1,179 @@
+context = $context;
+ $this->config = $config;
+ $this->queueMetaRegistry = $queueMetaRegistry;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function sendToRouter(Message $message)
+ {
+ if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
+ throw new \LogicException('Topic name parameter is required but is not set');
+ }
+
+ $topic = $this->createRouterTopic();
+ $transportMessage = $this->createTransportMessage($message);
+
+ $this->context->createProducer()->send($topic, $transportMessage);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function sendToProcessor(Message $message)
+ {
+ if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
+ throw new \LogicException('Processor name parameter is required but is not set');
+ }
+
+ if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
+ throw new \LogicException('Queue name parameter is required but is not set');
+ }
+
+ $transportMessage = $this->createTransportMessage($message);
+ $destination = $this->createQueue($queueName);
+
+ $this->context->createProducer()->send($destination, $transportMessage);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setupBroker(LoggerInterface $logger = null)
+ {
+ $logger = $logger ?: new NullLogger();
+ $log = function ($text, ...$args) use ($logger) {
+ $logger->debug(sprintf('[FsDriver] '.$text, ...$args));
+ };
+
+ // setup router
+ $routerTopic = $this->createRouterTopic();
+ $routerQueue = $this->createQueue($this->config->getRouterQueueName());
+
+ $log('Declare router exchange "%s" file: %s', $routerTopic->getTopicName(), $routerTopic->getFileInfo());
+ $this->context->declareDestination($routerTopic);
+
+ $log('Declare router queue "%s" file: %s', $routerQueue->getQueueName(), $routerTopic->getFileInfo());
+ $this->context->declareDestination($routerQueue);
+
+ // setup queues
+ foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
+ $queue = $this->createQueue($meta->getClientName());
+
+ $log('Declare processor queue "%s" file: %s', $queue->getQueueName(), $queue->getFileInfo());
+ $this->context->declareDestination($queue);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsDestination
+ */
+ public function createQueue($queueName)
+ {
+ return $this->context->createQueue($this->config->createTransportQueueName($queueName));
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsMessage
+ */
+ public function createTransportMessage(Message $message)
+ {
+ $properties = $message->getProperties();
+
+ $headers = $message->getHeaders();
+ $headers['content_type'] = $message->getContentType();
+
+ $transportMessage = $this->context->createMessage();
+ $transportMessage->setBody($message->getBody());
+ $transportMessage->setHeaders($headers);
+ $transportMessage->setProperties($properties);
+ $transportMessage->setMessageId($message->getMessageId());
+ $transportMessage->setTimestamp($message->getTimestamp());
+
+ return $transportMessage;
+ }
+
+ /**
+ * @param FsMessage $message
+ *
+ * {@inheritdoc}
+ */
+ public function createClientMessage(TransportMessage $message)
+ {
+ $clientMessage = new Message();
+
+ $clientMessage->setBody($message->getBody());
+ $clientMessage->setHeaders($message->getHeaders());
+ $clientMessage->setProperties($message->getProperties());
+
+ $clientMessage->setContentType($message->getHeader('content_type'));
+ $clientMessage->setMessageId($message->getMessageId());
+ $clientMessage->setTimestamp($message->getTimestamp());
+ $clientMessage->setPriority(MessagePriority::NORMAL);
+
+ return $clientMessage;
+ }
+
+ /**
+ * @return Config
+ */
+ public function getConfig()
+ {
+ return $this->config;
+ }
+
+ /**
+ * @return FsDestination
+ */
+ private function createRouterTopic()
+ {
+ return $this->context->createTopic(
+ $this->config->createTransportRouterTopicName($this->config->getRouterTopicName())
+ );
+ }
+}
diff --git a/pkg/fs/FsConnectionFactory.php b/pkg/fs/FsConnectionFactory.php
new file mode 100644
index 000000000..417e63542
--- /dev/null
+++ b/pkg/fs/FsConnectionFactory.php
@@ -0,0 +1,35 @@
+config = array_replace([
+ 'store_dir' => null,
+ 'pre_fetch_count' => 1,
+ 'chmod' => 0600,
+ ], $config);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsContext
+ */
+ public function createContext()
+ {
+ return new FsContext($this->config['store_dir'], $this->config['pre_fetch_count'], $this->config['chmod']);
+ }
+}
diff --git a/pkg/fs/FsConsumer.php b/pkg/fs/FsConsumer.php
new file mode 100644
index 000000000..ba47de3cb
--- /dev/null
+++ b/pkg/fs/FsConsumer.php
@@ -0,0 +1,168 @@
+context = $context;
+ $this->destination = $destination;
+ $this->preFetchCount = $preFetchCount;
+
+ $this->preFetchedMessages = [];
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsDestination
+ */
+ public function getQueue()
+ {
+ return $this->destination;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsMessage|null
+ */
+ public function receive($timeout = 0)
+ {
+ $end = microtime(true) + ($timeout / 1000);
+ while (0 === $timeout || microtime(true) < $end) {
+ if ($message = $this->receiveNoWait()) {
+ return $message;
+ }
+
+ usleep(100);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function receiveNoWait()
+ {
+ if ($this->preFetchedMessages) {
+ return array_shift($this->preFetchedMessages);
+ }
+
+ $this->context->workWithFile($this->destination, 'c+', function (FsDestination $destination, $file) {
+ $count = $this->preFetchCount;
+ while ($count) {
+ $frame = $this->readFrame($file, 1);
+ ftruncate($file, fstat($file)['size'] - strlen($frame));
+ rewind($file);
+
+ $rawMessage = substr(trim($frame), 1);
+
+ if ($rawMessage) {
+ try {
+ $this->preFetchedMessages[] = FsMessage::jsonUnserialize($rawMessage);
+ } catch (\Exception $e) {
+ throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), null, $e);
+ }
+ } else {
+ return;
+ }
+
+ --$count;
+ }
+ });
+
+ if ($this->preFetchedMessages) {
+ return array_shift($this->preFetchedMessages);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function acknowledge(Message $message)
+ {
+ // do nothing. fs transport always works in auto ack mode
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reject(Message $message, $requeue = false)
+ {
+ // do nothing on reject. fs transport always works in auto ack mode
+
+ if ($requeue) {
+ $this->context->createProducer()->send($this->destination, $message);
+ }
+ }
+
+ /**
+ * @return int
+ */
+ public function getPreFetchCount()
+ {
+ return $this->preFetchCount;
+ }
+
+ /**
+ * @param int $preFetchCount
+ */
+ public function setPreFetchCount($preFetchCount)
+ {
+ $this->preFetchCount = $preFetchCount;
+ }
+
+ /**
+ * @param resource $file
+ * @param int $frameNumber
+ *
+ * @return string
+ */
+ private function readFrame($file, $frameNumber)
+ {
+ $frameSize = 64;
+ $offset = $frameNumber * $frameSize;
+
+ fseek($file, -$offset, SEEK_END);
+ $frame = fread($file, $frameSize);
+
+ if ('' == $frame) {
+ return '';
+ }
+
+ if (false !== strpos($frame, '|{')) {
+ return $frame;
+ }
+
+ return $this->readFrame($file, $frameNumber + 1).$frame;
+ }
+}
diff --git a/pkg/fs/FsContext.php b/pkg/fs/FsContext.php
new file mode 100644
index 000000000..d90fc0d9e
--- /dev/null
+++ b/pkg/fs/FsContext.php
@@ -0,0 +1,228 @@
+storeDir = $storeDir;
+ $this->preFetchCount = $preFetchCount;
+ $this->chmod = $chmod;
+
+ $this->lockHandlers = [];
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsMessage
+ */
+ public function createMessage($body = '', array $properties = [], array $headers = [])
+ {
+ return new FsMessage($body, $properties, $headers);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsDestination
+ */
+ public function createTopic($topicName)
+ {
+ return $this->createQueue($topicName);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsDestination
+ */
+ public function createQueue($queueName)
+ {
+ return new FsDestination(new \SplFileInfo($this->getStoreDir().'/'.$queueName));
+ }
+
+ /**
+ * @param Destination|FsDestination $destination
+ */
+ public function declareDestination(Destination $destination)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class);
+
+ set_error_handler(function ($severity, $message, $file, $line) {
+ throw new \ErrorException($message, 0, $severity, $file, $line);
+ });
+
+ try {
+ if (false == file_exists($destination->getFileInfo())) {
+ touch($destination->getFileInfo());
+ chmod($destination->getFileInfo(), $this->chmod);
+ }
+ } finally {
+ restore_error_handler();
+ }
+ }
+
+ public function workWithFile(FsDestination $destination, $mode, callable $callback)
+ {
+ $this->declareDestination($destination);
+
+ set_error_handler(function ($severity, $message, $file, $line) {
+ throw new \ErrorException($message, 0, $severity, $file, $line);
+ });
+
+ try {
+ $file = fopen($destination->getFileInfo(), $mode);
+ $lockHandler = $this->getLockHandler($destination);
+
+ if (false == $lockHandler->lock(true)) {
+ throw new \LogicException(sprintf('Cannot obtain the lock for destination %s', $destination->getName()));
+ }
+
+ return call_user_func($callback, $destination, $file);
+ } finally {
+ if (isset($file)) {
+ fclose($file);
+ }
+ if (isset($lockHandler)) {
+ $lockHandler->release();
+ }
+
+ restore_error_handler();
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsDestination
+ */
+ public function createTemporaryQueue()
+ {
+ return new FsDestination(new TempFile($this->getStoreDir().'/'.uniqid('tmp-q-', true)));
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return FsProducer
+ */
+ public function createProducer()
+ {
+ return new FsProducer($this);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param FsDestination $destination
+ *
+ * @return FsConsumer
+ */
+ public function createConsumer(Destination $destination)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class);
+
+ return new FsConsumer($this, $destination, $this->preFetchCount);
+ }
+
+ public function close()
+ {
+ foreach ($this->lockHandlers as $lockHandler) {
+ $lockHandler->release();
+ }
+
+ $this->lockHandlers = [];
+ }
+
+ /**
+ * @param Queue|FsDestination $queue
+ */
+ public function purge(Queue $queue)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($queue, FsDestination::class);
+
+ $this->workWithFile($queue, 'c', function (FsDestination $destination, $file) use ($queue) {
+ ftruncate($file, 0);
+ });
+ }
+
+ /**
+ * @return int
+ */
+ public function getPreFetchCount()
+ {
+ return $this->preFetchCount;
+ }
+
+ /**
+ * @param int $preFetchCount
+ */
+ public function setPreFetchCount($preFetchCount)
+ {
+ $this->preFetchCount = $preFetchCount;
+ }
+
+ /**
+ * @return string
+ */
+ private function getStoreDir()
+ {
+ if (false == is_dir($this->storeDir)) {
+ throw new \LogicException(sprintf('The directory %s does not exist', $this->storeDir));
+ }
+
+ if (false == is_writable($this->storeDir)) {
+ throw new \LogicException(sprintf('The directory %s is not writable', $this->storeDir));
+ }
+
+ return $this->storeDir;
+ }
+
+ /**
+ * @param FsDestination $destination
+ *
+ * @return LockHandler
+ */
+ private function getLockHandler(FsDestination $destination)
+ {
+ if (false == isset($this->lockHandlers[$destination->getName()])) {
+ $this->lockHandlers[$destination->getName()] = new LockHandler($destination->getName(), $this->storeDir);
+ }
+
+ return $this->lockHandlers[$destination->getName()];
+ }
+}
diff --git a/pkg/fs/FsDestination.php b/pkg/fs/FsDestination.php
new file mode 100644
index 000000000..c357a262a
--- /dev/null
+++ b/pkg/fs/FsDestination.php
@@ -0,0 +1,54 @@
+file = $file;
+ }
+
+ /**
+ * @return \SplFileInfo
+ */
+ public function getFileInfo()
+ {
+ return $this->file;
+ }
+
+ /**
+ * @return string
+ */
+ public function getName()
+ {
+ return $this->file->getFilename();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getQueueName()
+ {
+ return $this->getName();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTopicName()
+ {
+ return $this->getName();
+ }
+}
diff --git a/pkg/fs/FsMessage.php b/pkg/fs/FsMessage.php
new file mode 100644
index 000000000..881d3dbb9
--- /dev/null
+++ b/pkg/fs/FsMessage.php
@@ -0,0 +1,234 @@
+body = $body;
+ $this->properties = $properties;
+ $this->headers = $headers;
+ $this->redelivered = false;
+ }
+
+ /**
+ * @param string $body
+ */
+ public function setBody($body)
+ {
+ $this->body = $body;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getBody()
+ {
+ return $this->body;
+ }
+
+ /**
+ * @param array $properties
+ */
+ public function setProperties(array $properties)
+ {
+ $this->properties = $properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperties()
+ {
+ return $this->properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setProperty($name, $value)
+ {
+ $this->properties[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperty($name, $default = null)
+ {
+ return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default;
+ }
+
+ /**
+ * @param array $headers
+ */
+ public function setHeaders(array $headers)
+ {
+ $this->headers = $headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeaders()
+ {
+ return $this->headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setHeader($name, $value)
+ {
+ $this->headers[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeader($name, $default = null)
+ {
+ return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default;
+ }
+
+ /**
+ * @return bool
+ */
+ public function isRedelivered()
+ {
+ return $this->redelivered;
+ }
+
+ /**
+ * @param bool $redelivered
+ */
+ public function setRedelivered($redelivered)
+ {
+ $this->redelivered = $redelivered;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setCorrelationId($correlationId)
+ {
+ $this->setHeader('correlation_id', (string) $correlationId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getCorrelationId()
+ {
+ return $this->getHeader('correlation_id', '');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setMessageId($messageId)
+ {
+ $this->setHeader('message_id', (string) $messageId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getMessageId()
+ {
+ return $this->getHeader('message_id', '');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTimestamp()
+ {
+ $value = $this->getHeader('timestamp');
+
+ return $value === null ? null : (int) $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setTimestamp($timestamp)
+ {
+ $this->setHeader('timestamp', $timestamp);
+ }
+
+ /**
+ * @param string|null $replyTo
+ */
+ public function setReplyTo($replyTo)
+ {
+ $this->setHeader('reply-to', $replyTo);
+ }
+
+ /**
+ * @return string|null
+ */
+ public function getReplyTo()
+ {
+ return $this->getHeader('reply-to');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function jsonSerialize()
+ {
+ return [
+ 'body' => $this->getBody(),
+ 'properties' => $this->getProperties(),
+ 'headers' => $this->getHeaders(),
+ ];
+ }
+
+ /**
+ * @param string $json
+ *
+ * @return FsMessage
+ */
+ public static function jsonUnserialize($json)
+ {
+ $data = json_decode($json, true);
+ if (JSON_ERROR_NONE !== json_last_error()) {
+ throw new \InvalidArgumentException(sprintf(
+ 'The malformed json given. Error %s and message %s',
+ json_last_error(),
+ json_last_error_msg()
+ ));
+ }
+
+ return new self($data['body'], $data['properties'], $data['headers']);
+ }
+}
diff --git a/pkg/fs/FsProducer.php b/pkg/fs/FsProducer.php
new file mode 100644
index 000000000..2f5864bf4
--- /dev/null
+++ b/pkg/fs/FsProducer.php
@@ -0,0 +1,59 @@
+context = $context;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param FsDestination $destination
+ * @param FsMessage $message
+ */
+ public function send(Destination $destination, Message $message)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class);
+ InvalidMessageException::assertMessageInstanceOf($message, FsMessage::class);
+
+ $this->context->workWithFile($destination, 'a+', function (FsDestination $destination, $file) use ($message) {
+ $fileInfo = $destination->getFileInfo();
+ if ($fileInfo instanceof TempFile && false == file_exists($fileInfo)) {
+ return;
+ }
+
+ $rawMessage = '|'.json_encode($message);
+
+ if (JSON_ERROR_NONE !== json_last_error()) {
+ throw new \InvalidArgumentException(sprintf(
+ 'Could not encode value into json. Error %s and message %s',
+ json_last_error(),
+ json_last_error_msg()
+ ));
+ }
+
+ $rawMessage = str_repeat(' ', 64 - (strlen($rawMessage) % 64)).$rawMessage;
+
+ fwrite($file, $rawMessage);
+ });
+ }
+}
diff --git a/pkg/fs/LICENSE b/pkg/fs/LICENSE
new file mode 100644
index 000000000..70fa75252
--- /dev/null
+++ b/pkg/fs/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2017 Kotliar Maksym
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
\ No newline at end of file
diff --git a/pkg/fs/README.md b/pkg/fs/README.md
new file mode 100644
index 000000000..f3b1a9a43
--- /dev/null
+++ b/pkg/fs/README.md
@@ -0,0 +1,11 @@
+# Enqueue Filesystem Transport
+
+## Resources
+
+* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
+* [Questions](https://gitter.im/php-enqueue/Lobby)
+* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
+
+## License
+
+It is released under the [MIT License](LICENSE).
diff --git a/pkg/fs/Symfony/FsTransportFactory.php b/pkg/fs/Symfony/FsTransportFactory.php
new file mode 100644
index 000000000..0447aed01
--- /dev/null
+++ b/pkg/fs/Symfony/FsTransportFactory.php
@@ -0,0 +1,108 @@
+name = $name;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function addConfiguration(ArrayNodeDefinition $builder)
+ {
+ $builder
+ ->children()
+ ->scalarNode('store_dir')
+ ->isRequired()
+ ->cannotBeEmpty()
+ ->info('The store directory where all queue\topics files will be created and messages are stored')
+ ->end()
+ ->integerNode('pre_fetch_count')
+ ->min(1)
+ ->defaultValue(1)
+ ->info('The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose.')
+ ->end()
+ ->integerNode('chmod')
+ ->defaultValue(0600)
+ ->info('The queue files are created with this given permissions if not exist.')
+ ->end()
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createConnectionFactory(ContainerBuilder $container, array $config)
+ {
+ $factory = new Definition(FsConnectionFactory::class);
+ $factory->setArguments([$config]);
+
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+ $container->setDefinition($factoryId, $factory);
+
+ return $factoryId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createContext(ContainerBuilder $container, array $config)
+ {
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+
+ $context = new Definition(FsContext::class);
+ $context->setFactory([new Reference($factoryId), 'createContext']);
+
+ $contextId = sprintf('enqueue.transport.%s.context', $this->getName());
+ $container->setDefinition($contextId, $context);
+
+ return $contextId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createDriver(ContainerBuilder $container, array $config)
+ {
+ $driver = new Definition(FsDriver::class);
+ $driver->setArguments([
+ new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
+ new Reference('enqueue.client.config'),
+ new Reference('enqueue.client.meta.queue_meta_registry'),
+ ]);
+
+ $driverId = sprintf('enqueue.client.%s.driver', $this->getName());
+ $container->setDefinition($driverId, $driver);
+
+ return $driverId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getName()
+ {
+ return $this->name;
+ }
+}
diff --git a/pkg/fs/Tests/Driver/FsDriverTest.php b/pkg/fs/Tests/Driver/FsDriverTest.php
new file mode 100644
index 000000000..47d145302
--- /dev/null
+++ b/pkg/fs/Tests/Driver/FsDriverTest.php
@@ -0,0 +1,347 @@
+assertClassImplements(DriverInterface::class, FsDriver::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new FsDriver(
+ $this->createPsrContextMock(),
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+ }
+
+ public function testShouldReturnConfigObject()
+ {
+ $config = new Config('', '', '', '', '', '');
+
+ $driver = new FsDriver($this->createPsrContextMock(), $config, $this->createQueueMetaRegistryMock());
+
+ $this->assertSame($config, $driver->getConfig());
+ }
+
+ public function testShouldCreateAndReturnQueueInstance()
+ {
+ $expectedQueue = new FsDestination(new TempFile(sys_get_temp_dir().'/queue-name'));
+
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->with('name')
+ ->will($this->returnValue($expectedQueue))
+ ;
+
+ $driver = new FsDriver($context, new Config('', '', '', '', '', ''), $this->createQueueMetaRegistryMock());
+
+ $queue = $driver->createQueue('name');
+
+ $this->assertSame($expectedQueue, $queue);
+ $this->assertSame('queue-name', $queue->getQueueName());
+ }
+
+ public function testShouldConvertTransportMessageToClientMessage()
+ {
+ $transportMessage = new FsMessage();
+ $transportMessage->setBody('body');
+ $transportMessage->setHeaders(['hkey' => 'hval']);
+ $transportMessage->setProperties(['key' => 'val']);
+ $transportMessage->setHeader('content_type', 'ContentType');
+ $transportMessage->setMessageId('MessageId');
+ $transportMessage->setTimestamp(1000);
+
+ $driver = new FsDriver(
+ $this->createPsrContextMock(),
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+
+ $clientMessage = $driver->createClientMessage($transportMessage);
+
+ $this->assertInstanceOf(Message::class, $clientMessage);
+ $this->assertSame('body', $clientMessage->getBody());
+ $this->assertSame([
+ 'hkey' => 'hval',
+ 'content_type' => 'ContentType',
+ 'message_id' => 'MessageId',
+ 'timestamp' => 1000,
+ ], $clientMessage->getHeaders());
+ $this->assertSame([
+ 'key' => 'val',
+ ], $clientMessage->getProperties());
+ $this->assertSame('MessageId', $clientMessage->getMessageId());
+ $this->assertSame('ContentType', $clientMessage->getContentType());
+ $this->assertSame(1000, $clientMessage->getTimestamp());
+
+ $this->assertNull($clientMessage->getExpire());
+ $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority());
+ }
+
+ public function testShouldConvertClientMessageToTransportMessage()
+ {
+ $clientMessage = new Message();
+ $clientMessage->setBody('body');
+ $clientMessage->setHeaders(['hkey' => 'hval']);
+ $clientMessage->setProperties(['key' => 'val']);
+ $clientMessage->setContentType('ContentType');
+ $clientMessage->setExpire(123);
+ $clientMessage->setPriority(MessagePriority::VERY_HIGH);
+ $clientMessage->setMessageId('MessageId');
+ $clientMessage->setTimestamp(1000);
+
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn(new FsMessage())
+ ;
+
+ $driver = new FsDriver(
+ $context,
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+
+ $transportMessage = $driver->createTransportMessage($clientMessage);
+
+ $this->assertInstanceOf(FsMessage::class, $transportMessage);
+ $this->assertSame('body', $transportMessage->getBody());
+ $this->assertSame([
+ 'hkey' => 'hval',
+ 'content_type' => 'ContentType',
+ 'message_id' => 'MessageId',
+ 'timestamp' => 1000,
+ ], $transportMessage->getHeaders());
+ $this->assertSame([
+ 'key' => 'val',
+ ], $transportMessage->getProperties());
+ $this->assertSame('MessageId', $transportMessage->getMessageId());
+ $this->assertSame(1000, $transportMessage->getTimestamp());
+ }
+
+ public function testShouldSendMessageToRouter()
+ {
+ $topic = new FsDestination(TempFile::generate());
+ $transportMessage = new FsMessage();
+
+ $producer = $this->createPsrProducerMock();
+ $producer
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
+ ;
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createTopic')
+ ->willReturn($topic)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createProducer')
+ ->willReturn($producer)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn($transportMessage)
+ ;
+
+ $driver = new FsDriver(
+ $context,
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic');
+
+ $driver->sendToRouter($message);
+ }
+
+ public function testShouldThrowExceptionIfTopicParameterIsNotSet()
+ {
+ $driver = new FsDriver(
+ $this->createPsrContextMock(),
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Topic name parameter is required but is not set');
+
+ $driver->sendToRouter(new Message());
+ }
+
+ public function testShouldSendMessageToProcessor()
+ {
+ $queue = new FsDestination(TempFile::generate());
+ $transportMessage = new FsMessage();
+
+ $producer = $this->createPsrProducerMock();
+ $producer
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($queue), $this->identicalTo($transportMessage))
+ ;
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->willReturn($queue)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createProducer')
+ ->willReturn($producer)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn($transportMessage)
+ ;
+
+ $driver = new FsDriver(
+ $context,
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
+ $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'queue');
+
+ $driver->sendToProcessor($message);
+ }
+
+ public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet()
+ {
+ $driver = new FsDriver(
+ $this->createPsrContextMock(),
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Processor name parameter is required but is not set');
+
+ $driver->sendToProcessor(new Message());
+ }
+
+ public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet()
+ {
+ $driver = new FsDriver(
+ $this->createPsrContextMock(),
+ new Config('', '', '', '', '', ''),
+ $this->createQueueMetaRegistryMock()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Queue name parameter is required but is not set');
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
+
+ $driver->sendToProcessor($message);
+ }
+
+ public function testShouldSetupBroker()
+ {
+ $routerTopic = new FsDestination(TempFile::generate());
+ $routerQueue = new FsDestination(TempFile::generate());
+
+ $processorQueue = new FsDestination(TempFile::generate());
+
+ $context = $this->createPsrContextMock();
+ // setup router
+ $context
+ ->expects($this->at(0))
+ ->method('createTopic')
+ ->willReturn($routerTopic)
+ ;
+ $context
+ ->expects($this->at(1))
+ ->method('createQueue')
+ ->willReturn($routerQueue)
+ ;
+ $context
+ ->expects($this->at(2))
+ ->method('declareDestination')
+ ->with($this->identicalTo($routerTopic))
+ ;
+ $context
+ ->expects($this->at(3))
+ ->method('declareDestination')
+ ->with($this->identicalTo($routerQueue))
+ ;
+ // setup processor queue
+ $context
+ ->expects($this->at(4))
+ ->method('createQueue')
+ ->willReturn($processorQueue)
+ ;
+ $context
+ ->expects($this->at(5))
+ ->method('declareDestination')
+ ->with($this->identicalTo($processorQueue))
+ ;
+
+ $meta = new QueueMetaRegistry(new Config('', '', '', '', '', ''), [
+ 'default' => [],
+ ], 'default');
+
+ $driver = new FsDriver(
+ $context,
+ new Config('', '', '', '', '', ''),
+ $meta
+ );
+
+ $driver->setupBroker();
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|FsContext
+ */
+ private function createPsrContextMock()
+ {
+ return $this->createMock(FsContext::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Producer
+ */
+ private function createPsrProducerMock()
+ {
+ return $this->createMock(Producer::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|QueueMetaRegistry
+ */
+ private function createQueueMetaRegistryMock()
+ {
+ return $this->createMock(QueueMetaRegistry::class);
+ }
+}
diff --git a/pkg/fs/Tests/FsConnectionFactoryTest.php b/pkg/fs/Tests/FsConnectionFactoryTest.php
new file mode 100644
index 000000000..fa356cd12
--- /dev/null
+++ b/pkg/fs/Tests/FsConnectionFactoryTest.php
@@ -0,0 +1,57 @@
+assertClassImplements(ConnectionFactory::class, FsConnectionFactory::class);
+ }
+
+ public function testCouldBeConstructedWithEmptyConfiguration()
+ {
+ $factory = new FsConnectionFactory([]);
+
+ $this->assertAttributeEquals([
+ 'store_dir' => null,
+ 'pre_fetch_count' => 1,
+ 'chmod' => 0600,
+ ], 'config', $factory);
+ }
+
+ public function testCouldBeConstructedWithCustomConfiguration()
+ {
+ $factory = new FsConnectionFactory(['store_dir' => 'theCustomDir']);
+
+ $this->assertAttributeEquals([
+ 'store_dir' => 'theCustomDir',
+ 'pre_fetch_count' => 1,
+ 'chmod' => 0600,
+ ], 'config', $factory);
+ }
+
+ public function testShouldCreateContext()
+ {
+ $factory = new FsConnectionFactory([
+ 'store_dir' => 'theDir',
+ 'pre_fetch_count' => 123,
+ 'chmod' => 0765,
+ ]);
+
+ $context = $factory->createContext();
+
+ $this->assertInstanceOf(FsContext::class, $context);
+
+ $this->assertAttributeSame('theDir', 'storeDir', $context);
+ $this->assertAttributeSame(123, 'preFetchCount', $context);
+ $this->assertAttributeSame(0765, 'chmod', $context);
+ }
+}
diff --git a/pkg/fs/Tests/FsConsumerTest.php b/pkg/fs/Tests/FsConsumerTest.php
new file mode 100644
index 000000000..8fcf062f8
--- /dev/null
+++ b/pkg/fs/Tests/FsConsumerTest.php
@@ -0,0 +1,143 @@
+assertClassImplements(Consumer::class, FsConsumer::class);
+ }
+
+ public function testCouldBeConstructedWithContextAndDestinationAndPreFetchCountAsArguments()
+ {
+ new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 1);
+ }
+
+ public function testShouldReturnDestinationSetInConstructorOnGetQueue()
+ {
+ $destination = new FsDestination(TempFile::generate());
+
+ $consumer = new FsConsumer($this->createContextMock(), $destination, 1);
+
+ $this->assertSame($destination, $consumer->getQueue());
+ }
+
+ public function testShouldAllowGetPreFetchCountSetInConstructor()
+ {
+ $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123);
+
+ $this->assertSame(123, $consumer->getPreFetchCount());
+ }
+
+ public function testShouldAllowGetPreviouslySetPreFetchCount()
+ {
+ $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123);
+
+ $consumer->setPreFetchCount(456);
+
+ $this->assertSame(456, $consumer->getPreFetchCount());
+ }
+
+ public function testShouldDoNothingOnAcknowledge()
+ {
+ $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123);
+
+ $consumer->acknowledge(new FsMessage());
+ }
+
+ public function testShouldDoNothingOnReject()
+ {
+ $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123);
+
+ $consumer->reject(new FsMessage());
+ }
+
+ public function testShouldSendSameMessageToDestinationOnReQueue()
+ {
+ $message = new FsMessage();
+
+ $destination = new FsDestination(TempFile::generate());
+
+ $producerMock = $this->createProducerMock();
+ $producerMock
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($destination), $this->identicalTo($message))
+ ;
+
+ $contextMock = $this->createContextMock();
+ $contextMock
+ ->expects($this->once())
+ ->method('createProducer')
+ ->willReturn($producerMock)
+ ;
+
+ $consumer = new FsConsumer($contextMock, $destination, 123);
+
+ $consumer->reject($message, true);
+ }
+
+ public function testShouldCallContextWorkWithFileAndCallbackToItOnReceiveNoWait()
+ {
+ $destination = new FsDestination(TempFile::generate());
+
+ $contextMock = $this->createContextMock();
+ $contextMock
+ ->expects($this->once())
+ ->method('workWithFile')
+ ->with($this->identicalTo($destination), 'c+', $this->isInstanceOf(\Closure::class))
+ ;
+
+ $consumer = new FsConsumer($contextMock, $destination, 1);
+
+ $consumer->receiveNoWait();
+ }
+
+ public function testShouldWaitTwoSecondsForMessageAndExitOnReceive()
+ {
+ $destination = new FsDestination(TempFile::generate());
+
+ $contextMock = $this->createContextMock();
+ $contextMock
+ ->expects($this->atLeastOnce())
+ ->method('workWithFile')
+ ;
+
+ $consumer = new FsConsumer($contextMock, $destination, 1);
+
+ $start = microtime(true);
+ $consumer->receive(2000);
+ $end = microtime(true);
+
+ $this->assertGreaterThan(1.5, $end - $start);
+ $this->assertLessThan(3.5, $end - $start);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|FsProducer
+ */
+ private function createProducerMock()
+ {
+ return $this->createMock(FsProducer::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|FsContext
+ */
+ private function createContextMock()
+ {
+ return $this->createMock(FsContext::class);
+ }
+}
diff --git a/pkg/fs/Tests/FsContextTest.php b/pkg/fs/Tests/FsContextTest.php
new file mode 100644
index 000000000..f51a0b242
--- /dev/null
+++ b/pkg/fs/Tests/FsContextTest.php
@@ -0,0 +1,236 @@
+assertClassImplements(Context::class, FsContext::class);
+ }
+
+ public function testCouldBeConstructedWithExpectedArguments()
+ {
+ new FsContext(sys_get_temp_dir(), 1, 0666);
+ }
+
+ public function testShouldAllowCreateEmptyMessage()
+ {
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $message = $context->createMessage();
+
+ $this->assertInstanceOf(FsMessage::class, $message);
+
+ $this->assertSame('', $message->getBody());
+ $this->assertSame([], $message->getProperties());
+ $this->assertSame([], $message->getHeaders());
+ }
+
+ public function testShouldAllowCreateCustomMessage()
+ {
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $message = $context->createMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']);
+
+ $this->assertInstanceOf(FsMessage::class, $message);
+
+ $this->assertSame('theBody', $message->getBody());
+ $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties());
+ $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders());
+ }
+
+ public function testShouldCreateQueue()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $queue = $context->createQueue($tmpFile->getFilename());
+
+ $this->assertInstanceOf(FsDestination::class, $queue);
+ $this->assertInstanceOf(\SplFileInfo::class, $queue->getFileInfo());
+ $this->assertSame((string) $tmpFile, (string) $queue->getFileInfo());
+
+ $this->assertSame($tmpFile->getFilename(), $queue->getTopicName());
+ }
+
+ public function testShouldAllowCreateTopic()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $topic = $context->createTopic($tmpFile->getFilename());
+
+ $this->assertInstanceOf(FsDestination::class, $topic);
+ $this->assertInstanceOf(\SplFileInfo::class, $topic->getFileInfo());
+ $this->assertSame((string) $tmpFile, (string) $topic->getFileInfo());
+
+ $this->assertSame($tmpFile->getFilename(), $topic->getTopicName());
+ }
+
+ public function testShouldAllowCreateTmpQueue()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $queue = $context->createTemporaryQueue();
+
+ $this->assertInstanceOf(FsDestination::class, $queue);
+ $this->assertInstanceOf(TempFile::class, $queue->getFileInfo());
+ $this->assertNotEmpty($queue->getQueueName());
+ }
+
+ public function testShouldCreateProducer()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $producer = $context->createProducer();
+
+ $this->assertInstanceOf(FsProducer::class, $producer);
+ }
+
+ public function testShouldThrowIfNotFsDestinationGivenOnCreateConsumer()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage('The destination must be an instance of Enqueue\Fs\FsDestination but got Enqueue\Transport\Null\NullQueue.');
+ $consumer = $context->createConsumer(new NullQueue('aQueue'));
+
+ $this->assertInstanceOf(FsConsumer::class, $consumer);
+ }
+
+ public function testShouldCreateConsumer()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $queue = $context->createQueue($tmpFile->getFilename());
+
+ $context->createConsumer($queue);
+ }
+
+ public function testShouldPropagatePreFetchCountToCreatedConsumer()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 123, 0666);
+
+ $queue = $context->createQueue($tmpFile->getFilename());
+
+ $consumer = $context->createConsumer($queue);
+
+ // guard
+ $this->assertInstanceOf(FsConsumer::class, $consumer);
+
+ $this->assertAttributeSame(123, 'preFetchCount', $consumer);
+ }
+
+ public function testShouldAllowGetPreFetchCountSetInConstructor()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 123, 0666);
+
+ $this->assertSame(123, $context->getPreFetchCount());
+ }
+
+ public function testShouldAllowGetPreviouslySetPreFetchCount()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $context->setPreFetchCount(456);
+
+ $this->assertSame(456, $context->getPreFetchCount());
+ }
+
+ public function testShouldAllowPurgeMessagesFromQueue()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/foo');
+
+ file_put_contents($tmpFile, 'foo');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $queue = $context->createQueue($tmpFile->getFilename());
+
+ $context->purge($queue);
+
+ $this->assertEmpty(file_get_contents($tmpFile));
+ }
+
+ public function testShouldReleaseAllLocksOnClose()
+ {
+ new TempFile(sys_get_temp_dir().'/foo');
+ new TempFile(sys_get_temp_dir().'/bar');
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $fooQueue = $context->createQueue('foo');
+ $barQueue = $context->createTopic('bar');
+
+ $this->assertAttributeCount(0, 'lockHandlers', $context);
+
+ $context->workWithFile($fooQueue, 'r+', function () {
+ });
+ $context->workWithFile($barQueue, 'r+', function () {
+ });
+ $context->workWithFile($fooQueue, 'c+', function () {
+ });
+ $context->workWithFile($barQueue, 'c+', function () {
+ });
+
+ $this->assertAttributeCount(2, 'lockHandlers', $context);
+
+ $context->close();
+
+ $this->assertAttributeCount(0, 'lockHandlers', $context);
+ }
+
+ public function testShouldCreateFileOnFilesystemIfNotExistOnDeclareDestination()
+ {
+ $tmpFile = new TempFile(sys_get_temp_dir().'/'.uniqid());
+
+ $context = new FsContext(sys_get_temp_dir(), 1, 0666);
+
+ $queue = $context->createQueue($tmpFile->getFilename());
+
+ $this->assertFileNotExists((string) $tmpFile);
+
+ $context->declareDestination($queue);
+
+ $this->assertFileExists((string) $tmpFile);
+ $this->assertTrue(is_readable($tmpFile));
+ $this->assertTrue(is_writable($tmpFile));
+
+ // do nothing if file already exists
+ $context->declareDestination($queue);
+
+ $this->assertFileExists((string) $tmpFile);
+
+ unlink($tmpFile);
+ }
+}
diff --git a/pkg/fs/Tests/FsDestinationTest.php b/pkg/fs/Tests/FsDestinationTest.php
new file mode 100644
index 000000000..199243a44
--- /dev/null
+++ b/pkg/fs/Tests/FsDestinationTest.php
@@ -0,0 +1,65 @@
+assertClassImplements(Topic::class, FsDestination::class);
+ $this->assertClassImplements(Queue::class, FsDestination::class);
+ }
+
+ public function testCouldBeConstructedWithSplFileAsFirstArgument()
+ {
+ $splFile = new \SplFileInfo((string) TempFile::generate());
+
+ $destination = new FsDestination($splFile);
+
+ $this->assertSame($splFile, $destination->getFileInfo());
+ }
+
+ public function testCouldBeConstructedWithTempFileAsFirstArgument()
+ {
+ $tmpFile = new TempFile((string) TempFile::generate());
+
+ $destination = new FsDestination($tmpFile);
+
+ $this->assertSame($tmpFile, $destination->getFileInfo());
+ }
+
+ public function testShouldReturnFileNameOnGetNameCall()
+ {
+ $splFile = new \SplFileInfo((string) TempFile::generate());
+
+ $destination = new FsDestination($splFile);
+
+ $this->assertSame($splFile->getFilename(), $destination->getName());
+ }
+
+ public function testShouldReturnFileNameOnGetQueueNameCall()
+ {
+ $splFile = new \SplFileInfo((string) TempFile::generate());
+
+ $destination = new FsDestination($splFile);
+
+ $this->assertSame($splFile->getFilename(), $destination->getQueueName());
+ }
+
+ public function testShouldReturnFileNameOnGetTopicNameCall()
+ {
+ $splFile = new \SplFileInfo((string) TempFile::generate());
+
+ $destination = new FsDestination($splFile);
+
+ $this->assertSame($splFile->getFilename(), $destination->getTopicName());
+ }
+}
diff --git a/pkg/fs/Tests/FsMessageTest.php b/pkg/fs/Tests/FsMessageTest.php
new file mode 100644
index 000000000..7412b8c4e
--- /dev/null
+++ b/pkg/fs/Tests/FsMessageTest.php
@@ -0,0 +1,179 @@
+assertClassImplements(Message::class, FsMessage::class);
+ }
+
+ public function testShouldImplementJsonSerializableInterface()
+ {
+ $this->assertClassImplements(\JsonSerializable::class, FsMessage::class);
+ }
+
+ public function testCouldConstructMessageWithBody()
+ {
+ $message = new FsMessage('body');
+
+ $this->assertSame('body', $message->getBody());
+ }
+
+ public function testCouldConstructMessageWithProperties()
+ {
+ $message = new FsMessage('', ['key' => 'value']);
+
+ $this->assertSame(['key' => 'value'], $message->getProperties());
+ }
+
+ public function testCouldConstructMessageWithHeaders()
+ {
+ $message = new FsMessage('', [], ['key' => 'value']);
+
+ $this->assertSame(['key' => 'value'], $message->getHeaders());
+ }
+
+ public function testCouldSetGetBody()
+ {
+ $message = new FsMessage();
+ $message->setBody('body');
+
+ $this->assertSame('body', $message->getBody());
+ }
+
+ public function testCouldSetGetProperties()
+ {
+ $message = new FsMessage();
+ $message->setProperties(['key' => 'value']);
+
+ $this->assertSame(['key' => 'value'], $message->getProperties());
+ }
+
+ public function testCouldSetGetHeaders()
+ {
+ $message = new FsMessage();
+ $message->setHeaders(['key' => 'value']);
+
+ $this->assertSame(['key' => 'value'], $message->getHeaders());
+ }
+
+ public function testCouldSetGetRedelivered()
+ {
+ $message = new FsMessage();
+
+ $message->setRedelivered(true);
+ $this->assertTrue($message->isRedelivered());
+
+ $message->setRedelivered(false);
+ $this->assertFalse($message->isRedelivered());
+ }
+
+ public function testCouldSetGetCorrelationId()
+ {
+ $message = new FsMessage();
+ $message->setCorrelationId('the-correlation-id');
+
+ $this->assertSame('the-correlation-id', $message->getCorrelationId());
+ }
+
+ public function testShouldSetCorrelationIdAsHeader()
+ {
+ $message = new FsMessage();
+ $message->setCorrelationId('the-correlation-id');
+
+ $this->assertSame(['correlation_id' => 'the-correlation-id'], $message->getHeaders());
+ }
+
+ public function testCouldSetGetMessageId()
+ {
+ $message = new FsMessage();
+ $message->setMessageId('the-message-id');
+
+ $this->assertSame('the-message-id', $message->getMessageId());
+ }
+
+ public function testCouldSetMessageIdAsHeader()
+ {
+ $message = new FsMessage();
+ $message->setMessageId('the-message-id');
+
+ $this->assertSame(['message_id' => 'the-message-id'], $message->getHeaders());
+ }
+
+ public function testCouldSetGetTimestamp()
+ {
+ $message = new FsMessage();
+ $message->setTimestamp(12345);
+
+ $this->assertSame(12345, $message->getTimestamp());
+ }
+
+ public function testCouldSetTimestampAsHeader()
+ {
+ $message = new FsMessage();
+ $message->setTimestamp(12345);
+
+ $this->assertSame(['timestamp' => 12345], $message->getHeaders());
+ }
+
+ public function testShouldReturnNullAsDefaultReplyTo()
+ {
+ $message = new FsMessage();
+
+ $this->assertSame(null, $message->getReplyTo());
+ }
+
+ public function testShouldAllowGetPreviouslySetReplyTo()
+ {
+ $message = new FsMessage();
+ $message->setReplyTo('theQueueName');
+
+ $this->assertSame('theQueueName', $message->getReplyTo());
+ }
+
+ public function testShouldAllowGetPreviouslySetReplyToAsHeader()
+ {
+ $message = new FsMessage();
+ $message->setReplyTo('theQueueName');
+
+ $this->assertSame(['reply-to' => 'theQueueName'], $message->getHeaders());
+ }
+
+ public function testColdBeSerializedToJson()
+ {
+ $message = new FsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']);
+
+ $this->assertEquals('{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"}}', json_encode($message));
+ }
+
+ public function testCouldBeUnserializedFromJson()
+ {
+ $message = new FsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']);
+
+ $json = json_encode($message);
+
+ //guard
+ $this->assertNotEmpty($json);
+
+ $unserializedMessage = FsMessage::jsonUnserialize($json);
+
+ $this->assertInstanceOf(FsMessage::class, $unserializedMessage);
+ $this->assertEquals($message, $unserializedMessage);
+ }
+
+ public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson()
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('The malformed json given.');
+
+ FsMessage::jsonUnserialize('{]');
+ }
+}
diff --git a/pkg/fs/Tests/FsProducerTest.php b/pkg/fs/Tests/FsProducerTest.php
new file mode 100644
index 000000000..0ad135661
--- /dev/null
+++ b/pkg/fs/Tests/FsProducerTest.php
@@ -0,0 +1,72 @@
+assertClassImplements(Producer::class, FsProducer::class);
+ }
+
+ public function testCouldBeConstructedWithContextAsFirstArgument()
+ {
+ new FsProducer($this->createContextMock());
+ }
+
+ public function testThrowIfDestinationNotFsOnSend()
+ {
+ $producer = new FsProducer($this->createContextMock());
+
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage('The destination must be an instance of Enqueue\Fs\FsDestination but got Enqueue\Transport\Null\NullQueue.');
+ $producer->send(new NullQueue('aQueue'), new FsMessage());
+ }
+
+ public function testThrowIfMessageNotFsOnSend()
+ {
+ $producer = new FsProducer($this->createContextMock());
+
+ $this->expectException(InvalidMessageException::class);
+ $this->expectExceptionMessage('The message must be an instance of Enqueue\Fs\FsMessage but it is Enqueue\Transport\Null\NullMessage.');
+ $producer->send(new FsDestination(TempFile::generate()), new NullMessage());
+ }
+
+ public function testShouldCallContextWorkWithFileAndCallbackToItOnSend()
+ {
+ $destination = new FsDestination(TempFile::generate());
+
+ $contextMock = $this->createContextMock();
+ $contextMock
+ ->expects($this->once())
+ ->method('workWithFile')
+ ->with($this->identicalTo($destination), 'a+', $this->isInstanceOf(\Closure::class))
+ ;
+
+ $producer = new FsProducer($contextMock);
+
+ $producer->send($destination, new FsMessage());
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|FsContext
+ */
+ private function createContextMock()
+ {
+ return $this->createMock(FsContext::class);
+ }
+}
diff --git a/pkg/fs/Tests/Functional/FsCommonUseCasesTest.php b/pkg/fs/Tests/Functional/FsCommonUseCasesTest.php
new file mode 100644
index 000000000..0636aff67
--- /dev/null
+++ b/pkg/fs/Tests/Functional/FsCommonUseCasesTest.php
@@ -0,0 +1,146 @@
+fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext();
+
+ new TempFile(sys_get_temp_dir().'/fs_test_queue');
+ }
+
+ public function tearDown()
+ {
+ $this->fsContext->close();
+ }
+
+ public function testWaitsForTwoSecondsAndReturnNullOnReceive()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ $startAt = microtime(true);
+
+ $consumer = $this->fsContext->createConsumer($queue);
+ $message = $consumer->receive(2000);
+
+ $endAt = microtime(true);
+
+ $this->assertNull($message);
+
+ $this->assertGreaterThan(1.5, $endAt - $startAt);
+ $this->assertLessThan(2.5, $endAt - $startAt);
+ }
+
+ public function testReturnNullImmediatelyOnReceiveNoWait()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ $startAt = microtime(true);
+
+ $consumer = $this->fsContext->createConsumer($queue);
+ $message = $consumer->receiveNoWait();
+
+ $endAt = microtime(true);
+
+ $this->assertNull($message);
+
+ $this->assertLessThan(0.5, $endAt - $startAt);
+ }
+
+ public function testProduceAndReceiveOneMessageSentDirectlyToQueue()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ $message = $this->fsContext->createMessage(
+ __METHOD__,
+ ['FooProperty' => 'FooVal'],
+ ['BarHeader' => 'BarVal']
+ );
+
+ $producer = $this->fsContext->createProducer();
+ $producer->send($queue, $message);
+
+ $consumer = $this->fsContext->createConsumer($queue);
+ $message = $consumer->receive(1000);
+
+ $this->assertInstanceOf(FsMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertEquals(__METHOD__, $message->getBody());
+ $this->assertEquals(['FooProperty' => 'FooVal'], $message->getProperties());
+ $this->assertEquals([
+ 'BarHeader' => 'BarVal',
+ ], $message->getHeaders());
+ }
+
+ public function testProduceAndReceiveOneMessageSentDirectlyToTemporaryQueue()
+ {
+ $queue = $this->fsContext->createTemporaryQueue();
+
+ $message = $this->fsContext->createMessage(__METHOD__);
+
+ $producer = $this->fsContext->createProducer();
+ $producer->send($queue, $message);
+
+ $consumer = $this->fsContext->createConsumer($queue);
+ $message = $consumer->receive(1000);
+
+ $this->assertInstanceOf(FsMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertEquals(__METHOD__, $message->getBody());
+ }
+
+ public function testConsumerReceiveMessageWithZeroTimeout()
+ {
+ $topic = $this->fsContext->createTopic('fs_test_queue_exchange');
+
+ $consumer = $this->fsContext->createConsumer($topic);
+ //guard
+ $this->assertNull($consumer->receive(1000));
+
+ $message = $this->fsContext->createMessage(__METHOD__);
+
+ $producer = $this->fsContext->createProducer();
+ $producer->send($topic, $message);
+ usleep(100);
+ $actualMessage = $consumer->receive(0);
+
+ $this->assertInstanceOf(FsMessage::class, $actualMessage);
+ $consumer->acknowledge($message);
+
+ $this->assertEquals(__METHOD__, $message->getBody());
+ }
+
+ public function testPurgeMessagesFromQueue()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ $consumer = $this->fsContext->createConsumer($queue);
+
+ $message = $this->fsContext->createMessage(__METHOD__);
+
+ $producer = $this->fsContext->createProducer();
+ $producer->send($queue, $message);
+ $producer->send($queue, $message);
+
+ $this->fsContext->purge($queue);
+
+ $this->assertNull($consumer->receive(1));
+ }
+}
diff --git a/pkg/fs/Tests/Functional/FsConsumerTest.php b/pkg/fs/Tests/Functional/FsConsumerTest.php
new file mode 100644
index 000000000..dbdb09f93
--- /dev/null
+++ b/pkg/fs/Tests/Functional/FsConsumerTest.php
@@ -0,0 +1,70 @@
+fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext();
+
+ new TempFile(sys_get_temp_dir().'/fs_test_queue');
+ file_put_contents(sys_get_temp_dir().'/fs_test_queue', '');
+ }
+
+ public function tearDown()
+ {
+ $this->fsContext->close();
+ }
+
+ public function testShouldConsumeMessagesFromFileOneByOne()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ file_put_contents(
+ sys_get_temp_dir().'/fs_test_queue',
+ ' |{"body":"first message","properties":[],"headers":[]} |{"body":"second message","properties":[],"headers":[]} |{"body":"third message","properties":[],"headers":[]}'
+ );
+
+ $consumer = $this->fsContext->createConsumer($queue);
+
+ $message = $consumer->receiveNoWait();
+ $this->assertInstanceOf(FsMessage::class, $message);
+ $this->assertSame('third message', $message->getBody());
+
+ $this->assertSame(
+ ' |{"body":"first message","properties":[],"headers":[]} |{"body":"second message","properties":[],"headers":[]}',
+ file_get_contents(sys_get_temp_dir().'/fs_test_queue')
+ );
+
+ $message = $consumer->receiveNoWait();
+ $this->assertInstanceOf(FsMessage::class, $message);
+ $this->assertSame('second message', $message->getBody());
+
+ $this->assertSame(
+ ' |{"body":"first message","properties":[],"headers":[]}',
+ file_get_contents(sys_get_temp_dir().'/fs_test_queue')
+ );
+
+ $message = $consumer->receiveNoWait();
+ $this->assertInstanceOf(FsMessage::class, $message);
+ $this->assertSame('first message', $message->getBody());
+
+ $this->assertEmpty(file_get_contents(sys_get_temp_dir().'/fs_test_queue'));
+
+ $message = $consumer->receiveNoWait();
+ $this->assertNull($message);
+
+ $this->assertEmpty(file_get_contents(sys_get_temp_dir().'/fs_test_queue'));
+ }
+}
diff --git a/pkg/fs/Tests/Functional/FsConsumptionUseCasesTest.php b/pkg/fs/Tests/Functional/FsConsumptionUseCasesTest.php
new file mode 100644
index 000000000..c3d2448e6
--- /dev/null
+++ b/pkg/fs/Tests/Functional/FsConsumptionUseCasesTest.php
@@ -0,0 +1,109 @@
+fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext();
+
+ new TempFile(sys_get_temp_dir().'/fs_test_queue');
+ }
+
+ public function tearDown()
+ {
+ $this->fsContext->close();
+ }
+
+ public function testConsumeOneMessageAndExit()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ $message = $this->fsContext->createMessage(__METHOD__);
+ $this->fsContext->createProducer()->send($queue, $message);
+
+ $queueConsumer = new QueueConsumer($this->fsContext, new ChainExtension([
+ new LimitConsumedMessagesExtension(1),
+ new LimitConsumptionTimeExtension(new \DateTime('+3sec')),
+ ]));
+
+ $processor = new StubProcessor();
+ $queueConsumer->bind($queue, $processor);
+
+ $queueConsumer->consume();
+
+ $this->assertInstanceOf(Message::class, $processor->lastProcessedMessage);
+ $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody());
+ }
+
+ public function testConsumeOneMessageAndSendReplyExit()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ $replyQueue = $this->fsContext->createQueue('fs_test_queue_reply');
+
+ $message = $this->fsContext->createMessage(__METHOD__);
+ $message->setReplyTo($replyQueue->getQueueName());
+ $this->fsContext->createProducer()->send($queue, $message);
+
+ $queueConsumer = new QueueConsumer($this->fsContext, new ChainExtension([
+ new LimitConsumedMessagesExtension(2),
+ new LimitConsumptionTimeExtension(new \DateTime('+3sec')),
+ new ReplyExtension(),
+ ]));
+
+ $replyMessage = $this->fsContext->createMessage(__METHOD__.'.reply');
+
+ $processor = new StubProcessor();
+ $processor->result = Result::reply($replyMessage);
+
+ $replyProcessor = new StubProcessor();
+
+ $queueConsumer->bind($queue, $processor);
+ $queueConsumer->bind($replyQueue, $replyProcessor);
+ $queueConsumer->consume();
+
+ $this->assertInstanceOf(Message::class, $processor->lastProcessedMessage);
+ $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody());
+
+ $this->assertInstanceOf(Message::class, $replyProcessor->lastProcessedMessage);
+ $this->assertEquals(__METHOD__.'.reply', $replyProcessor->lastProcessedMessage->getBody());
+ }
+}
+
+class StubProcessor implements Processor
+{
+ public $result = Result::ACK;
+
+ /** @var \Enqueue\Psr\Message */
+ public $lastProcessedMessage;
+
+ public function process(Message $message, Context $context)
+ {
+ $this->lastProcessedMessage = $message;
+
+ return $this->result;
+ }
+}
diff --git a/pkg/fs/Tests/Functional/FsProducerTest.php b/pkg/fs/Tests/Functional/FsProducerTest.php
new file mode 100644
index 000000000..519eefc87
--- /dev/null
+++ b/pkg/fs/Tests/Functional/FsProducerTest.php
@@ -0,0 +1,47 @@
+fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext();
+
+ new TempFile(sys_get_temp_dir().'/fs_test_queue');
+ file_put_contents(sys_get_temp_dir().'/fs_test_queue', '');
+ }
+
+ public function tearDown()
+ {
+ $this->fsContext->close();
+ }
+
+ public function testShouldStoreFilesToFileInExpectedFormat()
+ {
+ $queue = $this->fsContext->createQueue('fs_test_queue');
+
+ $firstMessage = $this->fsContext->createMessage('first message');
+ $secondMessage = $this->fsContext->createMessage('second message');
+ $thirdMessage = $this->fsContext->createMessage('third message');
+
+ $this->fsContext->createProducer()->send($queue, $firstMessage);
+ $this->fsContext->createProducer()->send($queue, $secondMessage);
+ $this->fsContext->createProducer()->send($queue, $thirdMessage);
+
+ $this->assertSame(0, strlen(file_get_contents(sys_get_temp_dir().'/fs_test_queue')) % 64);
+ $this->assertSame(
+ ' |{"body":"first message","properties":[],"headers":[]} |{"body":"second message","properties":[],"headers":[]} |{"body":"third message","properties":[],"headers":[]}',
+ file_get_contents(sys_get_temp_dir().'/fs_test_queue')
+ );
+ }
+}
diff --git a/pkg/fs/Tests/Functional/FsRpcUseCasesTest.php b/pkg/fs/Tests/Functional/FsRpcUseCasesTest.php
new file mode 100644
index 000000000..dc394a4cc
--- /dev/null
+++ b/pkg/fs/Tests/Functional/FsRpcUseCasesTest.php
@@ -0,0 +1,94 @@
+fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext();
+
+ new TempFile(sys_get_temp_dir().'/fs_rpc_queue');
+ new TempFile(sys_get_temp_dir().'/fs_reply_queue');
+ }
+
+ public function tearDown()
+ {
+ $this->fsContext->close();
+ }
+
+ public function testDoAsyncRpcCallWithCustomReplyQueue()
+ {
+ $queue = $this->fsContext->createQueue('fs_rpc_queue');
+
+ $replyQueue = $this->fsContext->createQueue('fs_reply_queue');
+
+ $rpcClient = new RpcClient($this->fsContext);
+
+ $message = $this->fsContext->createMessage();
+ $message->setReplyTo($replyQueue->getQueueName());
+
+ $promise = $rpcClient->callAsync($queue, $message, 10);
+ $this->assertInstanceOf(Promise::class, $promise);
+
+ $consumer = $this->fsContext->createConsumer($queue);
+ $message = $consumer->receive(1);
+ $this->assertInstanceOf(FsMessage::class, $message);
+ $this->assertNotNull($message->getReplyTo());
+ $this->assertNotNull($message->getCorrelationId());
+ $consumer->acknowledge($message);
+
+ $replyQueue = $this->fsContext->createQueue($message->getReplyTo());
+ $replyMessage = $this->fsContext->createMessage('This a reply!');
+ $replyMessage->setCorrelationId($message->getCorrelationId());
+
+ $this->fsContext->createProducer()->send($replyQueue, $replyMessage);
+
+ $actualReplyMessage = $promise->getMessage();
+ $this->assertInstanceOf(FsMessage::class, $actualReplyMessage);
+ }
+
+ public function testDoAsyncRecCallWithCastInternallyCreatedTemporaryReplyQueue()
+ {
+ $queue = $this->fsContext->createQueue('fs_rpc_queue');
+
+ $rpcClient = new RpcClient($this->fsContext);
+
+ $message = $this->fsContext->createMessage();
+
+ $promise = $rpcClient->callAsync($queue, $message, 10);
+ $this->assertInstanceOf(Promise::class, $promise);
+
+ $consumer = $this->fsContext->createConsumer($queue);
+ $receivedMessage = $consumer->receive(1);
+
+ $this->assertInstanceOf(FsMessage::class, $receivedMessage);
+ $this->assertNotNull($receivedMessage->getReplyTo());
+ $this->assertNotNull($receivedMessage->getCorrelationId());
+ $consumer->acknowledge($receivedMessage);
+
+ $replyQueue = $this->fsContext->createQueue($receivedMessage->getReplyTo());
+ $replyMessage = $this->fsContext->createMessage('This a reply!');
+ $replyMessage->setCorrelationId($receivedMessage->getCorrelationId());
+
+ $this->fsContext->createProducer()->send($replyQueue, $replyMessage);
+
+ $actualReplyMessage = $promise->getMessage();
+ $this->assertInstanceOf(FsMessage::class, $actualReplyMessage);
+ }
+}
diff --git a/pkg/fs/Tests/Symfony/FsTransportFactoryTest.php b/pkg/fs/Tests/Symfony/FsTransportFactoryTest.php
new file mode 100644
index 000000000..316a5326b
--- /dev/null
+++ b/pkg/fs/Tests/Symfony/FsTransportFactoryTest.php
@@ -0,0 +1,123 @@
+assertClassImplements(TransportFactoryInterface::class, FsTransportFactory::class);
+ }
+
+ public function testCouldBeConstructedWithDefaultName()
+ {
+ $transport = new FsTransportFactory();
+
+ $this->assertEquals('fs', $transport->getName());
+ }
+
+ public function testCouldBeConstructedWithCustomName()
+ {
+ $transport = new FsTransportFactory('theCustomName');
+
+ $this->assertEquals('theCustomName', $transport->getName());
+ }
+
+ public function testShouldAllowAddConfiguration()
+ {
+ $transport = new FsTransportFactory();
+ $tb = new TreeBuilder();
+ $rootNode = $tb->root('foo');
+
+ $transport->addConfiguration($rootNode);
+ $processor = new Processor();
+ $config = $processor->process($tb->buildTree(), [[
+ 'store_dir' => sys_get_temp_dir(),
+ ]]);
+
+ $this->assertEquals([
+ 'store_dir' => sys_get_temp_dir(),
+ 'pre_fetch_count' => 1,
+ 'chmod' => 0600,
+ ], $config);
+ }
+
+ public function testShouldCreateConnectionFactory()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new FsTransportFactory();
+
+ $serviceId = $transport->createConnectionFactory($container, [
+ 'store_dir' => sys_get_temp_dir(),
+ 'pre_fetch_count' => 1,
+ 'chmod' => 0600,
+ ]);
+
+ $this->assertTrue($container->hasDefinition($serviceId));
+ $factory = $container->getDefinition($serviceId);
+ $this->assertEquals(FsConnectionFactory::class, $factory->getClass());
+ $this->assertSame([[
+ 'store_dir' => sys_get_temp_dir(),
+ 'pre_fetch_count' => 1,
+ 'chmod' => 0600,
+ ]], $factory->getArguments());
+ }
+
+ public function testShouldCreateContext()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new FsTransportFactory();
+
+ $serviceId = $transport->createContext($container, [
+ 'store_dir' => sys_get_temp_dir(),
+ 'pre_fetch_count' => 1,
+ 'chmod' => 0600,
+ ]);
+
+ $this->assertEquals('enqueue.transport.fs.context', $serviceId);
+ $this->assertTrue($container->hasDefinition($serviceId));
+
+ $context = $container->getDefinition('enqueue.transport.fs.context');
+ $this->assertInstanceOf(Reference::class, $context->getFactory()[0]);
+ $this->assertEquals('enqueue.transport.fs.connection_factory', (string) $context->getFactory()[0]);
+ $this->assertEquals('createContext', $context->getFactory()[1]);
+ }
+
+ public function testShouldCreateDriver()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new FsTransportFactory();
+
+ $serviceId = $transport->createDriver($container, []);
+
+ $this->assertEquals('enqueue.client.fs.driver', $serviceId);
+ $this->assertTrue($container->hasDefinition($serviceId));
+
+ $driver = $container->getDefinition($serviceId);
+ $this->assertSame(FsDriver::class, $driver->getClass());
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(0));
+ $this->assertEquals('enqueue.transport.fs.context', (string) $driver->getArgument(0));
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(1));
+ $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1));
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(2));
+ $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2));
+ }
+}
diff --git a/pkg/fs/composer.json b/pkg/fs/composer.json
new file mode 100644
index 000000000..a33ad2333
--- /dev/null
+++ b/pkg/fs/composer.json
@@ -0,0 +1,34 @@
+{
+ "name": "enqueue/fs",
+ "type": "library",
+ "description": "Enqueue Filesystem based transport",
+ "keywords": ["messaging", "queue", "filesystem", "local"],
+ "license": "MIT",
+ "repositories": [
+ {
+ "type": "vcs",
+ "url": "git@github.com:php-enqueue/test.git"
+ }
+ ],
+ "require": {
+ "php": ">=5.6",
+ "symfony/filesystem": "^2.8|^3",
+ "makasim/temp-file": "^0.2"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~5.5",
+ "enqueue/test": "^0.2"
+ },
+ "autoload": {
+ "psr-4": { "Enqueue\\Fs\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "0.2.x-dev"
+ }
+ }
+}
diff --git a/pkg/fs/phpunit.xml.dist b/pkg/fs/phpunit.xml.dist
new file mode 100644
index 000000000..0ba207de0
--- /dev/null
+++ b/pkg/fs/phpunit.xml.dist
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ .
+
+ ./vendor
+ ./Resources
+ ./Tests
+
+
+
+