摘要:本文主要向大家介绍了【云计算】MapJoin和ReduceJoin的区别,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
本文主要向大家介绍了【云计算】MapJoin和ReduceJoin的区别,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
MapReduce Join
对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。
如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。
1 思路
1.1 reduce join
在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。
这种方法有2个问题:
1, map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
2, reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
实现代码如下:
主程序入口代码:
packagecom.ibeifeng.mapreduce.join;
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.Iterator;
importjava.util.List;
importjava.util.StringTokenizer;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.conf.Configured;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.NullWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
importorg.apache.hadoop.mapreduce.task.reduce.Shuffle;
importorg.apache.hadoop.util.Tool;
importorg.apache.hadoop.util.ToolRunner;
publicclassMapReduceJoinextendsConfiguredimplementsTool{
//定义map处理类模板
publicstaticclassmapextendsMapper{
privateIntWritableoutputkey=newIntWritable();
privateDataJoindatajoin=newDataJoin();
protectedvoidmap(LongWritablekey,Textvalues,Contextcontext)
throwsIOException,InterruptedException{
//1.获取字符串
Stringstr=values.toString();
//2.对字符串进行分割
String[]value=str.split(",");
//3.对非法数据进行过滤
intlen=value.length;
if(len!=3&&len!=4){
return;
}
//4.取出cid
Stringcid=value[0];
//5.判断是是customer表还是order表
if(len==3){
//表示是customer表
Stringcname=value[1];
Stringcphone=value[2];
datajoin.set("Customer",cid+","+cname+","+cphone);
}
if(len==4){
//表示是order表
Stringoname=value[1];
Stringoprice=value[2];
Stringotime=value[3];
datajoin.set("Order",cid+","+oname+","+oprice+","+otime);
}
outputkey.set(Integer.valueOf(cid));
context.write(outputkey,datajoin);
}
}
//定义reduce处理类模板
publicstaticclassreduceextendsReducer{
privateTextoutputvalue=newText();
@Override
protectedvoidreduce(IntWritablekey,Iterablevalues,
Contextcontext)throwsIOException,InterruptedException{
//定义一个字符串用于保存客户信息
StringcustomerInfo=null;
//定义一个List集合用于保存订单信息
Listlist=newArrayList();
for(DataJoindatajoin:values){
if(datajoin.getTag().equals("Customer")){
System.out.println(datajoin.getData());
customerInfo=datajoin.getData();
}
if(datajoin.getTag().equals("Order")){
list.add(datajoin.getData());
}
}
//进行输出
for(Strings:list){
outputvalue.set(customerInfo+","+s);
context.write(NullWritable.get(),outputvalue);
}
}
}
//配置Driver模块
publicintrun(String[]args){
//1.获取配置配置文件对象
Configurationconfiguration=newConfiguration();
//2.创建给mapreduce处理的任务
Jobjob=null;
try{
job=Job.getInstance(configuration,this.getClass().getSimpleName());
}catch(IOExceptione){
e.printStackTrace();
}
try{
//3.创建输入路径
Pathsource_path=newPath(args[0]);
FileInputFormat.addInputPath(job,source_path);
//4.创建输出路径
Pathdes_path=newPath(args[1]);
FileOutputFormat.setOutputPath(job,des_path);
}catch(IllegalArgumentExceptione){
e.printStackTrace();
}catch(IOExceptione){
e.printStackTrace();
}
//设置让任务打包jar运行
job.setJarByClass(MapReduceJoin.class);
//5.设置map
job.setMapperClass(map.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(DataJoin.class);
//================shuffle========================
//1.分区
//job.setPartitionerClass(MyPartitioner.class);
//2.排序
//job.setSortComparatorClass(cls);
//3.分组
//job.setGroupingComparatorClass(MyGroup.class);
//4.可选项,设置combiner,相当于map过程的reduce处理,优化选项
//job.setCombinerClass(Combiner.class);
//设置reduce个数
//job.setNumReduceTasks(2);
//================shuffle========================
//6.设置reduce
job.setReducerClass(reduce.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
//7.提交job到yarn组件上
booleanisSuccess=false;
try{
isSuccess=job.waitForCompletion(true);
}catch(ClassNotFoundExceptione){
e.printStackTrace();
}catch(IOExceptione){
e.printStackTrace();
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnisSuccess0:1;
}
//书写主函数
publicstaticvoidmain(String[]args){
Configurationconfiguration=newConfiguration();
//1.书写输入和输出路径
String[]args1=newString[]{
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input",
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/output"
};
//2.设置系统以什么用户执行job任务
System.setProperty("HADOOP_USER_NAME","beifeng");
//3.运行job任务
intstatus=0;
try{
status=ToolRunner.run(configuration,newMapReduceJoin(),args1);
}catch(Exceptione){
e.printStackTrace();
}
//intstatus=newMyWordCountMapReduce().run(args1);
//4.退出系统
System.exit(status);
}
}
自定义包装类代码:
packagecom.ibeifeng.mapreduce.join;
importjava.io.DataInput;
importjava.io.DataOutput;
importjava.io.IOException;
importorg.apache.hadoop.io.Writable;
publicclassDataJoinimplementsWritable{
privateStringtag;
privateStringdata;
publicStringgetTag(){
returntag;
}
publicStringgetData(){
returndata;
}
publicvoidset(Stringtag,Stringdata){
this.tag=tag;
this.data=data;
}
@Override
publicStringtoString(){
returntag+","+data;
}
publicvoidwrite(DataOutputout)throwsIOException{
out.writeUTF(this.tag);
out.writeUTF(this.data);
}
publicvoidreadFields(DataInputin)throwsIOException{
this.tag=in.readUTF();
this.data=in.readUTF();
}
}
准备测试数据如下(两个csv文件):
将csv文件上传至HDFS当中,并且将代码打包成jar,然后执行以下命令:
bin/yarn jar datas/mapreduce_join.jar /user/beifeng/wordcount/input/ /user/beifeng/wordcount/output
结果如下:
Map join
MapJoin 适用于有一份数据较小的连接情况。做法是直接把该小份数据直接全部加载到内存当中,按链接关键字建立索引。然后大份数据就作为 MapTask 的输入,对 map()方法的每次输入都去内存当中直接去匹配连接。然后把连接结果按 key 输出,这种方法要使用 hadoop中的 DistributedCache 把小份数据分布到各个计算节点,每个 maptask 执行任务的节点都需要加载该数据到内存,并且按连接关键字建立索引:
这里假设Customer为小表,Orders为大表,这也符合实际生产环境。
关于这种分布式缓存的用法,直接看下代码的演示:
主函数入口代码:
packagecom.ibeifeng.mapreduce.join;
importjava.io.BufferedReader;
importjava.io.IOException;
importjava.io.InputStreamReader;
importjava.net.URI;
importjava.util.HashMap;
importjava.util.Map;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.conf.Configured;
importorg.apache.hadoop.fs.FSDataInputStream;
importorg.apache.hadoop.fs.FileSystem;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.NullWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
importorg.apache.hadoop.util.Tool;
importorg.apache.hadoop.util.ToolRunner;
importjavax.jdo.annotations.Order;
publicclassMapJoinextendsConfiguredimplementsTool{
//定义缓存文件的读取路径
privatestaticStringcacheFile="hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input1/customers.csv";
//定义map处理类模板
publicstaticclassmapextendsMapper{
privateTextoutputValue=newText();
Mapmap=null;
@Override
protectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{
//读取分布式缓存文件
FileSystemfs=FileSystem.get(URI.create(cacheFile),context.getConfiguration());
FSDataInputStreamfdis=fs.open(newPath(cacheFile));
BufferedReaderbr=newBufferedReader(newInputStreamReader(fdis));
//创建一个map集合来保存读取文件的数据
map=newHashMap();
Stringline=null;
while((line=br.readLine())!=null){
String[]split=line.split(",");
Customercustomer=newCustomer(Integer.parseInt(split[0]),split[1],split[2]);
map.put(customer.getCid(),customer);
}
//关闭IO流
br.close();
}
@Override
protectedvoidmap(LongWritablekey,Textvalues,Contextcontext)
throwsIOException,InterruptedException{
//将Customer表和Orders表的数据进行组合
Stringstr=values.toString();
String[]Orders=str.split(",");
intjoinID=Integer.valueOf(Orders[0]);
Customercustomerid=map.get(joinID);
StringBuffersbf=newStringBuffer();
sbf.append(Orders[0]).append(",")
.append(customerid.getCname()).append(",")
.append(customerid.getCphone()).append(",")
.append(Orders[1]).append(",")
.append(Orders[2]).append(",")
.append(Orders[3]).append(",");
outputValue.set(sbf.toString());
context.write(NullWritable.get(),outputValue);
}
}
//无reduce程序
//配置Driver模块
publicintrun(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{
//获取配置配置文件对象
Configurationconfiguration=newConfiguration();
//创建给mapreduce处理的任务
Jobjob=Job.getInstance(configuration,this.getClass().getSimpleName());
//获取将要读取到内存的文件的路径,并加载进内存
job.addCacheFile(URI.create(cacheFile));
//创建输入路径
Pathsource_path=newPath(args[0]);
//创建输出路径
Pathdes_path=newPath(args[1]);
//创建操作hdfs的FileSystem对象
FileSystemfs=FileSystem.get(configuration);
if(fs.exists(des_path)){
fs.delete(des_path,true);
}
FileInputFormat.addInputPath(job,source_path);
FileOutputFormat.setOutputPath(job,des_path);
//设置让任务打包jar运行
job.setJarByClass(MapJoin.class);
//设置map
job.setMapperClass(map.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
//设置reduceTask的任务数为0,即没有reduce阶段和shuffle阶段
job.setNumReduceTasks(0);
//提交job到yarn组件上
booleanisSuccess=job.waitForCompletion(true);
returnisSuccess0:1;
}
//书写主函数
publicstaticvoidmain(String[]args){
Configurationconfiguration=newConfiguration();
//1.书写输入和输出路径
String[]args1=newString[]{
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input",
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/output"
};
//2.设置系统以什么用户执行job任务
System.setProperty("HADOOP_USER_NAME","beifeng");
//3.运行job任务
intstatus=0;
try{
status=ToolRunner.run(configuration,newMapJoin(),args1);
}catch(Exceptione){
e.printStackTrace();
}
//intstatus=newMyWordCountMapReduce().run(args1);
//4.退出系统
System.exit(status);
}
}
构造类代码:
packagecom.ibeifeng.mapreduce.join;
importjava.io.DataInput;
importjava.io.DataOutput;
importjava.io.IOException;
importorg.apache.hadoop.io.Writable;
publicclassCustomerimplementsWritable{
privateintcid;
privateStringcname;
privateStringcphone;
publicintgetCid(){
returncid;
}
publicvoidsetCid(intcid){
this.cid=cid;
}
publicStringgetCname(){
returncname;
}
publicvoidsetCname(Stringcname){
this.cname=cname;
}
publicStringgetCphone(){
returncphone;
}
publicvoidsetCphone(Stringcphone){
this.cphone=cphone;
}
publicCustomer(intcid,Stringcname,Stringcphone){
super();
this.cid=cid;
this.cname=cname;
this.cphone=cphone;
}
publicvoidwrite(DataOutputout)throwsIOException{
out.writeInt(this.cid);
out.writeUTF(this.cname);
out.writeUTF(this.cphone);
}
publicvoidreadFields(DataInputin)throwsIOException{
this.cid=in.readInt();
this.cname=in.readUTF();
this.cphone=in.readUTF();
}
@Override
publicStringtoString(){
return"Customer[cid="+cid+",cname="+cname+",cphone="+cphone+"]";
}
}
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号