Skip to content

Commit 90163db

Browse files
committed
[RdKafka] Enable serializers to serialize message keys
Currently the `RdKafkaProducer` is serializing just the message, which I guess us fine for JSON serialization. But when you want to use an AvroSerializer that integrates with the Confluent Platform, it is possible to use complex types as keys as well, which in turn means that keys need to be serialized, too. This change leverages the mutability of the Message by shifting the `getKey` method call after the call to the Serializer.
1 parent 952f592 commit 90163db

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

pkg/rdkafka/RdKafkaProducer.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ public function send(PsrDestination $destination, PsrMessage $message)
4141
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
4242

4343
$partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
44-
$key = $message->getKey() ?: $destination->getKey() ?: null;
4544
$payload = $this->serializer->toString($message);
45+
$key = $message->getKey() ?: $destination->getKey() ?: null;
4646

4747
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
4848
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);

pkg/rdkafka/Tests/RdKafkaProducerTest.php

+40
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,46 @@ public function testShouldAllowGetPreviouslySetSerializer()
9595
$this->assertSame($expectedSerializer, $producer->getSerializer());
9696
}
9797

98+
public function testShouldAllowSerializersToSerializeKeys()
99+
{
100+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
101+
$message->setKey('key');
102+
103+
$kafkaTopic = $this->createKafkaTopicMock();
104+
$kafkaTopic
105+
->expects($this->once())
106+
->method('produce')
107+
->with(
108+
RD_KAFKA_PARTITION_UA,
109+
0,
110+
'theSerializedMessage',
111+
'theSerializedKey'
112+
)
113+
;
114+
115+
$kafkaProducer = $this->createKafkaProducerMock();
116+
$kafkaProducer
117+
->expects($this->once())
118+
->method('newTopic')
119+
->willReturn($kafkaTopic)
120+
;
121+
122+
$serializer = $this->createSerializerMock();
123+
$serializer
124+
->expects($this->once())
125+
->method('toString')
126+
->willReturnCallback(function () use ($message) {
127+
$message->setKey('theSerializedKey');
128+
129+
return 'theSerializedMessage';
130+
});
131+
;
132+
133+
$producer = new RdKafkaProducer($kafkaProducer, $serializer);
134+
135+
$producer->send(new RdKafkaTopic('theQueueName'), $message);
136+
}
137+
98138
/**
99139
* @return \PHPUnit_Framework_MockObject_MockObject|ProducerTopic
100140
*/

0 commit comments

Comments
 (0)