/* 用于接收 MQTT 服务器的消息 并将消息转发给 CG 程序 同时存入 sqlite3 数据库 订阅 ctos 的消息,即从设备传入 MQTT 服务器的传感器数据 */ #include "includes/mosquitto.h" // MQTT 的头文件 #include #include #include // bool 类型 #include #include #include #include // cJSON 的头文件 #include "includes/sqlite3.h" // sqlite3 的头文件 #include // time_t 用于获取系统时间 struct mosquitto *client; // mosquitto 客户端 int isRunning = 1; // 线程运行标志 pthread_t main_th_id; // 主线程 ID // sqlite3 连接的回调函数 void connect_callback(struct mosquitto *mosq, void *args, int rc) { if (rc == 0) { printf("连接MQTT服务成功!\n"); } else { printf("连接MQTT服务失败!\n"); } } // sqlite3 订阅的回调函数 void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { printf("-订阅的消息ID为--%d 成功, Qos的granted: %d--\n", mid, *granted_qos); } // sqlite3 接收消息的回调函数(处理接收到的消息,并将消息存入 sqlite3 数据库) void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { printf("收到%s(%d)的消息: %s\n", message->topic, message->mid, (char *)message->payload); // if (strncmp((char *)message->payload, "exit", 4) == 0) // { // isRunning = 0; // mosquitto_loop_stop(client, true); // } // 解析 JSON 数据 cJSON *root_json = cJSON_Parse((char *)message->payload); if (root_json == NULL) { printf("JSON 数据解析失败!\n"); return; } // 获取 JSON 数据中的各个字段 // cJSON *id = cJSON_GetObjectItem(root_json, "id"); cJSON *temperature = cJSON_GetObjectItem(root_json, "temperature"); cJSON *humidity = cJSON_GetObjectItem(root_json, "humidity"); // 拿出字段的值 double temperature_value = temperature->valuedouble; double humidity_value = humidity->valuedouble; // 获取存入数据库的时间 time_t now = time(NULL); // 获取当前时间 struct tm *local = localtime(&now); // 转换为本地时间 char time_str[80]; // 存储时间的字符串 strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", local); // printf("温度: %.2lf, 湿度: %.2lf, 时间: %s\n", temperature_value, humidity_value, local); printf("温度: %.2lf, 湿度: %.2lf, 时间: %s\n", temperature_value, humidity_value, time_str); // 打开数据库 sqlite3 *db = NULL; // 数据库连接句柄 int ret = sqlite3_open("../sql_base/green_house.db", &db); // 打开数据库 if (ret != SQLITE_OK) { printf("数据库打开失败!\n"); return; } // 构建插入语句 char insert_sql[128] = ""; sprintf(insert_sql, "insert into temp_hum_info (temperature,humidity,th_date_time) values(%.2lf,%.2lf,\"%s\");", temperature_value, humidity_value, time_str); // 构建插入语句 printf("insert_sql is %s\n", insert_sql); // 执行插入语句 char *errmsg = NULL; ret = sqlite3_exec(db, insert_sql, NULL, NULL, &errmsg); if (ret != SQLITE_OK) { printf("插入数据失败!\n"); return; } else { printf("插入数据成功!\n"); } // 关闭数据库 sqlite3_close(db); // 释放 cJSON_Parse() 分配的内存 cJSON_Delete(root_json); } int main(int argc, char const *argv[]) { main_th_id = pthread_self(); // 1. 初始化mosquitto的库环境 // 使用mosquitto库函数前,要先初始化,使用之后要清除。 mosquitto_lib_init(); // 初始化(固定格式) // 2. 创建mosquitto的客户端 unsigned char userdata[128] = "1"; client = mosquitto_new("167", true, userdata); // 创建客户端句柄 // struct mosquitto* mosquitto_new( // const char *id,//用户自定义标识ID // bool clean_session, //断开后是否保留订阅信息true/false // void *userdata //回调参数 // ); // d可以为NULL,clean_session的标识必须是true, userdata也可以是NULL if (client == NULL) // 创建失败 { printf("创建mqtt客户端失败!\n"); perror("mosquitto_new\n"); return -1; } // 设置回调函数 mosquitto_connect_callback_set(client, connect_callback); // 设置连接回调函数,用于连接后的处理 mosquitto_subscribe_callback_set(client, on_subscribe); // 设置订阅回调函数,用于订阅后的处理 mosquitto_message_callback_set(client, on_message); // 设置接收消息回调函数,用于接收到消息后的处理 // 3. 连接mqtt broker int flag = mosquitto_connect(client, "localhost", 1883, 60); //int flag = mosquitto_connect(client, "flykhan.com", 1883, 60*60*24*30); // 连接服务器(地址,端口,超时时间) : 超时时间为一个月 if (flag == MOSQ_ERR_SUCCESS) { printf("-----连接MQTT 服务器成功!-----\n"); } else { printf("-----连接MQTT 服务器失败!-----\n"); return -1; } int msgId = 1; // 消息ID // 开始订阅信息 flag = mosquitto_subscribe(client, &msgId, "ctos", 0); if (flag == MOSQ_ERR_SUCCESS) { printf("订阅消息ID: %d 成功, 等待消息!\n", msgId); } while (isRunning) { // 处理网络事件 mosquitto_loop_start(client); // 接收网络数据 usleep(500); } // 关闭mosquitto的客户端 mosquitto_destroy(client); // 最后清理库环境 mosquitto_lib_cleanup(); return 0; }