[0]: index [audit_log_share_6_2023_7], type [_doc], id [64a9b7f18dced0000169db86], message [ElasticsearchException[Elasticsearch exception [type=circuit_breaking_exception, reason=[parent] Data too large, data for [indices:data/write/bulk[s]] would be [10214441696/9.5gb], which is larger than the limit of [10200547328/9.5gb], real usage: [10214440528/9.5gb], new bytes reserved: [1168/1.1kb], usages [request=0/0b, fielddata=206560/201.7kb, in_flight_requests=34048/33.2kb, model_inference=0/0b, accounting=99048310/94.4mb]]]]
时间: 2023-12-25 13:03:43 浏览: 183
这个错误信息显示在执行Elasticsearch操作时发生了一个断路异常(circuit_breaking_exception)。根据错误信息,数据大小为10.2GB,超过了9.5GB的限制。实际使用的空间为10.2GB,新增的空间为1.1KB。错误信息还提供了一些使用情况的细节,如请求使用量、字段数据使用量、模型推理使用量等。
为了解决这个问题,你可以考虑以下几个方案:
1. 增加可用空间:如果你的设备或系统支持,可以尝试增加可用的存储空间,以满足数据大小的需求。
2. 优化数据结构:检查你的数据结构是否存在冗余或重复的部分,可以尝试优化数据结构以减少空间占用。
3. 调整Elasticsearch的配置参数:根据具体情况,可能需要调整相关的配置参数,比如限制的大小、缓冲区大小等。可以参考Elasticsearch文档或向官方技术支持咨询以获取更准确的解决方案。
请注意,具体解决方法可能因你使用的软件版本和配置而有所不同。建议参考相关文档或向技术支持寻求帮助,以获得更准确的解决方案。
相关问题
if self.config.load_type == "INC": # adhoc hist job do not need to join landing merge table try: landing_merge_df = self.spark.read.format(self.config.destination_file_type). \ load(self.config.destination_data_path) # dataframe for updated records df = df.drop("audit_batch_id", "audit_job_id", "audit_src_sys_name", "audit_created_usr", "audit_updated_usr", "audit_created_tmstmp", "audit_updated_tmstmp") # dataframe for newly inserted records new_insert_df = df.join(landing_merge_df, primary_keys_list, "left_anti") self.logger.info(f"new_insert_df count: {new_insert_df.count()}") new_insert_df = DataSink_with_audit(self.spark).add_audit_columns(new_insert_df, param_dict) update_df = df.alias('l').join(landing_merge_df.alias('lm'), on=primary_keys_list, how="inner") update_df = update_df.select("l.*", "lm.audit_batch_id", "lm.audit_job_id", "lm.audit_src_sys_name", "lm.audit_created_usr", "lm.audit_updated_usr", "lm.audit_created_tmstmp", "lm.audit_updated_tmstmp") self.logger.info(f"update_df count : {update_df.count()}") update_df = DataSink_with_audit(self.spark).update_audit_columns(update_df, param_dict) # dataframe for unchanged records unchanged_df = landing_merge_df.join(df, on=primary_keys_list, how="left_anti") self.logger.info(f"unchanged_records_df count : {unchanged_df.count()}") final_df = new_insert_df.union(update_df).union(unchanged_df) print("final_df count : ", final_df.count()) except AnalysisException as e: if e.desc.startswith('Path does not exist'): self.logger.info('landing merge table not exists. will skip join landing merge') final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict) else: self.logger.error(f'unknown error: {e.desc}') raise e else: final_df = DataSink_with_audit(self.spark).add_audit_columns(df, param_dict) return final_df
这是一段Python代码,其中包含一个类方法的实现。该方法根据配置参数的不同,从一个特定的数据路径中将数据加载到一个Spark DataFrame中,并对该数据进行一些操作,最终返回一个具有审计列的DataFrame。如果配置参数是"INC",则会执行一些数据合并的操作,包括添加、更新和未更改的记录,并对这些记录添加审计列。如果配置参数是其他值,则只会添加审计列。
create table #temp( audit_batch_id int null, audit_job_id int null, audit_src_sys_name nvarchar(50) null, audit_created_usr nvarchar(50) null, audit_updated_usr nvarchar(50) null, audit_created_tmstmp datetime null, audit_updated_tmstmp datetime null ),怎么给这个表初始化
这是一个 T-SQL 的语句,它创建了一个名为 `#temp` 的临时表,该表包含了七个列,分别是 `audit_batch_id`、`audit_job_id`、`audit_src_sys_name`、`audit_created_usr`、`audit_updated_usr`、`audit_created_tmstmp` 和 `audit_updated_tmstmp`,均为可空的数据类型。
要给这个表初始化,可以使用以下语句:
```
INSERT INTO #temp (audit_batch_id, audit_job_id, audit_src_sys_name, audit_created_usr, audit_updated_usr, audit_created_tmstmp, audit_updated_tmstmp) VALUES
(1, 1001, 'System A', 'User A', 'User B', '2021-01-01 00:00:00', '2021-01-01 01:00:00'),
(2, 1002, 'System B', 'User B', 'User C', '2021-01-02 00:00:00', '2021-01-02 01:00:00'),
(3, 1003, 'System C', 'User C', 'User A', '2021-01-03 00:00:00', '2021-01-03 01:00:00');
```
这个语句会向 `#temp` 表中插入三条记录,每条记录对应七个列的值,可以根据需要修改这些值来进行表的初始化。注意,临时表只在当前会话中存在,当会话结束时,临时表将被自动删除。
阅读全文
相关推荐














