Skip to content

Commit b7d5f9e

Browse files
authored
Merge pull request #1102 from nick-zh/add-non-blocking-poll-after-produce
[rdkafka] add non-blocking poll call to serve cb's
2 parents ef76e88 + 3096078 commit b7d5f9e

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

pkg/rdkafka/RdKafkaProducer.php

+2
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,14 @@ public function send(Destination $destination, Message $message): void
5656
);
5757
} else {
5858
$topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
59+
$this->producer->poll(0);
5960

6061
return;
6162
}
6263
}
6364

6465
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
66+
$this->producer->poll(0);
6567
}
6668

6769
/**

pkg/rdkafka/Tests/RdKafkaProducerTest.php

+20
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube()
6969
->with('theQueueName')
7070
->willReturn($kafkaTopic)
7171
;
72+
$kafkaProducer
73+
->expects($this->once())
74+
->method('poll')
75+
->with(0)
76+
;
7277

7378
$serializer = $this->createSerializerMock();
7479
$serializer
@@ -99,6 +104,11 @@ public function testShouldPassNullAsTopicConfigIfNotSetOnTopic()
99104
->with('theQueueName', null)
100105
->willReturn($kafkaTopic)
101106
;
107+
$kafkaProducer
108+
->expects($this->once())
109+
->method('poll')
110+
->with(0)
111+
;
102112

103113
$serializer = $this->createSerializerMock();
104114
$serializer
@@ -135,6 +145,11 @@ public function testShouldPassCustomConfAsTopicConfigIfSetOnTopic()
135145
->with('theQueueName', $this->identicalTo($conf))
136146
->willReturn($kafkaTopic)
137147
;
148+
$kafkaProducer
149+
->expects($this->once())
150+
->method('poll')
151+
->with(0)
152+
;
138153

139154
$serializer = $this->createSerializerMock();
140155
$serializer
@@ -189,6 +204,11 @@ public function testShouldAllowSerializersToSerializeKeys()
189204
->method('newTopic')
190205
->willReturn($kafkaTopic)
191206
;
207+
$kafkaProducer
208+
->expects($this->once())
209+
->method('poll')
210+
->with(0)
211+
;
192212

193213
$serializer = $this->createSerializerMock();
194214
$serializer

0 commit comments

Comments
 (0)