Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
csv_dumper.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2024 Roc Streaming authors
3  *
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7  */
8 
9 //! @file roc_core/csv_dumper.h
10 //! @brief Asynchronous CSV dumper.
11 
12 #ifndef ROC_CORE_CSV_DUMPER_H_
13 #define ROC_CORE_CSV_DUMPER_H_
14 
15 #include "roc_core/atomic.h"
16 #include "roc_core/mutex.h"
17 #include "roc_core/optional.h"
18 #include "roc_core/rate_limiter.h"
19 #include "roc_core/semaphore.h"
21 #include "roc_core/stddefs.h"
22 #include "roc_core/thread.h"
23 #include "roc_core/time.h"
24 
25 namespace roc {
26 namespace core {
27 
28 //! Maximum number of fields in CSV entry.
29 static const size_t Csv_MaxFields = 10;
30 
31 //! CSV entry.
32 //! Corresponds to one line in output file.
33 struct CsvEntry {
34  char type; //!< One-character entry type (first field).
35  size_t n_fields; //!< Number of fields.
36  double fields[Csv_MaxFields]; //!< Fields.
37 
38  CsvEntry()
39  : type('\0')
40  , n_fields(0) {
41  }
42 };
43 
44 //! CSV write configuration.
45 struct CsvConfig {
46  //! Maximum number of queued entries.
47  //! If queue becomes larger, entries are dropped.
48  size_t max_queued;
49 
50  //! Maximum allowed interval between subsequent entries of same type.
51  //! If zero, there is no limit.
52  //! If non-zero, each entry type is rate-limited according to this.
54 
55  CsvConfig()
56  : max_queued(1000)
58  }
59 };
60 
61 //! Asynchronous CSV dumper.
62 //! Writes entries to CSV file from background thread.
63 //! Recommended to be used from a single thread.
64 class CsvDumper : public Thread {
65 public:
66  //! Open file.
67  //! @p path - output file.
68  //! @p max_interval - maximum number of writes per second for each entry type.
69  CsvDumper(const char* path, const CsvConfig& config, IArena& arena);
70 
71  //! Close file.
73 
74  //! Check if opened without errors.
75  bool is_valid() const;
76 
77  //! Check whether write() would enqueue or drop entry.
78  //! Lock-free operation.
79  bool would_write(char type);
80 
81  //! Enqueue entry for writing.
82  //! Makes a copy of entry and pushes it to a lock-free ring buffer.
83  //! If buffer size limit or rate limit is exceeded, entry is dropped.
84  //! Lock-free operation.
85  void write(const CsvEntry& entry);
86 
87  //! Stop background thread.
88  void stop();
89 
90 private:
91  virtual void run();
92 
93  RateLimiter& limiter_(char type);
94 
95  bool open_(const char* path);
96  void close_();
97  bool dump_(const CsvEntry& entry);
98 
99  const CsvConfig config_;
100 
101  FILE* file_;
102 
103  Mutex write_mutex_;
104  Semaphore write_sem_;
105  SpscRingBuffer<CsvEntry> ringbuf_;
106 
107  Optional<RateLimiter> rate_lims_[128];
108 
109  Atomic<int> stop_;
110  bool valid_;
111 };
112 
113 } // namespace core
114 } // namespace roc
115 
116 #endif // ROC_CORE_CSV_DUMPER_H_
Atomic.
Asynchronous CSV dumper. Writes entries to CSV file from background thread. Recommended to be used fr...
Definition: csv_dumper.h:64
bool is_valid() const
Check if opened without errors.
bool would_write(char type)
Check whether write() would enqueue or drop entry. Lock-free operation.
~CsvDumper()
Close file.
CsvDumper(const char *path, const CsvConfig &config, IArena &arena)
Open file. path - output file. max_interval - maximum number of writes per second for each entry type...
void stop()
Stop background thread.
void write(const CsvEntry &entry)
Enqueue entry for writing. Makes a copy of entry and pushes it to a lock-free ring buffer....
Memory arena interface.
Definition: iarena.h:23
Mutex.
Definition: mutex.h:31
Optionally constructed object.
Definition: optional.h:25
Thread-safe lock-free single-producer single-consumer circular buffer of copyable objects.
Base class for thread objects.
Definition: thread.h:27
Mutex.
const nanoseconds_t Millisecond
One millisecond represented in nanoseconds.
Definition: time.h:67
int64_t nanoseconds_t
Nanoseconds.
Definition: time.h:58
Root namespace.
Optionally constructed object.
Rate limiter.
Single-producer single-consumer circular buffer of copyable objects.
Commonly used types and functions.
CSV write configuration.
Definition: csv_dumper.h:45
size_t max_queued
Maximum number of queued entries. If queue becomes larger, entries are dropped.
Definition: csv_dumper.h:48
nanoseconds_t max_interval
Maximum allowed interval between subsequent entries of same type. If zero, there is no limit....
Definition: csv_dumper.h:53
CSV entry. Corresponds to one line in output file.
Definition: csv_dumper.h:33
char type
One-character entry type (first field).
Definition: csv_dumper.h:34
size_t n_fields
Number of fields.
Definition: csv_dumper.h:35
double fields[Csv_MaxFields]
Fields.
Definition: csv_dumper.h:36
Thread.
Time definitions.