package com.cmb.lul.lulm.service.data.sink; import com.cmb.dtpframework.exception.AppException; import com.cmb.dtpframework.utils.SpringContextHolder; import com.cmb.dtpframework.utils.StringUtil; import com.cmb.lp.za20.util.BaseLayerFacade; import com.cmb.lul.lulm.service.data.cache.metadata.ColumnMetadata; import com.cmb.lul.lulm.service.data.cache.metadata.JdbcMetadataManager; import com.cmb.lul.lulm.service.data.cache.metadata.TableMetadata; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import java.math.BigDecimal; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.Types; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; public abstract class AbstractTableSink implements TableSink { protected final String sinkDbName; protected final String sinkSchemaName; protected final String sinkTableName; protected final TableMetadata sinkDsTableMetadata; protected final String indexName; protected final List<Map<String, String>> datas; protected final List<String> dataColumnNames; protected final List<String> primaryKeys; protected final List<String> updateColumnNames; protected long connectCostMs = 0L; protected long sinkCostMs = 0L; public AbstractTableSink(String sinkSchemaName, String sinkTableName, String indexName, List<String> primaryKeys, List<Map<String, String>> datas) { this.sinkDbName = sinkSchemaName; this.sinkSchemaName = sinkSchemaName; this.sinkTableName = sinkTableName; this.indexName = indexName; this.primaryKeys = primaryKeys; this.sinkDsTableMetadata = BaseLayerFacade.getSingleton(JdbcMetadataManager.class).getTableMetadata(sinkSchemaName, sinkTableName); this.datas = datas; this.dataColumnNames = getAndSortDataColumnNames(sinkDsTableMetadata, datas); this.updateColumnNames = dataColumnNames.stream().filter(column -> !primaryKeys.contains(column)).collect(Collectors.toList()); } public AbstractTableSink(String sinkSchemaName, String sinkTableName, List<String> primaryKeys, List<Map<String, String>> datas) { this(sinkSchemaName, sinkTableName, null, primaryKeys, datas); } protected List<String> getAndSortDataColumnNames(TableMetadata sinkDsTableMetadata, List<Map<String, String>> datas) { // 用源表data数据确定要写哪些字段 Set<String> tempDataColumnNamesSet = new HashSet<>(16); datas.forEach(data -> tempDataColumnNamesSet.addAll(data.keySet())); // 用目标表metadata重排列字段顺序 List<String> tempDataColumnNamesList = new ArrayList<>(tempDataColumnNamesSet.size()); sinkDsTableMetadata.getColumnMetadatas().keySet().forEach(metadataColumnName -> { if (tempDataColumnNamesSet.contains(metadataColumnName)) { tempDataColumnNamesList.add(metadataColumnName); tempDataColumnNamesSet.remove(metadataColumnName); } }); // 源表data中存在,但目标表metadata中不存在的字段放到最后(这部分写的时候会报错,表明目标表字段比源表少) tempDataColumnNamesList.addAll(tempDataColumnNamesSet); return tempDataColumnNamesList; } @Override public void doSink() { if (!datas.isEmpty()) { /* 构建sql语句 */ String sql = buildSql(); SqlSession session = null; try { long beforeConnectTime = System.currentTimeMillis(); session = SpringContextHolder.getBean(SqlSessionFactory.class).openSession(); Connection connection = session.getConnection(); long afterConnectTime = System.currentTimeMillis(); try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) { /* 设置参数 */ for (Map<String, String> data : datas) { // 设置参数 setPreparedStatementParameter(preparedStatement, data); preparedStatement.addBatch(); } int[] results = preparedStatement.executeBatch(); } /* 事务提交 */ session.commit(true); long afterSinkTime = System.currentTimeMillis(); connectCostMs = afterConnectTime - beforeConnectTime; sinkCostMs = afterSinkTime - afterConnectTime; } catch (Exception e) { if (null != session) { /* 事务回滚 */ session.rollback(true); } throw new AppException(e, "LULM605", sinkDbName, sinkTableName, sql, e.getMessage()); } finally { if (null != session) { session.close(); } } } } protected String buildSql() { StringBuilder sql = new StringBuilder(); sql.append("MERGE "); if (indexName != null) { sql.append("/*+INDEX(").append(sinkTableName).append(" ").append(indexName).append(") */ "); } sql.append("INTO ").append(buildTableName()).append(" USING DUAL ON \n") .append("(").append(buildWhereCondition()).append(") \n") .append(" WHEN MATCHED THEN UPDATE SET \n") .append(buildOnDuplicateKeyUpdateStatement()).append(" \n") .append(" WHEN NOT MATCHED THEN INSERT \n") .append("(").append(buildColumnNamesSpiltByComma()).append(") \n") .append("values (").append(buildPlaceHoldersSpiltByComma()).append(")"); return sql.toString(); } protected String buildWhereCondition() { StringBuilder result = new StringBuilder(); for (int i = 0; i < primaryKeys.size(); i++) { String primaryKey = primaryKeys.get(i); result.append(primaryKeys.get(i)); result.append((i < primaryKeys.size() - 1) ? " = ? AND " : " = ?"); } return result.toString(); } protected String buildTableName() { StringBuilder tableName = new StringBuilder(); tableName.append(sinkSchemaName).append(".") .append(sinkTableName); return tableName.toString(); } protected String buildColumnNamesSpiltByComma() { StringBuilder columnNamesSpiltByComma = new StringBuilder(); for (String columnName : dataColumnNames) { columnNamesSpiltByComma.append(columnName).append(","); } if (columnNamesSpiltByComma.length() >= 1) { columnNamesSpiltByComma.deleteCharAt(columnNamesSpiltByComma.length() - 1); } return columnNamesSpiltByComma.toString(); } protected String buildPlaceHoldersSpiltByComma() { StringBuilder placeHolders = new StringBuilder(); for (int i = 0; i < dataColumnNames.size(); i++) { placeHolders.append("?,"); } if (placeHolders.length() >= 1) { placeHolders.deleteCharAt(placeHolders.length() - 1); } return placeHolders.toString(); } protected String buildOnDuplicateKeyUpdateStatement() { StringBuilder updateStatement = new StringBuilder(); for (String columnName : updateColumnNames) { updateStatement.append(columnName).append(" = ?,"); } if (updateStatement.length() >= 1) { updateStatement.deleteCharAt(updateStatement.length() - 1); } return updateStatement.toString(); } protected void setPreparedStatementParameter(PreparedStatement preparedStatement, Map<String, String> data) { // 设置参数 // 通过binlog字段找metadata,找不到时getDsTableColumnMetadata(columnName)会报错,以此确保所有binlog数据字段都被写入目标表 int i = 1; for (String primaryKey : primaryKeys) { setOneColumn(preparedStatement, i, primaryKey, data, true); i++; } for (String columnName : updateColumnNames) { setOneColumn(preparedStatement, i, columnName, data, false); i++; } for (String columnName : dataColumnNames) { setOneColumn(preparedStatement, i, columnName, data, false); i++; } } protected void setOneColumn(PreparedStatement preparedStatement, int index, String columnName, Map<String, String> data, boolean format) { String columnValue = data.get(columnName); int columnDataType = -1; int columnSize = 0; try { ColumnMetadata metadata = sinkDsTableMetadata.getColumnMetadata(columnName); if (metadata == null) { throw new IllegalArgumentException("目标表不存在该字段!"); } columnDataType = metadata.getDataType(); columnSize = metadata.getColumnSize(); preparedStatement.setObject(index, transferData(columnName, columnDataType, columnValue, columnSize, format), columnDataType); } catch (Exception e) { throw new AppException(e, "LULM606", sinkDbName, sinkTableName, columnName, columnDataType, columnValue, columnValue.getClass().getName(), e.getMessage()); } } private Object transferData(String columnName, int columnType, String columnValue, int columnSize, boolean format) { switch (columnType) { case Types.CHAR: if (format) { if (columnValue == null) { columnValue = ""; } return columnValue.length() < columnSize ? String.format("%-" + columnSize + "s", columnValue) : columnValue; } return checkEmpty(columnValue) ? " " : columnValue; case Types.VARCHAR: case Types.LONGVARCHAR: return checkEmpty(columnValue) ? " " : columnValue; case Types.SMALLINT: return checkEmpty(columnValue) ? 0 : Short.parseShort(columnValue); case Types.INTEGER: return checkEmpty(columnValue) ? 0 : Integer.parseInt(columnValue); case Types.BIGINT: return checkEmpty(columnValue) ? 0 : Long.parseLong(columnValue); case Types.FLOAT: case Types.DOUBLE: return checkEmpty(columnValue) ? 0 : Double.parseDouble(columnValue); case Types.NUMERIC: case Types.DECIMAL: return checkEmpty(columnValue) ? BigDecimal.ZERO : new BigDecimal(columnValue); case Types.DATE: case Types.TIMESTAMP: case Types.TIME: return checkEmpty(columnValue) ? 0 : columnValue; default: throw new AppException("LULM603", sinkDbName, sinkSchemaName, sinkTableName, columnName, columnType); } } private boolean checkEmpty(Object value) { return value == null || (value instanceof String && StringUtil.isBlank((String)value)); } } 请你给我详细分析解释一下这段代码
时间: 2025-07-21 22:55:02 浏览: 5
<think>好的,我现在需要详细分析用户提供的这段Java代码。首先,我应该通读整个代码,理解它的整体结构和功能。这段代码属于一个名为AbstractTableSink的抽象类,实现了TableSink接口,看起来是处理数据同步或写入数据库的组件。
首先,注意到类中有多个protected成员变量,比如sinkDbName、sinkSchemaName等,这些可能用于存储数据库连接信息和表结构元数据。构造函数接收参数,初始化这些变量,并通过JdbcMetadataManager获取目标表的元数据。接下来,dataColumnNames和updateColumnNames是通过处理传入的数据和主键列表来确定的,这可能用于生成SQL语句中的列。
在doSink方法中,主要逻辑是构建SQL语句,使用MyBatis的SqlSession获取数据库连接,然后通过PreparedStatement执行批量操作。这里用了MERGE语句,这在Oracle中通常用于upsert操作(更新或插入)。异常处理部分捕获了可能的错误,并进行事务回滚,同时抛出自定义的AppException。
buildSql方法构建了MERGE INTO语句,根据是否存在indexName添加提示。ON子句使用主键构建WHERE条件,WHEN MATCHED THEN UPDATE SET部分使用updateColumnNames,而WHEN NOT MATCHED THEN INSERT部分使用所有数据列。这里需要注意updateColumnNames是排除主键后的列,这样在更新时不会修改主键。
setPreparedStatementParameter方法负责设置SQL参数,遍历主键、更新列和所有列,依次设置参数。setOneColumn方法根据元数据设置每个参数的值,并调用transferData进行类型转换。transferData方法根据不同的SQL类型将字符串值转换为相应的Java类型,比如Integer、BigDecimal等,并处理空值情况。
接下来需要考虑代码中的潜在问题。例如,在transferData方法中处理日期类型时,直接返回columnValue作为字符串,但数据库可能需要特定的日期类型如java.sql.Date或Timestamp。这可能导致类型不匹配错误。此外,在设置参数时使用setObject,可能依赖于JDBC驱动的自动转换,这在某些情况下可能不可靠。
另一个问题是,当目标表的字段比源数据少时,getAndSortDataColumnNames方法会将这些字段放在最后,但实际插入时可能会报错。这部分需要确保数据列和目标表列匹配,否则会导致SQL异常。
另外,异常处理部分抛出的AppException是否正确传递了所有必要的信息,比如错误代码和参数,需要检查日志记录和异常处理流程是否完善。
关于性能方面,批量操作使用addBatch和executeBatch,但每个数据项都执行一次addBatch,这可能效率不高。可以考虑调整批处理的大小,或者使用更高效的批量插入方式。
还有,代码中使用了SpringContextHolder获取SqlSessionFactory,这是否线程安全?需要确认Spring的配置是否正确,避免多线程问题。
最后,MERGE语句中的USING DUAL ON部分,DUAL是Oracle中的伪表,但在其他数据库如MySQL中可能不适用。这段代码是否与数据库类型耦合,是否具有可移植性?需要确认使用的数据库是否为Oracle,否则可能需要调整SQL语法。
总结一下,这段代码的主要功能是通过MERGE语句将数据从源同步到目标表,处理主键冲突,并进行类型转换和参数设置。需要关注类型转换的正确性、SQL语句的数据库兼容性、异常处理的健壮性以及性能优化的可能性。</think>这段代码是一个基于Java的抽象类`AbstractTableSink`,用于将数据写入数据库的组件,主要功能是实现数据的**增量更新(Upsert)**操作。以下是详细分析:
---
### **1. 核心功能**
- **数据同步**:通过`MERGE INTO`语句实现数据库的**插入或更新**操作(Oracle语法)。
- **元数据管理**:利用`JdbcMetadataManager`获取目标表的结构信息(如列类型、主键等)。
- **类型转换**:将字符串格式的数据转换为数据库所需的类型(如`Integer`、`BigDecimal`等)。
- **异常处理**:捕获SQL异常并抛出自定义异常`AppException`,记录错误上下文信息。
---
### **2. 关键成员变量**
- **目标表信息**:
- `sinkDbName`/`sinkSchemaName`/`sinkTableName`:目标数据库、模式、表名。
- `sinkDsTableMetadata`:目标表的元数据(列类型、大小等)。
- **操作参数**:
- `datas`:待写入的数据列表(`Map<String, String>`)。
- `dataColumnNames`:数据中实际存在的列名。
- `primaryKeys`:主键列表。
- `updateColumnNames`:非主键列,用于更新操作。
- **性能监控**:
- `connectCostMs`:连接耗时。
- `sinkCostMs`:写入耗时。
---
### **3. 核心方法解析**
#### **(1) 构造函数**
- 初始化目标表信息,并通过`JdbcMetadataManager`获取元数据。
- 从数据中提取列名,按目标表结构排序,确保字段顺序一致。
#### **(2) doSink()**
- **流程**:
1. 检查数据非空。
2. 构建SQL语句(`buildSql()`)。
3. 获取数据库连接,执行批量操作。
4. 提交事务,记录耗时。
5. 异常时回滚并抛出`AppException`。
- **关键点**:
- 使用`SqlSession`管理连接(MyBatis集成)。
- 批量操作通过`PreparedStatement.addBatch()`实现。
#### **(3) buildSql()**
- **生成MERGE语句**:
```sql
MERGE /*+INDEX(...) */ INTO schema.table
USING DUAL ON (主键条件)
WHEN MATCHED THEN UPDATE SET 非主键列
WHEN NOT MATCHED THEN INSERT (列) VALUES (?)
```
- **特点**:
- 支持指定索引提示(`indexName`)。
- 主键用于匹配记录,非主键列用于更新。
#### **(4) setPreparedStatementParameter()**
- **参数绑定逻辑**:
1. 按顺序设置主键参数(用于`ON`条件)。
2. 设置更新列参数(`WHEN MATCHED`部分)。
3. 设置插入列参数(`WHEN NOT MATCHED`部分)。
- **类型转换**:调用`transferData()`将字符串转换为目标类型。
#### **(5) transferData()**
- **数据类型转换**:
- `CHAR/VARCHAR`:处理空值,填充空格。
- `SMALLINT/INTEGER/BIGINT`:转换为数值类型,空值转为0。
- `DECIMAL`:转换为`BigDecimal`。
- `DATE/TIME`:直接返回字符串(可能存在兼容性问题)。
- **异常处理**:不支持的类型抛出`AppException`。
---
### **4. 潜在问题**
1. **日期类型处理**:
- `transferData()`中`DATE/TIMESTAMP`直接返回字符串,可能引发类型不匹配(应使用`java.sql.Date`或`Timestamp`)。
2. **数据库兼容性**:
- `MERGE INTO ... USING DUAL`是Oracle语法,若用于MySQL需改用`ON DUPLICATE KEY UPDATE`。
3. **性能问题**:
- 批量操作未分页,大数据量可能导致内存溢出。
4. **空值处理**:
- `CHAR`类型填充空格可能不符合业务需求。
---
### **5. 优化建议**
1. **类型转换增强**:
- 添加对`DATE/TIME`的解析(如`SimpleDateFormat`)。
2. **动态SQL生成**:
- 根据数据库类型生成适配的SQL语句(如MySQL的`INSERT ... ON DUPLICATE KEY UPDATE`)。
3. **批处理优化**:
- 分批提交(如每1000条提交一次)减少内存压力。
4. **日志记录**:
- 添加SQL日志输出,便于调试和性能分析。
---
### **6. 示例调用**
假设目标表为`users(id, name, age)`,主键为`id`,数据为:
```java
List<Map<String, String>> datas = List.of(
Map.of("id", "1", "name", "Alice", "age", "30"),
Map.of("id", "2", "name", "Bob", "age", "25")
);
AbstractTableSink sink = new ConcreteTableSink("schema", "users", List.of("id"), datas);
sink.doSink();
```
生成的SQL类似:
```sql
MERGE INTO schema.users USING DUAL ON (id = ?)
WHEN MATCHED THEN UPDATE SET name = ?, age = ?
WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (?, ?, ?)
```
---
### **相关问题**
1. 如何修改代码以支持MySQL的`ON DUPLICATE KEY UPDATE`语法?
2. `transferData()`方法中日期类型转换可能引发哪些错误?如何修复?
3. 为什么`dataColumnNames`需要按目标表元数据排序?这对SQL执行有何影响?
4. 在批量写入时,如何优化内存使用以避免OOM?
5. 如果目标表新增列而源数据未包含,代码如何处理?会报错吗?
阅读全文
相关推荐



















