Skip to content

Commit b11fd8a

Browse files
authored
Merge pull request #955 from TiMESPLiNTER/feat/kafka-header-support
Add header support for kafka
2 parents 599ed87 + 2ae4655 commit b11fd8a

File tree

3 files changed

+41
-7
lines changed

3 files changed

+41
-7
lines changed

pkg/rdkafka/RdKafkaContext.php

+12
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,18 @@ public function purgeQueue(Queue $queue): void
151151
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
152152
}
153153

154+
public static function getLibrdKafkaVersion(): string
155+
{
156+
if (!defined('RD_KAFKA_VERSION')) {
157+
throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed');
158+
}
159+
$major = (RD_KAFKA_VERSION & 0xFF000000) >> 24;
160+
$minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16;
161+
$patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8;
162+
163+
return "$major.$minor.$patch";
164+
}
165+
154166
private function getProducer(): VendorProducer
155167
{
156168
if (null === $this->producer) {

pkg/rdkafka/RdKafkaProducer.php

+19
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,25 @@ public function send(Destination $destination, Message $message): void
4242
$key = $message->getKey() ?: $destination->getKey() ?: null;
4343

4444
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
45+
46+
// Note: Topic::producev method exists in phprdkafka > 3.1.0
47+
// Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
48+
if (method_exists($topic, 'producev')) {
49+
// Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka >= 1.0.0 causing segfault
50+
if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
51+
&& version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
52+
trigger_error(
53+
'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. '.
54+
'Falling back to `produce` (without message headers) instead.',
55+
E_USER_WARNING
56+
);
57+
} else {
58+
$topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
59+
60+
return;
61+
}
62+
}
63+
4564
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
4665
}
4766

pkg/rdkafka/Tests/RdKafkaProducerTest.php

+10-7
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,20 @@ public function testThrowIfMessageInvalid()
4545

4646
public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube()
4747
{
48-
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
48+
$messageHeaders = ['bar' => 'barVal'];
49+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
4950
$message->setKey('key');
5051

5152
$kafkaTopic = $this->createKafkaTopicMock();
5253
$kafkaTopic
5354
->expects($this->once())
54-
->method('produce')
55+
->method('producev')
5556
->with(
5657
RD_KAFKA_PARTITION_UA,
5758
0,
5859
'theSerializedMessage',
59-
'key'
60+
'key',
61+
$messageHeaders
6062
)
6163
;
6264

@@ -87,7 +89,7 @@ public function testShouldPassNullAsTopicConfigIfNotSetOnTopic()
8789
$kafkaTopic = $this->createKafkaTopicMock();
8890
$kafkaTopic
8991
->expects($this->once())
90-
->method('produce')
92+
->method('producev')
9193
;
9294

9395
$kafkaProducer = $this->createKafkaProducerMock();
@@ -123,7 +125,7 @@ public function testShouldPassCustomConfAsTopicConfigIfSetOnTopic()
123125
$kafkaTopic = $this->createKafkaTopicMock();
124126
$kafkaTopic
125127
->expects($this->once())
126-
->method('produce')
128+
->method('producev')
127129
;
128130

129131
$kafkaProducer = $this->createKafkaProducerMock();
@@ -165,13 +167,14 @@ public function testShouldAllowGetPreviouslySetSerializer()
165167

166168
public function testShouldAllowSerializersToSerializeKeys()
167169
{
168-
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
170+
$messageHeaders = ['bar' => 'barVal'];
171+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
169172
$message->setKey('key');
170173

171174
$kafkaTopic = $this->createKafkaTopicMock();
172175
$kafkaTopic
173176
->expects($this->once())
174-
->method('produce')
177+
->method('producev')
175178
->with(
176179
RD_KAFKA_PARTITION_UA,
177180
0,

0 commit comments

Comments
 (0)