1use crate::event::Event;
4use crate::filter::AppliedFilter;
5use crate::frame::gps::{GpsFrame, RawGpsFrame};
6use crate::frame::main::{MainFrame, RawMainFrame};
7use crate::frame::slow::{RawSlowFrame, SlowFrame};
8use crate::frame::{self, DataFrameKind, FilteredFrameDef, FrameKind, GpsHomeFrame};
9use crate::parser::InternalError;
10use crate::{FilterSet, Headers, Reader};
11
/// Pull-style parser over the data section that follows a set of [`Headers`].
///
/// Constructed with `DataParser::new`; each call to `DataParser::next` reads
/// from `data` until one event or frame can be handed back to the caller.
#[derive(Debug)]
pub struct DataParser<'data, 'headers> {
    /// Headers that supply the frame definitions used for parsing.
    headers: &'headers Headers<'data>,
    /// Filter applied to main frames, built from the main frame definition.
    main_filter: AppliedFilter,
    /// Filter applied to slow frames, built from the slow frame definition.
    slow_filter: AppliedFilter,
    /// Filter applied to GPS frames; the `Default` filter when the headers
    /// carry no GPS frame definition.
    gps_filter: AppliedFilter,
    /// Reader positioned inside the data section.
    data: Reader<'data>,
    /// Value of `data.remaining()` at construction; the denominator used when
    /// computing `Stats::progress`.
    data_len: usize,
    /// Running frame counters and progress estimate.
    stats: Stats,
    /// The two most recently accepted main frames, passed to the main frame
    /// parser; the newest one also supplies the time handed to GPS parsing.
    main_frames: MainFrameHistory,
    /// Most recently accepted GPS home frame, if any; passed to the GPS frame
    /// parser.
    gps_home_frame: Option<GpsHomeFrame>,
    /// Once set, `next` returns `None` without reading any further data.
    done: bool,
}
26
27impl<'data, 'headers> DataParser<'data, 'headers> {
28 pub(crate) fn new(
29 data: Reader<'data>,
30 headers: &'headers Headers<'data>,
31 filters: &FilterSet,
32 ) -> Self {
33 let data_len = data.remaining();
34
35 Self {
36 headers,
37 main_filter: filters.main.apply(headers.main_frame_def()),
38 slow_filter: filters.slow.apply(headers.slow_frame_def()),
39 gps_filter: headers
40 .gps_frame_def()
41 .map(|def| filters.gps.apply(def))
42 .unwrap_or_default(),
43 data,
44 data_len,
45 stats: Stats::default(),
46 main_frames: MainFrameHistory::default(),
47 gps_home_frame: None,
48 done: false,
49 }
50 }
51
52 pub fn main_frame_def<'a>(&'a self) -> FilteredFrameDef<'a, frame::MainFrameDef<'data>> {
53 FilteredFrameDef::new(self.headers.main_frame_def(), &self.main_filter)
54 }
55
56 pub fn slow_frame_def<'a>(&'a self) -> FilteredFrameDef<'a, frame::SlowFrameDef<'data>> {
57 FilteredFrameDef::new(self.headers.slow_frame_def(), &self.slow_filter)
58 }
59
60 pub fn gps_frame_def<'a>(&'a self) -> Option<FilteredFrameDef<'a, frame::GpsFrameDef<'data>>> {
61 self.headers
62 .gps_frame_def()
63 .map(|def| FilteredFrameDef::new(def, &self.gps_filter))
64 }
65
66 #[inline]
68 pub fn stats(&self) -> &Stats {
69 &self.stats
70 }
71
72 #[inline]
74 pub fn is_done(&self) -> bool {
75 self.done
76 }
77
78 pub fn next<'parser>(&'parser mut self) -> Option<ParserEvent<'data, 'headers, 'parser>> {
81 if self.done {
82 return None;
83 }
84
85 loop {
86 let byte = self.data.read_u8()?;
87 let restore = self.data.get_restore_point();
88
89 let Some(kind) = FrameKind::from_byte(byte) else {
90 skip_to_frame(&mut self.data);
91 continue;
92 };
93
94 tracing::trace!("trying to parse {kind:?} frame");
95
96 let result = match kind {
97 FrameKind::Event => Event::parse(&mut self.data).map(InternalFrame::Event),
98 FrameKind::Data(DataFrameKind::Intra | DataFrameKind::Inter) => {
99 RawMainFrame::parse(&mut self.data, self.headers, kind, &self.main_frames)
100 .map(InternalFrame::Main)
101 }
102 FrameKind::Data(DataFrameKind::Slow) => self
103 .headers
104 .slow_frame_def()
105 .parse(&mut self.data, self.headers)
106 .map(InternalFrame::Slow),
107 FrameKind::Data(DataFrameKind::Gps) => {
108 self.headers.gps_frame_def().as_ref().map_or_else(
109 || {
110 tracing::debug!("found GPS frame without GPS frame definition");
111 Err(InternalError::Retry)
112 },
113 |gps| {
114 gps.parse(
115 &mut self.data,
116 self.headers,
117 self.main_frames.last().map(|frame| frame.time),
118 self.gps_home_frame.as_ref(),
119 )
120 .map(InternalFrame::Gps)
121 },
122 )
123 }
124 FrameKind::Data(DataFrameKind::GpsHome) => {
125 self.headers.gps_home_frame_def().as_ref().map_or_else(
126 || {
127 tracing::debug!(
128 "found GPS home frame without GPS home frame definition"
129 );
130 Err(InternalError::Retry)
131 },
132 |gps_home| {
133 gps_home
134 .parse(&mut self.data, self.headers)
135 .map(InternalFrame::GpsHome)
136 },
137 )
138 }
139 };
140
141 self.stats.progress = 1. - ((self.data.remaining() as f32) / (self.data_len as f32));
142
143 match result {
144 Ok(frame)
146 if self
147 .data
148 .peek()
149 .is_none_or(|byte| FrameKind::from_byte(byte).is_some()) =>
150 {
151 match frame {
152 InternalFrame::Event(event) => {
153 if matches!(event, Event::End { .. }) {
154 self.done = true;
155 self.stats.progress = 1.;
156 }
157
158 self.stats.counts.event += 1;
159 return Some(ParserEvent::Event(event));
160 }
161 InternalFrame::Main(main) => {
162 self.stats.counts.main += 1;
163 let main = self.main_frames.push(main);
164
165 return Some(ParserEvent::Main(MainFrame::new(
166 self.headers,
167 main,
168 &self.main_filter,
169 )));
170 }
171 InternalFrame::Slow(slow) => {
172 self.stats.counts.slow += 1;
173 return Some(ParserEvent::Slow(SlowFrame::new(
174 self.headers,
175 slow,
176 &self.slow_filter,
177 )));
178 }
179 InternalFrame::Gps(gps) => {
180 self.stats.counts.gps += 1;
181 return Some(ParserEvent::Gps(GpsFrame::new(
182 self.headers,
183 gps,
184 &self.gps_filter,
185 )));
186 }
187 InternalFrame::GpsHome(gps_home) => {
188 self.stats.counts.gps_home += 1;
189 self.gps_home_frame = Some(gps_home);
190 continue;
191 }
192 }
193 }
194 Ok(_) | Err(InternalError::Retry) => {
195 tracing::debug!("found corrupted {kind:?} frame");
196 self.data.restore(restore);
197 skip_to_frame(&mut self.data);
198 }
199 Err(InternalError::Eof) => {
200 tracing::debug!("found unexpected end of file in data section");
201 return None;
202 }
203 }
204 }
205 }
206}
207
/// Statistics maintained by a `DataParser` while it parses.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "_serde", derive(serde::Serialize))]
#[non_exhaustive]
pub struct Stats {
    /// Number of frames of each kind accepted so far.
    pub counts: FrameCounts,

    /// Fraction of the data consumed so far, computed after each parse attempt
    /// as `1 - remaining / total` and set to `1.0` when an end event is
    /// parsed.
    pub progress: f32,
}
223
/// Per-kind tallies of the frames a `DataParser` has accepted.
#[derive(Debug, Clone, Copy, Default)]
#[cfg_attr(feature = "_serde", derive(serde::Serialize))]
pub struct FrameCounts {
    /// Accepted event frames.
    pub event: usize,
    /// Accepted main frames (intra and inter frames share this counter).
    pub main: usize,
    /// Accepted slow frames.
    pub slow: usize,
    /// Accepted GPS frames.
    pub gps: usize,
    /// Accepted GPS home frames; these are counted even though they are not
    /// returned as a `ParserEvent`.
    pub gps_home: usize,
}
233
/// One item produced by `DataParser::next`.
///
/// There is no variant for GPS home frames: the parser keeps those internally
/// instead of returning them.
#[derive(Debug)]
pub enum ParserEvent<'data, 'headers, 'parser> {
    /// A parsed event.
    Event(Event),
    /// A main frame, viewed through the parser's main frame filter.
    Main(MainFrame<'data, 'headers, 'parser>),
    /// A slow frame, viewed through the parser's slow frame filter.
    Slow(SlowFrame<'data, 'headers, 'parser>),
    /// A GPS frame, viewed through the parser's GPS frame filter.
    Gps(GpsFrame<'data, 'headers, 'parser>),
}
244
245#[cold]
246fn skip_to_frame(data: &mut Reader) {
247 data.skip_until_any(
248 &[
249 FrameKind::Event,
250 FrameKind::Data(DataFrameKind::Intra),
251 FrameKind::Data(DataFrameKind::Slow),
252 FrameKind::Data(DataFrameKind::Gps),
253 FrameKind::Data(DataFrameKind::GpsHome),
254 ]
255 .map(u8::from),
256 );
257}
258
259#[derive(Debug, Default)]
260pub(crate) struct MainFrameHistory {
261 history: [Option<RawMainFrame>; 2],
262 index_new: usize,
263}
264
265impl MainFrameHistory {
266 fn index_old(&self) -> usize {
267 (self.index_new + 1) % self.history.len()
268 }
269
270 fn push(&mut self, frame: RawMainFrame) -> &RawMainFrame {
271 self.index_new = self.index_old();
272 self.history[self.index_new] = Some(frame);
273 self.last().unwrap()
274 }
275
276 pub(crate) fn last(&self) -> Option<&RawMainFrame> {
277 self.history[self.index_new].as_ref()
278 }
279
280 pub(crate) fn last_last(&self) -> Option<&RawMainFrame> {
281 self.history[self.index_old()].as_ref()
282 }
283}
284
285#[derive(Debug)]
286enum InternalFrame {
287 Event(Event),
288 Main(RawMainFrame),
289 Slow(RawSlowFrame),
290 Gps(RawGpsFrame),
291 GpsHome(GpsHomeFrame),
292}