Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
pipeline_loop.h
/*
 * Copyright (c) 2020 Roc Streaming authors
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

//! @file roc_pipeline/pipeline_loop.h
//! @brief Base class for pipelines.

#ifndef ROC_PIPELINE_PIPELINE_LOOP_H_
#define ROC_PIPELINE_PIPELINE_LOOP_H_

#include "roc_audio/frame.h"
#include "roc_audio/sample_spec.h"
#include "roc_core/atomic.h"
#include "roc_core/mpsc_queue.h"
#include "roc_core/mutex.h"
#include "roc_core/noncopyable.h"
#include "roc_core/optional.h"
#include "roc_core/rate_limiter.h"
#include "roc_core/seqlock.h"
#include "roc_core/time.h"
#include "roc_packet/units.h"
#include "roc_pipeline/config.h"

namespace roc {
namespace pipeline {

//! Base class for task-based pipelines.
//!
//! Frames, tasks, and threads
//! --------------------------
//!
//! The pipeline processes frames and tasks. This processing is serialized. At every
//! moment, the pipeline is either processing a frame, processing a task, or doing
//! nothing.
//!
//! The pipeline does not have its own thread. Both frame and task processing happen
//! when the user calls one of the pipeline methods, in the context of the caller thread.
//! Methods may be called from different threads, concurrently. This complicates the
//! implementation, but allows different thread layouts for different use cases.
//!
//! Precise task scheduling
//! -----------------------
//!
//! This class implements the "precise task scheduling" feature, which tries to schedule
//! task processing intervals smartly, to prevent time collisions with frame processing
//! and keep frame processing timings unaffected.
//!
//! Precise task scheduling is enabled by default, but can be disabled via config. When
//! disabled, no special scheduling is performed and frame and task processing compete
//! with each other for exclusive access to the pipeline.
//!
//! The sections below describe various aspects of the implementation.
//!
//! Task processing time slices
//! ---------------------------
//!
//! Tasks are processed between frames in dedicated time slices, to ensure that task
//! processing won't delay frame processing, which should be as close to real-time
//! as possible.
//!
//! If a frame is too large, it's split into sub-frames, to allow task processing
//! between these sub-frames. This is needed to ensure that the task processing delay
//! is not too large, at least while there are not too many tasks.
//!
//! If frames are too small, tasks are processed only after some of the frames instead
//! of after every frame. This is needed to reduce task processing overhead when using
//! tiny frames.
//!
//! There are two types of time slices dedicated to task processing:
//!  - in-frame task processing: short intervals between sub-frames
//!    (inside process_frame_and_tasks())
//!  - inter-frame task processing: longer intervals between frames
//!    (inside process_tasks())
//!
//! process_frame_and_tasks() calls are to be driven by the user-defined pipeline
//! clock. It should be called exactly when it's time to process more samples. Our
//! goal is to give it exclusive access to the pipeline as soon as possible after
//! it's called.
//!
//! process_tasks() should be called by the user when there are pending tasks that
//! should be processed and when no concurrent process_frame_and_tasks() call is
//! running. Our goal is to notify the user if and when it should be called.
//!
//! Asynchronous task processing
//! ----------------------------
//!
//! Since the pipeline does not have its own thread, it can't schedule process_tasks()
//! invocation on its own. Instead, it relies on the user-provided
//! IPipelineTaskScheduler object.
//!
//! When the pipeline wants to schedule an asynchronous process_tasks() invocation, it
//! calls IPipelineTaskScheduler::schedule_task_processing(). It's up to the user when
//! and on which thread to invoke process_tasks(), but the pipeline gives a hint with
//! the ideal invocation time.
//!
//! The pipeline may also cancel the scheduled task processing by invoking
//! IPipelineTaskScheduler::cancel_task_processing().
//!
//! In-place task processing
//! ------------------------
//!
//! If schedule() or schedule_and_wait() is called when the task queue is empty and the
//! current time point belongs to the task processing time slice, the new task is
//! processed in-place without waiting for the next process_frame_and_tasks() or
//! process_tasks() invocation. This avoids extra delays and thread switches
//! when possible.
//!
//! Processing priority
//! -------------------
//!
//! When process_frame_and_tasks() is called, it increments the pending_frames_ atomic
//! and blocks on pipeline_mutex_. The non-zero atomic indicates that a frame needs
//! to be processed as soon as possible and other methods should give way to it.
//!
//! When process_frame_and_tasks() is called, it also cancels any scheduled
//! asynchronous task processing before starting processing the frame and tasks.
//! Before exiting, process_frame_and_tasks() checks if there are still some pending
//! tasks and if necessary, schedules asynchronous execution again.
//!
//! When process_tasks() is processing asynchronous tasks, but detects that
//! process_frame_and_tasks() was invoked concurrently from another thread, it gives
//! way to it and exits. process_frame_and_tasks() will process the frame and some of
//! the remaining tasks, and if there are even more tasks remaining, it will invoke
//! schedule_task_processing() to allow process_tasks() to continue.
//!
//! When schedule() and process_tasks() want to invoke schedule_task_processing(), but
//! detect that process_frame_and_tasks() was invoked concurrently from another thread,
//! they give way to it and don't call schedule_task_processing(), assuming that
//! process_frame_and_tasks() will either process all tasks or call
//! schedule_task_processing() by itself.
//!
//! Locking rules
//! -------------
//!
//! pipeline_mutex_ protects the internal pipeline state. It should be acquired to
//! process a frame or a task.
//!
//! scheduler_mutex_ protects IPipelineTaskScheduler invocations. It should be acquired
//! to schedule or cancel asynchronous task processing.
//!
//! If pipeline_mutex_ is locked, it's guaranteed that the thread locking it will
//! check pending tasks after unlocking the mutex and will either process them or
//! schedule asynchronous processing.
//!
//! If scheduler_mutex_ is locked, it's guaranteed that the thread locking it will
//! either schedule or cancel asynchronous task processing, depending on whether
//! there are pending tasks and frames.
//!
//! Lock-free operations
//! --------------------
//!
//! schedule() and process_tasks() methods are lock-free. Also, they're either completely
//! wait-free or "mostly" wait-free (i.e. on the fast path), depending on the hardware
//! architecture (see comments for core::MpscQueue).
//!
//! In practice it means that when running concurrently with other PipelineLoop method
//! invocations, they never block waiting for other threads, and usually don't even spin.
//!
//! This is achieved by using a lock-free queue for tasks, atomics for 32-bit counters,
//! seqlocks for 64-bit counters (which are reduced to atomics on 64-bit CPUs), always
//! using try_lock() for mutexes and delaying the work if the mutex can't be acquired,
//! and using semaphores instead of condition variables for signaling (which don't
//! require blocking on a mutex, at least on modern platforms; e.g. on glibc they're
//! implemented using an atomic and a futex).
//!
//! process_frame_and_tasks() is not lock-free because it has to acquire the pipeline
//! mutex and can't delay its work. However, the precise task scheduling feature does its
//! best to ensure that the pipeline mutex will be unlocked when process_frame_and_tasks()
//! is invoked, thus in most cases it won't block or wait either.
//!
//! This approach helps us with our global goal of making all inter-thread interactions
//! mostly wait-free, so that one thread is never or almost never blocked when another
//! thread is blocked, preempted, or busy.
//!
//! Benchmarks
//! ----------
//!
//! PipelineLoop is covered with two groups of benchmarks:
//!  - bench_pipeline_loop_peak_load.cpp measures frame and task processing delays with
//!    or without task load and with or without precise task scheduling feature;
//!  - bench_pipeline_loop_contention.cpp measures scheduling times under different
//!    contention levels.
//!
//! You can run them using "roc-bench-pipeline" command. For further details, see
//! comments in the source code of the benchmarks.
class PipelineLoop : public core::NonCopyable<> {
public:
    //! Enqueue a task for asynchronous execution.
    void schedule(PipelineTask& task, IPipelineTaskCompleter& completer);

    //! Enqueue a task for asynchronous execution and wait until it finishes.
    //! @returns false if the task fails.
    bool schedule_and_wait(PipelineTask& task);

    //! Process some of the enqueued tasks, if any.
    void process_tasks();

protected:
    //! Task processing statistics.
    struct Stats {
        //! Total number of tasks processed.
        uint64_t task_processed_total;

        //! Number of tasks processed directly in schedule() or schedule_and_wait().
        uint64_t task_processed_in_place;

        //! Number of tasks processed in process_frame_and_tasks().
        uint64_t task_processed_in_frame;

        //! Number of times another method was preempted by process_frame_and_tasks().
        uint64_t preemptions;

        //! Number of times schedule_task_processing() was called.
        uint64_t scheduler_calls;

        //! Number of times cancel_task_processing() was called.
        uint64_t scheduler_cancellations;

        Stats()
            : task_processed_total(0)
            , task_processed_in_place(0)
            , task_processed_in_frame(0)
            , preemptions(0)
            , scheduler_calls(0)
            , scheduler_cancellations(0) {
        }
    };

    //! Initialization.
    PipelineLoop(IPipelineTaskScheduler& scheduler,
                 const TaskConfig& config,
                 const audio::SampleSpec& sample_spec);

    virtual ~PipelineLoop();

    //! How many pending tasks are there.
    size_t num_pending_tasks() const;

    //! How many pending frames are there.
    size_t num_pending_frames() const;

    //! Get task processing statistics.
    //! Returned object can't be accessed concurrently with other methods.
    const Stats& get_stats_ref() const;

    //! Split frame and process subframes and some of the enqueued tasks.
    bool process_subframes_and_tasks(audio::Frame& frame);

    //! Get current time.
    virtual core::nanoseconds_t timestamp_imp() const = 0;

    //! Get current thread id.
    virtual uint64_t tid_imp() const = 0;

    //! Process subframe.
    virtual bool process_subframe_imp(audio::Frame& frame) = 0;

    //! Process task.
    virtual bool process_task_imp(PipelineTask& task) = 0;

private:
    enum ProcState { ProcNotScheduled, ProcScheduled, ProcRunning };

    bool process_subframes_and_tasks_simple_(audio::Frame& frame);
    bool process_subframes_and_tasks_precise_(audio::Frame& frame);

    bool schedule_and_maybe_process_task_(PipelineTask& task);
    bool maybe_process_tasks_();

    void schedule_async_task_processing_();
    void cancel_async_task_processing_();

    void process_task_(PipelineTask& task, bool notify);
    bool process_next_subframe_(audio::Frame& frame, size_t* frame_pos);

    bool start_subframe_task_processing_();
    bool subframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;

    core::nanoseconds_t update_next_frame_deadline_(core::nanoseconds_t frame_start_time,
                                                    size_t frame_size);
    bool
    interframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;

    void report_stats_();

    // configuration
    const TaskConfig config_;

    const audio::SampleSpec sample_spec_;

    const size_t min_samples_between_tasks_;
    const size_t max_samples_between_tasks_;

    const core::nanoseconds_t no_task_proc_half_interval_;

    // used to schedule asynchronous work
    IPipelineTaskScheduler& scheduler_;

    // protects pipeline state
    core::Mutex pipeline_mutex_;

    // protects IPipelineTaskScheduler
    core::Mutex scheduler_mutex_;

    // lock-free queue of pending tasks

    // counter of pending tasks
    core::Atomic<int> pending_tasks_;

    // counter of pending process_frame_and_tasks() calls blocked on pipeline_mutex_
    core::Atomic<int> pending_frames_;

    // asynchronous processing state
    core::Atomic<int> processing_state_;

    // tid of last thread that performed frame processing
    core::Seqlock<uint64_t> frame_processing_tid_;

    // when next frame is expected to be started
    core::Seqlock<core::nanoseconds_t> next_frame_deadline_;

    // when task processing before next sub-frame ends
    core::nanoseconds_t subframe_tasks_deadline_;

    // number of samples processed since last in-frame task processing
    size_t samples_processed_;

    // did we accumulate enough samples in samples_processed_
    bool enough_samples_to_process_tasks_;

    // task processing statistics
    core::RateLimiter rate_limiter_;
    Stats stats_;
};

} // namespace pipeline
} // namespace roc

#endif // ROC_PIPELINE_PIPELINE_LOOP_H_
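
The time-slice rules from the class comment (split large frames into sub-frames, open task slots only occasionally for tiny frames) can be sketched in isolation. This is a simplified illustration and not the Roc implementation: `TaskSlotPlanner` and its members are hypothetical names, the two thresholds only loosely mirror `min_samples_between_tasks_` and `max_samples_between_tasks_`, and the deadline checks that the real class performs are left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of Roc): counts how many task-processing
// slots open up while frames of various sizes are processed.
class TaskSlotPlanner {
public:
    TaskSlotPlanner(std::size_t min_samples_between_tasks,
                    std::size_t max_samples_between_tasks)
        : min_samples_(min_samples_between_tasks)
        , max_samples_(max_samples_between_tasks)
        , samples_processed_(0) {
        assert(max_samples_ > 0);
    }

    // Processes one frame of frame_size samples and returns the number of
    // task slots opened inside or right after it.
    std::size_t process_frame(std::size_t frame_size) {
        std::size_t slots = 0;
        std::size_t pos = 0;

        while (pos < frame_size) {
            // A sub-frame never exceeds max_samples_, so a large frame
            // can't postpone tasks for too long.
            const std::size_t subframe_size =
                std::min(frame_size - pos, max_samples_);
            pos += subframe_size;
            samples_processed_ += subframe_size;

            // A slot opens only after at least min_samples_ were processed,
            // so tiny frames don't pay the task overhead on every call.
            if (samples_processed_ >= min_samples_) {
                ++slots;
                samples_processed_ = 0;
            }
        }

        return slots;
    }

private:
    const std::size_t min_samples_;
    const std::size_t max_samples_;
    std::size_t samples_processed_; // samples since the last task slot
};
```

With thresholds of 100 and 400 samples, a 1000-sample frame is cut into sub-frames of 400, 400, and 200 samples with a slot after each, while 40-sample frames get a slot only after every third frame.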
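
The priority and in-place processing rules described in the class comment can also be illustrated with a small standalone sketch. Everything here is an assumption made for illustration: `MiniLoop` is a hypothetical class, a mutex-guarded `std::deque` stands in for the lock-free `core::MpscQueue`, an atomic flag with `exchange()` stands in for `pipeline_mutex_` and `try_lock()`, the frame caller yields in a loop instead of blocking, and the scheduler interface and time slices are omitted entirely.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for the pipeline loop, illustration only.
class MiniLoop {
public:
    typedef std::function<void()> Task;

    std::size_t tasks_in_place = 0; // ran from schedule() or a re-check
    std::size_t tasks_in_frame = 0; // ran at the end of process_frame()

    // Enqueue a task and run it right away if the loop is idle.
    void schedule(Task task) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            queue_.push_back(std::move(task));
        }
        try_process_tasks();
    }

    // Process one frame; frames have priority over tasks.
    void process_frame(const std::function<void()>& frame_fn) {
        ++pending_frames_; // ask task processors to give way
        while (busy_.exchange(true)) {
            std::this_thread::yield(); // the real class blocks on a mutex
        }
        --pending_frames_;
        frame_fn();
        tasks_in_frame += drain();
        busy_.store(false);
        try_process_tasks(); // re-check the queue after releasing ownership
    }

private:
    bool pop(Task& task) {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        if (queue_.empty()) return false;
        task = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    bool has_tasks() {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return !queue_.empty();
    }

    // Runs tasks until the queue is empty or a frame is waiting.
    // Must be called only by the current owner of busy_.
    std::size_t drain() {
        std::size_t n = 0;
        Task task;
        while (pending_frames_.load() == 0 && pop(task)) {
            task();
            ++n;
        }
        return n;
    }

    // Never waits: if another caller owns the loop, that owner re-checks
    // the queue after it releases ownership.
    void try_process_tasks() {
        while (pending_frames_.load() == 0 && has_tasks()) {
            if (busy_.exchange(true)) return;
            tasks_in_place += drain();
            busy_.store(false);
        }
    }

    std::mutex queue_mutex_;
    std::deque<Task> queue_;
    std::atomic<bool> busy_{false};
    std::atomic<int> pending_frames_{0};
};
```

A task scheduled while the loop is idle runs inside `schedule()` itself. A task scheduled while a frame is being processed is left in the queue and picked up by the frame caller once the frame is done, which matches the rule that whoever owns the pipeline checks for pending tasks before leaving.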