【云计算】Elasticsearch数据全量导入HBase,scroll的正确使用姿势,HBase数据到Hive
小标 2019-01-23 来源 : 阅读 910 评论 0

摘要:本文主要向大家介绍了【云计算】Elasticsearch数据全量导入HBase,scroll的正确使用姿势,HBase数据到Hive,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】Elasticsearch数据全量导入HBase,scroll的正确使用姿势,HBase数据到Hive,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

1、代码


按照惯例,先上代码


(代码还有很多可以优化的地方,待正式工作了,有了更海量的需要处理的数据,更复杂的应用场景,我再回来更新此文。)


只贴出Es用scroll方式读取数据以及批量写入HBase的核心代码,其他工具类、方法,比如es、HBase配置、client、connection获取就不贴了。


1-1、es获取数据


package ipl.restapi.service.bigdata.es;


import ipl.restapi.util.EsOpenCloseUtils;


import ipl.restapi.util.EsPropertiesUtils;


import ipl.restapi.util.HbaseApiUtils;


import org.apache.hadoop.hbase.client.Connection;


import org.elasticsearch.action.search.SearchResponse;


import org.elasticsearch.client.Client;


import org.elasticsearch.client.transport.TransportClient;


import org.elasticsearch.common.unit.TimeValue;


import org.elasticsearch.search.SearchHits;


import org.slf4j.Logger;


import org.slf4j.LoggerFactory;


import java.io.IOException;


import java.util.ArrayList;


import java.util.HashMap;


import java.util.Map;


/**


*


pakage: ipl.restapi.service.bigdata.es


* * descirption: es检索某一所以全量数据,导入HBase * * @author wanghai * @version V1.0 * @since


2018/8/15 下午9:03


*/ public class ReadFromEs { private static final Logger LOGGER = LoggerFactory.getLogger("es"); private static final int SCROLL_SIZE = 10000; private static final int HBASE_PUT_SIZE = 1000; /** * es数据导入HBase * @param tableName 数据导入到HBase的那一张表 */ public void putAllDataToHbase(String tableName) { Map hashMap = EsPropertiesUtils.getConf(); TransportClient esClient = EsOpenCloseUtils.getInstance(hashMap); // 创建查询体 Map queryParams = new HashMap<>(); queryParams.put("index", "papper_little"); queryParams.put("type", "automatic"); // index String indexName = queryParams.get("index").toString(); // type String type = queryParams.get("type").toString(); scrollDataToHbase(esClient, indexName, type, tableName); } /** * 允许我们做一个初始搜索并且持续批量从Elasticsearch里拉取另一部分数据,结果直到没有结果剩下,类似于数据库的游标, * 为了避免数据量过大,每次从上次scroll的位置继续获取数据获取的数据写入HBase。 * * @param esClient es客户端 * @param indexName 索引名 * @param typeName es type * @param tableName 数据导入到HBase的那一张表 */ public void scrollDataToHbase(Client esClient, String indexName, String typeName, String tableName) { // TODO:通信过程中getScrollId丢失怎么办?比如说kafka,就有多种机制验证处理请求者的请求数据的偏移量 int baseRowNum = 0; SearchResponse scrollResp = esClient.prepareSearch(indexName) .setTypes(typeName) .setScroll(new TimeValue(300000)) // 每次返回10000条数据(如果够) .setSize(SCROLL_SIZE).get(); // arraylist放1000个map,一个map为一条论文数据,map中存放key-value对,是存入HBase的列名和值 ArrayList> hit1000List = new ArrayList<>(1024); Connection connection = HbaseApiUtils.getConnection(); do { SearchHits searchHits = scrollResp.getHits(); long num = searchHits.getHits().length; System.out.println("数量:" + num); for (int i = 0; i < num; ) { hit1000List.add(searchHits.getAt(i).getSourceAsMap()); i++; if (i % HBASE_PUT_SIZE == 0) { baseRowNum++; try { // TODO:网络不好,1000条可以考虑异步读写 HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE); hit1000List.clear(); } catch (IOException e) { LOGGER.error("es批量插入hbase异常-1!"); LOGGER.error(e.getMessage()); } } } System.out.println("目前完成 " + baseRowNum * HBASE_PUT_SIZE); // 处理不足1000的数据 if (!hit1000List.isEmpty()) { baseRowNum++; // 避免不足1000的数据覆盖之前的 try { HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE); hit1000List.clear(); System.out.println("down!"); } catch (IOException e) { LOGGER.error("es批量插入hbase异常-2!"); LOGGER.error(e.getMessage()); } } scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while (scrollResp.getHits().getHits().length != 0); } }


1-2、批量写入HBase


/**


* 批量导入数据到HBase table


*


* @param tableName 要导入数据的 表名


* @param arraylist 封装数据的arraylist


* @param baseRowNum 传入的是第几轮的数据


* @param filedFamily 列族


* @param keyPrefix rowKey前缀


* @param putSize 每一轮传入的数据量(最后一次也许除外)


* @throws IOException


*/


public static void putListByMap(Connection connection, String tableName, ArrayList> arraylist, int baseRowNum, String filedFamily, String keyPrefix, int putSize) throws IOException {


Table table = connection.getTable(TableName.valueOf(tableName));


List puts = new ArrayList<>();


int rowNum = 0;


baseRowNum = (baseRowNum - 1) * putSize;


for (Map hitMap : arraylist) {


rowNum++;


// 传输时,数据需要序列化——Bytes.toBytes


for (Map.Entry entry : hitMap.entrySet()) {


Put put = new Put(Bytes.toBytes(keyPrefix + (rowNum + baseRowNum)));


put.addColumn(Bytes.toBytes(filedFamily), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue().toString()));


puts.add(put);


}


}


table.put(puts);


LOGGER.info("表:{}已使用putListByMap方法批量更新!!!{}", tableName, baseRowNum);


}


public static Connection getConnection() {


Configuration config = HBaseConfiguration.create();


config.set("hbase.zookeeper.quorum", "your_ipaddress");


config.set("hbase.zookeeper.property.clientPort", "2181");


// 创建一个连接到集群的connection


Connection connection = null;


try {


connection = ConnectionFactory.createConnection(config);


} catch (IOException e) {


e.printStackTrace();


}


return connection;


}


2、思路


2-1、es中取数据


es取数据的方式很多,但是这里是取全量数据,所以不需要检索词,定义index、type即可。 也有其他不少博客写es获取全量数据,但是大多有问题,大致可以分为两类问题:


1、数据量不够大,可以一次性读进内存;


2、代码有bug,获取下一部分数据时,有丢失或者重复,或者说简单使用setFrom,并不是真正的部分读取。


对于数据不能全部读进内存的情况,我们可以使用es中的scroll进行“下标选择“。允许我们做一个初始搜索并且持续批量从Elasticsearch里拉取另一部分数据,结果直到没有结果剩下,类似于数据库的游标,为了避免数据量过大,每次从上次scroll的位置继续获取数据。


scroll 并不适合用来做实时搜索,而更适用于后台批处理任务。


scroll获取数据大致可分为初始化和遍历两个阶段,初始化时将所有符合搜索条件的搜索结果缓存起来,可以想象成快照,然后更新scroll_id遍历,从这个快照里继续取数据。


核心是


scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();while (scrollResp.getHits().getHits().length != 0);


2-2、数据封装


es获取的数据封装进Map,key-value刚好作为column-value。由于HBase稀疏列的特性,各条记录列数不同也是ok的。


Map数据封装进Arraylist,批量put进HBase。


3、mac 修改hosts文件,老是自动恢复原样


冥思不得解,发现每次都是连接了学校的VPN后发生的。于是查询了解到/private/etc/pulse-hosts.bak,/etc/hosts在pulse


启动后或者使用中被重置


解决办法:


在/private/etc/pulse-hosts.bak中填写hosts即可(类似白名单的概念吧),当然,加入白名单后,/etc/hosts中还是需要填写的


4、HBase数据到Hive


进行ing


5、TODO


1、通信过程中getScrollId丢失怎么办?比如说kafka,就有多种机制验证处理请求者的请求数据的偏移量


2、优化:异步,读es与数据存HBase线程分开,以减少处理时间。


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程