diff --git a/docs/monitoring.md b/docs/monitoring.md index 12fa85fd1..93ea8876e 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -9,19 +9,19 @@ Enqueue is an MIT-licensed open source project with its ongoing development made # Monitoring. -Enqueue provides a tool for monitoring message queues. +Enqueue provides a tool for monitoring message queues. With it, you can control how many messages were sent, how many processed successfuly or failed. -How many consumers are working, their up time, processed messages stats, memory usage and system load. +How many consumers are working, their up time, processed messages stats, memory usage and system load. The tool could be integrated with virtually any analytics and monitoring platform. -There are several integration: +There are several integration: * [Datadog StatsD](https://datadoghq.com) * [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/) - * [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/) + * [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/) We are working on a JS\WAMP based real-time UI tool, for more information please [contact us](opensource@forma-pro.com). ![Grafana Monitoring](images/grafana_monitoring.jpg) -[contact us](opensource@forma-pro.com) if need a Grafana template such as on the picture. +[contact us](opensource@forma-pro.com) if need a Grafana template such as on the picture. * [Installation](#installation) * [Track sent messages](#track-sent-messages) @@ -40,7 +40,7 @@ We are working on a JS\WAMP based real-time UI tool, for more information please composer req enqueue/monitoring:0.9.x-dev ``` -## Track sent messages +## Track sent messages ```php create('influxdb://127.0.0.1:8086?db=foo'); $statsStorage->pushSentMessageStats(new SentMessageStats( (int) (microtime(true) * 1000), // timestamp - 'queue_name', // queue + 'queue_name', // queue 'aMessageId', 'aCorrelationId', [], // headers @@ -76,7 +76,7 @@ $context->createProducer()->send($queue, $message); $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo'); $statsStorage->pushSentMessageStats(new SentMessageStats( (int) (microtime(true) * 1000), - $queue->getQueueName(), + $queue->getQueueName(), $message->getMessageId(), $message->getCorrelationId(), $message->getHeaders()[], @@ -99,7 +99,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1 $statsStorage->pushConsumedMessageStats(new ConsumedMessageStats( 'consumerId', (int) (microtime(true) * 1000), // now - $receivedAt, + $receivedAt, 'aQueue', 'aMessageId', 'aCorrelationId', @@ -127,16 +127,16 @@ $consumer = $context->createConsumer($queue); $consumerId = uniqid('consumer-id', true); // we suggest using UUID here if ($message = $consumer->receiveNoWait()) { $receivedAt = (int) (microtime(true) * 1000); - + // heavy processing here. - + $consumer->acknowledge($message); - + $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo'); $statsStorage->pushConsumedMessageStats(new ConsumedMessageStats( $consumerId, (int) (microtime(true) * 1000), // now - $receivedAt, + $receivedAt, $queue->getQueueName(), $message->getMessageId(), $message->getCorrelationId(), @@ -151,7 +151,7 @@ if ($message = $consumer->receiveNoWait()) { ## Track consumer metrics Consumers are long running processes. It vital to know how many of them are running right now, how they perform, how much memory do they use and so. -This example shows how you can send such metrics. +This example shows how you can send such metrics. Call this code from time to time between processing messages. ```php @@ -165,13 +165,13 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1 $statsStorage->pushConsumerStats(new ConsumerStats( 'consumerId', (int) (microtime(true) * 1000), // now - $startedAt, + $startedAt, null, // finished at - true, // is started? + true, // is started? false, // is finished? false, // is failed ['foo'], // consume from queues - 123, // received messages + 123, // received messages 120, // acknowledged messages 1, // rejected messages 1, // requeued messages @@ -182,7 +182,7 @@ $statsStorage->pushConsumerStats(new ConsumerStats( ## Consumption extension -There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption). +There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption). It could collect consumed messages and consumer stats for you. ```php @@ -236,8 +236,16 @@ There are available options: * 'measurementSentMessages' => 'sent-messages', * 'measurementConsumedMessages' => 'consumed-messages', * 'measurementConsumers' => 'consumers', +* 'client' => null, +* 'retentionPolicy' => null, ``` +You can pass InfluxDB\Client instance in `client` option. Otherwise, it will be created on first use according to other +options. + +If your InfluxDB\Client uses driver that implements InfluxDB\Driver\QueryDriverInterface, then database will be +automatically created for you if it doesn't exist. Default InfluxDB\Client will also do that. + ## Datadog storage Install additional packages: @@ -256,7 +264,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('datadog://127.0.0.1: For best experience please adjust units and types in metric summary. Example dashboard: - + ![Datadog monitoring](images/datadog_monitoring.png) @@ -311,7 +319,7 @@ There are available options: ## Symfony App -You have to register some services in order to incorporate monitoring facilities into your Symfony application. +You have to register some services in order to incorporate monitoring facilities into your Symfony application. ```yaml # config/packages/enqueue.yaml @@ -325,11 +333,11 @@ enqueue: transport: 'amqp://guest:guest@foo:5672/%2f' monitoring: 'wamp://127.0.0.1:9090?topic=stats' client: ~ - + datadog: transport: 'amqp://guest:guest@foo:5672/%2f' monitoring: 'datadog://127.0.0.1:8125?batched=false' client: ~ ``` -[back to index](index.md) +[back to index](index.md) diff --git a/pkg/monitoring/InfluxDbStorage.php b/pkg/monitoring/InfluxDbStorage.php index d1e8e7c00..401d79ddb 100644 --- a/pkg/monitoring/InfluxDbStorage.php +++ b/pkg/monitoring/InfluxDbStorage.php @@ -6,6 +6,8 @@ use Enqueue\Dsn\Dsn; use InfluxDB\Client; use InfluxDB\Database; +use InfluxDB\Driver\QueryDriverInterface; +use InfluxDB\Exception as InfluxDBException; use InfluxDB\Point; class InfluxDbStorage implements StatsStorage @@ -38,6 +40,8 @@ class InfluxDbStorage implements StatsStorage * 'measurementSentMessages' => 'sent-messages', * 'measurementConsumedMessages' => 'consumed-messages', * 'measurementConsumers' => 'consumers', + * 'client' => null, # Client instance. Null by default. + * 'retentionPolicy' => null, * ] * * or @@ -55,10 +59,17 @@ public function __construct($config = 'influxdb:') if (empty($config)) { $config = []; } elseif (is_string($config)) { - $config = $this->parseDsn($config); + $config = self::parseDsn($config); } elseif (is_array($config)) { - $config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']); + $config = empty($config['dsn']) ? $config : self::parseDsn($config['dsn']); } elseif ($config instanceof Client) { + // Passing Client instead of array config is deprecated because it prevents setting any configuration values + // and causes library to use defaults. + @trigger_error( + sprintf('Passing %s as %s argument is deprecated. Pass it as "client" array property or use createWithClient instead', + Client::class, + __METHOD__ + ), E_USER_DEPRECATED); $this->client = $config; $config = []; } else { @@ -74,11 +85,41 @@ public function __construct($config = 'influxdb:') 'measurementSentMessages' => 'sent-messages', 'measurementConsumedMessages' => 'consumed-messages', 'measurementConsumers' => 'consumers', + 'client' => null, + 'retentionPolicy' => null, ], $config); + if (null !== $config['client']) { + if (!$config['client'] instanceof Client) { + throw new \InvalidArgumentException(sprintf( + '%s configuration property is expected to be an instance of %s class. %s was passed instead.', + 'client', + Client::class, + gettype($config['client']) + )); + } + $this->client = $config['client']; + } + $this->config = $config; } + /** + * @param Client $client + * @param string $config + * + * @return InfluxDbStorage + */ + public static function createWithClient(Client $client, $config = 'influxdb:'): self + { + if (is_string($config)) { + $config = self::parseDsn($config); + } + $config['client'] = $client; + + return new static($config); + } + public function pushConsumerStats(ConsumerStats $stats): void { $points = []; @@ -109,7 +150,7 @@ public function pushConsumerStats(ConsumerStats $stats): void $points[] = new Point($this->config['measurementConsumers'], null, $tags, $values, $stats->getTimestampMs()); } - $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS); + $this->doWrite($points); } public function pushConsumedMessageStats(ConsumedMessageStats $stats): void @@ -135,7 +176,7 @@ public function pushConsumedMessageStats(ConsumedMessageStats $stats): void new Point($this->config['measurementConsumedMessages'], $runtime, $tags, $values, $stats->getTimestampMs()), ]; - $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS); + $this->doWrite($points); } public function pushSentMessageStats(SentMessageStats $stats): void @@ -158,29 +199,47 @@ public function pushSentMessageStats(SentMessageStats $stats): void new Point($this->config['measurementSentMessages'], 1, $tags, [], $stats->getTimestampMs()), ]; - $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS); + $this->doWrite($points); } - private function getDb(): Database + private function doWrite(array $points): void { - if (null === $this->database) { - if (null === $this->client) { - $this->client = new Client( - $this->config['host'], - $this->config['port'], - $this->config['user'], - $this->config['password'] - ); + if (null === $this->client) { + $this->client = new Client( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['password'] + ); + } + + if ($this->client->getDriver() instanceof QueryDriverInterface) { + if (null === $this->database) { + $this->database = $this->client->selectDB($this->config['db']); + $this->database->create(); } - $this->database = $this->client->selectDB($this->config['db']); - $this->database->create(); + $this->database->writePoints($points, Database::PRECISION_MILLISECONDS, $this->config['retentionPolicy']); + } else { + // Code below mirrors what `writePoints` method of Database does. + try { + $parameters = [ + 'url' => sprintf('write?db=%s&precision=%s', $this->config['db'], Database::PRECISION_MILLISECONDS), + 'database' => $this->config['db'], + 'method' => 'post', + ]; + if (null !== $this->config['retentionPolicy']) { + $parameters['url'] .= sprintf('&rp=%s', $this->config['retentionPolicy']); + } + + $this->client->write($parameters, $points); + } catch (\Exception $e) { + throw new InfluxDBException($e->getMessage(), $e->getCode()); + } } - - return $this->database; } - private function parseDsn(string $dsn): array + private static function parseDsn(string $dsn): array { $dsn = Dsn::parseFirst($dsn); @@ -200,6 +259,7 @@ private function parseDsn(string $dsn): array 'measurementSentMessages' => $dsn->getString('measurementSentMessages'), 'measurementConsumedMessages' => $dsn->getString('measurementConsumedMessages'), 'measurementConsumers' => $dsn->getString('measurementConsumers'), + 'retentionPolicy' => $dsn->getString('retentionPolicy'), ]), function ($value) { return null !== $value; }); } }