Skip to content
Snippets Groups Projects
Commit b8b14a8c authored by Reiter, Christoph's avatar Reiter, Christoph :snake:
Browse files

queue: add support for doctrine transports

We set a default table name and group name with proper namespacing,
so it should be safe to reuse existing DBs.

This also adds a dbp:relay:core:queue:test command which queues
one or more dummy messages with an optional delay. This can be
used to see if the queue consumers work correctly and receive
tasks.
parent 744e4df4
No related branches found
No related tags found
No related merge requests found
......@@ -16,14 +16,48 @@ This requires two extra deployment related tasks:
In the bundle configuration set the `queue_dsn` key to a DSN supported by the
[Symfony messenger component](https://symfony.com/doc/current/messenger.html)
At the moment we only support the redis transport.
At the moment we only support the redis and doctrine transports:
Example:
### Redis
This transport requires the Redis PHP extension (>=4.3) and a running Redis server (^5.0).
```yaml
# config/packages/dbp_relay_core.yaml
dbp_relay_core:
# redis[s]://[pass@][ip|host|socket[:port]]
queue_dsn: 'redis://localhost:6379'
```
This creates a redis stream automatically when active.
### Doctrine
In case of doctrine you have to install `symfony/doctrine-messenger`
```bash
composer require symfony/doctrine-messenger
```
then create a doctrine connection and point the `queue_dsn` to that connection:
```yaml
# config/packages/doctrine.yaml
doctrine:
dbal:
connections:
my-queue-connection-name:
url: 'mysql://db:secret@mariadb:3306/db'
```
```yaml
# config/packages/dbp_relay_core.yaml
dbp_relay_core:
queue_dsn: 'doctrine://my-queue-connection-name'
```
I will automatically create a new database table when active.
## Run the workers
Start a worker using
......
......@@ -4,6 +4,8 @@ declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\DependencyInjection;
use Dbp\Relay\CoreBundle\Queue\TestMessage;
use Dbp\Relay\CoreBundle\Queue\Utils as QueueUtils;
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;
......@@ -163,6 +165,10 @@ class DbpRelayCoreExtension extends ConfigurableExtension implements PrependExte
]),
]);
$routing = [
TestMessage::class => QueueUtils::QUEUE_TRANSPORT_NAME,
];
// https://symfony.com/doc/4.4/messenger.html#transports-async-queued-messages
$messengerTransportDsn = $config['queue_dsn'];
if ($messengerTransportDsn === '') {
......@@ -170,35 +176,27 @@ class DbpRelayCoreExtension extends ConfigurableExtension implements PrependExte
$messengerTransportDsn = $config['messenger_transport_dsn'];
}
if ($container->hasParameter('dbp_api.messenger_routing')) {
$routing = [];
$routing = array_merge($routing, $container->getParameter('dbp_api.messenger_routing'));
if ($messengerTransportDsn === '') {
throw new \RuntimeException('A bundle requires a worker queue: set "queue_dsn" in the core bundle config');
}
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'async' => $messengerTransportDsn,
],
'routing' => $routing,
],
]);
} else {
// By always setting a transport, we ensure that the messenger commands work in all cases, even if they
// are not stricly needed
if ($messengerTransportDsn === '') {
$messengerTransportDsn = 'in-memory://dummy-queue-not-configured';
}
}
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'async' => $messengerTransportDsn,
QueueUtils::QUEUE_TRANSPORT_NAME => $messengerTransportDsn,
],
'routing' => $routing,
],
]);
}
// https://symfony.com/doc/5.3/components/lock.html
$lockDsn = $config['lock_dsn'];
......
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Queue;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
class TestCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
protected static $defaultName = 'dbp:relay:core:queue:test';
/**
* @var MessageBusInterface
*/
private $bus;
public function __construct(MessageBusInterface $bus)
{
parent::__construct();
$this->bus = $bus;
}
protected function configure()
{
$this->setDescription('Start some dummy tasks for testing');
$this->addOption('count', null, InputArgument::OPTIONAL, 'The number of messages to send', 1);
$this->addOption('delay', null, InputArgument::OPTIONAL, 'Delay in seconds', 0);
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$count = (int) $input->getOption('count');
$delay = (int) $input->getOption('delay');
for ($i = 0; $i < $count; ++$i) {
if ($delay !== 0) {
$envelope = new Envelope(new TestMessage(), [new DelayStamp($delay * 1000)]);
} else {
$envelope = new Envelope(new TestMessage());
}
$this->bus->dispatch($envelope);
}
return Command::SUCCESS;
}
}
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Queue;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class TestHandler implements MessageHandlerInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
public function __invoke(TestMessage $message)
{
$this->logger->info('Handled test message');
}
}
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Queue;
class TestMessage
{
}
......@@ -56,10 +56,13 @@ class TransportFactoryDecorator implements TransportFactoryInterface, LoggerAwar
// Use the new recommended default:
// https://github.com/symfony/symfony/pull/42163
$options['delete_after_ack'] = true;
} elseif (strpos($dsn, 'doctrine://') === 0) {
$options['table_name'] = 'core_queue_messages';
$options['queue_name'] = 'main';
} elseif ($dsn === 'in-memory://dummy-queue-not-configured') {
// This is used when no queue is configured, so allow it.
} else {
throw new \Exception('Only redis currently supported as a messenger transport (current DSN: '.$dsn.')');
throw new \Exception('Only redis and doctrine currently supported as a queue transport (current DSN: '.$dsn.')');
}
}
......
......@@ -31,3 +31,11 @@ services:
Dbp\Relay\CoreBundle\Queue\RestartCommand:
autowire: true
autoconfigure: true
Dbp\Relay\CoreBundle\Queue\TestCommand:
autowire: true
autoconfigure: true
Dbp\Relay\CoreBundle\Queue\TestHandler:
autowire: true
autoconfigure: true
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment