Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
pipeline_loop.h
/*
 * Copyright (c) 2020 Roc Streaming authors
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

//! @file roc_pipeline/pipeline_loop.h
//! @brief Base class for pipelines.

#ifndef ROC_PIPELINE_PIPELINE_LOOP_H_
#define ROC_PIPELINE_PIPELINE_LOOP_H_

#include "roc_audio/frame.h"
#include "roc_core/atomic.h"
#include "roc_core/mpsc_queue.h"
#include "roc_core/mutex.h"
#include "roc_core/optional.h"
#include "roc_core/seqlock.h"
#include "roc_core/time.h"
#include "roc_packet/units.h"

namespace roc {
namespace pipeline {

//! Pipeline loop task processing parameters.
struct PipelineLoopConfig {
    //! Enable precise task scheduling mode (default).
    //! The other settings have effect only when this is set to true.
    //! When enabled, the pipeline processes tasks in dedicated time intervals
    //! between sub-frames and between frames, trying to prevent time collisions
    //! between task and frame processing.
    bool enable_precise_task_scheduling;

    //! Minimum frame duration between processing tasks.
    //! In-frame task processing does not happen until at least this much audio
    //! (converted to a number of samples) has been processed.
    //! Set to zero to allow task processing between frames of any size.
    core::nanoseconds_t min_frame_length_between_tasks;

    //! Maximum frame duration between processing tasks.
    //! If the frame is larger than this size, it is split into multiple sub-frames
    //! to allow task processing between the sub-frames.
    //! Set to zero to disable frame splitting.
    core::nanoseconds_t max_frame_length_between_tasks;

    //! Maximum task processing duration happening immediately after processing a frame.
    //! If this period expires and there are still pending tasks, asynchronous
    //! task processing is scheduled.
    //! At least one task is always processed after each frame, even if this
    //! setting is too small.
    core::nanoseconds_t max_inframe_task_processing;

    //! Time interval during which no task processing is allowed.
    //! This setting is used to prohibit task processing during the time when
    //! the next read() or write() call is expected.
    //! Since this time cannot be calculated precisely, and there is always
    //! thread switch overhead, scheduler jitter, and clock drift, a wide
    //! interval is used.
    core::nanoseconds_t task_processing_prohibited_interval;

    PipelineLoopConfig()
        : enable_precise_task_scheduling(true)
        , min_frame_length_between_tasks(200 * core::Microsecond)
        , max_frame_length_between_tasks(1 * core::Millisecond)
        , max_inframe_task_processing(20 * core::Microsecond)
        , task_processing_prohibited_interval(200 * core::Microsecond) {
    }
};

//! Base class for task-based pipelines.
//!
//! Frames, tasks, and threads
//! --------------------------
//!
//! The pipeline processes frames and tasks. This processing is serialized: at every
//! moment, the pipeline is either processing a frame, processing a task, or doing
//! nothing.
//!
//! The pipeline does not have its own thread. Both frame and task processing happen
//! when the user calls one of the pipeline methods, in the context of the caller thread.
//! Methods may be called concurrently from different threads. This complicates the
//! implementation, but allows different thread layouts for different use cases.
//!
//! Precise task scheduling
//! -----------------------
//!
//! This class implements the "precise task scheduling" feature, which tries to schedule
//! task processing intervals smartly, to prevent time collisions with frame processing
//! and keep frame processing timings unaffected.
//!
//! Precise task scheduling is enabled by default, but can be disabled via config. When
//! disabled, no special scheduling is performed, and frame and task processing compete
//! with each other for exclusive access to the pipeline.
//!
//! The sections below describe various aspects of the implementation.
//!
//! Task processing time slices
//! ---------------------------
//!
//! Tasks are processed between frames in dedicated time slices, to ensure that
//! task processing won't delay frame processing, which should be as close to real-time
//! as possible.
//!
//! If a frame is too large, it's split into sub-frames, to allow task processing between
//! these sub-frames. This ensures that the task processing delay won't be too large,
//! at least while there are not too many tasks.
//!
//! If frames are too small, tasks are processed only after some of the frames instead
//! of after every frame. This reduces task processing overhead when using tiny frames.
//!
//! There are two types of time slices dedicated to task processing:
//! - in-frame task processing: short intervals between sub-frames
//!   (inside process_frame_and_tasks())
//! - inter-frame task processing: longer intervals between frames
//!   (inside process_tasks())
//!
//! process_frame_and_tasks() calls are to be driven by the user-defined pipeline
//! clock. It should be called exactly when it's time to process more samples. Our
//! goal is to give it exclusive access to the pipeline as soon as possible
//! after it's called.
//!
//! process_tasks() should be called by the user when there are pending tasks that
//! should be processed and when no concurrent process_frame_and_tasks() call is running.
//! Our goal is to notify the user if and when it should be called.
//!
//! Asynchronous task processing
//! ----------------------------
//!
//! Since the pipeline does not have its own thread, it can't schedule process_tasks()
//! invocation on its own. Instead, it relies on the user-provided IPipelineTaskScheduler
//! object.
//!
//! When the pipeline wants to schedule asynchronous process_tasks() invocation, it
//! calls IPipelineTaskScheduler::schedule_task_processing(). It's up to the user when
//! and on which thread to invoke process_tasks(), but the pipeline gives a hint with
//! the ideal invocation time.
//!
//! The pipeline may also cancel the scheduled task processing by invoking
//! IPipelineTaskScheduler::cancel_task_processing().
//!
//! In-place task processing
//! ------------------------
//!
//! If schedule() or schedule_and_wait() is called when the task queue is empty and the
//! current time point belongs to a task processing time slice, the new task is
//! processed in place, without waiting for the next process_frame_and_tasks() or
//! process_tasks() invocation. This avoids extra delays and thread switches
//! when possible.
//!
//! Processing priority
//! -------------------
//!
//! When process_frame_and_tasks() is called, it increments the pending_frames_ atomic
//! and blocks on pipeline_mutex_. A non-zero counter indicates that a frame needs
//! to be processed as soon as possible and that other methods should give way to it.
//!
//! When process_frame_and_tasks() is called, it also cancels any scheduled
//! asynchronous task processing before starting to process the frame and tasks.
//! Before exiting, process_frame_and_tasks() checks whether there are still pending
//! tasks and, if necessary, schedules asynchronous execution again.
//!
//! When process_tasks() is processing asynchronous tasks, but detects that
//! process_frame_and_tasks() was invoked concurrently from another thread, it gives
//! way to it and exits. process_frame_and_tasks() will process the frame and some of
//! the remaining tasks, and if even more tasks remain, it will invoke
//! schedule_task_processing() to allow process_tasks() to continue.
//!
//! When schedule() and process_tasks() want to invoke schedule_task_processing(), but
//! detect that process_frame_and_tasks() was invoked concurrently from another thread,
//! they give way to it and don't call schedule_task_processing(), assuming that
//! process_frame_and_tasks() will either process all tasks or call
//! schedule_task_processing() by itself.
//!
//! Locking rules
//! -------------
//!
//! pipeline_mutex_ protects the internal pipeline state. It should be acquired to
//! process a frame or a task.
//!
//! scheduler_mutex_ protects IPipelineTaskScheduler invocations. It should be acquired
//! to schedule or cancel asynchronous task processing.
//!
//! If pipeline_mutex_ is locked, it's guaranteed that the thread locking it will
//! check pending tasks after unlocking the mutex and will either process them or
//! schedule asynchronous processing.
//!
//! If scheduler_mutex_ is locked, it's guaranteed that the thread locking it will
//! either schedule or cancel asynchronous task processing, depending on whether
//! there are pending tasks and frames.
//!
//! Lock-free operations
//! --------------------
//!
//! schedule() and process_tasks() methods are lock-free. Also, they're either completely
//! wait-free or "mostly" wait-free (i.e. on the fast path), depending on the hardware
//! architecture (see comments for core::MpscQueue).
//!
//! In practice, this means that when running concurrently with other PipelineLoop
//! method invocations, they never block waiting for other threads, and usually don't
//! even spin.
//!
//! This is achieved by:
//! - using a lock-free queue for tasks;
//! - using atomics for 32-bit counters;
//! - using seqlocks for 64-bit counters (which are reduced to atomics on 64-bit CPUs);
//! - always using try_lock() for mutexes and delaying the work if the mutex can't be
//!   acquired;
//! - using semaphores instead of condition variables for signaling (semaphores don't
//!   require blocking on a mutex, at least on modern platforms; e.g. on glibc they're
//!   implemented using an atomic and a futex).
//!
//! process_frame_and_tasks() is not lock-free because it has to acquire the pipeline
//! mutex and can't delay its work. However, the precise task scheduling feature does its
//! best to ensure that the pipeline mutex will be unlocked when process_frame_and_tasks()
//! is invoked, so in most cases it won't block or wait either.
//!
//! This approach helps with our global goal of making all inter-thread interactions
//! mostly wait-free, so that one thread is never or almost never blocked when another
//! thread is blocked, preempted, or busy.
//!
//! Benchmarks
//! ----------
//!
//! PipelineLoop is covered by two groups of benchmarks:
//! - bench_pipeline_loop_peak_load.cpp measures frame and task processing delays with
//!   or without task load and with or without the precise task scheduling feature;
//! - bench_pipeline_loop_contention.cpp measures scheduling times under different
//!   contention levels.
//!
//! You can run them using the "roc-bench-pipeline" command. For further details, see
//! the comments in the source code of the benchmarks.
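class PipelineLoop : public core::NonCopyable<> {
public:
    //! Enqueue a task for asynchronous execution.
    void schedule(PipelineTask& task, IPipelineTaskCompleter& completer);

    //! Enqueue a task for asynchronous execution and wait until it finishes.
    //! @returns false if the task fails.
    bool schedule_and_wait(PipelineTask& task);

    //! Process some of the enqueued tasks, if any.
    void process_tasks();
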
protected:
    //! Task processing statistics.
    struct Stats {
        //! Total number of tasks processed.
        uint64_t task_processed_total;

        //! Number of tasks processed directly in schedule() or schedule_and_wait().
        uint64_t task_processed_in_place;

        //! Number of tasks processed in process_frame_and_tasks().
        uint64_t task_processed_in_frame;

        //! Number of times when another method was preempted by process_frame_and_tasks().
        uint64_t preemptions;

        //! Number of times when schedule_task_processing() was called.
        uint64_t scheduler_calls;

        //! Number of times when cancel_task_processing() was called.
        uint64_t scheduler_cancellations;

        Stats()
            : task_processed_total(0)
            , task_processed_in_place(0)
            , task_processed_in_frame(0)
            , preemptions(0)
            , scheduler_calls(0)
            , scheduler_cancellations(0) {
        }
    };

    //! Initialization.
    PipelineLoop(IPipelineTaskScheduler& scheduler,
                 const PipelineLoopConfig& config,
                 const audio::SampleSpec& sample_spec);

    virtual ~PipelineLoop();

    //! How many pending tasks are there.
    size_t num_pending_tasks() const;

    //! How many pending frames are there.
    size_t num_pending_frames() const;

    //! Get task processing statistics.
    //! Returned object can't be accessed concurrently with other methods.
    const Stats& get_stats_ref() const;

    //! Split frame and process subframes and some of the enqueued tasks.
    bool process_subframes_and_tasks(audio::Frame& frame);

    //! Get current time.
    virtual core::nanoseconds_t timestamp_imp() const = 0;

    //! Get current thread id.
    virtual uint64_t tid_imp() const = 0;

    //! Process subframe.
    virtual bool process_subframe_imp(audio::Frame& frame) = 0;

    //! Process task.
    virtual bool process_task_imp(PipelineTask& task) = 0;

private:
    enum ProcState { ProcNotScheduled, ProcScheduled, ProcRunning };

    bool process_subframes_and_tasks_simple_(audio::Frame& frame);
    bool process_subframes_and_tasks_precise_(audio::Frame& frame);

    bool schedule_and_maybe_process_task_(PipelineTask& task);
    bool maybe_process_tasks_();

    void schedule_async_task_processing_();
    void cancel_async_task_processing_();

    void process_task_(PipelineTask& task, bool notify);
    bool process_next_subframe_(audio::Frame& frame,
                                packet::stream_timestamp_t frame_duration);

    bool start_subframe_task_processing_();
    bool subframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;

    update_next_frame_deadline_(core::nanoseconds_t frame_start_time,
                                packet::stream_timestamp_t frame_duration);
    bool
    interframe_task_processing_allowed_(core::nanoseconds_t next_frame_deadline) const;

    void report_stats_();

    // configuration
    const PipelineLoopConfig config_;

    const audio::SampleSpec sample_spec_;

    const packet::stream_timestamp_t min_samples_between_tasks_;
    const packet::stream_timestamp_t max_samples_between_tasks_;

    const core::nanoseconds_t no_task_proc_half_interval_;

    // used to schedule asynchronous work
    IPipelineTaskScheduler& scheduler_;

    // protects pipeline state
    core::Mutex pipeline_mutex_;

    // protects IPipelineTaskScheduler
    core::Mutex scheduler_mutex_;

    // lock-free queue of pending tasks

    // counter of pending tasks
    core::Atomic<int> pending_tasks_;

    // counter of pending process_frame_and_tasks() calls blocked on pipeline_mutex_
    core::Atomic<int> pending_frames_;

    // asynchronous processing state
    core::Atomic<int> processing_state_;

    // tid of last thread that performed frame processing
    core::Seqlock<uint64_t> frame_processing_tid_;

    // when next frame is expected to be started
    core::Seqlock<core::nanoseconds_t> next_frame_deadline_;

    // when task processing before next sub-frame ends
    core::nanoseconds_t subframe_tasks_deadline_;

    // number of samples processed since last in-frame task processing
    packet::stream_timestamp_t samples_processed_;

    // did we accumulate enough samples in samples_processed_
    bool enough_samples_to_process_tasks_;

    // task processing statistics
    core::RateLimiter rate_limiter_;
    Stats stats_;
};

} // namespace pipeline
} // namespace roc

#endif // ROC_PIPELINE_PIPELINE_LOOP_H_
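PipelineLoopConfig expresses its thresholds as durations. The private members min_samples_between_tasks_ and max_samples_between_tasks_, however, are sample counts, so the durations are presumably converted using the sample rate from sample_spec_. The sketch below shows that arithmetic with plain integers. `ns_to_samples` and the local typedefs are hypothetical stand-ins, not Roc API, and rounding down is an assumption.

```cpp
#include <cassert>
#include <cstdint>

// Local stand-ins for core::nanoseconds_t and packet::stream_timestamp_t.
typedef int64_t nanoseconds_t;
typedef uint32_t stream_timestamp_t;

const nanoseconds_t Microsecond = 1000;
const nanoseconds_t Millisecond = 1000 * Microsecond;
const nanoseconds_t Second = 1000 * Millisecond;

// Convert a duration to a per-channel sample count, rounding down.
// Hypothetical helper (assumption): the real class presumably relies on SampleSpec.
stream_timestamp_t ns_to_samples(nanoseconds_t duration, uint32_t sample_rate) {
    assert(duration >= 0);
    assert(sample_rate > 0);
    return (stream_timestamp_t)(duration * (nanoseconds_t)sample_rate / Second);
}
```

With the default config at 48 kHz, min_frame_length_between_tasks (200 µs) corresponds to 9 samples per channel, and max_frame_length_between_tasks (1 ms) corresponds to 48.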
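The "Task processing time slices" section says that a frame larger than max_frame_length_between_tasks is split into sub-frames, so that tasks can run between them. Here is a minimal sketch of that split. It assumes sizes are per-channel sample counts and that the last sub-frame takes the remainder. `split_into_subframes` is a hypothetical helper and does not reproduce the actual process_next_subframe_() logic.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Split a frame of `frame_size` per-channel samples into consecutive
// sub-frame sizes of at most `max_samples` each. A limit of zero disables
// splitting. Hypothetical helper for illustration only.
std::vector<size_t> split_into_subframes(size_t frame_size, size_t max_samples) {
    std::vector<size_t> subframes;
    size_t pos = 0;
    while (pos < frame_size) {
        size_t n = frame_size - pos;
        if (max_samples != 0 && n > max_samples) {
            n = max_samples; // cap the sub-frame at the configured limit
        }
        subframes.push_back(n);
        pos += n;
    }
    return subframes;
}
```

For example, a 100-sample frame with a 48-sample limit yields sub-frames of 48, 48 and 4 samples. That gives two in-frame points where tasks could be processed.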
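The same section says that with tiny frames, tasks are processed only after some of the frames. The members samples_processed_ and enough_samples_to_process_tasks_ suggest a counter that accumulates processed samples until min_samples_between_tasks_ is reached. The sketch below models that idea. `MinSamplesGate` and its reset-on-trigger behavior are assumptions, not the library's implementation.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of "process tasks only after enough samples".
// A threshold of zero allows task processing after every sub-frame.
class MinSamplesGate {
public:
    explicit MinSamplesGate(uint32_t min_samples)
        : min_samples_(min_samples)
        , samples_processed_(0) {
    }

    // Record a processed sub-frame of `n` samples. Returns true if tasks
    // may be processed now, and starts a new accumulation period if so
    // (the reset is an assumption of this sketch).
    bool on_subframe_processed(uint32_t n) {
        samples_processed_ += n;
        if (samples_processed_ < min_samples_) {
            return false;
        }
        samples_processed_ = 0;
        return true;
    }

private:
    const uint32_t min_samples_;
    uint32_t samples_processed_;
};
```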
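The header stores next_frame_deadline_ and no_task_proc_half_interval_. PipelineLoopConfig::task_processing_prohibited_interval is documented as the window around the expected next read() or write() call during which no tasks may run. The sketch below shows both computations under two assumptions: the deadline is the frame start time plus the frame duration converted to nanoseconds, and the window is centered on that deadline. Both function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

typedef int64_t nanoseconds_t;

const nanoseconds_t Microsecond = 1000;
const nanoseconds_t Millisecond = 1000 * Microsecond;
const nanoseconds_t Second = 1000 * Millisecond;

// Expected start of the next frame: start time plus the duration of the
// current frame (per-channel samples converted to nanoseconds).
nanoseconds_t next_frame_deadline(nanoseconds_t frame_start_time,
                                  uint32_t frame_samples,
                                  uint32_t sample_rate) {
    assert(sample_rate > 0);
    return frame_start_time + (nanoseconds_t)frame_samples * Second / sample_rate;
}

// Assumption: inter-frame task processing is allowed only outside a window
// of `prohibited_interval` centered on the next frame deadline.
bool interframe_tasks_allowed(nanoseconds_t now,
                              nanoseconds_t deadline,
                              nanoseconds_t prohibited_interval) {
    const nanoseconds_t half_interval = prohibited_interval / 2;
    return now < deadline - half_interval || now > deadline + half_interval;
}
```

With a 48-sample frame at 48 kHz starting at 5 ms, the deadline is 6 ms. With the default 200 µs interval, tasks are then blocked from 5.9 ms to 6.1 ms.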
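The "Asynchronous task processing" and "Locking rules" sections describe scheduling and cancelling process_tasks() calls through a user-provided scheduler, guarded by scheduler_mutex_ and a ProcState value. The sketch below models that state machine with standard C++. `TaskSchedulerSketch` is a hypothetical interface, not the real IPipelineTaskScheduler, whose signatures may differ. The scheduling path uses try_lock, in line with the lock-free rule for schedule() and process_tasks().

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>

typedef int64_t nanoseconds_t;

// Hypothetical interface modeled on the description of IPipelineTaskScheduler.
class TaskSchedulerSketch {
public:
    virtual ~TaskSchedulerSketch() {
    }
    virtual void schedule_task_processing(nanoseconds_t deadline) = 0;
    virtual void cancel_task_processing() = 0;
};

// Test double that counts calls.
class CountingScheduler : public TaskSchedulerSketch {
public:
    int scheduled = 0;
    int cancelled = 0;
    void schedule_task_processing(nanoseconds_t) override { scheduled++; }
    void cancel_task_processing() override { cancelled++; }
};

// Mirrors the ProcState enum declared in the header.
enum ProcState { ProcNotScheduled, ProcScheduled, ProcRunning };

class AsyncStateSketch {
public:
    explicit AsyncStateSketch(TaskSchedulerSketch& scheduler)
        : scheduler_(scheduler)
        , state_(ProcNotScheduled) {
    }

    // Request a process_tasks() call; at most one request is outstanding.
    // If another thread holds the scheduler mutex, that thread is
    // responsible for scheduling or cancelling, so we just return.
    void schedule_async(nanoseconds_t deadline) {
        std::unique_lock<std::mutex> lock(scheduler_mutex_, std::try_to_lock);
        if (!lock.owns_lock()) {
            return;
        }
        int expected = ProcNotScheduled;
        if (state_.compare_exchange_strong(expected, ProcScheduled)) {
            scheduler_.schedule_task_processing(deadline);
        }
    }

    // Cancel an outstanding request, if any (frame path, may block).
    void cancel_async() {
        std::lock_guard<std::mutex> lock(scheduler_mutex_);
        int expected = ProcScheduled;
        if (state_.compare_exchange_strong(expected, ProcNotScheduled)) {
            scheduler_.cancel_task_processing();
        }
    }

    // Bracket the user's asynchronous process_tasks() invocation.
    bool begin_running() {
        int expected = ProcScheduled;
        return state_.compare_exchange_strong(expected, ProcRunning);
    }
    void end_running() {
        state_.store(ProcNotScheduled);
    }

private:
    TaskSchedulerSketch& scheduler_;
    std::mutex scheduler_mutex_;
    std::atomic<int> state_;
};
```

The compare-and-swap on the state keeps repeated schedule_async() calls from flooding the user's scheduler with duplicate requests.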
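The "Processing priority" section describes a frame call that bumps pending_frames_ before blocking on pipeline_mutex_, and a task loop that gives way as soon as that counter is non-zero. The sketch below reproduces just that rule with std::atomic and std::mutex. The task queue is a mutex-protected std::deque standing in for Roc's lock-free MPSC queue, and all class and method names are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>

// Sketch of the "processing priority" rule with standard C++ primitives.
// The deque + queue_mutex_ stand in for a lock-free queue (assumption).
class PriorityLoopSketch {
public:
    // Frame path: announce a pending frame, then block on the pipeline mutex.
    void process_frame(const std::function<void()>& frame_work) {
        pending_frames_++;
        std::lock_guard<std::mutex> lock(pipeline_mutex_);
        pending_frames_--;
        frame_work();
    }

    // Task path: never blocks on the pipeline mutex and stops between tasks
    // as soon as a frame is pending. Returns the number of tasks processed.
    int process_tasks() {
        std::unique_lock<std::mutex> lock(pipeline_mutex_, std::try_to_lock);
        if (!lock.owns_lock()) {
            return 0; // the current owner is expected to handle pending tasks
        }
        int processed = 0;
        while (pending_frames_.load() == 0) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> queue_lock(queue_mutex_);
                if (queue_.empty()) {
                    break;
                }
                task = queue_.front();
                queue_.pop_front();
            }
            task();
            processed++;
        }
        return processed;
    }

    void enqueue(const std::function<void()>& task) {
        std::lock_guard<std::mutex> queue_lock(queue_mutex_);
        queue_.push_back(task);
    }

    // Test hook: pretend that a frame call is (or is no longer) waiting.
    void simulate_pending_frame(int delta) {
        pending_frames_ += delta;
    }

private:
    std::atomic<int> pending_frames_{0};
    std::mutex pipeline_mutex_;
    std::mutex queue_mutex_;
    std::deque<std::function<void()> > queue_;
};
```

Unlike the real class, this sketch neither cancels nor reschedules asynchronous processing around the frame. It only shows how the counter lets the frame path preempt the task loop between tasks.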
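The "Lock-free operations" section mentions seqlocks for 64-bit values such as frame_processing_tid_ and next_frame_deadline_. Below is a minimal single-writer seqlock in standard C++ that follows the well-known fence-based pattern. It is not core::Seqlock, whose interface may differ; `SeqlockSketch`, `try_load` and `wait_load` are names chosen for this sketch. The payload is stored as two relaxed 32-bit atomics, which keeps reads free of data races under the C++ memory model while still forcing a retry when a write overlaps a read.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Single-writer seqlock for a 64-bit value (sketch). An odd sequence number
// means a write is in progress; readers retry if the number changed.
class SeqlockSketch {
public:
    SeqlockSketch()
        : seq_(0)
        , lo_(0)
        , hi_(0) {
    }

    // Must be called from one writer thread at a time.
    void store(uint64_t value) {
        const uint32_t seq = seq_.load(std::memory_order_relaxed);
        seq_.store(seq + 1, std::memory_order_relaxed); // odd: write in progress
        std::atomic_thread_fence(std::memory_order_release);
        lo_.store((uint32_t)value, std::memory_order_relaxed);
        hi_.store((uint32_t)(value >> 32), std::memory_order_relaxed);
        seq_.store(seq + 2, std::memory_order_release); // even: write complete
    }

    // Never blocks; returns false if a concurrent write was detected.
    bool try_load(uint64_t& value) const {
        const uint32_t seq1 = seq_.load(std::memory_order_acquire);
        if (seq1 & 1) {
            return false;
        }
        const uint32_t lo = lo_.load(std::memory_order_relaxed);
        const uint32_t hi = hi_.load(std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_acquire);
        const uint32_t seq2 = seq_.load(std::memory_order_relaxed);
        if (seq1 != seq2) {
            return false;
        }
        value = ((uint64_t)hi << 32) | lo;
        return true;
    }

    // Spins until a consistent value is read.
    uint64_t wait_load() const {
        uint64_t value = 0;
        while (!try_load(value)) {
        }
        return value;
    }

private:
    std::atomic<uint32_t> seq_;
    std::atomic<uint32_t> lo_;
    std::atomic<uint32_t> hi_;
};
```

On 64-bit CPUs a single std::atomic<uint64_t> gives the same guarantee more cheaply, which matches the note that seqlocks are reduced to atomics there.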