2023-10-25 17:34:40 +08:00
|
|
|
|
/*
|
|
|
|
|
用于接收 MQTT 服务器的消息
|
|
|
|
|
并将消息转发给 CG 程序
|
|
|
|
|
同时存入 sqlite3 数据库
|
|
|
|
|
|
|
|
|
|
订阅 ctos 的消息,即从设备传入 MQTT 服务器的传感器数据
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#include "includes/mosquitto.h" // MQTT 的头文件
|
|
|
|
|
#include <pthread.h>
|
|
|
|
|
#include <signal.h>
|
|
|
|
|
#include <stdbool.h> // bool 类型
|
|
|
|
|
#include <stdio.h>
|
|
|
|
|
#include <string.h>
|
|
|
|
|
#include <unistd.h>
|
|
|
|
|
#include <cjson/cJSON.h> // cJSON 的头文件
|
|
|
|
|
#include "includes/sqlite3.h" // sqlite3 的头文件
|
2023-10-26 16:56:52 +08:00
|
|
|
|
#include <time.h> // time_t 用于获取系统时间
|
2023-10-25 17:34:40 +08:00
|
|
|
|
|
|
|
|
|
struct mosquitto *client; // mosquitto 客户端
|
|
|
|
|
int isRunning = 1; // 线程运行标志
|
|
|
|
|
pthread_t main_th_id; // 主线程 ID
|
|
|
|
|
|
|
|
|
|
// sqlite3 连接的回调函数
|
|
|
|
|
void connect_callback(struct mosquitto *mosq, void *args, int rc)
|
|
|
|
|
{
|
|
|
|
|
if (rc == 0)
|
|
|
|
|
{
|
|
|
|
|
printf("连接MQTT服务成功!\n");
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
printf("连接MQTT服务失败!\n");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sqlite3 订阅的回调函数
|
|
|
|
|
void on_subscribe(struct mosquitto *mosq,
|
|
|
|
|
void *obj,
|
|
|
|
|
int mid,
|
|
|
|
|
int qos_count,
|
|
|
|
|
const int *granted_qos)
|
|
|
|
|
{
|
|
|
|
|
printf("-订阅的消息ID为--%d 成功, Qos的granted: %d--\n", mid,
|
|
|
|
|
*granted_qos);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sqlite3 接收消息的回调函数(处理接收到的消息,并将消息存入 sqlite3 数据库)
|
|
|
|
|
void on_message(struct mosquitto *mosq,
|
|
|
|
|
void *obj,
|
|
|
|
|
const struct mosquitto_message *message)
|
|
|
|
|
{
|
|
|
|
|
printf("收到%s(%d)的消息: %s\n", message->topic, message->mid,
|
|
|
|
|
(char *)message->payload);
|
2023-10-26 16:56:52 +08:00
|
|
|
|
// if (strncmp((char *)message->payload, "exit", 4) == 0)
|
|
|
|
|
// {
|
|
|
|
|
// isRunning = 0;
|
|
|
|
|
// mosquitto_loop_stop(client, true);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// 解析 JSON 数据
|
|
|
|
|
cJSON *root_json = cJSON_Parse((char *)message->payload);
|
|
|
|
|
if (root_json == NULL)
|
|
|
|
|
{
|
|
|
|
|
printf("JSON 数据解析失败!\n");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// 获取 JSON 数据中的各个字段
|
|
|
|
|
// cJSON *id = cJSON_GetObjectItem(root_json, "id");
|
|
|
|
|
cJSON *temperature = cJSON_GetObjectItem(root_json, "temperature");
|
|
|
|
|
cJSON *humidity = cJSON_GetObjectItem(root_json, "humidity");
|
|
|
|
|
|
|
|
|
|
// 拿出字段的值
|
|
|
|
|
double temperature_value = temperature->valuedouble;
|
|
|
|
|
double humidity_value = humidity->valuedouble;
|
|
|
|
|
|
|
|
|
|
// // 获取当前日期和时间
|
|
|
|
|
// time_t current_time;
|
|
|
|
|
// time(¤t_time);
|
|
|
|
|
|
|
|
|
|
// // 将时间转换为本地时间
|
|
|
|
|
// struct tm *localtime = localetime(¤t_time);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
printf("温度: %.2lf, 湿度: %.2lf\n", temperature_value, humidity_value);
|
|
|
|
|
|
|
|
|
|
// 打开数据库
|
|
|
|
|
sqlite3 *db = NULL; // 数据库连接句柄
|
|
|
|
|
int ret = sqlite3_open("../sql_base/green_house.db", &db); // 打开数据库
|
|
|
|
|
if (ret != SQLITE_OK)
|
2023-10-25 17:34:40 +08:00
|
|
|
|
{
|
2023-10-26 16:56:52 +08:00
|
|
|
|
printf("数据库打开失败!\n");
|
|
|
|
|
return;
|
2023-10-25 17:34:40 +08:00
|
|
|
|
}
|
2023-10-26 16:56:52 +08:00
|
|
|
|
|
|
|
|
|
// 构建插入语句
|
|
|
|
|
char insert_sql[128] = "";
|
|
|
|
|
sprintf(insert_sql,
|
|
|
|
|
"insert into temp_hum_info (temperature,humidity) values(%.2lf,%.2lf);",
|
|
|
|
|
temperature_value, humidity_value); // 构建插入语句
|
|
|
|
|
printf("insert_sql is %s\n", insert_sql);
|
|
|
|
|
|
|
|
|
|
// 执行插入语句
|
|
|
|
|
char *errmsg = NULL;
|
|
|
|
|
ret = sqlite3_exec(db, insert_sql, NULL, NULL, &errmsg);
|
|
|
|
|
|
|
|
|
|
if (ret != SQLITE_OK)
|
|
|
|
|
{
|
|
|
|
|
printf("插入数据失败!\n");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
printf("插入数据成功!\n");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 关闭数据库
|
|
|
|
|
sqlite3_close(db);
|
|
|
|
|
|
|
|
|
|
// 释放 cJSON_Parse() 分配的内存
|
|
|
|
|
cJSON_Delete(root_json);
|
2023-10-25 17:34:40 +08:00
|
|
|
|
}
|
|
|
|
|
int main(int argc, char const *argv[])
|
|
|
|
|
{
|
|
|
|
|
main_th_id = pthread_self();
|
|
|
|
|
// 1. 初始化mosquitto的库环境
|
|
|
|
|
// 使用mosquitto库函数前,要先初始化,使用之后要清除。
|
|
|
|
|
mosquitto_lib_init(); // 初始化(固定格式)
|
|
|
|
|
|
|
|
|
|
// 2. 创建mosquitto的客户端
|
|
|
|
|
unsigned char userdata[128] = "1";
|
|
|
|
|
client = mosquitto_new("166", true, userdata); // 创建客户端句柄
|
|
|
|
|
// struct mosquitto* mosquitto_new(
|
|
|
|
|
// const char *id,//用户自定义标识ID
|
|
|
|
|
// bool clean_session, //断开后是否保留订阅信息true/false
|
|
|
|
|
// void *userdata //回调参数
|
|
|
|
|
// );
|
|
|
|
|
// d可以为NULL,clean_session的标识必须是true, userdata也可以是NULL
|
|
|
|
|
if (client == NULL) // 创建失败
|
|
|
|
|
{
|
|
|
|
|
printf("创建mqtt客户端失败!\n");
|
|
|
|
|
perror("mosquitto_new\n");
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 设置回调函数
|
|
|
|
|
mosquitto_connect_callback_set(client, connect_callback); // 设置连接回调函数,用于连接后的处理
|
|
|
|
|
mosquitto_subscribe_callback_set(client, on_subscribe); // 设置订阅回调函数,用于订阅后的处理
|
|
|
|
|
mosquitto_message_callback_set(client, on_message); // 设置接收消息回调函数,用于接收到消息后的处理
|
|
|
|
|
|
|
|
|
|
// 3. 连接mqtt broker
|
2023-10-26 20:00:47 +08:00
|
|
|
|
int flag = mosquitto_connect(client, "localhost", 1883, 60);
|
|
|
|
|
//int flag = mosquitto_connect(client, "flykhan.com", 1883, 60*60*24*30); // 连接服务器(地址,端口,超时时间) : 超时时间为一个月
|
2023-10-25 17:34:40 +08:00
|
|
|
|
if (flag == MOSQ_ERR_SUCCESS)
|
|
|
|
|
{
|
|
|
|
|
printf("-----连接MQTT 服务器成功!-----\n");
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
printf("-----连接MQTT 服务器失败!-----\n");
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int msgId = 1; // 消息ID
|
|
|
|
|
// 开始订阅信息
|
|
|
|
|
flag = mosquitto_subscribe(client, &msgId, "ctos", 0);
|
|
|
|
|
if (flag == MOSQ_ERR_SUCCESS)
|
|
|
|
|
{
|
|
|
|
|
printf("订阅消息ID: %d 成功, 等待消息!\n", msgId);
|
|
|
|
|
}
|
|
|
|
|
while (isRunning)
|
|
|
|
|
{
|
|
|
|
|
// 处理网络事件
|
|
|
|
|
mosquitto_loop_start(client); // 接收网络数据
|
2023-10-26 16:56:52 +08:00
|
|
|
|
usleep(500);
|
2023-10-25 17:34:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 关闭mosquitto的客户端
|
|
|
|
|
mosquitto_destroy(client);
|
|
|
|
|
// 最后清理库环境
|
|
|
|
|
mosquitto_lib_cleanup();
|
|
|
|
|
return 0;
|
|
|
|
|
}
|