1.什么是分区?
在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中等等。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就是说Mapper任务要划分数据,把不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。
分区的英文单词叫做Partition,简写为part。可以从MR任务的输出文件part-r-00000的前缀part看到这一点。MR默认情况下只有一个分区,即只有一个输出文件part-r-00000。如果设置了多个分区,那么就会在一个目录下输出多个文件:part-r-00000,part-r-00001,part-r-00002,等等。
对比日志信息:
(1)没有分区的情况(一个分区)
18/11/05 22:12:38 INFO mapreduce.Job: map 0% reduce 0%
18/11/05 22:12:41 INFO mapreduce.Job: map 100% reduce 0%
18/11/05 22:12:45 INFO mapreduce.Job: map 100% reduce 100%
(2)有分区的情况(3个分区)
18/11/05 22:12:38 INFO mapreduce.Job: map 0% reduce 0%
18/11/05 22:12:41 INFO mapreduce.Job: map 100% reduce 0%
18/11/05 22:12:45 INFO mapreduce.Job: map 100% reduce 33%
18/11/05 22:12:49 INFO mapreduce.Job: map 100% reduce 67%
18/11/05 22:12:54 INFO mapreduce.Job: map 100% reduce 100%
2.开发带有分区的MR程序
(1)开发带有分区的MR程序需要注意以下几点:
- 在Mapper和Reducer之间加上一个Partitioner阶段;
- Partitioner的输入就是Mapper的输出;
- 分区类需要继承自Partitioner父类;
- 分区类需要重写(覆盖,Override)getPartition()方法;
示例:按照员工的部门号对员工数据进行分类存放(不同部门的员工输出到不同的文件中)。
//员工类:Employee.java
package demo.part;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * Value object representing one employee record, serializable by Hadoop.
 *
 * <p>Implements {@link Writable} so instances can travel between the map and
 * reduce phases. NOTE: {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)} must read and write the fields in exactly the
 * same order — do not reorder one without the other.
 */
public class Employee implements Writable {

    // Employee attributes, mirroring the classic EMP table columns.
    private int empno;       // employee number
    private String ename;    // employee name
    private String job;      // job title
    private int mgr;         // manager's employee number (0 when absent)
    private String hiredate; // hire date as raw text, e.g. "1981/9/28"
    private int sal;         // salary
    private int comm;        // commission (0 when absent)
    private int deptno;      // department number

    /**
     * Serializes this record. Field order must match {@link #readFields}.
     */
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(empno);
        output.writeUTF(ename);
        output.writeUTF(job);
        output.writeInt(mgr);
        output.writeUTF(hiredate);
        output.writeInt(sal);
        output.writeInt(comm);
        output.writeInt(deptno);
    }

    /**
     * Deserializes a record. Field order must match {@link #write}.
     */
    @Override
    public void readFields(DataInput input) throws IOException {
        empno = input.readInt();
        ename = input.readUTF();
        job = input.readUTF();
        mgr = input.readInt();
        hiredate = input.readUTF();
        sal = input.readInt();
        comm = input.readInt();
        deptno = input.readInt();
    }

    /** Renders the record as "[ename&lt;TAB&gt;deptno&lt;TAB&gt;sal]". */
    @Override
    public String toString() {
        return "[" + ename + "\t" + deptno + "\t" + sal + "]";
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
//Mapper类:EmployeePartitionMapper.java
package demo.part;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that parses one CSV line of employee data into an {@link Employee}
 * and emits it keyed by department number, so a downstream Partitioner can
 * route records of the same department to the same reducer.
 */
public class EmployeePartitionMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    /**
     * Parses a comma-separated employee record and writes (deptno, Employee).
     *
     * @param key1    byte offset of the line in the input split (unused)
     * @param value1  one input line, e.g. "7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30"
     * @param context MR context used to emit the (deptno, Employee) pair
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        String line = value1.toString();
        String[] words = line.split(",");

        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0]));
        e.setEname(words[1]);
        e.setJob(words[2]);
        // MGR may be empty (e.g. the president has no manager): default to 0.
        // Catch only NumberFormatException — a broad catch(Exception) would
        // also hide genuine parsing/programming errors.
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (NumberFormatException ex) {
            e.setMgr(0);
        }
        e.setHiredate(words[4]);
        e.setSal(Integer.parseInt(words[5]));
        // COMM is typically present only for salesmen: default to 0 when empty.
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (NumberFormatException ex) {
            e.setComm(0);
        }
        e.setDeptno(Integer.parseInt(words[7]));

        // Key by department number so partitioning can group by department.
        context.write(new LongWritable(e.getDeptno()), e);
    }
}
//Partitioner类:EmployeePartitioner.java
package demo.<