Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
roc::pipeline::PipelineLoop Class Reference (abstract)

Base class for task-based pipelines.

#include <pipeline_loop.h>



struct  Stats
 Task processing statistics.

Public Member Functions

void schedule (PipelineTask &task, IPipelineTaskCompleter &completer)
 Enqueue a task for asynchronous execution.
bool schedule_and_wait (PipelineTask &task)
 Enqueue a task for asynchronous execution and wait until it finishes.
void process_tasks ()
 Process some of the enqueued tasks, if any.

Protected Member Functions

 PipelineLoop (IPipelineTaskScheduler &scheduler, const TaskConfig &config, const audio::SampleSpec &sample_spec)
 Initialization.
size_t num_pending_tasks () const
 How many pending tasks there are.
size_t num_pending_frames () const
 How many pending frames there are.
const Stats & get_stats_ref () const
 Get task processing statistics. Returned object can't be accessed concurrently with other methods.
bool process_subframes_and_tasks (audio::Frame &frame)
 Split frame and process subframes and some of the enqueued tasks.
virtual core::nanoseconds_t timestamp_imp () const =0
 Get current time.
virtual uint64_t tid_imp () const =0
 Get current thread id.
virtual bool process_subframe_imp (audio::Frame &frame)=0
 Process subframe.
virtual bool process_task_imp (PipelineTask &task)=0
 Process task.

Detailed Description

Base class for task-based pipelines.

Frames, tasks, and threads

The pipeline processes frames and tasks. This processing is serialized. At every moment, the pipeline is either processing a frame, processing a task, or doing nothing.

The pipeline does not have its own thread. Both frame and task processing happen when the user calls one of the pipeline methods, in the context of the caller thread. Methods may be called from different threads concurrently. This complicates the implementation, but allows different thread layouts for different use cases.

Precise task scheduling

This class implements the "precise task scheduling" feature, which tries to schedule task processing intervals so that they don't collide in time with frame processing and frame processing timings stay unaffected.

Precise task scheduling is enabled by default, but can be disabled via config. When disabled, no special scheduling is performed, and frame and task processing compete with each other for exclusive access to the pipeline.

The sections below describe various aspects of the implementation.

Task processing time slices

Tasks are processed between frames in dedicated time slices, to ensure that task processing won't delay frame processing, which should be as close to real-time as possible.

If a frame is too large, it's split into sub-frames, to allow task processing between these sub-frames. This is needed to ensure that the task processing delay does not grow too large, at least while there are not too many tasks.

If frames are too small, tasks are processed only after some of the frames instead of after every frame. This is needed to reduce task processing overhead when using tiny frames.
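
The two rules above (split large frames, skip task slices for tiny frames) can be illustrated with a minimal sketch. It is not the actual PipelineLoop code: split_frame(), TaskSliceGate, and their parameters are hypothetical names introduced here for illustration only.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: returns (offset, length) pairs that cover a frame of
// frame_samples samples, with no piece longer than max_subframe_samples.
std::vector<std::pair<std::size_t, std::size_t> >
split_frame(std::size_t frame_samples, std::size_t max_subframe_samples) {
    std::vector<std::pair<std::size_t, std::size_t> > pieces;
    if (max_subframe_samples == 0) {
        return pieces; // invalid limit, nothing to split
    }
    std::size_t pos = 0;
    while (pos < frame_samples) {
        const std::size_t len =
            std::min(max_subframe_samples, frame_samples - pos);
        pieces.push_back(std::make_pair(pos, len));
        pos += len;
        // A real pipeline would run a short task-processing slice here.
    }
    return pieces;
}

// Hypothetical gate for tiny frames: a task slice is allowed only after at
// least min_samples samples were processed since the previous slice.
class TaskSliceGate {
public:
    explicit TaskSliceGate(std::size_t min_samples)
        : min_samples_(min_samples), accumulated_(0) {
    }

    // Call after each processed frame; returns true if tasks may run now.
    bool on_frame(std::size_t frame_samples) {
        accumulated_ += frame_samples;
        if (accumulated_ >= min_samples_) {
            accumulated_ = 0;
            return true;
        }
        return false;
    }

private:
    std::size_t min_samples_;
    std::size_t accumulated_;
};
```

With a 400-sample limit, a 1000-sample frame is split into pieces of 400, 400, and 200 samples. With a 100-sample gate, 40-sample frames open a task slice after every third frame.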

There are two types of time slices dedicated to task processing:

  • in-frame task processing: short intervals between sub-frames (inside process_frame_and_tasks())
  • inter-frame task processing: longer intervals between frames (inside process_tasks())

process_frame_and_tasks() calls are to be driven by the user-defined pipeline clock. It should be called exactly when it's time to process more samples. Our goal is to give it exclusive access to the pipeline as soon as possible after it's called.

process_tasks() should be called by the user when there are pending tasks that should be processed and when no concurrent process_frame_and_tasks() call is running. Our goal is to notify the user if and when it should be called.

Asynchronous task processing

Since the pipeline does not have its own thread, it can't schedule process_tasks() invocation on its own. Instead, it relies on the user-provided IPipelineTaskScheduler object.

When the pipeline wants to schedule an asynchronous process_tasks() invocation, it calls IPipelineTaskScheduler::schedule_task_processing(). It's up to the user when and on which thread to invoke process_tasks(), but the pipeline gives a hint with the ideal invocation time.

The pipeline may also cancel the scheduled task processing by invoking IPipelineTaskScheduler::cancel_task_processing().
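
The contract between the pipeline and the scheduler can be sketched as follows. The sketch is self-contained and uses stand-in types: LoopStub and TaskSchedulerStub are hypothetical substitutes for PipelineLoop and IPipelineTaskScheduler, and the parameter lists shown (a loop reference and a delay in nanoseconds) are assumptions, not the documented signatures. ManualScheduler is driven by an explicit clock instead of a real timer thread.

```cpp
#include <cstdint>

// Stand-in for the pipeline; only the method the scheduler needs.
struct LoopStub {
    int process_calls = 0;
    void process_tasks() { ++process_calls; }
};

// Stand-in for the scheduler interface; parameter lists are assumptions.
struct TaskSchedulerStub {
    virtual ~TaskSchedulerStub() {}
    virtual void schedule_task_processing(LoopStub& loop,
                                          std::int64_t delay_ns) = 0;
    virtual void cancel_task_processing(LoopStub& loop) = 0;
};

// Hypothetical scheduler that remembers one deadline per loop and invokes
// process_tasks() when the caller advances the clock past that deadline.
class ManualScheduler : public TaskSchedulerStub {
public:
    explicit ManualScheduler(std::int64_t now_ns)
        : now_ns_(now_ns), deadline_ns_(0), loop_(nullptr) {
    }

    void schedule_task_processing(LoopStub& loop,
                                  std::int64_t delay_ns) override {
        loop_ = &loop;
        deadline_ns_ = now_ns_ + delay_ns; // the hint given by the pipeline
    }

    void cancel_task_processing(LoopStub& loop) override {
        if (loop_ == &loop) {
            loop_ = nullptr;
        }
    }

    // Move the clock forward; run the pending invocation if it is due.
    void advance_to(std::int64_t now_ns) {
        now_ns_ = now_ns;
        if (loop_ != nullptr && now_ns_ >= deadline_ns_) {
            LoopStub* due = loop_;
            loop_ = nullptr; // one-shot: the pipeline re-schedules if needed
            due->process_tasks();
        }
    }

    bool has_pending() const { return loop_ != nullptr; }

private:
    std::int64_t now_ns_;
    std::int64_t deadline_ns_;
    LoopStub* loop_;
};
```

A production scheduler would typically hand the invocation to an event loop or a timer thread; the explicit clock here only keeps the sketch deterministic.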

In-place task processing

If schedule() or schedule_and_wait() is called when the task queue is empty and the current time point belongs to the task processing time slice, the new task is processed in-place without waiting for the next process_frame_and_tasks() or process_tasks() invocation. This avoids extra delays and thread switches when possible.

Processing priority

When process_frame_and_tasks() is called, it increments the pending_frame_ atomic and blocks on pipeline_mutex_. The non-zero atomic indicates that a frame needs to be processed as soon as possible and other methods should give way to it.

When process_frame_and_tasks() is called, it also cancels any scheduled asynchronous task processing before it starts processing the frame and tasks. Before exiting, process_frame_and_tasks() checks whether there are still pending tasks and, if necessary, schedules asynchronous execution again.

When process_tasks() is processing asynchronous tasks, but detects that process_frame_and_tasks() was invoked concurrently from another thread, it gives way and exits. process_frame_and_tasks() will process the frame and some of the remaining tasks, and if there are even more tasks remaining, it will invoke schedule_task_processing() to allow process_tasks() to continue.

When schedule() and process_tasks() want to invoke schedule_task_processing(), but detect that process_frame_and_tasks() was invoked concurrently from another thread, they give way and don't call schedule_task_processing(), assuming that process_frame_and_tasks() will either process all tasks or call schedule_task_processing() itself.
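
The priority rule can be sketched with an atomic counter and a mutex. This is a simplified illustration, not the PipelineLoop implementation: PrioritySketch and its members are hypothetical, tasks are reduced to a counter, and the frame counter is kept non-zero for the whole duration of frame processing, which may differ from the real class.

```cpp
#include <atomic>
#include <mutex>

class PrioritySketch {
public:
    PrioritySketch() : pending_frames_(0), pending_tasks_(0) {}

    void add_task() { ++pending_tasks_; }

    // Frame path: announce the frame first, then block on the mutex.
    // Returns true if tasks remain, i.e. the caller should schedule
    // asynchronous task processing again.
    template <class FrameBody>
    bool process_frame(FrameBody frame_body) {
        ++pending_frames_;
        {
            std::lock_guard<std::mutex> lock(pipeline_mutex_);
            frame_body();
        }
        --pending_frames_;
        return pending_tasks_ > 0;
    }

    // Task path: never blocks. Gives way as soon as a frame is announced
    // or the mutex is busy. Returns the number of tasks processed.
    int process_tasks() {
        int done = 0;
        while (pending_tasks_ > 0) {
            if (pending_frames_ != 0) {
                break; // a frame is waiting: give way
            }
            std::unique_lock<std::mutex> lock(pipeline_mutex_,
                                              std::try_to_lock);
            if (!lock.owns_lock()) {
                break; // somebody else owns the pipeline: retry later
            }
            if (pending_tasks_ == 0) {
                break; // another thread took the last task meanwhile
            }
            --pending_tasks_; // "process" one task under the mutex
            ++done;
        }
        return done;
    }

private:
    std::mutex pipeline_mutex_;
    std::atomic<int> pending_frames_;
    std::atomic<int> pending_tasks_;
};
```

In this sketch, a process_tasks() call made while a frame is being processed returns immediately without touching the mutex, and the frame path reports that tasks are still pending so that processing can be re-scheduled.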

Locking rules

pipeline_mutex_ protects the internal pipeline state. It should be acquired to process a frame or a task.

scheduler_mutex_ protects IPipelineTaskScheduler invocations. It should be acquired to schedule or cancel asynchronous task processing.

If pipeline_mutex_ is locked, it's guaranteed that the thread locking it will check pending tasks after unlocking the mutex and will either process them or schedule asynchronous processing.

If scheduler_mutex_ is locked, it's guaranteed that the thread locking it will either schedule or cancel asynchronous task processing, depending on whether there are pending tasks and frames.

Lock-free operations

schedule() and process_tasks() methods are lock-free. Also, they're either completely wait-free or "mostly" wait-free (i.e. on the fast path), depending on the hardware architecture (see comments for core::MpscQueue).

In practice, this means that when running concurrently with other PipelineLoop method invocations, they never block waiting for other threads, and usually don't even spin.

This is achieved by using a lock-free queue for tasks, atomics for 32-bit counters, seqlocks for 64-bit counters (which are reduced to atomics on 64-bit CPUs), always using try_lock() for mutexes and delaying the work if the mutex can't be acquired, and using semaphores instead of condition variables for signaling (which don't require blocking on a mutex, at least on modern platforms; e.g. on glibc they're implemented using an atomic and a futex).
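
To illustrate the seqlock idea mentioned above, here is a minimal single-writer sketch that stores a 64-bit value as two 32-bit atomics. SeqlockU64 is a hypothetical name used for illustration; it is not the seqlock class used by Roc, and it assumes that only one thread ever calls store().

```cpp
#include <atomic>
#include <cstdint>

class SeqlockU64 {
public:
    SeqlockU64() : seq_(0), lo_(0), hi_(0) {}

    // Single writer only. An odd sequence number marks a write in progress.
    void store(std::uint64_t value) {
        const std::uint32_t s = seq_.load(std::memory_order_relaxed);
        seq_.store(s + 1, std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_release);
        lo_.store(static_cast<std::uint32_t>(value),
                  std::memory_order_relaxed);
        hi_.store(static_cast<std::uint32_t>(value >> 32),
                  std::memory_order_relaxed);
        seq_.store(s + 2, std::memory_order_release);
    }

    // One read attempt without spinning. Returns false if a write was in
    // progress or happened during the read; the caller may retry later.
    bool try_load(std::uint64_t& out) const {
        const std::uint32_t s1 = seq_.load(std::memory_order_acquire);
        if (s1 & 1u) {
            return false;
        }
        const std::uint32_t lo = lo_.load(std::memory_order_relaxed);
        const std::uint32_t hi = hi_.load(std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_acquire);
        const std::uint32_t s2 = seq_.load(std::memory_order_relaxed);
        if (s1 != s2) {
            return false;
        }
        out = (static_cast<std::uint64_t>(hi) << 32) | lo;
        return true;
    }

private:
    std::atomic<std::uint32_t> seq_;
    std::atomic<std::uint32_t> lo_;
    std::atomic<std::uint32_t> hi_;
};
```

Neither side takes a lock: the writer never waits for readers, and a reader that observes a concurrent write simply reports failure instead of blocking.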

process_frame_and_tasks() is not lock-free because it has to acquire the pipeline mutex and can't delay its work. However, the precise task scheduling feature does its best to ensure that the pipeline mutex will be unlocked when process_frame_and_tasks() is invoked, thus in most cases it won't block or wait either.

This approach helps us with our global goal of making all inter-thread interactions mostly wait-free, so that one thread is never or almost never blocked when another thread is blocked, preempted, or busy.


PipelineLoop is covered with two groups of benchmarks:

  • bench_pipeline_loop_peak_load.cpp measures frame and task processing delays with or without task load and with or without precise task scheduling feature;
  • bench_pipeline_loop_contention.cpp measures scheduling times under different contention levels.

You can run them using "roc-bench-pipeline" command. For further details, see comments in the source code of the benchmarks.

Definition at line 193 of file pipeline_loop.h.

Constructor & Destructor Documentation

◆ PipelineLoop()

roc::pipeline::PipelineLoop::PipelineLoop ( IPipelineTaskScheduler & scheduler,
const TaskConfig & config,
const audio::SampleSpec & sample_spec )

Initialization.


Member Function Documentation

◆ get_stats_ref()

const Stats& roc::pipeline::PipelineLoop::get_stats_ref ( ) const

Get task processing statistics. Returned object can't be accessed concurrently with other methods.

◆ num_pending_frames()

size_t roc::pipeline::PipelineLoop::num_pending_frames ( ) const

How many pending frames there are.

◆ num_pending_tasks()

size_t roc::pipeline::PipelineLoop::num_pending_tasks ( ) const

How many pending tasks there are.

◆ process_subframe_imp()

virtual bool roc::pipeline::PipelineLoop::process_subframe_imp ( audio::Frame & frame )
protected, pure virtual

Process subframe.

◆ process_subframes_and_tasks()

bool roc::pipeline::PipelineLoop::process_subframes_and_tasks ( audio::Frame & frame )

Split frame and process subframes and some of the enqueued tasks.

◆ process_task_imp()

virtual bool roc::pipeline::PipelineLoop::process_task_imp ( PipelineTask & task )
protected, pure virtual

Process task.

◆ process_tasks()

void roc::pipeline::PipelineLoop::process_tasks ( )

Process some of the enqueued tasks, if any.

◆ schedule()

void roc::pipeline::PipelineLoop::schedule ( PipelineTask & task,
IPipelineTaskCompleter & completer )

Enqueue a task for asynchronous execution.
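
The enqueue-then-complete flow can be sketched with stand-in types. TaskStub, CompleterStub, QueueLoopStub, and the task_completed() callback are hypothetical names for illustration; they are not the PipelineTask or IPipelineTaskCompleter interfaces, and the "task" here only doubles a number.

```cpp
#include <deque>
#include <utility>

// Stand-in task: input, output, and a success flag set on completion.
struct TaskStub {
    int input;
    int result;
    bool success;
};

// Stand-in completer: notified once the task has been processed.
struct CompleterStub {
    virtual ~CompleterStub() {}
    virtual void task_completed(TaskStub& task) = 0;
};

// Stand-in loop: schedule() only enqueues, process_tasks() does the work.
class QueueLoopStub {
public:
    void schedule(TaskStub& task, CompleterStub& completer) {
        queue_.push_back(std::make_pair(&task, &completer));
    }

    void process_tasks() {
        while (!queue_.empty()) {
            std::pair<TaskStub*, CompleterStub*> item = queue_.front();
            queue_.pop_front();
            item.first->result = item.first->input * 2; // the "work"
            item.first->success = true;
            item.second->task_completed(*item.first);
        }
    }

private:
    std::deque<std::pair<TaskStub*, CompleterStub*> > queue_;
};

// Example completer that counts successfully completed tasks.
struct CountingCompleter : CompleterStub {
    int completed = 0;
    void task_completed(TaskStub& task) override {
        if (task.success) {
            ++completed;
        }
    }
};
```

The caller keeps ownership of the task object, so the task must stay alive until the completer has been invoked.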

◆ schedule_and_wait()

bool roc::pipeline::PipelineLoop::schedule_and_wait ( PipelineTask & task )

Enqueue a task for asynchronous execution and wait until it finishes.

Returns false if the task fails.

◆ tid_imp()

virtual uint64_t roc::pipeline::PipelineLoop::tid_imp ( ) const
protected, pure virtual

Get current thread id.

◆ timestamp_imp()

virtual core::nanoseconds_t roc::pipeline::PipelineLoop::timestamp_imp ( ) const
protected, pure virtual

Get current time.

The documentation for this class was generated from the following file: