mqtt应用于进程间通信

前言

这是以socket来作为进程间通信的方式,并且这个demo是基于Windows写的,需要包含Windows特定的头文件。

本篇笔记我们把上面这个综合demo改为:

我们用mqtt来作为进程间通信的方式,在Linux下进程测试。

先贴代码:

json_print进程源码

「json_print.c:」

左右滑动查看全部代码>>>

/*
- 程序功能: 组JSON格式数据包并发送(MQTT发布者客户端程序)
- 编译命令: gcc cJSON.c json_print.c -L ../mosquitto/build/lib -lmosquitto -o json_print
- 导出mosquitto动态库: export LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH
- 作者:ZhengN
- 公众号:嵌入式大杂烩
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "cJSON.h"
#include "../mosquitto/lib/mosquitto.h"

#define  STU_NAME_LEN  32

/* 学生结构体 */
typedef struct _Student
{
    char name[STU_NAME_LEN];  // 名字      
    int num;                  // 学号      
    int c_score;              // C语言分数
}StudentDef, *pStudentDef;

/* 内部函数声明 */
static StudentDef StudentData_Prepare(void);
static char *StudentsData_Packet(pStudentDef _Stu);
static void StudentData_Send(const char *_data);
static void MqttClientInit(void);
static void MqttClientClean(void);

bool clean_session = true;
struct mosquitto *mosq = NULL;

/********************************************************************************************************
** 函数: main
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 
********************************************************************************************************/
int main(void)
{
    StudentDef stu = {0};
    char *stu_data = NULL;
    int stu_count = 0;
    int i = 0;

/* MQTT客户端初始化 */
    MqttClientInit();

/* 需要登记的学生总人数 */
    printf("Please input number of student: ");
    scanf("%d", &stu_count);

while (i++ < stu_count)
    {
        /* 准备数据 */
        stu = StudentData_Prepare();

/* JSON格式数据组包 */
        stu_data = StudentsData_Packet(&stu);

/* 发送数据 */
        StudentData_Send(stu_data);
    }

/* 回收操作 */
    MqttClientClean();

return 0;
}

/********************************************************************************************************
** 函数: StudentData_Prepare, 准备组包需要的数据
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 获取得到的数据
********************************************************************************************************/
static StudentDef StudentData_Prepare(void)
{
    char name[STU_NAME_LEN] = {0};
    int num = 0;
    int c_score = 0;
    StudentDef stu;

/* 名字 */
    printf("Please input name: ");
    scanf("%s", name);
    if (strlen(name) < STU_NAME_LEN)
    {
        strncpy((char*)&stu.name, name, strlen(name)+1);
    }
    else
    {
        printf("The name is too long\n");
    }
    
    /* 学号 */
    printf("Please input num (0~100): ");
    scanf("%d", &num);
    stu.num = num;

/* C语言分数 */
    printf("Please input c_score (0~100): ");
    scanf("%d", &c_score);
    stu.c_score = c_score;

return stu;
}

/********************************************************************************************************
** 函数: StudentsData_Packet, JSON格式数据组包
**------------------------------------------------------------------------------------------------------
** 参数: _Stu:组student json数据包需要的数据
** 说明: 
** 返回: JSON格式的字符串 
********************************************************************************************************/
static char *StudentsData_Packet(pStudentDef _Stu)
{
    char *res_string = NULL;    // 返回值
    cJSON *name = NULL;         // 名字
    cJSON *num = NULL;          // 学号
    cJSON *c_score = NULL;      // C语言分数

/* 创建一个JSON对象,{}扩起来 */
    cJSON *obj = cJSON_CreateObject();
    if (obj == NULL)
    {
        goto end;
    }

/* 创建 "name": "xxx" 键值对 */
    name = cJSON_CreateString(_Stu->name);
    if (name == NULL)
    {
        goto end;
    }
    cJSON_AddItemToObject(obj, "name", name);

/* 创建 "num": 207 键值对 */
    num = cJSON_CreateNumber(_Stu->num);
    if (name == NULL)
    {
        goto end;
    }
    cJSON_AddItemToObject(obj, "num", num);
    
    /* 创建 "c_score": 95 键值对 */
    c_score = cJSON_CreateNumber(_Stu->c_score);
    if (name == NULL)
    {
        goto end;
    }
    cJSON_AddItemToObject(obj, "c_score", c_score);

res_string = cJSON_Print(obj);          // 呈现为JSON格式 
    // res_string = cJSON_PrintUnformatted(obj);   // 呈现为无格式

if (res_string == NULL)
    {
        fprintf(stderr, "Failed to print monitor.\n");
    }

/* 异常情况统一Delete(free) */
end:
    cJSON_Delete(obj);
    return res_string;
}

/********************************************************************************************************
** 函数: StudentData_Send, JSON格式字符串数据组包发送
**------------------------------------------------------------------------------------------------------
** 参数: _data:要发送的数据
** 说明: 
** 返回: JSON格式的字符串 
********************************************************************************************************/
static void StudentData_Send(const char *_data)
{
    printf("%s: %s\n\n", __FUNCTION__, _data);

/* 发布消息 */
    mosquitto_publish(mosq, NULL, "test_topic", strlen(_data)+1, (const char*)_data, 0, 0);
}

/********************************************************************************************************
** 函数: MqttClientInit, MQTT客户端初始化
**------------------------------------------------------------------------------------------------------
** 参数: void
** 说明: 
** 返回: 
********************************************************************************************************/
static void MqttClientInit(void)
{
    /* libmosquitto 库初始化 */
    mosquitto_lib_init();

/* 创建mosquitto客户端 */
    mosq = mosquitto_new(NULL, clean_session, NULL);
    if(NULL == mosq)
    {
        printf("Create mqtt client failed...\n");
        mosquitto_lib_cleanup();
        return;
    }

/* 连接服务器 */
    if(mosquitto_connect(mosq, "localhost", 1883, 60))
    {
        printf("Unable to connect...\n");
        return;
    }

/* 网络消息处理线程 */
    int loop = mosquitto_loop_start(mosq);
    if(loop != MOSQ_ERR_SUCCESS)
    {
        printf("mosquitto loop error\n");
        return;
    }
}

/********************************************************************************************************
** 函数: MqttClientClean, MQTT客户端清理操作
**------------------------------------------------------------------------------------------------------
** 参数: void
** 说明: 
** 返回: 
********************************************************************************************************/
static void MqttClientClean(void)
{
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
}

json_parse进程源码

「json_parse.c:」

左右滑动查看全部代码>>>

/*
- 程序功能: 接收JSON数据并解析(MQTT订阅者客户端程序)
- 编译命令: gcc cJSON.c json_parse.c -L ../mosquitto/build/lib -lmosquitto -o json_parse
- 导出mosquitto动态库: export LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH
- 作者:ZhengN
- 公众号:嵌入式大杂烩
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "cJSON.h"
#include "../mosquitto/lib/mosquitto.h"

#define  STU_NAME_LEN  32

/* 学生结构体 */
typedef struct _Student
{
    char name[STU_NAME_LEN];  // 名字      
    int num;                  // 学号      
    int c_score;              // C语言分数
}StudentDef, *pStudentDef;

/* 内部函数声明 */
static void StudentsData_Parse(pStudentDef _Stu, const char *_JsonStudnetData);
static void PrintParseResult(const pStudentDef _Stu);
static void SaveParseResult(const pStudentDef _Stu);
static void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message);
static void my_connect_callback(struct mosquitto *mosq, void *userdata, int result);

/* 内部全局变量 */
static FILE *stu_fp = NULL;

/********************************************************************************************************
** 函数: main
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 
********************************************************************************************************/
bool clean_session = true;
int main(void)
{         
    struct mosquitto *mosq = NULL;

/* libmosquitto 库初始化 */
    mosquitto_lib_init();

/* 创建mosquitto客户端 */
    mosq = mosquitto_new(NULL, clean_session, NULL);
    if(NULL == mosq)
    {
        printf("Create mqtt client failed...\n");
        mosquitto_lib_cleanup();
        return 1;
    }

/* 绑定连接、消息接收回调函数 */
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);

/* 连接服务器 */
    if(mosquitto_connect(mosq, "localhost", 1883, 60))
    {
        printf("Unable to connect...\n");
        return 1;
    }

/* 循环处理网络消息 */
    mosquitto_loop_forever(mosq, -1, 1);

/* 回收操作 */
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

return 0;
}

/********************************************************************************************************
** 函数: StudentsData_Parse, JOSN格式学生期末数据解析
**------------------------------------------------------------------------------------------------------
** 参数: _JsonStudnetData:JSON数据   _Stu:保存解析出的有用数据
** 说明: 
** 返回: 
********************************************************************************************************/
static void StudentsData_Parse(pStudentDef _Stu, const char *_JsonStudnetData)
{
    cJSON *student_json = NULL;   // student_json操作对象,可代表 {} 扩起来的内容
    cJSON *name = NULL;             
    cJSON *num = NULL;
    cJSON *c_score = NULL;
    
    /* 开始解析 */
    student_json = cJSON_Parse(_JsonStudnetData);
    if (NULL == student_json)
    {
        const char *error_ptr = cJSON_GetErrorPtr();
        if (error_ptr != NULL)
        {
            fprintf(stderr, "Error before: %s\n", error_ptr);
        }
        goto end;
    }

/* 解析获取name得值 */
    name = cJSON_GetObjectItemCaseSensitive(student_json, "name");
    if (cJSON_IsString(name) && (name->valuestring != NULL))
    {
        memset(&_Stu->name, 0, STU_NAME_LEN*sizeof(char));
        memcpy(&_Stu->name, name->valuestring, strlen(name->valuestring));
    }

/* 解析获取num的值 */
    num = cJSON_GetObjectItemCaseSensitive(student_json, "num");
    if (cJSON_IsNumber(num))
    {
        _Stu->num = num->valueint;
    }

/* 解析获取c_score的值 */
    c_score = cJSON_GetObjectItemCaseSensitive(student_json, "c_score");
    if (cJSON_IsNumber(c_score))
    {
        _Stu->c_score = c_score->valueint;
    }

end:
    cJSON_Delete(student_json);
}

/********************************************************************************************************
** 函数: PrintParseResult, 打印输出解析结果
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 
********************************************************************************************************/
static void PrintParseResult(const pStudentDef _Stu)
{
    printf("name: %s, num: %d, c_score: %d\n\n", _Stu->name, _Stu->num, _Stu->c_score);
}

/********************************************************************************************************
** 函数: SaveParseResult, 保存解析结果
**------------------------------------------------------------------------------------------------------
** 参数: _Stu:需要保存的数据
** 说明: 
** 返回: 
********************************************************************************************************/
static void SaveParseResult(const pStudentDef _Stu)
{
    char write_buf[512] = {0};
    static int stu_count = 0;

/* 以可在文件末尾追加内容的方式打开文件 */
 if((stu_fp = fopen("ParseResult.txt", "a+")) == NULL)
 {
  printf("Open file error!\n");
  return exit(EXIT_FAILURE);
 }

/* 按指定格式写入文件 */
    snprintf(write_buf, 512, "name: %s, num: %d, c_score: %d\n", _Stu->name, _Stu->num, _Stu->c_score);
    size_t len = fwrite((char*)write_buf, 1, strlen(write_buf), stu_fp);

/* 文件位置指针偏移 */
    fseek(stu_fp, len * stu_count, SEEK_SET);
    stu_count++;

/* 关闭文件 */
    fclose(stu_fp);
}

/********************************************************************************************************
** 函数: my_message_callback, 消息接收回调函数
**------------------------------------------------------------------------------------------------------
** 参数: 
** 返回: 
********************************************************************************************************/
static void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    StudentDef stu = {0};

if(message->payloadlen)
    {
        printf("%s %s\n", message->topic, (char*)message->payload);

/* 解析JSON数据 */
        StudentsData_Parse(&stu, (const char*)message->payload);

/* 打印输出解析结果 */
        PrintParseResult(&stu);

/* 保存数据到文件 */
        SaveParseResult(&stu); 
    }
    else
    {
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

/********************************************************************************************************
** 函数: my_connect_callback, 连接回调函数
**------------------------------------------------------------------------------------------------------
** 参数: 
** 返回: 
********************************************************************************************************/
static void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if(!result)
    {
        /* 订阅test_topic主题的消息 */
        mosquitto_subscribe(mosq, NULL, "test_topic", 0);
    }
    else
    {
        fprintf(stderr, "Connect failed\n");
    }
}

编译运行

1、编译生成json_parse、json_print程序:

左右滑动查看全部代码>>>

gcc cJSON.c json_parse.c -L ../mosquitto/build/lib -lmosquitto -o json_parse
gcc cJSON.c json_print.c -L ../mosquitto/build/lib -lmosquitto -o json_print    

(0)

相关推荐

  • 树莓派上的MQTT环境搭建

    前言最近实验室准备招新生,但现在可以拿出来演示诱拐新生的DEMO一个都没,于是老板要求我做一个房间定位的系统.原理和设备也很简单.一个Beacon收集器,和若干个beacon.通过Beacon收集器收 ...

  • cJSON库的安装与使用

    介绍 cJSON 库是C语言中的最常用的 JSON 库.github 地址是 https://github.com/DaveGamble/cJSON . 安装 环境是 Ubuntu 16.04.需要先 ...

  • 基于RabbitMQ的MQTT使用

    因为公司业务需求用到MQTT,简单记录一下MQTT的部署,以及MQTTBOX的使用.首先需要安装好RabbitMQ,没安装可以参考我之前写的分享在centos7环境下安装RabbitMQ使用MQTT插 ...

  • 威纶通触摸屏通过MQTT和阿里云平台连接的方法

    [导读]MQTT协议在物联网中应用广泛,自动化产品也和物联网.互联网.大数据等结合越来越广广泛,本文详细介绍威纶通触摸屏利用EasyBuilder Pro的MQTT元件与阿里云平台连接,并通过Iot- ...

  • 一款Modbus 转为OPC UA、MQTT、华为云IoT、AWS IoT、阿里云IoT网关

    BL101是一款Modbus RTU.Modbus TCP转换为Modbus TCP.OPC UA.MQTT.华为云IoT.AWS IoT.阿里云IoT.金鸽云等协议的网关. BL101下行支持:Mo ...

  • 消息推送标准协议:MQTT

    随着物联网(Internet of Things,IoT)的兴起,机器之间(Machine-to-Machine,M2M)的大规模信息沟通成为重要的课堂,之前HTTP的请求/回答(Request/Re ...

  • Modbus转MQTT网关介绍

    Modbus转MQTT网关介绍: 众所周知,在Internet中,网关是一种连接内部网与Internet上其它网的中间设备,而在物联网的体系架构中,我们知道在感知层和网络层两个不同的网络之间需要一个中 ...

  • NB-IoT,LORA,wifi,蓝牙,zigbee,MQTT,COAP之间的关系

    从事物联网的人员一定有听过NB-IoT,wifi,蓝牙,zigbee,MQTT,COAP这几个词,相信大家还是云里雾里的.只有理清这几个词大家才能做出正确的方案,开发出一款好的产品.今天我在这里为你们 ...

  • STM32移植MQTT连接阿里云物联网平台

    前言前段时间尝试了一下阿里云物联网平台,功能还挺强大的,有些经验在此分享一下. 硬件 l单片机:STM32F103C8T6 l网络连接:ESP8266 WIFI模块 l传感器:PM2.5传感器.温湿度 ...

  • MQTT QOS &amp; Retained Message

    项目终于基本完成了,记录一下,由于客户端误设置了发布保留消息,导致服务器每次上线都会收到客户端最后一条消息,先去解读并改了QOS的设置,并没有用,然后才发现有Retained Message的设置,直 ...

  • 进程间通信之消息队列

    这两天在复习Linux应用编程,感谢杨宗德老师的书<Linux高级程序设计>,写得非常通俗易懂又不乏严谨,一路看下去非常顺利,即便第三版的书中有些小的编辑错误,但不影响阅读.我这两天的文章 ...