Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
udp_port.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_netio/target_libuv/roc_netio/udp_port.h
10//! @brief UDP port.
11
12#ifndef ROC_NETIO_UDP_PORT_H_
13#define ROC_NETIO_UDP_PORT_H_
14
15#include <uv.h>
16
18#include "roc_core/iarena.h"
19#include "roc_core/list.h"
20#include "roc_core/list_node.h"
21#include "roc_core/mpsc_queue.h"
25#include "roc_packet/iwriter.h"
27
28namespace roc {
29namespace netio {
30
31//! UDP port parameters.
32struct UdpConfig {
33 //! Port will bind to this address.
34 //! If IP is zero, INADDR_ANY is used, i.e. the socket is bound to all network
35 //! interfaces. If port is zero, a random free port is selected.
37
38 //! If not empty, port will join multicast group on the interface
39 //! with given address. May be "0.0.0.0" or "[::]" to join on all interfaces.
40 //! Used only if receiving is started.
42
43 //! If set, enable SO_REUSEADDR when binding socket to non-ephemeral port.
44 //! If not set, SO_REUSEADDR is enabled only for multicast sockets when
45 //! binding to non-ephemeral port.
47
48 //! If true, allow non-blocking writes directly in write() method.
49 //! If non-blocking write can't be performed, port falls back to
50 //! regular asynchronous write.
51 //! Used only if sending is started.
53
54 UdpConfig()
55 : enable_reuseaddr(false)
56 , enable_non_blocking(true) {
57 multicast_interface[0] = '\0';
58 }
59
60 //! Check two configs for equality.
61 bool operator==(const UdpConfig& other) const {
62 return bind_address == other.bind_address
63 && strcmp(multicast_interface, other.multicast_interface) == 0
66 }
67};
68
69//! UDP sender/receiver port.
70class UdpPort : public BasicPort, private packet::IWriter {
71public:
72 //! Initialize.
73 UdpPort(const UdpConfig& config,
74 uv_loop_t& event_loop,
75 packet::PacketFactory& packet_factory,
77
78 //! Destroy.
79 virtual ~UdpPort();
80
81 //! Get bind address.
83
84 //! Open receiver.
85 virtual bool open();
86
87 //! Asynchronously close receiver.
88 virtual AsyncOperationStatus async_close(ICloseHandler& handler, void* handler_arg);
89
90 //! Start receiving packets.
91 //! @remarks
92 //! Packets written to returned writer will be enqueued for sending.
93 //! Writer can be used from any thread.
95
96 //! Start receiving packets.
97 //! @remarks
98 //! Received packets will be written to inbound_writer.
99 //! Writer will be invoked from network thread.
100 bool start_recv(packet::IWriter& inbound_writer);
101
102protected:
103 //! Format descriptor.
105
106private:
107 static void close_cb_(uv_handle_t* handle);
108
109 static void alloc_cb_(uv_handle_t* handle, size_t size, uv_buf_t* buf);
110 static void recv_cb_(uv_udp_t* handle,
111 ssize_t nread,
112 const uv_buf_t* buf,
113 const sockaddr* addr,
114 unsigned flags);
115
116 static void write_sem_cb_(uv_async_t* handle);
117 static void send_cb_(uv_udp_send_t* req, int status);
118
119 // Implements packet::IWriter::write()
120 virtual status::StatusCode write(const packet::PacketPtr& packet);
121 void write_(const packet::PacketPtr& packet);
122 bool try_nonblocking_write_(const packet::PacketPtr& pp);
123
124 bool fully_closed_() const;
125 void start_closing_();
126
127 bool join_multicast_group_();
128 void leave_multicast_group_();
129
130 void report_stats_();
131
132 UdpConfig config_;
133
134 ICloseHandler* close_handler_;
135 void* close_handler_arg_;
136
137 uv_loop_t& loop_;
138
139 uv_udp_t handle_;
140 bool handle_initialized_;
141
142 uv_async_t write_sem_;
143 bool write_sem_initialized_;
144
145 bool multicast_group_joined_;
146 bool recv_started_;
147 bool want_close_;
148 bool closed_;
149
150 uv_os_fd_t fd_;
151
152 packet::PacketFactory& packet_factory_;
153
154 packet::IWriter* inbound_writer_;
155 core::MpscQueue<packet::Packet> outbound_queue_;
156
157 core::RateLimiter rate_limiter_;
158
159 core::Atomic<int> pending_packets_;
160 core::Atomic<int> sent_packets_;
161 core::Atomic<int> sent_packets_blk_;
162 core::Atomic<int> received_packets_;
163};
164
165} // namespace netio
166} // namespace roc
167
168#endif // ROC_NETIO_UDP_PORT_H_
Base class for ports.
IArena & arena() const
Get arena.
Memory arena interface.
Definition iarena.h:23
Base class for ports.
Definition basic_port.h:40
Close handler interface.
UDP sender/receiver port.
Definition udp_port.h:70
bool start_recv(packet::IWriter &inbound_writer)
Start receiving packets.
virtual AsyncOperationStatus async_close(ICloseHandler &handler, void *handler_arg)
Asynchronously close receiver.
virtual ~UdpPort()
Destroy.
virtual void format_descriptor(core::StringBuilder &b)
Format descriptor.
virtual bool open()
Open receiver.
packet::IWriter * start_send()
Start receiving packets.
const address::SocketAddr & bind_address() const
Get bind address.
UdpPort(const UdpConfig &config, uv_loop_t &event_loop, packet::PacketFactory &packet_factory, core::IArena &arena)
Initialize.
Packet writer interface.
Definition iwriter.h:23
Memory arena interface.
Close handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Linked list node.
Multi-producer single-consumer queue.
AsyncOperationStatus
Asynchronous operation status.
StatusCode
Status code.
Definition status_code.h:19
Root namespace.
Packet factory.
Rate limiter.
Socket address.
UDP port parameters.
Definition udp_port.h:32
char multicast_interface[64]
If not empty, port will join multicast group on the interface with given address. May be "0....
Definition udp_port.h:41
bool enable_reuseaddr
If set, enable SO_REUSEADDR when binding socket to non-ephemeral port. If not set,...
Definition udp_port.h:46
bool operator==(const UdpConfig &other) const
Check two configs for equality.
Definition udp_port.h:61
bool enable_non_blocking
If true, allow non-blocking writes directly in write() method. If non-blocking write can't be perform...
Definition udp_port.h:52
address::SocketAddr bind_address
Port will bind to this address. If IP is zero, INADDR_ANY is used, i.e. the socket is bound to all ne...
Definition udp_port.h:36