【云计算】Hbase程序解析
小标 2019-01-23 来源 : 阅读 644 评论 0

摘要:本文主要向大家介绍了【云计算】Hbase程序解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】Hbase程序解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

插入数据


package hbase.filter;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class DataInit {

    public static void main(String[] args) throws Exception{
        testCreateTable();
        testPutData();
    }

    public static void testCreateTable() throws Exception{
        //指定的配置信息: ZooKeeper
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建一个HBase客户端: HBaseAdmin
        HBaseAdmin admin = new HBaseAdmin(conf);

        //创建一个表的描述符: 表名
        HTableDescriptor hd = new HTableDescriptor(TableName.valueOf("emp"));

        //创建列族描述符
        HColumnDescriptor hcd1 = new HColumnDescriptor("empinfo");

        //加入列族
        hd.addFamily(hcd1);

        //创建表
        admin.createTable(hd);

        //关闭客户端
        admin.close();
    }


    public static void testPutData() throws Exception{
        //指定的配置信息: ZooKeeper
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //客户端
        HTable table = new HTable(conf, "emp");

        //第一条数据
        Put put1 = new Put(Bytes.toBytes("7369"));
        put1.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("SMITH"));
        Put put2 = new Put(Bytes.toBytes("7369"));
        put2.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("800"));

        //第二条数据
        Put put3 = new Put(Bytes.toBytes("7499"));
        put3.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("ALLEN"));
        Put put4 = new Put(Bytes.toBytes("7499"));
        put4.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("1600"));        

        //第三条数据
        Put put5 = new Put(Bytes.toBytes("7521"));
        put5.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("WARD"));
        Put put6 = new Put(Bytes.toBytes("7521"));
        put6.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("1250"));        

        //第四条数据
        Put put7 = new Put(Bytes.toBytes("7566"));
        put7.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("JONES"));
        Put put8 = new Put(Bytes.toBytes("7566"));
        put8.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("2975"));        

        //第五条数据
        Put put9 = new Put(Bytes.toBytes("7654"));
        put9.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("MARTIN"));
        Put put10 = new Put(Bytes.toBytes("7654"));
        put10.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("1250"));

        //第六条数据
        Put put11 = new Put(Bytes.toBytes("7698"));
        put11.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("BLAKE"));
        Put put12 = new Put(Bytes.toBytes("7698"));
        put12.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("2850"));

        //第七条数据
        Put put13 = new Put(Bytes.toBytes("7782"));
        put13.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("CLARK"));
        Put put14 = new Put(Bytes.toBytes("7782"));
        put14.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("2450"));

        //第八条数据
        Put put15 = new Put(Bytes.toBytes("7788"));
        put15.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("SCOTT"));
        Put put16 = new Put(Bytes.toBytes("7788"));
        put16.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("3000"));       

        //第九条数据
        Put put17 = new Put(Bytes.toBytes("7839"));
        put17.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("KING"));
        Put put18 = new Put(Bytes.toBytes("7839"));
        put18.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("5000"));   

        //第十条数据
        Put put19 = new Put(Bytes.toBytes("7844"));
        put19.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("TURNER"));
        Put put20 = new Put(Bytes.toBytes("7844"));
        put20.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("1500"));   

        //第十一条数据
        Put put21 = new Put(Bytes.toBytes("7876"));
        put21.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("ADAMS"));
        Put put22 = new Put(Bytes.toBytes("7876"));
        put22.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("1100"));   

        //第十二条数据
        Put put23 = new Put(Bytes.toBytes("7900"));
        put23.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("JAMES"));
        Put put24 = new Put(Bytes.toBytes("7900"));
        put24.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("950"));

        //第十三条数据
        Put put25 = new Put(Bytes.toBytes("7902"));
        put25.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("FORD"));
        Put put26 = new Put(Bytes.toBytes("7902"));
        put26.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("3000"));

        //第十四条数据
        Put put27 = new Put(Bytes.toBytes("7934"));
        put27.add(Bytes.toBytes("empinfo"), Bytes.toBytes("ename"), Bytes.toBytes("MILLER"));
        Put put28 = new Put(Bytes.toBytes("7934"));
        put28.add(Bytes.toBytes("empinfo"), Bytes.toBytes("sal"), Bytes.toBytes("1300"));

        //构造List
        List list = new ArrayList();
        list.add(put1);
        list.add(put2);
        list.add(put3);
        list.add(put4);
        list.add(put5);
        list.add(put6);
        list.add(put7);
        list.add(put8);
        list.add(put9);
        list.add(put10);
        list.add(put11);
        list.add(put12);
        list.add(put13);
        list.add(put14);
        list.add(put15);
        list.add(put16);
        list.add(put17);
        list.add(put18);
        list.add(put19);
        list.add(put20);
        list.add(put21);
        list.add(put22);
        list.add(put23);
        list.add(put24);
        list.add(put25);
        list.add(put26);
        list.add(put27);
        list.add(put28);        

        //插入数据
        table.put(list);
        table.close();      
    }
}


过滤器


package hbase.filter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class TestHBaseFilter {

    public static void main(String[] args) throws Exception {
        //testSingleColumnValueFilter();
        //testColumnPrefixFilter();
        //testMultipleColumnPrefixFilter();
        //testRowFilter();
        testMoreFilter();
    }

    private static void testMoreFilter() throws Exception {
        /*
         * 查询员工号是7839的员工姓名   select ename from emp where empno=7839;
         * 1、rowkey过滤器 :查询7839
         * 2、列名前缀过滤器:查询姓名
         */

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建表的客户端
        HTable emp = new HTable(conf,"emp");    

        //第一个过滤器
        RowFilter filter1 = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("7839"));

        //第二个过滤器
        ColumnPrefixFilter filter2 = new ColumnPrefixFilter(Bytes.toBytes("ename"));

        //创建一个Filter的list
        /*
         * Operator.MUST_PASS_ALL  相当于  and
         * Operator.MUST_PASS_ONE  相当于  or
         */
        FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
        filterList.addFilter(filter1);
        filterList.addFilter(filter2);

        //创建Scanner
        Scan scanner = new Scan();
        scanner.setFilter(filterList);

        //执行查询
        ResultScanner rs = emp.getScanner(scanner);
        for(Result r:rs){
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            System.out.println(ename);
        }

        emp.close();        
    }

    private static void testRowFilter() throws Exception {
        /*
         * 查询行键为7839的员工薪水  select * from emp where empno=7839;
         */
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建表的客户端
        HTable emp = new HTable(conf,"emp");

        //构造一个过滤器                                                                                                                     这个可以是一个正则表达式
        RowFilter filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("7839"));
        Scan scanner = new Scan();
        scanner.setFilter(filter);

        //执行查询
        ResultScanner rs = emp.getScanner(scanner);
        for(Result r:rs){
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(ename+"\t"+sal);
        }

        emp.close();        
    }

    private static void testMultipleColumnPrefixFilter() throws Exception {
        /*
         * 查询员工的姓名和薪水:select ename,sal from emp;
         */

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建表的客户端
        HTable emp = new HTable(conf,"emp");

        //构造一个二维数据,代表每个列的名字
        byte[][] prefixes = new byte[][]{Bytes.toBytes("ename"),Bytes.toBytes("sal")};

        //定义多个列名前缀过滤器
        MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(prefixes);

        Scan scanner = new Scan();
        scanner.setFilter(filter);

        //执行查询
        ResultScanner rs = emp.getScanner(scanner);
        for(Result r:rs){
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            String sal = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("sal")));
            System.out.println(ename+"\t"+sal);
        }

        emp.close();

    }

    private static void testColumnPrefixFilter() throws Exception {
        /*
         * 查询员工的姓名:select ename from emp;
         */
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建表的客户端
        HTable emp = new HTable(conf,"emp");

        //定义一个过滤器
        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("ename"));

        Scan scanner = new Scan();
        scanner.setFilter(filter);

        //执行查询
        ResultScanner rs = emp.getScanner(scanner);
        for(Result r:rs){
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            System.out.println(ename);
        }

        emp.close();
    }

    //列值过滤器: SingleColumnValueFilter
    public static void testSingleColumnValueFilter() throws Exception{
        /*
         * 查询薪水等于3000的员工
         * select * from emp where sal=3000;
         */
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建表的客户端
        HTable emp = new HTable(conf,"emp");

        //创建过滤器
        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("empinfo"),      //列族
                                                                     Bytes.toBytes("sal"),   //列名
                                                                     CompareOp.EQUAL,   //比较运算符
                                                                     Bytes.toBytes("3000"));      //值

        //创建一个Scanner
        Scan scanner  = new Scan();
        scanner.setFilter(filter);

        //执行查询
        ResultScanner rs = emp.getScanner(scanner);
        for(Result r:rs){
            String ename = Bytes.toString(r.getValue(Bytes.toBytes("empinfo"), Bytes.toBytes("ename")));
            System.out.println(ename);
        }

        emp.close();
    }
}


mapreduce程序操作hbase


package hbase.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        //从HBase中读取数据
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");      

        //创建任务
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);

        //定义一个扫描器读取要处理的数据
        Scan scan = new Scan();
        //指定扫描器扫描的数据
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"));

        //指定Map,输入是表  word
        TableMapReduceUtil.initTableMapperJob("word", scan, WordCountMapper.class, 
                                              Text.class, IntWritable.class, job);

        //指定Reduce 输出的表  result
        TableMapReduceUtil.initTableReducerJob("result", WordCountReducer.class, job);

        job.waitForCompletion(true);
    }
}


package hbase.mr;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

//处理的HBase表中的一条记录                                                                                           k2       v2
public class WordCountMapper extends TableMapper {

    @Override
    protected void map(ImmutableBytesWritable key, Result value,Context context)
            throws IOException, InterruptedException {
        /*
         * key 相当于行键
         * value 一行记录
         */
        // I love Beijing
        String data = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));

        String[] words = data.split(" ");
        for(String w:words){
            context.write(new Text(w), new IntWritable(1));
        }
    }
}


package hbase.mr;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//                                                  k3     v3         相当于是rowkey
public class WordCountReducer extends TableReducer {

    @Override
    protected void reduce(Text k3, Iterable v3,Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable i:v3){
            sum = sum + i.get();
        }

        //输出:表中的一条记录 Put对象
        //使用单词作为行键
        Put put = new Put(Bytes.toBytes(k3.toString()));
        put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));

        //写入HBase
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
    }
}


Java程序操作hbase


package hbase.java;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class TestHBaseDemo {


    public static void main(String[] args) throws Exception{
        //createTable();
        //insertOne();
        //get();
        //scan();
        dropTable();
    }

    private static void dropTable() throws IOException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建一个HBase的客户端
        HBaseAdmin client = new HBaseAdmin(conf);

        client.disableTable("students");
        client.deleteTable("students");

        client.close();
    }

    private static void scan()throws Exception  {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //指定表的客户端
        HTable table = new HTable(conf, "students");

        //创建一个扫描器 Scan
        Scan scanner = new Scan(); //----> 相当于: select * from students;
        //scanner.setFilter(filter)  ----> 过滤器

        //执行查询
        ResultScanner rs = table.getScanner(scanner); //返回ScannerResult ---> Oracle中的游标
        for(Result r:rs){
            String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            String age = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
            System.out.println(name + "   "+ age);
        }
        table.close();
    }

    private static void get()throws Exception  {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");      

        //指定表的客户端
        HTable table = new HTable(conf, "students");

        //通过Get查询
        Get get = new Get(Bytes.toBytes("stu001"));
        //执行查询
        Result record = table.get(get);

        //输出
        String name = Bytes.toString(record.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        System.out.println(name);

        table.close();
    }

    private static void createTable() throws Exception {
        //指定ZooKeeper地址,从zk中获取HMaster的地址 
        //注意:ZK返回的是HMaster的主机名, 不是IP地址 ---> 配置Windows的hosts文件
        //C:\Windows\System32\drivers\etc\hosts
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //创建一个HBase的客户端
        HBaseAdmin client = new HBaseAdmin(conf);

        //创建表: 通过表的描述符
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("students"));

        //列族的信息
        HColumnDescriptor h1 = new HColumnDescriptor("info");
        HColumnDescriptor h2 = new HColumnDescriptor("grade");

        //将列族加入表
        htd.addFamily(h1);
        htd.addFamily(h2);

        //创建表
        client.createTable(htd);
        client.close();
    }

    //插入单条数据
    private static void insertOne() throws Exception{
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.157.111");

        //指定表的客户端
        HTable table = new HTable(conf, "students");

        //构造一条数据
        Put put = new Put(Bytes.toBytes("stu001"));
        put.addColumn(Bytes.toBytes("info"),      //列族的名字
                      Bytes.toBytes("name"),   //列的名字
                      Bytes.toBytes("Tom"));       //值

        //插入
        table.put(put);

        table.close();
    }
}


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程