Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
tcp_connection_port.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2019 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_netio/target_libuv/roc_netio/tcp_connection_port.h
10//! @brief TCP connection.
11
12#ifndef ROC_NETIO_TCP_CONNECTION_PORT_H_
13#define ROC_NETIO_TCP_CONNECTION_PORT_H_
14
15#include <uv.h>
16
18#include "roc_core/atomic.h"
19#include "roc_core/mutex.h"
21#include "roc_core/seqlock.h"
22#include "roc_core/shared_ptr.h"
25#include "roc_netio/iconn.h"
29
30namespace roc {
31namespace netio {
32
33//! TCP connection parameters.
35 //! Socket options.
37};
38
39//! TCP connection parameters.
41 //! Local peer address to which we're bound.
43
44 //! Remote peer address to which we're connected.
46};
47
48//! TCP connection type.
50 //! Local peer is client, remote peer is server.
52
53 //! Local peer is server, remote peer is client.
55};
56
57//! TCP connection port.
58//!
59//! Public interfaces
60//! -----------------
61//!
62//! There are two important interfaces related to TCP connection:
63//! - IConn
64//! - IConnHandler
65//!
66//! IConn is implemented by TcpConnectionPort. The interface allows to retrieve
67//! connection parameters and perform non-blocking I/O.
68//!
69//! IConnHandler is implemented by users of netio module. This interface is notified
70//! about connection state changes (e.g. connection is established) and availability
71//! of I/O (e.g. connection becomes readable).
72//!
73//! Thread access
74//! -------------
75//!
76//! Methods that are not part of IConn interface are called from within other netio
77//! classes, e.g. TcpServerPort, on the network loop thread.
78//!
79//! Methods from the IConn interface are called by users of netio module from any
80//! thread. They are thread-safe and lock-free.
81//!
82//! Connection type and lifecycle
83//! -----------------------------
84//!
85//! Connection can be client-side (connect call) or server-side (accept call).
86//!
87//! Client-side connection is created using AddTcpClientPort task of the network
88//! loop, and is closed using RemovePort task. Before removing the port, the user
89//! must call async_terminate() and wait until termination is completed.
90//!
91//! Server-side connection is created by TcpServerPort when it receives a new
92//! incoming connection. To remove it, the user should call async_terminate().
93//! When termination is completed, TcpServerPort automatically closes and
94//! destroys connection.
95//!
96//! Connection workflow
97//! -------------------
98//!
99//! The following rules must be followed:
100//!
101//! - if you called open(), even if it failed, you're responsible for calling
102//! async_close() and waiting for its completion before destroying connection
103//! - after calling open(), you should call either accept() or connect() before
104//! using connection
105//! - if you called connect() or accept(), even if it failed, you're responsible
106//! for calling async_terminate() and waiting for its completion before calling
107//! async_close()
108//! - after connection is established and before it's terminated you can
109//! perform I/O
110//! - even if connection can't be established, async_terminate() still should be
111//! called before closing and destryoing connection
112//!
113//! Connection FSM
114//! --------------
115//!
116//! TcpConnectionPort maintains an FSM and sees each operation or event handler as a
117//! transition between states. Each operation is allowed only in certain states and
118//! will panic when not used properly.
119//!
120//! State switch mostly happens on the network thread, however some limited set of
121//! transitions is allowed from other threads. For this reason, state switching is
122//! done using atomic operations.
123class TcpConnectionPort : public BasicPort, public IConn {
124public:
125 //! Initialize.
127
128 //! Destroy.
130
131 //! Open TCP connection.
132 //! @remarks
133 //! Should be called from network loop thread.
134 virtual bool open();
135
136 //! Asynchronously close TCP connection.
137 //! @remarks
138 //! Should be called from network loop thread.
139 virtual AsyncOperationStatus async_close(ICloseHandler& handler, void* handler_arg);
140
141 //! Establish conection by accepting it from listening socket.
142 //! @remarks
143 //! Should be called from network loop thread.
144 bool accept(const TcpConnectionConfig& config,
145 const address::SocketAddr& server_address,
146 SocketHandle server_socket);
147
148 //! Establish connection to remote peer (asynchronously).
149 //! @remarks
150 //! Should be called from network loop thread.
151 bool connect(const TcpClientConfig& config);
152
153 //! Set termination handler and start using it.
154 //! @remarks
155 //! Should be called from network loop thread.
156 void attach_terminate_handler(ITerminateHandler& handler, void* handler_arg);
157
158 //! Set connection handler and start reporting events to it.
159 //! @remarks
160 //! Should be called from network loop thread.
162
163 //! Return address of the local peer.
164 //! @remarks
165 //! Can be called from any thread.
166 virtual const address::SocketAddr& local_address() const;
167
168 //! Return address of the remote peer.
169 //! @remarks
170 //! Can be called from any thread.
171 virtual const address::SocketAddr& remote_address() const;
172
173 //! Return true if there was a failure.
174 //! @remarks
175 //! Can be called from any thread.
176 virtual bool is_failed() const;
177
178 //! Return true if the connection is writable.
179 //! @remarks
180 //! Can be called from any thread.
181 virtual bool is_writable() const;
182
183 //! Return true if the connection is readable.
184 //! @remarks
185 //! Can be called from any thread.
186 virtual bool is_readable() const;
187
188 //! Write @p buf of size @p len to the connection.
189 //! @remarks
190 //! Can be called from any thread.
191 virtual ssize_t try_write(const void* buf, size_t len);
192
193 //! Read @p len bytes from the the connection to @p buf.
194 //! @remarks
195 //! Can be called from any thread.
196 virtual ssize_t try_read(void* buf, size_t len);
197
198 //! Initiate asynchronous graceful shutdown.
199 //! @remarks
200 //! Can be called from any thread.
202
203protected:
204 //! Format descriptor.
206
207private:
208 // State of the connection FSM.
209 enum ConnectionState {
210 // not opened or already closed
211 State_Closed,
212
213 // open() is in progress
214 State_Opening,
215
216 // opened, waiting for connect() or accept()
217 State_Opened,
218
219 // accept() or connect() is in progress
220 State_Connecting,
221
222 // asynchronous connection failed, need terminate and close
223 State_Refused,
224
225 // asynchronous connection succeeded, do I/O and then terminate and close
226 State_Established,
227
228 // failure during I/O, need terminate and close
229 State_Broken,
230
231 // async_terminate() was called, asynchronous termination is in progress
232 State_Terminating,
233
234 // asynchronous termination completed, ready for closing
235 State_Terminated,
236
237 // async_close() was called, asynchronous close is in progress
238 State_Closing
239 };
240
241 // Reading or writing status of the socket.
242 enum IoStatus {
243 // socket is not ready for I/O
244 Io_NotAvailable,
245
246 // socket is ready for reading or writing
247 Io_Available,
248
249 // read or write operation is in progress
250 Io_InProgress
251 };
252
253 // I/O statistics.
254 struct IoStats {
255 // number of IConnHandler events
256 core::Seqlock<uint64_t> rd_events;
257 core::Seqlock<uint64_t> wr_events;
258
259 // number of try_read() and try_write() calls
260 uint64_t rd_calls;
261 uint64_t wr_calls;
262
263 // how much times SockErr_WouldBlock was returned
264 uint64_t rd_wouldblock;
265 uint64_t wr_wouldblock;
266
267 // number of bytes transferred
268 uint64_t rd_bytes;
269 uint64_t wr_bytes;
270
271 IoStats()
272 : rd_events(0)
273 , wr_events(0)
274 , rd_calls(0)
275 , wr_calls(0)
276 , rd_wouldblock(0)
277 , wr_wouldblock(0)
278 , rd_bytes(0)
279 , wr_bytes(0) {
280 }
281 };
282
283 static const char* conn_state_to_str_(ConnectionState);
284
285 static void poll_cb_(uv_poll_t* handle, int status, int events);
286 static void start_terminate_cb_(uv_async_t* handle);
287 static void finish_terminate_cb_(uv_handle_t* handle);
288 static void close_cb_(uv_handle_t* handle);
289
290 bool start_polling_();
291 AsyncOperationStatus async_stop_polling_(uv_close_cb completion_cb);
292
293 void disconnect_socket_();
294
295 AsyncOperationStatus async_close_();
296
297 void set_and_report_writable_();
298 void set_and_report_readable_();
299
300 ConnectionState get_state_() const;
301 void switch_and_report_state_(ConnectionState new_state);
302 bool maybe_switch_state_(ConnectionState expected_state,
303 ConnectionState desired_state);
304 void report_state_(ConnectionState state);
305
306 void set_conn_handler_(IConnHandler& handler);
307 void unset_conn_handler_();
308
309 void check_usable_(ConnectionState conn_state) const;
310 void check_usable_for_io_(ConnectionState conn_state) const;
311
312 void report_io_stats_();
313
314 uv_loop_t& loop_;
315
316 uv_poll_t poll_handle_;
317 bool poll_handle_initialized_;
318 bool poll_handle_started_;
319
320 uv_async_t terminate_sem_;
321 bool terminate_sem_initialized_;
322
323 core::SharedPtr<IConnHandler> conn_handler_;
324
325 ITerminateHandler* terminate_handler_;
326 void* terminate_handler_arg_;
327
328 ICloseHandler* close_handler_;
329 void* close_handler_arg_;
330
331 TcpConnectionType type_;
332
333 address::SocketAddr local_address_;
334 address::SocketAddr remote_address_;
335
336 SocketHandle socket_;
337
338 core::Atomic<int32_t> conn_state_;
339
340 core::Atomic<int32_t> conn_was_established_;
341 core::Atomic<int32_t> conn_was_failed_;
342
343 core::Atomic<int32_t> writable_status_;
344 core::Atomic<int32_t> readable_status_;
345
346 bool got_stream_end_;
347
348 core::Mutex io_mutex_;
349
350 IoStats io_stats_;
351 core::RateLimiter report_limiter_;
352};
353
354} // namespace netio
355} // namespace roc
356
357#endif // ROC_NETIO_TCP_CONNECTION_PORT_H_
Atomic.
Base class for ports.
IArena & arena() const
Get arena.
Memory arena interface.
Definition iarena.h:23
Shared ownership intrusive pointer.
Definition shared_ptr.h:32
Base class for ports.
Definition basic_port.h:40
Close handler interface.
Connection event handler interface.
Connection interface.
Definition iconn.h:30
Termination handler interface.
void attach_connection_handler(IConnHandler &handler)
Set connection handler and start reporting events to it.
virtual const address::SocketAddr & local_address() const
Return address of the local peer.
virtual ssize_t try_read(void *buf, size_t len)
Read len bytes from the the connection to buf.
bool connect(const TcpClientConfig &config)
Establish connection to remote peer (asynchronously).
virtual bool is_failed() const
Return true if there was a failure.
virtual AsyncOperationStatus async_close(ICloseHandler &handler, void *handler_arg)
Asynchronously close TCP connection.
virtual bool open()
Open TCP connection.
TcpConnectionPort(TcpConnectionType type, uv_loop_t &loop, core::IArena &arena)
Initialize.
virtual bool is_readable() const
Return true if the connection is readable.
bool accept(const TcpConnectionConfig &config, const address::SocketAddr &server_address, SocketHandle server_socket)
Establish conection by accepting it from listening socket.
virtual ~TcpConnectionPort()
Destroy.
virtual void async_terminate(TerminationMode mode)
Initiate asynchronous graceful shutdown.
virtual const address::SocketAddr & remote_address() const
Return address of the remote peer.
virtual void format_descriptor(core::StringBuilder &b)
Format descriptor.
virtual bool is_writable() const
Return true if the connection is writable.
void attach_terminate_handler(ITerminateHandler &handler, void *handler_arg)
Set termination handler and start using it.
virtual ssize_t try_write(const void *buf, size_t len)
Write buf of size len to the connection.
Close handler interface.
Connection interface.
Connection event handler interface.
Termination handler interface.
Mutex.
int SocketHandle
Platform-specific socket handle.
Definition socket_ops.h:51
AsyncOperationStatus
Asynchronous operation status.
TcpConnectionType
TCP connection type.
@ TcpConn_Client
Local peer is client, remote peer is server.
@ TcpConn_Server
Local peer is server, remote peer is client.
TerminationMode
Connection termination mode.
Root namespace.
Rate limiter.
Seqlock.
Shared ownership intrusive pointer.
Socket address.
Socket operations.
Socket options.
Definition socket_ops.h:29
TCP connection parameters.
address::SocketAddr local_address
Local peer address to which we're bound.
address::SocketAddr remote_address
Remote peer address to which we're connected.
TCP connection parameters.
SocketOpts socket_options
Socket options.