PHP连接kafka发布生产消息
2018-11-28 23:20:14 小德 访问次数 976

0、安装 librdkafka 以及 PHP 的 rdkafka 扩展(例如:pecl install rdkafka,并在 php.ini 中启用 extension=rdkafka.so)

1、启动zookeeper

 bin/zookeeper-server-start.sh config/zookeeper.properties &

2、启动Kafka服务器(该命令在前台运行,后续步骤请在新的终端窗口中执行)

bin/kafka-server-start.sh config/server.properties

3、创建一个topic(Kafka 2.2 及以上版本可改用 --bootstrap-server localhost:9092;自 Kafka 3.0 起 --zookeeper 参数已被移除)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic have_a_try_test

4、生产者demo

<?php
// Producer demo: publishes the current timestamp (W3C format) to the
// "have_a_try_test" topic every two seconds until the process is interrupted.
$conf = new RdKafka\Conf();
// Broker(s) used to bootstrap the connection.
$conf->set('metadata.broker.list', 'localhost:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("have_a_try_test");
while (true) {
    // Arguments: partition, msgflags, payload. Partition 0 is used because the
    // topic was created with a single partition (--partitions 1); other
    // partition numbers are only valid if the topic actually has them.
    $topic->produce(0, 0, date(DATE_W3C));
    // Serve the producer's event queue (delivery reports, error callbacks).
    // A timeout of 0 makes this non-blocking; without polling, queued events
    // are never processed.
    $producer->poll(0);
    sleep(2);
}


5、消费者demo

<?php
// Consumer demo: joins the consumer group "myConsumerGroup_1", subscribes to
// the "have_a_try_test" topic and dumps every received message until the
// process is interrupted.
$conf = new RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            // Passing null drops the current assignment.
            $kafka->assign(null);
            break;
        default:
            // Use the readable error text as the message and keep the numeric
            // code as the exception code, matching the consume loop below.
            throw new \Exception(rd_kafka_err2str($err), $err);
    }
});
// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup_1');
// Initial list of Kafka brokers (same address and port as the producer demo)
$conf->set('metadata.broker.list', 'localhost:9092');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'have_a_try_test'
$consumer->subscribe(['have_a_try_test']);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joinging the group after leaving it.)\n";
while (true) {
    // Wait up to 120 seconds (the timeout is given in milliseconds).
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            // Any other error is treated as fatal for this demo.
            throw new \Exception($message->errstr(), $message->err);
    }
}