Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
control_task_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2020 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_ctl/control_task_queue.h
10 //! @brief Control task queue.
11 
12 #ifndef ROC_CTL_CONTROL_TASK_QUEUE_H_
13 #define ROC_CTL_CONTROL_TASK_QUEUE_H_
14 
15 #include "roc_core/atomic.h"
16 #include "roc_core/list.h"
17 #include "roc_core/mpsc_queue.h"
18 #include "roc_core/mutex.h"
19 #include "roc_core/thread.h"
20 #include "roc_core/time.h"
21 #include "roc_core/timer.h"
22 #include "roc_ctl/control_task.h"
25 
26 namespace roc {
27 namespace ctl {
28 
29 //! Control task queue.
30 //!
31 //! This class implements a thread-safe task queue, allowing lock-free scheduling
32 //! of tasks for immediate or delayed execution on the background thread, as well
33 //! as lock-free task cancellation and re-scheduling (changing deadline).
34 //!
35 //! It also supports tasks to be paused and resumed. Task resuming is lock-free too.
36 //!
37 //! Note that those operations are lock-free only if core::Timer::try_set_deadline()
38 //! is so, which however is true on modern platforms.
39 //!
40 //! In the current implementation, priority is given to fast scheduling and cancellation
41 //! over the strict observance of the scheduling deadlines. In other words, during
42 //! contention or peak load, scheduling and cancellation will be always fast, but task
43 //! execution may be delayed.
44 //!
45 //! This design was considered acceptable because the actual users of control task queue
46 //! are more sensitive to delays than the tasks they schedule. The task queue is used by
47 //! network and pipeline threads, which should never block and use the task queue to
48 //! schedule low-priority delayed work.
49 //!
50 //! The implementation uses three queues internally:
51 //!
52 //! - ready_queue_ - a lock-free queue of tasks of three kinds:
53 //! - tasks to be resumed after pause (flags_ & FlagResumed != 0)
54 //! - tasks to be executed as soon as possible (renewed_deadline_ == 0)
55 //! - tasks to be re-scheduled with another deadline (renewed_deadline_ > 0)
56 //! - tasks to be canceled (renewed_deadline_ < 0)
57 //!
58 //! - sleeping_queue_ - a sorted queue of tasks with non-zero deadline, scheduled for
59 //! execution in future; the task at the head has the smallest (nearest) deadline;
60 //!
61 //! - pause_queue_ - an unsorted queue to keep track of all currently paused tasks.
62 //!
63 //! task_mutex_ should be acquired to process tasks and/or to access sleeping_queue_
64 //! and pause_queue_, as well as non-atomic task fields.
65 //!
66 //! wakeup_timer_ (core::Timer) is used to set or wait for the next wakeup time of the
67 //! background thread. This time is set to zero when ready_queue_ is non-empty, otherwise
68 //! it is set to the deadline of the first task in sleeping_queue_ if it's non-empty, and
69 //! otherwise is set to infinity (-1). The timer allows to update the deadline
70 //! concurrently from any thread.
71 //!
72 //! When the task is scheduled, re-scheduled, or canceled, there are two ways to
73 //! complete the operation:
74 //!
75 //! - If the event loop thread is sleeping and the task_mutex_ is free, we can acquire
76 //! the mutex and complete the operation in-place by manipulating sleeping_queue_
77 //! under the mutex, without bothering event loop thread. This can be done only if
78 //! we're changing task scheduling and not going to execute it right now.
79 //!
80 //! - Otherwise, we push the task to ready_queue_ (which has lock-free push), set
81 //! the timer wakeup time to zero (to ensure that the event loop thread wont go to
82 //! sleep), and return, leaving the completion of the operarion to the event loop
83 //! thread. The event loop thread will fetch the task from ready_queue_ soon and
84 //! complete the operation by manipulating the sleeping_queue_.
85 //!
86 //! The current task state is defined by its atomic field "state_". Various task queue
87 //! operations move task from one state to another. The move is always performed using
88 //! atomic CAS or exchange to handle concurrent lock-free updates correctly.
89 //!
90 //! There is also "flags_" field that provides additional information about task that is
91 //! preserved across transitions between states; for example that task is being resumed.
92 //!
93 //! Here are some example flows of the task states:
94 //! @code
95 //! schedule():
96 //! StateCompleted -> StateReady
97 //! -> StateProcessing -> StateCompleting -> StateCompleted
98 //!
99 //! schedule_at():
100 //! StateCompleted -> StateReady
101 //! -> StateSleeping
102 //! -> StateProcessing -> StateCompleting -> StateCompleted
103 //!
104 //! resume():
105 //! StateSleeping -> StateReady
106 //! -> StateProcessing -> StateCompleting -> StateCompleted
107 //!
108 //! async_cancel():
109 //! StateSleeping -> StateReady
110 //! -> StateCancelling -> StateCompleting -> StateCompleted
111 //! @endcode
112 //!
113 //! The meaning of the states is the following:
114 //! - StateReady: task is added to the ready queue for execution or renewal,
115 //! or probably is currently being renewed in-place
116 //! - StateSleeping: task renewal is complete and the task was put into the sleeping
117 //! queue to wait its deadline, or to paused queue to wait resume
118 //! - StateCancelling: task renewal is complete and the task is being canceled
119 //! because it was put to ready queue for cancellation
120 //! - StateProcessing: task is being processed after fetching it either from ready
121 //! queue (if it was put there for execution) or sleeping queue
122 //! - StateCompleting: task processing is complete and the task is being completed
123 //! - StateCompleted: task is completed and is not used anywhere; it may be safely
124 //! destroyed or reused; this is also the initial task state
126 public:
127  //! Initialize.
128  //! @remarks
129  //! Starts background thread.
131 
132  //! Destroy.
133  //! @remarks
134  //! stop_and_wait() should be called before destructor.
135  virtual ~ControlTaskQueue();
136 
137  //! Check if the object was successfully constructed.
138  bool is_valid() const;
139 
140  //! Enqueue a task for asynchronous execution as soon as possible.
141  //!
142  //! This is like schedule_at(), but the deadline is "as soon as possible".
143  void schedule(ControlTask& task,
144  IControlTaskExecutor& executor,
145  IControlTaskCompleter* completer);
146 
147  //! Enqueue a task for asynchronous execution at given point of time.
148  //!
149  //! - If the task is already completed, it's scheduled with given deadline.
150  //! - If the task is sleeping and waiting for deadline, it's deadline is updated.
151  //! - If the task is in processing, completion or cancellation phase, it's scheduled
152  //! to be executed again after completion or cancellation finishes.
153  //! - If the task is paused, re-scheduling is postponed until task resumes.
154  //!
155  //! @p deadline should be in the same domain as core::timestamp().
156  //! It can't be negative. Zero deadline means "execute as soon as possible".
157  //!
158  //! The @p executor is used to invoke the task function. It allows to implement
159  //! tasks in different classes. If a class T wants to implement tasks, it should
160  //! inherit ControlTaskExecutor<T>.
161  //!
162  //! If @p completer is present, the task should not be destroyed until completer is
163  //! invoked. The completer is invoked on event loop thread after once and only once,
164  //! after the task completes or is canceled. Completer should never block.
165  //!
166  //! The event loop thread assumes that the task may be destroyed right after it is
167  //! completed and it's completer is called (if it's present), and don't touch task
168  //! after this, unless the user explicitly reschedules the task.
170  core::nanoseconds_t deadline,
171  IControlTaskExecutor& executor,
172  IControlTaskCompleter* completer);
173 
174  //! Resume task if it's paused.
175  //!
176  //! - If the task is paused, schedule it for execution.
177  //! - If the task is being processed right now (i.e. it's executing or will be
178  //! executing very soon), then postpone decision until task execution ends. After
179  //! the task execution, if the task asked to pause, then immediately resume it.
180  //! - Otherwise, do nothing.
181  //!
182  //! If resume is called one or multiple times before task execution, those calls
183  //! are ignored. Only calls made during or after task execution are honored, and
184  //! only if the task execution leaved task in paused state.
185  //!
186  //! Subsequent resume calls between task executions are collapsed into one; even if
187  //! resume was called multiple after task paused and before it's executed again,
188  //! next pause will need a new resume call.
189  void resume(ControlTask& task);
190 
191  //! Try to cancel scheduled task execution, if it's not executed yet.
192  //!
193  //! - If the task is already completed or is being completed or canceled, do nothing.
194  //! - If the task is sleeping or paused, cancel task execution.
195  //! - If the task is being processed right now (i.e. it's executing or will be
196  //! executing very soon), then postpone decision until task execution ends. After
197  //! the task execution, if the task asked to pause or continue, then cancellation
198  //! request is fulfilled and the task is canceled; otherwise cancellation request
199  //! is ignored and the task is completed normally.
200  //!
201  //! When the task is being canceled instead of completed, if it has completer, the
202  //! completer is invoked.
204 
205  //! Wait until the task is completed.
206  //!
207  //! Blocks until the task is completed or canceled.
208  //! Does NOT wait until the task completer is called.
209  //!
210  //! Can not be called concurrently for the same task (will cause crash).
211  //! Can not be called from the task completion handler (will cause deadlock).
212  //!
213  //! If this method is called, the task should not be destroyed until this method
214  //! returns (as well as until the completer is invoked, if it's present).
215  void wait(ControlTask& task);
216 
217  //! Stop thread and wait until it terminates.
218  //!
219  //! All tasks should be completed before calling stop_and_wait().
220  //! stop_and_wait() should be called before calling destructor.
222 
223 private:
224  virtual void run();
225 
226  void start_thread_();
227  void stop_thread_();
228 
229  void setup_task_(ControlTask& task,
230  IControlTaskExecutor& executor,
231  IControlTaskCompleter* completer);
232 
233  void request_resume_(ControlTask& task);
234  void request_renew_(ControlTask& task, core::nanoseconds_t deadline);
235  void request_renew_guarded_(ControlTask& task, core::nanoseconds_t deadline);
236 
237  bool try_renew_inplace_(ControlTask& task,
238  core::nanoseconds_t deadline,
239  core::seqlock_version_t version);
240 
241  ControlTask::State
242  renew_state_(ControlTask& task, unsigned task_flags, core::nanoseconds_t deadline);
243  bool renew_scheduling_(ControlTask& task,
244  unsigned task_flags,
245  core::nanoseconds_t deadline,
246  core::seqlock_version_t version);
247 
248  bool reschedule_task_(ControlTask& task,
249  core::nanoseconds_t deadline,
250  core::seqlock_version_t version);
251  void cancel_task_(ControlTask& task, core::seqlock_version_t version);
252 
253  void reborn_task_(ControlTask& task, ControlTask::State from_state);
254  void pause_task_(ControlTask& task, ControlTask::State from_state);
255  void
256  complete_task_(ControlTask& task, unsigned task_flags, ControlTask::State from_state);
257  void wait_task_(ControlTask& task);
258 
259  void execute_task_(ControlTask& task);
260 
261  bool process_tasks_();
262 
263  ControlTask* fetch_task_();
264  ControlTask* fetch_ready_task_();
265  ControlTask* fetch_sleeping_task_();
266 
267  void insert_sleeping_task_(ControlTask& task);
268  void remove_sleeping_task_(ControlTask& task);
269 
270  core::nanoseconds_t update_wakeup_timer_();
271 
272  bool started_;
273  core::Atomic<int> stop_;
274  bool fetch_ready_;
275 
276  core::Atomic<int> ready_queue_size_;
280 
281  core::Timer wakeup_timer_;
282  core::Mutex task_mutex_;
283 };
284 
285 } // namespace ctl
286 } // namespace roc
287 
288 #endif // ROC_CTL_CONTROL_TASK_QUEUE_H_
Atomic.
Intrusive doubly-linked list.
Definition: list.h:40
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:45
Mutex.
Definition: mutex.h:31
Base class for thread objects.
Definition: thread.h:27
Thread-safe timer.
Definition: timer.h:25
void wait(ControlTask &task)
Wait until the task is completed.
void schedule(ControlTask &task, IControlTaskExecutor &executor, IControlTaskCompleter *completer)
Enqueue a task for asynchronous execution as soon as possible.
virtual ~ControlTaskQueue()
Destroy.
void resume(ControlTask &task)
Resume task if it's paused.
void async_cancel(ControlTask &task)
Try to cancel scheduled task execution, if it's not executed yet.
bool is_valid() const
Check if the object was successfully constructed.
void schedule_at(ControlTask &task, core::nanoseconds_t deadline, IControlTaskExecutor &executor, IControlTaskCompleter *completer)
Enqueue a task for asynchronous execution at given point of time.
void stop_and_wait()
Stop thread and wait until it terminates.
Base class for control tasks.
Definition: control_task.h:53
Control task completion handler.
Control task executor interface.
Control task.
Control task executor.
Control task completion handler.
Intrusive doubly-linked list.
Multi-producer single-consumer queue.
Mutex.
uint32_t seqlock_version_t
Type for holding seqlock value version. Version is changed each value update. May wrap.
Definition: seqlock_impl.h:23
int64_t nanoseconds_t
Nanoseconds.
Definition: time.h:58
Root namespace.
Thread.
Time definitions.
Thread-safe timer.