Queue 队列的三种数据传递

Queue 队列的三种数据传递

官方文档函数参考 Queue API [[FreeRTOS_Reference_Manual_V10.0.0.pdf#page=157&selection=4,0,4,9|FreeRTOS_Reference_Manual_V10.0.0, page 157]] 官方文档参考 4.2 Charact

官方文档函数参考

Queue API

[[FreeRTOS_Reference_Manual_V10.0.0.pdf#page=157&selection=4,0,4,9|FreeRTOS_Reference_Manual_V10.0.0, page 157]]

官方文档参考

4.2 Characteristics of a Queue

[[FreeRTOS_Mastering_the_FreeRTOS_Real_Time_Kernel-A_Hands-On_Tutorial_Guide.pdf#page=130&selection=5,0,7,26|FreeRTOS_Mastering_the_FreeRTOS_Real_Time_Kernel-A_Hands-On_Tutorial_Guide, page 130]]


队列

queue_animation.gif

队列是任务间通信的主要形式。它们可以用于在任务之间和中断之间发送消息。在大多数情况下,它们作为线程安全的 FIFO(先进先出)缓冲区使用。新数据被发送到队列的后面,尽管也可以发送到队列前面。

队列的三种数据传递

  1. 整型 [[10. Queue 队列的三种数据传递#^fe78d5]]
  2. 结构体 [[10. Queue 队列的三种数据传递#^3bba53]]
  3. 指针 [[10. Queue 队列的三种数据传递#^67b50b]]

xQueueCreate():

#include "FreeRTOS.h"
#include "queue.h"

QueueHandle_t xQueueCreate( UBaseType_t uxQueueLength, UBaseType_t uxItemSize );

函数说明:
创建一个新队列,返回队列句柄。

参数:

  • uxQueueLength:队列的长度。
  • uxItemSize:队列中的宽度,即单个数据的大小。

返回值:

  • NULL:无法创建队列,因为没有足够的内存空间。
  • Any other value:队列创建成功,返回队列句柄。

xQueueSend(), xQueueSendToFront(), xQueueSendToBack():

#include "FreeRTOS.h"
#include "queue.h"

BaseType_t xQueueSend( QueneHandle_t xQueue, const void *pvItemToQueue, TickType_t xTicksToWait );
BaseType_t xQueueSendToFront( QueueHandle_t xQueue, const void *pvItemToQueue, TickType_t xTickToWait );
BaseType_t xQueueSendToBack( QueneHandle_t xQueue, const void *pvItemToQueue, TickType_t xTicksToWait );

函数说明:
从前面或者后面向 Queue 发送(写入)单个数据。

  • xQueneSend()xQueueSendToBack()是一样的,只不过xQueueSend()是原始版本,所以目前更建议用xQueueSendToBack()

参数:

  • xQueue:用于发送数据的句柄。
  • pvItemToQueue:指向数据的指针,用来吧数据复制到队列。
  • xTicksToWait:超时等待时间,队列会等待有空间可以放置数据。

返回值:

  • pdPASS:数据成功发送。
  • errQUEUE_FULL:队列空间不足。

xQueueReceive()

#include "FreeRTOS.h"
#include "queue.h"

BaseType_t xQueueReceive( QueueHandle_t xQueue, void *pvBuffer, TickType_t xTickToWait );

函数说明:
从队列中接受数据。

参数:

  • xQueue:用于接收数据的队列。
  • pvBuffer:存储接收数据的缓冲区。
  • xTickToWait:超时等待时间。

返回值:

  • pdPASS:成功接收数据的返回。
  • errQUEUE_EMPTY:因为队列为空,所以接收数据失败。

uxQueueMessagesWaiting()

^4948b6

#include "FreeRTOS.h"
#inlcude "queue.h"

UBaseType_t uxQueueMessagesWaiting( const QueueHandle_t xQueue );

函数说明:
返回队列中现存数据的数量。

参数:

  • xQueue:用于获取数据数量的队列句柄。

返回:

  • 队列中现存数据的数量。

uxQueueSpacesAvailable()

#include "FreeRTOS.h"
#include "queue.h"

UBaseType_t uxQueueSpacesAvailable( const QueueHandle_t xQueue );

函数说明:
获取消息队列当前剩余可用空间。

参数:

  • xQueue:要获取可用空间的消息队列句柄。

返回:

  • (UBaseType_t)返回消息队列剩余可用空间。

整形数据(int)

^fe78d5

示例代码

#include <stdio.h>
#include <inttypes.h>
#include "sdkconfig.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_chip_info.h"
#include "esp_flash.h"
#include "esp_system.h"

#include "esp_tasK_wdt.h"
#include "freertos/queue.h"

void send_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara; // 初始化队列句柄,赋值为传入的句柄
    BaseType_t queue_status; // 队列发送状态返回值

    char i = 0;

    while (true) {
        queue_status = xQueueSendToBack(QHandle, &i, 0); // 发送数据,并获取发送函数的返回值
        if (pdPASS == queue_status) {
            printf("Data sent successfully! Data: %d\n", i);
        } else {
            printf("Data send failed!\n");
        }
        i++;

        vTaskDelay(1000 / portTICK_PERIOD_MS);
    }
}

void receive_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara; // 初始化队列句柄,赋值为传入的句柄
    BaseType_t queue_status; // 读取函数返回值

    char receiveBuffer = 0;
    
    while (true) {
        queue_status = xQueueReceive(QHandle, &receiveBuffer, 0); // 接收数据,并获取读取函数的返回值
        if (queue_status == pdPASS) {
            printf("Data received successfully! Data: %d\n", receiveBuffer);
        } else {
            printf("Data received failed!\n");
        }
        vTaskDelay(1000 / portTICK_PERIOD_MS);
    }
}

void app_main(void)
{
    QueueHandle_t QHandle; // 队列句柄,作为参数传入任务函数
    QHandle = xQueueCreate(5, sizeof(int)); // 设置队列宽度为长度为 5, 宽度为 sizeof(int)

    TaskHandle_t send_task_handle = NULL;
    TaskHandle_t receive_task_handle =NULL;

    if (QHandle) {
        printf("Create queue successfully!\n");

        xTaskCreate(send_task, "Task1", 2048, (void *)QHandle, 1, &send_task_handle);
        xTaskCreate(receive_task, "Receive Task", 5120, (void *)QHandle, 1, &receive_task_handle);
    } else {
        printf("Can not create a Queue!\n");
    }
}

示例输出

10. Queue 队列的三种数据传递-20240613173709235.webp

可以看到数据收发正常。

整形数据发送进阶:

实际应用中,通常要考虑数据发送和接收是否同步,现在用uxQueueMessagesWaiting()函数来进一步理解Queue的运行机制。[[10. Queue 队列的三种数据传递#^4948b6]]

示例代码

#include <stdio.h>
#include <inttypes.h>
#include "sdkconfig.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_chip_info.h"
#include "esp_flash.h"
#include "esp_system.h"

#include "esp_tasK_wdt.h"
#include "freertos/queue.h"

void send_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara;
    BaseType_t queue_status;
    UBaseType_t queue_amount; // 存储队列中数据数量

    int i = 0;

    while (true) {
        queue_status = xQueueSendToBack(QHandle, &i, 0);        
        if (pdPASS == queue_status) {
            printf("Data sent successfully! Data: %d\n", i);
	        i++; // 数据发送成功后再变更数据
        } else {
            queue_amount = uxQueueMessagesWaiting(QHandle); // 获取队列中数据数量
            printf("Queue is already full, waiting... Number of item in the queue now: %d\n", queue_amount);
        }

        vTaskDelay(100 / portTICK_PERIOD_MS);
    }
}

void receive_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara;
    BaseType_t queue_status;
    UBaseType_t queue_amount;

    int receiveBuffer = 0;
    
    while (true) {
        queue_status = xQueueReceive(QHandle, &receiveBuffer, 0);
        if (queue_status == pdPASS) {
            printf("Data received successfully! Data: %d\n", receiveBuffer);

            queue_amount = uxQueueMessagesWaiting(QHandle);
            printf("Number of item in the queue: %d\n", queue_amount);
        } else {
            printf("Data received failed!\n");
        }
        vTaskDelay(1000 / portTICK_PERIOD_MS);
    }
}

void app_main(void)
{
    QueueHandle_t QHandle;
    QHandle = xQueueCreate(5, sizeof(int));

    TaskHandle_t send_task_handle = NULL;
    TaskHandle_t receive_task_handle =NULL;

    if (QHandle) {
        printf("Create queue successfully!\n");

        xTaskCreate(send_task, "Task1", 2048, (void *)QHandle, 1, &send_task_handle);
        xTaskCreate(receive_task, "Receive Task", 5120, (void *)QHandle, 1, &receive_task_handle);
    } else {
        printf("Can not create a Queue!\n");
    }
}

代码中,send_task的运行频率要高于received_task,这就导致接收任务不能同步接收发送任务的数据。所以,在发送任务中,我们来检测是否发送成功,如果发送不成功,那么就等待下一次发送,同时,在下一次发送的时候,再将数据变更。

示例输出

10. Queue 队列的三种数据传递-20240613180057928.webp

我们可以观察到,send_task发送得非常快,但是receive_task接受得很慢,这就导致数据收发不同步。但是为了避免数据丢失,在队列充满之后,发送任务不再变更数据,而是等待队列有空余位置之后,再变更数据并发送。

结构体传输

^3bba53

示例代码

#include <stdio.h>
#include <inttypes.h>
#include "sdkconfig.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_chip_info.h"
#include "esp_flash.h"
#include "esp_system.h"

#include "esp_tasK_wdt.h"
#include "freertos/queue.h"

// 定义结构体数据类型
typedef struct Book {
    char title[50];
    char author[50];
    int book_id;
} book;

void send_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara;
    BaseType_t send_state; // 获取发送结果
    UBaseType_t queue_space_cur; // 获取当前可用空间

    book book_send[3] = {
        { "平凡的世界", "路遥", 1 },
        { "苏菲的世界", "AUTHOR", 2 },
        { "三体", "刘慈欣", 3 }
    };

    while (true) {
        for (int i = 0; i < 3; i++) {
            send_state = xQueueSendToBack(QHandle, &book_send[i], 0); // 发送结构体数据并获取发送状态

            if (send_state == pdPASS) {
                printf("[SEND TASK]: Data send successfully.\n");
                printf("[SEND TASK]: Title: %s, Author: %s, Book_ID: %d\n", book_send[i].title, book_send[i].author, book_send[i].book_id);
            } else {
				// 获取消息队列剩余可用空间,如果没有空间,则下次不再发送,并且 i 自减 1 ,防止漏发数据。
				// 此处可以优化,为了防止忙等待,可以在判断消息队列没有可用空间后,适当延时一段时间,以避免无效占用 CPU 时间。
                queue_space_cur = uxQueueSpacesAvailable(QHandle);
                if (queue_space_cur == 0) {
                    printf("[SEND TASK]: Queue is already full, waiting...\n");
                    i--;
                } else {
                    printf("[SEND TASK]: Unknow error for failed.\n");
                }
            }

            vTaskDelay(1000 / portTICK_PERIOD_MS);
        }
    }
}

void receive_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara;
    BaseType_t receive_state; // 接收状态
    UBaseType_t queue_space_hold; // 存储还有多少消息在消息队列中

    book book_receive[3];

    while (true) {
        for (int i = 0; i < 3; i++) {
            receive_state = xQueueReceive(QHandle, &book_receive[i], 0);

            if (receive_state == pdPASS) {
                printf("[RECEIVE TASK]: Receiving data successfully.\n");
                printf("[RECEIVE TASK]: Title: %s, Author: %s, Book_ID: %d\n", book_receive[i].title, book_receive[i].author, book_receive[i].book_id);
            } else {
				// 如果消息队列中没有消息,则暂停接收,同时 i 自减 1 ,为了防止漏接消息。
				// 同理,这里也可以跟发送任务一样进行延时优化,避免忙等待。
                queue_space_hold = uxQueueMessagesWaiting(QHandle);
                if (!queue_space_hold) {
                    printf("[RECEIVE TASK]: Queue is empty now, waiting for data into the queue...\n");
                }
            }

            vTaskDelay(300 / portTICK_PERIOD_MS);
        }
    }
}

void app_main(void)
{
    QueueHandle_t QHandle = xQueueCreate(3, sizeof(book));

    if (QHandle) {
        printf("[MAIN]: QueueHandle_t create successfully.\n");

        xTaskCreate(send_task, "Send Task", 2048, (void *)QHandle, 1, NULL);
        xTaskCreate(receive_task, "Receive Task", 2048, (void *)QHandle, 1, NULL);
    } else {
        printf("[MAIN]: Failed to create a Queue.\n");
    }
}

通过调整两个任务函数的延时,可以观察到不同的输出结果:“队列满/队列空“”。

传递指针

^67b50b

含义

  • 把指针所指向的大数据的内容从一个任务传递到另外一个任务

注意

  • 在传递指针时应非常小心地处理内存的分配和释放。

示例代码

#include <stdio.h>
#include <inttypes.h>
#include "sdkconfig.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_chip_info.h"
#include "esp_flash.h"
#include "esp_system.h"

#include "esp_tasK_wdt.h"
#include "freertos/queue.h"

void sender_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara;
    BaseType_t queue_state = 0;

    char *bufferToSend = (char *)malloc(100); // 存储数据的指针
    int num = 123;

    // while (true) {
    for (int i = 0; i < 4; i++) {
        snprintf(bufferToSend, 100, "\nHello, Moker!\nIt's a 100 length of string.\nThe value: %d", num);

        queue_state = xQueueSend(QHandle, &bufferToSend, 0);
        if (queue_state == pdPASS) {
            printf("[SENDER TASK]: Data send successfully.\n");
            printf("[SENDER TASK]: Data: %s\n", bufferToSend);
        } else {
            printf("[SENDER TASK]: Data send failed.\n");
        }
        num++;

        vTaskDelay(1000 / portTICK_PERIOD_MS);
    }

	vTaskDelete(NULL);
}

void receiver_task(void *pvPara)
{
    QueueHandle_t QHandle = (QueueHandle_t)pvPara;
    BaseType_t queue_state = 0;

    char *bufferToReceive;

    // while (true) {
    for (int i = 0; i < 3; i++) {
        queue_state = xQueueReceive(QHandle, &bufferToReceive, 0);
        if (queue_state == pdPASS) {
            printf("[RECEIVER TASK]: Data receive successfully.\n");
            printf("[RECEIVER TASK]: Data: %s\n", bufferToReceive);
        } else {
            printf("[RECEIVER TASK]: Data receive error.\n");
        }

        vTaskDelay(1000 / portTICK_PERIOD_MS);
    }

	// 堆上分配的内存,在任何时候都应该注意释放
    if (bufferToReceive != NULL) {
        free(bufferToReceive);
	    bufferToReceive = NULL; // 指针释放后,指向 NULL ,因为释放 NULL 是安全的,所以在其他任务重复释放也不会引发重复释放的严重故障
    }
    vTaskDelete(NULL);
}

void app_main(void)
{
    QueueHandle_t QHandle = xQueueCreate(5, sizeof(char *)); // 消息队列的长宽分别定义为 5, sizeof(char *)

    if (QHandle) {
        xTaskCreate(sender_task, "Sender Task", 2048, (void *)QHandle, 1, NULL);
        xTaskCreate(receiver_task, "Receiver Task", 2048, (void *)QHandle, 1, NULL);
    } else {
        printf("Not have enough memory to create a queue.\n");
    }

    while (true) {
        printf("All tasks end.\n");
        vTaskDelay(1000 / portTICK_PERIOD_MS);
    }
}

此处应补充说明数组指针使用的具体事项:[[数组指针详解]]。

示例输出

10. Queue 队列的三种数据传递-20240614004319683.webp
10. Queue 队列的三种数据传递-20240614004342789.webp

可以观察到任务收发正常。

Comment