mqtt订阅和发布到服务器成功,订阅端可以将数据解析并存入数据库

This commit is contained in:
flykhan 2023-10-26 16:56:52 +08:00
parent 519318dd1a
commit daea54f40f
3 changed files with 133 additions and 9 deletions

View File

@ -4,20 +4,31 @@ MQTT_CFLAGS = -lmosquitto -lpthread -ldl
MQTT_CFLAGS += -I./includes
MQTT_CFLAGS += -L./mqtt_libs
SQLITE3_CFLAGS = -lpthread -ldl -lcjson
SQLITE3_CFLAGS = -lpthread -ldl
SQLITE3_CFLAGS += -I./includes
CJSON_CFLAGS = -lcjson
all:
sqlite_cgi_base:
$(cc) sqlite3.c sqlite_cgi_base.c -o sqlite_cgi_base.cgi $(SQLITE3_CFLAGS)
$(cc) sqlite3.c sqlite_cgi_base.c -o sqlite_cgi_base.cgi $(SQLITE3_CFLAGS) $(CJSON_CFLAGS)
sqlite_cgi_insert_base:
$(cc) sqlite3.c sqlite_cgi_insert_base.c -o sqlite_cgi_insert_base.cgi $(SQLITE3_CFLAGS)
$(cc) sqlite3.c sqlite_cgi_insert_base.c -o sqlite_cgi_insert_base.cgi $(SQLITE3_CFLAGS) $(CJSON_CFLAGS)
mqtt_sub_ctos:
$(cc) mqtt_sub_ctos.c -o mqtt_sub_ctos.cgi $(MQTT_CFLAGS)
$(cc) sqlite3.c mqtt_sub_ctos.c -o mqtt_sub_ctos.cgi $(MQTT_CFLAGS) $(CJSON_CFLAGS) $(SQLITE3_CFLAGS)
mqtt_pub_stoc:
$(cc) mqtt_pub_stoc.c -o mqtt_pub_stoc.cgi $(MQTT_CFLAGS)
mqtt:
$(cc) sqlite3.c mqtt_sub_ctos.c -o mqtt_sub_ctos.cgi $(MQTT_CFLAGS) $(CJSON_CFLAGS) $(SQLITE3_CFLAGS)
$(cc) mqtt_pub_stoc.c -o mqtt_pub_stoc.cgi $(MQTT_CFLAGS)
clean:
rm -rf *.cgi

48
cgi-bin/mqtt_pub_stoc.c Normal file
View File

@ -0,0 +1,48 @@
#include "includes/mosquitto.h" // mqtt
#include "includes/sqlite3.h" // sqlite3
#include <stdbool.h> // bool
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int isConnected = 0;
void connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
if (rc == 0)
{
printf("---连接服务器成功---\n");
isConnected = 1;
}
}
int main(int argc, char const *argv[])
{
if (argc < 3)
{
printf("format: %s msgid topic msg", argv[0]);
return -1;
}
mosquitto_lib_init(); // 初始化(固定格式)
struct mosquitto *client = mosquitto_new(NULL, true, NULL); // 创建客户端
mosquitto_connect_callback_set(client, connect_callback); // 设置连接回调函数
// mosquitto_connect(client, "localhost", 1883, 60); // 连接服务器(地址,端口,超时时间)
mosquitto_connect(client, "flykhan.com", 1883, 60); // 连接服务器(地址,端口,超时时间)
mosquitto_loop_start(client); // 开启客户端线程
while (isConnected == 0)
; // 等待连接成功(当连接成功后isConnected会被置为1然后跳出循环)
int msg_id = atoi(argv[1]); // 消息ID
char *topic = (char *)argv[2]; // 主题
char *msg = (char *)argv[3]; // 消息
int flag = mosquitto_publish(client, &msg_id, topic, strlen(msg), msg, 1, true); // 发布消息
// 发布成功
if (flag == MOSQ_ERR_SUCCESS)
{
printf("---消息发布成功: %s(%d):%s---\n", topic, msg_id, msg);
mosquitto_destroy(client);
mosquitto_lib_cleanup();
}
return 0;
}

View File

@ -15,6 +15,7 @@
#include <unistd.h>
#include <cjson/cJSON.h> // cJSON 的头文件
#include "includes/sqlite3.h" // sqlite3 的头文件
#include <time.h> // time_t 用于获取系统时间
struct mosquitto *client; // mosquitto 客户端
int isRunning = 1; // 线程运行标志
@ -51,11 +52,74 @@ void on_message(struct mosquitto *mosq,
{
printf("收到%s(%d)的消息: %s\n", message->topic, message->mid,
(char *)message->payload);
if (strncmp((char *)message->payload, "exit", 4) == 0)
// if (strncmp((char *)message->payload, "exit", 4) == 0)
// {
// isRunning = 0;
// mosquitto_loop_stop(client, true);
// }
// 解析 JSON 数据
cJSON *root_json = cJSON_Parse((char *)message->payload);
if (root_json == NULL)
{
isRunning = 0;
mosquitto_loop_stop(client, true);
printf("JSON 数据解析失败!\n");
return;
}
// 获取 JSON 数据中的各个字段
// cJSON *id = cJSON_GetObjectItem(root_json, "id");
cJSON *temperature = cJSON_GetObjectItem(root_json, "temperature");
cJSON *humidity = cJSON_GetObjectItem(root_json, "humidity");
// 拿出字段的值
double temperature_value = temperature->valuedouble;
double humidity_value = humidity->valuedouble;
// // 获取当前日期和时间
// time_t current_time;
// time(&current_time);
// // 将时间转换为本地时间
// struct tm *localtime = localetime(&current_time);
printf("温度: %.2lf, 湿度: %.2lf\n", temperature_value, humidity_value);
// 打开数据库
sqlite3 *db = NULL; // 数据库连接句柄
int ret = sqlite3_open("../sql_base/green_house.db", &db); // 打开数据库
if (ret != SQLITE_OK)
{
printf("数据库打开失败!\n");
return;
}
// 构建插入语句
char insert_sql[128] = "";
sprintf(insert_sql,
"insert into temp_hum_info (temperature,humidity) values(%.2lf,%.2lf);",
temperature_value, humidity_value); // 构建插入语句
printf("insert_sql is %s\n", insert_sql);
// 执行插入语句
char *errmsg = NULL;
ret = sqlite3_exec(db, insert_sql, NULL, NULL, &errmsg);
if (ret != SQLITE_OK)
{
printf("插入数据失败!\n");
return;
}
else
{
printf("插入数据成功!\n");
}
// 关闭数据库
sqlite3_close(db);
// 释放 cJSON_Parse() 分配的内存
cJSON_Delete(root_json);
}
int main(int argc, char const *argv[])
{
@ -86,7 +150,8 @@ int main(int argc, char const *argv[])
mosquitto_message_callback_set(client, on_message); // 设置接收消息回调函数,用于接收到消息后的处理
// 3. 连接mqtt broker
int flag = mosquitto_connect(client, "flykhan.com", 1883, 60);
// int flag = mosquitto_connect(client, "localhost", 1883, 60);
int flag = mosquitto_connect(client, "flykhan.com", 1883, 60*60*24*30); // 连接服务器(地址,端口,超时时间) : 超时时间为一个月
if (flag == MOSQ_ERR_SUCCESS)
{
printf("-----连接MQTT 服务器成功!-----\n");
@ -108,7 +173,7 @@ int main(int argc, char const *argv[])
{
// 处理网络事件
mosquitto_loop_start(client); // 接收网络数据
// usleep(500);
usleep(500);
}
// 关闭mosquitto的客户端