PHP连接kafka发布生产消息
0、安装rdkafka
1、启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
2、现在启动Kafka服务器
bin/kafka-server-start.sh config/server.properties
3、创建一个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic have_a_try_test
4、生产者demo
<?php $conf = new RdKafka\Conf(); $conf->set('metadata.broker.list', 'localhost:9092'); $producer = new RdKafka\Producer($conf); $topic = $producer->newTopic("have_a_try_test"); while (true) { $topic->produce(0, 0, date(DATE_W3C)); //$topic->produce(1, 0, date(DATE_W3C)); //$topic->produce(2, 0, date(DATE_W3C)); sleep(2); }
5、消费者demo
<?php $conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignemts (optional) $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', 'myConsumerGroup_1'); // Initial list of Kafka brokers $conf->set('metadata.broker.list', 'localhost'); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // Subscribe to topic 'test' $consumer->subscribe(['have_a_try_test']); echo "Waiting for partition assignment... (make take some time when\n"; echo "quickly re-joinging the group after leaving it.)\n"; while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }