diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 8464866c3..2b987bb96 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -56,12 +56,14 @@ public function send(Destination $destination, Message $message): void ); } else { $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); + $this->producer->poll(0); return; } } $topic->produce($partition, 0 /* must be 0 */, $payload, $key); + $this->producer->poll(0); } /** diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index 7cc601aef..467f1a43e 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -69,6 +69,11 @@ public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube() ->with('theQueueName') ->willReturn($kafkaTopic) ; + $kafkaProducer + ->expects($this->once()) + ->method('poll') + ->with(0) + ; $serializer = $this->createSerializerMock(); $serializer @@ -99,6 +104,11 @@ public function testShouldPassNullAsTopicConfigIfNotSetOnTopic() ->with('theQueueName', null) ->willReturn($kafkaTopic) ; + $kafkaProducer + ->expects($this->once()) + ->method('poll') + ->with(0) + ; $serializer = $this->createSerializerMock(); $serializer @@ -135,6 +145,11 @@ public function testShouldPassCustomConfAsTopicConfigIfSetOnTopic() ->with('theQueueName', $this->identicalTo($conf)) ->willReturn($kafkaTopic) ; + $kafkaProducer + ->expects($this->once()) + ->method('poll') + ->with(0) + ; $serializer = $this->createSerializerMock(); $serializer @@ -189,6 +204,11 @@ public function testShouldAllowSerializersToSerializeKeys() ->method('newTopic') ->willReturn($kafkaTopic) ; + $kafkaProducer + ->expects($this->once()) + ->method('poll') + ->with(0) + ; $serializer = $this->createSerializerMock(); $serializer