Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
reporter.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2023 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_rtcp/reporter.h
10 //! @brief RTCP reporter.
11 
12 #ifndef ROC_RTCP_REPORTER_H_
13 #define ROC_RTCP_REPORTER_H_
14 
16 #include "roc_core/array.h"
17 #include "roc_core/hashmap.h"
18 #include "roc_core/list.h"
19 #include "roc_core/noncopyable.h"
21 #include "roc_core/ref_counted.h"
22 #include "roc_core/shared_ptr.h"
23 #include "roc_core/slab_pool.h"
24 #include "roc_core/time.h"
25 #include "roc_packet/ntp.h"
26 #include "roc_packet/units.h"
27 #include "roc_rtcp/cname.h"
28 #include "roc_rtcp/config.h"
29 #include "roc_rtcp/headers.h"
30 #include "roc_rtcp/iparticipant.h"
33 #include "roc_rtcp/reports.h"
34 #include "roc_rtcp/rtt_estimator.h"
35 #include "roc_rtcp/sdes.h"
36 #include "roc_status/status_code.h"
37 
38 namespace roc {
39 namespace rtcp {
40 
41 //! RTCP report processor and generator.
42 //!
43 //! Used by rtcp::Communicator to incrementally process and generate individual
44 //! blocks of compound RTCP packets. Collects data from RTCP traffic and local
45 //! pipeline (IParticipant).
46 //!
47 //! Features:
48 //!
49 //! - Maintains hash table of all known sending and receiving streams.
50 //! The table is populated from two sources: reports gathered via RTCP from
51 //! remote peers and local reports gathered from IParticipant.
52 //!
53 //! - Maintains hash table of all destination addresses where to send reports,
54 //! and an index to quickly find which streams are associated with each address.
55 //!
56 //! - Provides methods to process report blocks from incoming RTCP packets.
57 //! Incrementally fills internal tables from provided report blocks.
58 //! When RTCP packet is fully processed, notifies IParticipant with
59 //! the updated remote reports accumulated in internal tables.
60 //!
61 //! - Provides methods to generate report blocks for outgoing RTCP packets.
62 //! Queries up-to-date local reports from IParticipant into internal tables.
63 //! Incrementally fills report blocks from the internal tables.
64 //!
65 //! - Notifies IParticipant when a stream is removed after receiving BYE
66 //! message or due to inactivity timeout.
67 //!
68 //! - Detects SSRC collisions and asks IParticipant to switch SSRC.
69 //! Sends BYE message for old SSRC.
70 //!
71 //! Workflow of incoming packets processing:
72 //!
73 //! @code
74 //! reporter.begin_processing()
75 //! reporter.process_sr(...)
76 //! reporter.process_reception_block(...)
77 //! ...
78 //! reporter.end_processing()
79 //! @endcode
80 //!
81 //! Workflow of outgoing packet generation:
82 //!
83 //! @code
84 //! reporter.begin_generation();
85 //! reporter.generate_sr(...)
86 //! reporter.generate_reception_block(...)
87 //! ...
88 //! reporter.end_generation()
89 //! @endcode
90 class Reporter : public core::NonCopyable<> {
91 public:
92  //! Initialize.
93  Reporter(const Config& config, IParticipant& participant, core::IArena& arena);
94  ~Reporter();
95 
96  //! Check if initialization succeeded.
97  bool is_valid() const;
98 
99  //! Check if there is local sending stream.
100  bool is_sending() const;
101 
102  //! Check if there are local receiving streams.
103  bool is_receiving() const;
104 
105  //! Get number of tracked destination addresses, for testing.
106  size_t total_destinations() const;
107 
108  //! Get number of tracked streams, for testing.
109  size_t total_streams() const;
110 
111  //! @name Report processing
112  //! @{
113 
114  //! Begin report processing.
115  //! Invoked before process_xxx() functions.
118  core::nanoseconds_t report_time);
119 
120  //! Process SDES CNAME.
121  void process_cname(const SdesChunk& chunk, const SdesItem& item);
122 
123  //! Process SR header.
125 
126  //! Process SR/RR reception block.
128  const header::ReceptionReportBlock& blk);
129 
130  //! Process XR DLRR sub-block (extended sender report).
132  const header::XrDlrrSubblock& blk);
133 
134  //! Process XR RRTR block (extended receiver report).
136 
137  //! Process XR Measurement Info block (extended receiver report).
139  const header::XrMeasurementInfoBlock& blk);
140 
141  //! Process XR Delay Metrics block (extended receiver report).
143  const header::XrDelayMetricsBlock& blk);
144 
145  //! Process XR Queue Metrics block (extended receiver report).
147  const header::XrQueueMetricsBlock& blk);
148 
149  //! Process BYE message.
151 
152  //! End report processing.
153  //! Invoked after process_xxx() functions.
155 
156  //! @}
157 
158  //! @name Report generation
159  //! @{
160 
161  //! Begin report generation.
162  //! Invoked before generate_xxx() functions.
165 
166  //! Get number of destination addresses to which to send reports.
167  size_t num_dest_addresses() const;
168 
169  //! Get number of sending streams to be reported.
170  //! @p addr_index should be in range [0; num_dest_addresses()-1].
171  size_t num_sending_streams(size_t addr_index) const;
172 
173  //! Get number of receiving streams to be reported.
174  //! @p addr_index should be in range [0; num_dest_addresses()-1].
175  size_t num_receiving_streams(size_t addr_index) const;
176 
177  //! Generate destination address.
178  //! @p addr_index should be in range [0; num_dest_addresses()-1].
179  void generate_dest_address(size_t addr_index, address::SocketAddr& addr);
180 
181  //! Generate SDES chunk with CNAME item.
182  void generate_cname(SdesChunk& chunk, SdesItem& item);
183 
184  //! Generate SR header.
186 
187  //! Generate RR header.
189 
190  //! Generate SR/RR reception block.
191  //! @p addr_index should be in range [0; num_dest_addresses()-1].
192  //! @p stream_index should be in range [0; num_receiving_streams()-1].
193  void generate_reception_block(size_t addr_index,
194  size_t stream_index,
196 
197  //! Generate XR header.
199 
200  //! Generate XR DLRR sub-block (extended sender report).
201  //! @p addr_index should be in range [0; num_dest_addresses()-1].
202  //! @p stream_index should be in range [0; num_sending_streams()-1].
203  void generate_dlrr_subblock(size_t addr_index,
204  size_t stream_index,
206 
207  //! Generate XR RRTR header (extended receiver report).
209 
210  //! Generate XR Measurement Info block (extended receiver report).
211  //! @p addr_index should be in range [0; num_dest_addresses()-1].
212  //! @p stream_index should be in range [0; num_receiving_streams()-1].
213  void generate_measurement_info_block(size_t addr_index,
214  size_t stream_index,
216 
217  //! Generate XR Delay Metrics block (extended receiver report).
218  //! @p addr_index should be in range [0; num_dest_addresses()-1].
219  //! @p stream_index should be in range [0; num_receiving_streams()-1].
220  void generate_delay_metrics_block(size_t addr_index,
221  size_t stream_index,
223 
224  //! Generate XR Queue Metrics block (extended receiver report).
225  //! @p addr_index should be in range [0; num_dest_addresses()-1].
226  //! @p stream_index should be in range [0; num_receiving_streams()-1].
227  void generate_queue_metrics_block(size_t addr_index,
228  size_t stream_index,
230 
231  //! Check if BYE message should be included.
232  bool need_goodbye() const;
233 
234  //! Generate BYE message.
236 
237  //! End report generation.
238  //! Invoked after generate_xxx() functions.
240 
241  //! @}
242 
243 private:
     // Sizes used as the second template argument of the stream/address arrays,
     // slab pools and hashmaps declared in this class.
     // NOTE(review): presumably the number of elements these containers can hold
     // without extra allocation - confirm against roc_core container docs.
244  enum { PreallocatedStreams = 8, PreallocatedAddresses = 4 };
245 
     // Which begin/end cycle the reporter is currently inside, if any.
     // The current value is kept in report_state_.
246  enum ReportState {
247  State_Idle, // Default state
248  State_Processing, // Between begin_processing() and end_processing()
249  State_Generating, // Between begin_generation() and end_generation()
250  };
251 
     // Lookup policy passed to find_stream_() and find_address_(): selects what
     // happens when the requested entry is not found.
252  enum CreateMode {
253  AutoCreate, // Automatically create stream if not found
254  NoAutoCreate, // Return NULL if not found
255  };
256 
257  // Represents state of one local sending and/or receiving stream.
258  // One stream object is created for every discovered remote participant
259  // that receives from us and/or sends to us.
260  // Stream is uniquely identified by SSRC of remote participant.
     //
     // The object is reference-counted with pool allocation and is both a
     // hashmap node and a list node, so a single instance can be linked into
     // stream_map_ and stream_lru_ at the same time.
261  struct Stream : core::RefCounted<Stream, core::PoolAllocation>,
262  core::HashmapNode<>,
263  core::ListNode<> {
     // Construct a stream for remote SSRC source_id with no reports yet:
     //  - both has_remote_*_report flags are false
     //  - local_recv_report is NULL
     //  - cname is an empty string
     //  - all last_* timestamps/deltas are zero, is_looped is false
     //  - last_update is set to report_time
     // rtt_config is forwarded to both RTT estimators; pool is forwarded to
     // the RefCounted base.
264  Stream(core::IPool& pool,
265  packet::stream_source_t source_id,
266  core::nanoseconds_t report_time,
267  const RttConfig& rtt_config)
268  : core::RefCounted<Stream, core::PoolAllocation>(pool)
269  , source_id(source_id)
270  , has_remote_recv_report(false)
271  , remote_recv_rtt(rtt_config)
272  , has_remote_send_report(false)
273  , remote_send_rtt(rtt_config)
274  , local_recv_report(NULL)
275  , last_update(report_time)
276  , last_local_sr(0)
277  , last_remote_rr(0)
278  , last_remote_rr_ntp(0)
279  , last_remote_dlsr(0)
280  , last_local_rr(0)
281  , last_remote_sr(0)
282  , last_remote_sr_ntp(0)
283  , last_remote_dlrr(0)
284  , is_looped(false) {
     // Start with an empty, zero-terminated CNAME.
285  cname[0] = '\0';
286  }
287 
288  // SSRC and CNAME of remote participant.
289  packet::stream_source_t source_id;
     // Buffer holds up to MaxCnameLen characters plus the terminating zero.
290  char cname[MaxCnameLen + 1];
291 
292  // Stream is sending to remote participant and we obtained
293  // receiver report from it.
294  bool has_remote_recv_report;
295  RecvReport remote_recv_report;
296  RttEstimator remote_recv_rtt;
297  PacketCounter remote_recv_packet_count;
298 
299  // Stream is receiving from remote participant and we obtained
300  // sender report from it.
301  bool has_remote_send_report;
302  SendReport remote_send_report;
303  RttEstimator remote_send_rtt;
304  PacketCounter remote_send_packet_count;
305  PacketCounter remote_send_byte_count;
306 
307  // Stream is receiving from remote participant and this is our
308  // receiver report to be delivered to remote side.
309  // Points to an element of local_recv_reports_ array. Whenever
310  // array is resized, rebuild_index_() updates the pointers.
311  RecvReport* local_recv_report;
312  LossEstimator local_recv_loss;
313 
314  // Remote address from where reports are coming.
315  address::SocketAddr remote_address;
316 
317  // Whenever stream is updated, this timestamp changes and stream
318  // is moved to the front of stream_lru_ list.
319  core::nanoseconds_t last_update;
320 
321  // When we sent last SR for which we received DLSR (local clock).
322  core::nanoseconds_t last_local_sr;
323  // When we received last RR (local clock).
324  core::nanoseconds_t last_remote_rr;
325  // NTP timestamp from last RR (as it was in packet, remote clock).
326  packet::ntp_timestamp_t last_remote_rr_ntp;
327  // DLSR received with last RR (delta, remote clock).
328  core::nanoseconds_t last_remote_dlsr;
329 
330  // When we sent last RR for which we received DLRR (local clock).
331  core::nanoseconds_t last_local_rr;
332  // When we received last SR (local clock).
333  core::nanoseconds_t last_remote_sr;
334  // NTP timestamp from last SR (as it was in packet, remote clock).
335  packet::ntp_timestamp_t last_remote_sr_ntp;
336  // DLRR received with last SR (delta, remote clock).
337  core::nanoseconds_t last_remote_dlrr;
338 
339  // Set when we detect network loop.
340  bool is_looped;
341 
     // Key accessors: streams are keyed by the remote SSRC.
     // NOTE(review): presumably this key()/key_hash()/key_equal() trio is the
     // interface core::Hashmap expects from its elements - confirm against
     // roc_core/hashmap.h.

     // Returns the key of this stream (its source_id).
342  packet::stream_source_t key() const {
343  return source_id;
344  }
345 
     // Hashes an SSRC via core::hashsum_int().
346  static core::hashsum_t key_hash(packet::stream_source_t id) {
347  return core::hashsum_int(id);
348  }
349 
     // Two keys match when the SSRC values are equal.
350  static bool key_equal(packet::stream_source_t id1, packet::stream_source_t id2) {
351  return id1 == id2;
352  }
353  };
354 
355  // Represents one destination address.
356  // If we're sending all reports to a single preconfigured address, there will be
357  // only one instance. Otherwise there will be an instance for every unique address.
     //
     // The object is reference-counted with pool allocation and is both a
     // hashmap node and a list node, so a single instance can be linked into
     // address_map_ and address_lru_ at the same time.
358  struct Address : core::RefCounted<Address, core::PoolAllocation>,
359  core::HashmapNode<>,
360  core::ListNode<> {
     // Construct an entry for remote_address.
     // Both stream index arrays are constructed on top of arena, last_rebuild
     // is set to report_time, and pool is forwarded to the RefCounted base.
361  Address(core::IPool& pool,
362  core::IArena& arena,
363  const address::SocketAddr& remote_address,
364  core::nanoseconds_t report_time)
365  : core::RefCounted<Address, core::PoolAllocation>(pool)
366  , remote_address(remote_address)
367  , send_stream_index(arena)
368  , recv_stream_index(arena)
369  , last_rebuild(report_time) {
370  }
371 
372  // Destination address where to send reports.
373  address::SocketAddr remote_address;
374 
375  // Pointers to local sending and receiving streams from stream map
376  // associated with given address.
     // These are raw (non-owning) Stream pointers.
377  core::Array<Stream*, PreallocatedStreams> send_stream_index;
378  core::Array<Stream*, PreallocatedStreams> recv_stream_index;
379 
380  // Whenever address is rebuilt, this timestamp changes and address
381  // is moved to the front of address_lru_ list.
382  core::nanoseconds_t last_rebuild;
383 
     // Key accessors: addresses are keyed by the destination socket address.
     // NOTE(review): presumably this key()/key_hash()/key_equal() trio is the
     // interface core::Hashmap expects from its elements - confirm against
     // roc_core/hashmap.h.

     // Returns the key of this entry (its remote_address), by reference.
384  const address::SocketAddr& key() const {
385  return remote_address;
386  }
387 
     // Hashes addr.slen() bytes starting at addr.saddr() via core::hashsum_mem().
388  static core::hashsum_t key_hash(const address::SocketAddr& addr) {
389  return core::hashsum_mem(addr.saddr(), (size_t)addr.slen());
390  }
391 
     // Two keys match when SocketAddr::operator== reports them equal.
392  static bool key_equal(const address::SocketAddr& addr1,
393  const address::SocketAddr& addr2) {
394  return addr1 == addr2;
395  }
396  };
397 
398  status::StatusCode notify_streams_();
399  status::StatusCode refresh_streams_();
400  status::StatusCode query_streams_();
401  status::StatusCode rebuild_index_();
402 
403  void detect_timeouts_();
404  void detect_collision_(packet::stream_source_t source_id);
405  void resolve_collision_();
406 
407  void validate_send_report_(const SendReport& send_report);
408  void validate_recv_report_(const RecvReport& recv_report);
409 
410  core::SharedPtr<Stream> find_stream_(packet::stream_source_t source_id,
411  CreateMode mode);
412  void remove_stream_(Stream& stream);
413  void update_stream_(Stream& stream);
414 
415  core::SharedPtr<Address> find_address_(const address::SocketAddr& remote_address,
416  CreateMode mode);
417  void remove_address_(Address& address);
418  void rebuild_address_(Address& address);
419 
420  core::IArena& arena_;
421 
422  // Interface implemented by local sender/receiver pipeline.
423  IParticipant& participant_;
424 
425  // Defines whether participant uses a single static destination address
426  // for all reports, or otherwise sends individual reports to dynamically
427  // discovered remote addresses.
428  ParticipantReportMode participant_report_mode_;
429  address::SocketAddr participant_report_addr_;
430 
431  // Information obtained from IParticipant.
432  char local_cname_[MaxCnameLen + 1];
433  packet::stream_source_t local_source_id_;
434  bool has_local_send_report_;
435  SendReport local_send_report_;
436  core::Array<RecvReport, PreallocatedStreams> local_recv_reports_;
437 
438  // Map of all streams, identified by SSRC.
439  core::SlabPool<Stream, PreallocatedStreams> stream_pool_;
440  core::Hashmap<Stream, PreallocatedStreams> stream_map_;
441 
442  // List of all streams (from stream map) ordered by update time.
443  // Recently updated streams are moved to the front of the list.
444  // This list always contains all existing streams.
445  core::List<Stream, core::NoOwnership> stream_lru_;
446 
447  // Map of all destination addresses.
448  // In Report_ToAddress mode, there will be only one address.
449  // In Report_Back mode, addresses will be allocated as we discover
450  // new remote participants.
451  core::SlabPool<Address, PreallocatedAddresses> address_pool_;
452  core::Hashmap<Address, PreallocatedAddresses> address_map_;
453 
454  // List of all addresses (from address map) ordered by rebuild time.
455  // Recently rebuilt addresses are moved to the front of the list.
456  // This list always contains all existing addresses.
457  core::List<Address, core::NoOwnership> address_lru_;
458 
459  // Pointers to addresses (from address map), which in turn hold
460  // pointers to streams (from stream map), for fast access by index
461  // during report generation.
462  core::Array<Address*, PreallocatedAddresses> address_index_;
463  // If true, the index should be rebuilt before next generation.
464  bool need_rebuild_index_;
465 
466  // When we sent most recent SR (local clock).
467  core::nanoseconds_t current_sr_;
468  // When we sent most recent RR (local clock).
469  core::nanoseconds_t current_rr_;
470 
471  // SSRC collision detection state.
472  bool collision_detected_;
473  bool collision_reported_;
474 
475  // Report processing & generation state.
476  ReportState report_state_;
477  status::StatusCode report_error_;
478  address::SocketAddr report_addr_;
479  core::nanoseconds_t report_time_;
480 
481  // Configuration.
482  const Config config_;
483  const core::nanoseconds_t max_delay_;
484 
485  bool valid_;
486 };
487 
488 } // namespace rtcp
489 } // namespace roc
490 
491 #endif // ROC_RTCP_REPORTER_H_
Dynamic array.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition: attributes.h:31
Socket address.
Definition: socket_addr.h:26
Memory arena interface.
Definition: iarena.h:23
Base class for non-copyable objects.
Definition: noncopyable.h:23
PoolAllocation(IPool &pool)
Initialize.
RTCP participant.
Definition: iparticipant.h:49
RTCP report processor and generator.
Definition: reporter.h:90
size_t total_destinations() const
Get number of tracked destination addresses, for testing.
void process_dlrr_subblock(const header::XrPacket &xr, const header::XrDlrrSubblock &blk)
Process XR DLRR sub-block (extended sender report).
void process_sr(const header::SenderReportPacket &sr)
Process SR header.
bool is_valid() const
Check if initialization succeeded.
void generate_dlrr_subblock(size_t addr_index, size_t stream_index, header::XrDlrrSubblock &blk)
Generate XR DLRR sub-block (extended sender report). addr_index should be in range [0; num_dest_addre...
bool is_receiving() const
Check if there are local receiving streams.
void generate_rr(header::ReceiverReportPacket &rr)
Generate RR header.
void process_delay_metrics_block(const header::XrPacket &xr, const header::XrDelayMetricsBlock &blk)
Process XR Delay Metrics block (extended receiver report).
void process_reception_block(packet::stream_source_t ssrc, const header::ReceptionReportBlock &blk)
Process SR/RR reception block.
size_t num_dest_addresses() const
Get number of destination addresses to which to send reports.
void generate_rrtr_block(header::XrRrtrBlock &blk)
Generate XR RRTR header (extended receiver report).
void generate_goodbye(packet::stream_source_t &ssrc)
Generate BYE message.
void process_cname(const SdesChunk &chunk, const SdesItem &item)
Process SDES CNAME.
void process_rrtr_block(const header::XrPacket &xr, const header::XrRrtrBlock &blk)
Process XR RRTR block (extended receiver report).
ROC_ATTR_NODISCARD status::StatusCode begin_generation(core::nanoseconds_t report_time)
Begin report generation. Invoked before generate_xxx() functions.
size_t total_streams() const
Get number of tracked streams, for testing.
void generate_delay_metrics_block(size_t addr_index, size_t stream_index, header::XrDelayMetricsBlock &blk)
Generate XR Delay Metrics block (extended receiver report). addr_index should be in range [0; num_des...
void generate_xr(header::XrPacket &xr)
Generate XR header.
void generate_sr(header::SenderReportPacket &sr)
Generate SR header.
ROC_ATTR_NODISCARD status::StatusCode end_generation()
End report generation. Invoked after generate_xxx() functions.
size_t num_receiving_streams(size_t addr_index) const
Get number of receiving streams to be reported. addr_index should be in range [0; num_dest_addresses(...
ROC_ATTR_NODISCARD status::StatusCode end_processing()
End report processing. Invoked after process_xxx() functions.
void process_goodbye(packet::stream_source_t ssrc)
Process BYE message.
bool need_goodbye() const
Check if BYE message should be included.
void generate_measurement_info_block(size_t addr_index, size_t stream_index, header::XrMeasurementInfoBlock &blk)
Generate XR Measurement Info block (extended receiver report). addr_index should be in range [0; num_...
void generate_cname(SdesChunk &chunk, SdesItem &item)
Generate SDES chunk with CNAME item.
ROC_ATTR_NODISCARD status::StatusCode begin_processing(const address::SocketAddr &report_addr, core::nanoseconds_t report_time)
Begin report processing. Invoked before process_xxx() functions.
size_t num_sending_streams(size_t addr_index) const
Get number of sending streams to be reported. addr_index should be in range [0; num_dest_addresses()-...
void process_measurement_info_block(const header::XrPacket &xr, const header::XrMeasurementInfoBlock &blk)
Process XR Measurement Info block (extended receiver report).
bool is_sending() const
Check if there is local sending stream.
void generate_dest_address(size_t addr_index, address::SocketAddr &addr)
Generate destination address. addr_index should be in range [0; num_dest_addresses()-1].
void generate_reception_block(size_t addr_index, size_t stream_index, header::ReceptionReportBlock &blk)
Generate SR/RR reception block. addr_index should be in range [0; num_dest_addresses()-1]....
Reporter(const Config &config, IParticipant &participant, core::IArena &arena)
Initialize.
void process_queue_metrics_block(const header::XrPacket &xr, const header::XrQueueMetricsBlock &blk)
Process XR Queue Metrics block (extended receiver report).
void generate_queue_metrics_block(size_t addr_index, size_t stream_index, header::XrQueueMetricsBlock &blk)
Generate XR Queue Metrics block (extended receiver report). addr_index should be in range [0; num_des...
Receiver Report RTCP packet (RR).
Definition: headers.h:524
Reception report block.
Definition: headers.h:333
Sender Report RTCP packet (SR).
Definition: headers.h:621
XR Delay Metrics Block.
Definition: headers.h:1515
XR DLRR Report sub-block.
Definition: headers.h:1228
XR Measurement Info Report Block.
Definition: headers.h:1373
RTCP Extended Report Packet.
Definition: headers.h:1035
XR Queue Metrics Block.
Definition: headers.h:1668
XR Receiver Reference Time Report block.
Definition: headers.h:1174
CNAME utilities.
Intrusive hash table.
RTCP participant.
Intrusive doubly-linked list.
Loss estimator.
hashsum_t hashsum_int(int16_t)
Compute hash of 16-bit integer.
int64_t nanoseconds_t
Nanoseconds.
Definition: time.h:58
hashsum_t hashsum_mem(const void *data, size_t size)
Compute hash of byte range.
size_t hashsum_t
Hash type.
Definition: hashsum.h:21
uint32_t stream_source_t
Packet stream identifier.
Definition: units.h:27
uint64_t ntp_timestamp_t
NTP timestamp.
Definition: ntp.h:35
ParticipantReportMode
Participant report generation mode.
Root namespace.
Non-copyable object.
Ownership policies.
Packet counter.
Base class for object with reference counter.
RTCP reports.
Utilities for NTP timestamp.
RTCP config.
RTCP headers.
Round-trip time estimator.
SDES elements.
Shared ownership intrusive pointer.
Memory pool.
Socket address.
Status codes.
StatusCode
Status code.
Definition: status_code.h:19
RTCP config.
Definition: config.h:24
Parsed SDES chunk.
Definition: sdes.h:23
Parsed SDES item.
Definition: sdes.h:33
Time definitions.
Various units used in packets.