From 56cdf33075b377755ba3b01a6de0be734188bce5 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 12 Jan 2017 22:59:09 +0200 Subject: [PATCH] Implement Filesystem transport --- bin/dev | 2 +- bin/subtree-split | 2 + composer.json | 5 + docs/bundle/config_reference.md | 13 +- docs/client/supported_brokers.md | 2 +- docs/filesystem_transport.md | 82 +++++ docs/index.md | 7 +- phpunit.xml.dist | 4 + .../Symfony/AmqpTransportFactoryTest.php | 9 + pkg/enqueue-bundle/EnqueueBundle.php | 6 + .../Tests/Unit/EnqueueBundleTest.php | 18 + pkg/fs/.gitignore | 6 + pkg/fs/.travis.yml | 21 ++ pkg/fs/Client/FsDriver.php | 179 +++++++++ pkg/fs/FsConnectionFactory.php | 35 ++ pkg/fs/FsConsumer.php | 168 +++++++++ pkg/fs/FsContext.php | 228 ++++++++++++ pkg/fs/FsDestination.php | 54 +++ pkg/fs/FsMessage.php | 234 ++++++++++++ pkg/fs/FsProducer.php | 59 +++ pkg/fs/LICENSE | 19 + pkg/fs/README.md | 11 + pkg/fs/Symfony/FsTransportFactory.php | 108 ++++++ pkg/fs/Tests/Driver/FsDriverTest.php | 347 ++++++++++++++++++ pkg/fs/Tests/FsConnectionFactoryTest.php | 57 +++ pkg/fs/Tests/FsConsumerTest.php | 143 ++++++++ pkg/fs/Tests/FsContextTest.php | 236 ++++++++++++ pkg/fs/Tests/FsDestinationTest.php | 65 ++++ pkg/fs/Tests/FsMessageTest.php | 179 +++++++++ pkg/fs/Tests/FsProducerTest.php | 72 ++++ .../Tests/Functional/FsCommonUseCasesTest.php | 146 ++++++++ pkg/fs/Tests/Functional/FsConsumerTest.php | 70 ++++ .../Functional/FsConsumptionUseCasesTest.php | 109 ++++++ pkg/fs/Tests/Functional/FsProducerTest.php | 47 +++ pkg/fs/Tests/Functional/FsRpcUseCasesTest.php | 94 +++++ .../Tests/Symfony/FsTransportFactoryTest.php | 123 +++++++ pkg/fs/composer.json | 34 ++ pkg/fs/phpunit.xml.dist | 31 ++ 38 files changed, 3019 insertions(+), 6 deletions(-) create mode 100644 docs/filesystem_transport.md create mode 100644 pkg/fs/.gitignore create mode 100644 pkg/fs/.travis.yml create mode 100644 pkg/fs/Client/FsDriver.php create mode 100644 pkg/fs/FsConnectionFactory.php create mode 100644 pkg/fs/FsConsumer.php create mode 100644 pkg/fs/FsContext.php create mode 100644 pkg/fs/FsDestination.php create mode 100644 pkg/fs/FsMessage.php create mode 100644 pkg/fs/FsProducer.php create mode 100644 pkg/fs/LICENSE create mode 100644 pkg/fs/README.md create mode 100644 pkg/fs/Symfony/FsTransportFactory.php create mode 100644 pkg/fs/Tests/Driver/FsDriverTest.php create mode 100644 pkg/fs/Tests/FsConnectionFactoryTest.php create mode 100644 pkg/fs/Tests/FsConsumerTest.php create mode 100644 pkg/fs/Tests/FsContextTest.php create mode 100644 pkg/fs/Tests/FsDestinationTest.php create mode 100644 pkg/fs/Tests/FsMessageTest.php create mode 100644 pkg/fs/Tests/FsProducerTest.php create mode 100644 pkg/fs/Tests/Functional/FsCommonUseCasesTest.php create mode 100644 pkg/fs/Tests/Functional/FsConsumerTest.php create mode 100644 pkg/fs/Tests/Functional/FsConsumptionUseCasesTest.php create mode 100644 pkg/fs/Tests/Functional/FsProducerTest.php create mode 100644 pkg/fs/Tests/Functional/FsRpcUseCasesTest.php create mode 100644 pkg/fs/Tests/Symfony/FsTransportFactoryTest.php create mode 100644 pkg/fs/composer.json create mode 100644 pkg/fs/phpunit.xml.dist diff --git a/bin/dev b/bin/dev index 22aa8e2f5..ce28e36f5 100755 --- a/bin/dev +++ b/bin/dev @@ -21,7 +21,7 @@ while getopts "bustefc" OPTION; do ./bin/php-cs-fixer fix ;; t) - COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test + COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$2" ;; c) COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm generate-changelog github_changelog_generator --future-release "$2" --simple-list diff --git a/bin/subtree-split b/bin/subtree-split index 0d030e9e3..678753169 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -47,6 +47,7 @@ remote psr-queue git@github.com:php-enqueue/psr-queue.git remote enqueue git@github.com:php-enqueue/enqueue.git remote stomp git@github.com:php-enqueue/stomp.git remote amqp-ext git@github.com:php-enqueue/amqp-ext.git +remote fs git@github.com:php-enqueue/fs.git remote enqueue-bundle git@github.com:php-enqueue/enqueue-bundle.git remote job-queue git@github.com:php-enqueue/job-queue.git remote test git@github.com:php-enqueue/test.git @@ -55,6 +56,7 @@ split 'pkg/psr-queue' psr-queue split 'pkg/enqueue' enqueue split 'pkg/stomp' stomp split 'pkg/amqp-ext' amqp-ext +split 'pkg/fs' fs split 'pkg/enqueue-bundle' enqueue-bundle split 'pkg/job-queue' job-queue split 'pkg/test' test diff --git a/composer.json b/composer.json index 1a6941dd4..2f5381b1d 100644 --- a/composer.json +++ b/composer.json @@ -8,6 +8,7 @@ "enqueue/enqueue": "*@dev", "enqueue/stomp": "*@dev", "enqueue/amqp-ext": "*@dev", + "enqueue/fs": "*@dev", "enqueue/enqueue-bundle": "*@dev", "enqueue/job-queue": "*@dev", "enqueue/test": "*@dev" @@ -51,6 +52,10 @@ { "type": "path", "url": "pkg/job-queue" + }, + { + "type": "path", + "url": "pkg/fs" } ] } diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index 8caf72fe1..2c5a9fe0f 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -1,7 +1,8 @@ # Config reference +You can get this info by running `./bin/console config:dump-reference enqueue` command. + ```yaml -# Default configuration for extension with alias: "enqueue" enqueue: transport: # Required default: @@ -91,6 +92,16 @@ enqueue: # The option tells whether RabbitMQ broker has delay plugin installed or not delay_plugin_installed: false + fs: + + # The store directory where all queue\topics files will be created and messages are stored + store_dir: ~ # Required + + # The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose. + pre_fetch_count: 1 + + # The queue files are created with this given permissions if not exist. + chmod: 384 client: traceable_producer: false prefix: enqueue diff --git a/docs/client/supported_brokers.md b/docs/client/supported_brokers.md index 39c92c215..aff2f5bff 100644 --- a/docs/client/supported_brokers.md +++ b/docs/client/supported_brokers.md @@ -8,7 +8,7 @@ Here's the list of protocols and Client features supported by them | RabbitMQ AMQP | Yes | Yes* | Yes | Yes | Yes | | STOMP | No | No | Yes | No | Yes** | | RabbitMQ STOMP | Yes | Yes* | Yes | Yes*** | Yes** | - +| Filesystem | No | No | No | Yes | No | * \* Possible if a RabbitMQ delay plugin is installed. * \*\* Possible if topics (exchanges) are configured on broker side manually. diff --git a/docs/filesystem_transport.md b/docs/filesystem_transport.md new file mode 100644 index 000000000..db3ebb450 --- /dev/null +++ b/docs/filesystem_transport.md @@ -0,0 +1,82 @@ +# Filesystem transport + +Use files on local filesystem as queues. +It creates a file per queue\topic. +A message is a line inside the file. +**Limitations** It works only in auto ack mode. Local by nature therefor messages are not visible on other servers. + +* [Create context](#create-context) +* [Declare topic](#declare-topic) +* [Declare queue](#decalre-queue) +* [Bind queue to topic](#bind-queue-to-topic) +* [Send message to topic](#send-message-to-topic) +* [Send message to queue](#send-message-to-queue) +* [Consume message](#consume-message) +* [Purge queue messages](#purge-queue-messages) + +## Create context + +```php + '/tmp' +]); + +$psrContext = $connectionFactory->createContext(); +``` + +## Send message to topic + +```php +createTopic('aTopic'); +$message = $psrContext->createMessage('Hello world!'); + +$psrContext->createProducer()->send($fooTopic, $message); +``` + +## Send message to queue + +```php +createQueue('aQueue'); +$message = $psrContext->createMessage('Hello world!'); + +$psrContext->createProducer()->send($fooQueue, $message); +``` + +## Consume message: + +```php +createQueue('aQueue'); +$consumer = $psrContext->createConsumer($fooQueue); + +$message = $consumer->receive(); + +// process a message + +$consumer->acknowledge($message); +// $consumer->reject($message); +``` + +## Purge queue messages: + +```php +createQueue('aQueue'); + +$psrContext->purge($fooQueue); +``` + +[back to index](index.md) \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 6dfcaebc9..ada57c0ab 100644 --- a/docs/index.md +++ b/docs/index.md @@ -2,9 +2,10 @@ * [Quick tour](quick_tour.md) * Transports - - [AMQP](amqp_transport.md) - - [STOMP](stomp_transport.md) - - [NULL](null_transport.md) + - [Amqp](amqp_transport.md) + - [Stomp](stomp_transport.md) + - [Filesystem](filesystem_transport.md) + - [Null](null_transport.md) * Consumption - [Extensions](consumption/extensions.md) * Client diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 21c4f77fe..fb3be97c3 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -29,6 +29,10 @@ pkg/amqp-ext/Tests + + pkg/fs/Tests + + pkg/enqueue-bundle/Tests diff --git a/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php b/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php index 57064a396..ba4dee55c 100644 --- a/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php +++ b/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php @@ -121,5 +121,14 @@ public function testShouldCreateDriver() $driver = $container->getDefinition($serviceId); $this->assertSame(AmqpDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.amqp.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); } } diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 44b14cab1..f8e75cebc 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -11,6 +11,8 @@ use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; use Enqueue\Bundle\DependencyInjection\EnqueueExtension; +use Enqueue\Fs\FsContext; +use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Stomp\StompContext; use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory; use Enqueue\Stomp\Symfony\StompTransportFactory; @@ -46,5 +48,9 @@ public function build(ContainerBuilder $container) $extension->addTransportFactory(new AmqpTransportFactory()); $extension->addTransportFactory(new RabbitMqAmqpTransportFactory()); } + + if (class_exists(FsContext::class)) { + $extension->addTransportFactory(new FsTransportFactory()); + } } } diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index 2e9f6589f..3f390fa78 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -11,6 +11,7 @@ use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; use Enqueue\Bundle\DependencyInjection\EnqueueExtension; use Enqueue\Bundle\EnqueueBundle; +use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory; use Enqueue\Stomp\Symfony\StompTransportFactory; use Enqueue\Symfony\DefaultTransportFactory; @@ -139,6 +140,23 @@ public function testShouldRegisterAmqpAndRabbitMqAmqpTransportFactories() $bundle->build($container); } + public function testShouldRegisterFSTransportFactory() + { + $extensionMock = $this->createEnqueueExtensionMock(); + + $container = new ContainerBuilder(); + $container->registerExtension($extensionMock); + + $extensionMock + ->expects($this->at(6)) + ->method('addTransportFactory') + ->with($this->isInstanceOf(FsTransportFactory::class)) + ; + + $bundle = new EnqueueBundle(); + $bundle->build($container); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|EnqueueExtension */ diff --git a/pkg/fs/.gitignore b/pkg/fs/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/fs/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/fs/.travis.yml b/pkg/fs/.travis.yml new file mode 100644 index 000000000..42374ddc7 --- /dev/null +++ b/pkg/fs/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/fs/Client/FsDriver.php b/pkg/fs/Client/FsDriver.php new file mode 100644 index 000000000..a8cd5d1dd --- /dev/null +++ b/pkg/fs/Client/FsDriver.php @@ -0,0 +1,179 @@ +context = $context; + $this->config = $config; + $this->queueMetaRegistry = $queueMetaRegistry; + } + + /** + * {@inheritdoc} + */ + public function sendToRouter(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { + throw new \LogicException('Topic name parameter is required but is not set'); + } + + $topic = $this->createRouterTopic(); + $transportMessage = $this->createTransportMessage($message); + + $this->context->createProducer()->send($topic, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function sendToProcessor(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException('Processor name parameter is required but is not set'); + } + + if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException('Queue name parameter is required but is not set'); + } + + $transportMessage = $this->createTransportMessage($message); + $destination = $this->createQueue($queueName); + + $this->context->createProducer()->send($destination, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function setupBroker(LoggerInterface $logger = null) + { + $logger = $logger ?: new NullLogger(); + $log = function ($text, ...$args) use ($logger) { + $logger->debug(sprintf('[FsDriver] '.$text, ...$args)); + }; + + // setup router + $routerTopic = $this->createRouterTopic(); + $routerQueue = $this->createQueue($this->config->getRouterQueueName()); + + $log('Declare router exchange "%s" file: %s', $routerTopic->getTopicName(), $routerTopic->getFileInfo()); + $this->context->declareDestination($routerTopic); + + $log('Declare router queue "%s" file: %s', $routerQueue->getQueueName(), $routerTopic->getFileInfo()); + $this->context->declareDestination($routerQueue); + + // setup queues + foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { + $queue = $this->createQueue($meta->getClientName()); + + $log('Declare processor queue "%s" file: %s', $queue->getQueueName(), $queue->getFileInfo()); + $this->context->declareDestination($queue); + } + } + + /** + * {@inheritdoc} + * + * @return FsDestination + */ + public function createQueue($queueName) + { + return $this->context->createQueue($this->config->createTransportQueueName($queueName)); + } + + /** + * {@inheritdoc} + * + * @return FsMessage + */ + public function createTransportMessage(Message $message) + { + $properties = $message->getProperties(); + + $headers = $message->getHeaders(); + $headers['content_type'] = $message->getContentType(); + + $transportMessage = $this->context->createMessage(); + $transportMessage->setBody($message->getBody()); + $transportMessage->setHeaders($headers); + $transportMessage->setProperties($properties); + $transportMessage->setMessageId($message->getMessageId()); + $transportMessage->setTimestamp($message->getTimestamp()); + + return $transportMessage; + } + + /** + * @param FsMessage $message + * + * {@inheritdoc} + */ + public function createClientMessage(TransportMessage $message) + { + $clientMessage = new Message(); + + $clientMessage->setBody($message->getBody()); + $clientMessage->setHeaders($message->getHeaders()); + $clientMessage->setProperties($message->getProperties()); + + $clientMessage->setContentType($message->getHeader('content_type')); + $clientMessage->setMessageId($message->getMessageId()); + $clientMessage->setTimestamp($message->getTimestamp()); + $clientMessage->setPriority(MessagePriority::NORMAL); + + return $clientMessage; + } + + /** + * @return Config + */ + public function getConfig() + { + return $this->config; + } + + /** + * @return FsDestination + */ + private function createRouterTopic() + { + return $this->context->createTopic( + $this->config->createTransportRouterTopicName($this->config->getRouterTopicName()) + ); + } +} diff --git a/pkg/fs/FsConnectionFactory.php b/pkg/fs/FsConnectionFactory.php new file mode 100644 index 000000000..417e63542 --- /dev/null +++ b/pkg/fs/FsConnectionFactory.php @@ -0,0 +1,35 @@ +config = array_replace([ + 'store_dir' => null, + 'pre_fetch_count' => 1, + 'chmod' => 0600, + ], $config); + } + + /** + * {@inheritdoc} + * + * @return FsContext + */ + public function createContext() + { + return new FsContext($this->config['store_dir'], $this->config['pre_fetch_count'], $this->config['chmod']); + } +} diff --git a/pkg/fs/FsConsumer.php b/pkg/fs/FsConsumer.php new file mode 100644 index 000000000..ba47de3cb --- /dev/null +++ b/pkg/fs/FsConsumer.php @@ -0,0 +1,168 @@ +context = $context; + $this->destination = $destination; + $this->preFetchCount = $preFetchCount; + + $this->preFetchedMessages = []; + } + + /** + * {@inheritdoc} + * + * @return FsDestination + */ + public function getQueue() + { + return $this->destination; + } + + /** + * {@inheritdoc} + * + * @return FsMessage|null + */ + public function receive($timeout = 0) + { + $end = microtime(true) + ($timeout / 1000); + while (0 === $timeout || microtime(true) < $end) { + if ($message = $this->receiveNoWait()) { + return $message; + } + + usleep(100); + } + } + + /** + * {@inheritdoc} + */ + public function receiveNoWait() + { + if ($this->preFetchedMessages) { + return array_shift($this->preFetchedMessages); + } + + $this->context->workWithFile($this->destination, 'c+', function (FsDestination $destination, $file) { + $count = $this->preFetchCount; + while ($count) { + $frame = $this->readFrame($file, 1); + ftruncate($file, fstat($file)['size'] - strlen($frame)); + rewind($file); + + $rawMessage = substr(trim($frame), 1); + + if ($rawMessage) { + try { + $this->preFetchedMessages[] = FsMessage::jsonUnserialize($rawMessage); + } catch (\Exception $e) { + throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), null, $e); + } + } else { + return; + } + + --$count; + } + }); + + if ($this->preFetchedMessages) { + return array_shift($this->preFetchedMessages); + } + } + + /** + * {@inheritdoc} + */ + public function acknowledge(Message $message) + { + // do nothing. fs transport always works in auto ack mode + } + + /** + * {@inheritdoc} + */ + public function reject(Message $message, $requeue = false) + { + // do nothing on reject. fs transport always works in auto ack mode + + if ($requeue) { + $this->context->createProducer()->send($this->destination, $message); + } + } + + /** + * @return int + */ + public function getPreFetchCount() + { + return $this->preFetchCount; + } + + /** + * @param int $preFetchCount + */ + public function setPreFetchCount($preFetchCount) + { + $this->preFetchCount = $preFetchCount; + } + + /** + * @param resource $file + * @param int $frameNumber + * + * @return string + */ + private function readFrame($file, $frameNumber) + { + $frameSize = 64; + $offset = $frameNumber * $frameSize; + + fseek($file, -$offset, SEEK_END); + $frame = fread($file, $frameSize); + + if ('' == $frame) { + return ''; + } + + if (false !== strpos($frame, '|{')) { + return $frame; + } + + return $this->readFrame($file, $frameNumber + 1).$frame; + } +} diff --git a/pkg/fs/FsContext.php b/pkg/fs/FsContext.php new file mode 100644 index 000000000..d90fc0d9e --- /dev/null +++ b/pkg/fs/FsContext.php @@ -0,0 +1,228 @@ +storeDir = $storeDir; + $this->preFetchCount = $preFetchCount; + $this->chmod = $chmod; + + $this->lockHandlers = []; + } + + /** + * {@inheritdoc} + * + * @return FsMessage + */ + public function createMessage($body = '', array $properties = [], array $headers = []) + { + return new FsMessage($body, $properties, $headers); + } + + /** + * {@inheritdoc} + * + * @return FsDestination + */ + public function createTopic($topicName) + { + return $this->createQueue($topicName); + } + + /** + * {@inheritdoc} + * + * @return FsDestination + */ + public function createQueue($queueName) + { + return new FsDestination(new \SplFileInfo($this->getStoreDir().'/'.$queueName)); + } + + /** + * @param Destination|FsDestination $destination + */ + public function declareDestination(Destination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class); + + set_error_handler(function ($severity, $message, $file, $line) { + throw new \ErrorException($message, 0, $severity, $file, $line); + }); + + try { + if (false == file_exists($destination->getFileInfo())) { + touch($destination->getFileInfo()); + chmod($destination->getFileInfo(), $this->chmod); + } + } finally { + restore_error_handler(); + } + } + + public function workWithFile(FsDestination $destination, $mode, callable $callback) + { + $this->declareDestination($destination); + + set_error_handler(function ($severity, $message, $file, $line) { + throw new \ErrorException($message, 0, $severity, $file, $line); + }); + + try { + $file = fopen($destination->getFileInfo(), $mode); + $lockHandler = $this->getLockHandler($destination); + + if (false == $lockHandler->lock(true)) { + throw new \LogicException(sprintf('Cannot obtain the lock for destination %s', $destination->getName())); + } + + return call_user_func($callback, $destination, $file); + } finally { + if (isset($file)) { + fclose($file); + } + if (isset($lockHandler)) { + $lockHandler->release(); + } + + restore_error_handler(); + } + } + + /** + * {@inheritdoc} + * + * @return FsDestination + */ + public function createTemporaryQueue() + { + return new FsDestination(new TempFile($this->getStoreDir().'/'.uniqid('tmp-q-', true))); + } + + /** + * {@inheritdoc} + * + * @return FsProducer + */ + public function createProducer() + { + return new FsProducer($this); + } + + /** + * {@inheritdoc} + * + * @param FsDestination $destination + * + * @return FsConsumer + */ + public function createConsumer(Destination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class); + + return new FsConsumer($this, $destination, $this->preFetchCount); + } + + public function close() + { + foreach ($this->lockHandlers as $lockHandler) { + $lockHandler->release(); + } + + $this->lockHandlers = []; + } + + /** + * @param Queue|FsDestination $queue + */ + public function purge(Queue $queue) + { + InvalidDestinationException::assertDestinationInstanceOf($queue, FsDestination::class); + + $this->workWithFile($queue, 'c', function (FsDestination $destination, $file) use ($queue) { + ftruncate($file, 0); + }); + } + + /** + * @return int + */ + public function getPreFetchCount() + { + return $this->preFetchCount; + } + + /** + * @param int $preFetchCount + */ + public function setPreFetchCount($preFetchCount) + { + $this->preFetchCount = $preFetchCount; + } + + /** + * @return string + */ + private function getStoreDir() + { + if (false == is_dir($this->storeDir)) { + throw new \LogicException(sprintf('The directory %s does not exist', $this->storeDir)); + } + + if (false == is_writable($this->storeDir)) { + throw new \LogicException(sprintf('The directory %s is not writable', $this->storeDir)); + } + + return $this->storeDir; + } + + /** + * @param FsDestination $destination + * + * @return LockHandler + */ + private function getLockHandler(FsDestination $destination) + { + if (false == isset($this->lockHandlers[$destination->getName()])) { + $this->lockHandlers[$destination->getName()] = new LockHandler($destination->getName(), $this->storeDir); + } + + return $this->lockHandlers[$destination->getName()]; + } +} diff --git a/pkg/fs/FsDestination.php b/pkg/fs/FsDestination.php new file mode 100644 index 000000000..c357a262a --- /dev/null +++ b/pkg/fs/FsDestination.php @@ -0,0 +1,54 @@ +file = $file; + } + + /** + * @return \SplFileInfo + */ + public function getFileInfo() + { + return $this->file; + } + + /** + * @return string + */ + public function getName() + { + return $this->file->getFilename(); + } + + /** + * {@inheritdoc} + */ + public function getQueueName() + { + return $this->getName(); + } + + /** + * {@inheritdoc} + */ + public function getTopicName() + { + return $this->getName(); + } +} diff --git a/pkg/fs/FsMessage.php b/pkg/fs/FsMessage.php new file mode 100644 index 000000000..881d3dbb9 --- /dev/null +++ b/pkg/fs/FsMessage.php @@ -0,0 +1,234 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + $this->redelivered = false; + } + + /** + * @param string $body + */ + public function setBody($body) + { + $this->body = $body; + } + + /** + * {@inheritdoc} + */ + public function getBody() + { + return $this->body; + } + + /** + * @param array $properties + */ + public function setProperties(array $properties) + { + $this->properties = $properties; + } + + /** + * {@inheritdoc} + */ + public function getProperties() + { + return $this->properties; + } + + /** + * {@inheritdoc} + */ + public function setProperty($name, $value) + { + $this->properties[$name] = $value; + } + + /** + * {@inheritdoc} + */ + public function getProperty($name, $default = null) + { + return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; + } + + /** + * @param array $headers + */ + public function setHeaders(array $headers) + { + $this->headers = $headers; + } + + /** + * {@inheritdoc} + */ + public function getHeaders() + { + return $this->headers; + } + + /** + * {@inheritdoc} + */ + public function setHeader($name, $value) + { + $this->headers[$name] = $value; + } + + /** + * {@inheritdoc} + */ + public function getHeader($name, $default = null) + { + return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; + } + + /** + * @return bool + */ + public function isRedelivered() + { + return $this->redelivered; + } + + /** + * @param bool $redelivered + */ + public function setRedelivered($redelivered) + { + $this->redelivered = $redelivered; + } + + /** + * {@inheritdoc} + */ + public function setCorrelationId($correlationId) + { + $this->setHeader('correlation_id', (string) $correlationId); + } + + /** + * {@inheritdoc} + */ + public function getCorrelationId() + { + return $this->getHeader('correlation_id', ''); + } + + /** + * {@inheritdoc} + */ + public function setMessageId($messageId) + { + $this->setHeader('message_id', (string) $messageId); + } + + /** + * {@inheritdoc} + */ + public function getMessageId() + { + return $this->getHeader('message_id', ''); + } + + /** + * {@inheritdoc} + */ + public function getTimestamp() + { + $value = $this->getHeader('timestamp'); + + return $value === null ? null : (int) $value; + } + + /** + * {@inheritdoc} + */ + public function setTimestamp($timestamp) + { + $this->setHeader('timestamp', $timestamp); + } + + /** + * @param string|null $replyTo + */ + public function setReplyTo($replyTo) + { + $this->setHeader('reply-to', $replyTo); + } + + /** + * @return string|null + */ + public function getReplyTo() + { + return $this->getHeader('reply-to'); + } + + /** + * {@inheritdoc} + */ + public function jsonSerialize() + { + return [ + 'body' => $this->getBody(), + 'properties' => $this->getProperties(), + 'headers' => $this->getHeaders(), + ]; + } + + /** + * @param string $json + * + * @return FsMessage + */ + public static function jsonUnserialize($json) + { + $data = json_decode($json, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new self($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/fs/FsProducer.php b/pkg/fs/FsProducer.php new file mode 100644 index 000000000..2f5864bf4 --- /dev/null +++ b/pkg/fs/FsProducer.php @@ -0,0 +1,59 @@ +context = $context; + } + + /** + * {@inheritdoc} + * + * @param FsDestination $destination + * @param FsMessage $message + */ + public function send(Destination $destination, Message $message) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class); + InvalidMessageException::assertMessageInstanceOf($message, FsMessage::class); + + $this->context->workWithFile($destination, 'a+', function (FsDestination $destination, $file) use ($message) { + $fileInfo = $destination->getFileInfo(); + if ($fileInfo instanceof TempFile && false == file_exists($fileInfo)) { + return; + } + + $rawMessage = '|'.json_encode($message); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'Could not encode value into json. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + $rawMessage = str_repeat(' ', 64 - (strlen($rawMessage) % 64)).$rawMessage; + + fwrite($file, $rawMessage); + }); + } +} diff --git a/pkg/fs/LICENSE b/pkg/fs/LICENSE new file mode 100644 index 000000000..70fa75252 --- /dev/null +++ b/pkg/fs/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2017 Kotliar Maksym + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/pkg/fs/README.md b/pkg/fs/README.md new file mode 100644 index 000000000..f3b1a9a43 --- /dev/null +++ b/pkg/fs/README.md @@ -0,0 +1,11 @@ +# Enqueue Filesystem Transport + +## Resources + +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## License + +It is released under the [MIT License](LICENSE). diff --git a/pkg/fs/Symfony/FsTransportFactory.php b/pkg/fs/Symfony/FsTransportFactory.php new file mode 100644 index 000000000..0447aed01 --- /dev/null +++ b/pkg/fs/Symfony/FsTransportFactory.php @@ -0,0 +1,108 @@ +name = $name; + } + + /** + * {@inheritdoc} + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->children() + ->scalarNode('store_dir') + ->isRequired() + ->cannotBeEmpty() + ->info('The store directory where all queue\topics files will be created and messages are stored') + ->end() + ->integerNode('pre_fetch_count') + ->min(1) + ->defaultValue(1) + ->info('The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose.') + ->end() + ->integerNode('chmod') + ->defaultValue(0600) + ->info('The queue files are created with this given permissions if not exist.') + ->end() + ; + } + + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factory = new Definition(FsConnectionFactory::class); + $factory->setArguments([$config]); + + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $context = new Definition(FsContext::class); + $context->setFactory([new Reference($factoryId), 'createContext']); + + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); + $container->setDefinition($contextId, $context); + + return $contextId; + } + + /** + * {@inheritdoc} + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driver = new Definition(FsDriver::class); + $driver->setArguments([ + new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), + new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), + ]); + + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); + $container->setDefinition($driverId, $driver); + + return $driverId; + } + + /** + * {@inheritdoc} + */ + public function getName() + { + return $this->name; + } +} diff --git a/pkg/fs/Tests/Driver/FsDriverTest.php b/pkg/fs/Tests/Driver/FsDriverTest.php new file mode 100644 index 000000000..47d145302 --- /dev/null +++ b/pkg/fs/Tests/Driver/FsDriverTest.php @@ -0,0 +1,347 @@ +assertClassImplements(DriverInterface::class, FsDriver::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new FsDriver( + $this->createPsrContextMock(), + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + } + + public function testShouldReturnConfigObject() + { + $config = new Config('', '', '', '', '', ''); + + $driver = new FsDriver($this->createPsrContextMock(), $config, $this->createQueueMetaRegistryMock()); + + $this->assertSame($config, $driver->getConfig()); + } + + public function testShouldCreateAndReturnQueueInstance() + { + $expectedQueue = new FsDestination(new TempFile(sys_get_temp_dir().'/queue-name')); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('name') + ->will($this->returnValue($expectedQueue)) + ; + + $driver = new FsDriver($context, new Config('', '', '', '', '', ''), $this->createQueueMetaRegistryMock()); + + $queue = $driver->createQueue('name'); + + $this->assertSame($expectedQueue, $queue); + $this->assertSame('queue-name', $queue->getQueueName()); + } + + public function testShouldConvertTransportMessageToClientMessage() + { + $transportMessage = new FsMessage(); + $transportMessage->setBody('body'); + $transportMessage->setHeaders(['hkey' => 'hval']); + $transportMessage->setProperties(['key' => 'val']); + $transportMessage->setHeader('content_type', 'ContentType'); + $transportMessage->setMessageId('MessageId'); + $transportMessage->setTimestamp(1000); + + $driver = new FsDriver( + $this->createPsrContextMock(), + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + + $clientMessage = $driver->createClientMessage($transportMessage); + + $this->assertInstanceOf(Message::class, $clientMessage); + $this->assertSame('body', $clientMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + ], $clientMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $clientMessage->getProperties()); + $this->assertSame('MessageId', $clientMessage->getMessageId()); + $this->assertSame('ContentType', $clientMessage->getContentType()); + $this->assertSame(1000, $clientMessage->getTimestamp()); + + $this->assertNull($clientMessage->getExpire()); + $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority()); + } + + public function testShouldConvertClientMessageToTransportMessage() + { + $clientMessage = new Message(); + $clientMessage->setBody('body'); + $clientMessage->setHeaders(['hkey' => 'hval']); + $clientMessage->setProperties(['key' => 'val']); + $clientMessage->setContentType('ContentType'); + $clientMessage->setExpire(123); + $clientMessage->setPriority(MessagePriority::VERY_HIGH); + $clientMessage->setMessageId('MessageId'); + $clientMessage->setTimestamp(1000); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new FsMessage()) + ; + + $driver = new FsDriver( + $context, + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + + $transportMessage = $driver->createTransportMessage($clientMessage); + + $this->assertInstanceOf(FsMessage::class, $transportMessage); + $this->assertSame('body', $transportMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + ], $transportMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $transportMessage->getProperties()); + $this->assertSame('MessageId', $transportMessage->getMessageId()); + $this->assertSame(1000, $transportMessage->getTimestamp()); + } + + public function testShouldSendMessageToRouter() + { + $topic = new FsDestination(TempFile::generate()); + $transportMessage = new FsMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new FsDriver( + $context, + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); + + $driver->sendToRouter($message); + } + + public function testShouldThrowExceptionIfTopicParameterIsNotSet() + { + $driver = new FsDriver( + $this->createPsrContextMock(), + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic name parameter is required but is not set'); + + $driver->sendToRouter(new Message()); + } + + public function testShouldSendMessageToProcessor() + { + $queue = new FsDestination(TempFile::generate()); + $transportMessage = new FsMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new FsDriver( + $context, + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'queue'); + + $driver->sendToProcessor($message); + } + + public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() + { + $driver = new FsDriver( + $this->createPsrContextMock(), + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor name parameter is required but is not set'); + + $driver->sendToProcessor(new Message()); + } + + public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() + { + $driver = new FsDriver( + $this->createPsrContextMock(), + new Config('', '', '', '', '', ''), + $this->createQueueMetaRegistryMock() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Queue name parameter is required but is not set'); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + + $driver->sendToProcessor($message); + } + + public function testShouldSetupBroker() + { + $routerTopic = new FsDestination(TempFile::generate()); + $routerQueue = new FsDestination(TempFile::generate()); + + $processorQueue = new FsDestination(TempFile::generate()); + + $context = $this->createPsrContextMock(); + // setup router + $context + ->expects($this->at(0)) + ->method('createTopic') + ->willReturn($routerTopic) + ; + $context + ->expects($this->at(1)) + ->method('createQueue') + ->willReturn($routerQueue) + ; + $context + ->expects($this->at(2)) + ->method('declareDestination') + ->with($this->identicalTo($routerTopic)) + ; + $context + ->expects($this->at(3)) + ->method('declareDestination') + ->with($this->identicalTo($routerQueue)) + ; + // setup processor queue + $context + ->expects($this->at(4)) + ->method('createQueue') + ->willReturn($processorQueue) + ; + $context + ->expects($this->at(5)) + ->method('declareDestination') + ->with($this->identicalTo($processorQueue)) + ; + + $meta = new QueueMetaRegistry(new Config('', '', '', '', '', ''), [ + 'default' => [], + ], 'default'); + + $driver = new FsDriver( + $context, + new Config('', '', '', '', '', ''), + $meta + ); + + $driver->setupBroker(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|FsContext + */ + private function createPsrContextMock() + { + return $this->createMock(FsContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Producer + */ + private function createPsrProducerMock() + { + return $this->createMock(Producer::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|QueueMetaRegistry + */ + private function createQueueMetaRegistryMock() + { + return $this->createMock(QueueMetaRegistry::class); + } +} diff --git a/pkg/fs/Tests/FsConnectionFactoryTest.php b/pkg/fs/Tests/FsConnectionFactoryTest.php new file mode 100644 index 000000000..fa356cd12 --- /dev/null +++ b/pkg/fs/Tests/FsConnectionFactoryTest.php @@ -0,0 +1,57 @@ +assertClassImplements(ConnectionFactory::class, FsConnectionFactory::class); + } + + public function testCouldBeConstructedWithEmptyConfiguration() + { + $factory = new FsConnectionFactory([]); + + $this->assertAttributeEquals([ + 'store_dir' => null, + 'pre_fetch_count' => 1, + 'chmod' => 0600, + ], 'config', $factory); + } + + public function testCouldBeConstructedWithCustomConfiguration() + { + $factory = new FsConnectionFactory(['store_dir' => 'theCustomDir']); + + $this->assertAttributeEquals([ + 'store_dir' => 'theCustomDir', + 'pre_fetch_count' => 1, + 'chmod' => 0600, + ], 'config', $factory); + } + + public function testShouldCreateContext() + { + $factory = new FsConnectionFactory([ + 'store_dir' => 'theDir', + 'pre_fetch_count' => 123, + 'chmod' => 0765, + ]); + + $context = $factory->createContext(); + + $this->assertInstanceOf(FsContext::class, $context); + + $this->assertAttributeSame('theDir', 'storeDir', $context); + $this->assertAttributeSame(123, 'preFetchCount', $context); + $this->assertAttributeSame(0765, 'chmod', $context); + } +} diff --git a/pkg/fs/Tests/FsConsumerTest.php b/pkg/fs/Tests/FsConsumerTest.php new file mode 100644 index 000000000..8fcf062f8 --- /dev/null +++ b/pkg/fs/Tests/FsConsumerTest.php @@ -0,0 +1,143 @@ +assertClassImplements(Consumer::class, FsConsumer::class); + } + + public function testCouldBeConstructedWithContextAndDestinationAndPreFetchCountAsArguments() + { + new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 1); + } + + public function testShouldReturnDestinationSetInConstructorOnGetQueue() + { + $destination = new FsDestination(TempFile::generate()); + + $consumer = new FsConsumer($this->createContextMock(), $destination, 1); + + $this->assertSame($destination, $consumer->getQueue()); + } + + public function testShouldAllowGetPreFetchCountSetInConstructor() + { + $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123); + + $this->assertSame(123, $consumer->getPreFetchCount()); + } + + public function testShouldAllowGetPreviouslySetPreFetchCount() + { + $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123); + + $consumer->setPreFetchCount(456); + + $this->assertSame(456, $consumer->getPreFetchCount()); + } + + public function testShouldDoNothingOnAcknowledge() + { + $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123); + + $consumer->acknowledge(new FsMessage()); + } + + public function testShouldDoNothingOnReject() + { + $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123); + + $consumer->reject(new FsMessage()); + } + + public function testShouldSendSameMessageToDestinationOnReQueue() + { + $message = new FsMessage(); + + $destination = new FsDestination(TempFile::generate()); + + $producerMock = $this->createProducerMock(); + $producerMock + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($destination), $this->identicalTo($message)) + ; + + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producerMock) + ; + + $consumer = new FsConsumer($contextMock, $destination, 123); + + $consumer->reject($message, true); + } + + public function testShouldCallContextWorkWithFileAndCallbackToItOnReceiveNoWait() + { + $destination = new FsDestination(TempFile::generate()); + + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->once()) + ->method('workWithFile') + ->with($this->identicalTo($destination), 'c+', $this->isInstanceOf(\Closure::class)) + ; + + $consumer = new FsConsumer($contextMock, $destination, 1); + + $consumer->receiveNoWait(); + } + + public function testShouldWaitTwoSecondsForMessageAndExitOnReceive() + { + $destination = new FsDestination(TempFile::generate()); + + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->atLeastOnce()) + ->method('workWithFile') + ; + + $consumer = new FsConsumer($contextMock, $destination, 1); + + $start = microtime(true); + $consumer->receive(2000); + $end = microtime(true); + + $this->assertGreaterThan(1.5, $end - $start); + $this->assertLessThan(3.5, $end - $start); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|FsProducer + */ + private function createProducerMock() + { + return $this->createMock(FsProducer::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|FsContext + */ + private function createContextMock() + { + return $this->createMock(FsContext::class); + } +} diff --git a/pkg/fs/Tests/FsContextTest.php b/pkg/fs/Tests/FsContextTest.php new file mode 100644 index 000000000..f51a0b242 --- /dev/null +++ b/pkg/fs/Tests/FsContextTest.php @@ -0,0 +1,236 @@ +assertClassImplements(Context::class, FsContext::class); + } + + public function testCouldBeConstructedWithExpectedArguments() + { + new FsContext(sys_get_temp_dir(), 1, 0666); + } + + public function testShouldAllowCreateEmptyMessage() + { + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $message = $context->createMessage(); + + $this->assertInstanceOf(FsMessage::class, $message); + + $this->assertSame('', $message->getBody()); + $this->assertSame([], $message->getProperties()); + $this->assertSame([], $message->getHeaders()); + } + + public function testShouldAllowCreateCustomMessage() + { + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $message = $context->createMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); + + $this->assertInstanceOf(FsMessage::class, $message); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties()); + $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders()); + } + + public function testShouldCreateQueue() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $queue = $context->createQueue($tmpFile->getFilename()); + + $this->assertInstanceOf(FsDestination::class, $queue); + $this->assertInstanceOf(\SplFileInfo::class, $queue->getFileInfo()); + $this->assertSame((string) $tmpFile, (string) $queue->getFileInfo()); + + $this->assertSame($tmpFile->getFilename(), $queue->getTopicName()); + } + + public function testShouldAllowCreateTopic() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $topic = $context->createTopic($tmpFile->getFilename()); + + $this->assertInstanceOf(FsDestination::class, $topic); + $this->assertInstanceOf(\SplFileInfo::class, $topic->getFileInfo()); + $this->assertSame((string) $tmpFile, (string) $topic->getFileInfo()); + + $this->assertSame($tmpFile->getFilename(), $topic->getTopicName()); + } + + public function testShouldAllowCreateTmpQueue() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $queue = $context->createTemporaryQueue(); + + $this->assertInstanceOf(FsDestination::class, $queue); + $this->assertInstanceOf(TempFile::class, $queue->getFileInfo()); + $this->assertNotEmpty($queue->getQueueName()); + } + + public function testShouldCreateProducer() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $producer = $context->createProducer(); + + $this->assertInstanceOf(FsProducer::class, $producer); + } + + public function testShouldThrowIfNotFsDestinationGivenOnCreateConsumer() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\Fs\FsDestination but got Enqueue\Transport\Null\NullQueue.'); + $consumer = $context->createConsumer(new NullQueue('aQueue')); + + $this->assertInstanceOf(FsConsumer::class, $consumer); + } + + public function testShouldCreateConsumer() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $queue = $context->createQueue($tmpFile->getFilename()); + + $context->createConsumer($queue); + } + + public function testShouldPropagatePreFetchCountToCreatedConsumer() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 123, 0666); + + $queue = $context->createQueue($tmpFile->getFilename()); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertInstanceOf(FsConsumer::class, $consumer); + + $this->assertAttributeSame(123, 'preFetchCount', $consumer); + } + + public function testShouldAllowGetPreFetchCountSetInConstructor() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 123, 0666); + + $this->assertSame(123, $context->getPreFetchCount()); + } + + public function testShouldAllowGetPreviouslySetPreFetchCount() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $context->setPreFetchCount(456); + + $this->assertSame(456, $context->getPreFetchCount()); + } + + public function testShouldAllowPurgeMessagesFromQueue() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); + + file_put_contents($tmpFile, 'foo'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $queue = $context->createQueue($tmpFile->getFilename()); + + $context->purge($queue); + + $this->assertEmpty(file_get_contents($tmpFile)); + } + + public function testShouldReleaseAllLocksOnClose() + { + new TempFile(sys_get_temp_dir().'/foo'); + new TempFile(sys_get_temp_dir().'/bar'); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $fooQueue = $context->createQueue('foo'); + $barQueue = $context->createTopic('bar'); + + $this->assertAttributeCount(0, 'lockHandlers', $context); + + $context->workWithFile($fooQueue, 'r+', function () { + }); + $context->workWithFile($barQueue, 'r+', function () { + }); + $context->workWithFile($fooQueue, 'c+', function () { + }); + $context->workWithFile($barQueue, 'c+', function () { + }); + + $this->assertAttributeCount(2, 'lockHandlers', $context); + + $context->close(); + + $this->assertAttributeCount(0, 'lockHandlers', $context); + } + + public function testShouldCreateFileOnFilesystemIfNotExistOnDeclareDestination() + { + $tmpFile = new TempFile(sys_get_temp_dir().'/'.uniqid()); + + $context = new FsContext(sys_get_temp_dir(), 1, 0666); + + $queue = $context->createQueue($tmpFile->getFilename()); + + $this->assertFileNotExists((string) $tmpFile); + + $context->declareDestination($queue); + + $this->assertFileExists((string) $tmpFile); + $this->assertTrue(is_readable($tmpFile)); + $this->assertTrue(is_writable($tmpFile)); + + // do nothing if file already exists + $context->declareDestination($queue); + + $this->assertFileExists((string) $tmpFile); + + unlink($tmpFile); + } +} diff --git a/pkg/fs/Tests/FsDestinationTest.php b/pkg/fs/Tests/FsDestinationTest.php new file mode 100644 index 000000000..199243a44 --- /dev/null +++ b/pkg/fs/Tests/FsDestinationTest.php @@ -0,0 +1,65 @@ +assertClassImplements(Topic::class, FsDestination::class); + $this->assertClassImplements(Queue::class, FsDestination::class); + } + + public function testCouldBeConstructedWithSplFileAsFirstArgument() + { + $splFile = new \SplFileInfo((string) TempFile::generate()); + + $destination = new FsDestination($splFile); + + $this->assertSame($splFile, $destination->getFileInfo()); + } + + public function testCouldBeConstructedWithTempFileAsFirstArgument() + { + $tmpFile = new TempFile((string) TempFile::generate()); + + $destination = new FsDestination($tmpFile); + + $this->assertSame($tmpFile, $destination->getFileInfo()); + } + + public function testShouldReturnFileNameOnGetNameCall() + { + $splFile = new \SplFileInfo((string) TempFile::generate()); + + $destination = new FsDestination($splFile); + + $this->assertSame($splFile->getFilename(), $destination->getName()); + } + + public function testShouldReturnFileNameOnGetQueueNameCall() + { + $splFile = new \SplFileInfo((string) TempFile::generate()); + + $destination = new FsDestination($splFile); + + $this->assertSame($splFile->getFilename(), $destination->getQueueName()); + } + + public function testShouldReturnFileNameOnGetTopicNameCall() + { + $splFile = new \SplFileInfo((string) TempFile::generate()); + + $destination = new FsDestination($splFile); + + $this->assertSame($splFile->getFilename(), $destination->getTopicName()); + } +} diff --git a/pkg/fs/Tests/FsMessageTest.php b/pkg/fs/Tests/FsMessageTest.php new file mode 100644 index 000000000..7412b8c4e --- /dev/null +++ b/pkg/fs/Tests/FsMessageTest.php @@ -0,0 +1,179 @@ +assertClassImplements(Message::class, FsMessage::class); + } + + public function testShouldImplementJsonSerializableInterface() + { + $this->assertClassImplements(\JsonSerializable::class, FsMessage::class); + } + + public function testCouldConstructMessageWithBody() + { + $message = new FsMessage('body'); + + $this->assertSame('body', $message->getBody()); + } + + public function testCouldConstructMessageWithProperties() + { + $message = new FsMessage('', ['key' => 'value']); + + $this->assertSame(['key' => 'value'], $message->getProperties()); + } + + public function testCouldConstructMessageWithHeaders() + { + $message = new FsMessage('', [], ['key' => 'value']); + + $this->assertSame(['key' => 'value'], $message->getHeaders()); + } + + public function testCouldSetGetBody() + { + $message = new FsMessage(); + $message->setBody('body'); + + $this->assertSame('body', $message->getBody()); + } + + public function testCouldSetGetProperties() + { + $message = new FsMessage(); + $message->setProperties(['key' => 'value']); + + $this->assertSame(['key' => 'value'], $message->getProperties()); + } + + public function testCouldSetGetHeaders() + { + $message = new FsMessage(); + $message->setHeaders(['key' => 'value']); + + $this->assertSame(['key' => 'value'], $message->getHeaders()); + } + + public function testCouldSetGetRedelivered() + { + $message = new FsMessage(); + + $message->setRedelivered(true); + $this->assertTrue($message->isRedelivered()); + + $message->setRedelivered(false); + $this->assertFalse($message->isRedelivered()); + } + + public function testCouldSetGetCorrelationId() + { + $message = new FsMessage(); + $message->setCorrelationId('the-correlation-id'); + + $this->assertSame('the-correlation-id', $message->getCorrelationId()); + } + + public function testShouldSetCorrelationIdAsHeader() + { + $message = new FsMessage(); + $message->setCorrelationId('the-correlation-id'); + + $this->assertSame(['correlation_id' => 'the-correlation-id'], $message->getHeaders()); + } + + public function testCouldSetGetMessageId() + { + $message = new FsMessage(); + $message->setMessageId('the-message-id'); + + $this->assertSame('the-message-id', $message->getMessageId()); + } + + public function testCouldSetMessageIdAsHeader() + { + $message = new FsMessage(); + $message->setMessageId('the-message-id'); + + $this->assertSame(['message_id' => 'the-message-id'], $message->getHeaders()); + } + + public function testCouldSetGetTimestamp() + { + $message = new FsMessage(); + $message->setTimestamp(12345); + + $this->assertSame(12345, $message->getTimestamp()); + } + + public function testCouldSetTimestampAsHeader() + { + $message = new FsMessage(); + $message->setTimestamp(12345); + + $this->assertSame(['timestamp' => 12345], $message->getHeaders()); + } + + public function testShouldReturnNullAsDefaultReplyTo() + { + $message = new FsMessage(); + + $this->assertSame(null, $message->getReplyTo()); + } + + public function testShouldAllowGetPreviouslySetReplyTo() + { + $message = new FsMessage(); + $message->setReplyTo('theQueueName'); + + $this->assertSame('theQueueName', $message->getReplyTo()); + } + + public function testShouldAllowGetPreviouslySetReplyToAsHeader() + { + $message = new FsMessage(); + $message->setReplyTo('theQueueName'); + + $this->assertSame(['reply-to' => 'theQueueName'], $message->getHeaders()); + } + + public function testColdBeSerializedToJson() + { + $message = new FsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); + + $this->assertEquals('{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"}}', json_encode($message)); + } + + public function testCouldBeUnserializedFromJson() + { + $message = new FsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); + + $json = json_encode($message); + + //guard + $this->assertNotEmpty($json); + + $unserializedMessage = FsMessage::jsonUnserialize($json); + + $this->assertInstanceOf(FsMessage::class, $unserializedMessage); + $this->assertEquals($message, $unserializedMessage); + } + + public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The malformed json given.'); + + FsMessage::jsonUnserialize('{]'); + } +} diff --git a/pkg/fs/Tests/FsProducerTest.php b/pkg/fs/Tests/FsProducerTest.php new file mode 100644 index 000000000..0ad135661 --- /dev/null +++ b/pkg/fs/Tests/FsProducerTest.php @@ -0,0 +1,72 @@ +assertClassImplements(Producer::class, FsProducer::class); + } + + public function testCouldBeConstructedWithContextAsFirstArgument() + { + new FsProducer($this->createContextMock()); + } + + public function testThrowIfDestinationNotFsOnSend() + { + $producer = new FsProducer($this->createContextMock()); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\Fs\FsDestination but got Enqueue\Transport\Null\NullQueue.'); + $producer->send(new NullQueue('aQueue'), new FsMessage()); + } + + public function testThrowIfMessageNotFsOnSend() + { + $producer = new FsProducer($this->createContextMock()); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\Fs\FsMessage but it is Enqueue\Transport\Null\NullMessage.'); + $producer->send(new FsDestination(TempFile::generate()), new NullMessage()); + } + + public function testShouldCallContextWorkWithFileAndCallbackToItOnSend() + { + $destination = new FsDestination(TempFile::generate()); + + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->once()) + ->method('workWithFile') + ->with($this->identicalTo($destination), 'a+', $this->isInstanceOf(\Closure::class)) + ; + + $producer = new FsProducer($contextMock); + + $producer->send($destination, new FsMessage()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|FsContext + */ + private function createContextMock() + { + return $this->createMock(FsContext::class); + } +} diff --git a/pkg/fs/Tests/Functional/FsCommonUseCasesTest.php b/pkg/fs/Tests/Functional/FsCommonUseCasesTest.php new file mode 100644 index 000000000..0636aff67 --- /dev/null +++ b/pkg/fs/Tests/Functional/FsCommonUseCasesTest.php @@ -0,0 +1,146 @@ +fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext(); + + new TempFile(sys_get_temp_dir().'/fs_test_queue'); + } + + public function tearDown() + { + $this->fsContext->close(); + } + + public function testWaitsForTwoSecondsAndReturnNullOnReceive() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + $startAt = microtime(true); + + $consumer = $this->fsContext->createConsumer($queue); + $message = $consumer->receive(2000); + + $endAt = microtime(true); + + $this->assertNull($message); + + $this->assertGreaterThan(1.5, $endAt - $startAt); + $this->assertLessThan(2.5, $endAt - $startAt); + } + + public function testReturnNullImmediatelyOnReceiveNoWait() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + $startAt = microtime(true); + + $consumer = $this->fsContext->createConsumer($queue); + $message = $consumer->receiveNoWait(); + + $endAt = microtime(true); + + $this->assertNull($message); + + $this->assertLessThan(0.5, $endAt - $startAt); + } + + public function testProduceAndReceiveOneMessageSentDirectlyToQueue() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + $message = $this->fsContext->createMessage( + __METHOD__, + ['FooProperty' => 'FooVal'], + ['BarHeader' => 'BarVal'] + ); + + $producer = $this->fsContext->createProducer(); + $producer->send($queue, $message); + + $consumer = $this->fsContext->createConsumer($queue); + $message = $consumer->receive(1000); + + $this->assertInstanceOf(FsMessage::class, $message); + $consumer->acknowledge($message); + + $this->assertEquals(__METHOD__, $message->getBody()); + $this->assertEquals(['FooProperty' => 'FooVal'], $message->getProperties()); + $this->assertEquals([ + 'BarHeader' => 'BarVal', + ], $message->getHeaders()); + } + + public function testProduceAndReceiveOneMessageSentDirectlyToTemporaryQueue() + { + $queue = $this->fsContext->createTemporaryQueue(); + + $message = $this->fsContext->createMessage(__METHOD__); + + $producer = $this->fsContext->createProducer(); + $producer->send($queue, $message); + + $consumer = $this->fsContext->createConsumer($queue); + $message = $consumer->receive(1000); + + $this->assertInstanceOf(FsMessage::class, $message); + $consumer->acknowledge($message); + + $this->assertEquals(__METHOD__, $message->getBody()); + } + + public function testConsumerReceiveMessageWithZeroTimeout() + { + $topic = $this->fsContext->createTopic('fs_test_queue_exchange'); + + $consumer = $this->fsContext->createConsumer($topic); + //guard + $this->assertNull($consumer->receive(1000)); + + $message = $this->fsContext->createMessage(__METHOD__); + + $producer = $this->fsContext->createProducer(); + $producer->send($topic, $message); + usleep(100); + $actualMessage = $consumer->receive(0); + + $this->assertInstanceOf(FsMessage::class, $actualMessage); + $consumer->acknowledge($message); + + $this->assertEquals(__METHOD__, $message->getBody()); + } + + public function testPurgeMessagesFromQueue() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + $consumer = $this->fsContext->createConsumer($queue); + + $message = $this->fsContext->createMessage(__METHOD__); + + $producer = $this->fsContext->createProducer(); + $producer->send($queue, $message); + $producer->send($queue, $message); + + $this->fsContext->purge($queue); + + $this->assertNull($consumer->receive(1)); + } +} diff --git a/pkg/fs/Tests/Functional/FsConsumerTest.php b/pkg/fs/Tests/Functional/FsConsumerTest.php new file mode 100644 index 000000000..dbdb09f93 --- /dev/null +++ b/pkg/fs/Tests/Functional/FsConsumerTest.php @@ -0,0 +1,70 @@ +fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext(); + + new TempFile(sys_get_temp_dir().'/fs_test_queue'); + file_put_contents(sys_get_temp_dir().'/fs_test_queue', ''); + } + + public function tearDown() + { + $this->fsContext->close(); + } + + public function testShouldConsumeMessagesFromFileOneByOne() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + file_put_contents( + sys_get_temp_dir().'/fs_test_queue', + ' |{"body":"first message","properties":[],"headers":[]} |{"body":"second message","properties":[],"headers":[]} |{"body":"third message","properties":[],"headers":[]}' + ); + + $consumer = $this->fsContext->createConsumer($queue); + + $message = $consumer->receiveNoWait(); + $this->assertInstanceOf(FsMessage::class, $message); + $this->assertSame('third message', $message->getBody()); + + $this->assertSame( + ' |{"body":"first message","properties":[],"headers":[]} |{"body":"second message","properties":[],"headers":[]}', + file_get_contents(sys_get_temp_dir().'/fs_test_queue') + ); + + $message = $consumer->receiveNoWait(); + $this->assertInstanceOf(FsMessage::class, $message); + $this->assertSame('second message', $message->getBody()); + + $this->assertSame( + ' |{"body":"first message","properties":[],"headers":[]}', + file_get_contents(sys_get_temp_dir().'/fs_test_queue') + ); + + $message = $consumer->receiveNoWait(); + $this->assertInstanceOf(FsMessage::class, $message); + $this->assertSame('first message', $message->getBody()); + + $this->assertEmpty(file_get_contents(sys_get_temp_dir().'/fs_test_queue')); + + $message = $consumer->receiveNoWait(); + $this->assertNull($message); + + $this->assertEmpty(file_get_contents(sys_get_temp_dir().'/fs_test_queue')); + } +} diff --git a/pkg/fs/Tests/Functional/FsConsumptionUseCasesTest.php b/pkg/fs/Tests/Functional/FsConsumptionUseCasesTest.php new file mode 100644 index 000000000..c3d2448e6 --- /dev/null +++ b/pkg/fs/Tests/Functional/FsConsumptionUseCasesTest.php @@ -0,0 +1,109 @@ +fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext(); + + new TempFile(sys_get_temp_dir().'/fs_test_queue'); + } + + public function tearDown() + { + $this->fsContext->close(); + } + + public function testConsumeOneMessageAndExit() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + $message = $this->fsContext->createMessage(__METHOD__); + $this->fsContext->createProducer()->send($queue, $message); + + $queueConsumer = new QueueConsumer($this->fsContext, new ChainExtension([ + new LimitConsumedMessagesExtension(1), + new LimitConsumptionTimeExtension(new \DateTime('+3sec')), + ])); + + $processor = new StubProcessor(); + $queueConsumer->bind($queue, $processor); + + $queueConsumer->consume(); + + $this->assertInstanceOf(Message::class, $processor->lastProcessedMessage); + $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody()); + } + + public function testConsumeOneMessageAndSendReplyExit() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + $replyQueue = $this->fsContext->createQueue('fs_test_queue_reply'); + + $message = $this->fsContext->createMessage(__METHOD__); + $message->setReplyTo($replyQueue->getQueueName()); + $this->fsContext->createProducer()->send($queue, $message); + + $queueConsumer = new QueueConsumer($this->fsContext, new ChainExtension([ + new LimitConsumedMessagesExtension(2), + new LimitConsumptionTimeExtension(new \DateTime('+3sec')), + new ReplyExtension(), + ])); + + $replyMessage = $this->fsContext->createMessage(__METHOD__.'.reply'); + + $processor = new StubProcessor(); + $processor->result = Result::reply($replyMessage); + + $replyProcessor = new StubProcessor(); + + $queueConsumer->bind($queue, $processor); + $queueConsumer->bind($replyQueue, $replyProcessor); + $queueConsumer->consume(); + + $this->assertInstanceOf(Message::class, $processor->lastProcessedMessage); + $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody()); + + $this->assertInstanceOf(Message::class, $replyProcessor->lastProcessedMessage); + $this->assertEquals(__METHOD__.'.reply', $replyProcessor->lastProcessedMessage->getBody()); + } +} + +class StubProcessor implements Processor +{ + public $result = Result::ACK; + + /** @var \Enqueue\Psr\Message */ + public $lastProcessedMessage; + + public function process(Message $message, Context $context) + { + $this->lastProcessedMessage = $message; + + return $this->result; + } +} diff --git a/pkg/fs/Tests/Functional/FsProducerTest.php b/pkg/fs/Tests/Functional/FsProducerTest.php new file mode 100644 index 000000000..519eefc87 --- /dev/null +++ b/pkg/fs/Tests/Functional/FsProducerTest.php @@ -0,0 +1,47 @@ +fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext(); + + new TempFile(sys_get_temp_dir().'/fs_test_queue'); + file_put_contents(sys_get_temp_dir().'/fs_test_queue', ''); + } + + public function tearDown() + { + $this->fsContext->close(); + } + + public function testShouldStoreFilesToFileInExpectedFormat() + { + $queue = $this->fsContext->createQueue('fs_test_queue'); + + $firstMessage = $this->fsContext->createMessage('first message'); + $secondMessage = $this->fsContext->createMessage('second message'); + $thirdMessage = $this->fsContext->createMessage('third message'); + + $this->fsContext->createProducer()->send($queue, $firstMessage); + $this->fsContext->createProducer()->send($queue, $secondMessage); + $this->fsContext->createProducer()->send($queue, $thirdMessage); + + $this->assertSame(0, strlen(file_get_contents(sys_get_temp_dir().'/fs_test_queue')) % 64); + $this->assertSame( + ' |{"body":"first message","properties":[],"headers":[]} |{"body":"second message","properties":[],"headers":[]} |{"body":"third message","properties":[],"headers":[]}', + file_get_contents(sys_get_temp_dir().'/fs_test_queue') + ); + } +} diff --git a/pkg/fs/Tests/Functional/FsRpcUseCasesTest.php b/pkg/fs/Tests/Functional/FsRpcUseCasesTest.php new file mode 100644 index 000000000..dc394a4cc --- /dev/null +++ b/pkg/fs/Tests/Functional/FsRpcUseCasesTest.php @@ -0,0 +1,94 @@ +fsContext = (new FsConnectionFactory(['store_dir' => sys_get_temp_dir()]))->createContext(); + + new TempFile(sys_get_temp_dir().'/fs_rpc_queue'); + new TempFile(sys_get_temp_dir().'/fs_reply_queue'); + } + + public function tearDown() + { + $this->fsContext->close(); + } + + public function testDoAsyncRpcCallWithCustomReplyQueue() + { + $queue = $this->fsContext->createQueue('fs_rpc_queue'); + + $replyQueue = $this->fsContext->createQueue('fs_reply_queue'); + + $rpcClient = new RpcClient($this->fsContext); + + $message = $this->fsContext->createMessage(); + $message->setReplyTo($replyQueue->getQueueName()); + + $promise = $rpcClient->callAsync($queue, $message, 10); + $this->assertInstanceOf(Promise::class, $promise); + + $consumer = $this->fsContext->createConsumer($queue); + $message = $consumer->receive(1); + $this->assertInstanceOf(FsMessage::class, $message); + $this->assertNotNull($message->getReplyTo()); + $this->assertNotNull($message->getCorrelationId()); + $consumer->acknowledge($message); + + $replyQueue = $this->fsContext->createQueue($message->getReplyTo()); + $replyMessage = $this->fsContext->createMessage('This a reply!'); + $replyMessage->setCorrelationId($message->getCorrelationId()); + + $this->fsContext->createProducer()->send($replyQueue, $replyMessage); + + $actualReplyMessage = $promise->getMessage(); + $this->assertInstanceOf(FsMessage::class, $actualReplyMessage); + } + + public function testDoAsyncRecCallWithCastInternallyCreatedTemporaryReplyQueue() + { + $queue = $this->fsContext->createQueue('fs_rpc_queue'); + + $rpcClient = new RpcClient($this->fsContext); + + $message = $this->fsContext->createMessage(); + + $promise = $rpcClient->callAsync($queue, $message, 10); + $this->assertInstanceOf(Promise::class, $promise); + + $consumer = $this->fsContext->createConsumer($queue); + $receivedMessage = $consumer->receive(1); + + $this->assertInstanceOf(FsMessage::class, $receivedMessage); + $this->assertNotNull($receivedMessage->getReplyTo()); + $this->assertNotNull($receivedMessage->getCorrelationId()); + $consumer->acknowledge($receivedMessage); + + $replyQueue = $this->fsContext->createQueue($receivedMessage->getReplyTo()); + $replyMessage = $this->fsContext->createMessage('This a reply!'); + $replyMessage->setCorrelationId($receivedMessage->getCorrelationId()); + + $this->fsContext->createProducer()->send($replyQueue, $replyMessage); + + $actualReplyMessage = $promise->getMessage(); + $this->assertInstanceOf(FsMessage::class, $actualReplyMessage); + } +} diff --git a/pkg/fs/Tests/Symfony/FsTransportFactoryTest.php b/pkg/fs/Tests/Symfony/FsTransportFactoryTest.php new file mode 100644 index 000000000..316a5326b --- /dev/null +++ b/pkg/fs/Tests/Symfony/FsTransportFactoryTest.php @@ -0,0 +1,123 @@ +assertClassImplements(TransportFactoryInterface::class, FsTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new FsTransportFactory(); + + $this->assertEquals('fs', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new FsTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfiguration() + { + $transport = new FsTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + 'store_dir' => sys_get_temp_dir(), + ]]); + + $this->assertEquals([ + 'store_dir' => sys_get_temp_dir(), + 'pre_fetch_count' => 1, + 'chmod' => 0600, + ], $config); + } + + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new FsTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'store_dir' => sys_get_temp_dir(), + 'pre_fetch_count' => 1, + 'chmod' => 0600, + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(FsConnectionFactory::class, $factory->getClass()); + $this->assertSame([[ + 'store_dir' => sys_get_temp_dir(), + 'pre_fetch_count' => 1, + 'chmod' => 0600, + ]], $factory->getArguments()); + } + + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new FsTransportFactory(); + + $serviceId = $transport->createContext($container, [ + 'store_dir' => sys_get_temp_dir(), + 'pre_fetch_count' => 1, + 'chmod' => 0600, + ]); + + $this->assertEquals('enqueue.transport.fs.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.fs.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.fs.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + + public function testShouldCreateDriver() + { + $container = new ContainerBuilder(); + + $transport = new FsTransportFactory(); + + $serviceId = $transport->createDriver($container, []); + + $this->assertEquals('enqueue.client.fs.driver', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $driver = $container->getDefinition($serviceId); + $this->assertSame(FsDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.fs.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); + } +} diff --git a/pkg/fs/composer.json b/pkg/fs/composer.json new file mode 100644 index 000000000..a33ad2333 --- /dev/null +++ b/pkg/fs/composer.json @@ -0,0 +1,34 @@ +{ + "name": "enqueue/fs", + "type": "library", + "description": "Enqueue Filesystem based transport", + "keywords": ["messaging", "queue", "filesystem", "local"], + "license": "MIT", + "repositories": [ + { + "type": "vcs", + "url": "git@github.com:php-enqueue/test.git" + } + ], + "require": { + "php": ">=5.6", + "symfony/filesystem": "^2.8|^3", + "makasim/temp-file": "^0.2" + }, + "require-dev": { + "phpunit/phpunit": "~5.5", + "enqueue/test": "^0.2" + }, + "autoload": { + "psr-4": { "Enqueue\\Fs\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.2.x-dev" + } + } +} diff --git a/pkg/fs/phpunit.xml.dist b/pkg/fs/phpunit.xml.dist new file mode 100644 index 000000000..0ba207de0 --- /dev/null +++ b/pkg/fs/phpunit.xml.dist @@ -0,0 +1,31 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Resources + ./Tests + + + +