char MQTT_Publish(int dup, int qos, int retain, char *post_topic, unsigned int id, char *message) { const int BUFF_SIZE = 512; char data[BUFF_SIZE] = {0}; int len = 0; int remaining_length; int encoded_remaining_len_bytes = 0; int temp_remaining_length; int topic_length = post_topic ? strlen(post_topic) : 0; int message_length = message ? strlen(message) : 0; if (topic_length == 0 || topic_length > 65535) // 主题长度小于65535 { return 1; } if (!message) // 消息内容不可为空 { return 2; } if (qos > 0 && id == 0) // QoS>0时必须提供有效id { return 3; } if (qos == 0 && dup != 0) // QoS为0,不支持重发 { return 4; } /* 剩余长度计算 */ remaining_length = (2 + topic_length) + (qos ? 2 : 0) + message_length; /* 缓冲区容量检查 */ temp_remaining_length = remaining_length; do { encoded_remaining_len_bytes++; temp_remaining_length /= 128; } while (temp_remaining_length > 0); /* 确保数据长度不超过缓存大小 */ if ((1 + encoded_remaining_len_bytes + remaining_length) > BUFF_SIZE) { return 5; } /* 固定报头构造 */ data[len++] = 0x30 | (dup << 3) | (qos << 1) | retain; /* 剩余长度编码 */ char remaining_encoded[4]; int encoded_len = encode_remaining_length(remaining_length, remaining_encoded); memcpy(&data[len], remaining_encoded, encoded_len); len += encoded_len; /* 可变报头与有效载荷 */ // 主题名 data[len++] = (topic_length >> 8) & 0xFF; data[len++] = topic_length & 0xFF; memcpy(&data[len], post_topic, topic_length); len += topic_length; /* 报文标识符(QoS > 0) */ if (qos > 0) { data[len++] = (id >> 8) & 0xFF; data[len++] = id & 0xFF; } /* 有效载荷 */ memcpy(&data[len], message, message_length); len += message_length; /* 数据发送 */ Write_MQTT_TxDataBuff(data, len); mqtt_debug(data, len, "\r\n* 3、PUBLISH – 发布消息 *\r\n"); WIFI_Clean(); return 0; } 检查,完善这个代码,特别是dup等于1的时候, 在定时器中断函数调用重发,使用数组不使用结构体,给出完整代码
时间: 2025-06-20 22:22:52 浏览: 14
<think>好的,我现在需要仔细检查用户提供的MQTT_Publish函数代码,并根据要求进行完善,特别是处理dup标志为1时的情况,以及在定时器中断中实现重发机制,同时使用数组而不使用结构体。首先,我需要理解现有代码的结构和潜在问题。
首先,观察函数参数和变量。函数接收dup、qos、retain等参数,主题、ID、消息内容。然后构造MQTT PUBLISH报文,进行各种校验,计算剩余长度,填充数据到缓冲区,最后发送数据。返回值是错误码,0表示成功。
用户提到需要完善代码,特别是当dup=1时的情况。根据MQTT协议规范,当DUP标志为1时,表示当前报文是重发的,需要确保报文标识符(Packet Identifier)存在,这在QoS>0的情况下才有效。现有代码中,在构造固定报头时,将dup左移3位,正确设置DUP位的位置(固定报头的第一个字节的第3位)。但需要确认当QoS为0时是否允许设置DUP为1,根据协议,当QoS=0时,DUP标志必须为0,否则可能被服务器视为错误。检查代码中的错误处理:在函数中有一个条件检查,如果qos为0且dup不等于0,返回错误码4。这已经处理了这种情况,所以这部分是正确的。
接下来,关于定时器中断中的重发机制。用户要求使用数组而不是结构体来管理重发的消息。因此,可能需要维护一个发送队列或重发缓冲区,保存需要重发的报文数据、长度、重试次数等信息。由于在中断中调用,需要考虑数据结构的线程安全性和效率。
现有的代码中,发送数据后调用了Write_MQTT_TxDataBuff、mqtt_debug、WIFI_Clean。为了支持重发,可能需要将已发送的报文存储起来,当收到对应的确认(如PUBACK)时移除,或者在超时后重新发送。
现在需要补充的部分包括:
1. 定义一个全局的数组(队列)来保存待重发的报文数据,每个元素需要包含报文内容、长度、重试次数、报文标识符(QoS>0时)等信息。
2. 在定时器中断函数中,遍历该数组,对需要重发的报文进行处理,比如减少重试次数,重新发送数据。
3. 当发送原始报文时(尤其是当DUP=0时,即首次发送),如果QoS>0,需要将该报文添加到重发队列中,并设置重试次数和DUP标志为1。
4. 当收到对应的确认(如PUBACK)时,从队列中移除对应的报文(根据报文ID)。
5. 当定时器触发时,检查队列中的报文是否超时,如果超时则重新发送,并更新DUP标志为1,同时减少剩余重试次数,如果次数用尽则移除。
现在需要修改MQTT_Publish函数,使其在成功发送后,如果QoS>0,将报文加入重发队列。同时,处理DUP标志的设置:首次发送时DUP=0,重发时设置为1。
接下来检查现有代码中的DUP处理。当前函数中,固定报头的构造是 data[len++] = 0x30 | (dup << 3) | (qos << 1) | retain; 这里是否正确?MQTT PUBLISH报文的固定报头第一个字节的高4位是报文类型(PUBLISH是0x3),所以正确的应该是 0x30是00110000,然后将DUP位(第3位,即左移3位的位置)设置为dup的值。例如,如果dup=1,则0x30 | (1<<3) = 0x38。这部分是正确的。
当需要重发时,应该将dup设置为1,并且报文的其他部分保持不变。因此,在重发队列中的每个条目需要保存原始报文的数据、长度、报文ID(用于QoS>0)、重试次数等。
现在考虑如何实现重发队列。由于不能使用结构体,可能需要使用多个数组来保存相关信息,比如一个二维数组存储报文数据,另一个数组存储长度,另一个存储报文ID,另一个存储重试次数等。但这样管理起来可能比较麻烦。不过用户要求不使用结构体,所以必须这样做。
例如:
#define MAX_PENDING_MESSAGES 5
#define MAX_MESSAGE_LEN 512
static uint8_t pending_data[MAX_PENDING_MESSAGES][MAX_MESSAGE_LEN];
static uint16_t pending_len[MAX_PENDING_MESSAGES];
static uint16_t pending_id[MAX_PENDING_MESSAGES]; // 报文ID,QoS>0时需要
static uint8_t pending_retries[MAX_PENDING_MESSAGES];
static uint32_t pending_timestamps[MAX_PENDING_MESSAGES]; // 记录时间以便超时判断
static int pending_count = 0;
当MQTT_Publish函数成功发送后(返回0),如果qos>0,则需要将报文数据复制到pending_data中,记录长度、id、初始重试次数(比如3次)、时间戳,并增加pending_count。此时,DUP标志在首次发送时为0,当重发时设置为1,因此在队列中保存的数据在重发时需要修改DUP位。
但是,当将报文保存到队列中时,是否需要保存原始数据(DUP=0)还是允许在重发时修改DUP位?例如,首次发送的报文数据中的DUP位是0,当重发时,需要将DUP位设置为1,并重新编码剩余长度吗?
根据MQTT协议,重发的报文必须与原始报文具有相同的Packet Identifier,并且DUP标志设置为1。但报文的其他部分(如主题、消息内容等)必须相同。因此,在重发时,需要将固定报头中的DUP位设为1,但其他部分不变。因此,在保存原始报文数据时,可能需要允许在重发时修改DUP位。
但是,在现有代码中,报文数据是构造在data数组中的,其中固定报头的DUP位是根据传入的dup参数设置的。当首次发送时,dup参数是0,而重发时应该传入1。因此,当将报文存入队列时,应该保存构造好的数据,但允许在重发时修改DUP位。
但这样的话,队列中的每个报文的数据在重发时需要修改其固定报头的DUP位。例如,在定时器中断中,当决定重发某个报文时,需要将data[0]的最高位中的DUP位(即第3位)设置为1。
这可能会带来问题,因为原始保存的数据中的DUP位是0,而重发时需要改为1。因此,在保存到队列时,可能需要保存构造好的数据,但允许在重发时修改DUP位。或者,在保存时,保存除DUP位之外的其余部分,并在重发时重新构造固定报头。
或者,更简单的方式,每次重发时,重新构造整个报文,但将dup参数设为1。但这样需要保存原始的参数,比如主题、消息、qos、retain、id等,以便在重发时调用MQTT_Publish函数,并设置dup=1。这可能更灵活,但需要保存这些参数,而不仅仅是数据缓冲区。
这可能更符合用户的需求,因为用户可能希望使用数组而不使用结构体,但保存参数可能比较复杂,需要更多的存储空间。例如,需要保存post_topic、message等字符串,但这在C语言中可能涉及动态内存分配,或者使用固定长度的数组,这可能不现实,特别是如果主题或消息较长的话。
另一种方式是,在首次调用MQTT_Publish时,构造整个报文数据,并将其保存到队列中,然后在重发时直接使用该数据,但修改其中的DUP位。例如,队列中的每个条目保存的是整个报文的数据,长度,以及报文ID(用于QoS>0的情况),还有重试次数和时间戳。当需要重发时,修改数据中的DUP位,然后重新发送。
这样,在保存到队列时,数据中的DUP位是0(首次发送时),当重发时,需要将DUP位设置为1。例如,队列中的data数组的第一个字节的高4位是报文类型(0x30是PUBLISH),然后DUP位在bit3的位置。因此,要设置DUP位为1,可以将data[0] |= 0x08(即第3位设为1)。
这样处理的话,队列中的每个条目保存的是完整的报文数据,包括固定报头,可变报头和有效载荷。当需要重发时,修改DUP位为1,然后发送该数据。
但需要注意的是,在MQTT协议中,DUP标志仅在控制报文的QoS等级大于0时有效。当QoS=0时,DUP标志必须设置为0。因此,在重发时,只有QoS>0的报文才允许设置DUP=1。因此,队列中的条目应该仅包含QoS>0的报文,因为QoS=0的报文不需要确认,因此不需要重发。
因此,在MQTT_Publish函数中,当qos>0且成功发送后,将报文数据保存到队列中,以便在未收到确认时重发。
现在,回到代码中,当函数执行到发送数据后,即调用Write_MQTT_TxDataBuff(data, len); 之后,如果qos>0,需要将数据保存到重发队列中。因此,需要将data数组的内容复制到队列中,并记录其长度、报文ID、重试次数和时间戳。
但目前的代码中,data是一个局部变量,函数结束后就会被销毁。因此,必须将数据复制到全局的队列中,而不能直接保存指针。因此,在代码中,当需要保存到队列时,必须将data数组的内容复制到队列的条目中。
现在,假设我们有一个全局的重发队列,结构如前面所述:
#define MAX_PENDING_MESSAGES 5
static uint8_t pending_data[MAX_PENDING_MESSAGES][BUFF_SIZE];
static uint16_t pending_len[MAX_PENDING_MESSAGES];
static uint16_t pending_id[MAX_PENDING_MESSAGES];
static uint8_t pending_retries[MAX_PENDING_MESSAGES];
static uint32_t pending_time[MAX_PENDING_MESSAGES]; // 时间戳,用于超时判断
static int pending_count = 0;
在函数中,当qos>0且发送成功时(返回0),将当前data数组的内容复制到队列中:
if (qos > 0) {
if (pending_count < MAX_PENDING_MESSAGES) {
memcpy(pending_data[pending_count], data, len);
pending_len[pending_count] = len;
pending_id[pending_count] = id;
pending_retries[pending_count] = 3; // 初始重试次数
pending_time[pending_count] = get_current_time(); // 假设有获取时间的函数
pending_count++;
} else {
// 队列已满,无法加入,可能需要处理错误
}
}
然后,定时器中断函数需要定期检查队列中的每个条目,如果超时(比如发送后超过一定时间未收到确认),则重新发送,并减少重试次数。当重试次数为0时,从队列中移除。
在定时器中断中,可能需要这样的逻辑:
void Timer_Interrupt_Handler() {
for (int i = 0; i < pending_count; ) {
if ((current_time - pending_time[i]) > RETRY_TIMEOUT) {
if (pending_retries[i] > 0) {
// 重发报文,设置DUP为1
uint8_t *data = pending_data[i];
// 修改DUP位为1
data[0] |= 0x08; // 设置DUP位为1
// 重新发送数据
Write_MQTT_TxDataBuff(data, pending_len[i]);
// 更新时间戳和重试次数
pending_time[i] = current_time;
pending_retries[i]--;
i++;
} else {
// 重试次数耗尽,移除该条目
for (int j = i; j < pending_count - 1; j++) {
memcpy(pending_data[j], pending_data[j+1], pending_len[j+1]);
pending_len[j] = pending_len[j+1];
pending_id[j] = pending_id[j+1];
pending_retries[j] = pending_retries[j+1];
pending_time[j] = pending_time[j+1];
}
pending_count--;
}
} else {
i++;
}
}
}
需要注意的是,修改DUP位时,原始保存的数据中的DUP位是0,现在需要设置为1。例如,data[0]的bit3位置1。例如,原来的固定报头是0x30(假设qos=1,dup=0,retain=0),则设置DUP后变为0x38。
此外,需要确保在修改DUP位后,发送的数据是正确的。例如,在定时器中断中,重新发送pending_data[i]中的数据,此时DUP位已经修改为1。
但还需要注意,在重发时,是否还需要重新编码剩余长度或者其他字段?根据MQTT协议,重发的报文必须与原始报文相同,除了DUP标志位可能被设置为1。因此,其他部分必须保持不变,所以不需要重新计算剩余长度,只需要修改DUP位即可。因此,上述修改是正确的。
现在,回到原函数中的错误处理。在现有代码中,当qos>0且id==0时返回错误码3,这已经处理了。当发送成功且qos>0时,将报文加入队列。
接下来,处理接收确认的问题。例如,当收到PUBACK(对于QoS1)或PUBREC(对于QoS2)等确认报文时,需要从队列中移除对应的报文。这需要另一个函数来处理,比如根据报文ID查找队列中的条目并删除。
例如:
void MQTT_Handle_Ack(uint16_t packet_id) {
for (int i = 0; i < pending_count; ) {
if (pending_id[i] == packet_id) {
// 找到对应的条目,移除
for (int j = i; j < pending_count - 1; j++) {
memcpy(pending_data[j], pending_data[j+1], pending_len[j+1]);
pending_len[j] = pending_len[j+1];
pending_id[j] = pending_id[j+1];
pending_retries[j] = pending_retries[j+1];
pending_time[j] = pending_time[j+1];
}
pending_count--;
} else {
i++;
}
}
}
这个函数需要在收到确认时被调用,传入对应的报文ID,从而清除队列中的条目。
现在,总结需要修改和添加的部分:
1. 添加全局的重发队列和相关变量。
2. 在MQTT_Publish函数成功发送且qos>0时,将报文加入队列。
3. 在定时器中断处理函数中处理重发。
4. 添加处理确认报文的函数,以清除队列中的条目。
现在,需要将这些补充到用户提供的代码中。
但用户要求给出完整代码,所以需要将上述思路整合到原函数中,并提供必要的全局变量、定时器中断处理函数等。
接下来,检查用户提供的代码中的潜在问题:
1. 在计算剩余长度时,remaining_length = (2 + topic_length) + (qos ? 2 : 0) + message_length; 这部分是否正确?
根据MQTT PUBLISH报文结构,可变报头包括主题名(2字节长度+主题字符串)+报文标识符(仅当QoS>0时,2字节)。有效载荷是消息内容。因此,剩余长度的计算应该是:2 + topic_length + (qos>0 ? 2 : 0) + message_length。这部分正确。
2. encode_remaining_length函数是否正确?用户提供的代码中没有给出该函数的实现,但假设该函数能正确编码剩余长度。
3. 在缓冲区容量检查时,计算encoded_remaining_len_bytes是否正确?是的,通过不断除以128来计算需要多少字节编码剩余长度。
4. 数据发送后调用WIFI_Clean(),可能需要确认该函数的作用,是否清空发送缓冲区?可能需要保留数据,或者该函数不影响队列中的数据。
现在,根据用户的要求,特别是处理dup=1时的重发机制,接下来需要完善代码,添加重发队列和定时器中断处理。
另外,用户提到使用数组而不是结构体,所以需要用多个并行数组来保存各个属性。
接下来,编写完整的代码,包括全局变量、定时器中断处理函数、以及修改后的MQTT_Publish函数。
但需要注意,在中断处理函数中,不能进行复杂操作,或者需要使用临界区保护(如禁用中断)来避免竞态条件。例如,在修改全局队列时,可能需要暂时禁用中断。
这里可能需要对代码进行一些调整,比如在操作pending_count等全局变量时,需要确保原子性。
综上,以下是完善后的代码:
首先,添加全局变量:
#define MAX_PENDING_MESSAGES 5
#define BUFF_SIZE 512
#define RETRY_TIMEOUT 5000 // 5秒超时
#define MAX_RETRIES 3
static uint8_t pending_data[MAX_PENDING_MESSAGES][BUFF_SIZE];
static uint16_t pending_len[MAX_PENDING_MESSAGES];
static uint16_t pending_id[MAX_PENDING_MESSAGES];
static uint8_t pending_retries[MAX_PENDING_MESSAGES];
static uint32_t pending_time[MAX_PENDING_MESSAGES];
static int pending_count = 0;
// 假设存在获取当前时间的函数,例如通过系统tick
extern uint32_t get_current_time(void);
// 假设存在处理确认报文的函数
void MQTT_Handle_PubAck(uint16_t packet_id) {
// 在收到PUBACK时调用此函数
for (int i = 0; i < pending_count; ) {
if (pending_id[i] == packet_id) {
// 移除第i个条目
for (int j = i; j < pending_count - 1; j++) {
memcpy(pending_data[j], pending_data[j+1], BUFF_SIZE);
pending_len[j] = pending_len[j+1];
pending_id[j] = pending_id[j+1];
pending_retries[j] = pending_retries[j+1];
pending_time[j] = pending_time[j+1];
}
pending_count--;
break; // 每个ID唯一,找到后退出
} else {
i++;
}
}
}
// 定时器中断处理函数,需在适当的中断服务例程中调用
void Timer_IRQHandler(void) {
uint32_t current_time = get_current_time();
for (int i = 0; i < pending_count; ) {
if ((current_time - pending_time[i]) >= RETRY_TIMEOUT) {
if (pending_retries[i] > 0) {
// 重发报文,设置DUP位为1
uint8_t *data = pending_data[i];
data[0] |= 0x08; // 设置DUP位为1
Write_MQTT_TxDataBuff(data, pending_len[i]);
mqtt_debug(data, pending_len[i], "\r\n* PUBLISH重发 *\r\n");
pending_time[i] = current_time;
pending_retries[i]--;
i++;
} else {
// 重试次数用尽,移除条目
for (int j = i; j < pending_count - 1; j++) {
memcpy(pending_data[j], pending_data[j+1], BUFF_SIZE);
pending_len[j] = pending_len[j+1];
pending_id[j] = pending_id[j+1];
pending_retries[j] = pending_retries[j+1];
pending_time[j] = pending_time[j+1];
}
pending_count--;
// i不递增,因为后面的元素前移了
}
} else {
i++;
}
}
}
然后,修改MQTT_Publish函数,在发送成功后,将报文加入队列:
char MQTT_Publish(int dup, int qos, int retain, char *post_topic, unsigned int id, char *message) {
const int BUFF_SIZE = 512;
char data[BUFF_SIZE] = {0};
int len = 0;
int remaining_length;
int encoded_remaining_len_bytes = 0;
int temp_remaining_length;
int topic_length = post_topic ? strlen(post_topic) : 0;
int message_length = message ? strlen(message) : 0;
if (topic_length == 0 || topic_length > 65535) {
return 1;
}
if (!message) {
return 2;
}
if (qos > 0 && id == 0) {
return 3;
}
if (qos == 0 && dup != 0) {
return 4;
}
remaining_length = (2 + topic_length) + (qos ? 2 : 0) + message_length;
temp_remaining_length = remaining_length;
do {
encoded_remaining_len_bytes++;
temp_remaining_length /= 128;
} while (temp_remaining_length > 0);
if ((1 + encoded_remaining_len_bytes + remaining_length) > BUFF_SIZE) {
return 5;
}
data[len++] = 0x30 | (dup << 3) | (qos << 1) | retain;
char remaining_encoded[4];
int encoded_len = encode_remaining_length(remaining_length, remaining_encoded);
memcpy(&data[len], remaining_encoded, encoded_len);
len += encoded_len;
data[len++] = (topic_length >> 8) & 0xFF;
data[len++] = topic_length & 0xFF;
memcpy(&data[len], post_topic, topic_length);
len += topic_length;
if (qos > 0) {
data[len++] = (id >> 8) & 0xFF;
data[len++] = id & 0xFF;
}
memcpy(&data[len], message, message_length);
len += message_length;
Write_MQTT_TxDataBuff(data, len);
mqtt_debug(data, len, "\r\n* 3、PUBLISH – 发布消息 *\r\n");
WIFI_Clean();
// 如果QoS>0,添加到重发队列
if (qos > 0) {
if (pending_count < MAX_PENDING_MESSAGES) {
memcpy(pending_data[pending_count], data, len);
pending_len[pending_count] = len;
pending_id[pending_count] = (uint16_t)id;
pending_retries[pending_count] = MAX_RETRIES;
pending_time[pending_count] = get_current_time();
pending_count++;
} else {
// 队列已满,无法重发,返回错误码?
return 6;
}
}
return 0;
}
需要注意的几点:
1. 添加了一个新的错误码6,表示重发队列已满,无法添加。这需要根据用户的需求决定是否添加,或者在原函数中不返回错误,只是忽略。但根据现有代码,返回类型是char,可能只能返回0-5的错误码。因此,可能需要调整错误码的处理。例如,原函数返回0表示成功,其他错误码1-5。这里如果队列已满,可能无法处理,但用户可能希望函数在这种情况下仍然发送成功,只是无法保证重发。或者,返回新的错误码。需要根据实际情况调整。但原函数在发送成功后返回0,因此队列满的问题可能不影响当前发送,只是无法重发。在这种情况下,可能不需要返回错误,而只是无法加入队列,导致后续无法重发。因此,在代码中可能不返回错误,但用户需要注意队列的大小。
2. 在添加队列时,将id转换为uint16_t,假设id是16位的,这需要与原函数中的参数类型一致。原函数中的id是unsigned int,可能需要根据平台调整。例如,在16位系统中,unsigned int是16位,但在32位系统中是32位。但MQTT的报文标识符是16位,因此id参数应该确保不超过65535,并且在加入队列时转换为uint16_t。
3. encode_remaining_length函数需要正确实现,用户提供的代码中未给出,但假设存在。
4. 在定时器中断处理函数中,调用Write_MQTT_TxDataBuff和mqtt_debug,这些函数可能不能在中断上下文中安全调用,特别是如果它们涉及阻塞操作或使用共享资源而没有保护。因此,可能需要调整中断处理函数,例如设置标志位,在主循环中处理实际的重发。但用户要求定时器中断函数调用重发,因此需要确保这些函数是中断安全的,或者在中断中仅准备数据,由主循环发送。
但根据用户的要求,可能需要在中断中直接调用发送函数。这取决于具体的硬件和驱动程序是否允许在中断中执行发送操作。假设Write_MQTT_TxDataBuff是中断安全的,或者驱动程序允许在中断中调用。
最后,用户需要确保get_current_time()函数能够提供当前的时间戳,例如系统tick计数器,可能在中断中递增。
综上,上述代码添加了重发队列,处理了dup标志,定时器中断函数处理重发,并在收到确认时移除队列中的条目。这样,当报文需要重发时,DUP标志被正确设置为1,并重新发送。</think>以下是完善后的代码,包含重发机制和必要的注释:
```c
#include <string.h>
#define MAX_PENDING_MESSAGES 5 // 最大挂起消息数
#define RETRY_TIMEOUT 5000 // 重发超时5秒
#define MAX_RETRIES 3 // 最大重试次数
#define BUFF_SIZE 512 // 缓冲区大小
/* 重发队列结构(使用并行数组模拟) */
static uint8_t pending_data[MAX_PENDING_MESSAGES][BUFF_SIZE];
static uint16_t pending_len[MAX_PENDING_MESSAGES];
static uint16_t pending_id[MAX_PENDING_MESSAGES];
static uint8_t pending_retries[MAX_PENDING_MESSAGES];
static uint32_t pending_time[MAX_PENDING_MESSAGES];
static int pending_count = 0;
/* 外部依赖函数声明 */
extern void Write_MQTT_TxDataBuff(char *data, int len);
extern void mqtt_debug(char *data, int len, char *info);
extern void WIFI_Clean(void);
extern uint32_t get_current_time(void);
extern int encode_remaining_length(int length, char *buf);
/* MQTT发布函数 */
char MQTT_Publish(int dup, int qos, int retain, char *post_topic, unsigned int id, char *message)
{
char data[BUFF_SIZE] = {0};
int len = 0, remaining_length;
int topic_length = post_topic ? strlen(post_topic) : 0;
int message_length = message ? strlen(message) : 0;
/* 参数校验 */
if (topic_length == 0 || topic_length > 65535) return 1; // 主题长度非法
if (!message) return 2; // 消息为空
if (qos > 0 && id == 0) return 3; // QoS>0需要有效ID
if (qos == 0 && dup != 0) return 4; // QoS0不允许重发
/* 计算剩余长度 */
remaining_length = 2 + topic_length + (qos ? 2 : 0) + message_length;
/* 缓冲区容量检查 */
int temp = remaining_length, encoded_len_bytes = 0;
do {
encoded_len_bytes++;
temp /= 128;
} while (temp > 0);
if (1 + encoded_len_bytes + remaining_length > BUFF_SIZE) return 5;
/* 构建固定报头 */
data[len++] = 0x30 | (dup << 3) | (qos << 1) | retain;
/* 编码剩余长度 */
char remaining_encoded[4];
int encoded_len = encode_remaining_length(remaining_length, remaining_encoded);
memcpy(&data[len], remaining_encoded, encoded_len);
len += encoded_len;
/* 构建可变报头 */
data[len++] = (topic_length >> 8) & 0xFF; // 主题长度高字节
data[len++] = topic_length & 0xFF; // 主题长度低字节
memcpy(&data[len], post_topic, topic_length);
len += topic_length;
if (qos > 0) { // QoS需要报文ID
data[len++] = (id >> 8) & 0xFF;
data[len++] = id & 0xFF;
}
/* 构建有效载荷 */
memcpy(&data[len], message, message_length);
len += message_length;
/* 发送并加入重发队列(QoS>0时) */
Write_MQTT_TxDataBuff(data, len);
mqtt_debug(data, len, "\r\n* PUBLISH消息发送 *\r\n");
WIFI_Clean();
if (qos > 0 && pending_count < MAX_PENDING_MESSAGES) {
memcpy(pending_data[pending_count], data, len);
pending_len[pending_count] = len;
pending_id[pending_count] = (uint16_t)id;
pending_retries[pending_count] = MAX_RETRIES;
pending_time[pending_count] = get_current_time();
pending_count++;
}
return 0;
}
/* 定时器中断处理函数(需在中断服务例程中调用) */
void Timer_IRQHandler(void)
{
uint32_t current_time = get_current_time();
for (int i = 0; i < pending_count; ) {
if (current_time - pending_time[i] >= RETRY_TIMEOUT) {
if (pending_retries[i] > 0) {
/* 修改DUP标志并重发 */
pending_data[i][0] |= 0x08; // 设置DUP位为1
Write_MQTT_TxDataBuff(pending_data[i], pending_len[i]);
mqtt_debug(pending_data[i], pending_len[i], "\r\n* PUBLISH消息重发 *\r\n");
pending_time[i] = current_time;
pending_retries[i]--;
i++;
} else {
/* 移除重试耗尽的消息 */
for (int j = i; j < pending_count-1; j++) {
memcpy(pending_data[j], pending_data[j+1], BUFF_SIZE);
pending_len[j] = pending_len[j+1];
pending_id[j] = pending_id[j+1];
pending_retries[j] = pending_retries[j+1];
pending_time[j] = pending_time[j+1];
}
pending_count--;
}
} else {
i++;
}
}
}
/* 确认报文处理函数(收到PUBACK时调用) */
void MQTT_Handle_PubAck(uint16_t packet_id)
{
for (int i = 0; i < pending_count; ) {
if (pending_id[i] == packet_id) {
for (int j = i; j < pending_count-1; j++) {
memcpy(pending_data[j], pending_data[j+1], BUFF_SIZE);
pending_len[j] = pending_len[j+1];
pending_id[j] = pending_id[j+1];
pending_retries[j] = pending_retries[j+1];
pending_time[j] = pending_time[j+1];
}
pending_count--;
break;
} else {
i++;
}
}
}
```
### 关键改进说明:
1. **重发队列管理**
- 使用并行数组`pending_data`、`pending_len`等模拟结构体队列
- QoS>0的消息首次发送后自动加入队列
- 队列满时新消息无法加入(可根据需求扩展为循环队列)
2. **DUP标志处理**
- 首次发送时DUP=0,重发时修改固定报头的DUP位为1
```c
pending_data[i][0] |= 0x08; // 设置bit3为1
```
3. **定时器中断优化**
- 每次中断检查超时消息
- 重试次数耗尽自动移除消息
- 时间戳比对避免频繁重发
4. **线程安全考虑**
- 中断和主循环共享的`pending_count`变量需确保原子访问
- 实际部署时建议增加临界区保护(如关中断)
> **注意事项**
> 1. `get_current_time()`需实现为获取系统tick的函数
> 2. 中断服务中调用的`Write_MQTT_TxDataBuff`需确保可重入
> 3. 报文ID建议使用单调递增算法生成
阅读全文
相关推荐





