Skip to content

Commit 43aca84

Browse files
committed
[Feature] didi#1121 support clearInactiveClusterPhyBrokers
1 parent 4c10b4c commit 43aca84

File tree

7 files changed

+84
-0
lines changed

7 files changed

+84
-0
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@ public interface ClusterBrokersManager {
1717
*/
1818
PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto);
1919

20+
/**
21+
* 删除status == 0 的所有broker -> 获取缓存查询结果 & broker 表查询结果并集
22+
* 获取缓存查询结果 & broker 表查询结果并集
23+
* @param clusterPhyId kafka 物理集群 id
24+
* @param dto 封装分页查询参数对象
25+
* @return 返回获取到的缓存查询结果 & broker 表查询结果并集
26+
*/
27+
PaginationResult<ClusterBrokersOverviewVO> deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto);
28+
29+
2030
/**
2131
* 根据物理集群id获取集群对应broker状态信息
2232
* @param clusterPhyId 物理集群 id

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
107107
);
108108
}
109109

110+
@Override
111+
public PaginationResult<ClusterBrokersOverviewVO> deleteInactiveClusterPhyBrokers(Long clusterPhyId, ClusterBrokersOverviewDTO dto) {
112+
brokerService.deleteInactiveClusterPhyBrokers(clusterPhyId);
113+
return this.getClusterPhyBrokersOverview(clusterPhyId, dto);
114+
}
115+
110116
@Override
111117
public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) {
112118
ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO();

km-console/packages/layout-clusters-fe/src/api/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ const api = {
120120
getTopicMetricPoints: (clusterPhyId: number, topicName: string) => getApi(`/clusters/${clusterPhyId}/topics/${topicName}/metric-points`),
121121
// Broker列表接口
122122
getBrokersList: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-overview`),
123+
// 删除失效Broker
124+
clearInactiveBrokers: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/brokers-clear`),
123125
// Broker列表页健康检查指标
124126
getBrokerMetricPoints: (clusterPhyId: number) => getApi(`/physical-clusters/${clusterPhyId}/latest-metrics`),
125127
// Controller列表接口 /api/v3/clusters/{clusterPhyId}/controller-history「controller-change-log」

km-console/packages/layout-clusters-fe/src/pages/BrokerList/index.tsx

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,42 @@ const BrokerList: React.FC = (props: any) => {
6666
});
6767
};
6868

69+
// 请求接口获取数据
70+
const clearInactiveBrokers = async ({ pageNo, pageSize, filters, sorter }: any) => {
71+
if (urlParams?.clusterId === undefined) return;
72+
// filters = filters || filteredInfo;
73+
setLoading(true);
74+
const params = {
75+
searchKeywords: searchKeywords.slice(0, 128),
76+
pageNo,
77+
pageSize,
78+
latestMetricNames: ['PartitionsSkew', 'Leaders', 'LeadersSkew', 'LogSize'],
79+
sortField: sorter?.field || 'brokerId',
80+
sortType: sorter?.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : 'asc',
81+
};
82+
83+
request(API.clearInactiveBrokers(urlParams?.clusterId), { method: 'POST', data: params })
84+
.then((res: any) => {
85+
setPagination({
86+
current: res.pagination?.pageNo,
87+
pageSize: res.pagination?.pageSize,
88+
total: res.pagination?.total,
89+
});
90+
const newData =
91+
res?.bizData.map((item: any) => {
92+
return {
93+
...item,
94+
...item?.latestMetrics?.metrics,
95+
};
96+
}) || [];
97+
setData(newData);
98+
setLoading(false);
99+
})
100+
.catch((err) => {
101+
setLoading(false);
102+
});
103+
};
104+
69105
const onTableChange = (pagination: any, filters: any, sorter: any) => {
70106
// setFilteredInfo(filters);
71107
genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, sorter });
@@ -107,6 +143,12 @@ const BrokerList: React.FC = (props: any) => {
107143
>
108144
<IconFont className={`${tableHeaderPrefix}-left-refresh-icon`} type="icon-shuaxin1" />
109145
</div>
146+
<div
147+
className={`${tableHeaderPrefix}-left-clear`}
148+
onClick={() => clearInactiveBrokers({ pageNo: pagination.current, pageSize: pagination.pageSize })}
149+
>
150+
<IconFont className={`${tableHeaderPrefix}-left-clear-icon`} type="icon-Operation" />
151+
</div>
110152
</div>
111153
<div className={`${tableHeaderPrefix}-right`}>
112154
<SearchInput

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,6 @@ public interface BrokerService {
7171
boolean allServerDown(Long clusterPhyId);
7272

7373
boolean existServerDown(Long clusterPhyId);
74+
75+
void clearInactiveClusterPhyBrokers(Long clusterPhyId);
7476
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,20 @@ public List<Broker> listNotAliveBrokersFromDB(Long clusterPhyId) {
176176
return this.listAllBrokersAndUpdateCache(clusterPhyId).stream().filter( elem -> !elem.alive()).collect(Collectors.toList());
177177
}
178178

179+
/**
180+
* 清理对应集群中下线的broker记录
181+
* @param clusterPhyId
182+
*/
183+
@Override
184+
public void clearInactiveClusterPhyBrokers(Long clusterPhyId) {
185+
try {
186+
this.getAllBrokerPOsFromDB(clusterPhyId).stream()
187+
.filter(elem -> elem.getStatus().equals(Constant.DOWN))
188+
.forEach(elem -> brokerDAO.deleteById(elem.getId()));
189+
} catch (Exception e) {
190+
log.error("method=deleteInactiveClusterPhyBrokers||clusterPhyId={}||errMsg=exception!", clusterPhyId, e);
191+
}
192+
}
179193
@Override
180194
public List<Broker> listAllBrokersFromDB(Long clusterPhyId) {
181195
return this.listAllBrokersAndUpdateCache(clusterPhyId);

km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,12 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(@
6262
@RequestBody ClusterBrokersOverviewDTO dto) {
6363
return clusterBrokersManager.getClusterPhyBrokersOverview(clusterPhyId, dto);
6464
}
65+
66+
@ApiOperation(value = "集群无效brokers清理")
67+
@PostMapping(value = "clusters/{clusterPhyId}/brokers-clear")
68+
@ResponseBody
69+
public PaginationResult<ClusterBrokersOverviewVO> clearInactiveClusterPhyBrokers(@PathVariable Long clusterPhyId,
70+
@RequestBody ClusterBrokersOverviewDTO dto) {
71+
return clusterBrokersManager.clearInactiveClusterPhyBrokers(clusterPhyId, dto);
72+
}
6573
}

0 commit comments

Comments
 (0)