Skip to content

Commit f7afb37

Browse files
authored
Merge pull request #575 from Steveb-p/Steveb-p-patch-1
[rdkafka] Backport changes to topic subscription
2 parents 2c56da7 + e46d316 commit f7afb37

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

pkg/rdkafka/RdKafkaConsumer.php

+9-5
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,15 @@ public function getQueue()
9999
public function receive($timeout = 0)
100100
{
101101
if (false == $this->subscribed) {
102-
$this->consumer->assign([new TopicPartition(
103-
$this->getQueue()->getQueueName(),
104-
$this->getQueue()->getPartition(),
105-
$this->offset
106-
)]);
102+
if (null === $this->offset) {
103+
$this->consumer->subscribe([$this->getQueue()->getQueueName()]);
104+
} else {
105+
$this->consumer->assign([new TopicPartition(
106+
$this->getQueue()->getQueueName(),
107+
$this->getQueue()->getPartition(),
108+
$this->offset
109+
)]);
110+
}
107111

108112
$this->subscribed = true;
109113
}

0 commit comments

Comments
 (0)