package traffic.step1.mapreduce; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; /**连接MySQL工具类*/ public class DBHelper { /********** Begin **********/ /********** End **********/ } 在右侧代码 DBHelper.java 文件中编写代码,实现通过 jdbc 连接 MySQL; 在右侧代码 TrafficMap.java 文件中编写代码,继承 Mapper 类,实现剔除不合规数据的 map 方法; 在右侧代码 TrafficReduce.java 文件中编写代码,继承 Reducer 类,实现剔除不合规数据的 reduce 方法; 在右侧代码 TrafficJob.java 文件中编写代码,实现 Hadoop 程序的主入口以及程序的相关配置; 测试说明
时间: 2025-05-21 16:42:08 浏览: 23
### 使用 JDBC 连接 MySQL 并编写 Hadoop MapReduce 程序
#### 1. 实现 JDBC 工具类
为了简化与 MySQL 数据库的交互操作,可以创建一个通用的 JDBC 工具类来管理数据库连接、查询和关闭资源等功能。
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class DBUtil {
private static final String URL = "jdbc:mysql://localhost:3306/your_database";
private static final String USER = "root";
private static final String PASSWORD = "password";
public static Connection getConnection() throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");
return DriverManager.getConnection(URL, USER, PASSWORD);
}
public static void close(Connection conn, PreparedStatement ps, ResultSet rs) {
try {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (conn != null) conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
此工具类提供了获取数据库连接的方法 `getConnection` 和用于释放资源的静态方法 `close`[^1]。
---
#### 2. 编写 Mapper 类(剔除不合规数据)
Mapper 阶段的主要任务是从输入源中提取有效记录,并过滤掉不符合条件的数据。假设我们希望筛选出年龄大于等于 18 岁的有效用户数据:
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class UserFilterMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length >= 3 && isAdult(fields)) {
context.write(new LongWritable(Integer.parseInt(fields[1])), new Text(fields[0])); // 输出<年龄, 用户ID>
}
}
private boolean isAdult(String[] fields) {
int age = Integer.parseInt(fields[1]);
return age >= 18; // 只保留成年人数据
}
}
```
在此代码片段中,`isAdult` 方法被用来判断用户的年龄是否满足条件[^4]。
---
#### 3. 编写 Reducer 类(进一步剔除异常数据)
Reducer 的职责是对来自 Mapper 的中间键值对进行聚合或二次过滤。这里我们将再次验证数据合法性并输出最终结果:
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class UserFilterReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text userId : values) {
if (!userId.toString().isEmpty()) { // 排除非法空字符串
context.write(key, userId); // 输出合法的<年龄, 用户ID>
}
}
}
}
```
该部分逻辑确保了即使在 Reduce 步骤中也能够排除潜在的错误条目[^4]。
---
#### 4. 主入口配置类
最后一步是定义驱动程序以设置作业参数并将整个流程串联起来:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class UserFilterDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "User Filter");
job.setJarByClass(UserFilterDriver.class);
job.setMapperClass(UserFilterMapper.class);
job.setCombinerClass(UserFilterReducer.class);
job.setReducerClass(UserFilterReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
这段代码设置了必要的 I/O 路径和其他属性以便于运行完整的 MapReduce 流程[^3]。
---
### 总结
以上展示了如何构建一个基于 JDBC 的 MySQL 访问工具类以及配套的 Hadoop MapReduce 组件设计思路。这些模块共同作用可完成从原始数据采集到清洗再到存储的一系列复杂计算需求。
阅读全文
相关推荐
















