Skip to content

Commit b0f9101

Browse files
committed
Allow reading headers from Kafka Message headers
A simpler part of #843, adds just the consumption part. This makes headers accessible in the consumer in case another application makes use of those, be it PHP or not.
1 parent 202de9c commit b0f9101

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

pkg/rdkafka/RdKafkaConsumer.php

+6
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage
169169
$message->setPartition($kafkaMessage->partition);
170170
$message->setKafkaMessage($kafkaMessage);
171171

172+
// Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's.
173+
// Note: Requires phprdkafka >= 3.1.0
174+
if (isset($kafkaMessage->headers)) {
175+
$message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers));
176+
}
177+
172178
return $message;
173179
default:
174180
throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err);

0 commit comments

Comments
 (0)