Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
mpsc_queue.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2020 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_core/mpsc_queue.h
10//! @brief Multi-producer single-consumer queue.
11
12#ifndef ROC_CORE_MPSC_QUEUE_H_
13#define ROC_CORE_MPSC_QUEUE_H_
14
15#include "roc_core/atomic_ops.h"
20#include "roc_core/panic.h"
21
22namespace roc {
23namespace core {
24
25//! Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
26//!
27//! Provides sequential consistency.
28//!
29//! Based on Dmitry Vyukov's algorithm:
30//! - http://tiny.cc/3d3moz
31//! - https://int08h.com/post/ode-to-a-vyukov-queue/
32//! - https://github.com/samanbarghi/MPSCQ
33//!
34//! @tparam T defines object type, it must inherit MpscQueueNode.
35//!
36//! @tparam OwnershipPolicy defines ownership policy which is used to acquire an
37//! element ownership when it's added to the queue and release ownership when it's
38//! removed from the queue.
39//!
40//! @tparam Node defines the base class of queue nodes. It is needed if MpscQueueNode
41//! is used with a non-default tag.
42template <class T,
43 template <class TT> class OwnershipPolicy = RefCountedOwnership,
44 class Node = MpscQueueNode<> >
45class MpscQueue : public NonCopyable<> {
46public:
47 //! Pointer type.
48 //! @remarks
49 //! either raw or smart pointer depending on the ownership policy.
51
52 ~MpscQueue() {
53 // release ownership of all objects
54 while (pop_front_exclusive()) {
55 }
56 }
57
58 //! Add object to the end of the queue.
59 //! Can be called concurrently.
60 //! Acquires ownership of @p elem.
61 //! After this call returns, any thread calling pop_front_exclusive() or
62 //! try_pop_front_exclusive() is guaranteed to see a non-empty queue. But note
63 //! that the latter can still fail if called concurrently with push_back().
64 //! @note
65 //! - On CPUs with atomic exchange, e.g. x86, this operation is both lock-free
66 //! and wait-free, i.e. it never waits for sleeping threads and never spins.
67 //! - On CPUs without atomic exchange, e.g. arm64, this operation is lock-free,
68 //! but not wait-free, i.e. it never waits for sleeping threads, but with a low
69 //! probability can spin while there are concurrent non-sleeping push_back()
70 //! calls (because of the spin loop in the implementation of atomic exchange).
71 //! - Concurrent try_pop_front_exclusive() and pop_front_exclusive() calls do not
72 //! affect this operation. Only concurrent push_back() calls can make it spin.
73 void push_back(T& elem) {
75
76 MpscQueueData* data = to_node_data_(elem);
77 impl_.push_back(data);
78 }
79
80 //! Try to remove object from the beginning of the queue (non-blocking version).
81 //! Should NOT be called concurrently.
82 //! Releases ownership of the returned object.
83 //! @remarks
84 //! - Returns NULL if the queue is empty.
85 //! - May return NULL even if the queue is actually non-empty, in particular if
86 //! a concurrent push_back() call is running, or if the push_back() results were
87 //! not fully published yet.
88 //! @note
89 //! - This operation is both lock-free and wait-free on all architectures, i.e. it
90 //! never waits for sleeping threads and never spins indefinitely.
92 MpscQueueData* data = impl_.pop_front(false);
93 if (!data) {
94 return NULL;
95 }
96
97 Pointer elem = from_node_data_(data);
99
100 return elem;
101 }
102
103 //! Remove object from the beginning of the queue (blocking version).
104 //! Should NOT be called concurrently.
105 //! Releases ownership of the returned object.
106 //! @remarks
107 //! - Returns NULL if the queue is empty.
108 //! - May spin while a concurrent push_back() call is running.
109 //! @note
110 //! - This operation is NOT lock-free (or wait-free). It may spin until all
111 //! concurrent push_back() calls are finished.
112 //! - On the "fast-path", however, this operation does not wait for any
113 //! threads and just performs a few atomic reads and writes.
115 MpscQueueData* data = impl_.pop_front(true);
116 if (!data) {
117 return NULL;
118 }
119
120 Pointer elem = from_node_data_(data);
122
123 return elem;
124 }
125
126private:
127 static MpscQueueData* to_node_data_(T& elem) {
128 return static_cast<Node&>(elem).mpsc_queue_data();
129 }
130
131 static T* from_node_data_(MpscQueueData* data) {
132 return static_cast<T*>(static_cast<Node*>(Node::mpsc_queue_node(data)));
133 }
134
135 MpscQueueImpl impl_;
136};
137
138} // namespace core
139} // namespace roc
140
141#endif // ROC_CORE_MPSC_QUEUE_H_
void push_back(MpscQueueData *node)
Add object to the end of the queue.
MpscQueueData * pop_front(bool can_spin)
Remove object from the beginning of the queue.
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition mpsc_queue.h:45
OwnershipPolicy< T >::Pointer Pointer
Pointer type.
Definition mpsc_queue.h:50
Pointer try_pop_front_exclusive()
Try to remove object from the beginning of the queue (non-blocking version). Should NOT be called con...
Definition mpsc_queue.h:91
Pointer pop_front_exclusive()
Remove object from the beginning of the queue (blocking version). Should NOT be called concurrently....
Definition mpsc_queue.h:114
void push_back(T &elem)
Add object to the end of the queue. Can be called concurrently. Acquires ownership of elem....
Definition mpsc_queue.h:73
Base class for non-copyable objects.
Definition noncopyable.h:23
Shared ownership intrusive pointer.
Definition shared_ptr.h:32
Multi-producer single-consumer queue internal implementation.
MpscQueue node.
Root namespace.
Non-copyable object.
Ownership policies.
Panic.
MpscQueue node internal data.