## WritableComparable Sorting in Practice (In-Partition Sorting)

**1) Requirement:** within each province's output file, phone-number records must be sorted internally by total traffic.

**2) Analysis:** building on the previous exercise, add a custom partitioner class that assigns partitions by the phone number's province prefix, so that the total-traffic sort happens within each partition. Paste the completed code here.
### Hadoop MapReduce WritableComparable In-Partition Sorting: Complete Implementation
Below is the complete code, based on the Hadoop MapReduce framework, for sorting each province's phone numbers by total traffic within its own output file. Input lines are assumed to be tab-separated: `phone\tupFlow\tdownFlow`.
#### 1. Traffic bean class (FlowBean.java)
```java
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-arg constructor (required by the framework for deserialization)
    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Sort by total traffic in descending order (the core of in-partition sorting)
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow); // descending
    }

    // Getters and setters
    public long getUpFlow() { return upFlow; }
    public long getDownFlow() { return downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```
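Before wiring the bean into a job, a quick local round trip through `write`/`readFields` can catch serialization mistakes early. A minimal sketch using plain `java.io` streams (the class name `FlowBeanTest` and the sample values are illustrative, not part of the assignment):

```java
import java.io.*;

public class FlowBeanTest {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1000, 2000); // sumFlow = 3000

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields() into a fresh instance
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Fields must survive the round trip, and compareTo must see them as equal
        System.out.println(copy);                      // expected: 1000  2000  3000
        System.out.println(original.compareTo(copy));  // expected: 0
    }
}
```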
#### 2. Mapper class (FlowMapper.java)
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line (format: phone\tupFlow\tdownFlow)
        String line = value.toString();
        String[] fields = line.split("\t");
        // 2. Extract the fields
        String phone = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        // 3. Populate the output objects
        outV.set(phone);
        outK.setUpFlow(upFlow);
        outK.setDownFlow(downFlow);
        outK.setSumFlow(upFlow + downFlow);
        // 4. Emit (key: FlowBean, value: phone) so the framework sorts by flow
        context.write(outK, outV);
    }
}
```
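Real traffic logs often contain blank or malformed lines, and a bare `Long.parseLong` would then fail the whole task. One hedged way to harden the job is the drop-in replacement for the `map` method below; the counter group and names are arbitrary choices, not part of the original code:

```java
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] fields = value.toString().split("\t");
    if (fields.length < 3) {
        // Count and skip short or blank lines instead of failing the task
        context.getCounter("FlowMapper", "MALFORMED_LINES").increment(1);
        return;
    }
    try {
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        outV.set(fields[0]);
        outK.setUpFlow(upFlow);
        outK.setDownFlow(downFlow);
        outK.setSumFlow(upFlow + downFlow);
        context.write(outK, outV);
    } catch (NumberFormatException e) {
        // Count and skip lines whose traffic columns are not numeric
        context.getCounter("FlowMapper", "BAD_NUMBERS").increment(1);
    }
}
```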
#### 3. Custom partitioner (ProvincePartitioner.java)
```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) {
        // The phone number travels in the value, so partition on its first three digits
        String phone = value.toString();
        String prefix = phone.substring(0, 3);
        // Route each province prefix to its own reducer (key to in-partition sorting)
        switch (prefix) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default:    return 4; // all other prefixes
        }
    }
}
```
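Because `getPartition` is a pure function, it can be sanity-checked without a cluster. A small local check (the class name `ProvincePartitionerTest` is made up for illustration):

```java
import org.apache.hadoop.io.Text;

public class ProvincePartitionerTest {
    public static void main(String[] args) {
        ProvincePartitioner p = new ProvincePartitioner();
        // This partitioner ignores the FlowBean key, so any instance works
        FlowBean dummy = new FlowBean(0, 0);
        System.out.println(p.getPartition(dummy, new Text("13612345678"), 5)); // 0
        System.out.println(p.getPartition(dummy, new Text("13912345678"), 5)); // 3
        System.out.println(p.getPartition(dummy, new Text("15012345678"), 5)); // 4
    }
}
```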
#### 4. Reducer class (FlowReducer.java)
```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive already sorted by total traffic (descending). Records with
        // equal totals are grouped into one call, so loop over all phone values.
        for (Text phone : values) {
            context.write(phone, key); // output: phone\tupFlow\tdownFlow\tsumFlow
        }
    }
}
```
#### 5. Driver class (FlowDriver.java)
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Province Flow Sort");
        // Set the job jar
        job.setJarByClass(FlowDriver.class);
        // Wire up the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // Map output types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // Final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Custom partitioner (key to in-partition sorting)
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5); // one reduce task per partition
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
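For debugging, the same driver can run entirely in-process with Hadoop's local job runner. A sketch of the configuration changes, assuming input and output live on the local filesystem:

```java
Configuration conf = new Configuration();
// Run MapReduce in a single local process and use the local filesystem (debugging only)
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf, "Province Flow Sort (local)");
// ... the rest of the wiring is identical to FlowDriver above ...
```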
### Key Technical Points
1. **How in-partition sorting works**:
   - The Map phase emits `<FlowBean, phone>`; because `FlowBean` implements `WritableComparable`, the framework sorts map output by it
   - During shuffle, records are merge-sorted **within each partition** by `FlowBean.compareTo()`[^3]
   - The Reduce phase therefore receives each partition's records already in order
2. **Custom partitioning logic**:
   - `ProvincePartitioner` assigns partitions by the first three digits of the phone number
   - `job.setNumReduceTasks(5)` must match the number of partitions[^4]
3. **Controlling the sort direction**:
   - `FlowBean.compareTo()` returns `Long.compare(o.sumFlow, this.sumFlow)`, giving a **descending** order
   - For an ascending order, swap the arguments: `Long.compare(this.sumFlow, o.sumFlow)` (see the sketch after this list)
4. **Data-flow design**:
   - Placing `FlowBean` in the map output key (with the phone in the value) delegates all sorting to the shuffle, so no in-memory sort is needed in user code
   - The Reducer simply streams out the already-sorted records, keeping its overhead minimal
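For example, a minimal ascending variant of `compareTo` that also breaks ties deterministically (the secondary field, `upFlow`, is an arbitrary choice; note that a tie-breaker also splits records with equal totals into separate reduce groups):

```java
// Ascending by total traffic, ties broken by upstream traffic
@Override
public int compareTo(FlowBean o) {
    int bySum = Long.compare(this.sumFlow, o.sumFlow); // ascending
    if (bySum != 0) {
        return bySum;
    }
    return Long.compare(this.upFlow, o.upFlow); // deterministic order for ties
}
```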
### Execution Flow
```mermaid
graph LR
    A[Input data] --> B(Map phase)
    B --> C[Partition by province]
    C --> D[Partition 1 sort]
    C --> E[Partition 2 sort]
    C --> F[... sort]
    D --> G(Reduce output)
    E --> G
    F --> G
    G --> H[Per-province sorted results]
```
### Sample Test Data
Input file contents (`input.txt`), tab-separated:
```
13600000001\t1000\t2000
13700000002\t1500\t2500
13600000003\t800\t1800
13900000004\t3000\t4000
13800000005\t1200\t2200
```
Output (one file per partition; each partition sorted by total traffic, descending):
```
# Partition 0 (prefix 136)
13600000001 1000 2000 3000
13600000003 800 1800 2600
# Partition 1 (prefix 137)
13700000002 1500 2500 4000
# Partition 2 (prefix 138)
13800000005 1200 2200 3400
# Partition 3 (prefix 139)
13900000004 3000 4000 7000
```
### Notes
1. The reduce task count (`numReduceTasks`) must be at least the highest partition number + 1; if it is smaller the job fails with an "Illegal partition" error, and if it is larger the extra reducers just produce empty output files
2. Input data must be preprocessed into the expected format (phone\tupFlow\tdownFlow)
3. To combine a **global total order** with in-partition sorting, use `TotalOrderPartitioner` instead[^1] (a sketch follows below)
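For reference, a hedged sketch of what note 3 involves. `TotalOrderPartitioner` reads a partition file produced by sampling the *input* keys, so the input must already be a sequence file keyed by `FlowBean`; the sampling parameters and partition-file path below are placeholders, not values from the original exercise:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// In the driver, replacing the ProvincePartitioner wiring:
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
        new Path("/tmp/flow-partitions"));               // placeholder path
InputSampler.Sampler<FlowBean, Text> sampler =
        new InputSampler.RandomSampler<>(0.1, 1000, 10); // freq, samples, max splits
InputSampler.writePartitionFile(job, sampler);
```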