【云计算】MapJoin和ReduceJoin的区别
小标 2018-12-24 来源 : 阅读 966 评论 0

摘要:本文主要向大家介绍了【云计算】MapJoin和ReduceJoin的区别,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】MapJoin和ReduceJoin的区别,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

MapReduce Join


对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。


如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。


1 思路


1.1 reduce join


在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。


这种方法有2个问题:


1, map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。


2, reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。


实现代码如下:


主程序入口代码:


packagecom.ibeifeng.mapreduce.join;


importjava.io.IOException;


importjava.util.ArrayList;


importjava.util.Iterator;


importjava.util.List;


importjava.util.StringTokenizer;


importorg.apache.hadoop.conf.Configuration;


importorg.apache.hadoop.conf.Configured;


importorg.apache.hadoop.fs.Path;


importorg.apache.hadoop.io.IntWritable;


importorg.apache.hadoop.io.LongWritable;


importorg.apache.hadoop.io.NullWritable;


importorg.apache.hadoop.io.Text;


importorg.apache.hadoop.mapreduce.Job;


importorg.apache.hadoop.mapreduce.Mapper;


importorg.apache.hadoop.mapreduce.Reducer;


importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;


importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


importorg.apache.hadoop.mapreduce.task.reduce.Shuffle;


importorg.apache.hadoop.util.Tool;


importorg.apache.hadoop.util.ToolRunner;


publicclassMapReduceJoinextendsConfiguredimplementsTool{


//定义map处理类模板


publicstaticclassmapextendsMapper{


privateIntWritableoutputkey=newIntWritable();


privateDataJoindatajoin=newDataJoin();


protectedvoidmap(LongWritablekey,Textvalues,Contextcontext)


throwsIOException,InterruptedException{


//1.获取字符串


Stringstr=values.toString();


//2.对字符串进行分割


String[]value=str.split(",");


//3.对非法数据进行过滤


intlen=value.length;


if(len!=3&&len!=4){


return;


}


//4.取出cid


Stringcid=value[0];


//5.判断是是customer表还是order表


if(len==3){


//表示是customer表


Stringcname=value[1];


Stringcphone=value[2];


datajoin.set("Customer",cid+","+cname+","+cphone);


}


if(len==4){


//表示是order表


Stringoname=value[1];


Stringoprice=value[2];


Stringotime=value[3];


datajoin.set("Order",cid+","+oname+","+oprice+","+otime);


}


outputkey.set(Integer.valueOf(cid));


context.write(outputkey,datajoin);


}


}


//定义reduce处理类模板


publicstaticclassreduceextendsReducer{


privateTextoutputvalue=newText();


@Override


protectedvoidreduce(IntWritablekey,Iterablevalues,


Contextcontext)throwsIOException,InterruptedException{


//定义一个字符串用于保存客户信息


StringcustomerInfo=null;


//定义一个List集合用于保存订单信息


Listlist=newArrayList();


for(DataJoindatajoin:values){


if(datajoin.getTag().equals("Customer")){


System.out.println(datajoin.getData());


customerInfo=datajoin.getData();


}


if(datajoin.getTag().equals("Order")){


list.add(datajoin.getData());


}


}


//进行输出


for(Strings:list){


outputvalue.set(customerInfo+","+s);


context.write(NullWritable.get(),outputvalue);


}


}


}


//配置Driver模块


publicintrun(String[]args){


//1.获取配置配置文件对象


Configurationconfiguration=newConfiguration();


//2.创建给mapreduce处理的任务


Jobjob=null;


try{


job=Job.getInstance(configuration,this.getClass().getSimpleName());


}catch(IOExceptione){


e.printStackTrace();


}


try{


//3.创建输入路径


Pathsource_path=newPath(args[0]);


FileInputFormat.addInputPath(job,source_path);


//4.创建输出路径


Pathdes_path=newPath(args[1]);


FileOutputFormat.setOutputPath(job,des_path);


}catch(IllegalArgumentExceptione){


e.printStackTrace();


}catch(IOExceptione){


e.printStackTrace();


}


//设置让任务打包jar运行


job.setJarByClass(MapReduceJoin.class);


//5.设置map


job.setMapperClass(map.class);


job.setMapOutputKeyClass(IntWritable.class);


job.setMapOutputValueClass(DataJoin.class);


//================shuffle========================


//1.分区


//job.setPartitionerClass(MyPartitioner.class);


//2.排序


//job.setSortComparatorClass(cls);


//3.分组


//job.setGroupingComparatorClass(MyGroup.class);


//4.可选项,设置combiner,相当于map过程的reduce处理,优化选项


//job.setCombinerClass(Combiner.class);


//设置reduce个数


//job.setNumReduceTasks(2);


//================shuffle========================


//6.设置reduce


job.setReducerClass(reduce.class);


job.setOutputKeyClass(NullWritable.class);


job.setOutputValueClass(Text.class);


//7.提交job到yarn组件上


booleanisSuccess=false;


try{


isSuccess=job.waitForCompletion(true);


}catch(ClassNotFoundExceptione){


e.printStackTrace();


}catch(IOExceptione){


e.printStackTrace();


}catch(InterruptedExceptione){


e.printStackTrace();


}


returnisSuccess0:1;


}


//书写主函数


publicstaticvoidmain(String[]args){


Configurationconfiguration=newConfiguration();


//1.书写输入和输出路径


String[]args1=newString[]{


"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input",


"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/output"


};


//2.设置系统以什么用户执行job任务


System.setProperty("HADOOP_USER_NAME","beifeng");


//3.运行job任务


intstatus=0;


try{


status=ToolRunner.run(configuration,newMapReduceJoin(),args1);


}catch(Exceptione){


e.printStackTrace();


}


//intstatus=newMyWordCountMapReduce().run(args1);


//4.退出系统


System.exit(status);


}


}


自定义包装类代码:


packagecom.ibeifeng.mapreduce.join;


importjava.io.DataInput;


importjava.io.DataOutput;


importjava.io.IOException;


importorg.apache.hadoop.io.Writable;


publicclassDataJoinimplementsWritable{


privateStringtag;


privateStringdata;


publicStringgetTag(){


returntag;


}


publicStringgetData(){


returndata;


}


publicvoidset(Stringtag,Stringdata){


this.tag=tag;


this.data=data;


}


@Override


publicStringtoString(){


returntag+","+data;


}


publicvoidwrite(DataOutputout)throwsIOException{


out.writeUTF(this.tag);


out.writeUTF(this.data);


}


publicvoidreadFields(DataInputin)throwsIOException{


this.tag=in.readUTF();


this.data=in.readUTF();


}


}


准备测试数据如下(两个csv文件):




将csv文件上传至HDFS当中,并且将代码打包成jar,然后执行以下命令:


bin/yarn jar datas/mapreduce_join.jar /user/beifeng/wordcount/input/ /user/beifeng/wordcount/output



结果如下:



Map join


MapJoin 适用于有一份数据较小的连接情况。做法是直接把该小份数据直接全部加载到内存当中,按链接关键字建立索引。然后大份数据就作为 MapTask 的输入,对 map()方法的每次输入都去内存当中直接去匹配连接。然后把连接结果按 key 输出,这种方法要使用 hadoop中的 DistributedCache 把小份数据分布到各个计算节点,每个 maptask 执行任务的节点都需要加载该数据到内存,并且按连接关键字建立索引:


这里假设Customer为小表,Orders为大表,这也符合实际生产环境。


关于这种分布式缓存的用法,直接看下代码的演示:


主函数入口代码:


packagecom.ibeifeng.mapreduce.join;


importjava.io.BufferedReader;


importjava.io.IOException;


importjava.io.InputStreamReader;


importjava.net.URI;


importjava.util.HashMap;


importjava.util.Map;


importorg.apache.hadoop.conf.Configuration;


importorg.apache.hadoop.conf.Configured;


importorg.apache.hadoop.fs.FSDataInputStream;


importorg.apache.hadoop.fs.FileSystem;


importorg.apache.hadoop.fs.Path;


importorg.apache.hadoop.io.IntWritable;


importorg.apache.hadoop.io.LongWritable;


importorg.apache.hadoop.io.NullWritable;


importorg.apache.hadoop.io.Text;


importorg.apache.hadoop.mapreduce.Job;


importorg.apache.hadoop.mapreduce.Mapper;


importorg.apache.hadoop.mapreduce.Reducer;


importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;


importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


importorg.apache.hadoop.util.Tool;


importorg.apache.hadoop.util.ToolRunner;


importjavax.jdo.annotations.Order;


publicclassMapJoinextendsConfiguredimplementsTool{


//定义缓存文件的读取路径


privatestaticStringcacheFile="hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input1/customers.csv";


//定义map处理类模板


publicstaticclassmapextendsMapper{


privateTextoutputValue=newText();


Mapmap=null;


@Override


protectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{


//读取分布式缓存文件


FileSystemfs=FileSystem.get(URI.create(cacheFile),context.getConfiguration());


FSDataInputStreamfdis=fs.open(newPath(cacheFile));


BufferedReaderbr=newBufferedReader(newInputStreamReader(fdis));


//创建一个map集合来保存读取文件的数据


map=newHashMap();


Stringline=null;


while((line=br.readLine())!=null){


String[]split=line.split(",");


Customercustomer=newCustomer(Integer.parseInt(split[0]),split[1],split[2]);


map.put(customer.getCid(),customer);


}


//关闭IO流


br.close();


}


@Override


protectedvoidmap(LongWritablekey,Textvalues,Contextcontext)


throwsIOException,InterruptedException{


//将Customer表和Orders表的数据进行组合


Stringstr=values.toString();


String[]Orders=str.split(",");


intjoinID=Integer.valueOf(Orders[0]);


Customercustomerid=map.get(joinID);


StringBuffersbf=newStringBuffer();


sbf.append(Orders[0]).append(",")


.append(customerid.getCname()).append(",")


.append(customerid.getCphone()).append(",")


.append(Orders[1]).append(",")


.append(Orders[2]).append(",")


.append(Orders[3]).append(",");


outputValue.set(sbf.toString());


context.write(NullWritable.get(),outputValue);


}


}


//无reduce程序


//配置Driver模块


publicintrun(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{


//获取配置配置文件对象


Configurationconfiguration=newConfiguration();


//创建给mapreduce处理的任务


Jobjob=Job.getInstance(configuration,this.getClass().getSimpleName());


//获取将要读取到内存的文件的路径,并加载进内存


job.addCacheFile(URI.create(cacheFile));


//创建输入路径


Pathsource_path=newPath(args[0]);


//创建输出路径


Pathdes_path=newPath(args[1]);


//创建操作hdfs的FileSystem对象


FileSystemfs=FileSystem.get(configuration);


if(fs.exists(des_path)){


fs.delete(des_path,true);


}


FileInputFormat.addInputPath(job,source_path);


FileOutputFormat.setOutputPath(job,des_path);


//设置让任务打包jar运行


job.setJarByClass(MapJoin.class);


//设置map


job.setMapperClass(map.class);


job.setMapOutputKeyClass(NullWritable.class);


job.setMapOutputValueClass(Text.class);


//设置reduceTask的任务数为0,即没有reduce阶段和shuffle阶段


job.setNumReduceTasks(0);


//提交job到yarn组件上


booleanisSuccess=job.waitForCompletion(true);


returnisSuccess0:1;


}


//书写主函数


publicstaticvoidmain(String[]args){


Configurationconfiguration=newConfiguration();


//1.书写输入和输出路径


String[]args1=newString[]{


"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input",


"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/output"


};


//2.设置系统以什么用户执行job任务


System.setProperty("HADOOP_USER_NAME","beifeng");


//3.运行job任务


intstatus=0;


try{


status=ToolRunner.run(configuration,newMapJoin(),args1);


}catch(Exceptione){


e.printStackTrace();


}


//intstatus=newMyWordCountMapReduce().run(args1);


//4.退出系统


System.exit(status);


}


}


构造类代码:


packagecom.ibeifeng.mapreduce.join;


importjava.io.DataInput;


importjava.io.DataOutput;


importjava.io.IOException;


importorg.apache.hadoop.io.Writable;


publicclassCustomerimplementsWritable{


privateintcid;


privateStringcname;


privateStringcphone;


publicintgetCid(){


returncid;


}


publicvoidsetCid(intcid){


this.cid=cid;


}


publicStringgetCname(){


returncname;


}


publicvoidsetCname(Stringcname){


this.cname=cname;


}


publicStringgetCphone(){


returncphone;


}


publicvoidsetCphone(Stringcphone){


this.cphone=cphone;


}


publicCustomer(intcid,Stringcname,Stringcphone){


super();


this.cid=cid;


this.cname=cname;


this.cphone=cphone;


}


publicvoidwrite(DataOutputout)throwsIOException{


out.writeInt(this.cid);


out.writeUTF(this.cname);


out.writeUTF(this.cphone);


}


publicvoidreadFields(DataInputin)throwsIOException{


this.cid=in.readInt();


this.cname=in.readUTF();


this.cphone=in.readUTF();


}


@Override


publicStringtoString(){


return"Customer[cid="+cid+",cname="+cname+",cphone="+cphone+"]";


}


}


本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程