Consuming messages has two requirements:
The first requirement is fulfilled by message routing, the second is by the event dispatcher system.
<?php use Symfony\Component\EventDispatcher\EventDispatcher; // $router = see bellow $eventDispatcher = new EventDispatcher(); // Create a Consumer and start the loop. $consumer = new Consumer($router, $eventDispatcher); // The second argument is optional and is an array // of options. Currently only ``max-runtime`` is supported which specifies the max runtime // in seconds. $consumer->consume($queueFactory->create('send-newsletter'), array( 'max-runtime' => 900, ));
A single message represents a job that needs to be performed, and as described earlier, by default a message’s name is used to determine which receiver should receive that message.
A receiver can be any of the following:
For the system to know which receiver should handle which messages, you are required to register them first.
<?php use Bernard\Router\ReceiverMapRouter; use Bernard\Consumer; // create driver and a queuefactory // NewsletterMessageHandler is a pseudo receiver that has a sendNewsletter method. $router = new ReceiverMapRouter([ 'SendNewsletter' => new NewsletterMessageHandler(), ]);
Message routing can also happen based on the message class instead of the message name.
<?php use Bernard\Router\ClassNameRouter; use Bernard\Consumer; // create driver and a queuefactory // NewsletterMessageHandler is a pseudo receiver that has a sendNewsletter method. // NewsletterMessage is a pseudo message $router = new ClassNameRouter([ NewsletterMessage::class => new NewsletterMessageHandler(), ]);
In some cases the above described receiver rules might not be enough. The provided router implementations also accept a receiver resolver which can be used for example to resolve receivers from a Dependency Injection container. A good example for that is the PSR-11 container resolver implementation that comes with this package.
<?php use Bernard\Router\ReceiverMapRouter; use Bernard\Router\ContainerReceiverResolver; use Bernard\Consumer; // create driver and a queuefactory // NewsletterMessageHandler is a pseudo receiver that has a sendNewsletter method. // $container = your PSR-11 compatible container $router = new ReceiverMapRouter( [ 'SendNewsletter' => NewsletterMessageHandler::class, ], new ContainerReceiverResolver($container), );
Bernard comes with a
ConsumeCommand which can be used with Symfony Console
<?php use Bernard\Command\ConsumeCommand; // create $console application $console->add(new ConsumeCommand($consumer, $queueFactory));
It can then be used as any other console command. The argument given should be the queue that your messages are on. If we use the earlier example with sending a newsletter, it would look like this.
$ /path/to/console bernard:consume send-newsletter
When a message is dequeued it is also marked as invisible (if the driver supports this) and when the message have been consumed then it will also be acknowledged. Some drivers have a timeout on the invisible state and will automatically requeue a message after that time. Therefore it is important to have a timeout greater than it takes for you to consume a single message.