Supporting Enqueue
Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you’d like to join them, please consider:
Async events
The EnqueueBundle allows you to dispatch events asynchronously. Behind the scene it replaces your listener with one that sends a message to MQ. The message contains the event object. The consumer, once it receives the message, restores the event and dispatches it to only async listeners.
Async listeners benefits:
- Reduces response time. Work is deferred to consumer processes.
- Better fault tolerance. Bugs in async listener do not affect user. Messages will wait till you fix bugs.
- Better scaling. Add more consumers to meet the load.
Note: Prior to Symfony 3.0, events contain eventDispatcher
and the default php serializer transformer is unable to serialize the object. A transformer should be registered for every async event. Read the event transformer.
Configuration
Symfony events are currently processed synchronously, enabling the async configuration for EnqueueBundle causes tagged listeners to defer action to a consumer asynchronously. If you already installed the bundle, then enable async_events
.
# app/config/config.yml
enqueue:
default:
async_events:
enabled: true
# if you'd like to send send messages onTerminate use spool_producer (it further reduces response time):
# spool_producer: true
Usage
To make your listener async you have add async: true
and dispatcher: 'enqueue.events.event_dispatcher'
attributes to the tag kernel.event_listener
, like this:
# app/config/config.yml
services:
acme.foo_listener:
class: 'AcmeBundle\Listener\FooListener'
tags:
- { name: 'kernel.event_listener', async: true, event: 'foo', method: 'onEvent', dispatcher: 'enqueue.events.event_dispatcher' }
or to kernel.event_subscriber
:
# app/config/config.yml
services:
test_async_subscriber:
class: 'AcmeBundle\Listener\TestAsyncSubscriber'
tags:
- { name: 'kernel.event_subscriber', async: true, dispatcher: 'enqueue.events.event_dispatcher' }
That’s basically it. The rest of the doc describes advanced features.
Advanced Usage.
You can also add an async listener directly and register a custom message processor for it:
# app/config/config.yml
services:
acme.async_foo_listener:
class: 'Enqueue\AsyncEventDispatcher\AsyncListener'
public: false
arguments: ['@enqueue.transport.default.context', '@enqueue.events.registry', 'a_queue_name']
tags:
- { name: 'kernel.event_listener', event: 'foo', method: 'onEvent', dispatcher: 'enqueue.events.event_dispatcher' }
Event transformer
The bundle uses php serializer transformer by default to pass events through MQ. You can write a transformer for each event type by implementing the Enqueue\AsyncEventDispatcher\EventTransformer
interface. Consider the next example. It shows how to send an event that contains Doctrine entity as a subject
<?php
namespace AcmeBundle\Listener;
// src/AcmeBundle/Listener/FooEventTransformer.php
use Enqueue\Client\Message;
use Enqueue\Consumption\Result;
use Interop\Queue\Message as QueueMessage;
use Enqueue\Util\JSON;
use Symfony\Component\EventDispatcher\Event;
use Enqueue\AsyncEventDispatcher\EventTransformer;
use Doctrine\Bundle\DoctrineBundle\Registry;
use Symfony\Component\EventDispatcher\GenericEvent;
class FooEventTransformer implements EventTransformer
{
/** @var Registry @doctrine */
private $doctrine;
public function __construct(Registry $doctrine)
{
$this->doctrine = $doctrine;
}
/**
* {@inheritdoc}
*
* @param GenericEvent $event
*/
public function toMessage($eventName, Event $event = null)
{
$entity = $event->getSubject();
$entityClass = get_class($entity);
$manager = $this->doctrine->getManagerForClass($entityClass);
$meta = $manager->getClassMetadata($entityClass);
$id = $meta->getIdentifierValues($entity);
$message = new Message();
$message->setBody([
'entityClass' => $entityClass,
'entityId' => $id,
'arguments' => $event->getArguments()
]);
return $message;
}
/**
* {@inheritdoc}
*/
public function toEvent($eventName, QueueMessage $message)
{
$data = JSON::decode($message->getBody());
$entityClass = $data['entityClass'];
$manager = $this->doctrine->getManagerForClass($entityClass);
if (false == $entity = $manager->find($entityClass, $data['entityId'])) {
return Result::reject('The entity could not be found.');
}
return new GenericEvent($entity, $data['arguments']);
}
}
and register it:
# app/config/config.yml
services:
acme.foo_event_transformer:
class: 'AcmeBundle\Listener\FooEventTransformer'
arguments: ['@doctrine']
tags:
- {name: 'enqueue.event_transformer', eventName: 'foo' }
The eventName
attribute accepts a regexp. You can do next eventName: '/foo\..*?/'
. It uses this transformer for all event with the name beginning with foo.