1 单生产者单消费者无锁队列

Github项目地址:https://github.com/cameron314/readerwriterqueue
C++11 – 基于无锁队列的单生产者单消费者模型-StubbornHuang Blog

1.1 成员函数说明

  • try_enqueue:如果队列中有空间,则使元素的副本入队。如果元素已入队,则返回true,否则返回false。不分配内存。
  • enqueue : 使元素的副本进入队列。如果需要,分配一个额外的内存块。仅在内存分配失败时失败(返回false)。
  • try_dequeue : 尝试使元素出队; 如果队列为空,返回false。 如果队列中至少有一个元素,使用operator =将结果移到结果的前面,然后返回true。
  • peek : 返回一个指向队列中前元素的指针(接下来将通过调用try_dequeuepop删除)。 如果
    调用该方法时,队列显示为空,返回nullptr。必须仅从消费者线程调用。
  • pop : 从队列中删除前元素(如果有),而不返回它。成功返回true,如果队列当时为空则返回false。
  • size_approx :返回队列中当前的大概项目数。从生产者线程和使用者线程都可以安全调用。
  • max_capacity : 返回此队列为空时可排队但不分配的项目总数。 生产者线程和消费者线程均可安全调用。
  • emplace : 类似于enqueue(),但具有Emplace语义(就地构造)
  • try_emplace : 类似于try_enqueue(),但具有Emplace语义(就地构造)

1.2 使用

在项目中包含readerwriterqueue.h和atomicops.h

2 多生产者多消费者无锁队列

另外这个作者还搞了一个并发的无锁队列,
Github地址:https://github.com/cameron314/concurrentqueue
C++11 – 基于无锁队列的单生产者单消费者模型-StubbornHuang Blog

3 单生产者单消费者无锁模型

基于第一个项目的readerwriterqueue,做了以下代码测试:

#include <iostream>
#include <thread>

#include "readerwriterqueue.h"

using namespace moodycamel;


ReaderWriterQueue<int> q(100);

int globalCount = 0;
// 生产者线程
void Producer()
{
    while (true)
    {
        globalCount++;
        q.enqueue(globalCount);
    }
}

void Consumer()
{
    while (true)
    {
        int temp;
        q.try_dequeue(temp);
        std::cout <<temp << std::endl;
    }
}



int main()
{
    std::thread consumer_Thread(Consumer);
    std::thread producer_Thread(Producer);

    consumer_Thread.join();
    producer_Thread.join();

    getchar();
    return 0;
}

速度是真的快,果然如这个作者说的:最快的同步就是永不不会发生的同步。而且其压出队列的顺序做出了保证。
C++11 – 基于无锁队列的单生产者单消费者模型-StubbornHuang Blog

4 如果队列里面是指针,该如何正确释放内存

可参考以下代码:

#include <iostream>
#include <thread>
#include <vld.h>

#include "readerwriterqueue.h"
#include "concurrentqueue.h"

using namespace moodycamel;

class A {
public:
    int a;
};

ReaderWriterQueue<A*> q;

// 生产者线程
void Producer()
{
    for (int i = 0; i < 100000; ++i)
    {
        A* temp = new A();
        temp->a = i;
        q.enqueue(temp);
    }
}

void Consumer()
{
    while (true)
    {
        A* temp = NULL;
        if (q.try_dequeue(temp))
        {
            //std::cout << temp->a <<"队列数:"<<q.size_approx()<< std::endl;
            delete temp;
            temp = NULL;
        }
        else
        {
            std::cout << "执行完成" << std::endl;
            break;
        }   

    }
}



int main()
{
    Producer();
    std::cout << "开始执行" << std::endl;
    std::thread consumer_Thread(Consumer);

    consumer_Thread.join();

    getchar();
    return 0;
}