from odps import ODPS from odps import options import pandas as pd import multiprocessing as mp CORE = 2 MIGRATE_TABLE = 'crm_adm_pdm_dev.migrate_odps_his_table' df= o.get_table(MIGRATE_TABLE) df= df.to_df().to_pandas() source_table = df['source_table'].values.tolist() partition_col = df['partition_col'].values.tolist() target_table = df['target_table'].values.tolist() condition = df['condition'].values.tolist() def ins(source_table,partition_col,target_table,condition): try: print(source_table,target_table) if partition_col == '' or partition_col is None: sql = "INSERT OVERWRITE TABLE %s SELECT * FROM %s %s" % (target_table,source_table,condition) else: sql = "INSERT OVERWRITE TABLE %s partition(%s) SELECT * FROM %s %s" % (target_table,partition_col,source_table,condition) print(sql) o.execute_sql(sql) except Exception as e: print(f"Error while processing {source_table}:\n {e}") with mp.Pool(processes=CORE) as pool: res = pool.starmap_async(ins,zip(source_table,partition_col,target_table,condition)) res.wait()
时间: 2024-04-10 10:32:15 浏览: 233
对于你的代码,我注意到你正在使用ODPS库来连接和操作MaxCompute。你导入了`ODPS`和`options`模块,然后使用`get_table`方法获取了一个表,并将其转换为Pandas DataFrame。
接下来,你将表中的一些列分别赋值给了`source_table`、`partition_col`、`target_table`和`condition`变量。
然后,你定义了一个名为`ins`的函数,将这些变量作为参数传递进去。在函数内部,你尝试执行一个SQL语句,将源表的数据插入到目标表中。如果存在分区列,则将其作为分区信息插入。
最后,你使用`multiprocessing.Pool`创建了一个进程池,并使用`starmap_async`方法并行地调用`ins`函数,将参数从四个列表中逐个取出。使用`CORE`变量来指定进程池的大小。
请问有什么我可以帮助你的吗?
相关问题
set odps.sql.hive.compatible=true; INSERT OVERWRITE TABLE ads_ec_mbr_balance_trans_info_di PARTITION (dt = '${biz_date}') SELECT * FROM (SELECT trans.user_id AS user_id ,user.nickname AS nickname ,user.mobilephonenumber AS mobile ,CAST(GET_JSON_OBJECT(user.balance,'$.cash.available') AS BIGINT) AS available ,CASE WHEN trans.type = 'recharge' THEN '充值' WHEN trans.type = 'consumption' THEN '消费' WHEN trans.type = 'withdraw' THEN '提现' WHEN trans.type = 'adjust' THEN '历史余额转换' WHEN trans.type = 'refunded' THEN '退款' END AS type ,amount AS amount ,DATE_FORMAT(CAST(UNIX_TIMESTAMP('${biz_date}', 'yyyyMMdd') AS TIMESTAMP), 'yyyy-MM-dd') AS date FROM dws_ec_mbr_balance_trans_amount_1d trans LEFT JOIN dim_ec_mbr_user_info_di user ON trans.user_id = user.id WHERE trans.dt = '${biz_date-1}') AS temp ;是什么意思
这段代码是一个SQL语句,用于向一个名为 `ads_ec_mbr_balance_trans_info_di` 的分区表中插入数据。代码中的 `INSERT OVERWRITE TABLE` 语句用于覆盖已存在的表数据,并将查询结果插入到指定的分区中。
查询部分是一个子查询,用于获取数据。子查询中使用了表 `dws_ec_mbr_balance_trans_amount_1d` 和表 `dim_ec_mbr_user_info_di` 进行关联查询,获取相应的字段信息。其中,还对日期进行了格式化处理。
最后,通过 `PARTITION (dt = '${biz_date}')` 指定了插入的分区,`${biz_date}` 是一个占位符,表示在执行时会被实际的日期值所替换。
此代码的整体逻辑是将前一天的交易数据和用户信息进行关联查询,然后将查询结果插入到指定的分区表中。
odps_config
### ODPS Configuration Settings and Parameters
ODPS (Open Data Processing Service) 是阿里云提供的一种大数据处理服务,支持多种配置设置和参数来优化性能以及满足不同的业务需求。对于 ODPS 的配置项主要分为两类:一类是在创建表或分区时指定的属性;另一类则是提交作业时可调整的任务级参数。
#### 表级别的配置选项
当定义表格结构时,可以通过 `CREATE TABLE` 语句中的 TBLPROPERTIES 子句设定一些特定于该表的行为特性:
- **lifecycle**: 设置数据生命周期天数,在此期间内未被访问的数据会被自动清理。
```sql
CREATE EXTERNAL TABLE IF NOT EXISTS my_table (
id BIGINT,
name STRING
)
PARTITIONED BY (ds STRING)
LIFECYCLE 7;
```
- **comment**: 提供描述性的注释帮助理解表的内容。
- **storage_handler**: 指定存储处理器用于自定义文件格式的支持。
这些配置有助于管理资源并提高查询效率[^1]。
#### 任务级别的配置选项
在运行 MapReduce 或 SQL 查询之前,还可以通过 SET 命令临时更改会话范围内的系统变量值,影响整个计算过程的表现形式:
- **odps.sql.mapper.split.size**: 控制 map 阶段输入切片大小,默认单位为 MB。
- **odps.sql.udf.cache.archive**: 启用 UDF 缓存归档功能减少重复加载时间开销。
- **odps.job.priority**: 调整当前 job 执行优先级以适应不同场景下的调度策略。
```bash
SET odps.sql.mapper.split.size=256;
SET odps.job.priority=NORMAL;
```
以上只是部分常用配置示例,实际应用中可根据具体情况进行适当调整以达到最佳效果。
阅读全文
相关推荐
















