Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
network_loop.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_netio/target_libuv/roc_netio/network_loop.h
10 //! @brief Network event loop thread.
11 
12 #ifndef ROC_NETIO_NETWORK_LOOP_H_
13 #define ROC_NETIO_NETWORK_LOOP_H_
14 
15 #include <uv.h>
16 
18 #include "roc_core/atomic.h"
19 #include "roc_core/attributes.h"
21 #include "roc_core/iarena.h"
22 #include "roc_core/list.h"
23 #include "roc_core/mpsc_queue.h"
25 #include "roc_core/optional.h"
26 #include "roc_core/semaphore.h"
27 #include "roc_core/thread.h"
28 #include "roc_netio/basic_port.h"
30 #include "roc_netio/iconn.h"
35 #include "roc_netio/network_task.h"
36 #include "roc_netio/resolver.h"
41 #include "roc_packet/iwriter.h"
43 
44 namespace roc {
45 namespace netio {
46 
47 //! Network event loop thread.
48 //! @remarks
49 //! This class is a task-based facade for the whole roc_netio module.
51  private ICloseHandler,
53  private core::Thread {
54 public:
55  //! Opaque port handle.
56  typedef struct PortHandle* PortHandle;
57 
58  //! Subclasses for specific tasks.
59  class Tasks {
60  public:
61  //! Add UDP datagram receiver port.
63  public:
64  //! Set task parameters.
65  //! @remarks
66  //! - Updates @p config with the actual bind address.
67  //! - Passes received packets to @p writer. It is called from network thread.
68  //! It should not block the caller.
70 
71  //! Get created port handle.
72  //! @pre
73  //! Should be called only if success() is true.
75 
76  private:
77  friend class NetworkLoop;
78 
79  UdpReceiverConfig* config_;
80  packet::IWriter* writer_;
81  };
82 
83  //! Add UDP datagram sender port.
84  class AddUdpSenderPort : public NetworkTask {
85  public:
86  //! Set task parameters.
87  //! @remarks
88  //! Updates @p config with the actual bind address.
90 
91  //! Get created port handle.
92  //! @pre
93  //! Should be called only if success() is true.
95 
96  //! Get created port writer;
97  //! @remarks
98  //! The writer can be used to send packets from the port. It may be called
99  //! from any thread. It will not block the caller.
100  //! @pre
101  //! Should be called only if success() is true.
103 
104  private:
105  friend class NetworkLoop;
106 
107  UdpSenderConfig* config_;
108  packet::IWriter* writer_;
109  };
110 
111  //! Add TCP server port.
112  class AddTcpServerPort : public NetworkTask {
113  public:
114  //! Set task parameters.
115  //! @remarks
116  //! - Updates @p config with the actual bind address.
117  //! - Listens for incoming connections and passes new connections
118  //! to @p conn_acceptor. It should return handler that will be
119  //! notified when connection state changes.
121 
122  //! Get created port handle.
123  //! @pre
124  //! Should be called only if success() is true.
126 
127  private:
128  friend class NetworkLoop;
129 
130  TcpServerConfig* config_;
131  IConnAcceptor* conn_acceptor_;
132  };
133 
134  //! Add TCP client port.
135  class AddTcpClientPort : public NetworkTask {
136  public:
137  //! Set task parameters.
138  //! @remarks
139  //! - Updates @p config with the actual bind address.
140  //! - Notofies @p conn_handler when connection state changes.
142 
143  //! Get created port handle.
144  //! @pre
145  //! Should be called only if success() is true.
147 
148  private:
149  friend class NetworkLoop;
150 
151  TcpClientConfig* config_;
152  IConnHandler* conn_handler_;
153  };
154 
155  //! Remove port.
156  class RemovePort : public NetworkTask {
157  public:
158  //! Set task parameters.
160 
161  private:
162  friend class NetworkLoop;
163  };
164 
165  //! Resolve endpoint address.
167  public:
168  //! Set task parameters.
169  //! @remarks
170  //! Gets endpoint hostname, resolves it, and writes the resolved IP address
171  //! and the port from the endpoint to the resulting SocketAddr.
173 
174  //! Get resolved address.
175  //! @pre
176  //! Should be called only if success() is true.
178 
179  private:
180  friend class NetworkLoop;
181 
182  ResolverRequest resolve_req_;
183  };
184  };
185 
186  //! Initialize.
187  //! @remarks
188  //! Start background thread if the object was successfully constructed.
190  core::BufferFactory<uint8_t>& buffer_factory,
191  core::IArena& arena);
192 
193  //! Destroy. Stop all receivers and senders.
194  //! @remarks
195  //! Wait until background thread finishes.
196  virtual ~NetworkLoop();
197 
198  //! Check if the object was successfully constructed.
199  bool is_valid() const;
200 
201  //! Get number of receiver and sender ports.
202  size_t num_ports() const;
203 
204  //! Enqueue a task for asynchronous execution and return.
205  //! The task should not be destroyed until the callback is called.
206  //! The @p completer will be invoked on event loop thread after the
207  //! task completes.
208  void schedule(NetworkTask& task, INetworkTaskCompleter& completer);
209 
210  //! Enqueue a task for asynchronous execution and wait for its completion.
211  //! The task should not be destroyed until this method returns.
212  //! Should not be called from schedule() callback.
213  //! @returns
214  //! true if the task succeeded or false if it failed.
216 
217 private:
218  static void task_sem_cb_(uv_async_t* handle);
219  static void stop_sem_cb_(uv_async_t* handle);
220 
221  virtual void handle_terminate_completed(IConn&, void*);
222  virtual void handle_close_completed(BasicPort&, void*);
223  virtual void handle_resolved(ResolverRequest& req);
224 
225  virtual void run();
226 
227  void process_pending_tasks_();
228  void finish_task_(NetworkTask&);
229 
230  void async_terminate_conn_port_(const core::SharedPtr<TcpConnectionPort>& port,
231  NetworkTask* task);
232  AsyncOperationStatus async_close_port_(const core::SharedPtr<BasicPort>& port,
233  NetworkTask* task);
234  void finish_closing_port_(const core::SharedPtr<BasicPort>& port, NetworkTask* task);
235 
236  void update_num_ports_();
237 
238  void close_all_sems_();
239  void close_all_ports_();
240 
241  void task_add_udp_receiver_(NetworkTask&);
242  void task_add_udp_sender_(NetworkTask&);
243  void task_remove_port_(NetworkTask&);
244  void task_add_tcp_server_(NetworkTask&);
245  void task_add_tcp_client_(NetworkTask&);
246  void task_resolve_endpoint_address_(NetworkTask&);
247 
248  packet::PacketFactory& packet_factory_;
249  core::BufferFactory<uint8_t>& buffer_factory_;
250  core::IArena& arena_;
251 
252  bool started_;
253 
254  uv_loop_t loop_;
255  bool loop_initialized_;
256 
257  uv_async_t stop_sem_;
258  bool stop_sem_initialized_;
259 
260  uv_async_t task_sem_;
261  bool task_sem_initialized_;
262 
264 
265  Resolver resolver_;
266 
267  core::List<BasicPort> open_ports_;
268  core::List<BasicPort> closing_ports_;
269 
270  core::Atomic<int> num_open_ports_;
271 };
272 
273 } // namespace netio
274 } // namespace roc
275 
276 #endif // ROC_NETIO_NETWORK_LOOP_H_
Atomic.
Compiler attributes.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition: attributes.h:31
Base class for ports.
Buffer factory.
Network endpoint URI.
Definition: endpoint_uri.h:27
Socket address.
Definition: socket_addr.h:26
Memory arena interface.
Definition: iarena.h:23
Intrusive doubly-linked list.
Definition: list.h:35
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:40
Shared ownership intrusive pointer.
Definition: shared_ptr.h:32
Base class for thread objects.
Definition: thread.h:27
Base class for ports.
Definition: basic_port.h:40
Close handler interface.
Connection acceptor interface.
Connection event handler interface.
Definition: iconn_handler.h:60
Connection interface.
Definition: iconn.h:30
Network task completion handler.
Resolver request result handler interface.
Termination handler interface.
PortHandle get_handle() const
Get created port handle.
AddTcpClientPort(TcpClientConfig &config, IConnHandler &conn_handler)
Set task parameters.
AddTcpServerPort(TcpServerConfig &config, IConnAcceptor &conn_acceptor)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
AddUdpReceiverPort(UdpReceiverConfig &config, packet::IWriter &writer)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
PortHandle get_handle() const
Get created port handle.
AddUdpSenderPort(UdpSenderConfig &config)
Set task parameters.
packet::IWriter * get_writer() const
Get created port writer;.
RemovePort(PortHandle handle)
Set task parameters.
ResolveEndpointAddress(const address::EndpointUri &endpoint_uri)
Set task parameters.
const address::SocketAddr & get_address() const
Get resolved address.
Subclasses for specific tasks.
Definition: network_loop.h:59
Network event loop thread.
Definition: network_loop.h:53
bool is_valid() const
Check if the object was successfully constructed.
size_t num_ports() const
Get number of receiver and sender ports.
ROC_ATTR_NODISCARD bool schedule_and_wait(NetworkTask &task)
Enqueue a task for asynchronous execution and wait for its completion. The task should not be destroy...
void schedule(NetworkTask &task, INetworkTaskCompleter &completer)
Enqueue a task for asynchronous execution and return. The task should not be destroyed until the call...
NetworkLoop(packet::PacketFactory &packet_factory, core::BufferFactory< uint8_t > &buffer_factory, core::IArena &arena)
Initialize.
virtual ~NetworkLoop()
Destroy. Stop all receivers and senders.
struct PortHandle * PortHandle
Opaque port handle.
Definition: network_loop.h:56
Base class for network loop tasks.
Definition: network_task.h:29
Hostname resolver.
Definition: resolver.h:25
Packet writer interface.
Definition: iwriter.h:23
Memory arena interface.
Close handler interface.
Connection interface.
Connection acceptor interface.
Connection event handler interface.
Network task completion handler.
Termination handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Multi-producer single-consumer queue.
MpscQueue node.
AsyncOperationStatus
Asynchronous operation status.
Root namespace.
Network task.
Optionally constructed object.
Packet factory.
Hostname resolver.
Socket address.
TCP connection parameters.
TCP server parameters.
UDP receiver parameters.
UDP sender parameters.
TCP connection.
Thread.
UDP receiver.