Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
network_loop.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_netio/target_libuv/roc_netio/network_loop.h
10//! @brief Network event loop thread.
11
12#ifndef ROC_NETIO_NETWORK_LOOP_H_
13#define ROC_NETIO_NETWORK_LOOP_H_
14
15#include <uv.h>
16
18#include "roc_core/atomic.h"
19#include "roc_core/attributes.h"
20#include "roc_core/iarena.h"
21#include "roc_core/ipool.h"
22#include "roc_core/list.h"
23#include "roc_core/mpsc_queue.h"
25#include "roc_core/optional.h"
26#include "roc_core/semaphore.h"
27#include "roc_core/thread.h"
30#include "roc_netio/iconn.h"
36#include "roc_netio/resolver.h"
39#include "roc_netio/udp_port.h"
40#include "roc_packet/iwriter.h"
42
43namespace roc {
44namespace netio {
45
46//! Network event loop thread.
47//! @remarks
48//! This class is a task-based facade for the whole roc_netio module.
50 private ICloseHandler,
52 private core::Thread {
53public:
54 //! Opaque port handle.
55 typedef struct PortHandle* PortHandle;
56
57 //! Subclasses for specific tasks.
58 class Tasks {
59 public:
60 //! Add UDP datagram sender/receiver port.
61 class AddUdpPort : public NetworkTask {
62 public:
63 //! Set task parameters.
64 //! @remarks
65 //! Updates @p config with the actual bind address.
67
68 //! Get created port handle.
69 //! @pre
70 //! Should be called only after success() is true.
72
73 private:
74 friend class NetworkLoop;
75
76 UdpConfig* config_;
77 };
78
79 //! Start sending on UDP port.
80 class StartUdpSend : public NetworkTask {
81 public:
82 //! Set task parameters.
83 //! @remarks
84 //! get_outbound_writer() returns a writer for packets to be sent. It may be
85 //! used from another thread. It doesn't block the caller.
87
88 //! Get created writer for outbound packets.
89 //! @pre
90 //! Should be called only after success() is true.
92
93 private:
94 friend class NetworkLoop;
95
96 packet::IWriter* outbound_writer_;
97 };
98
99 //! Task that starts receiving on a UDP port.
100 class StartUdpRecv : public NetworkTask {
101 public:
102 //! Set task parameters.
103 //! @remarks
104 //! Received packets will be passed to @p inbound_writer.
105 //! It is invoked from the network thread, so it should not block the caller.
106 StartUdpRecv(PortHandle handle, packet::IWriter& inbound_writer);
107
108 private:
109 friend class NetworkLoop; // lets NetworkLoop access the private members of this task
110
111 packet::IWriter* inbound_writer_; // NOTE(review): presumably set from the constructor's inbound_writer argument; constructor body is not visible here, confirm
112 };
113
114 //! Add TCP server port.
116 public:
117 //! Set task parameters.
118 //! @remarks
119 //! - Updates @p config with the actual bind address.
120 //! - Listens for incoming connections and passes new connections
121 //! to @p conn_acceptor. It should return handler that will be
122 //! notified when connection state changes.
124
125 //! Get created port handle.
126 //! @pre
127 //! Should be called only after success() is true.
129
130 private:
131 friend class NetworkLoop;
132
133 TcpServerConfig* config_;
134 IConnAcceptor* conn_acceptor_;
135 };
136
137 //! Add TCP client port.
139 public:
140 //! Set task parameters.
141 //! @remarks
142 //! - Updates @p config with the actual bind address.
143 //! - Notifies @p conn_handler when connection state changes.
145
146 //! Get created port handle.
147 //! @pre
148 //! Should be called only after success() is true.
150
151 private:
152 friend class NetworkLoop;
153
154 TcpClientConfig* config_;
155 IConnHandler* conn_handler_;
156 };
157
158 //! Remove port.
159 class RemovePort : public NetworkTask {
160 public:
161 //! Set task parameters.
163
164 private:
165 friend class NetworkLoop;
166 };
167
168 //! Resolve endpoint address.
170 public:
171 //! Set task parameters.
172 //! @remarks
173 //! Gets endpoint hostname, resolves it, and writes the resolved IP address
174 //! and the port from the endpoint to the resulting SocketAddr.
176
177 //! Get resolved address.
178 //! @pre
179 //! Should be called only after success() is true.
181
182 private:
183 friend class NetworkLoop;
184
185 ResolverRequest resolve_req_;
186 };
187 };
188
189 //! Initialize.
190 //! @remarks
191 //! Start background thread if the object was successfully constructed.
192 NetworkLoop(core::IPool& packet_pool, core::IPool& buffer_pool, core::IArena& arena);
193
194 //! Destroy. Stop all receivers and senders.
195 //! @remarks
196 //! Wait until background thread finishes.
197 virtual ~NetworkLoop();
198
199 //! Check if the object was successfully constructed.
200 bool is_valid() const;
201
202 //! Get number of receiver and sender ports.
203 size_t num_ports() const;
204
205 //! Enqueue a task for asynchronous execution and return.
206 //! The task should not be destroyed until the callback is called.
207 //! The @p completer will be invoked on the event loop thread after the
208 //! task completes.
210
211 //! Enqueue a task for asynchronous execution and wait for its completion.
212 //! The task should not be destroyed until this method returns.
213 //! Should not be called from schedule() callback.
214 //! @returns
215 //! true if the task succeeded or false if it failed.
217
218private:
219 static void task_sem_cb_(uv_async_t* handle);
220 static void stop_sem_cb_(uv_async_t* handle);
221
222 virtual void handle_terminate_completed(IConn&, void*);
223 virtual void handle_close_completed(BasicPort&, void*);
224 virtual void handle_resolved(ResolverRequest& req);
225
226 virtual void run();
227
228 void process_pending_tasks_();
229 void finish_task_(NetworkTask&);
230
231 void async_terminate_conn_port_(const core::SharedPtr<TcpConnectionPort>& port,
232 NetworkTask* task);
233 AsyncOperationStatus async_close_port_(const core::SharedPtr<BasicPort>& port,
234 NetworkTask* task);
235 void finish_closing_port_(const core::SharedPtr<BasicPort>& port, NetworkTask* task);
236
237 void update_num_ports_();
238
239 void close_all_sems_();
240 void close_all_ports_();
241
242 void task_add_udp_port_(NetworkTask&);
243 void task_start_udp_send_(NetworkTask&);
244 void task_start_udp_recv_(NetworkTask&);
245 void task_add_tcp_server_(NetworkTask&);
246 void task_add_tcp_client_(NetworkTask&);
247 void task_remove_port_(NetworkTask&);
248 void task_resolve_endpoint_address_(NetworkTask&);
249
250 packet::PacketFactory packet_factory_;
251 core::IArena& arena_;
252
253 bool started_;
254
255 uv_loop_t loop_;
256 bool loop_initialized_;
257
258 uv_async_t stop_sem_;
259 bool stop_sem_initialized_;
260
261 uv_async_t task_sem_;
262 bool task_sem_initialized_;
263
265
266 Resolver resolver_;
267
268 core::List<BasicPort> open_ports_;
269 core::List<BasicPort> closing_ports_;
270
271 core::Atomic<int> num_open_ports_;
272};
273
274} // namespace netio
275} // namespace roc
276
277#endif // ROC_NETIO_NETWORK_LOOP_H_
Atomic.
Compiler attributes.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition attributes.h:31
Base class for ports.
Network endpoint URI.
Memory arena interface.
Definition iarena.h:23
Memory pool interface.
Definition ipool.h:23
Shared ownership intrusive pointer.
Definition shared_ptr.h:32
Base class for thread objects.
Definition thread.h:27
Base class for ports.
Definition basic_port.h:40
Close handler interface.
Connection acceptor interface.
Connection event handler interface.
Connection interface.
Definition iconn.h:30
Network task completion handler.
Resolver request result handler interface.
Termination handler interface.
PortHandle get_handle() const
Get created port handle.
AddTcpClientPort(TcpClientConfig &config, IConnHandler &conn_handler)
Set task parameters.
AddTcpServerPort(TcpServerConfig &config, IConnAcceptor &conn_acceptor)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
Add UDP datagram sender/receiver port.
AddUdpPort(UdpConfig &config)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
RemovePort(PortHandle handle)
Set task parameters.
const address::SocketAddr & get_address() const
Get resolved address.
ResolveEndpointAddress(const address::EndpointUri &endpoint_uri)
Set task parameters.
StartUdpRecv(PortHandle handle, packet::IWriter &inbound_writer)
Set task parameters.
StartUdpSend(PortHandle handle)
Set task parameters.
packet::IWriter & get_outbound_writer() const
Get created writer for outbound packets.
Subclasses for specific tasks.
Network event loop thread.
bool is_valid() const
Check if the object was successfully constructed.
NetworkLoop(core::IPool &packet_pool, core::IPool &buffer_pool, core::IArena &arena)
Initialize.
size_t num_ports() const
Get number of receiver and sender ports.
void schedule(NetworkTask &task, INetworkTaskCompleter &completer)
Enqueue a task for asynchronous execution and return. The task should not be destroyed until the callback is called. The completer will be invoked on the event loop thread after the task completes.
bool schedule_and_wait(NetworkTask &task)
Enqueue a task for asynchronous execution and wait for its completion. The task should not be destroyed until this method returns. Should not be called from schedule() callback.
virtual ~NetworkLoop()
Destroy. Stop all receivers and senders.
struct PortHandle * PortHandle
Opaque port handle.
Base class for network loop tasks.
Hostname resolver.
Definition resolver.h:25
Packet writer interface.
Definition iwriter.h:23
Memory arena interface.
Close handler interface.
Connection interface.
Connection acceptor interface.
Connection event handler interface.
Network task completion handler.
Memory pool interface.
Termination handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Multi-producer single-consumer queue.
MpscQueue node.
AsyncOperationStatus
Asynchronous operation status.
Root namespace.
Network task.
Optionally constructed object.
Packet factory.
Hostname resolver.
Socket address.
TCP connection parameters.
TCP server parameters.
UDP port parameters.
Definition udp_port.h:32
TCP connection.
Thread.
UDP port.