【云计算】MapReduce编程解析
小标 2018-12-13 来源 : 阅读 1532 评论 0

摘要:本文主要向大家介绍了【云计算】MapReduce编程解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】MapReduce编程解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


MapReduce


1、MapReduce原理


(先分析,在编程)

1、WordCount

2、Yarn平台调度

3、分许WordCount单词计数的数据流程

4、开发自己的WordCount程序



2、MapReduce编程


1、三种方式运行mr


windows本地运行 打包成jar,上传linux 命令hadoop jar xxx windows打成jar,然后本地运行,通知linux的yarn

1.环境准备


环境:Java Eclipse
需要的jar包:
/root/training/hadoop-2.7.3/share/hadoop/common
/root/training/hadoop-2.7.3/share/hadoop/common/lib

/root/training/hadoop-2.7.3/share/hadoop/mapreduce
/root/training/hadoop-2.7.3/share/hadoop/mapreduce/lib


2、创建类


创建Mapper类WordCountMapper

创建Reducer类WordCountReducer

创建Main类WordCountMain


3、编写Mapper类WordCountMapper

1).继承Mapper类



public void class WordCountMapper extends Mapper{
    
}


2).指定输入输出类型(k1,v1)(k2,v2)



 

k1:LongWritable

 v1:Text

 k2:Text

 v2:LongWritable




public class WordCountMapper extends Mapper{
    
}


3).重写map方法



在函数中 右键-->Source--> Override/Implements Methods-->选择map


4).整理格式,命名参数



protected void map(LongWritable k1, Text v1, Context context){
    
}


5).通过分词 写入上下文context



String data = v1.toString();
//通过空格分割
String[] words = data.split(" ");
for(String w:words){
    context.write(new Text(w),new LongWritable(1));   
}


4、编写Reducer类WordCountReducer

1).继承Reducer类



public void class WordCountReducer extends Reducer{
    
}


2).指定输入输出类型(k3,v3)(k4,v4)



 

k3:Text

 v3:LongWritable

 k4:Text

 v2:LongWritable




public class WordCountReducer extends Reducer{
    
}


3).重写reduce方法



在函数中 右键-->Source--> Override/Implements Methods-->选择reduce


4).整理格式,命名参数



protected void reduce(Text k3, LongWritable v3, Context context){
    
}


5). 写上下文context



long total = 0;
for(LongWritable v:v3){
    total = total+v.get();
}

context.write(k3,new LongWritable(total));   


5、编写主程序WordCountMain

1).创建一个job



//job = Mapper+Reducer
Job job = Job.getInstance(new Configuration());
}


2.指定job的入口



job.setJarByClass(WordCountMain.class);


3.指定mapper和输出(k2,v2)的数据类型



job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);


4.指定reducer



job.setReducerClass(WordCountReducer.class);
job.setReduceOutputKeyClass(Text.class);
job.setReduceOutputValueClass(LongWritable.class);


5.指定输入输出路径



FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutPaths(job,new Path(args[1]));


6.执行任务



job.waitForCompletion(true);


6.生成jar包

右击package-->Emport-->JAR file-->保存jar路径(此处命名s1) -->选择Main class-->Finish

7.运行jar包



hadoop jar s1.jar /input/data.txt /output/w0919


MapReduce高级功能


1、序列化

2、排序

3、分区

4、合并


1. 序列化


Java序列化:



 

核心:实现Serializable接口



如果一个类实现了Java序列化接口(Serializable),这个类对象可以作为InputStream和OutputStream对象(没有方法的接口--标准接口)



 

IO:序列化:内存到磁盘       反序列化:磁盘到内存



java demo:对java进行序列化


创建一个学生类Student.java


private int StuID;
private String Stuname;


public int getStuID() {
 return stuID;
}

public void setStuID(int stuID) {
 this.stuID = stuID;
}

public String getStuName() {
 return stuName;
}

public void setStuName(String stuName) {
 this.stuName = stuName;
}

public Student(){
 
}


编写一个测试程序StudentMain.java


//创建一个学生 s
Student s = new Student();
s.setStuID(1);
s.setStuName("Tom");
//将对象保存至文件
OutputStream out = new FileOutputStream("e:\\Test\\Test.ooo");
ObjOutputStream objOut = new ObjectOutputStream(out);
//输出对象
objOut.writObject(s);
//关闭资源
objOut.close();
out.close();


3.发现错误



Exception in thread "main" java.io.NotSerializableException: demo.serializable.java.Student
 at java.io.ObjectOutputStream.writeObject0(Unknown Source)
 at java.io.ObjectOutputStream.writeObject(Unknown Source)
 at demo.serializable.java.StudentMain.main(StudentMain.java:20)


4.原因:没有实现Serializable

解决:将Student继承Serializable接口



public class Student implements Serializable


MR的序列化:



 

核心:实现Writable接口



序列化原理图1.创建Employee类,定义变量 和get和set方法



public class Employee{
 private int empno;//员工号
 private String ename;//员工姓名
 private String job;//岗位
 private int mgr;//领导
 private String hiredata;//入职日期
 private int sal;//月薪
 private int comm;//奖金
 private int deptno;//部门号
 
 public int getEmpno() {
  return empno;
 }
 public void setEmpno(int empno) {
  this.empno = empno;
 }
 public String getEname() {
  return ename;
 }
 public void setEname(String ename) {
  this.ename = ename;
 }
 public String getJob() {
  return job;
 }
 public void setJob(String job) {
  this.job = job;
 }
 public int getMgr() {
  return mgr;
 }
 public void setMgr(int mgr) {
  this.mgr = mgr;
 }
 public String getHiredata() {
  return hiredata;
 }
 public void setHiredata(String hiredata) {
  this.hiredata = hiredata;
 }
 public int getSal() {
  return sal;
 }
 public void setSal(int sal) {
  this.sal = sal;
 }
 public int getComm() {
  return comm;
 }
 public void setComm(int comm) {
  this.comm = comm;
 }
 public int getDeptno() {
  return deptno;
 }
 public void setDeptno(int deptno) {
  this.deptno = deptno;
 }


2.继承Writable接口



public class Employee implements Writable


3.实现Writable的方法



@Override
public void write(DataOutput output) throws IOException {
 // 序列化
 output.writeInt(this.empno);
 output.writeUTF(this.ename);
 output.writeUTF(this.job);
 output.writeInt(this.mgr);
 output.writeUTF(this.hiredata);
 output.writeInt(this.sal);
 output.writeInt(this.comm);
 output.writeInt(this.deptno);
}

@Override
public void readFields(DataInput input) throws IOException {
 // 反序列化
 this.empno = input.readInt();
 this.ename=input.readUTF();
 this.job = input.readUTF();
 this.mgr = input.readInt();
 this.sal = input.readInt();
 this.comm = input.readInt();
 this.deptno = input.readInt();
}


4.重写toString(非必要),格式化显示对象



@Override
public String toString() {
 return "["+this.empno+"\t"+this.ename+"\t"+this.sal+"\t"+this.deptno+"]";
}


5.创建EmployeeMapper类,继承Mapper类



package demo.serializable.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMapper extends Mapper {
 //7839,KING,PRESIDENT,,1981/11/17,5000,,10
 @Override
 protected void map(LongWritable k1, Text v1, Context context)
   throws IOException, InterruptedException {
  // 分词
  String data=v1.toString();
  String[] words = data.split(",");
  Employee e = new Employee();
  e.setEmpno(Integer.parseInt(words[0]));
  e.setEname(words[1]);
  e.setJob(words[2]);
  try{
  e.setMgr(Integer.parseInt(words[3]));
  }catch(Exception ex){
   e.setMgr(0);
  }
  e.setHiredata(words[4]);
  e.setSal(Integer.parseInt(words[5]));
  try{
   e.setComm(Integer.parseInt(words[6]));
  }catch(Exception ex){
   e.setComm(0);
  }
  e.setDeptno(Integer.parseInt(words[7]));

  context.write(new LongWritable(e.getDeptno()), e);
 }

}


6.创建EmployeeReducer继承Reducer类



package demo.serializable.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer extends Reducer {

 @Override
 protected void reduce(LongWritable k3, Iterable v3,Context context)
     throws IOException, InterruptedException {
   for(Employee e:v3){
    context.write(k3, e);
    
   }
 }
 

}


7.创建主程序EmployeeMain



package demo.emp;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {

 public static void main(String[] args) throws Exception {
  //配置一个Job
  Job job = Job.getInstance(new Configuration());
  //设置JAR包环境
  job.setJarByClass(EmployeeMain.class);
  
  //设置Mapper类,输出k2  v2
  job.setMapperClass(EmployeeMaper.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(LongWritable.class);
  
  //设置Reducer类 输出 k4  v4
  job.setReducerClass(EmployeeReducer.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(LongWritable.class);
  
  //设置文件输入输出
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  //运行环境
  job.waitForCompletion(true);
 }

}


8.生成可执行的jar包

9.在linux的环境下执行程序


2.排序



 

让程序根据自己的排序规则输出结果



java的排序

 对象继承一个Comparator的类,重写方法即可


创建Student类,创建属性和读写方法


package demo.sort.java;

public class Student implements Comparable{
 private int stuID;
 private String name;
 private int age;


 public int getStuID() {
  return stuID;
 }


 public void setStuID(int stuID) {
  this.stuID = stuID;
 }


 public String getName() {
  return name;
 }


 public void setName(String name) {
  this.name = name;
 }


 public int getAge() {
  return age;
 }


 public void setAge(int age) {
  this.age = age;
 }
 
 
}


重写compareTo和toString(非必要)方法


@Override
public int compareTo(Student o) {
 if(this.age>=o.age){
  return -1;
 }else{
  return 1;
 }
}
 
@Override
public String toString() {
 return "["+stuID+"\t"+name+"\t"+age+"]";
}


编写主程序StudentMain.java


package demo.sort.java;

import java.lang.reflect.Array;
import java.util.Arrays;

public class StudentMain {

 public static void main(String[] args) {
  Student s1 = new Student();
  s1.setStuID(1);
  s1.setName("Tom");
  s1.setAge(23);
  Student s2 = new Student();
  s2.setStuID(2);
  s2.setName("Lion");
  s2.setAge(21);
  Student s3 = new Student();
  s3.setStuID(3);
  s3.setName("Hell");
  s3.setAge(30);
  Student[] list = {s1,s2,s3};
  Arrays.sort(list);
  
  for(Student s:list){
   System.out.println(s);
  }
  
 }

}


输出结果



[2 Lion 21]
[1 Tom 23]
[3 Hell 30]


总结



 

排序的顺序关键在与compareTo方法的return 值



MapReduce的排序规则



 

基本数据类型:需要写Comparator类文件,在主程序指定排序规则

 对象类型:直接在对象中继承WritableComparable接口,实现compareTo方法,即可不用再主程序加排序规则




(1)基本数据类型:
如果排序的过程中,使用了reducer,会去掉重复记录

数字:  默认:升序;如果要改变顺序,创建一个自定义比较器
字符串:默认:按照字典顺序;如果要改变顺序,创建一个自定义比较器

小结:
(1)把数据作为key2  
(2)value2没有值:NullWritable  
(3)没有Reducer,如果有Reducer,去掉重复的数据  


(2)对象进行排序:数据:员工表的数据
(*)复习:SQL语句的排序
       order by 后面 + 列名、表达式、别名、序号
    多个列的排序:先按照第一个排序,如果相同,再按照第二列排序,以此类推。
    desc只作用于离他最近的列
    
    select * from emp order by deptno desc,sal desc;
    
    (难一点的问题)排序后的表,跟原来的表不是同一张表
    
(*)MR的排序:对员工的对象排序  -----> 实现WritableComparable
 要求:(1)实现序列化
       (2)可被排序


MapReduce程序



 

此程序排序不需要reduce



1.创建Mapper程序





package demo.sort.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper {

 @Override
 protected void map(LongWritable k1, Text v1,Context context)
     throws IOException, InterruptedException {
  String data = v1.toString();
  context.write(new LongWritable(Integer.parseInt(data)), NullWritable.get());
 }

}


2.创建比较器对象Comparator



package demo.sort.mr;

import org.apache.hadoop.io.LongWritable;

public class CompareMapper extends LongWritable.Comparator{

 @Override
 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  // TODO Auto-generated method stub
  return -super.compare(b1, s1, l1, b2, s2, l2);
 }

}


3.创建主程序,添加自己的比较器对象



package demo.sort.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortMain {

 public static void main(String[] args) throws Exception {
  //1创建Job
  Job job = Job.getInstance(new Configuration());
  //2指定job
  
  job.setJarByClass(SortMain.class);
  //3指定map的类 输出k2,v2
  job.setMapperClass(SortMapper.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(NullWritable.class);
  //指定自己的比较对象
  job.setSortComparatorClass(CompareMapper.class);
  
  //4指定reduce的类 和MR输出
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(NullWritable.class);
  //5指定文件输出
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  //6启动job
  job.waitForCompletion(true);

 }

}


4.生成jar包

5.在linux下执行程序


MapReduce程序 根据部门和薪水排序


1.创建Employee员工类


创建属性和读写方法 继承WritableComparable接口 重写compareTo方法 序列化Employee类 重写toString方法


package demo.sort.tolsal;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

//员工类
public class Employee implements WritableComparable{
 private int empno;
 private String ename;
 private String job;
 private int mgr;
 private String hiredata;
 private int sal;//月薪
 private int comm;//奖金
 private int deptno;//部门号
 
 @Override
 public int compareTo(Employee e) {
  if(this.deptno>e.deptno){
   return 1;
  }else if(this.deptno=e.sal){
   return 1;
  }else{
   return -1;
  }
 }
 
 @Override
 public String toString() {
  return "["+this.empno+"\t"+this.ename+"\t"+this.sal+"\t"+this.deptno+"]";
 }

 @Override
 public void write(DataOutput output) throws IOException {
  // 序列化
  output.writeInt(this.empno);
  output.writeUTF(this.ename);
  output.writeUTF(this.job);
  output.writeInt(this.mgr);
  output.writeUTF(this.hiredata);
  output.writeInt(this.sal);
  output.writeInt(this.comm);
  output.writeInt(this.deptno);
 }
 
 @Override
 public void readFields(DataInput input) throws IOException {
  // 反序列化
  this.empno = input.readInt();
  this.ename=input.readUTF();
  this.job = input.readUTF();
  this.mgr = input.readInt();
  this.sal = input.readInt();
  this.comm = input.readInt();
  this.deptno = input.readInt();
 }

 
 
 public int getEmpno() {
  return empno;
 }
 public void setEmpno(int empno) {
  this.empno = empno;
 }
 public String getEname() {
  return ename;
 }
 public void setEname(String ename) {
  this.ename = ename;
 }
 public String getJob() {
  return job;
 }
 public void setJob(String job) {
  this.job = job;
 }
 public int getMgr() {
  return mgr;
 }
 public void setMgr(int mgr) {
  this.mgr = mgr;
 }
 public String getHiredata() {
  return hiredata;
 }
 public void setHiredata(String hiredata) {
  this.hiredata = hiredata;
 }
 public int getSal() {
  return sal;
 }
 public void setSal(int sal) {
  this.sal = sal;
 }
 public int getComm() {
  return comm;
 }
 public void setComm(int comm) {
  this.comm = comm;
 }
 public int getDeptno() {
  return deptno;
 }
 public void setDeptno(int deptno) {
  this.deptno = deptno;
 }



}


2.创建EmployeeMaper对象



package demo.sort.tolsal;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class TotalMapper extends Mapper {

 @Override
 protected void map(LongWritable k1, Text v1, Context context)
   throws IOException, InterruptedException {
  String data=v1.toString();
  String[] words = data.split(",");
  Employee e = new Employee();
  e.setEmpno(Integer.parseInt(words[0]));
  e.setEname(words[1]);
  e.setJob(words[2]);
  try{
  e.setMgr(Integer.parseInt(words[3]));
  }catch(Exception ex){
   e.setMgr(0);
  }
  e.setHiredata(words[4]);
  e.setSal(Integer.parseInt(words[5]));
  try{
   e.setComm(Integer.parseInt(words[6]));
  }catch(Exception ex){
   e.setComm(0);
  }
  e.setDeptno(Integer.parseInt(words[7]));

  context.write(e, NullWritable.get());
 }
 
}


3.创建Reducer类



package demo.sort.tolsal;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;


public class TotalReducer extends Reducer {

 @Override
 protected void reduce(Employee k3, Iterable v3,Context context)
     throws IOException, InterruptedException {
  context.write(new LongWritable(k3.getDeptno()),k3);
 }


}


4.创建主函数



package demo.sort.tolsal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class TotalMain {

 public static void main(String[] args) throws Exception {
  //1创建Job
  Job job =Job.getInstance(new Configuration());
  //2指定程序入口
  job.setJarByClass(TotalMain.class);
  //3指定Map的类 和输出类型k3 v3
  job.setMapperClass(TotalMapper.class);
  job.setMapOutputKeyClass(Employee.class);
  job.setMapOutputValueClass(NullWritable.class);
  //4指定Reduce的类 输出类型k4 v4
  job.setReducerClass(TotalReducer.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(Employee.class);
  //5指定文件的输入输出路径
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  //6启动job
  job.waitForCompletion(true);
  

 }

}


5.创建jar包compare.jar

6.在linux上执行jar包



hardoop jar compare.jar /input/emp.csv /output/w0921


7.执行结果



7934 [7934 MILLER 1300 10]
7782 [7782 CLARK 2450 10]
7839 [7839 KING 5000 10]
7369 [7369 SMITH 800 20]
7876 [7876 ADAMS 1100 20]
7566 [7566 JONES 2975 20]
7902 [7902 FORD 3000 20]
7788 [7788 SCOTT 3000 20]
7900 [7900 JAMES 950 30]
7521 [7521 WARD 1250 30]
7654 [7654 MARTIN 1250 30]
7844 [7844 TURNER 1500 30]
7499 [7499 ALLEN 1600 30]
7698 [7698 BLAKE 2850 30]


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved