摘要:本文主要向大家介绍了【云计算】HBase程序解析,通过具体的代码示例向大家展现,希望对大家学习云计算有所帮助。
插入数据
package hbase.filter;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
/**
 * One-shot initializer for the demo "emp" table: creates the table with a
 * single column family "empinfo" and loads the classic 14-row EMP data set
 * (ename + sal per employee, row key = empno).
 */
public class DataInit {

    /** Column family shared by every cell written here. */
    private static final byte[] FAMILY = Bytes.toBytes("empinfo");

    public static void main(String[] args) throws Exception {
        testCreateTable();
        testPutData();
    }

    /**
     * Creates table "emp" with column family "empinfo".
     *
     * @throws Exception if the table already exists or the cluster is unreachable
     */
    public static void testCreateTable() throws Exception {
        // ZooKeeper quorum is how the client locates the HBase cluster.
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            HTableDescriptor hd = new HTableDescriptor(TableName.valueOf("emp"));
            hd.addFamily(new HColumnDescriptor("empinfo"));
            admin.createTable(hd);
        } finally {
            // Close the admin client even if createTable fails.
            admin.close();
        }
    }

    /**
     * Builds a single-cell Put for row {@code rowKey}:
     * empinfo:{qualifier} = value.
     */
    private static Put newPut(String rowKey, String qualifier, String value) {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.add(FAMILY, Bytes.toBytes(qualifier), Bytes.toBytes(value));
        return put;
    }

    /**
     * Batch-inserts the 14 EMP rows (two cells each: ename and sal).
     *
     * @throws Exception on connection or write failure
     */
    public static void testPutData() throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        // {empno, ename, sal} — same data the original spelled out as 28 Puts.
        String[][] rows = {
            {"7369", "SMITH",  "800"},
            {"7499", "ALLEN",  "1600"},
            {"7521", "WARD",   "1250"},
            {"7566", "JONES",  "2975"},
            {"7654", "MARTIN", "1250"},
            {"7698", "BLAKE",  "2850"},
            {"7782", "CLARK",  "2450"},
            {"7788", "SCOTT",  "3000"},
            {"7839", "KING",   "5000"},
            {"7844", "TURNER", "1500"},
            {"7876", "ADAMS",  "1100"},
            {"7900", "JAMES",  "950"},
            {"7902", "FORD",   "3000"},
            {"7934", "MILLER", "1300"},
        };

        // Typed list (the original used a raw List, which compiles with
        // unchecked warnings).
        List<Put> puts = new ArrayList<Put>(rows.length * 2);
        for (String[] r : rows) {
            puts.add(newPut(r[0], "ename", r[1]));
            puts.add(newPut(r[0], "sal", r[2]));
        }

        HTable table = new HTable(conf, "emp");
        try {
            // Single batched RPC round-trip instead of 28 individual puts.
            table.put(puts);
        } finally {
            table.close();
        }
    }
}
过滤器
package hbase.filter;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Demonstrates the common HBase scan filters against the "emp" table:
 * single-column value, column prefix, multiple column prefix, row key,
 * and a combined FilterList.
 */
public class TestHBaseFilter {

    private static final byte[] FAMILY = Bytes.toBytes("empinfo");
    private static final byte[] ENAME = Bytes.toBytes("ename");
    private static final byte[] SAL = Bytes.toBytes("sal");

    public static void main(String[] args) throws Exception {
        //testSingleColumnValueFilter();
        //testColumnPrefixFilter();
        //testMultipleColumnPrefixFilter();
        //testRowFilter();
        testMoreFilter();
    }

    /** Opens a client for the "emp" table via the shared ZooKeeper quorum. */
    private static HTable openEmpTable() throws IOException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");
        return new HTable(conf, "emp");
    }

    /**
     * Combined filters: {@code select ename from emp where empno = 7839}.
     * A RowFilter narrows to row key 7839 and a ColumnPrefixFilter keeps
     * only the ename column; MUST_PASS_ALL makes the list behave like AND
     * (MUST_PASS_ONE would behave like OR).
     */
    private static void testMoreFilter() throws Exception {
        HTable emp = openEmpTable();
        try {
            RowFilter byRow =
                new RowFilter(CompareOp.EQUAL, new RegexStringComparator("7839"));
            ColumnPrefixFilter byColumn = new ColumnPrefixFilter(ENAME);

            FilterList filters = new FilterList(Operator.MUST_PASS_ALL);
            filters.addFilter(byRow);
            filters.addFilter(byColumn);

            Scan scan = new Scan();
            scan.setFilter(filters);

            ResultScanner rs = emp.getScanner(scan);
            try {
                for (Result r : rs) {
                    System.out.println(Bytes.toString(r.getValue(FAMILY, ENAME)));
                }
            } finally {
                // Scanners hold server-side resources; always close them.
                rs.close();
            }
        } finally {
            emp.close();
        }
    }

    /**
     * Row key filter: {@code select * from emp where empno = 7839}.
     * The comparator accepts a regular expression.
     */
    private static void testRowFilter() throws Exception {
        HTable emp = openEmpTable();
        try {
            RowFilter filter =
                new RowFilter(CompareOp.EQUAL, new RegexStringComparator("7839"));
            Scan scan = new Scan();
            scan.setFilter(filter);

            ResultScanner rs = emp.getScanner(scan);
            try {
                for (Result r : rs) {
                    String ename = Bytes.toString(r.getValue(FAMILY, ENAME));
                    String sal = Bytes.toString(r.getValue(FAMILY, SAL));
                    System.out.println(ename + "\t" + sal);
                }
            } finally {
                rs.close();
            }
        } finally {
            emp.close();
        }
    }

    /**
     * Multiple column prefix filter: {@code select ename, sal from emp}.
     */
    private static void testMultipleColumnPrefixFilter() throws Exception {
        HTable emp = openEmpTable();
        try {
            // One prefix per column to keep.
            byte[][] prefixes = new byte[][]{ENAME, SAL};
            MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(prefixes);

            Scan scan = new Scan();
            scan.setFilter(filter);

            ResultScanner rs = emp.getScanner(scan);
            try {
                for (Result r : rs) {
                    String ename = Bytes.toString(r.getValue(FAMILY, ENAME));
                    String sal = Bytes.toString(r.getValue(FAMILY, SAL));
                    System.out.println(ename + "\t" + sal);
                }
            } finally {
                rs.close();
            }
        } finally {
            emp.close();
        }
    }

    /**
     * Column prefix filter: {@code select ename from emp}.
     */
    private static void testColumnPrefixFilter() throws Exception {
        HTable emp = openEmpTable();
        try {
            ColumnPrefixFilter filter = new ColumnPrefixFilter(ENAME);
            Scan scan = new Scan();
            scan.setFilter(filter);

            ResultScanner rs = emp.getScanner(scan);
            try {
                for (Result r : rs) {
                    System.out.println(Bytes.toString(r.getValue(FAMILY, ENAME)));
                }
            } finally {
                rs.close();
            }
        } finally {
            emp.close();
        }
    }

    /**
     * Single-column value filter: {@code select * from emp where sal = 3000}.
     * NOTE(review): without {@code setFilterIfMissing(true)} rows lacking a
     * "sal" cell would also pass; the demo data always has one.
     */
    public static void testSingleColumnValueFilter() throws Exception {
        HTable emp = openEmpTable();
        try {
            SingleColumnValueFilter filter = new SingleColumnValueFilter(
                FAMILY,                 // column family
                SAL,                    // column qualifier
                CompareOp.EQUAL,        // comparison operator
                Bytes.toBytes("3000")); // value to match

            Scan scan = new Scan();
            scan.setFilter(filter);

            ResultScanner rs = emp.getScanner(scan);
            try {
                for (Result r : rs) {
                    System.out.println(Bytes.toString(r.getValue(FAMILY, ENAME)));
                }
            } finally {
                rs.close();
            }
        } finally {
            emp.close();
        }
    }
}
mapreduce程序操作hbase
package hbase.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
/**
 * MapReduce driver for a word count whose input rows come from the HBase
 * table "word" (column content:info) and whose aggregated counts are written
 * to the HBase table "result".
 */
public class WordCountMain {
    public static void main(String[] args) throws Exception {
        // ZooKeeper quorum locates the HBase cluster for both input and output.
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);

        // Restrict the scan to the single column the mapper actually reads.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"));

        // Mapper reads from table "word"; emits (word, 1).
        TableMapReduceUtil.initTableMapperJob("word", scan, WordCountMapper.class,
                Text.class, IntWritable.class, job);
        // Reducer writes Put objects into table "result".
        TableMapReduceUtil.initTableReducerJob("result", WordCountReducer.class, job);

        // Propagate job success/failure via the process exit code
        // (the original discarded waitForCompletion's return value, so the
        // driver exited 0 even when the job failed).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package hbase.mr;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
//处理的HBase表中的一条记录 k2 v2
/**
 * Map side of the HBase word count: each input record is one row of the
 * source table (row key + Result). Reads content:info, splits it on spaces,
 * and emits (word, 1) pairs.
 */
public class WordCountMapper extends TableMapper<Text, IntWritable> {

    // Reused output objects — avoids allocating per emitted word.
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    /**
     * @param key   the row key of the scanned row
     * @param value the row's cells; e.g. content:info = "I love Beijing"
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        String data = Bytes.toString(
            value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));
        // getValue returns null when the row has no content:info cell;
        // the original would have thrown an NPE on split().
        if (data == null) {
            return;
        }
        for (String w : data.split(" ")) {
            word.set(w);
            context.write(word, ONE);
        }
    }
}
package hbase.mr;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 相当于是rowkey
/**
 * Reduce side of the HBase word count: sums the 1s emitted for each word and
 * writes one row per word into the output table, using the word itself as
 * the row key and storing the total in content:count.
 */
public class WordCountReducer
        extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    /**
     * @param k3 the word (becomes the output row key)
     * @param v3 all counts emitted for this word by the mappers
     */
    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : v3) {
            sum += i.get();
        }

        // One Put per word; row key = the word itself.
        byte[] rowKey = Bytes.toBytes(k3.toString());
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"),
                Bytes.toBytes(String.valueOf(sum)));

        // The framework applies the Put to the output table.
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}
Java程序操作hbase
package hbase.java;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
/**
 * Basic HBase Java client walkthrough against a "students" table:
 * create table, insert one row, get by row key, full scan, drop table.
 */
public class TestHBaseDemo {

    private static final byte[] INFO = Bytes.toBytes("info");

    public static void main(String[] args) throws Exception {
        //createTable();
        //insertOne();
        //get();
        //scan();
        dropTable();
    }

    /** Opens a client for the "students" table via the ZooKeeper quorum. */
    private static HTable openStudentsTable() throws IOException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");
        return new HTable(conf, "students");
    }

    /**
     * Drops the "students" table. An HBase table must be disabled
     * before it can be deleted.
     */
    private static void dropTable() throws IOException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");
        HBaseAdmin client = new HBaseAdmin(conf);
        try {
            client.disableTable("students");
            client.deleteTable("students");
        } finally {
            client.close();
        }
    }

    /**
     * Full table scan — the equivalent of {@code select * from students}.
     * Prints each row's info:name and info:age.
     */
    private static void scan() throws Exception {
        HTable table = openStudentsTable();
        try {
            // An unfiltered Scan returns every row; add a filter via
            // scan.setFilter(...) to narrow it.
            Scan scan = new Scan();

            // ResultScanner is analogous to a database cursor.
            ResultScanner rs = table.getScanner(scan);
            try {
                for (Result r : rs) {
                    String name = Bytes.toString(r.getValue(INFO, Bytes.toBytes("name")));
                    String age = Bytes.toString(r.getValue(INFO, Bytes.toBytes("age")));
                    System.out.println(name + " " + age);
                }
            } finally {
                // Scanners hold server-side resources; always close them.
                rs.close();
            }
        } finally {
            table.close();
        }
    }

    /**
     * Point lookup by row key "stu001"; prints info:name.
     */
    private static void get() throws Exception {
        HTable table = openStudentsTable();
        try {
            Get get = new Get(Bytes.toBytes("stu001"));
            Result record = table.get(get);
            String name = Bytes.toString(record.getValue(INFO, Bytes.toBytes("name")));
            System.out.println(name);
        } finally {
            table.close();
        }
    }

    /**
     * Creates the "students" table with two column families: info and grade.
     *
     * Note: ZooKeeper returns the HMaster's hostname, not its IP, so the
     * client machine must be able to resolve it (on Windows, add an entry to
     * C:\Windows\System32\drivers\etc\hosts).
     */
    private static void createTable() throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");
        HBaseAdmin client = new HBaseAdmin(conf);
        try {
            HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("students"));
            htd.addFamily(new HColumnDescriptor("info"));
            htd.addFamily(new HColumnDescriptor("grade"));
            client.createTable(htd);
        } finally {
            client.close();
        }
    }

    /**
     * Inserts one row (stu001) with a single cell info:name = "Tom".
     */
    private static void insertOne() throws Exception {
        HTable table = openStudentsTable();
        try {
            Put put = new Put(Bytes.toBytes("stu001"));
            put.addColumn(INFO,                  // column family
                    Bytes.toBytes("name"),       // column qualifier
                    Bytes.toBytes("Tom"));       // value
            table.put(put);
        } finally {
            table.close();
        }
    }
}
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号