Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
tcp_connection_port.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2019 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_netio/target_libuv/roc_netio/tcp_connection_port.h
10 //! @brief TCP connection.
11 
12 #ifndef ROC_NETIO_TCP_CONNECTION_PORT_H_
13 #define ROC_NETIO_TCP_CONNECTION_PORT_H_
14 
15 #include <uv.h>
16 
18 #include "roc_core/atomic.h"
19 #include "roc_core/mutex.h"
20 #include "roc_core/rate_limiter.h"
21 #include "roc_core/seqlock.h"
22 #include "roc_core/shared_ptr.h"
23 #include "roc_netio/basic_port.h"
25 #include "roc_netio/iconn.h"
28 #include "roc_netio/socket_ops.h"
29 
30 namespace roc {
31 namespace netio {
32 
33 //! TCP connection parameters.
35  //! Socket options.
37 };
38 
39 //! TCP connection parameters.
41  //! Local peer address to which we're bound.
43 
44  //! Remote peer address to which we're connected.
46 };
47 
48 //! TCP connection type.
50  //! Local peer is client, remote peer is server.
52 
53  //! Local peer is server, remote peer is client.
55 };
56 
57 //! TCP connection port.
58 //!
59 //! Public interfaces
60 //! -----------------
61 //!
62 //! There are two important interfaces related to TCP connection:
63 //! - IConn
64 //! - IConnHandler
65 //!
66 //! IConn is implemented by TcpConnectionPort. The interface allows to retrieve
67 //! connection parameters and perform non-blocking I/O.
68 //!
69 //! IConnHandler is implemented by users of netio module. This interface is notified
70 //! about connection state changes (e.g. connection is established) and availability
71 //! of I/O (e.g. connection becomes readable).
72 //!
73 //! Thread access
74 //! -------------
75 //!
76 //! Methods that are not part of IConn interface are called from within other netio
77 //! classes, e.g. TcpServerPort, on the network loop thread.
78 //!
79 //! Methods from the IConn interface are called by users of netio module from any
80 //! thread. They are thread-safe and lock-free.
81 //!
82 //! Connection type and lifecycle
83 //! -----------------------------
84 //!
85 //! Connection can be client-side (connect call) or server-side (accept call).
86 //!
87 //! Client-side connection is created using AddTcpClientPort task of the network
88 //! loop, and is closed using RemovePort task. Before removing the port, the user
89 //! must call async_terminate() and wait until termination is completed.
90 //!
91 //! Server-side connection is created by TcpServerPort when it receives a new
92 //! incoming connection. To remove it, the user should call async_terminate().
93 //! When termination is completed, TcpServerPort automatically closes and
94 //! destroys connection.
95 //!
96 //! Connection workflow
97 //! -------------------
98 //!
99 //! The following rules must be followed:
100 //!
101 //! - if you called open(), even if it failed, you're responsible for calling
102 //! async_close() and waiting for its completion before destroying connection
103 //! - after calling open(), you should call either accept() or connect() before
104 //! using connection
105 //! - if you called connect() or accept(), even if it failed, you're responsible
106 //! for calling async_terminate() and waiting for its completion before calling
107 //! async_close()
108 //! - after connection is established and before it's terminated you can
109 //! perform I/O
110 //! - even if connection can't be established, async_terminate() still should be
111 //! called before closing and destryoing connection
112 //!
113 //! Connection FSM
114 //! --------------
115 //!
116 //! TcpConnectionPort maintains an FSM and sees each operation or event handler as a
117 //! transition between states. Each operation is allowed only in certain states and
118 //! will panic when not used properly.
119 //!
120 //! State switch mostly happens on the network thread, however some limited set of
121 //! transitions is allowed from other threads. For this reason, state switching is
122 //! done using atomic operations.
123 class TcpConnectionPort : public BasicPort, public IConn {
124 public:
125  //! Initialize.
127 
128  //! Destroy.
130 
131  //! Open TCP connection.
132  //! @remarks
133  //! Should be called from network loop thread.
134  virtual bool open();
135 
136  //! Asynchronously close TCP connection.
137  //! @remarks
138  //! Should be called from network loop thread.
139  virtual AsyncOperationStatus async_close(ICloseHandler& handler, void* handler_arg);
140 
141  //! Establish conection by accepting it from listening socket.
142  //! @remarks
143  //! Should be called from network loop thread.
144  bool accept(const TcpConnectionConfig& config,
145  const address::SocketAddr& server_address,
146  SocketHandle server_socket);
147 
148  //! Establish connection to remote peer (asynchronously).
149  //! @remarks
150  //! Should be called from network loop thread.
151  bool connect(const TcpClientConfig& config);
152 
153  //! Set termination handler and start using it.
154  //! @remarks
155  //! Should be called from network loop thread.
156  void attach_terminate_handler(ITerminateHandler& handler, void* handler_arg);
157 
158  //! Set connection handler and start reporting events to it.
159  //! @remarks
160  //! Should be called from network loop thread.
162 
163  //! Return address of the local peer.
164  //! @remarks
165  //! Can be called from any thread.
166  virtual const address::SocketAddr& local_address() const;
167 
168  //! Return address of the remote peer.
169  //! @remarks
170  //! Can be called from any thread.
171  virtual const address::SocketAddr& remote_address() const;
172 
173  //! Return true if there was a failure.
174  //! @remarks
175  //! Can be called from any thread.
176  virtual bool is_failed() const;
177 
178  //! Return true if the connection is writable.
179  //! @remarks
180  //! Can be called from any thread.
181  virtual bool is_writable() const;
182 
183  //! Return true if the connection is readable.
184  //! @remarks
185  //! Can be called from any thread.
186  virtual bool is_readable() const;
187 
188  //! Write @p buf of size @p len to the connection.
189  //! @remarks
190  //! Can be called from any thread.
191  virtual ssize_t try_write(const void* buf, size_t len);
192 
193  //! Read @p len bytes from the the connection to @p buf.
194  //! @remarks
195  //! Can be called from any thread.
196  virtual ssize_t try_read(void* buf, size_t len);
197 
198  //! Initiate asynchronous graceful shutdown.
199  //! @remarks
200  //! Can be called from any thread.
201  virtual void async_terminate(TerminationMode mode);
202 
203 protected:
204  //! Format descriptor.
206 
207 private:
208  // State of the connection FSM.
209  enum ConnectionState {
210  // not opened or already closed
211  State_Closed,
212 
213  // open() is in progress
214  State_Opening,
215 
216  // opened, waiting for connect() or accept()
217  State_Opened,
218 
219  // accept() or connect() is in progress
220  State_Connecting,
221 
222  // asynchronous connection failed, need terminate and close
223  State_Refused,
224 
225  // asynchronous connection succeeded, do I/O and then terminate and close
226  State_Established,
227 
228  // failure during I/O, need terminate and close
229  State_Broken,
230 
231  // async_terminate() was called, asynchronous termination is in progress
232  State_Terminating,
233 
234  // asynchronous termination completed, ready for closing
235  State_Terminated,
236 
237  // async_close() was called, asynchronous close is in progress
238  State_Closing
239  };
240 
241  // Reading or writing status of the socket.
242  enum IoStatus {
243  // socket is not ready for I/O
244  Io_NotAvailable,
245 
246  // socket is ready for reading or writing
247  Io_Available,
248 
249  // read or write operation is in progress
250  Io_InProgress
251  };
252 
253  // I/O statistics.
254  struct IoStats {
255  // number of IConnHandler events
256  core::Seqlock<uint64_t> rd_events;
257  core::Seqlock<uint64_t> wr_events;
258 
259  // number of try_read() and try_write() calls
260  uint64_t rd_calls;
261  uint64_t wr_calls;
262 
263  // how much times SockErr_WouldBlock was returned
264  uint64_t rd_wouldblock;
265  uint64_t wr_wouldblock;
266 
267  // number of bytes transferred
268  uint64_t rd_bytes;
269  uint64_t wr_bytes;
270 
271  IoStats()
272  : rd_events(0)
273  , wr_events(0)
274  , rd_calls(0)
275  , wr_calls(0)
276  , rd_wouldblock(0)
277  , wr_wouldblock(0)
278  , rd_bytes(0)
279  , wr_bytes(0) {
280  }
281  };
282 
283  static const char* conn_state_to_str_(ConnectionState);
284 
285  static void poll_cb_(uv_poll_t* handle, int status, int events);
286  static void start_terminate_cb_(uv_async_t* handle);
287  static void finish_terminate_cb_(uv_handle_t* handle);
288  static void close_cb_(uv_handle_t* handle);
289 
290  bool start_polling_();
291  AsyncOperationStatus async_stop_polling_(uv_close_cb completion_cb);
292 
293  void disconnect_socket_();
294 
295  AsyncOperationStatus async_close_();
296 
297  void set_and_report_writable_();
298  void set_and_report_readable_();
299 
300  ConnectionState get_state_() const;
301  void switch_and_report_state_(ConnectionState new_state);
302  bool maybe_switch_state_(ConnectionState expected_state,
303  ConnectionState desired_state);
304  void report_state_(ConnectionState state);
305 
306  void set_conn_handler_(IConnHandler& handler);
307  void unset_conn_handler_();
308 
309  void check_usable_(ConnectionState conn_state) const;
310  void check_usable_for_io_(ConnectionState conn_state) const;
311 
312  void report_io_stats_();
313 
314  uv_loop_t& loop_;
315 
316  uv_poll_t poll_handle_;
317  bool poll_handle_initialized_;
318  bool poll_handle_started_;
319 
320  uv_async_t terminate_sem_;
321  bool terminate_sem_initialized_;
322 
323  core::SharedPtr<IConnHandler> conn_handler_;
324 
325  ITerminateHandler* terminate_handler_;
326  void* terminate_handler_arg_;
327 
328  ICloseHandler* close_handler_;
329  void* close_handler_arg_;
330 
331  TcpConnectionType type_;
332 
333  address::SocketAddr local_address_;
334  address::SocketAddr remote_address_;
335 
336  SocketHandle socket_;
337 
338  core::Atomic<int32_t> conn_state_;
339 
340  core::Atomic<int32_t> conn_was_established_;
341  core::Atomic<int32_t> conn_was_failed_;
342 
343  core::Atomic<int32_t> writable_status_;
344  core::Atomic<int32_t> readable_status_;
345 
346  bool got_stream_end_;
347 
348  core::Mutex io_mutex_;
349 
350  IoStats io_stats_;
351  core::RateLimiter report_limiter_;
352 };
353 
354 } // namespace netio
355 } // namespace roc
356 
357 #endif // ROC_NETIO_TCP_CONNECTION_PORT_H_
Atomic.
Base class for ports.
Socket address.
Definition: socket_addr.h:26
IArena & arena() const
Get arena.
Memory arena interface.
Definition: iarena.h:23
Base class for ports.
Definition: basic_port.h:40
Close handler interface.
Connection event handler interface.
Definition: iconn_handler.h:60
Connection interface.
Definition: iconn.h:30
Termination handler interface.
void attach_connection_handler(IConnHandler &handler)
Set connection handler and start reporting events to it.
virtual ssize_t try_read(void *buf, size_t len)
Read len bytes from the the connection to buf.
bool connect(const TcpClientConfig &config)
Establish connection to remote peer (asynchronously).
virtual bool is_failed() const
Return true if there was a failure.
virtual AsyncOperationStatus async_close(ICloseHandler &handler, void *handler_arg)
Asynchronously close TCP connection.
virtual const address::SocketAddr & local_address() const
Return address of the local peer.
virtual bool open()
Open TCP connection.
TcpConnectionPort(TcpConnectionType type, uv_loop_t &loop, core::IArena &arena)
Initialize.
virtual bool is_readable() const
Return true if the connection is readable.
bool accept(const TcpConnectionConfig &config, const address::SocketAddr &server_address, SocketHandle server_socket)
Establish conection by accepting it from listening socket.
virtual ~TcpConnectionPort()
Destroy.
virtual void async_terminate(TerminationMode mode)
Initiate asynchronous graceful shutdown.
virtual void format_descriptor(core::StringBuilder &b)
Format descriptor.
virtual const address::SocketAddr & remote_address() const
Return address of the remote peer.
virtual bool is_writable() const
Return true if the connection is writable.
void attach_terminate_handler(ITerminateHandler &handler, void *handler_arg)
Set termination handler and start using it.
virtual ssize_t try_write(const void *buf, size_t len)
Write buf of size len to the connection.
Close handler interface.
Connection interface.
Connection event handler interface.
Termination handler interface.
Mutex.
int SocketHandle
Platform-specific socket handle.
Definition: socket_ops.h:51
AsyncOperationStatus
Asynchronous operation status.
TcpConnectionType
TCP connection type.
@ TcpConn_Client
Local peer is client, remote peer is server.
@ TcpConn_Server
Local peer is server, remote peer is client.
TerminationMode
Connection termination mode.
Root namespace.
Rate limiter.
Seqlock.
Shared ownership intrusive pointer.
Socket address.
Socket operations.
Socket options.
Definition: socket_ops.h:29
TCP connection parameters.
address::SocketAddr local_address
Local peer address to which we're bound.
address::SocketAddr remote_address
Remote peer address to which we're connected.
TCP connection parameters.
SocketOpts socket_options
Socket options.