STM32H7S78-DK基于TCP-CLIENT的冷链管理-电子产品世界论坛 (eepw.com.cn)
在这篇中,我们实现了tcpclient,Mqtt是一种轻量的网络协议,可以实现一对多的数据发布与多对一个数据交互。因此在物联网领域,这是一款比较常用的工具。
【实现步骤】
1、向工程中添加bsp_mqtt.c/h。
2、编写代码如下:
/* * bsp_mqtt.c * * Created on: Mar 19, 2024 * Author: liujianhua */ /*----------------------------------------------------------- * Includes files *----------------------------------------------------------*/ /* lib includes. */ #include <string.h> #include <stdio.h> /* segger rtt includes. */ #include "main.h" #include "bsp_mqtt.h" /* FreeRTOS includes. */ #include "FreeRTOS.h" #include "semphr.h" #include "cJSON.h" /* lwip includes. */ #include "lwip/apps/mqtt.h" #include "lwip/ip4_addr.h" #include "cmsis_os2.h" #include "vacc.h" float heartValue; uint8_t heartSetValue; uint8_t myheartstate = 0 ; //extern uint8_t mqtt_recv_state; static err_t bsp_mqtt_connect(void); extern VaccDevice *pmyVacc; #define USE_MQTT_MUTEX //使用发送数据的互斥锁,多个任务有发送才必须 #ifdef USE_MQTT_MUTEX static SemaphoreHandle_t s__mqtt_publish_mutex = NULL; #endif /* USE_MQTT_MUTEX */ static mqtt_client_t *s__mqtt_client_instance = NULL; //mqtt连接句柄,这里一定要设置全局变量,防止 lwip 底层重复申请空间 //MQTT 数据结构体 struct mqtt_recv_buffer { char recv_buffer[1024]; //储存接收的buffer uint16_t recv_len; //记录已接收多少个字节的数据,MQTT的数据分包来的 uint16_t recv_total; //MQTT接收数据的回调函数会有个总的大小 }; //结构体初始化 struct mqtt_recv_buffer s__mqtt_recv_buffer_g = { .recv_len = 0, .recv_total = 0, }; static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos); /* =========================================== 接收回调函数 ============================================== */ /*! * @brief mqtt 接收数据处理函数接口,需要在应用层进行处理 * 执行条件:mqtt连接成功 * * @param [in1] : 用户提供的回调参数指针 * @param [in2] : 接收的数据指针 * @param [in3] : 接收数据长度 * @retval: 处理的结果 */ __weak int mqtt_rec_data_process(void* arg, char *rec_buf, uint64_t buf_len) { cJSON *json; printf("recv_buffer = %s\n", rec_buf); json = cJSON_Parse(rec_buf); char *ptr; double ret; if(json == NULL) { printf("json fmt error:%s\r\n.", cJSON_GetErrorPtr()); } else { cJSON *Temper = cJSON_GetObjectItem(json, "Tmp_up"); if (!Temper) { printf("no Temperature!\n"); } else{ ret = strtod(Temper->valuestring,NULL); printf("value:%.1f\r\n",ret); if(NULL != pmyVacc) { pmyVacc->up_tmp = ret; } } Temper = cJSON_GetObjectItem(json, "Tmp_down"); if (!Temper) { printf("no Temperature!\n"); } else{ ret = strtod(Temper->valuestring,NULL); printf("value:%.1f\r\n",ret); if(NULL != pmyVacc) { pmyVacc->down_tmp = ret; } } Temper = cJSON_GetObjectItem(json, "Hum_down"); if (!Temper) { printf("no Temperature!\n"); } else{ ret = strtod(Temper->valuestring,NULL); printf("value:%.1f\r\n",ret); if(NULL != pmyVacc) { pmyVacc->down_hum = ret; } } Temper = cJSON_GetObjectItem(json, "Hum_up"); if (!Temper) { printf("no Temperature!\n"); } else{ ret = strtod(Temper->valuestring,NULL); printf("value:%.1f\r\n",ret); if(NULL != pmyVacc) { pmyVacc->up_hum = ret; } } cJSON_Delete(json); } return 0; } /*! * @brief MQTT 接收到数据的回调函数 * 执行条件:MQTT 连接成功 * * @param [in1] : 用户提供的回调参数指针 * @param [in2] : MQTT 收到的分包数据指针 * @param [in3] : MQTT 分包数据长度 * @param [in4] : MQTT 数据包的标志位 * @retval: None */ static void bsp_mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags) { if( (data == NULL) || (len == 0) ) { printf("mqtt_client_incoming_data_cb: condition error @entry\n"); return; } if(s__mqtt_recv_buffer_g.recv_len + len < sizeof(s__mqtt_recv_buffer_g.recv_buffer)) { // snprintf(&s__mqtt_recv_buffer_g.recv_buffer[s__mqtt_recv_buffer_g.recv_len], len+1, "%s", data); s__mqtt_recv_buffer_g.recv_len += len; } if ( (flags & MQTT_DATA_FLAG_LAST) == MQTT_DATA_FLAG_LAST ) { //处理数据 mqtt_rec_data_process(arg , s__mqtt_recv_buffer_g.recv_buffer, s__mqtt_recv_buffer_g.recv_len); //已接收字节计数归0 s__mqtt_recv_buffer_g.recv_len = 0; //清空接收buffer memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer)); } printf("mqtt_client_incoming_data_cb:reveiving incomming data.\n"); } /*! * @brief MQTT 接收到数据的回调函数 * 执行条件:MQTT 连接成功 * * @param [in] : 用户提供的回调参数指针 * @param [in] : MQTT 收到数据的topic * @param [in] : MQTT 收到数据的总长度 * @retval: None */ static void bsp_mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len) { if( (topic == NULL) || (tot_len == 0) ) { printf("bsp_mqtt_incoming_publish_cb: condition error @entry\n"); return; } printf("bsp_mqtt_incoming_publish_cb: topic = %s.\n",topic); printf("bsp_mqtt_incoming_publish_cb: tot_len = %d.\n",tot_len); s__mqtt_recv_buffer_g.recv_total = tot_len; //需要接收的总字节 s__mqtt_recv_buffer_g.recv_len = 0; //已接收字节计数归0 //清空接收buffer memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer)); } /* =========================================== 连接状态回调函数 ============================================== */ /*! * @brief MQTT 连接成功的处理函数,需要的话在应用层定义 * * @param [in1] : MQTT 连接句柄 * @param [in2] : MQTT 连接参数指针 * * @retval: None */ __weak void mqtt_conn_suc_proc(mqtt_client_t *client, void *arg) { char test_sub_topic[] = "attributes/response"; bsp_mqtt_subscribe(client,test_sub_topic,0); } /*! * @brief MQTT 处理失败调用的函数 * * @param [in1] : MQTT 连接句柄 * @param [in2] : MQTT 连接参数指针 * * @retval: None */ __weak void mqtt_error_process_callback(mqtt_client_t * client, void *arg) { } /*! * @brief MQTT 连接状态的回调函数 * * @param [in] : MQTT 连接句柄 * @param [in] : 用户提供的回调参数指针 * @param [in] : MQTT 连接状态 * @retval: None */ static void bsp_mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status) { if( client == NULL ) { printf("bsp_mqtt_connection_cb: condition error @entry\n"); return; } if ( status == MQTT_CONNECT_ACCEPTED ) //Successfully connected { printf("bsp_mqtt_connection_cb: Successfully connected\n"); // 注册接收数据的回调函数 mqtt_set_inpub_callback(client, bsp_mqtt_incoming_publish_cb, bsp_mqtt_incoming_data_cb, arg); //成功处理函数 mqtt_conn_suc_proc(client, arg); } else { printf("bsp_mqtt_connection_cb: Fail connected, status = %s\n", lwip_strerr(status) ); //错误处理 mqtt_error_process_callback(client, arg); } } /*! * @brief 连接到 mqtt 服务器 * 执行条件:无 * * @param [in] : None * * @retval: 连接状态,如果返回不是 ERR_OK 则需要重新连接 */ static err_t bsp_mqtt_connect(void) { printf("bsp_mqtt_connect: Enter!\n"); err_t ret; struct mqtt_connect_client_info_t mqtt_connect_info = { "Stm32H7S78_MQTT_Test", /* 这里需要修改,以免在同一个服务器两个相同ID会发生冲突 */ "NULL", /* MQTT 服务器用户名 */ "NULL", /* MQTT 服务器密码 */ 60, /* 与 MQTT 服务器保持连接时间,时间超过未发送数据会断开 */ "attributes/response",/* MQTT遗嘱的消息发送topic */ "Offline_pls_check", /* MQTT遗嘱的消息,断开服务器的时候会发送 */ 0, /* MQTT遗嘱的消息 Qos */ 0 /* MQTT遗嘱的消息 Retain */ }; ip_addr_t server_ip; ip4_addr_set_u32(&server_ip, ipaddr_addr("192.168.3.180")); //MQTT服务器IP uint16_t server_port = 1883; //注意这里是 MQTT 的 TCP 连接方式的端口号!!!! if (s__mqtt_client_instance == NULL) { // 句柄==NULL 才申请空间,否则无需重复申请 s__mqtt_client_instance = mqtt_client_new(); } if (s__mqtt_client_instance == NULL) { //防止申请失败 printf("bsp_mqtt_connect: s__mqtt_client_instance malloc fail @@!!!\n"); return ERR_MEM; } //进行连接,注意:如果需要带入 arg ,arg必须是全局变量,局部变量指针会被回收,大坑!!!!! ret = mqtt_client_connect(s__mqtt_client_instance, &server_ip, server_port, bsp_mqtt_connection_cb, NULL, &mqtt_connect_info); /****************** 小提示:连接错误不需要做任何操作,mqtt_client_connect 中注册的回调函数里面做判断并进行对应的操作 *****************/ printf("bsp_mqtt_connect: connect to mqtt %s\n", lwip_strerr(ret)); return ret; } /* =========================================== 发送接口、回调函数 ============================================== */ /*! * @brief MQTT 发送数据的回调函数 * 执行条件:MQTT 连接成功 * * @param [in] : 用户提供的回调参数指针 * @param [in] : MQTT 发送的结果:成功或者可能的错误 * @retval: None */ static void mqtt_client_pub_request_cb(void *arg, err_t result) { mqtt_client_t *client = (mqtt_client_t *)arg; if (result != ERR_OK) { printf("mqtt_client_pub_request_cb: c002: Publish FAIL, result = %s\n", lwip_strerr(result)); //错误处理 mqtt_error_process_callback(client, arg); } else { printf("mqtt_client_pub_request_cb: c005: Publish complete!\n"); } } /*! * @brief 发送消息到服务器 * 执行条件:无 * * @param [in1] : mqtt 连接句柄 * @param [in2] : mqtt 发送 topic 指针 * @param [in3] : 发送数据包指针 * @param [in4] : 数据包长度 * @param [in5] : qos * @param [in6] : retain * @retval: 发送状态 * @note: 有可能发送不成功但是现实返回值是 0 ,需要判断回调函数 mqtt_client_pub_request_cb 是否 result == ERR_OK */ err_t bsp_mqtt_publish(mqtt_client_t *client, char *pub_topic, char *pub_buf, uint16_t data_len, uint8_t qos, uint8_t retain) { if ( (client == NULL) || (pub_topic == NULL) || (pub_buf == NULL) || (data_len == 0) || (qos > 2) || (retain > 1) ) { printf("bsp_mqtt_publish: input error@@" ); return ERR_VAL; } //判断是否连接状态 if(mqtt_client_is_connected(client) != pdTRUE) { printf("bsp_mqtt_publish: client is not connected\n"); return ERR_CONN; } err_t err; #ifdef USE_MQTT_MUTEX // 创建 mqtt 发送互斥锁 if (s__mqtt_publish_mutex == NULL) { printf("bsp_mqtt_publish: create mqtt mutex ! \n" ); s__mqtt_publish_mutex = xSemaphoreCreateMutex(); } if (xSemaphoreTake(s__mqtt_publish_mutex, portMAX_DELAY) == pdPASS) #endif /* USE_MQTT_MUTEX */ { err = mqtt_publish(client, pub_topic, pub_buf, data_len, qos, retain, mqtt_client_pub_request_cb, (void*)client); printf("bsp_mqtt_publish: mqtt_publish err = %s\n", lwip_strerr(err) ); #ifdef USE_MQTT_MUTEX printf("bsp_mqtt_publish: mqtt_publish xSemaphoreTake\n"); xSemaphoreGive(s__mqtt_publish_mutex); #endif /* USE_MQTT_MUTEX */ } return err; } /* =========================================== MQTT 订阅接口函数 ============================================== */ /*! * @brief MQTT 订阅的回调函数 * 执行条件:MQTT 连接成功 * * @param [in] : 用户提供的回调参数指针 * @param [in] : MQTT 订阅结果 * @retval: None */ static void bsp_mqtt_request_cb(void *arg, err_t err) { if ( arg == NULL ) { printf("bsp_mqtt_request_cb: input error@@\n"); return; } mqtt_client_t *client = (mqtt_client_t *)arg; if ( err != ERR_OK ) { printf("bsp_mqtt_request_cb: FAIL sub, sub again, err = %s\n", lwip_strerr(err)); //错误处理 mqtt_error_process_callback(client, arg); } else { printf("bsp_mqtt_request_cb: sub SUCCESS!\n"); } } /*! * @brief mqtt 订阅 * 执行条件:连接成功 * * @param [in1] : mqtt 连接句柄 * @param [in2] : mqtt 发送 topic 指针 * @param [in5] : qos * @retval: 订阅状态 */ static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos) { printf("bsp_mqtt_subscribe: Enter\n"); if( ( mqtt_client == NULL) || ( sub_topic == NULL) || ( qos > 2 ) ) { printf("bsp_mqtt_subscribe: input error@@\n"); return ERR_VAL; } if ( mqtt_client_is_connected(mqtt_client) != pdTRUE ) { printf("bsp_mqtt_subscribe: mqtt is not connected, return ERR_CLSD.\n"); return ERR_CLSD; } err_t err; err = mqtt_subscribe(mqtt_client, sub_topic, qos, bsp_mqtt_request_cb, (void *)mqtt_client); // subscribe and call back. if (err != ERR_OK) { printf("bsp_mqtt_subscribe: mqtt_subscribe Fail, return:%s \n", lwip_strerr(err)); } else { printf("bsp_mqtt_subscribe: mqtt_subscribe SUCCESS, reason: %s\n", lwip_strerr(err)); } return err; } /* =========================================== 初始化接口函数 ============================================== */ /*! * @brief 封装 MQTT 初始化接口 * 执行条件:无 * * @retval: 无 */ void bsp_mqtt_init(void) { printf("Mqtt init..."); // 连接服务器 bsp_mqtt_connect(); // 发送消息到服务器 char message_test[] = "Hello mqtt server"; // for(int i = 0; i < 10; i++) // { // bsp_mqtt_publish(s__mqtt_client_instance,"/lugl/heart",message_test,sizeof(message_test),1,0); // vTaskDelay(1000); // } } void user_pub_set(int setValue ) { char sen_buff[4] = {0}; sprintf(sen_buff,"%d",setValue); bsp_mqtt_publish(s__mqtt_client_instance,"attributes",sen_buff,2,1,0); osDelay(100); } void user_pub_onoff(int setValue ) { if(setValue == 1) { bsp_mqtt_publish(s__mqtt_client_instance,"/lugl/heartonoff","on",2,1,0); } else { bsp_mqtt_publish(s__mqtt_client_instance,"/lugl/heartonoff","off",3,1,0); } } err_t user_pub_tmp_hum(float Tmp,float Hum) { char buff[48]; err_t err; sprintf(buff,"{\"temperature\":%.1f, \"humidity\":%.1f}",Tmp,Hum); err = bsp_mqtt_publish(s__mqtt_client_instance,"attributes",buff,strlen(buff),1,0); return err; }
代码已经添加了详细的注释,不再过多解析。这里只做数据接收回调的分析。
在订阅指定的主题后,进行Json解析,如果符合,则更新到冷链设备数据中。以便进行数据分析。
使用,在工程中创建一个任务,添加mqtt的初始化后,就可以实现mqtt的发送与接收了。
void vaccTask(void *argument) { pmyVacc = vaccGetDevice(); err_t err; if(NULL == pmyVacc) { printf ("error Three is vacc no vaccdevice"); return; } pmyVacc->sensor_Init(pmyVacc); osDelay(5000); bsp_mqtt_init(); for(;;) { if(pmyVacc->Get_TemperatureHumidity(pmyVacc)) { printf("get sht30 error\r\n"); } osDelay(2000); /* 延时500个tick */ err = user_pub_tmp_hum(pmyVacc->Temperature,pmyVacc->Humidity); if(err == ERR_CONN) { osDelay(2000); bsp_mqtt_init(); } pmyVacc->vacc_check_state(pmyVacc); printf("vacc state:%d\r\n",pmyVacc->warn_sta); } }
如果在任务中出现发布数据,回复连接出错时,再次调用mqtt_ini就可以实现重连。
【实验环境】
1、使用emqt创建一个mqtt服务器,打开一个MQTT调试工具:
发送指定的主题,sth32H7s接收到相关数据后,可以实时更新参数的配置:
【总结】
在lwip移植成功后,可以快速的实现mqtt_client的创建,快速实现物联网的数据交互。