#include "qt_softbus.h"
#include <QDebug>
// UdpUnicastConnector实现
UdpUnicastConnector::UdpUnicastConnector(QObject* parent)
: IConnector(parent), socket(nullptr), currentState(ConnectionState::Disconnected) {}
UdpUnicastConnector::~UdpUnicastConnector() {
stop();
}
bool UdpUnicastConnector::initialize(const QString& host, quint16 port) {
targetHost = host;
targetPort = port;
return true;
}
bool UdpUnicastConnector::start() {
if (socket) {
delete socket;
}
socket = new QUdpSocket(this);
connect(socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams()));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(socketError(QAbstractSocket::SocketError)));
if (!socket->bind(QHostAddress::Any, 0)) {
emit errorOccurred("Failed to bind UDP socket");
currentState = ConnectionState::Error;
emit stateChanged(currentState);
return false;
}
currentState = ConnectionState::Connected;
emit stateChanged(currentState);
return true;
}
void UdpUnicastConnector::stop() {
if (socket) {
socket->close();
delete socket;
socket = nullptr;
}
currentState = ConnectionState::Disconnected;
emit stateChanged(currentState);
}
bool UdpUnicastConnector::send(const Message& msg) {
if (!socket || currentState != ConnectionState::Connected) {
return false;
}
QByteArray datagram;
datagram.append(msg.topic).append('\n').append(msg.data);
qint64 bytesWritten = socket->writeDatagram(datagram, QHostAddress(targetHost), targetPort);
return bytesWritten == datagram.size();
}
void UdpUnicastConnector::readPendingDatagrams() {
while (socket->hasPendingDatagrams()) {
QByteArray datagram;
datagram.resize(socket->pendingDatagramSize());
QHostAddress sender;
quint16 senderPort;
socket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort);
// 解析消息
int separatorIndex = datagram.indexOf('\n');
if (separatorIndex > 0) {
Message msg;
msg.topic = QString::fromUtf8(datagram.left(separatorIndex));
msg.data = datagram.mid(separatorIndex + 1);
msg.sender = sender;
msg.senderPort = senderPort;
if (messageHandler) {
messageHandler(msg);
}
}
}
}
void UdpUnicastConnector::socketError(QAbstractSocket::SocketError error) {
Q_UNUSED(error);
emit errorOccurred(socket->errorString());
currentState = ConnectionState::Error;
emit stateChanged(currentState);
}
// UdpMulticastConnector实现
UdpMulticastConnector::UdpMulticastConnector(QObject* parent)
: IConnector(parent), socket(nullptr), currentState(ConnectionState::Disconnected) {}
UdpMulticastConnector::~UdpMulticastConnector() {
stop();
}
bool UdpMulticastConnector::initialize(const QString& groupAddress, quint16 port) {
multicastGroup = QHostAddress(groupAddress);
this->port = port;
return true;
}
bool UdpMulticastConnector::start() {
if (socket) {
delete socket;
}
socket = new QUdpSocket(this);
connect(socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams()));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(socketError(QAbstractSocket::SocketError)));
if (!socket->bind(QHostAddress::AnyIPv4, port, QUdpSocket::ShareAddress)) {
emit errorOccurred("Failed to bind UDP multicast socket");
currentState = ConnectionState::Error;
emit stateChanged(currentState);
return false;
}
if (!socket->joinMulticastGroup(multicastGroup)) {
emit errorOccurred("Failed to join multicast group");
currentState = ConnectionState::Error;
emit stateChanged(currentState);
return false;
}
currentState = ConnectionState::Connected;
emit stateChanged(currentState);
return true;
}
void UdpMulticastConnector::stop() {
if (socket) {
socket->leaveMulticastGroup(multicastGroup);
socket->close();
delete socket;
socket = nullptr;
}
currentState = ConnectionState::Disconnected;
emit stateChanged(currentState);
}
bool UdpMulticastConnector::send(const Message& msg) {
if (!socket || currentState != ConnectionState::Connected) {
return false;
}
QByteArray datagram;
datagram.append(msg.topic).append('\n').append(msg.data);
// 设置TTL(Time-to-Live)
socket->setSocketOption(QAbstractSocket::MulticastTtlOption, 1);
qint64 bytesWritten = socket->writeDatagram(datagram, multicastGroup, port);
return bytesWritten == datagram.size();
}
void UdpMulticastConnector::readPendingDatagrams() {
while (socket->hasPendingDatagrams()) {
QByteArray datagram;
datagram.resize(socket->pendingDatagramSize());
QHostAddress sender;
quint16 senderPort;
socket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort);
// 解析消息
int separatorIndex = datagram.indexOf('\n');
if (separatorIndex > 0) {
Message msg;
msg.topic = QString::fromUtf8(datagram.left(separatorIndex));
msg.data = datagram.mid(separatorIndex + 1);
msg.sender = sender;
msg.senderPort = senderPort;
if (messageHandler) {
messageHandler(msg);
}
}
}
}
void UdpMulticastConnector::socketError(QAbstractSocket::SocketError error) {
Q_UNUSED(error);
emit errorOccurred(socket->errorString());
currentState = ConnectionState::Error;
emit stateChanged(currentState);
}
// TcpClientConnector实现
TcpClientConnector::TcpClientConnector(QObject* parent)
: IConnector(parent), socket(nullptr), currentState(ConnectionState::Disconnected) {}
TcpClientConnector::~TcpClientConnector() {
stop();
}
bool TcpClientConnector::initialize(const QString& host, quint16 port) {
targetHost = host;
targetPort = port;
return true;
}
bool TcpClientConnector::start() {
if (socket) {
delete socket;
}
socket = new QTcpSocket(this);
connect(socket, SIGNAL(connected()), this, SLOT(connected()));
connect(socket, SIGNAL(disconnected()), this, SLOT(disconnected()));
connect(socket, SIGNAL(readyRead()), this, SLOT(readyRead()));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(socketError(QAbstractSocket::SocketError)));
currentState = ConnectionState::Connecting;
emit stateChanged(currentState);
socket->connectToHost(targetHost, targetPort);
return true;
}
void TcpClientConnector::stop() {
if (socket) {
socket->disconnectFromHost();
if (socket->state() != QAbstractSocket::UnconnectedState) {
socket->waitForDisconnected();
}
delete socket;
socket = nullptr;
}
currentState = ConnectionState::Disconnected;
emit stateChanged(currentState);
}
bool TcpClientConnector::send(const Message& msg) {
if (!socket || currentState != ConnectionState::Connected) {
return false;
}
QByteArray datagram;
datagram.append(msg.topic).append('\n').append(msg.data);
// 添加消息长度前缀
QByteArray lengthPrefix;
lengthPrefix.setNum(datagram.size());
lengthPrefix.append('\n');
qint64 bytesWritten = socket->write(lengthPrefix + datagram);
return bytesWritten == (lengthPrefix.size() + datagram.size());
}
void TcpClientConnector::connected() {
currentState = ConnectionState::Connected;
emit stateChanged(currentState);
}
void TcpClientConnector::disconnected() {
currentState = ConnectionState::Disconnected;
emit stateChanged(currentState);