小标
2018-12-05
来源 :
阅读 1109
评论 0
摘要:本文主要向大家介绍了【云计算】大数据学习之kafka集群安装,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
本文主要向大家介绍了【云计算】大数据学习之kafka集群安装,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
解压 Kafka?安装包
修改配置文件 config/server.properties
vi server.properties
broker.id=0 //为依次增长的:0、1、2、3、4,集群中唯一id
log.dirs=/kafkaData/logs // Kafka 的消息数据存储路径zookeeper.connect=master:2181,slave1:2181,slave2:2181 //zookeeperServers 列表,各节点以逗号分开
Vi zookeeper.properties
dataDir=/root/zkdata #指向你安装的zk 的数据存储目录
# 将 Kafka server.properties zookeeper.properties 文件拷贝到其他节点机器
KAFKA_HOME/config>scp server.properties zookeeper.properties xx:$PWD
在每台节点上启动:
bin/kafka-server-start.sh???? config/server.properties? &
如果我们希望启动在后台,并且不把一堆日志展现在页面就可以这么启动:
bin/kafka-server-start.sh config/server.properties >> /var/kafka.log 2>&1 &
接下来我们查看一下有哪些topic信息,在默认情况下它没有任何的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
这里的kafka-topics.sh相当于是一个客户端,它如果想要看kafka里面的信息,就要连接到我们的集群上。
所以客户端就要先连接zookeeper才能连接到我们的集群上。
我们如何使用kafka?其实使用kafka就是往kafka中写数据和从kafka中读取数据
我们在往kafka中写数据之前,首先就要创建一个topic,就像我们在往数据库中写数据之前首先要创建一张表一样。
这个topic其实就是一个分类,以后不同类型的数据写到不同的topic
接下来我们来创建topic,其实我们在任何一台机器上创建topic都可以,因为我们在一台机器上创建,其他的机器会同步。
bin/kafka-topics.sh --create --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --replication-factor 3 --partitions 1
--topic test
replication-factor 就是副本因子保存3份
partitions? 就是分区
生产者向topic中写入数据:
bin/kafka-console-producer.sh --broker-list marshal:9092,marshal01:9092,marshal02:9092,
marshal03:9092,marshal04:9092,marshal05:9092 --topic test
消费者进行消费:
bin/kafka-console-consumer.sh --zookeeper marshal:2181,marshal01:2181,marshal02:2181,
marshal03:2181,marshal04:2181,marshal05:2181 --topic test --from-beginning
这里的 --from-beginning就是从最开始读。如果不加这个,则在消费者进程启动之前的数据不会被读到。
我们上面是通过命令行的方式进行消费的,我们还可以通过写程序的方式来消费。
package com.xiaoniu.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args)throws Exception{
//封装配置参数
Properties props = new Properties();
//kafka的brokers列表
props.setProperty("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
//key和value的序列化方式,因为需要网络传输所以需要序列化
props.setProperty("key.serializer", StringSerializer.class.getName());
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/**
* 发送数据的时候是否需要应答
* 取值范围:
* [all, -1, 0, 1]
* 0:leader不做任何应答
* 1:leader会给producer做出应答
* all、-1:fllower->leader -> producer
* 默认值:
* 1
*/
//props.setProperty("acks", "1");
/**
* 自定义分区
* 默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner
*/
//props.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
//创建一个生产者的客户端实例
KafkaProducer kafkaProducer = new KafkaProducer(props);
int count = 0;
while (count < 1000) {
int partitionNum = count % 1;
//封装一条消息
ProducerRecord record = new ProducerRecord("test", partitionNum,"", count + "");
//发送一条消息
kafkaProducer.send(record);
count++;
Thread.sleep(1 * 1000);
}
//释放
kafkaProducer.close();
System.out.println("send End...");
}
}
package com.xiaoniu.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
public class ConsumerDemo {
public static void main(String[] args) {
HashMap config = new HashMap();
config.put("bootstrap.servers", "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092");
config.put("key.deserializer", StringDeserializer.class.getName());
config.put("value.deserializer", StringDeserializer.class.getName());
config.put("group.id", "g000001");
/**
* 从哪个位置开始获取数据
* 取值范围:
* [latest, earliest, none]
* 默认值:
* latest
*/
config.put("auto.offset.reset", "earliest");
/**
* 是否要自动递交偏移量(offset)这条数据在某个分区所在位置的编号
*/
config.put("enable.auto.commit", false);
//创建一个消费者客户端实例
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(config);
//订阅主题(告诉客户端从哪个主题获取数据)
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
//拉去数据, 会从kafka所有分区下拉取数据
ConsumerRecords records = kafkaConsumer.poll(2000);
Iterator<consumerrecord> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord record = iterator.next();
System.out.println("record = " + record);
}
}
//释放连接
//kafkaConsumer.close();
}
}
我们还可以查看集群的状态:
活跃的分区的职责负责读写数据,不活跃的分区负责同步数据。
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号