diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs new file mode 100644 index 0000000..0343d6f --- /dev/null +++ b/src/daq_log_parse/correlate.rs @@ -0,0 +1,244 @@ +use chrono::Datelike as _; +use chrono::TimeZone as _; + +use crate::daq_log_parse::parse::ParsedMessage; + +pub struct CorrelationFunction { + /// real_time ~= slope * log_time_ms + intercept_ms + /// + /// Stored as: + /// unix_ms = slope * log_ts_ms + intercept_ms + slope: f64, + intercept_ms: f64, +} + +impl CorrelationFunction { + pub fn correlate(&self, log_ts: u32) -> Option> { + let unix_ms = self.slope * log_ts as f64 + self.intercept_ms; + + match chrono::DateTime::from_timestamp_millis(unix_ms.round() as i64) { + Some(dt) => Some(dt.with_timezone(&chrono::Local)), + None => { + log::error!( + "Correlated time {} ms for log time {} ms is out of range for chrono::DateTime", + unix_ms, + log_ts + ); + None + } + } + } +} + +pub struct CorrelationChunkResult { + pub parsed_msgs: Vec, + pub correlation_fn: Option, +} + +pub fn time_correlate_chunks(chunks: Vec>) -> Vec { + chunks.into_iter().map(time_correlate_chunk).collect() +} + +impl CorrelationChunkResult { + pub fn uncorrelated_new(chunk: Vec) -> Self { + Self { + parsed_msgs: chunk, + correlation_fn: None, + } + } + + pub fn correlated_new(chunk: Vec, correlation_fn: CorrelationFunction) -> Self { + Self { + parsed_msgs: chunk, + correlation_fn: Some(correlation_fn), + } + } +} + +pub fn sig_to_value(dsv: &can_decode::DecodedSignalValue) -> u64 { + match &dsv { + can_decode::DecodedSignalValue::Numeric(v) => v.round() as u64, + can_decode::DecodedSignalValue::Enum(v, _) => *v as u64, + } +} + +pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult { + // Idea: in the chunk, look for GPS messages which have both a timestamp and a corresponding real time + // Use those to create a mapping from the log's timestamps to real time, and use that mapping to convert + // all messages in the chunk to have real timestamps. + // If the correlation is successful, we return the orginal messages along with a correlation function. + // If we can't find any GPS messages, or if the correlation fails, return just the original messages. + + // First, find all GPS messages and extract their timestamps and real times + let mut gps_points = Vec::new(); + for msg in &chunk { + if msg.decoded.name == "gps_time" { + let millisecond = msg + .decoded + .signals + .get("millisecond") + .map(|sig| sig_to_value(&sig.value)); + let second = msg + .decoded + .signals + .get("second") + .map(|sig| sig_to_value(&sig.value)); + let minute = msg + .decoded + .signals + .get("minute") + .map(|sig| sig_to_value(&sig.value)); + let hour = msg + .decoded + .signals + .get("hour") + .map(|sig| sig_to_value(&sig.value)); + let day = msg + .decoded + .signals + .get("day") + .map(|sig| sig_to_value(&sig.value)); + let month = msg + .decoded + .signals + .get("month") + .map(|sig| sig_to_value(&sig.value)); + let year = msg + .decoded + .signals + .get("year") + .map(|sig| sig_to_value(&sig.value)); + + if let (Some(ms), Some(s), Some(min), Some(h), Some(d), Some(mon), Some(y)) = + (millisecond, second, minute, hour, day, month, year) + { + let full_year = if y < 100 { 2000 + y as i32 } else { y as i32 }; + // Construct a chrono::DateTime from the extracted values + if let Some(dt) = chrono::NaiveDate::from_ymd_opt(full_year, mon as u32, d as u32) + .and_then(|date| { + date.and_hms_milli_opt(h as u32, min as u32, s as u32, ms as u32) + }) + { + let dt_utc = chrono::Utc.from_utc_datetime(&dt); + let dt_local = chrono::DateTime::::from(dt_utc); + + let current_year = chrono::Local::now().year(); + if dt_local.year() < current_year - 2 || dt_local.year() > current_year + 2 { + log::warn!( + "GPS message at {} ms has suspicious year value {}, skipping", + msg.timestamp, + dt_local.year() + ); + continue; + } + + gps_points.push((msg.timestamp, dt_local)); + } else { + log::error!( + "GPS message at {} ms has invalid date/time values, skipping", + msg.timestamp + ); + continue; + } + } else { + log::error!( + "GPS message at {} ms is missing some time signals, skipping", + msg.timestamp + ); + continue; + } + } + } + + if gps_points.is_empty() { + // No GPS points found, can't correlate + return CorrelationChunkResult::uncorrelated_new(chunk); + } + + // Attempt to fit a line to the GPS points to find the correlation function + let points: Vec = gps_points + .iter() + .map(|(log_ts, real_ts)| Point { + x: *log_ts as f64, + y: real_ts.timestamp_millis() as f64, + }) + .collect(); + let (slope, intercept) = match linear_regression(&points) { + Some(v) => v, + None => { + log::error!("Failed to refit correlation line"); + return CorrelationChunkResult::uncorrelated_new(chunk); + } + }; + + // Print debug info about the correlation quality + let rms_error_ms = { + let mse = points + .iter() + .map(|p| { + let predicted = slope * p.x + intercept; + let error = p.y - predicted; + error * error + }) + .sum::() + / points.len() as f64; + + mse.sqrt() + }; + log::info!( + "GPS correlation successful: slope={:.9}, intercept_ms={:.3}, rms_error_ms={:.2}, points={}", + slope, + intercept, + rms_error_ms, + points.len() + ); + + CorrelationChunkResult::correlated_new( + chunk, + CorrelationFunction { + slope, + intercept_ms: intercept, + }, + ) +} + +struct Point { + x: f64, // log timestamp ms + y: f64, // unix timestamp ms +} + +/// Least squares linear regression. +/// +/// Fits: +/// +/// y = slope * x + intercept +fn linear_regression(points: &[Point]) -> Option<(f64, f64)> { + if points.len() < 2 { + return None; + } + + let n = points.len() as f64; + + let mut sum_x = 0.0; + let mut sum_y = 0.0; + let mut sum_xy = 0.0; + let mut sum_x2 = 0.0; + + for p in points { + sum_x += p.x; + sum_y += p.y; + sum_xy += p.x * p.y; + sum_x2 += p.x * p.x; + } + + let denom = n * sum_x2 - sum_x * sum_x; + + if denom.abs() < 1e-9 { + return None; + } + + let slope = (n * sum_xy - sum_x * sum_y) / denom; + let intercept = (sum_y - slope * sum_x) / n; + + Some((slope, intercept)) +} diff --git a/src/daq_log_parse/mod.rs b/src/daq_log_parse/mod.rs index 5537d91..f2939ef 100644 --- a/src/daq_log_parse/mod.rs +++ b/src/daq_log_parse/mod.rs @@ -1,3 +1,4 @@ pub mod consts; +pub mod correlate; pub mod parse; pub mod table; diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index 76b3ac2..6587e77 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -1,5 +1,4 @@ -use crate::daq_log_parse::consts; -use crate::daq_log_parse::parse; +use crate::daq_log_parse::{consts, correlate}; use can_decode::DecodedSignalValue; pub struct TableBuilder { @@ -15,15 +14,18 @@ pub struct TableBuilder { impl TableBuilder { pub fn new() -> Self { - let mut tb = Self { - bus_row: vec!["Bus".to_string()], - node_row: vec!["Node".to_string()], - message_row: vec!["Message".to_string()], - signal_row: vec!["Signal".to_string()], - next_col_idx: 1, + Self { + bus_row: vec!["".to_string(), "".to_string(), "Bus".to_string()], + node_row: vec!["".to_string(), "".to_string(), "Node".to_string()], + message_row: vec!["".to_string(), "".to_string(), "Message".to_string()], + signal_row: vec![ + "Real Time".to_string(), + "DAQ Timestamp".to_string(), + "Signal".to_string(), + ], + next_col_idx: 3, // real time, daq timestamp, then row headers columns indexer: std::collections::HashMap::new(), - }; - tb + } } pub fn create_header(&mut self, parser: &can_decode::Parser, bus_name: &str) { @@ -58,11 +60,11 @@ impl TableBuilder { pub fn create_and_write_tables( &self, out_folder: &std::path::Path, - chunked_parsed: Vec>, + correlated_chunks: Vec, ) { std::fs::create_dir_all(out_folder).unwrap(); - for (chunk_idx, chunk) in chunked_parsed.iter().enumerate() { + for (chunk_idx, chunk) in correlated_chunks.iter().enumerate() { let mut csv_table = vec![ self.bus_row.clone(), self.node_row.clone(), @@ -70,8 +72,8 @@ impl TableBuilder { self.signal_row.clone(), ]; - let first_time = chunk.first().map(|m| m.timestamp).unwrap_or(0); - let last_time = chunk.last().map(|m| m.timestamp).unwrap_or(0); + let first_time = chunk.parsed_msgs.first().map(|m| m.timestamp).unwrap_or(0); + let last_time = chunk.parsed_msgs.last().map(|m| m.timestamp).unwrap_or(0); let first_row_time = (first_time / consts::BIN_WIDTH_MS) * consts::BIN_WIDTH_MS; let last_row_time = last_time.div_ceil(consts::BIN_WIDTH_MS) * consts::BIN_WIDTH_MS; @@ -83,11 +85,18 @@ impl TableBuilder { let row_time = first_row_time + row_idx * consts::BIN_WIDTH_MS; let row_time_sec = row_time as f32 / 1000.0; let mut row = vec!["".to_string(); self.bus_row.len()]; - row[0] = format!("{:.3}", row_time_sec); + let correlated_time = chunk + .correlation_fn + .as_ref() + .and_then(|cf| cf.correlate(row_time)); + if let Some(ct) = correlated_time { + row[0] = ct.format("%H:%M:%S.%3f").to_string(); + } + row[1] = format!("{:.3}", row_time_sec); csv_table.push(row); } - for msg in chunk { + for msg in chunk.parsed_msgs.iter() { let decoded = &msg.decoded; for (sig_name, sig_value) in &decoded.signals { let key = (msg.bus_name.clone(), decoded.name.clone(), sig_name.clone()); @@ -104,13 +113,23 @@ impl TableBuilder { } } } - let out_file = out_folder.join(format!("out_{:03}.csv", chunk_idx)); - let mut wtr = csv::Writer::from_path(out_file).unwrap(); + + let first_correlated_time: Option = + chunk.correlation_fn.as_ref().and_then(|cf| { + cf.correlate(first_time) + .map(|dt| dt.format("%Y_%m_%d__%H_%M_%S").to_string()) + }); + + let out_file = match first_correlated_time { + Some(t) => out_folder.join(format!("out_{:03}_{}.csv", chunk_idx, t)), + None => out_folder.join(format!("out_{:03}.csv", chunk_idx)), + }; + let mut wtr = csv::Writer::from_path(out_file.clone()).unwrap(); for row in csv_table { wtr.write_record(&row).unwrap(); } wtr.flush().unwrap(); - println!("Wrote chunk {} to CSV", chunk_idx); + println!("Wrote chunk {} to CSV ({})", chunk_idx, out_file.display()); } } } diff --git a/src/ui/log_parser.rs b/src/ui/log_parser.rs index 25246ce..b468d0c 100644 --- a/src/ui/log_parser.rs +++ b/src/ui/log_parser.rs @@ -157,11 +157,12 @@ impl LogParser { let parsed = daq_log_parse::parse::parse_log_files(&logs_dir, &parser_bus_0, &parser_bus_1); let chunked_parsed = daq_log_parse::parse::chunk_parsed(parsed); + let correlated_chunks = daq_log_parse::correlate::time_correlate_chunks(chunked_parsed); let mut table_builder = daq_log_parse::table::TableBuilder::new(); table_builder.create_header(&parser_bus_0, "VCAN"); table_builder.create_header(&parser_bus_1, "MCAN"); - table_builder.create_and_write_tables(&output_dir, chunked_parsed); + table_builder.create_and_write_tables(&output_dir, correlated_chunks); log::info!("Parsing completed successfully"); let _ = parse_to_ui_tx.send(MsgFromParserThread::SuccessExit(format!(