【云计算】大数据学习之kafka集群安装
小标 2018-12-05 来源 : 阅读 1109 评论 0

摘要:本文主要向大家介绍了【云计算】大数据学习之kafka集群安装,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】大数据学习之kafka集群安装,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


解压 Kafka?安装包


修改配置文件 config/server.properties


vi  server.properties
broker.id=0 //为依次增长的:0、1、2、3、4,集群中唯一id
log.dirs=/kafkaData/logs // Kafka 的消息数据存储路径zookeeper.connect=master:2181,slave1:2181,slave2:2181   //zookeeperServers   列表,各节点以逗号分开

Vi  zookeeper.properties
dataDir=/root/zkdata #指向你安装的zk 的数据存储目录

#  将 Kafka server.properties zookeeper.properties    文件拷贝到其他节点机器
KAFKA_HOME/config>scp server.properties zookeeper.properties xx:$PWD


在每台节点上启动:


bin/kafka-server-start.sh???? config/server.properties? &


如果我们希望启动在后台,并且不把一堆日志展现在页面就可以这么启动:



bin/kafka-server-start.sh     config/server.properties  >> /var/kafka.log 2>&1 &


接下来我们查看一下有哪些topic信息,在默认情况下它没有任何的topic:



bin/kafka-topics.sh --list --zookeeper localhost:2181


这里的kafka-topics.sh相当于是一个客户端,它如果想要看kafka里面的信息,就要连接到我们的集群上。


所以客户端就要先连接zookeeper才能连接到我们的集群上。


我们如何使用kafka?其实使用kafka就是往kafka中写数据和从kafka中读取数据


我们在往kafka中写数据之前,首先就要创建一个topic,就像我们在往数据库中写数据之前首先要创建一张表一样。


这个topic其实就是一个分类,以后不同类型的数据写到不同的topic


接下来我们来创建topic,其实我们在任何一台机器上创建topic都可以,因为我们在一台机器上创建,其他的机器会同步。



bin/kafka-topics.sh --create --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --replication-factor 3 --partitions 1 
--topic test


replication-factor 就是副本因子保存3份


partitions? 就是分区


生产者向topic中写入数据:


bin/kafka-console-producer.sh --broker-list marshal:9092,marshal01:9092,marshal02:9092,
marshal03:9092,marshal04:9092,marshal05:9092 --topic test


消费者进行消费:


bin/kafka-console-consumer.sh --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --topic test --from-beginning


这里的 --from-beginning就是从最开始读。如果不加这个,则在消费者进程启动之前的数据不会被读到。


我们上面是通过命令行的方式进行消费的,我们还可以通过写程序的方式来消费。


package com.xiaoniu.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args)throws Exception{
        //封装配置参数
        Properties props = new Properties();
        //kafka的brokers列表
        props.setProperty("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
        //key和value的序列化方式,因为需要网络传输所以需要序列化
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /**
         * 发送数据的时候是否需要应答
         * 取值范围:
         *  [all, -1, 0, 1]
         *  0:leader不做任何应答
         *  1:leader会给producer做出应答
         *  all、-1:fllower->leader -> producer
         * 默认值:
         *  1
         */
        //props.setProperty("acks", "1");

        /**
         * 自定义分区
         * 默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner
         */
        //props.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");

        //创建一个生产者的客户端实例
        KafkaProducer kafkaProducer = new KafkaProducer(props);

        int count = 0;
        while (count < 1000) {
            int partitionNum = count % 1;

            //封装一条消息
            ProducerRecord record = new ProducerRecord("test", partitionNum,"", count + "");
            //发送一条消息
            kafkaProducer.send(record);

            count++;
            Thread.sleep(1 * 1000);
        }
        //释放
        kafkaProducer.close();
        System.out.println("send End...");

    }
}



package com.xiaoniu.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;

public class ConsumerDemo {
    public static void main(String[] args) {

        HashMap config  = new HashMap();
        config.put("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        config.put("group.id", "g000001");

        /**
         * 从哪个位置开始获取数据
         * 取值范围:
         *  [latest, earliest, none]
         * 默认值:
         *  latest
         */
        config.put("auto.offset.reset", "earliest");
        /**
         * 是否要自动递交偏移量(offset)这条数据在某个分区所在位置的编号
         */
        config.put("enable.auto.commit", false);

        //创建一个消费者客户端实例
        KafkaConsumer kafkaConsumer = new KafkaConsumer<>(config);
        //订阅主题(告诉客户端从哪个主题获取数据)
        kafkaConsumer.subscribe(Arrays.asList("test"));

        while (true) {
            //拉去数据, 会从kafka所有分区下拉取数据
            ConsumerRecords records = kafkaConsumer.poll(2000);
            Iterator<consumerrecord> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord record = iterator.next();
                System.out.println("record = " + record);
            }
        }

        //释放连接
        //kafkaConsumer.close();
    }
}


我们还可以查看集群的状态:


活跃的分区的职责负责读写数据,不活跃的分区负责同步数据。


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved