用Python代码准确检测rocketMQ服务是否可用
时间: 2025-06-29 17:00:05 浏览: 9
### 使用Python实现检测RocketMQ服务可用性的脚本
为了确保RocketMQ的服务正常运行,可以通过编写Python脚本来监控各个组件的状态。这包括`namesrv`、`broker`和服务控制台(console)。
#### 1. 安装依赖库
首先需要安装一些必要的Python包来帮助完成HTTP请求和命令执行的任务:
```bash
pip install requests psutil
```
#### 2. 编写状态检查函数
下面是一个简单的Python脚本示例,它能够连接到本地或远程主机上的RocketMQ实例并验证其健康状况:
```python
import subprocess
import requests
import logging
def check_namesrv_status(host='localhost', port=9876):
url = f"http://{host}:{port}/nsaddr"
try:
response = requests.get(url, timeout=5)
if response.status_code == 200 and 'OK' in response.text:
return True
except Exception as e:
logging.error(f"Failed to connect Namesrv at {url}: {e}")
return False
def check_broker_status(broker_addr):
cmd = ['mqadmin', 'clusterList', '-n', broker_addr]
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output, error = process.communicate()
if not error and b'Broker Name' in output:
return True
logging.error(f"Error checking Broker status: {error.decode()}")
return False
if __name__ == "__main__":
namesrv_host = "localhost"
namesrv_port = 9876
broker_address = f"{namesrv_host}:{namesrv_port}"
print("Checking RocketMQ services...")
# Check Name Server Status
ns_result = check_namesrv_status(namesrv_host, namesrv_port)
print(f"NameServer is {'UP' if ns_result else 'DOWN'}")
# Check Broker Status
brk_result = check_broker_status(broker_address)
print(f"Broker is {'UP' if brk_result else 'DOWN'}")
```
此脚本定义了两个主要功能:一个是用来测试Nameserver是否可访问[^1];另一个则是利用`mqadmin clusterList`命令获取集群列表信息以判断Broker是否在线。
对于更复杂的部署环境(比如多节点),可以根据实际情况调整上述代码中的参数设置,并考虑增加更多的错误处理逻辑。
阅读全文
相关推荐












