Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
reporter.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2023 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_rtcp/reporter.h
10//! @brief RTCP reporter.
11
12#ifndef ROC_RTCP_REPORTER_H_
13#define ROC_RTCP_REPORTER_H_
14
16#include "roc_core/array.h"
17#include "roc_core/hashmap.h"
18#include "roc_core/list.h"
22#include "roc_core/shared_ptr.h"
23#include "roc_core/slab_pool.h"
24#include "roc_core/time.h"
25#include "roc_packet/ntp.h"
26#include "roc_packet/units.h"
27#include "roc_rtcp/cname.h"
28#include "roc_rtcp/config.h"
29#include "roc_rtcp/headers.h"
33#include "roc_rtcp/reports.h"
35#include "roc_rtcp/sdes.h"
37
38namespace roc {
39namespace rtcp {
40
41//! RTCP report processor and generator.
42//!
43//! Used by rtcp::Communicator to incrementally process and generate individual
44//! blocks of compound RTCP packets. Collects data from RTCP traffic and local
45//! pipeline (IParticipant).
46//!
47//! Features:
48//!
49//! - Maintains hash table of all known sending and receiving streams.
50//! The table is populated from two sources: reports gathered via RTCP from
51//! remote peers and local reports gathered from IParticipant.
52//!
53//! - Maintains hash table of all destination addresses where to send reports,
54//! and an index to quickly find which streams are associated with each address.
55//!
56//! - Provides methods to process report blocks from incoming RTCP packets.
57//! Incrementally fills internal tables from provided report blocks.
58//! When RTCP packet is fully processed, notifies IParticipant with
59//! the updated remote reports accumulated in internal tables.
60//!
61//! - Provides methods to generate report blocks for outgoing RTCP packets.
62//! Queries up-to-date local reports from IParticipant into internal tables.
63//! Incrementally fills report blocks from the internal tables.
64//!
65//! - Notifies IParticipant when a stream is removed after receiving BYE
66//! message or due to inactivity timeout.
67//!
68//! - Detects SSRC collisions and asks IParticipant to switch SSRC.
69//! Sends BYE message for old SSRC.
70//!
71//! Workflow of incoming packets processing:
72//!
73//! @code
74//! reporter.begin_processing()
75//! reporter.process_sr(...)
76//! reporter.process_reception_block(...)
77//! ...
78//! reporter.end_processing()
79//! @endcode
80//!
81//! Workflow of outgoing packet generation:
82//!
83//! @code
84//! reporter.begin_generation();
85//! reporter.generate_sr(...)
86//! reporter.generate_reception_block(...)
87//! ...
88//! reporter.end_generation()
89//! @endcode
90class Reporter : public core::NonCopyable<> {
91public:
92 //! Initialize.
93 Reporter(const Config& config, IParticipant& participant, core::IArena& arena);
94 ~Reporter();
95
96 //! Check if initialization succeeded.
97 bool is_valid() const;
98
99 //! Check if there is local sending stream.
100 bool is_sending() const;
101
102 //! Check if there are local receiving streams.
103 bool is_receiving() const;
104
105 //! Get number of tracked destination addresses, for testing.
106 size_t total_destinations() const;
107
108 //! Get number of tracked streams, for testing.
109 size_t total_streams() const;
110
111 //! @name Report processing
112 //! @{
113
114 //! Begin report processing.
115 //! Invoked before process_xxx() functions.
118 core::nanoseconds_t report_time);
119
120 //! Process SDES CNAME.
121 void process_cname(const SdesChunk& chunk, const SdesItem& item);
122
123 //! Process SR header.
125
126 //! Process SR/RR reception block.
129
130 //! Process XR DLRR sub-block (extended sender report).
132 const header::XrDlrrSubblock& blk);
133
134 //! Process XR RRTR block (extended receiver report).
136
137 //! Process XR Measurement Info block (extended receiver report).
140
141 //! Process XR Delay Metrics block (extended receiver report).
143 const header::XrDelayMetricsBlock& blk);
144
145 //! Process XR Queue Metrics block (extended receiver report).
147 const header::XrQueueMetricsBlock& blk);
148
149 //! Process BYE message.
151
152 //! End report processing.
153 //! Invoked after process_xxx() functions.
155
156 //! @}
157
158 //! @name Report generation
159 //! @{
160
161 //! Begin report generation.
162 //! Invoked before genrate_xxx() functions.
165
166 //! Get number of destination addresses to which to send reports.
167 size_t num_dest_addresses() const;
168
169 //! Get number of sending streams to be reported.
170 //! @p addr_index should be in range [0; num_dest_addresses()-1].
171 size_t num_sending_streams(size_t addr_index) const;
172
173 //! Get number of receiving streams to be reported.
174 //! @p addr_index should be in range [0; num_dest_addresses()-1].
175 size_t num_receiving_streams(size_t addr_index) const;
176
177 //! Generate destination address.
178 //! @p addr_index should be in range [0; num_dest_addresses()-1].
179 void generate_dest_address(size_t addr_index, address::SocketAddr& addr);
180
181 //! Generate SDES chunk with CNAME item.
182 void generate_cname(SdesChunk& chunk, SdesItem& item);
183
184 //! Generate SR header.
186
187 //! Generate RR header.
189
190 //! Generate SR/RR reception block.
191 //! @p addr_index should be in range [0; num_dest_addresses()-1].
192 //! @p stream_index should be in range [0; num_receiving_streams()-1].
193 void generate_reception_block(size_t addr_index,
194 size_t stream_index,
196
197 //! Generate XR header.
199
200 //! Generate XR DLRR sub-block (extended sender report).
201 //! @p addr_index should be in range [0; num_dest_addresses()-1].
202 //! @p stream_index should be in range [0; num_sending_streams()-1].
203 void generate_dlrr_subblock(size_t addr_index,
204 size_t stream_index,
206
207 //! Generate XR RRTR header (extended receiver report).
209
210 //! Generate XR Measurement Info block (extended receiver report).
211 //! @p addr_index should be in range [0; num_dest_addresses()-1].
212 //! @p stream_index should be in range [0; num_receiving_streams()-1].
213 void generate_measurement_info_block(size_t addr_index,
214 size_t stream_index,
216
217 //! Generate XR Delay Metrics block (extended receiver report).
218 //! @p addr_index should be in range [0; num_dest_addresses()-1].
219 //! @p stream_index should be in range [0; num_receiving_streams()-1].
220 void generate_delay_metrics_block(size_t addr_index,
221 size_t stream_index,
223
224 //! Generate XR Queue Metrics block (extended receiver report).
225 //! @p addr_index should be in range [0; num_dest_addresses()-1].
226 //! @p stream_index should be in range [0; num_receiving_streams()-1].
227 void generate_queue_metrics_block(size_t addr_index,
228 size_t stream_index,
230
231 //! Check if BYE message should be included.
232 bool need_goodbye() const;
233
234 //! Generate BYE message.
236
237 //! End report generation.
238 //! Invoked after generate_xxx() functions.
240
241 //! @}
242
243private:
244 enum { PreallocatedStreams = 8, PreallocatedAddresses = 4 };
245
246 enum ReportState {
247 State_Idle, // Default state
248 State_Processing, // Between begin_processing() and end_processing()
249 State_Generating, // Between begin_generation() and end_generation()
250 };
251
252 enum CreateMode {
253 AutoCreate, // Automatically create stream if not found
254 NoAutoCreate, // Return NULL if not found
255 };
256
257 // Represents state of one local sending and/or receiving stream.
258 // One stream object is created for every discovered remote participant
259 // that receives from us and/or sends to us.
260 // Stream is uniquely identified by SSRC of remote participant.
261 struct Stream : core::RefCounted<Stream, core::PoolAllocation>,
262 core::HashmapNode<>,
263 core::ListNode<> {
264 Stream(core::IPool& pool,
265 packet::stream_source_t source_id,
266 core::nanoseconds_t report_time,
267 const RttConfig& rtt_config)
268 : core::RefCounted<Stream, core::PoolAllocation>(pool)
269 , source_id(source_id)
270 , has_remote_recv_report(false)
271 , remote_recv_rtt(rtt_config)
272 , has_remote_send_report(false)
273 , remote_send_rtt(rtt_config)
274 , local_recv_report(NULL)
275 , last_update(report_time)
276 , last_local_sr(0)
277 , last_remote_rr(0)
278 , last_remote_rr_ntp(0)
279 , last_remote_dlsr(0)
280 , last_local_rr(0)
281 , last_remote_sr(0)
282 , last_remote_sr_ntp(0)
283 , last_remote_dlrr(0)
284 , is_looped(false) {
285 cname[0] = '\0';
286 }
287
288 // SSRC and CNAME of remote participant.
289 packet::stream_source_t source_id;
290 char cname[MaxCnameLen + 1];
291
292 // Stream is sending to remote participant and we obtained
293 // receiver report from it.
294 bool has_remote_recv_report;
295 RecvReport remote_recv_report;
296 RttEstimator remote_recv_rtt;
297 PacketCounter remote_recv_packet_count;
298
299 // Stream is receiving from remote participant and we obtained
300 // sender report from it.
301 bool has_remote_send_report;
302 SendReport remote_send_report;
303 RttEstimator remote_send_rtt;
304 PacketCounter remote_send_packet_count;
305 PacketCounter remote_send_byte_count;
306
307 // Stream is receiving from remote participant and this is our
308 // receiver report to be delivered to remote side.
309 // Points to an element of local_recv_reports_ array. Whenever
310 // array is resized, rebuild_index_() updates the pointers.
311 RecvReport* local_recv_report;
312 LossEstimator local_recv_loss;
313
314 // Remote address from where reports are coming.
315 address::SocketAddr remote_address;
316
317 // Whenever stream is updated, this timestamp changes and stream
318 // is moved to the front of stream_lru_ list.
319 core::nanoseconds_t last_update;
320
321 // When we sent last SR for which we received DLSR (local clock).
322 core::nanoseconds_t last_local_sr;
323 // When we received last RR (local clock).
324 core::nanoseconds_t last_remote_rr;
325 // NTP timestamp from last RR (as it was in packet, remote clock).
326 packet::ntp_timestamp_t last_remote_rr_ntp;
327 // DLSR received with last RR (delta, remote clock).
328 core::nanoseconds_t last_remote_dlsr;
329
330 // When we sent last RR for which we received DLRR (local clock).
331 core::nanoseconds_t last_local_rr;
332 // When we received last SR (local clock).
333 core::nanoseconds_t last_remote_sr;
334 // NTP timestamp from last SR (as it was in packet, remote clock).
335 packet::ntp_timestamp_t last_remote_sr_ntp;
336 // DLRR received with last SR (delta, remote clock).
337 core::nanoseconds_t last_remote_dlrr;
338
339 // Set when we detect network loop.
340 bool is_looped;
341
342 packet::stream_source_t key() const {
343 return source_id;
344 }
345
346 static core::hashsum_t key_hash(packet::stream_source_t id) {
347 return core::hashsum_int(id);
348 }
349
350 static bool key_equal(packet::stream_source_t id1, packet::stream_source_t id2) {
351 return id1 == id2;
352 }
353 };
354
355 // Represents one destination address.
356 // If we're sending all reports to a single preconfigured address, there will be
357 // only one instance. Otherwise there will be an instance for every unique address.
358 struct Address : core::RefCounted<Address, core::PoolAllocation>,
359 core::HashmapNode<>,
360 core::ListNode<> {
361 Address(core::IPool& pool,
362 core::IArena& arena,
363 const address::SocketAddr& remote_address,
364 core::nanoseconds_t report_time)
365 : core::RefCounted<Address, core::PoolAllocation>(pool)
366 , remote_address(remote_address)
367 , send_stream_index(arena)
368 , recv_stream_index(arena)
369 , last_rebuild(report_time) {
370 }
371
372 // Destination address where to send reports.
373 address::SocketAddr remote_address;
374
375 // Pointers to local sending and receiving streams from stream map
376 // associated with given address.
377 core::Array<Stream*, PreallocatedStreams> send_stream_index;
378 core::Array<Stream*, PreallocatedStreams> recv_stream_index;
379
380 // Whenever address is rebuilt, this timestamp changes and address
381 // is moved to the front of address_lru_ list.
382 core::nanoseconds_t last_rebuild;
383
384 const address::SocketAddr& key() const {
385 return remote_address;
386 }
387
388 static core::hashsum_t key_hash(const address::SocketAddr& addr) {
389 return core::hashsum_mem(addr.saddr(), (size_t)addr.slen());
390 }
391
392 static bool key_equal(const address::SocketAddr& addr1,
393 const address::SocketAddr& addr2) {
394 return addr1 == addr2;
395 }
396 };
397
398 status::StatusCode notify_streams_();
399 status::StatusCode refresh_streams_();
400 status::StatusCode query_streams_();
401 status::StatusCode rebuild_index_();
402
403 void detect_timeouts_();
404 void detect_collision_(packet::stream_source_t source_id);
405 void resolve_collision_();
406
407 void validate_send_report_(const SendReport& send_report);
408 void validate_recv_report_(const RecvReport& recv_report);
409
410 core::SharedPtr<Stream> find_stream_(packet::stream_source_t source_id,
411 CreateMode mode);
412 void remove_stream_(Stream& stream);
413 void update_stream_(Stream& stream);
414
415 core::SharedPtr<Address> find_address_(const address::SocketAddr& remote_address,
416 CreateMode mode);
417 void remove_address_(Address& address);
418 void rebuild_address_(Address& address);
419
420 core::IArena& arena_;
421
422 // Interface implemented by local sender/receiver pipeline.
423 IParticipant& participant_;
424
425 // Defines whether participant uses a single static destination address
426 // for all all reports, or otherwise sends individual reports to dynamically
427 // discovered remote addresses.
428 ParticipantReportMode participant_report_mode_;
429 address::SocketAddr participant_report_addr_;
430
431 // Information obtained from IParticipant.
432 char local_cname_[MaxCnameLen + 1];
433 packet::stream_source_t local_source_id_;
434 bool has_local_send_report_;
435 SendReport local_send_report_;
436 core::Array<RecvReport, PreallocatedStreams> local_recv_reports_;
437
438 // Map of all streams, identified by SSRC.
439 core::SlabPool<Stream, PreallocatedStreams> stream_pool_;
440 core::Hashmap<Stream, PreallocatedStreams> stream_map_;
441
442 // List of all streams (from stream map) ordered by update time.
443 // Recently updated streams are moved to the front of the list.
444 // This list always contains all existing streams.
445 core::List<Stream, core::NoOwnership> stream_lru_;
446
447 // Map of all destination addresses.
448 // In Report_ToAddress mode, there will be only one address.
449 // In Report_Back mode, addresses will be allocated as we discover
450 // new remote participants.
451 core::SlabPool<Address, PreallocatedAddresses> address_pool_;
452 core::Hashmap<Address, PreallocatedAddresses> address_map_;
453
454 // List of all addresses (from address map) ordered by rebuild time.
455 // Recently rebuilt addresses are moved to the front of the list.
456 // This list always contains all existing addresses.
457 core::List<Address, core::NoOwnership> address_lru_;
458
459 // Pointers to addresses (from address map), which in turn hold
460 // pointers to streams (from stream map), for fast access by index
461 // during report generation.
462 core::Array<Address*, PreallocatedAddresses> address_index_;
463 // If true, the index should be rebuilt before next generation.
464 bool need_rebuild_index_;
465
466 // When we sent most recent SR (local clock).
467 core::nanoseconds_t current_sr_;
468 // When we sent most recent RR (local clock).
469 core::nanoseconds_t current_rr_;
470
471 // SSRC collision detection state.
472 bool collision_detected_;
473 bool collision_reported_;
474
475 // Report processing & generation state.
476 ReportState report_state_;
477 status::StatusCode report_error_;
478 address::SocketAddr report_addr_;
479 core::nanoseconds_t report_time_;
480
481 // Configuration.
482 const Config config_;
483 const core::nanoseconds_t max_delay_;
484
485 bool valid_;
486};
487
488} // namespace rtcp
489} // namespace roc
490
491#endif // ROC_RTCP_REPORTER_H_
Dynamic array.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition attributes.h:31
Memory arena interface.
Definition iarena.h:23
Base class for non-copyable objects.
Definition noncopyable.h:23
PoolAllocation(IPool &pool)
Initialize.
IPool & pool() const
Get pool.
RTCP participant.
RTCP report processor and generator.
Definition reporter.h:90
size_t total_destinations() const
Get number of tracked destination addresses, for testing.
void process_dlrr_subblock(const header::XrPacket &xr, const header::XrDlrrSubblock &blk)
Process XR DLRR sub-block (extended sender report).
void process_sr(const header::SenderReportPacket &sr)
Process SR header.
bool is_valid() const
Check if initialization succeeded.
void generate_dlrr_subblock(size_t addr_index, size_t stream_index, header::XrDlrrSubblock &blk)
Generate XR DLRR sub-block (extended sender report). addr_index should be in range [0; num_dest_addre...
bool is_receiving() const
Check if there are local receiving streams.
void generate_rr(header::ReceiverReportPacket &rr)
Generate RR header.
void process_delay_metrics_block(const header::XrPacket &xr, const header::XrDelayMetricsBlock &blk)
Process XR Delay Metrics block (extended receiver report).
void process_reception_block(packet::stream_source_t ssrc, const header::ReceptionReportBlock &blk)
Process SR/RR reception block.
size_t num_dest_addresses() const
Get number of destination addresses to which to send reports.
void generate_rrtr_block(header::XrRrtrBlock &blk)
Generate XR RRTR header (extended receiver report).
void generate_goodbye(packet::stream_source_t &ssrc)
Generate BYE message.
void process_cname(const SdesChunk &chunk, const SdesItem &item)
Process SDES CNAME.
void process_rrtr_block(const header::XrPacket &xr, const header::XrRrtrBlock &blk)
Process XR RRTR block (extended receiver report).
status::StatusCode begin_processing(const address::SocketAddr &report_addr, core::nanoseconds_t report_time)
Begin report processing. Invoked before process_xxx() functions.
size_t total_streams() const
Get number of tracked streams, for testing.
void generate_delay_metrics_block(size_t addr_index, size_t stream_index, header::XrDelayMetricsBlock &blk)
Generate XR Delay Metrics block (extended receiver report). addr_index should be in range [0; num_des...
void generate_xr(header::XrPacket &xr)
Generate XR header.
void generate_sr(header::SenderReportPacket &sr)
Generate SR header.
size_t num_receiving_streams(size_t addr_index) const
Get number of receiving streams to be reported. addr_index should be in range [0; num_dest_addresses(...
void process_goodbye(packet::stream_source_t ssrc)
Process BYE message.
bool need_goodbye() const
Check if BYE message should be included.
void generate_measurement_info_block(size_t addr_index, size_t stream_index, header::XrMeasurementInfoBlock &blk)
Generate XR Measurement Info block (extended receiver report). addr_index should be in range [0; num_...
void generate_cname(SdesChunk &chunk, SdesItem &item)
Generate SDES chunk with CNAME item.
status::StatusCode end_processing()
End report processing. Invoked after process_xxx() functions.
size_t num_sending_streams(size_t addr_index) const
Get number of sending streams to be reported. addr_index should be in range [0; num_dest_addresses()-...
void process_measurement_info_block(const header::XrPacket &xr, const header::XrMeasurementInfoBlock &blk)
Process XR Measurement Info block (extended receiver report).
bool is_sending() const
Check if there is local sending stream.
void generate_dest_address(size_t addr_index, address::SocketAddr &addr)
Generate destination address. addr_index should be in range [0; num_dest_addresses()-1].
status::StatusCode end_generation()
End report generation. Invoked after generate_xxx() functions.
void generate_reception_block(size_t addr_index, size_t stream_index, header::ReceptionReportBlock &blk)
Generate SR/RR reception block. addr_index should be in range [0; num_dest_addresses()-1]....
Reporter(const Config &config, IParticipant &participant, core::IArena &arena)
Initialize.
void process_queue_metrics_block(const header::XrPacket &xr, const header::XrQueueMetricsBlock &blk)
Process XR Queue Metrics block (extended receiver report).
status::StatusCode begin_generation(core::nanoseconds_t report_time)
Begin report generation. Invoked before genrate_xxx() functions.
void generate_queue_metrics_block(size_t addr_index, size_t stream_index, header::XrQueueMetricsBlock &blk)
Generate XR Queue Metrics block (extended receiver report). addr_index should be in range [0; num_des...
Receiver Report RTCP packet (RR).
Definition headers.h:524
Sender Report RTCP packet (SR).
Definition headers.h:621
XR DLRR Report sub-block.
Definition headers.h:1228
XR Measurement Info Report Block.
Definition headers.h:1373
RTCP Extended Report Packet.
Definition headers.h:1035
XR Receiver Reference Time Report block.
Definition headers.h:1174
CNAME utilities.
Intrusive hash table.
RTCP participant.
Intrusive doubly-linked list.
Loss estimator.
hashsum_t hashsum_int(int16_t)
Compute hash of 16-bit integer.
int64_t nanoseconds_t
Nanoseconds.
Definition time.h:58
hashsum_t hashsum_mem(const void *data, size_t size)
Compute hash of byte range.
size_t hashsum_t
Hash type.
Definition hashsum.h:21
uint32_t stream_source_t
Packet stream identifier.
Definition units.h:27
uint64_t ntp_timestamp_t
NTP timestamp.
Definition ntp.h:35
ParticipantReportMode
Participant report generation mode.
StatusCode
Status code.
Definition status_code.h:19
Root namespace.
Non-copyable object.
Ownership policies.
Packet counter.
Base class for object with reference counter.
RTCP reports.
Utitilies for NTP timestamp.
RTCP config.
RTCP headers.
Round-trip time estimator.
SDES elements.
Shared ownership intrusive pointer.
Memory pool.
Socket address.
Status codes.
RTCP config.
Definition config.h:24
Parsed SDES chunk.
Definition sdes.h:23
Parsed SDES item.
Definition sdes.h:33
Time definitions.
Various units used in packets.