Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
pipeline_loop.h
1 /*
2  * Copyright (c) 2020 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_pipeline/pipeline_loop.h
10 //! @brief Base class for pipelines.
11 
12 #ifndef ROC_PIPELINE_PIPELINE_LOOP_H_
13 #define ROC_PIPELINE_PIPELINE_LOOP_H_
14 
15 #include "roc_audio/frame.h"
16 #include "roc_audio/sample_spec.h"
17 #include "roc_core/atomic.h"
18 #include "roc_core/mpsc_queue.h"
19 #include "roc_core/mutex.h"
20 #include "roc_core/noncopyable.h"
21 #include "roc_core/optional.h"
22 #include "roc_core/rate_limiter.h"
23 #include "roc_core/seqlock.h"
24 #include "roc_core/time.h"
25 #include "roc_packet/units.h"
29 
30 namespace roc {
31 namespace pipeline {
32 
33 //! Pipeline loop task processing parameters.
34 struct PipelineLoopConfig {
35  //! Enable precise task scheduling mode (default).
36  //! The other settings have effect only when this is set to true.
37  //! When enabled, pipeline processes tasks in dedicated time intervals between
38  //! sub-frames and between frames, trying to prevent time collisions between
39  //! task and frame processing.
40  bool enable_precise_task_scheduling;
41 
42  //! Minimum frame duration between processing tasks.
43  //! In-frame task processing does not happen until at least given number
44  //! of samples is processed.
45  //! Set to zero to allow task processing between frames of any size.
46  core::nanoseconds_t min_frame_length_between_tasks;
47 
48  //! Maximum frame duration between processing tasks.
49  //! If the frame is larger than this size, it is split into multiple subframes
50  //! to allow task processing between the sub-frames.
51  //! Set to zero to disable frame splitting.
52  core::nanoseconds_t max_frame_length_between_tasks;
53 
54  //! Maximum task processing duration happening immediately after processing a frame.
55  //! If this period expires and there are still pending tasks, asynchronous
56  //! task processing is scheduled.
57  //! At least one task is always processed after each frame, even if this
58  //! setting is too small.
59  core::nanoseconds_t max_inframe_task_processing;
60 
61  //! Time interval during which no task processing is allowed.
62  //! This setting is used to prohibit task processing during the time when
63  //! next read() or write() call is expected.
64  //! Since it can not be calculated absolutely precisely, and there is always
65  //! thread switch overhead, scheduler jitter, and clock drift, we use a wide interval.
66  core::nanoseconds_t task_processing_prohibited_interval;
67 
74  }
75 };
76 
77 //! Base class for task-based pipelines.
78 //!
79 //! Frames, tasks, and threads
80 //! --------------------------
81 //!
82 //! The pipeline processes frames and tasks. This processing is serialized. At every
83 //! moment, the pipeline is either processing a frame, processing a task, or doing
84 //! nothing.
85 //!
86 //! The pipeline does not have its own thread. Both frame and task processing happens
87 //! when the user calls one of the pipeline methods, in the context of the caller thread.
88 //! Methods may be called from different threads, concurrently. This complicates the
89 //! implementation, but allows different thread layouts for different use cases.
90 //!
91 //! Precise task scheduling
92 //! -----------------------
93 //!
94 //! This class implements the "precise task scheduling" feature, which tries to
95 //! schedule task processing intervals smartly, to prevent time collisions with frame
96 //! processing and keep frame processing timings unaffected.
97 //!
98 //! Precise task scheduling is enabled by default, but can be disabled via config. When
99 //! disabled, no special scheduling is performed and frame and task processing compete
100 //! with each other for exclusive access to the pipeline.
101 //!
102 //! The sections below describe various aspects of the implementation.
103 //!
104 //! Task processing time slices
105 //! ---------------------------
106 //!
107 //! Tasks are processed between frames on dedicated time slices, to ensure that the
108 //! task processing won't delay frame processing, which should be as close to real-time
109 //! as possible.
110 //!
111 //! If a frame is too large, it's split into sub-frames, to allow task processing between
112 //! these sub-frames. This is needed to ensure that the task processing delay would not
113 //! be too large, at least while there are not too many tasks.
114 //!
115 //! If frames are too small, tasks are processed only after some of the frames instead
116 //! of after every frame. This is needed to reduce task processing overhead when using
117 //! tiny frames.
118 //!
119 //! There are two types of time slices dedicated for task processing:
120 //! - in-frame task processing: short intervals between sub-frames
121 //! (inside process_frame_and_tasks())
122 //! - inter-frame task processing: longer intervals between frames
123 //! (inside process_tasks())
124 //!
125 //! process_frame_and_tasks() calls are to be driven by the user-defined pipeline
126 //! clock. It should be called exactly when it's time to process more samples. Our
127 //! goal is to provide it exclusive access to the pipeline as fast as possible
128 //! immediately after it's called.
129 //!
130 //! process_tasks() should be called by user when there are pending tasks that should
131 //! be processed and when no concurrent process_frame_and_tasks() call is running.
132 //! Our goal is to notify the user if and when it should be called.
133 //!
134 //! Asynchronous task processing
135 //! ----------------------------
136 //!
137 //! Since the pipeline does not have its own thread, it can't schedule process_tasks()
138 //! invocation on its own. Instead, it relies on the user-provided IPipelineTaskScheduler
139 //! object.
140 //!
141 //! When the pipeline wants to schedule asynchronous process_tasks() invocation, it
142 //! calls IPipelineTaskScheduler::schedule_task_processing(). It's up to the user when and
143 //! on which thread to invoke process_tasks(), but the pipeline gives a hint with the ideal
144 //! invocation time.
145 //!
146 //! The pipeline may also cancel the scheduled task processing by invoking
147 //! IPipelineTaskScheduler::cancel_task_processing().
148 //!
149 //! In-place task processing
150 //! ------------------------
151 //!
152 //! If schedule() or schedule_and_wait() is called when the task queue is empty and the
153 //! current time point belongs to the task processing time slice, the new task is
154 //! processed in-place without waiting for the next process_frame_and_tasks() or
155 //! process_tasks() invocation. This avoids extra delays and thread switches
156 //! when possible.
157 //!
158 //! Processing priority
159 //! -------------------
160 //!
161 //! When process_frame_and_tasks() is called, it increments the pending_frames_ atomic
162 //! and blocks on pipeline_mutex_. The non-zero atomic indicates that a frame needs
163 //! to be processed as soon as possible and other methods should give way to it.
164 //!
165 //! When process_frame_and_tasks() is called, it also cancels any scheduled
166 //! asynchronous task processing before starting processing the frame and tasks.
167 //! Before exiting, process_frame_and_tasks() checks if there are still some pending
168 //! tasks and if necessary, schedules asynchronous execution again.
169 //!
170 //! When process_tasks() is processing asynchronous tasks, but detects that
171 //! process_frame_and_tasks() was invoked concurrently from another thread, it gives
172 //! way and exits. process_frame_and_tasks() will process the frame and some of
173 //! the remaining tasks, and if there are even more tasks remaining, it will invoke
174 //! schedule_task_processing() to allow process_tasks() to continue.
175 //!
176 //! When schedule() and process_tasks() want to invoke schedule_task_processing(), but
177 //! detect that process_frame_and_tasks() was invoked concurrently from another thread,
178 //! they give way and don't call schedule_task_processing(), assuming that
179 //! process_frame_and_tasks() will either process all tasks or call
180 //! schedule_task_processing() by itself.
181 //!
182 //! Locking rules
183 //! -------------
184 //!
185 //! pipeline_mutex_ protects the internal pipeline state. It should be acquired to
186 //! process a frame or a task.
187 //!
188 //! scheduler_mutex_ protects IPipelineTaskScheduler invocations. It should be acquired to
189 //! schedule or cancel asynchronous task processing.
190 //!
191 //! If pipeline_mutex_ is locked, it's guaranteed that the thread locking it will
192 //! check pending tasks after unlocking the mutex and will either process them or
193 //! schedule asynchronous processing.
194 //!
195 //! If scheduler_mutex_ is locked, it's guaranteed that the thread locking it will
196 //! either schedule or cancel asynchronous task processing, depending on whether
197 //! there are pending tasks and frames.
198 //!
199 //! Lock-free operations
200 //! --------------------
201 //!
202 //! schedule() and process_tasks() methods are lock-free. Also, they're either completely
203 //! wait-free or "mostly" wait-free (i.e. on the fast path), depending on the hardware
204 //! architecture (see comments for core::MpscQueue).
205 //!
206 //! In practice it means that when running concurrently with other PipelineLoop method
207 //! invocations, they never block waiting for other threads, and usually don't even spin.
208 //!
209 //! This is achieved by using a lock-free queue for tasks, atomics for 32-bit counters,
210 //! seqlocks for 64-bit counters (which are reduced to atomics on 64-bit CPUs), always
211 //! using try_lock() for mutexes and delaying the work if the mutex can't be acquired,
212 //! and using semaphores instead of condition variables for signaling (which don't
213 //! require blocking on mutex, at least on modern platforms; e.g. on glibc they're
214 //! implemented using an atomic and a futex).
215 //!
216 //! process_frame_and_tasks() is not lock-free because it has to acquire the pipeline
217 //! mutex and can't delay its work. However, the precise task scheduling feature does its
218 //! best to ensure that the pipeline mutex will be unlocked when process_frame_and_tasks()
219 //! is invoked, thus in most cases it won't block or wait either.
220 //!
221 //! This approach helps us with our global goal of making all inter-thread interactions
222 //! mostly wait-free, so that one thread is never or almost never blocked when another
223 //! thread is blocked, preempted, or busy.
224 //!
225 //! Benchmarks
226 //! ----------
227 //!
228 //! PipelineLoop is covered with two groups of benchmarks:
229 //! - bench_pipeline_loop_peak_load.cpp measures frame and task processing delays with
230 //! or without task load and with or without precise task scheduling feature;
231 //! - bench_pipeline_loop_contention.cpp measures scheduling times under different
232 //! contention levels.
233 //!
234 //! You can run them using the "roc-bench-pipeline" command. For further details, see
235 //! comments in the source code of the benchmarks.
236 class PipelineLoop : public core::NonCopyable<> {
237 public:
238  //! Enqueue a task for asynchronous execution.
239  void schedule(PipelineTask& task, IPipelineTaskCompleter& completer);
240 
241  //! Enqueue a task for asynchronous execution and wait until it finishes.
242  //! @returns false if the task fails.
243  bool schedule_and_wait(PipelineTask& task);
244 
245  //! Process some of the enqueued tasks, if any.
246  void process_tasks();
247 
248 protected:
249  //! Task processing statistics.
250  struct Stats {
251  //! Total number of tasks processed.
252  uint64_t task_processed_total;
253 
254  //! Number of tasks processed directly in schedule() or schedule_and_wait().
255  uint64_t task_processed_in_place;
256 
257  //! Number of tasks processed in process_frame_and_tasks().
258  uint64_t task_processed_in_frame;
259 
260  //! Number of times when another method was preempted by process_frame_and_tasks().
261  uint64_t preemptions;
262 
263  //! Number of times when schedule_task_processing() was called.
264  uint64_t scheduler_calls;
265 
266  //! Number of times when cancel_task_processing() was called.
267  uint64_t scheduler_cancellations;
268 
269  Stats()
270  : task_processed_total(0)
271  , task_processed_in_place(0)
272  , task_processed_in_frame(0)
273  , preemptions(0)
274  , scheduler_calls(0)
275  , scheduler_cancellations(0) {
276  }
277  };
278 
279  //! Initialization.
280  PipelineLoop(IPipelineTaskScheduler& scheduler,
281  const PipelineLoopConfig& config,
282  const audio::SampleSpec& sample_spec);
283 
284  virtual ~PipelineLoop();
285 
286  //! How many pending tasks are there.
287  size_t num_pending_tasks() const;
288 
289  //! How many pending frames are there.
290  size_t num_pending_frames() const;
291 
292  //! Get task processing statistics.
293  //! Returned object can't be accessed concurrently with other methods.
294  const Stats& get_stats_ref() const;
295 
296  //! Split frame and process subframes and some of the enqueued tasks.
297  bool process_subframes_and_tasks(audio::Frame& frame);
298 
299  //! Get current time.
300  virtual core::nanoseconds_t timestamp_imp() const = 0;
301 
302  //! Get current thread id.
303  virtual uint64_t tid_imp() const = 0;
304 
305  //! Process subframe.
306  virtual bool process_subframe_imp(audio::Frame& frame) = 0;
307 
308  //! Process task.
309  virtual bool process_task_imp(PipelineTask& task) = 0;
310 
311 private:
312  enum ProcState { ProcNotScheduled, ProcScheduled, ProcRunning };
313 
314  bool process_subframes_and_tasks_simple_(audio::Frame& frame);
315  bool process_subframes_and_tasks_precise_(audio::Frame& frame);
316 
317  bool schedule_and_maybe_process_task_(PipelineTask& task);
318  bool maybe_process_tasks_();
319 
320  void schedule_async_task_processing_();
321  void cancel_async_task_processing_();
322 
323  void process_task_(PipelineTask& task, bool notify);
324  bool process_next_subframe_(audio::Frame& frame,
325  packet::stream_timestamp_t* frame_pos,
326  packet::stream_timestamp_t frame_duration);
327 
328  bool start_subframe_task_processing_();
329  bool subframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;
330 
332  update_next_frame_deadline_(core::nanoseconds_t frame_start_time,
333  packet::stream_timestamp_t frame_duration);
334  bool
335  interframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;
336 
337  void report_stats_();
338 
339  // configuration
340  const PipelineLoopConfig config_;
341 
342  const audio::SampleSpec sample_spec_;
343 
344  const packet::stream_timestamp_t min_samples_between_tasks_;
345  const packet::stream_timestamp_t max_samples_between_tasks_;
346 
347  const core::nanoseconds_t no_task_proc_half_interval_;
348 
349  // used to schedule asynchronous work
350  IPipelineTaskScheduler& scheduler_;
351 
352  // protects pipeline state
353  core::Mutex pipeline_mutex_;
354 
355  // protects IPipelineTaskScheduler
356  core::Mutex scheduler_mutex_;
357 
358  // lock-free queue of pending tasks
360 
361  // counter of pending tasks
362  core::Atomic<int> pending_tasks_;
363 
364  // counter of pending process_frame_and_tasks() calls blocked on pipeline_mutex_
365  core::Atomic<int> pending_frames_;
366 
367  // asynchronous processing state
368  core::Atomic<int> processing_state_;
369 
370  // tid of last thread that performed frame processing
371  core::Seqlock<uint64_t> frame_processing_tid_;
372 
373  // when next frame is expected to be started
374  core::Seqlock<core::nanoseconds_t> next_frame_deadline_;
375 
376  // when task processing before next sub-frame ends
377  core::nanoseconds_t subframe_tasks_deadline_;
378 
379  // number of samples processed since last in-frame task processing
380  packet::stream_timestamp_t samples_processed_;
381 
382  // did we accumulate enough samples in samples_processed_
383  bool enough_samples_to_process_tasks_;
384 
385  // task processing statistics
386  core::RateLimiter rate_limiter_;
387  Stats stats_;
388 };
389 
390 } // namespace pipeline
391 } // namespace roc
392 
393 #endif // ROC_PIPELINE_PIPELINE_LOOP_H_
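
The time-slice rules described in the class comment can be illustrated with a small standalone sketch. It is not the PipelineLoop implementation: `SubframePlan`, `plan_subframes()` and `task_processing_allowed()` are hypothetical names, and the exact boundary conditions are assumptions. The sketch shows how a frame might be cut into sub-frames no longer than a maximum, how a task slot could be opened only after a minimum number of samples has accumulated, and how a prohibited interval around the expected frame deadline could be checked.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical description of one sub-frame produced by the splitter.
struct SubframePlan {
    uint32_t n_samples; // sub-frame length in samples
    bool tasks_after;   // may tasks be processed right after this sub-frame?
};

// Split a frame of frame_samples into sub-frames of at most max_samples
// (zero disables splitting). A task slot is opened after a sub-frame only
// when at least min_samples were processed since the previous slot.
// accumulated carries that sample count across calls, so tiny frames
// eventually add up to a slot.
std::vector<SubframePlan> plan_subframes(uint32_t frame_samples,
                                         uint32_t min_samples,
                                         uint32_t max_samples,
                                         uint32_t& accumulated) {
    std::vector<SubframePlan> plan;
    uint32_t pos = 0;
    while (pos < frame_samples) {
        uint32_t len = frame_samples - pos;
        if (max_samples != 0) {
            len = std::min(len, max_samples);
        }
        pos += len;
        accumulated += len;
        const bool enough = accumulated >= min_samples;
        if (enough) {
            accumulated = 0; // start counting towards the next slot
        }
        plan.push_back({ len, enough });
    }
    return plan;
}

// Assumption: task processing is prohibited within half_interval before
// or after the moment when the next frame is expected to start.
bool task_processing_allowed(int64_t now,
                             int64_t next_frame_deadline,
                             int64_t half_interval) {
    assert(half_interval >= 0);
    return now < next_frame_deadline - half_interval
        || now >= next_frame_deadline + half_interval;
}
```

With a minimum of 300 samples and a maximum of 400, a 1000-sample frame is planned as 400 + 400 + 200, with task slots after the first two sub-frames only; the remaining 200 samples count towards the next frame.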
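
The priority rules from the class comment (task paths never wait, frames are announced through a counter, and whoever holds the pipeline drains the queue before leaving) can be sketched in miniature. Everything below is an assumption made for illustration: `MiniLoop` is a hypothetical class, a mutex-protected deque stands in for the lock-free queue, and an atomic flag stands in for `pipeline_mutex_` so that the try-lock path stays well-defined in a single-threaded demonstration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical miniature of the hand-off protocol; not the Roc API.
class MiniLoop {
public:
    // Enqueue a task and try to run pending tasks in place.
    // Returns true if this call processed at least one task itself.
    bool schedule(std::function<void()> task) {
        assert(task != nullptr);
        {
            std::lock_guard<std::mutex> guard(queue_mutex_);
            queue_.push_back(std::move(task));
        }
        return try_process_tasks();
    }

    // Process tasks one by one, giving way as soon as a frame is pending
    // or the pipeline is busy. Never waits.
    bool try_process_tasks() {
        bool processed = false;
        while (pending_frames_.load() == 0 && pending_tasks() != 0) {
            if (busy_.exchange(true)) {
                break; // busy: the current owner drains the queue later
            }
            run_one_();
            busy_.store(false);
            processed = true;
        }
        return processed;
    }

    // Process a frame, then the tasks that accumulated meanwhile.
    void process_frame(const std::function<void()>& frame) {
        pending_frames_.fetch_add(1); // announce the frame before waiting
        while (busy_.exchange(true)) {
            std::this_thread::yield(); // only the frame path may wait
        }
        pending_frames_.fetch_sub(1);
        frame();
        while (run_one_()) {
        }
        busy_.store(false);
        try_process_tasks(); // re-check for tasks enqueued during unlock
    }

    std::size_t pending_tasks() {
        std::lock_guard<std::mutex> guard(queue_mutex_);
        return queue_.size();
    }

private:
    // Pop and run one task; returns false if the queue was empty.
    bool run_one_() {
        std::function<void()> task;
        {
            std::lock_guard<std::mutex> guard(queue_mutex_);
            if (queue_.empty()) {
                return false;
            }
            task = std::move(queue_.front());
            queue_.pop_front();
        }
        task();
        return true;
    }

    std::mutex queue_mutex_;
    std::deque<std::function<void()> > queue_;
    std::atomic<int> pending_frames_{ 0 };
    std::atomic<bool> busy_{ false };
};
```

A task scheduled while the pipeline is idle runs in place; a task scheduled while a frame is being processed is only enqueued and runs right after that frame. The sketch omits the scheduler callbacks, deadlines and statistics that the real class maintains.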