Skip to content

Commit 93ea0a2

Browse files
authored
Merge pull request #12 from php-enqueue/filesystem-transport
Filesystem transport
2 parents 9e17d6e + 56cdf33 commit 93ea0a2

38 files changed

+3019
-6
lines changed

bin/dev

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ while getopts "bustefc" OPTION; do
2121
./bin/php-cs-fixer fix
2222
;;
2323
t)
24-
COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test
24+
COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$2"
2525
;;
2626
c)
2727
COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm generate-changelog github_changelog_generator --future-release "$2" --simple-list

bin/subtree-split

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ remote psr-queue git@github.com:php-enqueue/psr-queue.git
4747
remote enqueue git@github.com:php-enqueue/enqueue.git
4848
remote stomp git@github.com:php-enqueue/stomp.git
4949
remote amqp-ext git@github.com:php-enqueue/amqp-ext.git
50+
remote fs git@github.com:php-enqueue/fs.git
5051
remote enqueue-bundle git@github.com:php-enqueue/enqueue-bundle.git
5152
remote job-queue git@github.com:php-enqueue/job-queue.git
5253
remote test git@github.com:php-enqueue/test.git
@@ -55,6 +56,7 @@ split 'pkg/psr-queue' psr-queue
5556
split 'pkg/enqueue' enqueue
5657
split 'pkg/stomp' stomp
5758
split 'pkg/amqp-ext' amqp-ext
59+
split 'pkg/fs' fs
5860
split 'pkg/enqueue-bundle' enqueue-bundle
5961
split 'pkg/job-queue' job-queue
6062
split 'pkg/test' test

composer.json

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"enqueue/enqueue": "*@dev",
99
"enqueue/stomp": "*@dev",
1010
"enqueue/amqp-ext": "*@dev",
11+
"enqueue/fs": "*@dev",
1112
"enqueue/enqueue-bundle": "*@dev",
1213
"enqueue/job-queue": "*@dev",
1314
"enqueue/test": "*@dev"
@@ -51,6 +52,10 @@
5152
{
5253
"type": "path",
5354
"url": "pkg/job-queue"
55+
},
56+
{
57+
"type": "path",
58+
"url": "pkg/fs"
5459
}
5560
]
5661
}

docs/bundle/config_reference.md

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Config reference
22

3+
You can get this info by running `./bin/console config:dump-reference enqueue` command.
4+
35
```yaml
4-
# Default configuration for extension with alias: "enqueue"
56
enqueue:
67
transport: # Required
78
default:
@@ -91,6 +92,16 @@ enqueue:
9192

9293
# The option tells whether RabbitMQ broker has delay plugin installed or not
9394
delay_plugin_installed: false
95+
fs:
96+
97+
# The store directory where all queue\topics files will be created and messages are stored
98+
store_dir: ~ # Required
99+
100+
# The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose.
101+
pre_fetch_count: 1
102+
103+
# The queue files are created with this given permissions if not exist.
104+
chmod: 384
94105
client:
95106
traceable_producer: false
96107
prefix: enqueue

docs/client/supported_brokers.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Here's the list of protocols and Client features supported by them
88
| RabbitMQ AMQP | Yes | Yes* | Yes | Yes | Yes |
99
| STOMP | No | No | Yes | No | Yes** |
1010
| RabbitMQ STOMP | Yes | Yes* | Yes | Yes*** | Yes** |
11-
11+
| Filesystem | No | No | No | Yes | No |
1212

1313
* \* Possible if a RabbitMQ delay plugin is installed.
1414
* \*\* Possible if topics (exchanges) are configured on broker side manually.

docs/filesystem_transport.md

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Filesystem transport
2+
3+
Use files on local filesystem as queues.
4+
It creates a file per queue\topic.
5+
A message is a line inside the file.
6+
**Limitations** It works only in auto ack mode. Local by nature therefor messages are not visible on other servers.
7+
8+
* [Create context](#create-context)
9+
* [Declare topic](#declare-topic)
10+
* [Declare queue](#decalre-queue)
11+
* [Bind queue to topic](#bind-queue-to-topic)
12+
* [Send message to topic](#send-message-to-topic)
13+
* [Send message to queue](#send-message-to-queue)
14+
* [Consume message](#consume-message)
15+
* [Purge queue messages](#purge-queue-messages)
16+
17+
## Create context
18+
19+
```php
20+
<?php
21+
use Enqueue\Fs\FsConnectionFactory;
22+
23+
$connectionFactory = new FsConnectionFactory([
24+
'store_dir' => '/tmp'
25+
]);
26+
27+
$psrContext = $connectionFactory->createContext();
28+
```
29+
30+
## Send message to topic
31+
32+
```php
33+
<?php
34+
/** @var \Enqueue\Fs\FsContext $psrContext */
35+
36+
$fooTopic = $psrContext->createTopic('aTopic');
37+
$message = $psrContext->createMessage('Hello world!');
38+
39+
$psrContext->createProducer()->send($fooTopic, $message);
40+
```
41+
42+
## Send message to queue
43+
44+
```php
45+
<?php
46+
/** @var \Enqueue\Fs\FsContext $psrContext */
47+
48+
$fooQueue = $psrContext->createQueue('aQueue');
49+
$message = $psrContext->createMessage('Hello world!');
50+
51+
$psrContext->createProducer()->send($fooQueue, $message);
52+
```
53+
54+
## Consume message:
55+
56+
```php
57+
<?php
58+
/** @var \Enqueue\Fs\FsContext $psrContext */
59+
60+
$fooQueue = $psrContext->createQueue('aQueue');
61+
$consumer = $psrContext->createConsumer($fooQueue);
62+
63+
$message = $consumer->receive();
64+
65+
// process a message
66+
67+
$consumer->acknowledge($message);
68+
// $consumer->reject($message);
69+
```
70+
71+
## Purge queue messages:
72+
73+
```php
74+
<?php
75+
/** @var \Enqueue\Fs\FsContext $psrContext */
76+
77+
$fooQueue = $psrContext->createQueue('aQueue');
78+
79+
$psrContext->purge($fooQueue);
80+
```
81+
82+
[back to index](index.md)

docs/index.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
* [Quick tour](quick_tour.md)
44
* Transports
5-
- [AMQP](amqp_transport.md)
6-
- [STOMP](stomp_transport.md)
7-
- [NULL](null_transport.md)
5+
- [Amqp](amqp_transport.md)
6+
- [Stomp](stomp_transport.md)
7+
- [Filesystem](filesystem_transport.md)
8+
- [Null](null_transport.md)
89
* Consumption
910
- [Extensions](consumption/extensions.md)
1011
* Client

phpunit.xml.dist

+4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
<directory>pkg/amqp-ext/Tests</directory>
3030
</testsuite>
3131

32+
<testsuite name="fs">
33+
<directory>pkg/fs/Tests</directory>
34+
</testsuite>
35+
3236
<testsuite name="enqueue-bundle">
3337
<directory>pkg/enqueue-bundle/Tests</directory>
3438
</testsuite>

pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php

+9
Original file line numberDiff line numberDiff line change
@@ -121,5 +121,14 @@ public function testShouldCreateDriver()
121121

122122
$driver = $container->getDefinition($serviceId);
123123
$this->assertSame(AmqpDriver::class, $driver->getClass());
124+
125+
$this->assertInstanceOf(Reference::class, $driver->getArgument(0));
126+
$this->assertEquals('enqueue.transport.amqp.context', (string) $driver->getArgument(0));
127+
128+
$this->assertInstanceOf(Reference::class, $driver->getArgument(1));
129+
$this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1));
130+
131+
$this->assertInstanceOf(Reference::class, $driver->getArgument(2));
132+
$this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2));
124133
}
125134
}

pkg/enqueue-bundle/EnqueueBundle.php

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1212
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
1313
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
14+
use Enqueue\Fs\FsContext;
15+
use Enqueue\Fs\Symfony\FsTransportFactory;
1416
use Enqueue\Stomp\StompContext;
1517
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
1618
use Enqueue\Stomp\Symfony\StompTransportFactory;
@@ -46,5 +48,9 @@ public function build(ContainerBuilder $container)
4648
$extension->addTransportFactory(new AmqpTransportFactory());
4749
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory());
4850
}
51+
52+
if (class_exists(FsContext::class)) {
53+
$extension->addTransportFactory(new FsTransportFactory());
54+
}
4955
}
5056
}

pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php

+18
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
1212
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
1313
use Enqueue\Bundle\EnqueueBundle;
14+
use Enqueue\Fs\Symfony\FsTransportFactory;
1415
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
1516
use Enqueue\Stomp\Symfony\StompTransportFactory;
1617
use Enqueue\Symfony\DefaultTransportFactory;
@@ -139,6 +140,23 @@ public function testShouldRegisterAmqpAndRabbitMqAmqpTransportFactories()
139140
$bundle->build($container);
140141
}
141142

143+
public function testShouldRegisterFSTransportFactory()
144+
{
145+
$extensionMock = $this->createEnqueueExtensionMock();
146+
147+
$container = new ContainerBuilder();
148+
$container->registerExtension($extensionMock);
149+
150+
$extensionMock
151+
->expects($this->at(6))
152+
->method('addTransportFactory')
153+
->with($this->isInstanceOf(FsTransportFactory::class))
154+
;
155+
156+
$bundle = new EnqueueBundle();
157+
$bundle->build($container);
158+
}
159+
142160
/**
143161
* @return \PHPUnit_Framework_MockObject_MockObject|EnqueueExtension
144162
*/

pkg/fs/.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

pkg/fs/.travis.yml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

0 commit comments

Comments
 (0)