mapreduce的编程开发求平均值-头歌
时间: 2025-05-14 08:05:15 浏览: 19
### 如何在 MapReduce 中实现计算平均值
#### 定义 Mapper 和 Reducer 类
为了计算一组数字的平均值,需要设计 `AverageMapper` 和 `AverageReducer` 来分别处理映射(Map)和化简(Reduce)过程。在Map阶段,输入的数据被解析成一个个单独的小块,并针对每一块计算其局部平均数以及记录样本数量;而在Reduce阶段,则负责收集来自各个Mapper节点的信息,通过加权求和的方式得出全局平均值。
```java
public static class AverageMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> {
private final static IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
float number = Float.parseFloat(line.trim());
// 输出 (key=0,value=(number))
context.write(new IntWritable(0), new FloatWritable(number));
}
}
```
对于 Reduce 阶段而言,接收到来自多个 Mapper 的输出之后,应该累加所有的数值并统计总数目以便于最后一步除法操作完成真正的算术平均运算[^2]:
```java
public static class AverageReducer extends Reducer<IntWritable, FloatWritable, Text, FloatWritable> {
@Override
protected void reduce(IntWritable key, Iterable<FloatWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
double sum = 0;
for (FloatWritable val : values) {
sum += val.get(); // 累计所有数值
++count; // 统计数目
}
if (count != 0){
float average = (float)(sum / count); // 计算平均值
// 将结果写入文件中
context.write(new Text("average"), new FloatWritable(average));
}else{
System.out.println("No data to process.");
}
}
}
```
考虑到直接对各分区内的中间结果取平均后再做一次平均会导致误差增大,因此建议采用传递总和与计数器的方法来确保准确性[^4]。具体来说就是在 Mapper 结束工作前不仅返回当前批次数据的均值还要附带上传该批数据量大小给 Reducer 使用。
#### 设置 Combiner 函数
由于单纯地将每个 Mapper 得到的部分平均值相加以获得整体平均值会产生偏差,所以当涉及到大量分布式集群上的大规模数据分析时,应当谨慎考虑是否启用组合函数(combiner)[^4]。然而,在某些情况下可以通过调整算法逻辑使 combiner 成为可能,比如这里提到的方案:让 combiner 接收来自 mapper 的一对儿(部分总量,对应条目个数),而不是简单的一个浮点型平均值得以避免潜在问题的同时也提高了效率。
```java
// 如果决定使用Combiner的话,可以这样设置
job.setCombinerClass(AverageCombiner.class);
public static class AverageCombiner extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable> {
@Override
public void reduce(IntWritable key, Iterable<FloatWritable> values, Context context)throws IOException, InterruptedException {
int count = 0;
double sum = 0.0d;
for(FloatWritable v : values){
sum += v.get() * count++;
}
context.write(key, new FloatWritable((float)sum/count));
}
}
```
注意这里的 combiner 只是一个简化版本,实际应用中还需要更复杂的逻辑来保证精度无损传输至 reducer 层面继续加工处理。
阅读全文
相关推荐


















