Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
control_task_queue.h
/*
 * Copyright (c) 2020 Roc Streaming authors
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

//! @file roc_ctl/control_task_queue.h
//! @brief Control task queue.

#ifndef ROC_CTL_CONTROL_TASK_QUEUE_H_
#define ROC_CTL_CONTROL_TASK_QUEUE_H_

#include "roc_core/atomic.h"
#include "roc_core/list.h"
#include "roc_core/mpsc_queue.h"
#include "roc_core/mutex.h"
#include "roc_core/thread.h"
#include "roc_core/time.h"
#include "roc_core/timer.h"

namespace roc {
namespace ctl {

//! Control task queue.
//!
//! This class implements a thread-safe task queue, allowing lock-free scheduling
//! of tasks for immediate or delayed execution on the background thread, as well
//! as lock-free task cancellation and re-scheduling (changing deadline).
//!
//! It also supports pausing and resuming tasks. Task resuming is lock-free too.
//!
//! Note that those operations are lock-free only if core::Timer::try_set_deadline()
//! is lock-free, which is the case on modern platforms.
//!
//! In the current implementation, priority is given to fast scheduling and cancellation
//! over the strict observance of the scheduling deadlines. In other words, during
//! contention or peak load, scheduling and cancellation will always be fast, but task
//! execution may be delayed.
//!
//! This design was considered acceptable because the actual users of the control task
//! queue are more sensitive to delays than the tasks they schedule. The task queue is
//! used by network and pipeline threads, which should never block and which use the
//! task queue to schedule low-priority delayed work.
//!
//! The implementation uses three queues internally:
//!
//!  - ready_queue_ - a lock-free queue of tasks of four kinds:
//!    - tasks to be resumed after pause ((flags_ & FlagResumed) != 0)
//!    - tasks to be executed as soon as possible (renewed_deadline_ == 0)
//!    - tasks to be re-scheduled with another deadline (renewed_deadline_ > 0)
//!    - tasks to be canceled (renewed_deadline_ < 0)
//!
//!  - sleeping_queue_ - a sorted queue of tasks with non-zero deadline, scheduled for
//!    execution in future; the task at the head has the smallest (nearest) deadline;
//!
//!  - pause_queue_ - an unsorted queue to keep track of all currently paused tasks.
//!
//! task_mutex_ should be acquired to process tasks and/or to access sleeping_queue_
//! and pause_queue_, as well as non-atomic task fields.
//!
//! wakeup_timer_ (core::Timer) is used to set or wait for the next wakeup time of the
//! background thread. This time is set to zero when ready_queue_ is non-empty, otherwise
//! it is set to the deadline of the first task in sleeping_queue_ if it's non-empty, and
//! otherwise is set to infinity (-1). The timer allows the deadline to be updated
//! concurrently from any thread.
//!
//! When the task is scheduled, re-scheduled, or canceled, there are two ways to
//! complete the operation:
//!
//!  - If the event loop thread is sleeping and the task_mutex_ is free, we can acquire
//!    the mutex and complete the operation in-place by manipulating sleeping_queue_
//!    under the mutex, without bothering the event loop thread. This can be done only
//!    if we're changing task scheduling and not going to execute it right now.
//!
//!  - Otherwise, we push the task to ready_queue_ (which has lock-free push), set
//!    the timer wakeup time to zero (to ensure that the event loop thread won't go to
//!    sleep), and return, leaving the completion of the operation to the event loop
//!    thread. The event loop thread will fetch the task from ready_queue_ soon and
//!    complete the operation by manipulating the sleeping_queue_.
//!
//! The current task state is defined by its atomic field "state_". Various task queue
//! operations move the task from one state to another. The move is always performed
//! using atomic CAS or exchange to handle concurrent lock-free updates correctly.
//!
//! There is also a "flags_" field that provides additional information about the task
//! that is preserved across transitions between states; for example, that the task is
//! being resumed.
//!
//! Here are some example flows of the task states:
//! @code
//!    schedule():
//!      StateCompleted -> StateReady
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    schedule_at():
//!      StateCompleted -> StateReady
//!        -> StateSleeping
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    resume():
//!      StateSleeping -> StateReady
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    async_cancel():
//!      StateSleeping -> StateReady
//!        -> StateCancelling -> StateCompleting -> StateCompleted
//! @endcode
//!
//! The meaning of the states is the following:
//!  - StateReady: task is added to the ready queue for execution or renewal,
//!                or is possibly being renewed in-place right now
//!  - StateSleeping: task renewal is complete and the task was put into the sleeping
//!                   queue to wait for its deadline, or into the pause queue to wait
//!                   for resume
//!  - StateCancelling: task renewal is complete and the task is being canceled
//!                     because it was put into the ready queue for cancellation
//!  - StateProcessing: task is being processed after fetching it either from the ready
//!                     queue (if it was put there for execution) or the sleeping queue
//!  - StateCompleting: task processing is complete and the task is being completed
//!  - StateCompleted: task is completed and is not used anywhere; it may be safely
//!                    destroyed or reused; this is also the initial task state
public:
    //! Initialize.
    //! @remarks
    //!  Starts background thread.

    //! Destroy.
    //! @remarks
    //!  stop_and_wait() should be called before destructor.
    virtual ~ControlTaskQueue();

    //! Check if the object was successfully constructed.
    bool is_valid() const;

    //! Enqueue a task for asynchronous execution as soon as possible.
    //!
    //! This is like schedule_at(), but the deadline is "as soon as possible".
    void schedule(ControlTask& task,
                  IControlTaskExecutor& executor,
                  IControlTaskCompleter* completer);

    //! Enqueue a task for asynchronous execution at given point of time.
    //!
    //!  - If the task is already completed, it's scheduled with given deadline.
    //!  - If the task is sleeping and waiting for deadline, its deadline is updated.
    //!  - If the task is in processing, completion or cancellation phase, it's scheduled
    //!    to be executed again after completion or cancellation finishes.
    //!  - If the task is paused, re-scheduling is postponed until task resumes.
    //!
    //! @p deadline should be in the same domain as core::timestamp().
    //! It can't be negative. Zero deadline means "execute as soon as possible".
    //!
    //! The @p executor is used to invoke the task function. It allows tasks to be
    //! implemented in different classes. If a class T wants to implement tasks, it
    //! should inherit ControlTaskExecutor<T>.
    //!
    //! If @p completer is present, the task should not be destroyed until completer is
    //! invoked. The completer is invoked on the event loop thread once and only once,
    //! after the task completes or is canceled. Completer should never block.
    //!
    //! The event loop thread assumes that the task may be destroyed right after it is
    //! completed and its completer is called (if it's present), and doesn't touch the
    //! task after this, unless the user explicitly reschedules the task.
    void schedule_at(ControlTask& task,
                     core::nanoseconds_t deadline,
                     IControlTaskExecutor& executor,
                     IControlTaskCompleter* completer);

    //! Resume task if it's paused.
    //!
    //!  - If the task is paused, schedule it for execution.
    //!  - If the task is being processed right now (i.e. it's executing or will be
    //!    executing very soon), then postpone decision until task execution ends. After
    //!    the task execution, if the task asked to pause, then immediately resume it.
    //!  - Otherwise, do nothing.
    //!
    //! If resume is called one or multiple times before task execution, those calls
    //! are ignored. Only calls made during or after task execution are honored, and
    //! only if the task execution left the task in paused state.
    //!
    //! Subsequent resume calls between task executions are collapsed into one; even if
    //! resume was called multiple times after the task paused and before it's executed
    //! again, the next pause will need a new resume call.
    void resume(ControlTask& task);

    //! Try to cancel scheduled task execution, if it's not executed yet.
    //!
    //!  - If the task is already completed or is being completed or canceled, do nothing.
    //!  - If the task is sleeping or paused, cancel task execution.
    //!  - If the task is being processed right now (i.e. it's executing or will be
    //!    executing very soon), then postpone decision until task execution ends. After
    //!    the task execution, if the task asked to pause or continue, then cancellation
    //!    request is fulfilled and the task is canceled; otherwise cancellation request
    //!    is ignored and the task is completed normally.
    //!
    //! When the task is being canceled instead of completed, if it has completer, the
    //! completer is invoked.
    void async_cancel(ControlTask& task);

    //! Wait until the task is completed.
    //!
    //! Blocks until the task is completed or canceled.
    //! Does NOT wait until the task completer is called.
    //!
    //! Cannot be called concurrently for the same task (will cause crash).
    //! Cannot be called from the task completion handler (will cause deadlock).
    //!
    //! If this method is called, the task should not be destroyed until this method
    //! returns (as well as until the completer is invoked, if it's present).
    void wait(ControlTask& task);

    //! Stop thread and wait until it terminates.
    //!
    //! All tasks should be completed before calling stop_and_wait().
    //! stop_and_wait() should be called before calling destructor.
    void stop_and_wait();

private:
    virtual void run();

    void start_thread_();
    void stop_thread_();

    void setup_task_(ControlTask& task,
                     IControlTaskExecutor& executor,
                     IControlTaskCompleter* completer);

    void request_resume_(ControlTask& task);
    void request_renew_(ControlTask& task, core::nanoseconds_t deadline);
    void request_renew_guarded_(ControlTask& task, core::nanoseconds_t deadline);

    bool try_renew_inplace_(ControlTask& task,
                            core::nanoseconds_t deadline,

    ControlTask::State
    renew_state_(ControlTask& task, unsigned task_flags, core::nanoseconds_t deadline);
    bool renew_scheduling_(ControlTask& task,
                           unsigned task_flags,
                           core::nanoseconds_t deadline,

    bool reschedule_task_(ControlTask& task,
                          core::nanoseconds_t deadline,
    void cancel_task_(ControlTask& task, core::seqlock_version_t version);

    void reborn_task_(ControlTask& task, ControlTask::State from_state);
    void pause_task_(ControlTask& task, ControlTask::State from_state);
    void
    complete_task_(ControlTask& task, unsigned task_flags, ControlTask::State from_state);
    void wait_task_(ControlTask& task);

    void execute_task_(ControlTask& task);

    bool process_tasks_();

    ControlTask* fetch_task_();
    ControlTask* fetch_ready_task_();
    ControlTask* fetch_sleeping_task_();

    void insert_sleeping_task_(ControlTask& task);
    void remove_sleeping_task_(ControlTask& task);

    core::nanoseconds_t update_wakeup_timer_();

    bool started_;
    core::Atomic<int> stop_;
    bool fetch_ready_;

    core::Atomic<int> ready_queue_size_;

    core::Timer wakeup_timer_;
    core::Mutex task_mutex_;
};

} // namespace ctl
} // namespace roc

#endif // ROC_CTL_CONTROL_TASK_QUEUE_H_
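
The class comment describes two mechanisms that may be easier to follow in code: moving a task between states with an atomic compare-and-swap, and choosing the next wakeup time of the background thread. The following is a minimal standalone sketch of both, not the roc_ctl implementation. It assumes C++11 `std::atomic` in place of `core::Atomic`, and the names `Task`, `try_transition()` and `next_wakeup()` are hypothetical, introduced only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for core::nanoseconds_t.
typedef int64_t nanoseconds_t;

// State names follow the class comment; the numeric values are arbitrary.
enum State {
    StateCompleted, // initial state
    StateReady,
    StateSleeping,
    StateCancelling,
    StateProcessing,
    StateCompleting
};

// Hypothetical task holding only the atomic state field.
struct Task {
    std::atomic<int> state;

    Task()
        : state(StateCompleted) {
    }
};

// Move the task from state `from` to state `to` using a single CAS.
// Returns false if another thread changed the state first, in which
// case the task is left untouched.
bool try_transition(Task& task, State from, State to) {
    int expected = from;
    return task.state.compare_exchange_strong(expected, to);
}

// Choose the next wakeup time as described for wakeup_timer_:
//  - zero if the ready queue is non-empty
//  - the nearest sleeping deadline if the sleeping queue is non-empty
//  - otherwise -1, meaning "sleep until woken up"
nanoseconds_t next_wakeup(bool ready_queue_non_empty,
                          bool sleeping_queue_non_empty,
                          nanoseconds_t nearest_sleeping_deadline) {
    if (ready_queue_non_empty) {
        return 0;
    }
    if (sleeping_queue_non_empty) {
        return nearest_sleeping_deadline;
    }
    return -1;
}
```

In this sketch, when two threads race to move the same task out of StateCompleted, only one CAS succeeds. The losing thread sees `false` and can re-read the state to decide what to do next, which is the general pattern behind lock-free scheduling and cancellation.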