Hadoop从入门到精通31:MapReduce高级功能——分区

1.什么是分区?

在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中等等。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。

分区的英文单词叫做Partition,简写为part。可以从MR任务的输出文件part-r-00000的前缀part看到这一点。MR默认情况下只有一个分区,即只有一个输出文件part-r-00000。如果设置了多个分区,那么就会在一个目录下输出多个文件:part-r-00000,part-r-00001,part-r-00002,等等。

对比日志信息:

(1)没有分区的情况(一个分区)

18/11/05 22:12:38 INFO mapreduce.Job: map 0% reduce 0%
18/11/05 22:12:41 INFO mapreduce.Job: map 100% reduce 0%
18/11/05 22:12:45 INFO mapreduce.Job: map 100% reduce 100%

(2)有分区的情况(3个分区)

18/11/05 22:12:38 INFO mapreduce.Job: map 0% reduce 0%
18/11/05 22:12:41 INFO mapreduce.Job: map 100% reduce 0%
18/11/05 22:12:45 INFO mapreduce.Job: map 100% reduce 33%
18/11/05 22:12:49 INFO mapreduce.Job: map 100% reduce 67%
18/11/05 22:12:54 INFO mapreduce.Job: map 100% reduce 100%

2.开发带有分区的MR程序

(1)开发带有分区的MR程序需要注意以下几点:

  1. 在Mapper和Reducer之间加上一个Partitioner阶段;
  2. Partitioner的输入就是Mapper的输出;
  3. 分区类需要继承自Partitioner父类;
  4. 分区类需要重载getPartition()方法;

示例:按照员工的部门号对员工数据进行分类存放(不同部门的员工输出到不同的文件中)。

//员工类:Employee.java
package demo.part;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Employee implements Writable {
  private int empno;
  private String ename;
  private String job;
  private int mgr;
  private String hiredate;
  private int sal;
  private int comm;
  private int deptno;
  @Override
  public String toString() {
    return "["+this.ename+"\t"+this.deptno+"\t"+this.sal+"]";
  }
  @Override
  public void readFields(DataInput input) throws IOException {
    // 反序列化
    this.empno = input.readInt();
    this.ename = input.readUTF();
    this.job = input.readUTF();
    this.mgr = input.readInt();
    this.hiredate = input.readUTF();
    this.sal = input.readInt();
    this.comm = input.readInt();
    this.deptno = input.readInt();
  }
  @Override
  public void write(DataOutput output) throws IOException {
    // 序列化
    output.writeInt(this.empno);
    output.writeUTF(this.ename);
    output.writeUTF(this.job);
    output.writeInt(this.mgr);
    output.writeUTF(this.hiredate);
    output.writeInt(this.sal);
    output.writeInt(this.comm);
    output.writeInt(this.deptno);
  }
  public int getEmpno() {
    return empno;
  }
  public void setEmpno(int empno) {
    this.empno = empno;
  }
  public String getEname() {
    return ename;
  }
  public void setEname(String ename) {
    this.ename = ename;
  }
  public String getJob() {
    return job;
  }
  public void setJob(String job) {
    this.job = job;
  }
  public int getMgr() {
    return mgr;
  }
  public void setMgr(int mgr) {
    this.mgr = mgr;
  }
  public String getHiredate() {
    return hiredate;
  }
  public void setHiredate(String hiredate) {
    this.hiredate = hiredate;
  }
  public int getSal() {
    return sal;
  }
  public void setSal(int sal) {
    this.sal = sal;
  }
  public int getComm() {
    return comm;
  }
  public void setComm(int comm) {
    this.comm = comm;
  }
  public int getDeptno() {
    return deptno;
  }
  public void setDeptno(int deptno) {
    this.deptno = deptno;
  }
}
//Mapper类:EmployeePartitionMapper.java
package demo.part;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class EmployeePartitionMapper extends Mapper<LongWritable, Text, LongWritable, Employee>{
  /*
  * mapper:将读入的员工数据存到一个员工对象中
  */
  @Override
  protected void map(LongWritable key1, Text value1, Context context)
    throws IOException, InterruptedException {
    //读入一行数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
    String line = value1.toString();
    //分词操作
    String[] words = line.split(",");
    //创建一个员工对象
    Employee e = new Employee();
    //设置员工属性
    e.setEmpno(Integer.parseInt(words[0]));
    e.setEname(words[1]);
    e.setJob(words[2]);
    try {
      e.setMgr(Integer.parseInt(words[3]));
    }catch(Exception ex) {
      e.setMgr(0);
    }
    e.setHiredate(words[4]);
    e.setSal(Integer.parseInt(words[5]));
    try {
      e.setComm(Integer.parseInt(words[6]));
    }catch(Exception ex) {
      e.setComm(0);
    }
    e.setDeptno(Integer.parseInt(words[7]));
    //map输出
    context.write(new LongWritable(e.getDeptno()), e);
  }
}
//Partitioner类:EmployeePartitioner.java
package demo.<
相关推荐
软件下载
QR Code
微信扫一扫,欢迎咨询~

联系我们
武汉格发信息技术有限公司
湖北省武汉市经开区科技园西路6号103孵化器
电话:155-2731-8020 座机:027-59821821
邮件:tanzw@gofarlic.com
Copyright © 2023 Gofarsoft Co.,Ltd. 保留所有权利
遇到许可问题?该如何解决!?
评估许可证实际采购量? 
不清楚软件许可证使用数据? 
收到软件厂商律师函!?  
想要少购买点许可证,节省费用? 
收到软件厂商侵权通告!?  
有正版license,但许可证不够用,需要新购? 
联系方式 155-2731-8020
预留信息,一起解决您的问题
* 姓名:
* 手机:

* 公司名称:

姓名不为空

手机不正确

公司不为空