Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Base class for task-based pipelines.
#include <pipeline_loop.h>
Classes
struct | Stats
Task processing statistics.
Public Member Functions
void | schedule (PipelineTask &task, IPipelineTaskCompleter &completer)
Enqueue a task for asynchronous execution.
bool | schedule_and_wait (PipelineTask &task)
Enqueue a task for asynchronous execution and wait until it finishes.
void | process_tasks ()
Process some of the enqueued tasks, if any.
Protected Member Functions
PipelineLoop (IPipelineTaskScheduler &scheduler, const PipelineLoopConfig &config, const audio::SampleSpec &sample_spec)
Initialization.
size_t | num_pending_tasks () const
How many pending tasks there are.
size_t | num_pending_frames () const
How many pending frames there are.
const Stats & | get_stats_ref () const
Get task processing statistics. Returned object can't be accessed concurrently with other methods.
bool | process_subframes_and_tasks (audio::Frame &frame)
Split frame and process subframes and some of the enqueued tasks.
virtual core::nanoseconds_t | timestamp_imp () const =0
Get current time.
virtual uint64_t | tid_imp () const =0
Get current thread id.
virtual bool | process_subframe_imp (audio::Frame &frame)=0
Process subframe.
virtual bool | process_task_imp (PipelineTask &task)=0
Process task.
Base class for task-based pipelines.
The pipeline processes frames and tasks. This processing is serialized. At every moment, the pipeline is either processing a frame, processing a task, or doing nothing.
The pipeline does not have its own thread. Both frame and task processing happen when the user calls one of the pipeline methods, in the context of the caller thread. Methods may be called from different threads, concurrently. This complicates the implementation, but allows different thread layouts for different use cases.
This class implements the "precise task scheduling" feature, which tries to schedule task processing intervals smartly, to prevent time collisions with frame processing and keep frame processing timings unaffected.
Precise task scheduling is enabled by default, but can be disabled via config. When disabled, no special scheduling is performed, and frame and task processing compete with each other for exclusive access to the pipeline.
The sections below describe various aspects of the implementation.
Tasks are processed between frames in dedicated time slices, to ensure that task processing won't delay frame processing, which should be as close to real-time as possible.
If a frame is too large, it's split into sub-frames, to allow task processing between these sub-frames. This is needed to ensure that the task processing delay is not too large, at least while there are not too many tasks.
If frames are too small, tasks are processed only after some of the frames instead of after every frame. This is needed to reduce task processing overhead when using tiny frames.
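The large-frame case can be illustrated with a minimal single-threaded sketch. It is not the PipelineLoop implementation: `split_and_process`, `max_subframe_samples`, and both callbacks are hypothetical names introduced here, and the real class additionally decides how many tasks fit into each slice.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical sketch: split a frame of `frame_samples` samples into
// sub-frames of at most `max_subframe_samples` samples each, and give
// tasks a time slice after every sub-frame.
// Returns the sizes of the sub-frames, in processing order.
std::vector<std::size_t> split_and_process(
    std::size_t frame_samples,
    std::size_t max_subframe_samples,
    const std::function<void(std::size_t)>& process_subframe,
    const std::function<void()>& process_some_tasks) {
    assert(max_subframe_samples > 0); // a zero limit would never make progress

    std::vector<std::size_t> sizes;
    std::size_t pos = 0;
    while (pos < frame_samples) {
        const std::size_t n =
            std::min(max_subframe_samples, frame_samples - pos);
        process_subframe(n);  // frame data is processed first
        process_some_tasks(); // then tasks get their slice
        sizes.push_back(n);
        pos += n;
    }
    return sizes;
}
```

With a limit of 4 samples, a 10-sample frame is processed as sub-frames of 4, 4, and 2 samples, with one task slice after each.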
There are two types of time slices dedicated for task processing:
process_frame_and_tasks() calls are to be driven by the user-defined pipeline clock. It should be called exactly when it's time to process more samples. Our goal is to provide it exclusive access to the pipeline as fast as possible immediately after it's called.
process_tasks() should be called by the user when there are pending tasks that should be processed and when no concurrent process_frame_and_tasks() call is running. Our goal is to notify the user if and when it should be called.
Since the pipeline does not have its own thread, it can't schedule a process_tasks() invocation on its own. Instead, it relies on the user-provided IPipelineTaskScheduler object.
When the pipeline wants to schedule an asynchronous process_tasks() invocation, it calls IPipelineTaskScheduler::schedule_task_processing(). It's up to the user when and on which thread to invoke process_tasks(), but the pipeline gives a hint with the ideal invocation time.
The pipeline may also cancel the scheduled task processing by invoking IPipelineTaskScheduler::cancel_task_processing().
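One possible shape of a user-side scheduler is sketched below. Only the method names schedule_task_processing() and cancel_task_processing() come from the description above; the stand-in interface `ITaskSchedulerSketch`, its parameter lists, the `nanoseconds_t` typedef, and the polling helper `should_run()` are assumptions made for illustration and may differ from the real IPipelineTaskScheduler.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

typedef std::int64_t nanoseconds_t; // stand-in for core::nanoseconds_t

// Stand-in for the real interface; the parameter lists are assumed.
class ITaskSchedulerSketch {
public:
    virtual ~ITaskSchedulerSketch() {}
    virtual void schedule_task_processing(nanoseconds_t deadline) = 0;
    virtual void cancel_task_processing() = 0;
};

// Remembers the latest request. A user-owned thread is expected to poll
// should_run() and call process_tasks() on the pipeline when it returns true.
class DeadlineScheduler : public ITaskSchedulerSketch {
public:
    DeadlineScheduler() : scheduled_(false), deadline_(0) {}

    void schedule_task_processing(nanoseconds_t deadline) override {
        std::lock_guard<std::mutex> lock(mutex_);
        scheduled_ = true;
        deadline_ = deadline; // a newer request replaces the older one
    }

    void cancel_task_processing() override {
        std::lock_guard<std::mutex> lock(mutex_);
        scheduled_ = false;
    }

    // Returns true at most once per request, when its deadline has passed.
    bool should_run(nanoseconds_t now) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (scheduled_ && now >= deadline_) {
            scheduled_ = false;
            return true;
        }
        return false;
    }

private:
    std::mutex mutex_;
    bool scheduled_;
    nanoseconds_t deadline_;
};
```

Polling is chosen here only to keep the sketch short; a timer or an event loop would serve the same purpose.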
If schedule() or schedule_and_wait() is called when the task queue is empty and the current time point belongs to the task processing time slice, the new task is processed in-place without waiting for the next process_frame_and_tasks() or process_tasks() invocation. This avoids extra delays and thread switches when possible.
When process_frame_and_tasks() is called, it increments the pending_frame_ atomic and blocks on pipeline_mutex_. The non-zero atomic indicates that a frame needs to be processed as soon as possible and that other methods should give way to it.
When process_frame_and_tasks() is called, it also cancels any scheduled asynchronous task processing before it starts processing the frame and tasks. Before exiting, process_frame_and_tasks() checks whether there are still pending tasks and, if necessary, schedules asynchronous execution again.
When process_tasks() is processing asynchronous tasks, but detects that process_frame_and_tasks() was invoked concurrently from another thread, it gives way to it and exits. process_frame_and_tasks() will process the frame and some of the remaining tasks, and if there are even more tasks remaining, it will invoke schedule_task_processing() to allow process_tasks() to continue.
When schedule() and process_tasks() want to invoke schedule_task_processing(), but detect that process_frame_and_tasks() was invoked concurrently from another thread, they give way to it and don't call schedule_task_processing(), assuming that process_frame_and_tasks() will either process all tasks or call schedule_task_processing() by itself.
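The yielding rule can be sketched with a pending-frame counter and a pipeline mutex. This is a simplified illustration under stated assumptions, not the actual code: the class `YieldingLoopSketch` and its methods are hypothetical, the frame call is split into two steps so the intermediate state is observable, and a mutex-guarded std::deque stands in for the lock-free task queue.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <mutex>

class YieldingLoopSketch {
public:
    YieldingLoopSketch() : pending_frames_(0), frames_done_(0) {}

    void add_task(int id) {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        tasks_.push_back(id);
    }

    // Step 1 of the frame call: tell other methods that a frame is waiting.
    void announce_frame() { pending_frames_.fetch_add(1); }

    // Step 2 of the frame call: block on the pipeline mutex, then process.
    void process_announced_frame() {
        std::lock_guard<std::mutex> lock(pipeline_mutex_);
        assert(pending_frames_.load() > 0);
        pending_frames_.fetch_sub(1);
        frames_done_.fetch_add(1);
    }

    // Task side: never blocks on the pipeline mutex, and stops as soon as
    // a frame is waiting. Returns the number of tasks processed.
    int process_tasks() {
        std::unique_lock<std::mutex> lock(pipeline_mutex_, std::try_to_lock);
        if (!lock.owns_lock()) {
            return 0; // someone else owns the pipeline; defer the work
        }
        int done = 0;
        while (pending_frames_.load() == 0 && pop_task()) {
            ++done;
        }
        return done;
    }

    int frames_done() const { return frames_done_.load(); }

private:
    bool pop_task() {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        if (tasks_.empty()) {
            return false;
        }
        tasks_.pop_front();
        return true;
    }

    std::mutex pipeline_mutex_;
    std::mutex queue_mutex_;
    std::deque<int> tasks_;
    std::atomic<int> pending_frames_;
    std::atomic<int> frames_done_;
};
```

While a frame is announced, process_tasks() in this sketch returns without touching the queue; once the frame has been processed, the next call drains the remaining tasks.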
pipeline_mutex_ protects the internal pipeline state. It should be acquired to process a frame or a task.
scheduler_mutex_ protects IPipelineTaskScheduler invocations. It should be acquired to schedule or cancel asynchronous task processing.
If pipeline_mutex_ is locked, it's guaranteed that the thread locking it will check pending tasks after unlocking the mutex and will either process them or schedule asynchronous processing.
If scheduler_mutex_ is locked, it's guaranteed that the thread locking it will either schedule or cancel asynchronous task processing, depending on whether there are pending tasks and frames.
schedule() and process_tasks() methods are lock-free. Also, they're either completely wait-free or "mostly" wait-free (i.e. on the fast path), depending on the hardware architecture (see comments for core::MpscQueue).
In practice it means that when running concurrently with other PipelineLoop method invocations, they never block waiting for other threads, and usually don't even spin.
This is achieved by using a lock-free queue for tasks, atomics for 32-bit counters, seqlocks for 64-bit counters (which are reduced to atomics on 64-bit CPUs), always using try_lock() for mutexes and delaying the work if the mutex can't be acquired, and using semaphores instead of condition variables for signaling (which don't require blocking on a mutex, at least on modern platforms; e.g. on glibc they're implemented using an atomic and a futex).
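To illustrate the seqlock idea for 64-bit counters, here is a minimal single-writer sketch built from 32-bit atomics. It is an assumption-laden illustration and not the toolkit's own seqlock: the class `SeqlockU64` and its `try_load()` method are hypothetical, and all atomic operations use the default sequentially consistent ordering for simplicity, although weaker orderings are possible.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical single-writer seqlock holding one 64-bit value.
class SeqlockU64 {
public:
    SeqlockU64() : seq_(0), lo_(0), hi_(0) {}

    // Must be called from one writer thread only.
    void store(std::uint64_t value) {
        const std::uint32_t s = seq_.load();
        seq_.store(s + 1); // odd sequence number: write in progress
        lo_.store(static_cast<std::uint32_t>(value));
        hi_.store(static_cast<std::uint32_t>(value >> 32));
        seq_.store(s + 2); // even sequence number: write complete
    }

    // Never spins: returns false if a write was in progress or completed
    // while reading, so the caller can retry later or skip the update.
    bool try_load(std::uint64_t& out) const {
        const std::uint32_t before = seq_.load();
        if (before & 1u) {
            return false;
        }
        const std::uint64_t lo = lo_.load();
        const std::uint64_t hi = hi_.load();
        if (seq_.load() != before) {
            return false;
        }
        out = (hi << 32) | lo;
        return true;
    }

private:
    std::atomic<std::uint32_t> seq_;
    std::atomic<std::uint32_t> lo_;
    std::atomic<std::uint32_t> hi_;
};
```

A reader that gets false from try_load() has lost nothing but one attempt, which matches the "delay the work instead of waiting" approach described above.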
process_frame_and_tasks() is not lock-free because it has to acquire the pipeline mutex and can't delay its work. However, the precise task scheduling feature does its best to ensure that the pipeline mutex is unlocked when process_frame_and_tasks() is invoked, so in most cases it won't block or wait either.
This approach helps us with our global goal of making all inter-thread interactions mostly wait-free, so that one thread is never or almost never blocked when another thread is blocked, preempted, or busy.
PipelineLoop is covered with two groups of benchmarks:
You can run them using the "roc-bench-pipeline" command. For further details, see the comments in the source code of the benchmarks.
Definition at line 236 of file pipeline_loop.h.
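roc::pipeline::PipelineLoop::PipelineLoop (IPipelineTaskScheduler &scheduler, const PipelineLoopConfig &config, const audio::SampleSpec &sample_spec) [protected]
Initialization.
const Stats & roc::pipeline::PipelineLoop::get_stats_ref () const [protected]
Get task processing statistics. Returned object can't be accessed concurrently with other methods.
size_t roc::pipeline::PipelineLoop::num_pending_frames () const [protected]
How many pending frames there are.
size_t roc::pipeline::PipelineLoop::num_pending_tasks () const [protected]
How many pending tasks there are.
virtual bool roc::pipeline::PipelineLoop::process_subframe_imp (audio::Frame &frame) [protected, pure virtual]
Process subframe.
bool roc::pipeline::PipelineLoop::process_subframes_and_tasks (audio::Frame &frame) [protected]
Split frame and process subframes and some of the enqueued tasks.
virtual bool roc::pipeline::PipelineLoop::process_task_imp (PipelineTask &task) [protected, pure virtual]
Process task.
void roc::pipeline::PipelineLoop::process_tasks ()
Process some of the enqueued tasks, if any.
void roc::pipeline::PipelineLoop::schedule (PipelineTask &task, IPipelineTaskCompleter &completer)
Enqueue a task for asynchronous execution.
bool roc::pipeline::PipelineLoop::schedule_and_wait (PipelineTask &task)
Enqueue a task for asynchronous execution and wait until it finishes.
virtual uint64_t roc::pipeline::PipelineLoop::tid_imp () const [protected, pure virtual]
Get current thread id.
virtual core::nanoseconds_t roc::pipeline::PipelineLoop::timestamp_imp () const [protected, pure virtual]
Get current time.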