Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
mpsc_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2020 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_core/mpsc_queue.h
10 //! @brief Multi-producer single-consumer queue.
11 
12 #ifndef ROC_CORE_MPSC_QUEUE_H_
13 #define ROC_CORE_MPSC_QUEUE_H_
14 
15 #include "roc_core/atomic_ops.h"
18 #include "roc_core/noncopyable.h"
20 #include "roc_core/panic.h"
21 
22 namespace roc {
23 namespace core {
24 
25 //! Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
26 //!
27 //! Provides sequential consistency.
28 //!
29 //! Based on Dmitry Vyukov algorithm:
30 //! - http://tiny.cc/3d3moz
31 //! - https://int08h.com/post/ode-to-a-vyukov-queue/
32 //! - https://github.com/samanbarghi/MPSCQ
33 //!
34 //! @tparam T defines object type, it should inherit MpscQueueNode.
35 //!
36 //! @tparam OwnershipPolicy defines ownership policy which is used to acquire an
37 //! element ownership when it's added to the queue and release ownership when it's
38 //! removed from the queue.
39 template <class T, template <class TT> class OwnershipPolicy = RefCountedOwnership>
40 class MpscQueue : public NonCopyable<> {
41 public:
42  //! Pointer type.
43  //! @remarks
44  //! either raw or smart pointer depending on the ownership policy.
45  typedef typename OwnershipPolicy<T>::Pointer Pointer;
46 
47  ~MpscQueue() {
48  // release ownership of all objects
49  while (pop_front_exclusive()) {
50  }
51  }
52 
53  //! Add object to the end of the queue.
54  //! Can be called concurrently.
55  //! Acquires ownership of @p obj.
56  //! After this call returns, any thread calling pop_front_exclusive() or
57  //! try_pop_front_exclusive() is guaranteed to see a non-empty queue. But note
58  //! that the latter can still fail if called concurrently with push_back().
59  //! @note
60  //! - On CPUs with atomic exchange, e.g. x86, this operation is both lock-free
61  //! and wait-free, i.e. it never waits for sleeping threads and never spins.
62  //! - On CPUs without atomic exchange, e.g. arm64, this operation is lock-free,
63  //! but not wait-free, i.e. it never waits for sleeping threads, but with a low
64  //! probability can spin while there are concurrent non-sleeping push_back()
65  //! calls (because of the spin loop in the implementation of atomic exchange).
66  //! - Concurrent try_pop_front() and pop_front() does not affect this operation.
67  //! Only concurrent push_back() calls can make it spin.
68  void push_back(T& obj) {
69  OwnershipPolicy<T>::acquire(obj);
70 
71  MpscQueueNode::MpscQueueData* node = obj.mpsc_queue_data();
72 
73  impl_.push_back(node);
74  }
75 
76  //! Try to remove object from the beginning of the queue (non-blocking version).
77  //! Should NOT be called concurrently.
78  //! Releases ownership of the returned object.
79  //! @remarks
80  //! - Returns NULL if the queue is empty.
81  //! - May return NULL even if the queue is actially non-empty, in particular if
82  //! concurrent push_back() call is running, or if the push_back() results were
83  //! not fully published yet.
84  //! @note
85  //! - This operation is both lock-free and wait-free on all architectures, i.e. it
86  //! never waits for sleeping threads and never spins indefinitely.
88  MpscQueueNode::MpscQueueData* node = impl_.pop_front(false);
89  if (!node) {
90  return NULL;
91  }
92 
93  Pointer obj = static_cast<T*>(node->container_of());
94  OwnershipPolicy<T>::release(*obj);
95 
96  return obj;
97  }
98 
99  //! Remove object from the beginning of the queue (blocking version).
100  //! Should NOT be called concurrently.
101  //! Releases ownership of the returned object.
102  //! @remarks
103  //! - Returns NULL if the queue is empty.
104  //! - May spin while a concurrent push_back() call is running.
105  //! @remarks
106  //! - This operation is NOT lock-free (or wait-free). It may spin until all
107  //! concurrent push_back() calls are finished.
108  //! - On the "fast-path", however, this operation does not wait for any
109  //! threads and just performs a few atomic reads and writes.
111  MpscQueueNode::MpscQueueData* node = impl_.pop_front(true);
112  if (!node) {
113  return NULL;
114  }
115 
116  Pointer obj = static_cast<T*>(node->container_of());
117  OwnershipPolicy<T>::release(*obj);
118 
119  return obj;
120  }
121 
122 private:
123  MpscQueueImpl impl_;
124 };
125 
126 } // namespace core
127 } // namespace roc
128 
129 #endif // ROC_CORE_MPSC_QUEUE_H_
Multi-producer single-consumer queue internal implementation class.
void push_back(MpscQueueNode::MpscQueueData *node)
Add object to the end of the queue.
MpscQueueNode::MpscQueueData * pop_front(bool can_spin)
Remove object from the beginning of the queue.
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:40
void push_back(T &obj)
Add object to the end of the queue. Can be called concurrently. Acquires ownership of obj....
Definition: mpsc_queue.h:68
Pointer pop_front_exclusive()
Remove object from the beginning of the queue (blocking version). Should NOT be called concurrently....
Definition: mpsc_queue.h:110
Pointer try_pop_front_exclusive()
Try to remove object from the beginning of the queue (non-blocking version). Should NOT be called con...
Definition: mpsc_queue.h:87
OwnershipPolicy< T >::Pointer Pointer
Pointer type.
Definition: mpsc_queue.h:45
Base class for non-copyable objects.
Definition: noncopyable.h:23
Multi-producer single-consumer queue internal implementation.
MpscQueue node.
Root namespace.
Non-copyable object.
Ownership policies.
Panic.
MpscQueueNode * container_of()
Get MpscQueueNode object that contains this MpscQueueData object.