Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
network_loop.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_netio/target_libuv/roc_netio/network_loop.h
10 //! @brief Network event loop thread.
11 
12 #ifndef ROC_NETIO_NETWORK_LOOP_H_
13 #define ROC_NETIO_NETWORK_LOOP_H_
14 
15 #include <uv.h>
16 
18 #include "roc_core/atomic.h"
19 #include "roc_core/attributes.h"
20 #include "roc_core/iarena.h"
21 #include "roc_core/ipool.h"
22 #include "roc_core/list.h"
23 #include "roc_core/mpsc_queue.h"
25 #include "roc_core/optional.h"
26 #include "roc_core/semaphore.h"
27 #include "roc_core/thread.h"
28 #include "roc_netio/basic_port.h"
30 #include "roc_netio/iconn.h"
35 #include "roc_netio/network_task.h"
36 #include "roc_netio/resolver.h"
39 #include "roc_netio/udp_port.h"
40 #include "roc_packet/iwriter.h"
42 
43 namespace roc {
44 namespace netio {
45 
46 //! Network event loop thread.
47 //! @remarks
48 //! This class is a task-based facade for the whole roc_netio module.
50  private ICloseHandler,
52  private core::Thread {
53 public:
54  //! Opaque port handle.
55  typedef struct PortHandle* PortHandle;
56 
57  //! Subclasses for specific tasks.
58  class Tasks {
59  public:
60  //! Add UDP datagram sender/receiver port.
61  class AddUdpPort : public NetworkTask {
62  public:
63  //! Set task parameters.
64  //! @remarks
65  //! Updates @p config with the actual bind address.
67 
68  //! Get created port handle.
69  //! @pre
70  //! Should be called only after success() is true.
72 
73  private:
74  friend class NetworkLoop;
75 
76  UdpConfig* config_;
77  };
78 
79  //! Start sending on UDP port.
80  class StartUdpSend : public NetworkTask {
81  public:
82  //! Set task parameters.
83  //! @remarks
84  //! get_outbound_writer() returns a writer for packets to be sent. It may be
85  //! used from another thread. It doesn't block the caller.
87 
88  //! Get created writer for outbound packets.
89  //! @pre
90  //! Should be called only after success() is true.
92 
93  private:
94  friend class NetworkLoop;
95 
96  packet::IWriter* outbound_writer_;
97  };
98 
99  //! Start receiving on UDP port.
100  class StartUdpRecv : public NetworkTask {
101  public:
102  //! Set task parameters.
103  //! @remarks
104  //! Received packets will be passed to @p inbound_writer.
105  //! It is invoked from the network thread. It should not block the caller.
106  StartUdpRecv(PortHandle handle, packet::IWriter& inbound_writer);
107 
108  private:
109  friend class NetworkLoop;
110 
111  packet::IWriter* inbound_writer_;
112  };
113 
114  //! Add TCP server port.
115  class AddTcpServerPort : public NetworkTask {
116  public:
117  //! Set task parameters.
118  //! @remarks
119  //! - Updates @p config with the actual bind address.
120  //! - Listens for incoming connections and passes new connections
121  //! to @p conn_acceptor. It should return a handler that will be
122  //! notified when connection state changes.
124 
125  //! Get created port handle.
126  //! @pre
127  //! Should be called only after success() is true.
129 
130  private:
131  friend class NetworkLoop;
132 
133  TcpServerConfig* config_;
134  IConnAcceptor* conn_acceptor_;
135  };
136 
137  //! Add TCP client port.
138  class AddTcpClientPort : public NetworkTask {
139  public:
140  //! Set task parameters.
141  //! @remarks
142  //! - Updates @p config with the actual bind address.
143  //! - Notifies @p conn_handler when connection state changes.
145 
146  //! Get created port handle.
147  //! @pre
148  //! Should be called only after success() is true.
150 
151  private:
152  friend class NetworkLoop;
153 
154  TcpClientConfig* config_;
155  IConnHandler* conn_handler_;
156  };
157 
158  //! Remove port.
159  class RemovePort : public NetworkTask {
160  public:
161  //! Set task parameters.
163 
164  private:
165  friend class NetworkLoop;
166  };
167 
168  //! Resolve endpoint address.
170  public:
171  //! Set task parameters.
172  //! @remarks
173  //! Gets endpoint hostname, resolves it, and writes the resolved IP address
174  //! and the port from the endpoint to the resulting SocketAddr.
176 
177  //! Get resolved address.
178  //! @pre
179  //! Should be called only after success() is true.
181 
182  private:
183  friend class NetworkLoop;
184 
185  ResolverRequest resolve_req_;
186  };
187  };
188 
189  //! Initialize.
190  //! @remarks
191  //! Start background thread if the object was successfully constructed.
192  NetworkLoop(core::IPool& packet_pool, core::IPool& buffer_pool, core::IArena& arena);
193 
194  //! Destroy. Stop all receivers and senders.
195  //! @remarks
196  //! Wait until background thread finishes.
197  virtual ~NetworkLoop();
198 
199  //! Check if the object was successfully constructed.
200  bool is_valid() const;
201 
202  //! Get number of receiver and sender ports.
203  size_t num_ports() const;
204 
205  //! Enqueue a task for asynchronous execution and return.
206  //! The task should not be destroyed until the callback is called.
207  //! The @p completer will be invoked on the event loop thread after the
208  //! task completes.
209  void schedule(NetworkTask& task, INetworkTaskCompleter& completer);
210 
211  //! Enqueue a task for asynchronous execution and wait for its completion.
212  //! The task should not be destroyed until this method returns.
213  //! Should not be called from schedule() callback.
214  //! @returns
215  //! true if the task succeeded or false if it failed.
217 
218 private:
219  static void task_sem_cb_(uv_async_t* handle);
220  static void stop_sem_cb_(uv_async_t* handle);
221 
222  virtual void handle_terminate_completed(IConn&, void*);
223  virtual void handle_close_completed(BasicPort&, void*);
224  virtual void handle_resolved(ResolverRequest& req);
225 
226  virtual void run();
227 
228  void process_pending_tasks_();
229  void finish_task_(NetworkTask&);
230 
231  void async_terminate_conn_port_(const core::SharedPtr<TcpConnectionPort>& port,
232  NetworkTask* task);
233  AsyncOperationStatus async_close_port_(const core::SharedPtr<BasicPort>& port,
234  NetworkTask* task);
235  void finish_closing_port_(const core::SharedPtr<BasicPort>& port, NetworkTask* task);
236 
237  void update_num_ports_();
238 
239  void close_all_sems_();
240  void close_all_ports_();
241 
242  void task_add_udp_port_(NetworkTask&);
243  void task_start_udp_send_(NetworkTask&);
244  void task_start_udp_recv_(NetworkTask&);
245  void task_add_tcp_server_(NetworkTask&);
246  void task_add_tcp_client_(NetworkTask&);
247  void task_remove_port_(NetworkTask&);
248  void task_resolve_endpoint_address_(NetworkTask&);
249 
250  packet::PacketFactory packet_factory_;
251  core::IArena& arena_;
252 
253  bool started_;
254 
255  uv_loop_t loop_;
256  bool loop_initialized_;
257 
258  uv_async_t stop_sem_;
259  bool stop_sem_initialized_;
260 
261  uv_async_t task_sem_;
262  bool task_sem_initialized_;
263 
265 
266  Resolver resolver_;
267 
268  core::List<BasicPort> open_ports_;
269  core::List<BasicPort> closing_ports_;
270 
271  core::Atomic<int> num_open_ports_;
272 };
273 
274 } // namespace netio
275 } // namespace roc
276 
277 #endif // ROC_NETIO_NETWORK_LOOP_H_
Atomic.
Compiler attributes.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition: attributes.h:31
Base class for ports.
Network endpoint URI.
Definition: endpoint_uri.h:27
Socket address.
Definition: socket_addr.h:26
Memory arena interface.
Definition: iarena.h:23
Memory pool interface.
Definition: ipool.h:23
Intrusive doubly-linked list.
Definition: list.h:40
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:45
Shared ownership intrusive pointer.
Definition: shared_ptr.h:32
Base class for thread objects.
Definition: thread.h:27
Base class for ports.
Definition: basic_port.h:40
Close handler interface.
Connection acceptor interface.
Connection event handler interface.
Definition: iconn_handler.h:60
Connection interface.
Definition: iconn.h:30
Network task completion handler.
Resolver request result handler interface.
Termination handler interface.
PortHandle get_handle() const
Get created port handle.
AddTcpClientPort(TcpClientConfig &config, IConnHandler &conn_handler)
Set task parameters.
AddTcpServerPort(TcpServerConfig &config, IConnAcceptor &conn_acceptor)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
Add UDP datagram sender/receiver port.
Definition: network_loop.h:61
AddUdpPort(UdpConfig &config)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
RemovePort(PortHandle handle)
Set task parameters.
ResolveEndpointAddress(const address::EndpointUri &endpoint_uri)
Set task parameters.
const address::SocketAddr & get_address() const
Get resolved address.
StartUdpRecv(PortHandle handle, packet::IWriter &inbound_writer)
Set task parameters.
packet::IWriter & get_outbound_writer() const
Get created writer for outbound packets.
StartUdpSend(PortHandle handle)
Set task parameters.
Subclasses for specific tasks.
Definition: network_loop.h:58
Network event loop thread.
Definition: network_loop.h:52
bool is_valid() const
Check if the object was successfully constructed.
NetworkLoop(core::IPool &packet_pool, core::IPool &buffer_pool, core::IArena &arena)
Initialize.
size_t num_ports() const
Get number of receiver and sender ports.
ROC_ATTR_NODISCARD bool schedule_and_wait(NetworkTask &task)
Enqueue a task for asynchronous execution and wait for its completion. The task should not be destroy...
void schedule(NetworkTask &task, INetworkTaskCompleter &completer)
Enqueue a task for asynchronous execution and return. The task should not be destroyed until the call...
virtual ~NetworkLoop()
Destroy. Stop all receivers and senders.
struct PortHandle * PortHandle
Opaque port handle.
Definition: network_loop.h:55
Base class for network loop tasks.
Definition: network_task.h:29
Hostname resolver.
Definition: resolver.h:25
Packet writer interface.
Definition: iwriter.h:23
Memory arena interface.
Close handler interface.
Connection interface.
Connection acceptor interface.
Connection event handler interface.
Network task completion handler.
Memory pool interface.
Termination handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Multi-producer single-consumer queue.
MpscQueue node.
AsyncOperationStatus
Asynchronous operation status.
Root namespace.
Network task.
Optionally constructed object.
Packet factory.
Hostname resolver.
Socket address.
TCP connection parameters.
TCP server parameters.
UDP port parameters.
Definition: udp_port.h:32
TCP connection.
Thread.
UDP port.