diff --git a/pkg/dbal/DbalConnectionFactory.php b/pkg/dbal/DbalConnectionFactory.php index 55164213c..75b86929e 100644 --- a/pkg/dbal/DbalConnectionFactory.php +++ b/pkg/dbal/DbalConnectionFactory.php @@ -45,7 +45,12 @@ public function __construct($config = 'mysql:') throw new \LogicException('The config must be either an array of options, a DSN string or null'); } - $this->config = $config; + $this->config = array_replace_recursive([ + 'connection' => [], + 'table_name' => 'enqueue', + 'polling_interval' => 1000, + 'lazy' => true, + ], $config); } /** diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index f898e567f..6721b53b3 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -178,7 +178,7 @@ protected function receiveMessage() // remove message $affectedRows = $this->dbal->delete($this->context->getTableName(), ['id' => $dbalMessage['id']], [ - 'id' => Type::INTEGER, + 'id' => Type::BINARY, ]); if (1 !== $affectedRows) { diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 0570a4463..682d11c26 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -4,6 +4,7 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Schema\Table; +use Doctrine\DBAL\Types\Type; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrContext; use Interop\Queue\PsrDestination; @@ -170,21 +171,19 @@ public function createDataBaseTable() return; } - if ($this->getDbalConnection()->getDatabasePlatform()->hasNativeGuidType()) { - throw new \LogicException('The platform does not support UUIDs natively'); - } - $table = new Table($this->getTableName()); - $table->addColumn('id', 'guid'); - $table->addColumn('published_at', 'bigint'); - $table->addColumn('body', 'text', ['notnull' => false]); - $table->addColumn('headers', 'text', ['notnull' => false]); - $table->addColumn('properties', 'text', ['notnull' => false]); - $table->addColumn('redelivered', 'boolean', ['notnull' => false]); - $table->addColumn('queue', 'string'); - $table->addColumn('priority', 'smallint'); - $table->addColumn('delayed_until', 'integer', ['notnull' => false]); - $table->addColumn('time_to_live', 'integer', ['notnull' => false]); + + $table->addColumn('id', Type::BINARY, ['length' => 16, 'fixed' => true]); + $table->addColumn('human_id', Type::STRING, ['length' => 36]); + $table->addColumn('published_at', Type::BIGINT); + $table->addColumn('body', Type::TEXT, ['notnull' => false]); + $table->addColumn('headers', Type::TEXT, ['notnull' => false]); + $table->addColumn('properties', Type::TEXT, ['notnull' => false]); + $table->addColumn('redelivered', Type::BOOLEAN, ['notnull' => false]); + $table->addColumn('queue', Type::STRING); + $table->addColumn('priority', Type::SMALLINT); + $table->addColumn('delayed_until', Type::INTEGER, ['notnull' => false]); + $table->addColumn('time_to_live', Type::INTEGER, ['notnull' => false]); $table->setPrimaryKey(['id']); $table->addIndex(['published_at']); @@ -194,4 +193,16 @@ public function createDataBaseTable() $sm->createTable($table); } + + /** + * @param DbalDestination $queue + */ + public function purgeQueue(DbalDestination $queue) + { + $this->getDbalConnection()->delete( + $this->getTableName(), + ['queue' => $queue->getQueueName()], + ['queue' => Type::STRING] + ); + } } diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index c8f58bc20..89612775b 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -10,6 +10,9 @@ use Interop\Queue\PsrDestination; use Interop\Queue\PsrMessage; use Interop\Queue\PsrProducer; +use Ramsey\Uuid\Codec\OrderedTimeCodec; +use Ramsey\Uuid\Uuid; +use Ramsey\Uuid\UuidFactory; class DbalProducer implements PsrProducer { @@ -33,12 +36,18 @@ class DbalProducer implements PsrProducer */ private $context; + /** + * @var OrderedTimeCodec + */ + private $uuidCodec; + /** * @param DbalContext $context */ public function __construct(DbalContext $context) { $this->context = $context; + $this->uuidCodec = new OrderedTimeCodec((new UuidFactory())->getUuidBuilder()); } /** @@ -74,15 +83,11 @@ public function send(PsrDestination $destination, PsrMessage $message) )); } - $sql = 'SELECT '.$this->context->getDbalConnection()->getDatabasePlatform()->getGuidExpression(); - $uuid = $this->context->getDbalConnection()->query($sql)->fetchColumn(0); - - if (empty($uuid)) { - throw new \LogicException('The generated uuid is empty'); - } + $uuid = Uuid::uuid1(); $dbalMessage = [ - 'id' => $uuid, + 'id' => $this->uuidCodec->encodeBinary($uuid), + 'human_id' => $uuid->toString(), 'published_at' => (int) microtime(true) * 10000, 'body' => $body, 'headers' => JSON::encode($message->getHeaders()), diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json index 15197f7f4..a37e646a2 100644 --- a/pkg/dbal/composer.json +++ b/pkg/dbal/composer.json @@ -8,7 +8,8 @@ "require": { "php": ">=5.6", "queue-interop/queue-interop": "^0.6@dev", - "doctrine/dbal": "~2.5" + "doctrine/dbal": "~2.5", + "ramsey/uuid": "^3" }, "require-dev": { "phpunit/phpunit": "~5.4.0",