Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
receiver_session_router.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2023 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_pipeline/receiver_session_router.h
10 //! @brief Receiver session router.
11 
12 #ifndef ROC_PIPELINE_RECEIVER_SESSION_ROUTER_H_
13 #define ROC_PIPELINE_RECEIVER_SESSION_ROUTER_H_
14 
16 #include "roc_core/attributes.h"
17 #include "roc_core/hashsum.h"
18 #include "roc_core/iarena.h"
19 #include "roc_core/list.h"
20 #include "roc_core/list_node.h"
21 #include "roc_core/macro_helpers.h"
22 #include "roc_core/noncopyable.h"
24 #include "roc_core/slab_pool.h"
25 #include "roc_packet/units.h"
27 #include "roc_rtcp/cname.h"
28 #include "roc_rtcp/headers.h"
29 #include "roc_status/status_code.h"
30 
31 namespace roc {
32 namespace pipeline {
33 
34 //! Receiver session router.
35 //!
36 //! Helps route packets to sessions within a session group.
37 //!
38 //! Session group corresponds to all sessions handled by one receiver slot - a set of
39 //! related complementary endpoints, e.g. one endpoint for audio, one for repair, and one
40 //! for control packets.
41 //!
42 //! For each remote sender, receiver creates a session inside session group. All audio,
43 //! repair, and control packets from same sender are then routed to same session.
44 //!
45 //! Session management is implemented in ReceiverSessionGroup, and algorithm for selecting
46 //! session for a packet is implemented in ReceiverSessionRouter (this class).
47 //!
48 //! Session router provides two methods to select session:
49 //!
50 //! - By source id.
51 //!
52 //! Sender can assign unique source id (SSRC) to each stream (audio, repair), and
53 //! then transmit RTCP SDES packet that associates all sender's SSRCs with the same
54 //! unique (randomly generated) CNAME string.
55 //!
56 //! Session router will remember that these SSRCs are related and will route packets
57 //! from those streams to same session.
58 //!
59 //! - By source address.
60 //!
61 //! As a fallback for the case when RTCP is not used, session router will assume that
62 //! packets with same source address belong to the same session.
63 //!
64 //! To make it work, sender should ensure that it sends all streams (audio, repair)
65 //! from the same socket, and that there are no proxies or retranslators that combine
66 //! multiple senders on the same socket.
68 public:
69  //! Initialize.
71 
72  //! Deinitialize.
74 
75  //! Get number of known routes.
76  size_t num_routes();
77 
78  //! Find registered session by source id of sender's stream.
79  //! @remarks
80  //! Sender can have multiple streams, each with its own SSRC.
81  //! Session router will remember all those SSRCs and map them to sender's session.
82  //! @note
83  //! To make it work, one of the SSRCs should be explicitly mapped to a session
84  //! using add_session(), and the rest of SSRCs should be linked together using
85  //! link_source() with same CNAME. The order of these calls does not matter.
87 
88  //! Find registered session by source address of sender's stream.
89  //! @remarks
90  //! Sender can use one source address for all its streams.
91  //! Session router will remember this address and map it to sender's session.
92  //! @note
93  //! To make it work, sender's source address should be provided to add_session(),
94  //! and all sender's streams should have the same source address.
96  find_by_address(const address::SocketAddr& source_addr);
97 
98  //! Check if there is a route for given session.
99  //! @remarks
100  //! Will return false after session was removed via remove_session()
101  //! or unlink_source().
103 
104  //! Register session in router.
105  //! @remarks
106  //! - @p session defines session where to route packets.
107  //! - @p source_id defines SSRC of stream which will be routed to the session.
108  //! Additional streams may be associated with same session via link_source() call.
109  //! - @p source_addr defines source address which will be routed to the session.
110  //! If other streams share the same source address, they will be routed to it.
113  packet::stream_source_t source_id,
114  const address::SocketAddr& source_addr);
115 
116  //! Unregister session from router.
117  //! @remarks
118  //! All associated SSRCs, CNAMEs, and addresses are removed.
120 
121  //! Link source id with unique CNAME.
122  //! @remarks
123  //! Remembers what SSRCs are linked together by sharing the same CNAME.
124  //! If/when one of the linked SSRCs is associated with a session using add_session(),
125  //! all linked SSRCs start being routed to that session.
127  const char* cname);
128 
129  //! Unlink source id from session.
130  //! @remarks
131  //! Removes association of SSRC with session and CNAME.
132  //! If this was the last SSRC, the whole route is removed.
134 
135 private:
136  enum { PreallocatedRoutes = 4, PreallocatedSources = 8 };
137 
138  struct Route;
139 
140  // Map route by source id (ssrc).
141  // Allocated from pool.
142  struct SourceNode : core::RefCounted<SourceNode, core::PoolAllocation>,
144  core::ListNode<> {
145  Route& parent_route;
146  const packet::stream_source_t source_id;
147 
148  SourceNode(core::IPool& pool, Route& route, packet::stream_source_t source_id)
149  : core::RefCounted<SourceNode, core::PoolAllocation>(pool)
150  , parent_route(route)
151  , source_id(source_id) {
152  }
153 
154  Route& route() {
155  return parent_route;
156  }
157 
159  // Use source id as a key.
160  return source_id;
161  }
162 
163  static core::hashsum_t key_hash(packet::stream_source_t source_id) {
164  return core::hashsum_int(source_id);
165  }
166 
167  static bool key_equal(packet::stream_source_t source_id1,
168  packet::stream_source_t source_id2) {
169  return source_id1 == source_id2;
170  }
171  };
172 
173  // Map route by source address.
174  // Embedded into Route struct.
175  struct AddressNode : core::HashmapNode<> {
176  Route& route() {
177  return *ROC_CONTAINER_OF(this, Route, address_node);
178  }
179 
180  const address::SocketAddr& key() {
181  // Use route's source address as a key.
182  return route().source_addr;
183  }
184 
185  static core::hashsum_t key_hash(const address::SocketAddr& source_addr) {
186  return core::hashsum_mem(source_addr.saddr(), (size_t)source_addr.slen());
187  }
188 
189  static bool key_equal(const address::SocketAddr& source_addr1,
190  const address::SocketAddr& source_addr2) {
191  return source_addr1 == source_addr2;
192  }
193  };
194 
195  // Map route by cname.
196  // Embedded into Route struct.
197  struct CnameNode : core::HashmapNode<> {
198  Route& route() {
199  return *ROC_CONTAINER_OF(this, Route, cname_node);
200  }
201 
202  const char* key() {
203  // Use route's cname as a key
204  return route().cname;
205  }
206 
207  static core::hashsum_t key_hash(const char* cname) {
208  return core::hashsum_str(cname);
209  }
210 
211  static bool key_equal(const char* cname1, const char* cname2) {
212  return strcmp(cname1, cname2) == 0;
213  }
214  };
215 
216  // Map route by session pointer.
217  // Embedded into Route struct.
218  struct SessionNode : core::HashmapNode<> {
219  Route& route() {
220  return *ROC_CONTAINER_OF(this, Route, session_node);
221  }
222 
223  const core::SharedPtr<ReceiverSession>& key() {
224  // Use route's session pointer as a key.
225  return route().session;
226  }
227 
228  static core::hashsum_t key_hash(const core::SharedPtr<ReceiverSession>& session) {
229  return core::hashsum_int((uintptr_t)session.get());
230  }
231 
232  static bool key_equal(const core::SharedPtr<ReceiverSession>& session1,
233  const core::SharedPtr<ReceiverSession>& session2) {
234  return session1 == session2;
235  }
236  };
237 
238  // Route represents one actual or potential receiver session.
239  // Usually each route has a session, but it can be created before session if RTCP
240  // packets come before RTP packets, and thus won't have a session for a while.
241  //
242  // Route can be mapped by different keys:
243  // - one or several source ids (SSRCs)
244  // - one source address
245  // - one cname
246  // - one session pointer
247  //
248  // All of the mappings are optional. Route struct owns hashmap nodes for all these
249  // mappings, to allow including it into corresponding hash tables.
250  struct Route : core::RefCounted<Route, core::PoolAllocation>, core::ListNode<> {
251  // Session to which packets are routed.
252  // May be empty.
253  core::SharedPtr<ReceiverSession> session;
254 
255  // Sender source address.
256  // May be empty.
257  address::SocketAddr source_addr;
258 
259  // Sender cname.
260  // May be empty.
261  char cname[rtcp::MaxCnameLen + 1];
262 
263  // Sender main source ID.
264  // Set to one of the identifiers from source_nodes list and
265  // identifies source provided to add_session().
266  bool has_main_source_id;
267  packet::stream_source_t main_source_id;
268 
269  // Hashmap nodes to map this route by different keys.
270  core::List<SourceNode> source_nodes;
271  AddressNode address_node;
272  CnameNode cname_node;
273  SessionNode session_node;
274 
275  Route(core::IPool& pool)
276  : core::RefCounted<Route, core::PoolAllocation>(pool)
277  , has_main_source_id(false)
278  , main_source_id(0) {
279  cname[0] = '\0';
280  }
281  };
282 
283  status::StatusCode relink_source_(packet::stream_source_t source_id,
284  const char* cname);
285 
286  status::StatusCode create_route_(const packet::stream_source_t source_id,
287  const address::SocketAddr& source_addr,
288  const char* cname,
289  const core::SharedPtr<ReceiverSession>& session);
290  void remove_route_(core::SharedPtr<Route> route);
291  void remove_all_routes_();
292  status::StatusCode move_route_session_(Route& from, Route& to);
293  void collect_route_(Route& route);
294 
295  // Pools
296  core::SlabPool<Route, PreallocatedRoutes> route_pool_;
297  core::SlabPool<SourceNode, PreallocatedSources> source_node_pool_;
298 
299  // List of all routes
300  // Holds ownership to routes to keep them alive
301  core::List<Route> route_list_;
302 
303  // Mappings to find routes by different keys
304  // Don't hold ownership to routes
305  core::Hashmap<SourceNode, PreallocatedRoutes, core::NoOwnership> source_route_map_;
306  core::Hashmap<AddressNode, PreallocatedRoutes, core::NoOwnership> address_route_map_;
307  core::Hashmap<CnameNode, PreallocatedRoutes, core::NoOwnership> cname_route_map_;
308  core::Hashmap<SessionNode, PreallocatedRoutes, core::NoOwnership> session_route_map_;
309 };
310 
311 } // namespace pipeline
312 } // namespace roc
313 
314 #endif // ROC_PIPELINE_RECEIVER_SESSION_ROUTER_H_
Compiler attributes.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition: attributes.h:31
Socket address.
Definition: socket_addr.h:26
Base class for Hashmap element.
Definition: hashmap_node.h:61
Memory arena interface.
Definition: iarena.h:23
Memory pool interface.
Definition: ipool.h:23
Base class for List element.
Definition: list_node.h:48
Base class for non-copyable objects.
Definition: noncopyable.h:23
PoolAllocation(IPool &pool)
Initialize.
Base class for object with reference counter.
Definition: ref_counted.h:40
Shared ownership intrusive pointer.
Definition: shared_ptr.h:32
void unlink_source(packet::stream_source_t source_id)
Unlink source id from session.
ROC_ATTR_NODISCARD status::StatusCode add_session(const core::SharedPtr< ReceiverSession > &session, packet::stream_source_t source_id, const address::SocketAddr &source_addr)
Register session in router.
ROC_ATTR_NODISCARD status::StatusCode link_source(packet::stream_source_t source_id, const char *cname)
Link source id with unique CNAME.
size_t num_routes()
Get number of known routes.
core::SharedPtr< ReceiverSession > find_by_address(const address::SocketAddr &source_addr)
Find registered session by source address of sender's stream.
void remove_session(const core::SharedPtr< ReceiverSession > &session)
Unregister session from router.
bool has_session(const core::SharedPtr< ReceiverSession > &session)
Check if there is a route for given session.
core::SharedPtr< ReceiverSession > find_by_source(packet::stream_source_t source_id)
Find registered session by source id of sender's stream.
ReceiverSessionRouter(core::IArena &arena)
Initialize.
CNAME utilities.
Hash sum.
Memory arena interface.
Intrusive doubly-linked list.
Linked list node.
Helper macros.
#define ROC_CONTAINER_OF(ptr, type, member)
Cast a member of a structure out to the containing structure.
Definition: macro_helpers.h:37
hashsum_t hashsum_int(int16_t)
Compute hash of 16-bit integer.
hashsum_t hashsum_str(const char *str)
Compute hash of zero-terminated string.
hashsum_t hashsum_mem(const void *data, size_t size)
Compute hash of byte range.
size_t hashsum_t
Hash type.
Definition: hashsum.h:21
uint32_t stream_source_t
Packet stream identifier.
Definition: units.h:27
Root namespace.
Non-copyable object.
Ownership policies.
Receiver session pipeline.
RTCP headers.
Memory pool.
Socket address.
Status codes.
StatusCode
Status code.
Definition: status_code.h:19
Various units used in packets.