Skip to content
Snippets Groups Projects
Commit 1a08e452 authored by Reiter, Christoph's avatar Reiter, Christoph :snake:
Browse files

Add a queuing system by wrapping symfony messenger

By abstracting away the Symfony messenger internals we can simplify the
deployment while being more flexible in the future in case we need to
change something.

This adds commands for running workers and restarting them. The rest
is handled by internally.
parent 8538d388
No related branches found
No related tags found
1 merge request!66Add a queuing system by wrapping symfony messenger
Pipeline #60128 passed
......@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "06fcdc0b0163095f806f3dcf9986c508",
"content-hash": "83accfd991b38ee637acc3a0cade2f17",
"packages": [
{
"name": "api-platform/core",
......@@ -1528,6 +1528,75 @@
},
"time": "2019-03-08T08:55:37+00:00"
},
{
"name": "symfony/amqp-messenger",
"version": "v5.3.7",
"source": {
"type": "git",
"url": "https://github.com/symfony/amqp-messenger.git",
"reference": "7bf38ef8c72d51163aa9048d7a8abe03be33f2b4"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/amqp-messenger/zipball/7bf38ef8c72d51163aa9048d7a8abe03be33f2b4",
"reference": "7bf38ef8c72d51163aa9048d7a8abe03be33f2b4",
"shasum": ""
},
"require": {
"php": ">=7.2.5",
"symfony/deprecation-contracts": "^2.1",
"symfony/messenger": "^5.3"
},
"require-dev": {
"symfony/event-dispatcher": "^4.4|^5.0",
"symfony/process": "^4.4|^5.0",
"symfony/property-access": "^4.4|^5.0",
"symfony/serializer": "^4.4|^5.0"
},
"type": "symfony-bridge",
"autoload": {
"psr-4": {
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\": ""
},
"exclude-from-classmap": [
"/Tests/"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"description": "Symfony AMQP extension Messenger Bridge",
"homepage": "https://symfony.com",
"support": {
"source": "https://github.com/symfony/amqp-messenger/tree/v5.3.7"
},
"funding": [
{
"url": "https://symfony.com/sponsor",
"type": "custom"
},
{
"url": "https://github.com/fabpot",
"type": "github"
},
{
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift"
}
],
"time": "2021-08-25T04:45:08+00:00"
},
{
"name": "symfony/asset",
"version": "v5.3.4",
......@@ -2012,6 +2081,79 @@
],
"time": "2021-03-23T23:28:01+00:00"
},
{
"name": "symfony/doctrine-messenger",
"version": "v5.3.10",
"source": {
"type": "git",
"url": "https://github.com/symfony/doctrine-messenger.git",
"reference": "971b7d5bd1c641cb8a699f4dcfd1079e2030761a"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/doctrine-messenger/zipball/971b7d5bd1c641cb8a699f4dcfd1079e2030761a",
"reference": "971b7d5bd1c641cb8a699f4dcfd1079e2030761a",
"shasum": ""
},
"require": {
"php": ">=7.2.5",
"symfony/messenger": "^5.1",
"symfony/service-contracts": "^1.1|^2"
},
"conflict": {
"doctrine/dbal": "<2.10",
"doctrine/persistence": "<1.3"
},
"require-dev": {
"doctrine/dbal": "^2.10|^3.0",
"doctrine/persistence": "^1.3|^2",
"symfony/property-access": "^4.4|^5.0",
"symfony/serializer": "^4.4|^5.0"
},
"type": "symfony-bridge",
"autoload": {
"psr-4": {
"Symfony\\Component\\Messenger\\Bridge\\Doctrine\\": ""
},
"exclude-from-classmap": [
"/Tests/"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"description": "Symfony Doctrine Messenger Bridge",
"homepage": "https://symfony.com",
"support": {
"source": "https://github.com/symfony/doctrine-messenger/tree/v5.3.10"
},
"funding": [
{
"url": "https://symfony.com/sponsor",
"type": "custom"
},
{
"url": "https://github.com/fabpot",
"type": "github"
},
{
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift"
}
],
"time": "2021-10-21T08:22:59+00:00"
},
{
"name": "symfony/error-handler",
"version": "v5.3.7",
......@@ -2846,6 +2988,96 @@
],
"time": "2021-10-29T08:36:48+00:00"
},
{
"name": "symfony/messenger",
"version": "v5.3.10",
"source": {
"type": "git",
"url": "https://github.com/symfony/messenger.git",
"reference": "5146f9ecede00a3570f766a6c14cf494d479e038"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/messenger/zipball/5146f9ecede00a3570f766a6c14cf494d479e038",
"reference": "5146f9ecede00a3570f766a6c14cf494d479e038",
"shasum": ""
},
"require": {
"php": ">=7.2.5",
"psr/log": "^1|^2|^3",
"symfony/amqp-messenger": "^5.1",
"symfony/deprecation-contracts": "^2.1",
"symfony/doctrine-messenger": "^5.1",
"symfony/polyfill-php80": "^1.16",
"symfony/redis-messenger": "^5.1"
},
"conflict": {
"symfony/event-dispatcher": "<4.4",
"symfony/framework-bundle": "<4.4",
"symfony/http-kernel": "<4.4",
"symfony/serializer": "<5.0"
},
"require-dev": {
"psr/cache": "^1.0|^2.0|^3.0",
"symfony/console": "^4.4|^5.0",
"symfony/dependency-injection": "^4.4|^5.0",
"symfony/event-dispatcher": "^4.4|^5.0",
"symfony/http-kernel": "^4.4|^5.0",
"symfony/process": "^4.4|^5.0",
"symfony/property-access": "^4.4|^5.0",
"symfony/routing": "^4.4|^5.0",
"symfony/serializer": "^5.0",
"symfony/service-contracts": "^1.1|^2",
"symfony/stopwatch": "^4.4|^5.0",
"symfony/validator": "^4.4|^5.0"
},
"suggest": {
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."
},
"type": "library",
"autoload": {
"psr-4": {
"Symfony\\Component\\Messenger\\": ""
},
"exclude-from-classmap": [
"/Tests/"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Samuel Roze",
"email": "samuel.roze@gmail.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"description": "Helps applications send and receive messages to/from other applications or via message queues",
"homepage": "https://symfony.com",
"support": {
"source": "https://github.com/symfony/messenger/tree/v5.3.10"
},
"funding": [
{
"url": "https://symfony.com/sponsor",
"type": "custom"
},
{
"url": "https://github.com/fabpot",
"type": "github"
},
{
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift"
}
],
"time": "2021-10-28T19:22:18+00:00"
},
{
"name": "symfony/mime",
"version": "v5.3.8",
......@@ -3901,6 +4133,73 @@
],
"time": "2021-09-07T07:41:40+00:00"
},
{
"name": "symfony/redis-messenger",
"version": "v5.3.10",
"source": {
"type": "git",
"url": "https://github.com/symfony/redis-messenger.git",
"reference": "94ba9b20a7f2b28ec9e93823d7912ced0108b398"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/redis-messenger/zipball/94ba9b20a7f2b28ec9e93823d7912ced0108b398",
"reference": "94ba9b20a7f2b28ec9e93823d7912ced0108b398",
"shasum": ""
},
"require": {
"php": ">=7.2.5",
"symfony/deprecation-contracts": "^2.1",
"symfony/messenger": "^5.1"
},
"require-dev": {
"symfony/property-access": "^4.4|^5.0",
"symfony/serializer": "^4.4|^5.0"
},
"type": "symfony-bridge",
"autoload": {
"psr-4": {
"Symfony\\Component\\Messenger\\Bridge\\Redis\\": ""
},
"exclude-from-classmap": [
"/Tests/"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"description": "Symfony Redis extension Messenger Bridge",
"homepage": "https://symfony.com",
"support": {
"source": "https://github.com/symfony/redis-messenger/tree/v5.3.10"
},
"funding": [
{
"url": "https://symfony.com/sponsor",
"type": "custom"
},
{
"url": "https://github.com/fabpot",
"type": "github"
},
{
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift"
}
],
"time": "2021-10-25T14:58:02+00:00"
},
{
"name": "symfony/routing",
"version": "v5.3.7",
......
......@@ -13,8 +13,9 @@ dbp_relay_core:
docs_title: 'Relay API Gateway'
# The description text of the API docs page (supports markdown)
docs_description: '*part of the [Digital Blueprint](https://gitlab.tugraz.at/dbp) project*'
messenger_transport_dsn: '' # Deprecated (Since dbp/relay-core-bundle 0.1.20: Use "queue_dsn" instead.)
# See https://symfony.com/doc/5.3/messenger.html#redis-transport
messenger_transport_dsn: '' # Example: 'redis://localhost:6379/messages'
queue_dsn: '' # Example: 'redis://localhost:6379'
# https://symfony.com/doc/5.3/components/lock.html
lock_dsn: '' # Example: 'redis://redis:6379'
```
......@@ -35,27 +36,3 @@ LOCK_DSN=redis://redis:6379/
# Semaphore (local locking)
LOCK_DSN=semaphore
```
## Symfony Messenger
For projects that also use the [Symfony Messenger](https://symfony.com/doc/current/components/messenger.html)
you need to set above `messenger_transport_dsn` config, for example as `messenger_transport_dsn: '%env(MESSENGER_TRANSPORT_DSN)%'`
with an environment variable `MESSENGER_TRANSPORT_DSN` in your `.env` file or by any other means.
[Redis](https://redis.io/) is also a way for doing this.
Example:
```dotenv
MESSENGER_TRANSPORT_DSN=redis://redis:6379/local-messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
```
You need to have a system in place to run the [Symfony Messenger](https://symfony.com/doc/current/components/messenger.html).
Symfony recommends to use [Supervisor](http://supervisord.org/) to do this. You can use
[Supervisor configuration](https://symfony.com/doc/current/messenger.html#supervisor-configuration) to help you with the setup process.
Keep in mind that you need to **restart** the Symfony Messenger **workers** when you **deploy** Relay API **updates**
to your server, so changes to the messaging system can be picked up.
If you are using Supervisor to run the Symfony Messenger you can just stop the workers with
`php bin/console messenger:stop-workers`, Supervisor will start them again.
# Queued Tasks
The Relay API gateway optionally requires a queuing system, which means tasks
get queued in a central data store and worked on after a request has finished.
The tasks can be processes using one or more workers on multiple machines in
parallel.
This requires two extra deployment related tasks:
1) One or more worker tasks have to be run in the background and automatically
restarted if they stop.
2) On deployment the worker processes have to be restarted to use the new code.
## Configuration
In the bundle configuration set the `queue_dsn` key to a DSN supported by the
[Symfony messenger component](https://symfony.com/doc/current/messenger.html)
At the moment we only support the redis transport.
Example:
```yaml
queue_dsn: 'redis://localhost:6379'
```
## Run the workers
Start a worker using
```bash
./bin/console dbp:relay:queue:work my-worker-01
```
It will automatically exit after a specific amount 0f time or after a specific
number of processed tasks.
Note:
* You need to take care of restarting it automatically.
* Each active worker needs to have a unique name passed as the first argument
## Restart the workers
After deployment run
```bash
./bin/console dbp:relay:queue:restart
```
This will signal the workers to exit after the current task, which means they
will be restarted by supervisor and will run the newly deployed code.
Symfony recommends to use [Supervisor](http://supervisord.org/) to do this. You can use
[Supervisor configuration](https://symfony.com/doc/current/messenger.html#supervisor-configuration) to help you with the setup process.
\ No newline at end of file
......@@ -31,9 +31,13 @@ class Configuration implements ConfigurationInterface
->defaultValue('*part of the [Digital Blueprint](https://gitlab.tugraz.at/dbp) project*')
->end()
->scalarNode('messenger_transport_dsn')
->defaultValue('')
->setDeprecated('dbp/relay-core-bundle', '0.1.20', 'Use "queue_dsn" instead.')
->end()
->scalarNode('queue_dsn')
->info('See https://symfony.com/doc/5.3/messenger.html#redis-transport')
->defaultValue('')
->example('redis://localhost:6379/messages')
->example('redis://redis:6379')
->end()
->scalarNode('lock_dsn')
->info('https://symfony.com/doc/5.3/components/lock.html')
......
......@@ -164,7 +164,11 @@ class DbpRelayCoreExtension extends ConfigurableExtension implements PrependExte
]);
// https://symfony.com/doc/4.4/messenger.html#transports-async-queued-messages
$messengerTransportDsn = $config['queue_dsn'];
if ($messengerTransportDsn === '') {
// backward compatibility
$messengerTransportDsn = $config['messenger_transport_dsn'];
}
if ($container->hasParameter('dbp_api.messenger_routing')) {
$routing = [];
$routing = array_merge($routing, $container->getParameter('dbp_api.messenger_routing'));
......
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Queue;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class RestartCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
protected static $defaultName = 'dbp:relay:queue:restart';
protected function execute(InputInterface $input, OutputInterface $output)
{
// Now run the real messenger:stop-workers command
$app = $this->getApplication();
assert($app !== null);
$command = $app->find('messenger:stop-workers');
$consumeInput = new ArrayInput([]);
return $command->run($consumeInput, $output);
}
}
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Queue;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* Decorated because we want to set reasonable defaults while still allowing the user to set a full transport
* DSN if needed.
*/
class TransportFactoryDecorator implements TransportFactoryInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/**
* @var TransportFactoryInterface
*/
private $decorated;
/**
* @var string
*/
private $workerName;
public function __construct(TransportFactoryInterface $decorated)
{
$this->decorated = $decorated;
$this->workerName = 'worker';
}
public function setActiveWorkerName(string $name)
{
$this->workerName = $name;
}
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
// In case we build our main transport we make sure the defaults are set to our liking.
// The DSN content still wins, but ideally the user only sets the minimal DSN.
if ($options['transport_name'] === Utils::QUEUE_TRANSPORT_NAME) {
$this->logger->debug("Creating queue transport for worker: '$this->workerName'");
$redis = new RedisTransportFactory();
if ($redis->supports($dsn, $options)) {
// We set some nice namespaced default, so the user doesn't have to care about potential conflicts
$options['stream'] = 'dbp_relay_queue_stream';
$options['group'] = 'dbp_relay_queue_group';
$options['consumer'] = $this->workerName;
// Use the new recommended default:
// https://github.com/symfony/symfony/pull/42163
$options['delete_after_ack'] = true;
} else {
throw new \Exception('Only redis currently supported as a messenger transport (current DSN: '.$dsn.')');
}
}
return $this->decorated->createTransport($dsn, $options, $serializer);
}
public function supports(string $dsn, array $options): bool
{
return $this->decorated->supports($dsn, $options);
}
}
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Queue;
class Utils
{
public const QUEUE_TRANSPORT_NAME = 'async';
// These just should be good defaults, feel free to adjust
public const DEFAULT_TIME_LIMIT = 3600;
public const DEFAULT_TASK_LIMIT = 10;
}
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Queue;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
/**
* This is a simpler variant of "messenger:consume". It works with a fixed transport and by requiring a unique
* worker name works the same across redis and DB transports.
*
* It also sets some nice defaults for everything.
*/
class WorkCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
protected static $defaultName = 'dbp:relay:queue:work';
/**
* @var TransportFactoryDecorator
*/
private $transportFactory;
public function __construct(TransportFactoryDecorator $transportFactory)
{
parent::__construct();
$this->transportFactory = $transportFactory;
}
protected function configure(): void
{
$this->addArgument('worker-name', InputArgument::REQUIRED, 'A unique and stable worker name');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
// The question is why can't we use a random worker name based on the hostname or pid etc.
// https://github.com/symfony/symfony-docs/pull/11869/files explains why consumer names
// should be unique, reused, and ideally stable, when using redis. By requiring this for all transports
// we keep the config/docs simple.
$workerName = $input->getArgument('worker-name');
$this->transportFactory->setActiveWorkerName($workerName);
// Now run the real messenger:consume command
$app = $this->getApplication();
assert($app !== null);
$command = $app->find('messenger:consume');
// use some good default limits, since lots of php code leaks the workers have to be restarted from time to time
$consumeInput = new ArrayInput([
'--time-limit' => (string) Utils::DEFAULT_TIME_LIMIT,
'--limit' => (string) Utils::DEFAULT_TASK_LIMIT,
'receivers' => [Utils::QUEUE_TRANSPORT_NAME],
]);
return $command->run($consumeInput, $output);
}
}
......@@ -18,3 +18,16 @@ services:
decorates: 'api_platform.openapi.factory'
autowire: true
autoconfigure: false
Dbp\Relay\CoreBundle\Queue\TransportFactoryDecorator:
decorates: 'messenger.transport_factory'
autowire: true
autoconfigure: true
Dbp\Relay\CoreBundle\Queue\WorkCommand:
autowire: true
autoconfigure: true
Dbp\Relay\CoreBundle\Queue\RestartCommand:
autowire: true
autoconfigure: true
\ No newline at end of file
<?php
declare(strict_types=1);
namespace Dbp\Relay\CoreBundle\Tests\Queue;
use Symfony\Bundle\FrameworkBundle\Console\Application;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Console\Tester\CommandTester;
class RestartCommandTest extends KernelTestCase
{
public function testExecute()
{
$kernel = static::createKernel();
$application = new Application($kernel);
$command = $application->find('dbp:relay:queue:restart');
$commandTester = new CommandTester($command);
$res = $commandTester->execute([]);
$this->assertSame(0, $res);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment