smart-green-house-server-an.../cgi-bin/mqtt_sub_ctos.c

120 lines
3.8 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
用于接收 MQTT 服务器的消息
并将消息转发给 CG 程序
同时存入 sqlite3 数据库
订阅 ctos 的消息,即从设备传入 MQTT 服务器的传感器数据
*/
#include "includes/mosquitto.h" // MQTT 的头文件
#include <pthread.h>
#include <signal.h>
#include <stdbool.h> // bool 类型
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <cjson/cJSON.h> // cJSON 的头文件
#include "includes/sqlite3.h" // sqlite3 的头文件
struct mosquitto *client; // mosquitto 客户端
int isRunning = 1; // 线程运行标志
pthread_t main_th_id; // 主线程 ID
// sqlite3 连接的回调函数
void connect_callback(struct mosquitto *mosq, void *args, int rc)
{
if (rc == 0)
{
printf("连接MQTT服务成功!\n");
}
else
{
printf("连接MQTT服务失败!\n");
}
}
// sqlite3 订阅的回调函数
void on_subscribe(struct mosquitto *mosq,
void *obj,
int mid,
int qos_count,
const int *granted_qos)
{
printf("-订阅的消息ID为--%d 成功, Qos的granted: %d--\n", mid,
*granted_qos);
}
// sqlite3 接收消息的回调函数(处理接收到的消息,并将消息存入 sqlite3 数据库)
void on_message(struct mosquitto *mosq,
void *obj,
const struct mosquitto_message *message)
{
printf("收到%s(%d)的消息: %s\n", message->topic, message->mid,
(char *)message->payload);
if (strncmp((char *)message->payload, "exit", 4) == 0)
{
isRunning = 0;
mosquitto_loop_stop(client, true);
}
}
int main(int argc, char const *argv[])
{
main_th_id = pthread_self();
// 1. 初始化mosquitto的库环境
// 使用mosquitto库函数前要先初始化使用之后要清除。
mosquitto_lib_init(); // 初始化(固定格式)
// 2. 创建mosquitto的客户端
unsigned char userdata[128] = "1";
client = mosquitto_new("166", true, userdata); // 创建客户端句柄
// struct mosquitto* mosquitto_new(
// const char *id,//用户自定义标识ID
// bool clean_session, //断开后是否保留订阅信息true/false
// void *userdata //回调参数
// );
// d可以为NULLclean_session的标识必须是true, userdata也可以是NULL
if (client == NULL) // 创建失败
{
printf("创建mqtt客户端失败!\n");
perror("mosquitto_new\n");
return -1;
}
// 设置回调函数
mosquitto_connect_callback_set(client, connect_callback); // 设置连接回调函数,用于连接后的处理
mosquitto_subscribe_callback_set(client, on_subscribe); // 设置订阅回调函数,用于订阅后的处理
mosquitto_message_callback_set(client, on_message); // 设置接收消息回调函数,用于接收到消息后的处理
// 3. 连接mqtt broker
int flag = mosquitto_connect(client, "flykhan.com", 1883, 60);
if (flag == MOSQ_ERR_SUCCESS)
{
printf("-----连接MQTT 服务器成功!-----\n");
}
else
{
printf("-----连接MQTT 服务器失败!-----\n");
return -1;
}
int msgId = 1; // 消息ID
// 开始订阅信息
flag = mosquitto_subscribe(client, &msgId, "ctos", 0);
if (flag == MOSQ_ERR_SUCCESS)
{
printf("订阅消息ID: %d 成功, 等待消息!\n", msgId);
}
while (isRunning)
{
// 处理网络事件
mosquitto_loop_start(client); // 接收网络数据
// usleep(500);
}
// 关闭mosquitto的客户端
mosquitto_destroy(client);
// 最后清理库环境
mosquitto_lib_cleanup();
return 0;
}