Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
udp_port.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_netio/target_libuv/roc_netio/udp_port.h
10 //! @brief UDP port.
11 
12 #ifndef ROC_NETIO_UDP_PORT_H_
13 #define ROC_NETIO_UDP_PORT_H_
14 
15 #include <uv.h>
16 
18 #include "roc_core/iarena.h"
19 #include "roc_core/list.h"
20 #include "roc_core/list_node.h"
21 #include "roc_core/mpsc_queue.h"
22 #include "roc_core/rate_limiter.h"
23 #include "roc_netio/basic_port.h"
25 #include "roc_packet/iwriter.h"
27 
28 namespace roc {
29 namespace netio {
30 
31 //! UDP port parameters.
//! Configuration aggregate for a UDP port. The constructor below turns
//! SO_REUSEADDR forcing off, turns non-blocking writes on, and makes
//! multicast_interface an empty string.
32 struct UdpConfig {
33  //! Port will bind to this address.
34  //! If IP is zero, INADDR_ANY is used, i.e. the socket is bound to all network
35  //! interfaces. If port is zero, a random free port is selected.
 // NOTE(review): listing line 36 is absent from this extract; the member index
 // at the end of the page gives it as `address::SocketAddr bind_address`.
37 
38  //! If not empty, port will join multicast group on the interface
39  //! with given address. May be "0.0.0.0" or "[::]" to join on all interfaces.
40  //! Used only if receiving is started.
 // NOTE(review): listing line 41 is absent from this extract; the member index
 // gives it as `char multicast_interface[64]`.
42 
43  //! If set, enable SO_REUSEADDR when binding socket to non-ephemeral port.
44  //! If not set, SO_REUSEADDR is enabled only for multicast sockets when
45  //! binding to non-ephemeral port.
 // NOTE(review): listing line 46 is absent from this extract; the member index
 // gives it as `bool enable_reuseaddr`.
47 
48  //! If true, allow non-blocking writes directly in write() method.
49  //! If non-blocking write can't be performed, port falls back to
50  //! regular asynchronous write.
51  //! Used only if sending is started.
 // NOTE(review): listing line 52 is absent from this extract; the member index
 // gives it as `bool enable_non_blocking`.
53 
 //! Construct config with defaults: enable_reuseaddr is false,
 //! enable_non_blocking is true, multicast_interface is the empty string.
 //! bind_address is not mentioned in the initializer list.
54  UdpConfig()
55  : enable_reuseaddr(false)
56  , enable_non_blocking(true) {
57  multicast_interface[0] = '\0';
58  }
59 
60  //! Check two configs for equality.
 //! bind_address is compared with its own operator==; multicast_interface is
 //! a char array, so it is compared as a C string with strcmp().
61  bool operator==(const UdpConfig& other) const {
62  return bind_address == other.bind_address
63  && strcmp(multicast_interface, other.multicast_interface) == 0
 // NOTE(review): listing lines 64-65 (the rest of this expression) are absent
 // from this extract; presumably they compare the two boolean flags -- confirm
 // against the real header.
66  }
67 };
68 
69 //! UDP sender/receiver port.
//! Derives publicly from BasicPort and privately from packet::IWriter; the
//! private write() override below is the IWriter implementation.
70 class UdpPort : public BasicPort, private packet::IWriter {
71 public:
72  //! Initialize.
73  UdpPort(const UdpConfig& config,
74  uv_loop_t& event_loop,
75  packet::PacketFactory& packet_factory,
 // NOTE(review): listing line 76 is absent from this extract; the member index
 // at the end of the page shows the last parameter as `core::IArena &arena`.
77 
78  //! Destroy.
79  virtual ~UdpPort();
80 
81  //! Get bind address.
 // NOTE(review): listing line 82 is absent from this extract; the member index
 // gives it as `const address::SocketAddr & bind_address() const`.
83 
84  //! Open receiver.
85  virtual bool open();
86 
87  //! Asynchronously close receiver.
88  virtual AsyncOperationStatus async_close(ICloseHandler& handler, void* handler_arg);
89 
90  //! Start sending packets.
91  //! @remarks
92  //! Packets written to returned writer will be enqueued for sending.
93  //! Writer can be used from any thread.
 // NOTE(review): listing line 94 is absent from this extract; the member index
 // gives it as `packet::IWriter * start_send()`.
95 
96  //! Start receiving packets.
97  //! @remarks
98  //! Received packets will be written to inbound_writer.
99  //! Writer will be invoked from network thread.
100  bool start_recv(packet::IWriter& inbound_writer);
101 
102 protected:
103  //! Format descriptor.
 // NOTE(review): listing line 104 is absent from this extract; the member index
 // gives it as `virtual void format_descriptor(core::StringBuilder &b)`.
105 
106 private:
 // Static functions taking libuv handle/request pointers; their signatures
 // match what libuv expects for close, alloc, recv, async and send callbacks.
107  static void close_cb_(uv_handle_t* handle);
108 
109  static void alloc_cb_(uv_handle_t* handle, size_t size, uv_buf_t* buf);
110  static void recv_cb_(uv_udp_t* handle,
111  ssize_t nread,
112  const uv_buf_t* buf,
113  const sockaddr* addr,
114  unsigned flags);
115 
116  static void write_sem_cb_(uv_async_t* handle);
117  static void send_cb_(uv_udp_send_t* req, int status);
118 
119  // Implements packet::IWriter::write()
120  virtual status::StatusCode write(const packet::PacketPtr& packet);
121  void write_(const packet::PacketPtr& packet);
122  bool try_nonblocking_write_(const packet::PacketPtr& pp);
123 
124  bool fully_closed_() const;
125  void start_closing_();
126 
127  bool join_multicast_group_();
128  void leave_multicast_group_();
129 
130  void report_stats_();
131 
 // Copy of the configuration (held by value).
132  UdpConfig config_;
133 
 // Handler and opaque argument; presumably those passed to async_close() --
 // confirm in the implementation file.
134  ICloseHandler* close_handler_;
135  void* close_handler_arg_;
136 
 // Event loop, held by reference (not owned by this object).
137  uv_loop_t& loop_;
138 
 // libuv UDP handle and a flag tracking whether it has been initialized.
139  uv_udp_t handle_;
140  bool handle_initialized_;
141 
 // libuv async handle and a flag tracking whether it has been initialized.
142  uv_async_t write_sem_;
143  bool write_sem_initialized_;
144 
 // State flags; exact transitions are defined in the implementation file.
145  bool multicast_group_joined_;
146  bool recv_started_;
147  bool want_close_;
148  bool closed_;
149 
150  uv_os_fd_t fd_;
151 
152  packet::PacketFactory& packet_factory_;
153 
 // inbound_writer_ is a nullable pointer; presumably set by start_recv().
 // outbound_queue_ is a multi-producer single-consumer queue of packets.
154  packet::IWriter* inbound_writer_;
155  core::MpscQueue<packet::Packet> outbound_queue_;
156 
157  core::RateLimiter rate_limiter_;
158 
 // Atomic integer counters; presumably feed report_stats_() -- confirm.
159  core::Atomic<int> pending_packets_;
160  core::Atomic<int> sent_packets_;
161  core::Atomic<int> sent_packets_blk_;
162  core::Atomic<int> received_packets_;
163 };
164 
165 } // namespace netio
166 } // namespace roc
167 
168 #endif // ROC_NETIO_UDP_PORT_H_
Base class for ports.
Socket address.
Definition: socket_addr.h:26
IArena & arena() const
Get arena.
Memory arena interface.
Definition: iarena.h:23
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:45
Base class for ports.
Definition: basic_port.h:40
Close handler interface.
UDP sender/receiver port.
Definition: udp_port.h:70
bool start_recv(packet::IWriter &inbound_writer)
Start receiving packets.
virtual AsyncOperationStatus async_close(ICloseHandler &handler, void *handler_arg)
Asynchronously close receiver.
virtual ~UdpPort()
Destroy.
virtual void format_descriptor(core::StringBuilder &b)
Format descriptor.
virtual bool open()
Open receiver.
const address::SocketAddr & bind_address() const
Get bind address.
packet::IWriter * start_send()
Start sending packets.
UdpPort(const UdpConfig &config, uv_loop_t &event_loop, packet::PacketFactory &packet_factory, core::IArena &arena)
Initialize.
Packet writer interface.
Definition: iwriter.h:23
Memory arena interface.
Close handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Linked list node.
Multi-producer single-consumer queue.
AsyncOperationStatus
Asynchronous operation status.
Root namespace.
Packet factory.
Rate limiter.
Socket address.
StatusCode
Status code.
Definition: status_code.h:19
UDP port parameters.
Definition: udp_port.h:32
char multicast_interface[64]
If not empty, port will join multicast group on the interface with given address. May be "0.0.0.0" or "[::]" to join on all interfaces. Used only if receiving is started.
Definition: udp_port.h:41
bool enable_reuseaddr
If set, enable SO_REUSEADDR when binding socket to non-ephemeral port. If not set, SO_REUSEADDR is enabled only for multicast sockets when binding to non-ephemeral port.
Definition: udp_port.h:46
bool operator==(const UdpConfig &other) const
Check two configs for equality.
Definition: udp_port.h:61
bool enable_non_blocking
If true, allow non-blocking writes directly in write() method. If non-blocking write can't be performed, port falls back to regular asynchronous write. Used only if sending is started.
Definition: udp_port.h:52
address::SocketAddr bind_address
Port will bind to this address. If IP is zero, INADDR_ANY is used, i.e. the socket is bound to all network interfaces. If port is zero, a random free port is selected.
Definition: udp_port.h:36