摘要:本文主要向大家介绍了【云计算】Elasticsearch数据全量导入HBase,scroll的正确使用姿势,HBase数据到Hive,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
本文主要向大家介绍了【云计算】Elasticsearch数据全量导入HBase,scroll的正确使用姿势,HBase数据到Hive,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
1、代码
按照惯例,先上代码
(代码还有很多可以优化的地方,待正式工作了,有了更海量的需要处理的数据,更复杂的应用场景,我再回来更新此文。)
只贴出Es用scroll方式读取数据以及批量写入HBase的核心代码,其他工具类、方法,比如es、HBase配置、client、connection获取就不贴了。
1-1、es获取数据
package ipl.restapi.service.bigdata.es;
import ipl.restapi.util.EsOpenCloseUtils;
import ipl.restapi.util.EsPropertiesUtils;
import ipl.restapi.util.HbaseApiUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
*
pakage: ipl.restapi.service.bigdata.es
* * descirption: es检索某一所以全量数据,导入HBase * * @author wanghai * @version V1.0 * @since
2018/8/15 下午9:03
*/ public class ReadFromEs { private static final Logger LOGGER = LoggerFactory.getLogger("es"); private static final int SCROLL_SIZE = 10000; private static final int HBASE_PUT_SIZE = 1000; /** * es数据导入HBase * @param tableName 数据导入到HBase的那一张表 */ public void putAllDataToHbase(String tableName) { Map hashMap = EsPropertiesUtils.getConf(); TransportClient esClient = EsOpenCloseUtils.getInstance(hashMap); // 创建查询体 Map queryParams = new HashMap<>(); queryParams.put("index", "papper_little"); queryParams.put("type", "automatic"); // index String indexName = queryParams.get("index").toString(); // type String type = queryParams.get("type").toString(); scrollDataToHbase(esClient, indexName, type, tableName); } /** * 允许我们做一个初始搜索并且持续批量从Elasticsearch里拉取另一部分数据,结果直到没有结果剩下,类似于数据库的游标, * 为了避免数据量过大,每次从上次scroll的位置继续获取数据获取的数据写入HBase。 * * @param esClient es客户端 * @param indexName 索引名 * @param typeName es type * @param tableName 数据导入到HBase的那一张表 */ public void scrollDataToHbase(Client esClient, String indexName, String typeName, String tableName) { // TODO:通信过程中getScrollId丢失怎么办?比如说kafka,就有多种机制验证处理请求者的请求数据的偏移量 int baseRowNum = 0; SearchResponse scrollResp = esClient.prepareSearch(indexName) .setTypes(typeName) .setScroll(new TimeValue(300000)) // 每次返回10000条数据(如果够) .setSize(SCROLL_SIZE).get(); // arraylist放1000个map,一个map为一条论文数据,map中存放key-value对,是存入HBase的列名和值 ArrayList> hit1000List = new ArrayList<>(1024); Connection connection = HbaseApiUtils.getConnection(); do { SearchHits searchHits = scrollResp.getHits(); long num = searchHits.getHits().length; System.out.println("数量:" + num); for (int i = 0; i < num; ) { hit1000List.add(searchHits.getAt(i).getSourceAsMap()); i++; if (i % HBASE_PUT_SIZE == 0) { baseRowNum++; try { // TODO:网络不好,1000条可以考虑异步读写 HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE); hit1000List.clear(); } catch (IOException e) { LOGGER.error("es批量插入hbase异常-1!"); LOGGER.error(e.getMessage()); } } } System.out.println("目前完成 " + baseRowNum * HBASE_PUT_SIZE); // 处理不足1000的数据 if (!hit1000List.isEmpty()) { baseRowNum++; // 避免不足1000的数据覆盖之前的 try { HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE); hit1000List.clear(); System.out.println("down!"); } catch (IOException e) { LOGGER.error("es批量插入hbase异常-2!"); LOGGER.error(e.getMessage()); } } scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while (scrollResp.getHits().getHits().length != 0); } }
1-2、批量写入HBase
/**
* 批量导入数据到HBase table
*
* @param tableName 要导入数据的 表名
* @param arraylist 封装数据的arraylist
* @param baseRowNum 传入的是第几轮的数据
* @param filedFamily 列族
* @param keyPrefix rowKey前缀
* @param putSize 每一轮传入的数据量(最后一次也许除外)
* @throws IOException
*/
public static void putListByMap(Connection connection, String tableName, ArrayList> arraylist, int baseRowNum, String filedFamily, String keyPrefix, int putSize) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
List puts = new ArrayList<>();
int rowNum = 0;
baseRowNum = (baseRowNum - 1) * putSize;
for (Map hitMap : arraylist) {
rowNum++;
// 传输时,数据需要序列化——Bytes.toBytes
for (Map.Entry entry : hitMap.entrySet()) {
Put put = new Put(Bytes.toBytes(keyPrefix + (rowNum + baseRowNum)));
put.addColumn(Bytes.toBytes(filedFamily), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue().toString()));
puts.add(put);
}
}
table.put(puts);
LOGGER.info("表:{}已使用putListByMap方法批量更新!!!{}", tableName, baseRowNum);
}
public static Connection getConnection() {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "your_ipaddress");
config.set("hbase.zookeeper.property.clientPort", "2181");
// 创建一个连接到集群的connection
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(config);
} catch (IOException e) {
e.printStackTrace();
}
return connection;
}
2、思路
2-1、es中取数据
es取数据的方式很多,但是这里是取全量数据,所以不需要检索词,定义index、type即可。 也有其他不少博客写es获取全量数据,但是大多有问题,大致可以分为两类问题:
1、数据量不够大,可以一次性读进内存;
2、代码有bug,获取下一部分数据时,有丢失或者重复,或者说简单使用setFrom,并不是真正的部分读取。
对于数据不能全部读进内存的情况,我们可以使用es中的scroll进行“下标选择“。允许我们做一个初始搜索并且持续批量从Elasticsearch里拉取另一部分数据,结果直到没有结果剩下,类似于数据库的游标,为了避免数据量过大,每次从上次scroll的位置继续获取数据。
scroll 并不适合用来做实时搜索,而更适用于后台批处理任务。
scroll获取数据大致可分为初始化和遍历两个阶段,初始化时将所有符合搜索条件的搜索结果缓存起来,可以想象成快照,然后更新scroll_id遍历,从这个快照里继续取数据。
核心是
scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();while (scrollResp.getHits().getHits().length != 0);
2-2、数据封装
es获取的数据封装进Map,key-value刚好作为column-value。由于HBase稀疏列的特性,各条记录列数不同也是ok的。
Map数据封装进Arraylist,批量put进HBase。
3、mac 修改hosts文件,老是自动恢复原样
冥思不得解,发现每次都是连接了学校的VPN后发生的。于是查询了解到/private/etc/pulse-hosts.bak,/etc/hosts在pulse
启动后或者使用中被重置
解决办法:
在/private/etc/pulse-hosts.bak中填写hosts即可(类似白名单的概念吧),当然,加入白名单后,/etc/hosts中还是需要填写的
4、HBase数据到Hive
进行ing
5、TODO
1、通信过程中getScrollId丢失怎么办?比如说kafka,就有多种机制验证处理请求者的请求数据的偏移量
2、优化:异步,读es与数据存HBase线程分开,以减少处理时间。
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号