From b509f396883063eec19196d6a3ed4b42f247fd83 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 15:45:30 -0400 Subject: [PATCH 01/21] correlate function to try to convert daq timestamps to real time --- src/daq_log_parse/correlate.rs | 147 +++++++++++++++++++++++++++++++++ src/daq_log_parse/mod.rs | 1 + 2 files changed, 148 insertions(+) create mode 100644 src/daq_log_parse/correlate.rs diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs new file mode 100644 index 0000000..b51c556 --- /dev/null +++ b/src/daq_log_parse/correlate.rs @@ -0,0 +1,147 @@ +use std::ops::Sub; + +use crate::daq_log_parse::parse::ParsedMessage; +use chrono::TimeZone; + +pub struct TimeAdjustedMessage { + pub parsed_message: ParsedMessage, + pub real_timestamp: chrono::DateTime, +} + +pub enum CorrelatedParsedMessages { + TimeAdjusted(Vec), + Unadjusted(Vec), +} + +pub fn sig_to_value(dsv: &can_decode::DecodedSignalValue) -> u64 { + match &dsv { + can_decode::DecodedSignalValue::Numeric(v) => v.round() as u64, + can_decode::DecodedSignalValue::Enum(v, _) => *v as u64, + } +} + +pub fn time_correlate_chunk(chunk: Vec) -> CorrelatedParsedMessages { + // Idea: in the chunk, look for GPS messages which have both a timestamp and a corresponding real time + // Use those to create a mapping from the log's timestamps to real time, and use that mapping to convert + // all messages in the chunk to have real timestamps. + // If the correlation is successful, we return the time-adjusted messages. + // If we can't find any GPS messages, or if the correlation fails for some reason, we return the + // original messages without time adjustment. + + // First, find all GPS messages and extract their timestamps and real times + let mut gps_points = Vec::new(); + for msg in &chunk { + if msg.decoded.name == "gps_time" { + let millisecond = msg + .decoded + .signals + .get("millisecond") + .map(|sig| sig_to_value(&sig.value)); + let second = msg + .decoded + .signals + .get("second") + .map(|sig| sig_to_value(&sig.value)); + let minute = msg + .decoded + .signals + .get("minute") + .map(|sig| sig_to_value(&sig.value)); + let hour = msg + .decoded + .signals + .get("hour") + .map(|sig| sig_to_value(&sig.value)); + let day = msg + .decoded + .signals + .get("day") + .map(|sig| sig_to_value(&sig.value)); + let month = msg + .decoded + .signals + .get("month") + .map(|sig| sig_to_value(&sig.value)); + let year = msg + .decoded + .signals + .get("year") + .map(|sig| sig_to_value(&sig.value)); + + if let (Some(ms), Some(s), Some(min), Some(h), Some(d), Some(mon), Some(y)) = + (millisecond, second, minute, hour, day, month, year) + { + // Construct a chrono::DateTime from the extracted values + if let Some(dt) = chrono::NaiveDate::from_ymd_opt(y as i32, mon as u32, d as u32) + .and_then(|date| { + date.and_hms_milli_opt(h as u32, min as u32, s as u32, ms as u32) + }) + { + let dt_local = chrono::Local.from_local_datetime(&dt).unwrap(); + gps_points.push((msg.timestamp, dt_local)); + } else { + log::error!( + "GPS message at {} ms has invalid date/time values, skipping", + msg.timestamp + ); + continue; + } + } else { + log::error!( + "GPS message at {} ms is missing some time signals, skipping", + msg.timestamp + ); + continue; + } + } + } + + if gps_points.is_empty() { + // No GPS points found, can't correlate + return CorrelatedParsedMessages::Unadjusted(chunk); + } + + // Attempt to correlate. Use the first GPS point as a reference, and calculate the offset for + // each subsequent GPS point. If the offsets are consistent-ish, we can assume a linear + // correlation and adjust all timestamps accordingly. If the offsets are wildly inconsistent, + // give up on correlation. + let (ref_log_ts, ref_real_ts) = gps_points[0]; + let mut offsets = Vec::new(); + for (log_ts, real_ts) in &gps_points[1..] { + let offset = *real_ts + - chrono::Duration::milliseconds(*log_ts as i64) + - (ref_real_ts - chrono::Duration::milliseconds(ref_log_ts as i64)); + offsets.push(offset); + } + + // Check if offsets are consistent (within 20 ms of each other) + let zero = chrono::Duration::zero(); + let max_offset = offsets.iter().max().unwrap_or(&zero); + let min_offset = offsets.iter().min().unwrap_or(&zero); + if max_offset.sub(*min_offset) > chrono::Duration::milliseconds(20) { + log::error!( + "Offsets between GPS points are inconsistent (max: {:?}, min: {:?}), giving up on correlation", + max_offset, + min_offset + ); + return CorrelatedParsedMessages::Unadjusted(chunk); + } + + // Use the average offset for correlation + let avg_offset = offsets + .iter() + .fold(chrono::Duration::zero(), |acc, x| acc + *x) + / (offsets.len() as i32); + + let time_adjusted_messages = chunk + .into_iter() + .map(|msg| TimeAdjustedMessage { + real_timestamp: ref_real_ts + + chrono::Duration::milliseconds(msg.timestamp as i64 - ref_log_ts as i64) + + avg_offset, + parsed_message: msg, + }) + .collect(); + + CorrelatedParsedMessages::TimeAdjusted(time_adjusted_messages) +} diff --git a/src/daq_log_parse/mod.rs b/src/daq_log_parse/mod.rs index 5537d91..f2939ef 100644 --- a/src/daq_log_parse/mod.rs +++ b/src/daq_log_parse/mod.rs @@ -1,3 +1,4 @@ pub mod consts; +pub mod correlate; pub mod parse; pub mod table; From 20e9b03be18ec9c5875073c57a91e82da27acc1e Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 15:59:34 -0400 Subject: [PATCH 02/21] Switching to a fn because we need to map every timestamp in the csv to a realtime --- src/daq_log_parse/correlate.rs | 44 ++++++++++++++++------------------ 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index b51c556..7e6e419 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -1,16 +1,20 @@ -use std::ops::Sub; +use std::ops::Sub as _; +use chrono::TimeZone as _; use crate::daq_log_parse::parse::ParsedMessage; -use chrono::TimeZone; -pub struct TimeAdjustedMessage { - pub parsed_message: ParsedMessage, - pub real_timestamp: chrono::DateTime, -} -pub enum CorrelatedParsedMessages { - TimeAdjusted(Vec), - Unadjusted(Vec), +pub struct CorrelationFunction { + ref_real_ts: chrono::DateTime, + ref_log_ts: u32, + avg_offset: chrono::Duration, +} +impl CorrelationFunction { + pub fn correlate(&self, log_ts: u64) -> chrono::DateTime { + self.ref_real_ts + + chrono::Duration::milliseconds(log_ts as i64 - self.ref_log_ts as i64) + + self.avg_offset + } } pub fn sig_to_value(dsv: &can_decode::DecodedSignalValue) -> u64 { @@ -20,7 +24,7 @@ pub fn sig_to_value(dsv: &can_decode::DecodedSignalValue) -> u64 { } } -pub fn time_correlate_chunk(chunk: Vec) -> CorrelatedParsedMessages { +pub fn time_correlate_chunk(chunk: Vec) -> Option { // Idea: in the chunk, look for GPS messages which have both a timestamp and a corresponding real time // Use those to create a mapping from the log's timestamps to real time, and use that mapping to convert // all messages in the chunk to have real timestamps. @@ -98,7 +102,7 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelatedParsedMessag if gps_points.is_empty() { // No GPS points found, can't correlate - return CorrelatedParsedMessages::Unadjusted(chunk); + return None; } // Attempt to correlate. Use the first GPS point as a reference, and calculate the offset for @@ -124,7 +128,7 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelatedParsedMessag max_offset, min_offset ); - return CorrelatedParsedMessages::Unadjusted(chunk); + return None; } // Use the average offset for correlation @@ -133,15 +137,9 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelatedParsedMessag .fold(chrono::Duration::zero(), |acc, x| acc + *x) / (offsets.len() as i32); - let time_adjusted_messages = chunk - .into_iter() - .map(|msg| TimeAdjustedMessage { - real_timestamp: ref_real_ts - + chrono::Duration::milliseconds(msg.timestamp as i64 - ref_log_ts as i64) - + avg_offset, - parsed_message: msg, - }) - .collect(); - - CorrelatedParsedMessages::TimeAdjusted(time_adjusted_messages) + Some(CorrelationFunction { + ref_real_ts, + ref_log_ts, + avg_offset, + }) } From 627ce39441ef8172ca3a8bdb2a34f31ab817d505 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 16:04:09 -0400 Subject: [PATCH 03/21] also return the chunked because it took ownernship --- src/daq_log_parse/correlate.rs | 66 ++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 7e6e419..99ef1c7 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -1,20 +1,47 @@ -use std::ops::Sub as _; use chrono::TimeZone as _; +use std::ops::Sub as _; use crate::daq_log_parse::parse::ParsedMessage; - pub struct CorrelationFunction { - ref_real_ts: chrono::DateTime, - ref_log_ts: u32, - avg_offset: chrono::Duration, + ref_real_ts: chrono::DateTime, + ref_log_ts: u32, + avg_offset: chrono::Duration, } impl CorrelationFunction { - pub fn correlate(&self, log_ts: u64) -> chrono::DateTime { - self.ref_real_ts - + chrono::Duration::milliseconds(log_ts as i64 - self.ref_log_ts as i64) - + self.avg_offset - } + pub fn correlate(&self, log_ts: u64) -> chrono::DateTime { + self.ref_real_ts + + chrono::Duration::milliseconds(log_ts as i64 - self.ref_log_ts as i64) + + self.avg_offset + } +} + +pub struct CorrelationChunkResult { + chunk: Vec, + correlation_fn: Option, +} + +pub fn time_correlate_chunks(chunks: Vec>) -> Vec { + chunks + .into_iter() + .map(|chunk| time_correlate_chunk(chunk)) + .collect() +} + +impl CorrelationChunkResult { + pub fn uncorrelated_new(chunk: Vec) -> Self { + Self { + chunk, + correlation_fn: None, + } + } + + pub fn correlated_new(chunk: Vec, correlation_fn: CorrelationFunction) -> Self { + Self { + chunk, + correlation_fn: Some(correlation_fn), + } + } } pub fn sig_to_value(dsv: &can_decode::DecodedSignalValue) -> u64 { @@ -24,7 +51,7 @@ pub fn sig_to_value(dsv: &can_decode::DecodedSignalValue) -> u64 { } } -pub fn time_correlate_chunk(chunk: Vec) -> Option { +pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult { // Idea: in the chunk, look for GPS messages which have both a timestamp and a corresponding real time // Use those to create a mapping from the log's timestamps to real time, and use that mapping to convert // all messages in the chunk to have real timestamps. @@ -102,7 +129,7 @@ pub fn time_correlate_chunk(chunk: Vec) -> Option) -> Option) -> Option Date: Sun, 10 May 2026 16:04:25 -0400 Subject: [PATCH 04/21] using time_correlate_chunks table not updated yet --- src/ui/log_parser.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ui/log_parser.rs b/src/ui/log_parser.rs index 25246ce..b468d0c 100644 --- a/src/ui/log_parser.rs +++ b/src/ui/log_parser.rs @@ -157,11 +157,12 @@ impl LogParser { let parsed = daq_log_parse::parse::parse_log_files(&logs_dir, &parser_bus_0, &parser_bus_1); let chunked_parsed = daq_log_parse::parse::chunk_parsed(parsed); + let correlated_chunks = daq_log_parse::correlate::time_correlate_chunks(chunked_parsed); let mut table_builder = daq_log_parse::table::TableBuilder::new(); table_builder.create_header(&parser_bus_0, "VCAN"); table_builder.create_header(&parser_bus_1, "MCAN"); - table_builder.create_and_write_tables(&output_dir, chunked_parsed); + table_builder.create_and_write_tables(&output_dir, correlated_chunks); log::info!("Parsing completed successfully"); let _ = parse_to_ui_tx.send(MsgFromParserThread::SuccessExit(format!( From 9dd509479660cf99bb83ab42cfe90a9f3b1fbfad Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 16:06:36 -0400 Subject: [PATCH 05/21] rename --- src/daq_log_parse/correlate.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 99ef1c7..ab9642d 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -17,7 +17,7 @@ impl CorrelationFunction { } pub struct CorrelationChunkResult { - chunk: Vec, + parsed_msgs: Vec, correlation_fn: Option, } @@ -31,14 +31,14 @@ pub fn time_correlate_chunks(chunks: Vec>) -> Vec) -> Self { Self { - chunk, + parsed_msgs: chunk, correlation_fn: None, } } pub fn correlated_new(chunk: Vec, correlation_fn: CorrelationFunction) -> Self { Self { - chunk, + parsed_msgs: chunk, correlation_fn: Some(correlation_fn), } } From c1e725b9af2a4ea0a7fccd6cf4313ebf01a002b1 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 16:08:39 -0400 Subject: [PATCH 06/21] update table to use row 0 as correlated time and row 1 as daq time --- src/daq_log_parse/correlate.rs | 6 +++--- src/daq_log_parse/table.rs | 29 ++++++++++++++++------------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index ab9642d..29a6413 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -9,7 +9,7 @@ pub struct CorrelationFunction { avg_offset: chrono::Duration, } impl CorrelationFunction { - pub fn correlate(&self, log_ts: u64) -> chrono::DateTime { + pub fn correlate(&self, log_ts: u32) -> chrono::DateTime { self.ref_real_ts + chrono::Duration::milliseconds(log_ts as i64 - self.ref_log_ts as i64) + self.avg_offset @@ -17,8 +17,8 @@ impl CorrelationFunction { } pub struct CorrelationChunkResult { - parsed_msgs: Vec, - correlation_fn: Option, + pub parsed_msgs: Vec, + pub correlation_fn: Option, } pub fn time_correlate_chunks(chunks: Vec>) -> Vec { diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index 76b3ac2..2ede0cc 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -1,5 +1,4 @@ -use crate::daq_log_parse::consts; -use crate::daq_log_parse::parse; +use crate::daq_log_parse::{consts, correlate}; use can_decode::DecodedSignalValue; pub struct TableBuilder { @@ -16,11 +15,11 @@ pub struct TableBuilder { impl TableBuilder { pub fn new() -> Self { let mut tb = Self { - bus_row: vec!["Bus".to_string()], - node_row: vec!["Node".to_string()], - message_row: vec!["Message".to_string()], - signal_row: vec!["Signal".to_string()], - next_col_idx: 1, + bus_row: vec!["".to_string(), "Bus".to_string()], + node_row: vec!["".to_string(), "Node".to_string()], + message_row: vec!["".to_string(), "Message".to_string()], + signal_row: vec!["".to_string(), "Signal".to_string()], + next_col_idx: 2, // real time and then daq time columns indexer: std::collections::HashMap::new(), }; tb @@ -58,11 +57,11 @@ impl TableBuilder { pub fn create_and_write_tables( &self, out_folder: &std::path::Path, - chunked_parsed: Vec>, + correlated_chunks: Vec, ) { std::fs::create_dir_all(out_folder).unwrap(); - for (chunk_idx, chunk) in chunked_parsed.iter().enumerate() { + for (chunk_idx, chunk) in correlated_chunks.iter().enumerate() { let mut csv_table = vec![ self.bus_row.clone(), self.node_row.clone(), @@ -70,8 +69,8 @@ impl TableBuilder { self.signal_row.clone(), ]; - let first_time = chunk.first().map(|m| m.timestamp).unwrap_or(0); - let last_time = chunk.last().map(|m| m.timestamp).unwrap_or(0); + let first_time = chunk.parsed_msgs.first().map(|m| m.timestamp).unwrap_or(0); + let last_time = chunk.parsed_msgs.last().map(|m| m.timestamp).unwrap_or(0); let first_row_time = (first_time / consts::BIN_WIDTH_MS) * consts::BIN_WIDTH_MS; let last_row_time = last_time.div_ceil(consts::BIN_WIDTH_MS) * consts::BIN_WIDTH_MS; @@ -83,11 +82,15 @@ impl TableBuilder { let row_time = first_row_time + row_idx * consts::BIN_WIDTH_MS; let row_time_sec = row_time as f32 / 1000.0; let mut row = vec!["".to_string(); self.bus_row.len()]; - row[0] = format!("{:.3}", row_time_sec); + let correlated_time = chunk.correlation_fn.as_ref().map(|cf| cf.correlate(row_time)); + if let Some(ct) = correlated_time { + row[0] = ct.format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + } + row[1] = format!("{:.3}", row_time_sec); csv_table.push(row); } - for msg in chunk { + for msg in chunk.parsed_msgs.iter() { let decoded = &msg.decoded; for (sig_name, sig_value) in &decoded.signals { let key = (msg.bus_name.clone(), decoded.name.clone(), sig_name.clone()); From a434c5b2ef3913684777c2a94629913cf40e6a1c Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 16:12:08 -0400 Subject: [PATCH 07/21] using first row real time as log name when possible --- src/daq_log_parse/table.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index 2ede0cc..84ef4c0 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -107,7 +107,16 @@ impl TableBuilder { } } } - let out_file = out_folder.join(format!("out_{:03}.csv", chunk_idx)); + + let first_correlated_time: Option = chunk + .correlation_fn + .as_ref() + .and_then(|cf| cf.correlate(first_time).format("%Y_%m_%d_%H_%M_%S%.3f").to_string().parse().ok()); + + let out_file = match first_correlated_time { + Some(t) => out_folder.join(format!("out_{:03}_{}.csv", chunk_idx, t)), + None => out_folder.join(format!("out_{:03}.csv", chunk_idx)), + }; let mut wtr = csv::Writer::from_path(out_file).unwrap(); for row in csv_table { wtr.write_record(&row).unwrap(); From eecc219f33a6d37915a3e00ff333537d332ac685 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 16:12:20 -0400 Subject: [PATCH 08/21] fmt --- src/daq_log_parse/table.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index 84ef4c0..ad69ec4 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -82,7 +82,10 @@ impl TableBuilder { let row_time = first_row_time + row_idx * consts::BIN_WIDTH_MS; let row_time_sec = row_time as f32 / 1000.0; let mut row = vec!["".to_string(); self.bus_row.len()]; - let correlated_time = chunk.correlation_fn.as_ref().map(|cf| cf.correlate(row_time)); + let correlated_time = chunk + .correlation_fn + .as_ref() + .map(|cf| cf.correlate(row_time)); if let Some(ct) = correlated_time { row[0] = ct.format("%Y-%m-%d %H:%M:%S%.3f").to_string(); } @@ -108,10 +111,14 @@ impl TableBuilder { } } - let first_correlated_time: Option = chunk - .correlation_fn - .as_ref() - .and_then(|cf| cf.correlate(first_time).format("%Y_%m_%d_%H_%M_%S%.3f").to_string().parse().ok()); + let first_correlated_time: Option = + chunk.correlation_fn.as_ref().and_then(|cf| { + cf.correlate(first_time) + .format("%Y_%m_%d_%H_%M_%S%.3f") + .to_string() + .parse() + .ok() + }); let out_file = match first_correlated_time { Some(t) => out_folder.join(format!("out_{:03}_{}.csv", chunk_idx, t)), From 5f49c5956f162fe23132d7e67c3a688c9ed2fe62 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 17:29:06 -0400 Subject: [PATCH 09/21] year is based from 2000? --- src/daq_log_parse/correlate.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 29a6413..0b98a23 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -102,8 +102,9 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult if let (Some(ms), Some(s), Some(min), Some(h), Some(d), Some(mon), Some(y)) = (millisecond, second, minute, hour, day, month, year) { + let full_year = if y < 100 { 2000 + y as i32 } else { y as i32 }; // Construct a chrono::DateTime from the extracted values - if let Some(dt) = chrono::NaiveDate::from_ymd_opt(y as i32, mon as u32, d as u32) + if let Some(dt) = chrono::NaiveDate::from_ymd_opt(full_year, mon as u32, d as u32) .and_then(|date| { date.and_hms_milli_opt(h as u32, min as u32, s as u32, ms as u32) }) From ea303ed3a0e05233bbc8c7d64e55740b72e6c0fa Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 17:41:41 -0400 Subject: [PATCH 10/21] skip timestamps with weird years (init not finished yet) --- src/daq_log_parse/correlate.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 0b98a23..bc51d31 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -24,7 +24,7 @@ pub struct CorrelationChunkResult { pub fn time_correlate_chunks(chunks: Vec>) -> Vec { chunks .into_iter() - .map(|chunk| time_correlate_chunk(chunk)) + .map(time_correlate_chunk) .collect() } @@ -109,6 +109,15 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult date.and_hms_milli_opt(h as u32, min as u32, s as u32, ms as u32) }) { + let current_year = chrono::Local::now().year_ce().1 as i32; + if full_year < current_year - 1 || full_year > current_year + 1 { + log::warn!( + "GPS message at {} ms has suspicious year value {}, skipping", + msg.timestamp, full_year + ); + continue; + } + let dt_local = chrono::Local.from_local_datetime(&dt).unwrap(); gps_points.push((msg.timestamp, dt_local)); } else { From 95e9d1abf6be1be25446c6b32ac66a2342794a66 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 17:42:29 -0400 Subject: [PATCH 11/21] only hr min, s, ns in parsed csv --- src/daq_log_parse/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index ad69ec4..2ad5757 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -114,7 +114,7 @@ impl TableBuilder { let first_correlated_time: Option = chunk.correlation_fn.as_ref().and_then(|cf| { cf.correlate(first_time) - .format("%Y_%m_%d_%H_%M_%S%.3f") + .format("%H_%M_%S%.3f") .to_string() .parse() .ok() From aff279356eb14e5b38faef401fbda966c4e65655 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 17:57:29 -0400 Subject: [PATCH 12/21] fix output? --- src/daq_log_parse/table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index 2ad5757..bacddec 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -87,7 +87,7 @@ impl TableBuilder { .as_ref() .map(|cf| cf.correlate(row_time)); if let Some(ct) = correlated_time { - row[0] = ct.format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + row[0] = ct.format("%H:%M:%S%.3f").to_string(); } row[1] = format!("{:.3}", row_time_sec); csv_table.push(row); @@ -114,7 +114,7 @@ impl TableBuilder { let first_correlated_time: Option = chunk.correlation_fn.as_ref().and_then(|cf| { cf.correlate(first_time) - .format("%H_%M_%S%.3f") + .format("%Y_%m_%d_%H_%M_%S%_3f") .to_string() .parse() .ok() From 240cd7df2319f8a4855eb1c45b4a01535b70df53 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 17:57:51 -0400 Subject: [PATCH 13/21] trying linear regression instead? --- src/daq_log_parse/correlate.rs | 148 +++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 44 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index bc51d31..5687b5e 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -1,18 +1,24 @@ use chrono::TimeZone as _; -use std::ops::Sub as _; +use chrono::Datelike as _; use crate::daq_log_parse::parse::ParsedMessage; pub struct CorrelationFunction { - ref_real_ts: chrono::DateTime, - ref_log_ts: u32, - avg_offset: chrono::Duration, + /// real_time ~= slope * log_time_ms + intercept_ms + /// + /// Stored as: + /// unix_ms = slope * log_ts_ms + intercept_ms + slope: f64, + intercept_ms: f64, } + impl CorrelationFunction { pub fn correlate(&self, log_ts: u32) -> chrono::DateTime { - self.ref_real_ts - + chrono::Duration::milliseconds(log_ts as i64 - self.ref_log_ts as i64) - + self.avg_offset + let unix_ms = self.slope * log_ts as f64 + self.intercept_ms; + + chrono::DateTime::from_timestamp_millis(unix_ms.round() as i64) + .unwrap() + .with_timezone(&chrono::Local) } } @@ -109,17 +115,24 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult date.and_hms_milli_opt(h as u32, min as u32, s as u32, ms as u32) }) { - let current_year = chrono::Local::now().year_ce().1 as i32; - if full_year < current_year - 1 || full_year > current_year + 1 { + let dt_utc = chrono::Utc.from_utc_datetime(&dt); + let dt_local = chrono::DateTime::::from(dt_utc); + + let current_year = chrono::Local::now().year(); + if dt_local.year() < current_year - 1 || dt_local.year() > current_year + 1 { log::warn!( "GPS message at {} ms has suspicious year value {}, skipping", - msg.timestamp, full_year + msg.timestamp, dt_local.year() ); continue; } - let dt_local = chrono::Local.from_local_datetime(&dt).unwrap(); gps_points.push((msg.timestamp, dt_local)); + println!( + "Found GPS point: log timestamp = {} ms, real timestamp = {}", + msg.timestamp, + dt_local.format("%Y-%m-%d %H:%M:%S%.3f") + ); } else { log::error!( "GPS message at {} ms has invalid date/time values, skipping", @@ -142,44 +155,91 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult return CorrelationChunkResult::uncorrelated_new(chunk); } - // Attempt to correlate. Use the first GPS point as a reference, and calculate the offset for - // each subsequent GPS point. If the offsets are consistent-ish, we can assume a linear - // correlation and adjust all timestamps accordingly. If the offsets are wildly inconsistent, - // give up on correlation. - let (ref_log_ts, ref_real_ts) = gps_points[0]; - let mut offsets = Vec::new(); - for (log_ts, real_ts) in &gps_points[1..] { - let offset = *real_ts - - chrono::Duration::milliseconds(*log_ts as i64) - - (ref_real_ts - chrono::Duration::milliseconds(ref_log_ts as i64)); - offsets.push(offset); - } - - // Check if offsets are consistent (within 20 ms of each other) - let zero = chrono::Duration::zero(); - let max_offset = offsets.iter().max().unwrap_or(&zero); - let min_offset = offsets.iter().min().unwrap_or(&zero); - if max_offset.sub(*min_offset) > chrono::Duration::milliseconds(20) { - log::error!( - "Offsets between GPS points are inconsistent (max: {:?}, min: {:?}), giving up on correlation", - max_offset, - min_offset - ); - return CorrelationChunkResult::uncorrelated_new(chunk); - } - - // Use the average offset for correlation - let avg_offset = offsets + let points: Vec = gps_points .iter() - .fold(chrono::Duration::zero(), |acc, x| acc + *x) - / (offsets.len() as i32); + .map(|(log_ts, real_ts)| Point { + x: *log_ts as f64, + y: real_ts.timestamp_millis() as f64, + }) + .collect(); + let (slope, intercept) = match linear_regression(&points) { + Some(v) => v, + None => { + log::error!("Failed to refit correlation line"); + return CorrelationChunkResult::uncorrelated_new(chunk); + } + }; + + let rms_error_ms = { + let mse = points + .iter() + .map(|p| { + let predicted = slope * p.x + intercept; + let error = p.y - predicted; + error * error + }) + .sum::() + / points.len() as f64; + + mse.sqrt() + }; + + log::info!( + "GPS correlation successful: slope={:.9}, intercept_ms={:.3}, rms_error_ms={:.2}, points={}", + slope, + intercept, + rms_error_ms, + points.len() + ); + CorrelationChunkResult::correlated_new( chunk, CorrelationFunction { - ref_real_ts, - ref_log_ts, - avg_offset, + slope, + intercept_ms: intercept, }, ) } + + +struct Point { + x: f64, // log timestamp ms + y: f64, // unix timestamp ms +} + +/// Least squares linear regression. +/// +/// Fits: +/// +/// y = slope * x + intercept +fn linear_regression(points: &[Point]) -> Option<(f64, f64)> { + if points.len() < 2 { + return None; + } + + let n = points.len() as f64; + + let mut sum_x = 0.0; + let mut sum_y = 0.0; + let mut sum_xy = 0.0; + let mut sum_x2 = 0.0; + + for p in points { + sum_x += p.x; + sum_y += p.y; + sum_xy += p.x * p.y; + sum_x2 += p.x * p.x; + } + + let denom = n * sum_x2 - sum_x * sum_x; + + if denom.abs() < 1e-9 { + return None; + } + + let slope = (n * sum_xy - sum_x * sum_y) / denom; + let intercept = (sum_y - slope * sum_x) / n; + + Some((slope, intercept)) +} \ No newline at end of file From e74f828de15d4ed67af8c99cbc08d5d6d9cdc4a3 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 17:58:13 -0400 Subject: [PATCH 14/21] fmt --- src/daq_log_parse/correlate.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 5687b5e..040c5c5 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -1,5 +1,5 @@ -use chrono::TimeZone as _; use chrono::Datelike as _; +use chrono::TimeZone as _; use crate::daq_log_parse::parse::ParsedMessage; @@ -28,10 +28,7 @@ pub struct CorrelationChunkResult { } pub fn time_correlate_chunks(chunks: Vec>) -> Vec { - chunks - .into_iter() - .map(time_correlate_chunk) - .collect() + chunks.into_iter().map(time_correlate_chunk).collect() } impl CorrelationChunkResult { @@ -122,7 +119,8 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult if dt_local.year() < current_year - 1 || dt_local.year() > current_year + 1 { log::warn!( "GPS message at {} ms has suspicious year value {}, skipping", - msg.timestamp, dt_local.year() + msg.timestamp, + dt_local.year() ); continue; } @@ -170,7 +168,7 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult } }; - let rms_error_ms = { + let rms_error_ms = { let mse = points .iter() .map(|p| { @@ -192,7 +190,6 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult points.len() ); - CorrelationChunkResult::correlated_new( chunk, CorrelationFunction { @@ -202,7 +199,6 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult ) } - struct Point { x: f64, // log timestamp ms y: f64, // unix timestamp ms @@ -242,4 +238,4 @@ fn linear_regression(points: &[Point]) -> Option<(f64, f64)> { let intercept = (sum_y - slope * sum_x) / n; Some((slope, intercept)) -} \ No newline at end of file +} From abad09d1cc496a0c3fe768d8495d700e82da5b78 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 17:58:24 -0400 Subject: [PATCH 15/21] fix warnings --- src/daq_log_parse/table.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index bacddec..bd73b45 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -14,15 +14,14 @@ pub struct TableBuilder { impl TableBuilder { pub fn new() -> Self { - let mut tb = Self { + Self { bus_row: vec!["".to_string(), "Bus".to_string()], node_row: vec!["".to_string(), "Node".to_string()], message_row: vec!["".to_string(), "Message".to_string()], signal_row: vec!["".to_string(), "Signal".to_string()], next_col_idx: 2, // real time and then daq time columns indexer: std::collections::HashMap::new(), - }; - tb + } } pub fn create_header(&mut self, parser: &can_decode::Parser, bus_name: &str) { From 499a0c3a3fba2bdda3e3a81754b2f41703416dcc Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 18:45:28 -0400 Subject: [PATCH 16/21] fix output for real --- src/daq_log_parse/table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index bd73b45..35fbe61 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -86,7 +86,7 @@ impl TableBuilder { .as_ref() .map(|cf| cf.correlate(row_time)); if let Some(ct) = correlated_time { - row[0] = ct.format("%H:%M:%S%.3f").to_string(); + row[0] = ct.format("%H:%M:%S.%3f").to_string(); } row[1] = format!("{:.3}", row_time_sec); csv_table.push(row); @@ -113,7 +113,7 @@ impl TableBuilder { let first_correlated_time: Option = chunk.correlation_fn.as_ref().and_then(|cf| { cf.correlate(first_time) - .format("%Y_%m_%d_%H_%M_%S%_3f") + .format("%Y_%m_%d__%H_%M_%S") .to_string() .parse() .ok() From 5ee2c090fb79697e91af30a4583f2ab7a6caff18 Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 21:26:38 -0400 Subject: [PATCH 17/21] avoid unwrap --- src/daq_log_parse/correlate.rs | 16 ++++++++++++---- src/daq_log_parse/table.rs | 7 ++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 040c5c5..6a51853 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -13,12 +13,20 @@ pub struct CorrelationFunction { } impl CorrelationFunction { - pub fn correlate(&self, log_ts: u32) -> chrono::DateTime { + pub fn correlate(&self, log_ts: u32) -> Option> { let unix_ms = self.slope * log_ts as f64 + self.intercept_ms; - chrono::DateTime::from_timestamp_millis(unix_ms.round() as i64) - .unwrap() - .with_timezone(&chrono::Local) + match chrono::DateTime::from_timestamp_millis(unix_ms.round() as i64) { + Some(dt) => Some(dt.with_timezone(&chrono::Local)), + None => { + log::error!( + "Correlated time {} ms for log time {} ms is out of range for chrono::DateTime", + unix_ms, + log_ts + ); + None + } + } } } diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index 35fbe61..7e5b1b2 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -84,7 +84,7 @@ impl TableBuilder { let correlated_time = chunk .correlation_fn .as_ref() - .map(|cf| cf.correlate(row_time)); + .and_then(|cf| cf.correlate(row_time)); if let Some(ct) = correlated_time { row[0] = ct.format("%H:%M:%S.%3f").to_string(); } @@ -113,10 +113,7 @@ impl TableBuilder { let first_correlated_time: Option = chunk.correlation_fn.as_ref().and_then(|cf| { cf.correlate(first_time) - .format("%Y_%m_%d__%H_%M_%S") - .to_string() - .parse() - .ok() + .map(|dt| dt.format("%Y_%m_%d__%H_%M_%S").to_string()) }); let out_file = match first_correlated_time { From 1791fdf75f258509f8465e835f84efab911281ef Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 21:31:34 -0400 Subject: [PATCH 18/21] remove debug print --- src/daq_log_parse/correlate.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 6a51853..a0f30d5 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -134,11 +134,6 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult } gps_points.push((msg.timestamp, dt_local)); - println!( - "Found GPS point: log timestamp = {} ms, real timestamp = {}", - msg.timestamp, - dt_local.format("%Y-%m-%d %H:%M:%S%.3f") - ); } else { log::error!( "GPS message at {} ms has invalid date/time values, skipping", From f6c98a3018d6fe5480a2f452d6bdacc3ac7e4f6b Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 21:33:18 -0400 Subject: [PATCH 19/21] adjust table output format timestamps not folded under the row headers --- src/daq_log_parse/table.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/daq_log_parse/table.rs b/src/daq_log_parse/table.rs index 7e5b1b2..6587e77 100644 --- a/src/daq_log_parse/table.rs +++ b/src/daq_log_parse/table.rs @@ -15,11 +15,15 @@ pub struct TableBuilder { impl TableBuilder { pub fn new() -> Self { Self { - bus_row: vec!["".to_string(), "Bus".to_string()], - node_row: vec!["".to_string(), "Node".to_string()], - message_row: vec!["".to_string(), "Message".to_string()], - signal_row: vec!["".to_string(), "Signal".to_string()], - next_col_idx: 2, // real time and then daq time columns + bus_row: vec!["".to_string(), "".to_string(), "Bus".to_string()], + node_row: vec!["".to_string(), "".to_string(), "Node".to_string()], + message_row: vec!["".to_string(), "".to_string(), "Message".to_string()], + signal_row: vec![ + "Real Time".to_string(), + "DAQ Timestamp".to_string(), + "Signal".to_string(), + ], + next_col_idx: 3, // real time, daq timestamp, then row headers columns indexer: std::collections::HashMap::new(), } } @@ -120,12 +124,12 @@ impl TableBuilder { Some(t) => out_folder.join(format!("out_{:03}_{}.csv", chunk_idx, t)), None => out_folder.join(format!("out_{:03}.csv", chunk_idx)), }; - let mut wtr = csv::Writer::from_path(out_file).unwrap(); + let mut wtr = csv::Writer::from_path(out_file.clone()).unwrap(); for row in csv_table { wtr.write_record(&row).unwrap(); } wtr.flush().unwrap(); - println!("Wrote chunk {} to CSV", chunk_idx); + println!("Wrote chunk {} to CSV ({})", chunk_idx, out_file.display()); } } } From 965b6c26d0ea063d0621803c4890ee0d220a26ce Mon Sep 17 00:00:00 2001 From: LelsersLasers Date: Sun, 10 May 2026 21:35:09 -0400 Subject: [PATCH 20/21] increase year bound --- src/daq_log_parse/correlate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index a0f30d5..38bd5fa 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -124,7 +124,7 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult let dt_local = chrono::DateTime::::from(dt_utc); let current_year = chrono::Local::now().year(); - if dt_local.year() < current_year - 1 || dt_local.year() > current_year + 1 { + if dt_local.year() < current_year - 2 || dt_local.year() > current_year + 2 { log::warn!( "GPS message at {} ms has suspicious year value {}, skipping", msg.timestamp, From efbc62cfb347b08323a6bc4caeb2c52451726b08 Mon Sep 17 00:00:00 2001 From: Lord-Lelsers <48894200+LelsersLasers@users.noreply.github.com> Date: Wed, 27 May 2026 13:01:35 -0700 Subject: [PATCH 21/21] update comments --- src/daq_log_parse/correlate.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/daq_log_parse/correlate.rs b/src/daq_log_parse/correlate.rs index 38bd5fa..0343d6f 100644 --- a/src/daq_log_parse/correlate.rs +++ b/src/daq_log_parse/correlate.rs @@ -66,9 +66,8 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult // Idea: in the chunk, look for GPS messages which have both a timestamp and a corresponding real time // Use those to create a mapping from the log's timestamps to real time, and use that mapping to convert // all messages in the chunk to have real timestamps. - // If the correlation is successful, we return the time-adjusted messages. - // If we can't find any GPS messages, or if the correlation fails for some reason, we return the - // original messages without time adjustment. + // If the correlation is successful, we return the orginal messages along with a correlation function. + // If we can't find any GPS messages, or if the correlation fails, return just the original messages. // First, find all GPS messages and extract their timestamps and real times let mut gps_points = Vec::new(); @@ -156,6 +155,7 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult return CorrelationChunkResult::uncorrelated_new(chunk); } + // Attempt to fit a line to the GPS points to find the correlation function let points: Vec = gps_points .iter() .map(|(log_ts, real_ts)| Point { @@ -171,6 +171,7 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult } }; + // Print debug info about the correlation quality let rms_error_ms = { let mse = points .iter() @@ -184,7 +185,6 @@ pub fn time_correlate_chunk(chunk: Vec) -> CorrelationChunkResult mse.sqrt() }; - log::info!( "GPS correlation successful: slope={:.9}, intercept_ms={:.3}, rms_error_ms={:.2}, points={}", slope,