Configure event queues
Edit on GitHubThis document describes how event queues are configured.
To configure the event queue, follow these steps:
- Install
spryker/queueversion at least 1.*. andspryker/rabbit-mq - Create the configuration for RabbitMQ in
\Pyz\Client\RabbitMq\RabbitMqDependencyProvideras follows:
<?php
namespace Pyz\Client\RabbitMq;
use ArrayObject;
use Generated\Shared\Transfer\RabbitMqOptionTransfer;
use Spryker\Client\RabbitMq\Model\Connection\Connection;
use Spryker\Client\RabbitMq\RabbitMqDependencyProvider as RabbitMqRabbitMqDependencyProvider;
use Spryker\Shared\Event\EventConstants;
class RabbitMqDependencyProvider extends RabbitMqRabbitMqDependencyProvider
{
/**
* @return \ArrayObject
*/
protected function getQueueOptions()
{
$queueOptionCollection = new ArrayObject();
$queueOptionCollection->append($this->createEventExchangeQueueOption());
$queueOptionCollection->append($this->createEventErrorExchangeQueueOption());
return $queueOptionCollection;
}
/**
* @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
*/
protected function createEventExchangeQueueOption()
{
$rabbitMqOptionTransfer = new RabbitMqOptionTransfer();
$rabbitMqOptionTransfer->setQueueName(EventConstants::EVENT_QUEUE);
$rabbitMqOptionTransfer->setAutoDelete(false);
$rabbitMqOptionTransfer->setDurable(true);
$rabbitMqOptionTransfer->setPassive(false);
$rabbitMqOptionTransfer->setType('direct');
$rabbitMqOptionTransfer->setDeclarationType(Connection::RABBIT_MQ_EXCHANGE);
$rabbitMqOptionTransfer->setBindingQueue($this->createEventQueueBinding());
return $rabbitMqOptionTransfer;
}
/**
* @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
*/
protected function createEventErrorExchangeQueueOption()
{
$rabbitMqOptionTransfer = new RabbitMqOptionTransfer();
$rabbitMqOptionTransfer->setQueueName(EventConstants::EVENT_QUEUE);
$rabbitMqOptionTransfer->setAutoDelete(false);
$rabbitMqOptionTransfer->setDurable(true);
$rabbitMqOptionTransfer->setPassive(false);
$rabbitMqOptionTransfer->setType('direct');
$rabbitMqOptionTransfer->setDeclarationType(Connection::RABBIT_MQ_EXCHANGE);
$rabbitMqOptionTransfer->setBindingQueue($this->createEventErrorQueueBinding());
return $rabbitMqOptionTransfer;
}
/**
* @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
*/
protected function createEventErrorQueueBinding()
{
$rabbitMqOptionTransfer = new RabbitMqOptionTransfer();
$rabbitMqOptionTransfer->setQueueName(EventConstants::EVENT_QUEUE . '.error');
$rabbitMqOptionTransfer->setAutoDelete(false);
$rabbitMqOptionTransfer->setDurable(true);
$rabbitMqOptionTransfer->setExclusive(false);
$rabbitMqOptionTransfer->setPassive(false);
$rabbitMqOptionTransfer->setRoutingKey('error');
return $rabbitMqOptionTransfer;
}
/**
* @return \Generated\Shared\Transfer\RabbitMqOptionTransfer
*/
protected function createEventQueueBinding()
{
$rabbitMqOptionTransfer = new RabbitMqOptionTransfer();
$rabbitMqOptionTransfer->setQueueName(EventConstants::EVENT_QUEUE);
$rabbitMqOptionTransfer->setAutoDelete(false);
$rabbitMqOptionTransfer->setDurable(true);
$rabbitMqOptionTransfer->setExclusive(false);
$rabbitMqOptionTransfer->setPassive(false);
return $rabbitMqOptionTransfer;
}
}
- In
\Pyz\Client\Queue\QueueDependencyProvider, add the RabbitMQ adapter:
<?php
namespace Pyz\Client\Queue;
use Spryker\Client\Kernel\Container;
use Spryker\Client\Queue\QueueDependencyProvider as BaseQueueDependencyProvider;
class QueueDependencyProvider extends BaseQueueDependencyProvider
{
/**
* @param \Spryker\Client\Kernel\Container $container
*
* @return \Spryker\Client\Queue\Model\Adapter\AdapterInterface[]
*/
protected function createQueueAdapters(Container $container)
{
return [
$container->getLocator()->rabbitMq()->client()->createQueueAdapter(),
];
}
}
- In
\Pyz\Zed\Queue\QueueConfig, add receiver options for the event queue:
<?php
namespace Pyz\Zed\Queue;
use Generated\Shared\Transfer\RabbitMqConsumerOptionTransfer;
use Spryker\Shared\Event\EventConstants;
use Spryker\Zed\Queue\QueueConfig as SprykerQueueConfig;
class QueueConfig extends SprykerQueueConfig
{
/**
* @return array
*/
protected function getQueueReceiverOptions()
{
return [
EventConstants::EVENT_QUEUE => [
'rabbitmq' => $this->getRabbitMqQueueConsumerOptions(),
],
];
}
/**
* @return \Generated\Shared\Transfer\RabbitMqConsumerOptionTransfer
*/
protected function getRabbitMqQueueConsumerOptions()
{
$queueOptionTransfer = new RabbitMqConsumerOptionTransfer();
$queueOptionTransfer->setQueueName(EventConstants::EVENT_QUEUE);
$queueOptionTransfer->setConsumerTag('');
$queueOptionTransfer->setNoLocal(false);
$queueOptionTransfer->setNoAck(false);
$queueOptionTransfer->setConsumerExclusive(false);
$queueOptionTransfer->setNoWait(false);
return $queueOptionTransfer;
}
}
- In
\Pyz\Zed\Queue\QueueDependencyProvider, add a plugin (consumer) to process messages or events.
Since spryker/event:^2.17.1, use EventQueueMessageProcessorPlugin instead of EventRetryQueueMessageProcessorPlugin for the event retry queue. Both plugins define how the retry queue behaves when a message fails:
EventQueueMessageProcessorPlugin— processes failed messages directly in the retry queue. They are handled independently and do not re-enter the main queue.EventRetryQueueMessageProcessorPlugin— routes failed messages back to the main queue (the queue without the.retrypostfix). Under high load, this can slow down the processing of new messages in the main queue.
<?php
namespace Pyz\Zed\Queue;
use Spryker\Shared\Event\EventConstants;
use Spryker\Zed\Event\Communication\Plugin\Queue\EventQueueMessageProcessorPlugin;
use Spryker\Zed\Kernel\Container;
use Spryker\Zed\Queue\QueueDependencyProvider as SprykerDependencyProvider;
use Spryker\Zed\RabbitMq\Communication\Plugin\Queue\RabbitMqQueueMetricsReaderPlugin;
class QueueDependencyProvider extends SprykerDependencyProvider
{
/**
* @param \Spryker\Zed\Kernel\Container $container
*
* @return \Spryker\Zed\Queue\Dependency\Plugin\QueueMessageProcessorPluginInterface[]
*/
protected function getProcessorMessagePlugins(Container $container)
{
return [
EventConstants::EVENT_QUEUE => new EventQueueMessageProcessorPlugin(),
];
}
/**
* @return array<\Spryker\Zed\RabbitMq\Communication\Plugin\Queue\RabbitMqQueueMetricsReaderPlugin>
*/
protected function getQueueMetricsExpanderPlugins(): array
{
return [
new RabbitMqQueueMetricsReaderPlugin(), // Provides RabbitMQ-specific metrics for the resource-aware queue worker. Applicable only for RabbitMqAdapter.
];
}
}
- In your application’s configuration file
./config/Shared/config_default.phpor environment specific, make sure you have this configuration:
<?php
$config[QueueConstants::QUEUE_SERVER_ID] = (gethostname()) ?: php_uname('n');
$config[QueueConstants::QUEUE_WORKER_INTERVAL_MILLISECONDS] = 1000; // default 1000
$config[QueueConstants::QUEUE_WORKER_MAX_THRESHOLD_SECONDS] = 60; // 1min
$config[QueueConstants::QUEUE_WORKER_WAIT_LIMIT_ENABLED] = true; // Stop worker on exceeded execution time; waits up to QUEUE_WORKER_MAX_WAITING_SECONDS (default: 30s) for subprocesses
$config[QueueConstants::QUEUE_ADAPTER_CONFIGURATION] = [
EventConstants::EVENT_QUEUE => [
QueueConfig::CONFIG_QUEUE_ADAPTER => \Spryker\Client\RabbitMq\Model\RabbitMqAdapter::class,
QueueConfig::CONFIG_MAX_WORKER_NUMBER => 1, //Increase number of workers if higher concurrency needed.
],
];
// Enables processing of queues with resource aware queue worker.
$config[QueueConstants::RESOURCE_AWARE_QUEUE_WORKER_ENABLED] = (bool)getenv('RESOURCE_AWARE_QUEUE_WORKER_ENABLED') ?? false;
$config[QueueConstants::QUEUE_WORKER_FREE_MEMORY_BUFFER] = (int)getenv('QUEUE_WORKER_FREE_MEMORY_BUFFER') ?: 750;
$config[QueueConstants::QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT] = (int)getenv('QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT') ?: 5;
$config[QueueConstants::QUEUE_WORKER_MAX_PROCESSES] = 10; // concurrent, for all queues/stores, default 5
$config[QueueConstants::QUEUE_WORKER_PROCESSES_COMPLETE_TIMEOUT] = 600; // 10 min, default 5 min
Metrics and resource-aware worker configuration
Starting with 202512.0, Spryker ships a resource-aware queue worker that uses queue metrics and system resource monitoring to make intelligent decisions about process spawning. The configuration keys referenced above (RESOURCE_AWARE_QUEUE_WORKER_ENABLED, QUEUE_WORKER_FREE_MEMORY_BUFFER, QUEUE_WORKER_MEMORY_READ_PROCESS_TIMEOUT, QUEUE_WORKER_MAX_PROCESSES, QUEUE_WORKER_PROCESSES_COMPLETE_TIMEOUT) control this worker’s behavior.
RabbitMqQueueMetricsReaderPlugin
Register the RabbitMqQueueMetricsReaderPlugin in your QueueDependencyProvider to supply RabbitMQ queue metrics (message counts, batch sizes) to the resource-aware worker:
// src/Pyz/Zed/Queue/QueueDependencyProvider.php
use Spryker\Client\RabbitMq\Plugin\Queue\RabbitMqQueueMetricsReaderPlugin;
class QueueDependencyProvider extends SprykerQueueDependencyProvider
{
/**
* @return array<\Spryker\Client\QueueExtension\Dependency\Plugin\QueueMetricsReaderPluginInterface>
*/
protected function getQueueMetricsReaderPlugins(): array
{
return [
new RabbitMqQueueMetricsReaderPlugin(),
];
}
}
For the full configuration reference, architecture details, tuning recommendations, and backporting guidance, see Optimizing Jenkins execution with the resource-aware queue worker.
Thank you!
For submitting the form