Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
sender.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2020 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_node/sender.h
10 //! @brief Sender node.
11 
12 #ifndef ROC_NODE_SENDER_H_
13 #define ROC_NODE_SENDER_H_
14 
16 #include "roc_address/interface.h"
17 #include "roc_address/protocol.h"
19 #include "roc_core/hashmap.h"
20 #include "roc_core/mutex.h"
21 #include "roc_core/ref_counted.h"
22 #include "roc_core/scoped_ptr.h"
23 #include "roc_core/slab_pool.h"
24 #include "roc_core/stddefs.h"
25 #include "roc_node/context.h"
26 #include "roc_node/node.h"
27 #include "roc_packet/iwriter.h"
30 
31 namespace roc {
32 namespace node {
33 
34 //! Sender node.
35 class Sender : public Node, private pipeline::IPipelineTaskScheduler {
36 public:
37  //! Slot index.
38  typedef uint64_t slot_index_t;
39 
40  //! Initialize.
41  Sender(Context& context, const pipeline::SenderSinkConfig& pipeline_config);
42 
43  //! Deinitialize.
45 
46  //! Check if successfully constructed.
47  bool is_valid() const;
48 
49  //! Set interface config.
51  address::Interface iface,
52  const netio::UdpConfig& config);
53 
54  //! Connect to remote endpoint.
56  address::Interface iface,
57  const address::EndpointUri& uri);
58 
59  //! Remove slot.
61 
62  //! Callback for slot metrics.
63  typedef void (*slot_metrics_func_t)(const pipeline::SenderSlotMetrics& slot_metrics,
64  void* slot_arg);
65 
66  //! Callback for participant metrics.
67  typedef void (*party_metrics_func_t)(
68  const pipeline::SenderParticipantMetrics& party_metrics,
69  size_t party_index,
70  void* party_arg);
71 
72  //! Get metrics.
74  slot_metrics_func_t slot_metrics_func,
75  void* slot_metrics_arg,
76  party_metrics_func_t party_metrics_func,
77  size_t* party_metrics_size,
78  void* party_metrics_arg);
79 
80  //! Check if there are incomplete or broken slots.
82 
83  //! Check if there are broken slots.
84  bool has_broken();
85 
86  //! Get sender sink.
88 
89 private:
90  struct Port {
91  netio::UdpConfig config;
92  netio::UdpConfig orig_config;
94  packet::IWriter* outbound_writer;
95 
96  Port()
97  : handle(NULL)
98  , outbound_writer(NULL) {
99  }
100  };
101 
102  struct Slot : core::RefCounted<Slot, core::PoolAllocation>, core::HashmapNode<> {
103  const slot_index_t index;
105  Port ports[address::Iface_Max];
106  bool broken;
107 
108  Slot(core::IPool& pool,
109  slot_index_t index,
111  : core::RefCounted<Slot, core::PoolAllocation>(pool)
112  , index(index)
113  , handle(handle)
114  , broken(false) {
115  }
116 
117  slot_index_t key() const {
118  return index;
119  }
120 
121  static core::hashsum_t key_hash(slot_index_t index) {
122  return core::hashsum_int(index);
123  }
124 
125  static bool key_equal(slot_index_t index1, slot_index_t index2) {
126  return index1 == index2;
127  }
128  };
129 
130  bool check_compatibility_(address::Interface iface, const address::EndpointUri& uri);
131  void update_compatibility_(address::Interface iface, const address::EndpointUri& uri);
132 
133  core::SharedPtr<Slot> get_slot_(slot_index_t slot_index, bool auto_create);
134  void cleanup_slot_(Slot& slot);
135  void break_slot_(Slot& slot);
136 
137  Port&
138  select_outgoing_port_(Slot& slot, address::Interface, address::AddrFamily family);
139  bool setup_outgoing_port_(Port& port,
140  address::Interface iface,
141  address::AddrFamily family);
142 
143  virtual void schedule_task_processing(pipeline::PipelineLoop&,
144  core::nanoseconds_t delay);
145  virtual void cancel_task_processing(pipeline::PipelineLoop&);
146 
147  core::Mutex mutex_;
148 
149  pipeline::SenderLoop pipeline_;
150  ctl::ControlLoop::Tasks::PipelineProcessing processing_task_;
151 
152  core::SlabPool<Slot> slot_pool_;
153  core::Hashmap<Slot> slot_map_;
154 
155  bool used_interfaces_[address::Iface_Max];
156  address::Protocol used_protocols_[address::Iface_Max];
157 
158  pipeline::SenderSlotMetrics slot_metrics_;
159  core::Array<pipeline::SenderParticipantMetrics, 8> party_metrics_;
160 
161  bool valid_;
162 };
163 
164 } // namespace node
165 } // namespace roc
166 
167 #endif // ROC_NODE_SENDER_H_
Allocation policies.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition: attributes.h:31
Network endpoint URI.
Definition: endpoint_uri.h:27
Base class for Hashmap element.
Definition: hashmap_node.h:61
Memory pool interface.
Definition: ipool.h:23
Base class for object with reference counter.
Definition: ref_counted.h:40
struct PortHandle * PortHandle
Opaque port handle.
Definition: network_loop.h:55
Node context.
Definition: context.h:44
Base class for nodes.
Definition: node.h:23
Context & context()
All nodes hold reference to context.
Sender node.
Definition: sender.h:35
ROC_ATTR_NODISCARD bool connect(slot_index_t slot_index, address::Interface iface, const address::EndpointUri &uri)
Connect to remote endpoint.
bool is_valid() const
Check if successfully constructed.
bool has_broken()
Check if there are broken slots.
Sender(Context &context, const pipeline::SenderSinkConfig &pipeline_config)
Initialize.
ROC_ATTR_NODISCARD bool unlink(slot_index_t slot_index)
Remove slot.
void(* party_metrics_func_t)(const pipeline::SenderParticipantMetrics &party_metrics, size_t party_index, void *party_arg)
Callback for participant metrics.
Definition: sender.h:67
ROC_ATTR_NODISCARD bool get_metrics(slot_index_t slot_index, slot_metrics_func_t slot_metrics_func, void *slot_metrics_arg, party_metrics_func_t party_metrics_func, size_t *party_metrics_size, void *party_metrics_arg)
Get metrics.
ROC_ATTR_NODISCARD bool configure(slot_index_t slot_index, address::Interface iface, const netio::UdpConfig &config)
Set interface config.
bool has_incomplete()
Check if there are incomplete or broken slots.
uint64_t slot_index_t
Slot index.
Definition: sender.h:38
~Sender()
Deinitialize.
void(* slot_metrics_func_t)(const pipeline::SenderSlotMetrics &slot_metrics, void *slot_arg)
Callback for slot metrics.
Definition: sender.h:63
sndio::ISink & sink()
Get sender sink.
Packet writer interface.
Definition: iwriter.h:23
Pipeline task scheduler interface. PipelineLoop uses this interface to schedule asynchronous work....
struct SlotHandle * SlotHandle
Opaque slot handle.
Definition: sender_loop.h:47
Sink interface.
Definition: isink.h:22
Node context.
Network endpoint URI.
Intrusive hash table.
Interface ID.
Pipeline task scheduler interface.
Packet writer interface.
Mutex.
Interface
Interface ID.
Definition: interface.h:19
@ Iface_Max
Number of interfaces.
Definition: interface.h:36
Protocol
Protocol ID.
Definition: protocol.h:19
AddrFamily
Address family.
Definition: addr_family.h:19
hashsum_t hashsum_int(int16_t)
Compute hash of 16-bit integer.
int64_t nanoseconds_t
Nanoseconds.
Definition: time.h:58
size_t hashsum_t
Hash type.
Definition: hashsum.h:21
Root namespace.
Base class for nodes.
Protocol ID.
Base class for object with reference counter.
Unique ownrship pointer.
Sender pipeline loop.
Memory pool.
Commonly used types and functions.
UDP port parameters.
Definition: udp_port.h:32
Sender-side metrics specific to one participant (remote receiver).
Definition: metrics.h:24
Parameters of sender sink and sender session.
Definition: config.h:58
Sender-side metrics of the whole slot.
Definition: metrics.h:36