Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
mpsc_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2020 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_core/mpsc_queue.h
10 //! @brief Multi-producer single-consumer queue.
11 
12 #ifndef ROC_CORE_MPSC_QUEUE_H_
13 #define ROC_CORE_MPSC_QUEUE_H_
14 
15 #include "roc_core/atomic_ops.h"
18 #include "roc_core/noncopyable.h"
20 #include "roc_core/panic.h"
21 
22 namespace roc {
23 namespace core {
24 
25 //! Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
26 //!
27 //! Provides sequential consistency.
28 //!
29 //! Based on Dmitry Vyukov algorithm:
30 //! - http://tiny.cc/3d3moz
31 //! - https://int08h.com/post/ode-to-a-vyukov-queue/
32 //! - https://github.com/samanbarghi/MPSCQ
33 //!
34 //! @tparam T defines object type, it must inherit MpscQueueNode.
35 //!
36 //! @tparam OwnershipPolicy defines ownership policy which is used to acquire an
37 //! element ownership when it's added to the queue and release ownership when it's
38 //! removed from the queue.
39 //!
40 //! @tparam Node defines base class of queue nodes. It is needed if MpscQueueNode
41 //! is used with non-default tag.
42 template <class T,
43  template <class TT> class OwnershipPolicy = RefCountedOwnership,
44  class Node = MpscQueueNode<> >
45 class MpscQueue : public NonCopyable<> {
46 public:
47  //! Pointer type.
48  //! @remarks
49  //! either raw or smart pointer depending on the ownership policy.
50  typedef typename OwnershipPolicy<T>::Pointer Pointer;
51 
52  ~MpscQueue() {
53  // release ownership of all objects
54  while (pop_front_exclusive()) {
55  }
56  }
57 
58  //! Add object to the end of the queue.
59  //! Can be called concurrently.
60  //! Acquires ownership of @p elem.
61  //! After this call returns, any thread calling pop_front_exclusive() or
62  //! try_pop_front_exclusive() is guaranteed to see a non-empty queue. But note
63  //! that the latter can still fail if called concurrently with push_back().
64  //! @note
65  //! - On CPUs with atomic exchange, e.g. x86, this operation is both lock-free
66  //! and wait-free, i.e. it never waits for sleeping threads and never spins.
67  //! - On CPUs without atomic exchange, e.g. arm64, this operation is lock-free,
68  //! but not wait-free, i.e. it never waits for sleeping threads, but with a low
69  //! probability can spin while there are concurrent non-sleeping push_back()
70  //! calls (because of the spin loop in the implementation of atomic exchange).
71  //! - Concurrent try_pop_front() and pop_front() does not affect this operation.
72  //! Only concurrent push_back() calls can make it spin.
73  void push_back(T& elem) {
74  OwnershipPolicy<T>::acquire(elem);
75 
76  MpscQueueData* data = to_node_data_(elem);
77  impl_.push_back(data);
78  }
79 
80  //! Try to remove object from the beginning of the queue (non-blocking version).
81  //! Should NOT be called concurrently.
82  //! Releases ownership of the returned object.
83  //! @remarks
84  //! - Returns NULL if the queue is empty.
85  //! - May return NULL even if the queue is actually non-empty, in particular if
86  //! concurrent push_back() call is running, or if the push_back() results were
87  //! not fully published yet.
88  //! @note
89  //! - This operation is both lock-free and wait-free on all architectures, i.e. it
90  //! never waits for sleeping threads and never spins indefinitely.
92  MpscQueueData* data = impl_.pop_front(false);
93  if (!data) {
94  return NULL;
95  }
96 
97  Pointer elem = from_node_data_(data);
98  OwnershipPolicy<T>::release(*elem);
99 
100  return elem;
101  }
102 
103  //! Remove object from the beginning of the queue (blocking version).
104  //! Should NOT be called concurrently.
105  //! Releases ownership of the returned object.
106  //! @remarks
107  //! - Returns NULL if the queue is empty.
108  //! - May spin while a concurrent push_back() call is running.
109  //! @remarks
110  //! - This operation is NOT lock-free (or wait-free). It may spin until all
111  //! concurrent push_back() calls are finished.
112  //! - On the "fast-path", however, this operation does not wait for any
113  //! threads and just performs a few atomic reads and writes.
115  MpscQueueData* data = impl_.pop_front(true);
116  if (!data) {
117  return NULL;
118  }
119 
120  Pointer elem = from_node_data_(data);
121  OwnershipPolicy<T>::release(*elem);
122 
123  return elem;
124  }
125 
126 private:
127  static MpscQueueData* to_node_data_(T& elem) {
128  return static_cast<Node&>(elem).mpsc_queue_data();
129  }
130 
131  static T* from_node_data_(MpscQueueData* data) {
132  return static_cast<T*>(static_cast<Node*>(Node::mpsc_queue_node(data)));
133  }
134 
135  MpscQueueImpl impl_;
136 };
137 
138 } // namespace core
139 } // namespace roc
140 
141 #endif // ROC_CORE_MPSC_QUEUE_H_
MpscQueueData * pop_front(bool can_spin)
Remove object from the beginning of the queue.
void push_back(MpscQueueData *node)
Add object to the end of the queue.
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:45
OwnershipPolicy< T >::Pointer Pointer
Pointer type.
Definition: mpsc_queue.h:50
Pointer try_pop_front_exclusive()
Try to remove object from the beginning of the queue (non-blocking version). Should NOT be called con...
Definition: mpsc_queue.h:91
Pointer pop_front_exclusive()
Remove object from the beginning of the queue (blocking version). Should NOT be called concurrently....
Definition: mpsc_queue.h:114
void push_back(T &elem)
Add object to the end of the queue. Can be called concurrently. Acquires ownership of elem....
Definition: mpsc_queue.h:73
Base class for non-copyable objects.
Definition: noncopyable.h:23
Multi-producer single-consumer queue internal implementation.
MpscQueue node.
Root namespace.
Non-copyable object.
Ownership policies.
Panic.
MpscQueue node internal data.