Skip to content

Commit ef84aa2

Browse files
authored
Merge pull request #86 from php-enqueue/async-events
Symfony. Async event dispatching
2 parents 9107114 + d2fbbc3 commit ef84aa2

27 files changed

+1529
-1
lines changed

docs/bundle/async_events.md

+205
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
# Async events
2+
3+
The EnqueueBundle allows you to dispatch events asynchronously.
4+
Behind the scene it replaces your listener with one that sends a message to MQ.
5+
The message contains the event object.
6+
The consumer, once it receives the message, restores the event and dispatches it to only async listeners.
7+
8+
Async listeners benefits:
9+
10+
* The response time lesser. It has to do less work.
11+
* Better fault tolerance. Bugs in async listener does not affect user. Messages will wait till you fix bugs.
12+
* Better scaling. Add more consumers to meet the load.
13+
14+
_**Note**: The php serializer transformer (the default one) does not work on Symfony prior 3.0. The event contains eventDispatcher and therefor could not be serialized. You have to register a transformer for every async event. Read the [event transformer](#event-transformer)._
15+
16+
## Configuration
17+
18+
I suppose you already [installed the bundle](quick_tour.md#install).
19+
Now, you have to enable `async_events`.
20+
If you do not enable it, events will be processed as before: synchronously.
21+
22+
```yaml
23+
# app/config/config.yml
24+
25+
enqueue:
26+
async_events:
27+
enabled: true
28+
# if you'd like to send send messages onTerminate use spool_producer (it makes response time even lesser):
29+
# spool_producer: true
30+
```
31+
32+
## Usage
33+
34+
To make your listener async you have add `async: true` attribute to the tag `kernel.event_listener`, like this:
35+
36+
```yaml
37+
# app/config/config.yml
38+
39+
services:
40+
acme.foo_listener:
41+
class: 'AcmeBundle\Listener\FooListener'
42+
tags:
43+
- { name: 'kernel.event_listener', async: true, event: 'foo', method: 'onEvent' }
44+
```
45+
46+
That's basically it. The rest of the doc describes advanced features.
47+
48+
## Advanced Usage.
49+
50+
You can also add an async listener directly and register a custom message processor for it:
51+
52+
```yaml
53+
# app/config/config.yml
54+
55+
services:
56+
acme.async_foo_listener:
57+
class: 'Enqueue\Bundle\Events\AsyncListener'
58+
public: false
59+
arguments: ['@enqueue.client.producer', '@enqueue.events.registry']
60+
tags:
61+
- { name: 'kernel.event_listener', event: 'foo', method: 'onEvent' }
62+
```
63+
64+
The message processor must subscribe to `event.foo` topic. The message queue topics names for event follow this patter `event.{eventName}`.
65+
66+
```php
67+
<?php
68+
69+
use Enqueue\Bundle\Events\Registry;
70+
use Enqueue\Client\TopicSubscriberInterface;
71+
use Enqueue\Psr\PsrContext;
72+
use Enqueue\Psr\PsrMessage;
73+
use Enqueue\Psr\PsrProcessor;
74+
75+
class FooEventProcessor implements PsrProcessor, TopicSubscriberInterface
76+
{
77+
/**
78+
* @var Registry
79+
*/
80+
private $registry;
81+
82+
/**
83+
* @param Registry $registry
84+
*/
85+
public function __construct(Registry $registry)
86+
{
87+
$this->registry = $registry;
88+
}
89+
90+
public function process(PsrMessage $message, PsrContext $context)
91+
{
92+
if (false == $eventName = $message->getProperty('event_name')) {
93+
return self::REJECT;
94+
}
95+
if (false == $transformerName = $message->getProperty('transformer_name')) {
96+
return self::REJECT;
97+
}
98+
99+
// do what you want with the event.
100+
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
101+
102+
103+
return self::ACK;
104+
}
105+
106+
public static function getSubscribedTopics()
107+
{
108+
return ['event.foo'];
109+
}
110+
}
111+
```
112+
113+
114+
## Event transformer
115+
116+
The bundle uses [php serializer](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php) transformer by default to pass events through MQ.
117+
You could create a transformer for the given event type. The transformer must implement `Enqueue\Bundle\Events\EventTransformer` interface.
118+
Consider the next example. It shows how to send an event that contains Doctrine entity as a subject
119+
120+
```php
121+
<?php
122+
namespace AcmeBundle\Listener;
123+
124+
// src/AcmeBundle/Listener/FooEventTransformer.php
125+
126+
use Enqueue\Client\Message;
127+
use Enqueue\Consumption\Result;
128+
use Enqueue\Psr\PsrMessage;
129+
use Enqueue\Util\JSON;
130+
use Symfony\Component\EventDispatcher\Event;
131+
use Enqueue\Bundle\Events\EventTransformer;
132+
use Doctrine\Bundle\DoctrineBundle\Registry;
133+
use Symfony\Component\EventDispatcher\GenericEvent;
134+
135+
class FooEventTransformer implements EventTransformer
136+
{
137+
/** @var Registry @doctrine */
138+
private $doctrine;
139+
140+
public function __construct(Registry $doctrine)
141+
{
142+
$this->doctrine = $doctrine;
143+
}
144+
145+
/**
146+
* {@inheritdoc}
147+
*
148+
* @param GenericEvent $event
149+
*/
150+
public function toMessage($eventName, Event $event = null)
151+
{
152+
$entity = $event->getSubject();
153+
$entityClass = get_class($event);
154+
155+
$manager = $this->doctrine->getManagerForClass($entityClass);
156+
$meta = $manager->getClassMetadata($entityClass);
157+
158+
$id = $meta->getIdentifierValues($entity);
159+
160+
$message = new Message();
161+
$message->setBody([
162+
'entityClass' => $entityClass,
163+
'entityId' => $id,
164+
'arguments' => $event->getArguments()
165+
]);
166+
167+
return $message;
168+
}
169+
170+
/**
171+
* {@inheritdoc}
172+
*/
173+
public function toEvent($eventName, PsrMessage $message)
174+
{
175+
$data = JSON::decode($message->getBody());
176+
177+
$entityClass = $data['entityClass'];
178+
179+
$manager = $this->doctrine->getManagerForClass($entityClass);
180+
if (false == $entity = $manager->find($entityClass, $data['entityId'])) {
181+
return Result::reject('The entity could not be found.');
182+
}
183+
184+
return new GenericEvent($entity, $data['arguments']);
185+
}
186+
}
187+
```
188+
189+
and register it:
190+
191+
```yaml
192+
# app/config/config.yml
193+
194+
services:
195+
acme.foo_event_transofrmer:
196+
class: 'AcmeBundle\Listener\FooEventTransformer'
197+
arguments: ['@doctrine']
198+
tags:
199+
- {name: 'enqueue.event_transformer', eventName: 'foo' }
200+
```
201+
202+
The `eventName` attribute accepts a regexp. You can do next `eventName: '/foo\..*?/'`.
203+
It uses this transformer for all event with the name beginning with `foo.`
204+
205+
[back to index](../index.md)

docs/bundle/quick_tour.md

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ It adds easy to use [configuration layer](config_reference.md), register service
99
$ composer require enqueue/enqueue-bundle enqueue/amqp-ext
1010
```
1111

12+
_**Note**: You could use not only AMQP transport but other available: STOMP, Amazon SQS, Redis, Filesystem, Doctrine DBAL and others._
13+
1214
## Enable the Bundle
1315

1416
Then, enable the bundle by adding `new Enqueue\Bundle\EnqueueBundle()` to the bundles array of the registerBundles method in your project's `app/AppKernel.php` file:

docs/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
- [Cli commands](bundle/cli_commands.md)
2828
- [Message producer](bundle/message_producer.md)
2929
- [Message processor](bundle/message_processor.md)
30+
- [Async events](bundle/async_events.md)
3031
- [Job queue](bundle/job_queue.md)
3132
- [Consumption extension](bundle/consumption_extension.md)
3233
- [Production settings](bundle/production_settings.md)

pkg/enqueue-bundle/DependencyInjection/Configuration.php

+6
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public function getConfigTreeBuilder()
5151
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
5252
->end()->end()
5353
->booleanNode('job')->defaultFalse()->end()
54+
->arrayNode('async_events')
55+
->canBeEnabled()
56+
->children()
57+
->booleanNode('spool_producer')->defaultFalse()->end()
58+
->end()
59+
->end()
5460
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
5561
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
5662
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+10
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ public function load(array $configs, ContainerBuilder $container)
113113
$loader->load('job.yml');
114114
}
115115

116+
if (isset($config['async_events']['enabled'])) {
117+
$loader->load('events.yml');
118+
119+
if (false == empty($config['async_events']['spool_producer'])) {
120+
$container->getDefinition('enqueue.events.async_listener')
121+
->replaceArgument(0, new Reference('enqueue.client.spool_producer'))
122+
;
123+
}
124+
}
125+
116126
if ($config['extensions']['doctrine_ping_connection_extension']) {
117127
$loader->load('extensions/doctrine_ping_connection_extension.yml');
118128
}

pkg/enqueue-bundle/EnqueueBundle.php

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1313
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
1414
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
15+
use Enqueue\Bundle\Events\DependencyInjection\AsyncEventsPass;
16+
use Enqueue\Bundle\Events\DependencyInjection\AsyncTransformersPass;
1517
use Enqueue\Dbal\DbalContext;
1618
use Enqueue\Dbal\Symfony\DbalTransportFactory;
1719
use Enqueue\Fs\FsContext;
@@ -23,6 +25,7 @@
2325
use Enqueue\Stomp\StompContext;
2426
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
2527
use Enqueue\Stomp\Symfony\StompTransportFactory;
28+
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
2629
use Symfony\Component\DependencyInjection\ContainerBuilder;
2730
use Symfony\Component\HttpKernel\Bundle\Bundle;
2831

@@ -68,5 +71,8 @@ public function build(ContainerBuilder $container)
6871
if (class_exists(SqsContext::class)) {
6972
$extension->addTransportFactory(new SqsTransportFactory());
7073
}
74+
75+
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
76+
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
7177
}
7278
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Client\Message;
6+
use Enqueue\Client\ProducerInterface;
7+
use Symfony\Component\EventDispatcher\Event;
8+
9+
class AsyncListener
10+
{
11+
/**
12+
* @var ProducerInterface
13+
*/
14+
private $producer;
15+
16+
/**
17+
* @var Registry
18+
*/
19+
private $registry;
20+
21+
/**
22+
* @var bool
23+
*/
24+
private $syncMode;
25+
26+
/**
27+
* @param ProducerInterface $producer
28+
* @param Registry $registry
29+
*/
30+
public function __construct(ProducerInterface $producer, Registry $registry)
31+
{
32+
$this->producer = $producer;
33+
$this->registry = $registry;
34+
}
35+
36+
public function resetSyncMode()
37+
{
38+
$this->syncMode = [];
39+
}
40+
41+
/**
42+
* @param string $eventName
43+
*/
44+
public function syncMode($eventName)
45+
{
46+
$this->syncMode[$eventName] = true;
47+
}
48+
49+
/**
50+
* @param Event $event
51+
* @param string $eventName
52+
*/
53+
public function onEvent(Event $event = null, $eventName)
54+
{
55+
if (false == isset($this->syncMode[$eventName])) {
56+
$transformerName = $this->registry->getTransformerNameForEvent($eventName);
57+
58+
$message = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event);
59+
$message->setScope(Message::SCOPE_APP);
60+
$message->setProperty('event_name', $eventName);
61+
$message->setProperty('transformer_name', $transformerName);
62+
63+
$this->producer->send('event.'.$eventName, $message);
64+
}
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Consumption\Result;
6+
use Enqueue\Psr\PsrContext;
7+
use Enqueue\Psr\PsrMessage;
8+
use Enqueue\Psr\PsrProcessor;
9+
10+
class AsyncProcessor implements PsrProcessor
11+
{
12+
/**
13+
* @var Registry
14+
*/
15+
private $registry;
16+
17+
/**
18+
* @var ProxyEventDispatcher
19+
*/
20+
private $eventDispatcher;
21+
22+
/**
23+
* @param Registry $registry
24+
* @param ProxyEventDispatcher $eventDispatcher
25+
*/
26+
public function __construct(Registry $registry, ProxyEventDispatcher $eventDispatcher)
27+
{
28+
$this->registry = $registry;
29+
$this->eventDispatcher = $eventDispatcher;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function process(PsrMessage $message, PsrContext $context)
36+
{
37+
if (false == $eventName = $message->getProperty('event_name')) {
38+
return Result::reject('The message is missing "event_name" property');
39+
}
40+
if (false == $transformerName = $message->getProperty('transformer_name')) {
41+
return Result::reject('The message is missing "transformer_name" property');
42+
}
43+
44+
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
45+
46+
$this->eventDispatcher->dispatchAsyncListenersOnly($eventName, $event);
47+
48+
return self::ACK;
49+
}
50+
}

0 commit comments

Comments
 (0)