From a46c6067a2135c359e2f185eb15e664fbdad04b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Thu, 28 Mar 2019 23:09:05 +0100 Subject: [PATCH 1/5] Allow passing Client as configuration option. Prevent calling database creation if Client driver does not support QueryDriverInterface --- pkg/monitoring/InfluxDbStorage.php | 73 +++++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 12 deletions(-) diff --git a/pkg/monitoring/InfluxDbStorage.php b/pkg/monitoring/InfluxDbStorage.php index d1e8e7c00..011837c38 100644 --- a/pkg/monitoring/InfluxDbStorage.php +++ b/pkg/monitoring/InfluxDbStorage.php @@ -6,6 +6,8 @@ use Enqueue\Dsn\Dsn; use InfluxDB\Client; use InfluxDB\Database; +use InfluxDB\Driver\QueryDriverInterface; +use InfluxDB\Exception as InfluxDBException; use InfluxDB\Point; class InfluxDbStorage implements StatsStorage @@ -38,6 +40,8 @@ class InfluxDbStorage implements StatsStorage * 'measurementSentMessages' => 'sent-messages', * 'measurementConsumedMessages' => 'consumed-messages', * 'measurementConsumers' => 'consumers', + * 'client' => null, # Client instance. Null by default. + * 'retentionPolicy' => null, * ] * * or @@ -59,6 +63,13 @@ public function __construct($config = 'influxdb:') } elseif (is_array($config)) { $config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']); } elseif ($config instanceof Client) { + // Passing Client instead of array config is deprecated because it prevents setting any configuration values + // and causes library to use defaults. + @trigger_error( + sprintf('Passing %s as %s argument is deprecated. Pass it as "client" array property instead', + Client::class, + __METHOD__ + ), E_USER_DEPRECATED); $this->client = $config; $config = []; } else { @@ -74,8 +85,22 @@ public function __construct($config = 'influxdb:') 'measurementSentMessages' => 'sent-messages', 'measurementConsumedMessages' => 'consumed-messages', 'measurementConsumers' => 'consumers', + 'client' => null, + 'retentionPolicy' => null, ], $config); + if (null !== $config['client']) { + if (!$config['client'] instanceof Client) { + throw new \InvalidArgumentException(sprintf( + '%s configuration property is expected to be an instance of %s class. %s was passed instead.', + 'client', + Client::class, + gettype($config['client']) + )); + } + $this->client = $config['client']; + } + $this->config = $config; } @@ -109,7 +134,7 @@ public function pushConsumerStats(ConsumerStats $stats): void $points[] = new Point($this->config['measurementConsumers'], null, $tags, $values, $stats->getTimestampMs()); } - $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS); + $this->doWrite($points); } public function pushConsumedMessageStats(ConsumedMessageStats $stats): void @@ -135,7 +160,7 @@ public function pushConsumedMessageStats(ConsumedMessageStats $stats): void new Point($this->config['measurementConsumedMessages'], $runtime, $tags, $values, $stats->getTimestampMs()), ]; - $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS); + $this->doWrite($points); } public function pushSentMessageStats(SentMessageStats $stats): void @@ -158,21 +183,44 @@ public function pushSentMessageStats(SentMessageStats $stats): void new Point($this->config['measurementSentMessages'], 1, $tags, [], $stats->getTimestampMs()), ]; - $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS); + $this->doWrite($points); } - private function getDb(): Database + private function doWrite(array $points): void { - if (null === $this->database) { - if (null === $this->client) { - $this->client = new Client( - $this->config['host'], - $this->config['port'], - $this->config['user'], - $this->config['password'] - ); + if (!$this->client || $this->client->getDriver() instanceof QueryDriverInterface) { + $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS, $this->config['retentionPolicy']); + } else { + // Code below mirrors what `writePoints` method of Database does. + try { + $parameters = [ + 'url' => sprintf('write?db=%s&precision=%s', $this->config['db'], Database::PRECISION_MILLISECONDS), + 'database' => $this->config['db'], + 'method' => 'post', + ]; + if (null !== $this->config['retentionPolicy']) { + $parameters['url'] .= sprintf('&rp=%s', $this->config['retentionPolicy']); + } + + $this->client->write($parameters, $points); + } catch (\Exception $e) { + throw new InfluxDBException($e->getMessage(), $e->getCode()); } + } + } + + private function getDb(): Database + { + if (null === $this->client) { + $this->client = new Client( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['password'] + ); + } + if (null === $this->database) { $this->database = $this->client->selectDB($this->config['db']); $this->database->create(); } @@ -200,6 +248,7 @@ private function parseDsn(string $dsn): array 'measurementSentMessages' => $dsn->getString('measurementSentMessages'), 'measurementConsumedMessages' => $dsn->getString('measurementConsumedMessages'), 'measurementConsumers' => $dsn->getString('measurementConsumers'), + 'retentionPolicy' => $dsn->getString('retentionPolicy'), ]), function ($value) { return null !== $value; }); } } From ac396d7fd3bc2b01f687dc0415c7c1fe449ee927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 29 Mar 2019 17:49:30 +0100 Subject: [PATCH 2/5] Allow passing Client as configuration option. Prevent calling database creation if Client driver does not support QueryDriverInterface --- pkg/monitoring/InfluxDbStorage.php | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/monitoring/InfluxDbStorage.php b/pkg/monitoring/InfluxDbStorage.php index 011837c38..ae105fec3 100644 --- a/pkg/monitoring/InfluxDbStorage.php +++ b/pkg/monitoring/InfluxDbStorage.php @@ -63,13 +63,6 @@ public function __construct($config = 'influxdb:') } elseif (is_array($config)) { $config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']); } elseif ($config instanceof Client) { - // Passing Client instead of array config is deprecated because it prevents setting any configuration values - // and causes library to use defaults. - @trigger_error( - sprintf('Passing %s as %s argument is deprecated. Pass it as "client" array property instead', - Client::class, - __METHOD__ - ), E_USER_DEPRECATED); $this->client = $config; $config = []; } else { From a1a958590f9b50c75407cf7685993beb449b79c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 29 Mar 2019 20:19:34 +0100 Subject: [PATCH 3/5] Allow passing Client as configuration option. Prevent calling database creation if Client driver does not support QueryDriverInterface --- pkg/monitoring/InfluxDbStorage.php | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/pkg/monitoring/InfluxDbStorage.php b/pkg/monitoring/InfluxDbStorage.php index ae105fec3..e63341943 100644 --- a/pkg/monitoring/InfluxDbStorage.php +++ b/pkg/monitoring/InfluxDbStorage.php @@ -59,10 +59,17 @@ public function __construct($config = 'influxdb:') if (empty($config)) { $config = []; } elseif (is_string($config)) { - $config = $this->parseDsn($config); + $config = self::parseDsn($config); } elseif (is_array($config)) { - $config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']); + $config = empty($config['dsn']) ? $config : self::parseDsn($config['dsn']); } elseif ($config instanceof Client) { + // Passing Client instead of array config is deprecated because it prevents setting any configuration values + // and causes library to use defaults. + @trigger_error( + sprintf('Passing %s as %s argument is deprecated. Pass it as "client" array property or use createWithClient instead', + Client::class, + __METHOD__ + ), E_USER_DEPRECATED); $this->client = $config; $config = []; } else { @@ -97,6 +104,22 @@ public function __construct($config = 'influxdb:') $this->config = $config; } + /** + * @param Client $client + * @param string $config + * + * @return InfluxDbStorage + */ + public static function createWithClient(Client $client, $config = 'influxdb:'): self + { + if (is_string($config)) { + $config = self::parseDsn($config); + } + $config['client'] = $client; + + return new static($config); + } + public function pushConsumerStats(ConsumerStats $stats): void { $points = []; @@ -221,7 +244,7 @@ private function getDb(): Database return $this->database; } - private function parseDsn(string $dsn): array + private static function parseDsn(string $dsn): array { $dsn = Dsn::parseFirst($dsn); From 81845abd0172739d305088450db898a44dd73e53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 29 Mar 2019 20:37:07 +0100 Subject: [PATCH 4/5] Allow passing Client as configuration option. Prevent calling database creation if Client driver does not support QueryDriverInterface --- pkg/monitoring/InfluxDbStorage.php | 37 +++++++++++++----------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/pkg/monitoring/InfluxDbStorage.php b/pkg/monitoring/InfluxDbStorage.php index e63341943..401d79ddb 100644 --- a/pkg/monitoring/InfluxDbStorage.php +++ b/pkg/monitoring/InfluxDbStorage.php @@ -204,8 +204,22 @@ public function pushSentMessageStats(SentMessageStats $stats): void private function doWrite(array $points): void { - if (!$this->client || $this->client->getDriver() instanceof QueryDriverInterface) { - $this->getDb()->writePoints($points, Database::PRECISION_MILLISECONDS, $this->config['retentionPolicy']); + if (null === $this->client) { + $this->client = new Client( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['password'] + ); + } + + if ($this->client->getDriver() instanceof QueryDriverInterface) { + if (null === $this->database) { + $this->database = $this->client->selectDB($this->config['db']); + $this->database->create(); + } + + $this->database->writePoints($points, Database::PRECISION_MILLISECONDS, $this->config['retentionPolicy']); } else { // Code below mirrors what `writePoints` method of Database does. try { @@ -225,25 +239,6 @@ private function doWrite(array $points): void } } - private function getDb(): Database - { - if (null === $this->client) { - $this->client = new Client( - $this->config['host'], - $this->config['port'], - $this->config['user'], - $this->config['password'] - ); - } - - if (null === $this->database) { - $this->database = $this->client->selectDB($this->config['db']); - $this->database->create(); - } - - return $this->database; - } - private static function parseDsn(string $dsn): array { $dsn = Dsn::parseFirst($dsn); From 68c47fd410ca638b54193f606c57aa4f960b56d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 29 Mar 2019 20:55:34 +0100 Subject: [PATCH 5/5] Allow passing Client as configuration option. Prevent calling database creation if Client driver does not support QueryDriverInterface --- docs/monitoring.md | 52 ++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 12fa85fd1..93ea8876e 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -9,19 +9,19 @@ Enqueue is an MIT-licensed open source project with its ongoing development made # Monitoring. -Enqueue provides a tool for monitoring message queues. +Enqueue provides a tool for monitoring message queues. With it, you can control how many messages were sent, how many processed successfuly or failed. -How many consumers are working, their up time, processed messages stats, memory usage and system load. +How many consumers are working, their up time, processed messages stats, memory usage and system load. The tool could be integrated with virtually any analytics and monitoring platform. -There are several integration: +There are several integration: * [Datadog StatsD](https://datadoghq.com) * [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/) - * [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/) + * [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/) We are working on a JS\WAMP based real-time UI tool, for more information please [contact us](opensource@forma-pro.com). ![Grafana Monitoring](images/grafana_monitoring.jpg) -[contact us](opensource@forma-pro.com) if need a Grafana template such as on the picture. +[contact us](opensource@forma-pro.com) if need a Grafana template such as on the picture. * [Installation](#installation) * [Track sent messages](#track-sent-messages) @@ -40,7 +40,7 @@ We are working on a JS\WAMP based real-time UI tool, for more information please composer req enqueue/monitoring:0.9.x-dev ``` -## Track sent messages +## Track sent messages ```php create('influxdb://127.0.0.1:8086?db=foo'); $statsStorage->pushSentMessageStats(new SentMessageStats( (int) (microtime(true) * 1000), // timestamp - 'queue_name', // queue + 'queue_name', // queue 'aMessageId', 'aCorrelationId', [], // headers @@ -76,7 +76,7 @@ $context->createProducer()->send($queue, $message); $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo'); $statsStorage->pushSentMessageStats(new SentMessageStats( (int) (microtime(true) * 1000), - $queue->getQueueName(), + $queue->getQueueName(), $message->getMessageId(), $message->getCorrelationId(), $message->getHeaders()[], @@ -99,7 +99,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1 $statsStorage->pushConsumedMessageStats(new ConsumedMessageStats( 'consumerId', (int) (microtime(true) * 1000), // now - $receivedAt, + $receivedAt, 'aQueue', 'aMessageId', 'aCorrelationId', @@ -127,16 +127,16 @@ $consumer = $context->createConsumer($queue); $consumerId = uniqid('consumer-id', true); // we suggest using UUID here if ($message = $consumer->receiveNoWait()) { $receivedAt = (int) (microtime(true) * 1000); - + // heavy processing here. - + $consumer->acknowledge($message); - + $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo'); $statsStorage->pushConsumedMessageStats(new ConsumedMessageStats( $consumerId, (int) (microtime(true) * 1000), // now - $receivedAt, + $receivedAt, $queue->getQueueName(), $message->getMessageId(), $message->getCorrelationId(), @@ -151,7 +151,7 @@ if ($message = $consumer->receiveNoWait()) { ## Track consumer metrics Consumers are long running processes. It vital to know how many of them are running right now, how they perform, how much memory do they use and so. -This example shows how you can send such metrics. +This example shows how you can send such metrics. Call this code from time to time between processing messages. ```php @@ -165,13 +165,13 @@ $statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1 $statsStorage->pushConsumerStats(new ConsumerStats( 'consumerId', (int) (microtime(true) * 1000), // now - $startedAt, + $startedAt, null, // finished at - true, // is started? + true, // is started? false, // is finished? false, // is failed ['foo'], // consume from queues - 123, // received messages + 123, // received messages 120, // acknowledged messages 1, // rejected messages 1, // requeued messages @@ -182,7 +182,7 @@ $statsStorage->pushConsumerStats(new ConsumerStats( ## Consumption extension -There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption). +There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption). It could collect consumed messages and consumer stats for you. ```php @@ -236,8 +236,16 @@ There are available options: * 'measurementSentMessages' => 'sent-messages', * 'measurementConsumedMessages' => 'consumed-messages', * 'measurementConsumers' => 'consumers', +* 'client' => null, +* 'retentionPolicy' => null, ``` +You can pass InfluxDB\Client instance in `client` option. Otherwise, it will be created on first use according to other +options. + +If your InfluxDB\Client uses driver that implements InfluxDB\Driver\QueryDriverInterface, then database will be +automatically created for you if it doesn't exist. Default InfluxDB\Client will also do that. + ## Datadog storage Install additional packages: @@ -256,7 +264,7 @@ $statsStorage = (new GenericStatsStorageFactory())->create('datadog://127.0.0.1: For best experience please adjust units and types in metric summary. Example dashboard: - + ![Datadog monitoring](images/datadog_monitoring.png) @@ -311,7 +319,7 @@ There are available options: ## Symfony App -You have to register some services in order to incorporate monitoring facilities into your Symfony application. +You have to register some services in order to incorporate monitoring facilities into your Symfony application. ```yaml # config/packages/enqueue.yaml @@ -325,11 +333,11 @@ enqueue: transport: 'amqp://guest:guest@foo:5672/%2f' monitoring: 'wamp://127.0.0.1:9090?topic=stats' client: ~ - + datadog: transport: 'amqp://guest:guest@foo:5672/%2f' monitoring: 'datadog://127.0.0.1:8125?batched=false' client: ~ ``` -[back to index](index.md) +[back to index](index.md)