Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 244 additions & 0 deletions src/daq_log_parse/correlate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
use chrono::Datelike as _;
use chrono::TimeZone as _;

use crate::daq_log_parse::parse::ParsedMessage;

pub struct CorrelationFunction {
/// real_time ~= slope * log_time_ms + intercept_ms
///
/// Stored as:
/// unix_ms = slope * log_ts_ms + intercept_ms
slope: f64,
intercept_ms: f64,
}

impl CorrelationFunction {
pub fn correlate(&self, log_ts: u32) -> Option<chrono::DateTime<chrono::Local>> {
let unix_ms = self.slope * log_ts as f64 + self.intercept_ms;

match chrono::DateTime::from_timestamp_millis(unix_ms.round() as i64) {
Some(dt) => Some(dt.with_timezone(&chrono::Local)),
None => {
log::error!(
"Correlated time {} ms for log time {} ms is out of range for chrono::DateTime",
unix_ms,
log_ts
);
None
}
}
}
}

pub struct CorrelationChunkResult {
pub parsed_msgs: Vec<ParsedMessage>,
pub correlation_fn: Option<CorrelationFunction>,
}

pub fn time_correlate_chunks(chunks: Vec<Vec<ParsedMessage>>) -> Vec<CorrelationChunkResult> {
chunks.into_iter().map(time_correlate_chunk).collect()
}

impl CorrelationChunkResult {
pub fn uncorrelated_new(chunk: Vec<ParsedMessage>) -> Self {
Self {
parsed_msgs: chunk,
correlation_fn: None,
}
}

pub fn correlated_new(chunk: Vec<ParsedMessage>, correlation_fn: CorrelationFunction) -> Self {
Self {
parsed_msgs: chunk,
correlation_fn: Some(correlation_fn),
}
}
}

pub fn sig_to_value(dsv: &can_decode::DecodedSignalValue) -> u64 {
match &dsv {
can_decode::DecodedSignalValue::Numeric(v) => v.round() as u64,
can_decode::DecodedSignalValue::Enum(v, _) => *v as u64,
}
}

pub fn time_correlate_chunk(chunk: Vec<ParsedMessage>) -> CorrelationChunkResult {
// Idea: in the chunk, look for GPS messages which have both a timestamp and a corresponding real time
// Use those to create a mapping from the log's timestamps to real time, and use that mapping to convert
// all messages in the chunk to have real timestamps.
// If the correlation is successful, we return the orginal messages along with a correlation function.
// If we can't find any GPS messages, or if the correlation fails, return just the original messages.

// First, find all GPS messages and extract their timestamps and real times
let mut gps_points = Vec::new();
for msg in &chunk {
if msg.decoded.name == "gps_time" {
let millisecond = msg
.decoded
.signals
.get("millisecond")
.map(|sig| sig_to_value(&sig.value));
let second = msg
.decoded
.signals
.get("second")
.map(|sig| sig_to_value(&sig.value));
let minute = msg
.decoded
.signals
.get("minute")
.map(|sig| sig_to_value(&sig.value));
let hour = msg
.decoded
.signals
.get("hour")
.map(|sig| sig_to_value(&sig.value));
let day = msg
.decoded
.signals
.get("day")
.map(|sig| sig_to_value(&sig.value));
let month = msg
.decoded
.signals
.get("month")
.map(|sig| sig_to_value(&sig.value));
let year = msg
.decoded
.signals
.get("year")
.map(|sig| sig_to_value(&sig.value));

if let (Some(ms), Some(s), Some(min), Some(h), Some(d), Some(mon), Some(y)) =
(millisecond, second, minute, hour, day, month, year)
{
let full_year = if y < 100 { 2000 + y as i32 } else { y as i32 };
// Construct a chrono::DateTime from the extracted values
if let Some(dt) = chrono::NaiveDate::from_ymd_opt(full_year, mon as u32, d as u32)
.and_then(|date| {
date.and_hms_milli_opt(h as u32, min as u32, s as u32, ms as u32)
})
{
let dt_utc = chrono::Utc.from_utc_datetime(&dt);
let dt_local = chrono::DateTime::<chrono::Local>::from(dt_utc);

let current_year = chrono::Local::now().year();
if dt_local.year() < current_year - 2 || dt_local.year() > current_year + 2 {
log::warn!(
"GPS message at {} ms has suspicious year value {}, skipping",
msg.timestamp,
dt_local.year()
);
continue;
}

Comment thread
LelsersLasers marked this conversation as resolved.
gps_points.push((msg.timestamp, dt_local));
} else {
log::error!(
"GPS message at {} ms has invalid date/time values, skipping",
msg.timestamp
);
continue;
}
} else {
log::error!(
"GPS message at {} ms is missing some time signals, skipping",
msg.timestamp
);
continue;
}
}
}

if gps_points.is_empty() {
// No GPS points found, can't correlate
return CorrelationChunkResult::uncorrelated_new(chunk);
}

// Attempt to fit a line to the GPS points to find the correlation function
let points: Vec<Point> = gps_points
.iter()
.map(|(log_ts, real_ts)| Point {
x: *log_ts as f64,
y: real_ts.timestamp_millis() as f64,
})
.collect();
let (slope, intercept) = match linear_regression(&points) {
Some(v) => v,
None => {
log::error!("Failed to refit correlation line");
return CorrelationChunkResult::uncorrelated_new(chunk);
}
};

// Print debug info about the correlation quality
let rms_error_ms = {
let mse = points
.iter()
.map(|p| {
let predicted = slope * p.x + intercept;
let error = p.y - predicted;
error * error
})
.sum::<f64>()
/ points.len() as f64;

mse.sqrt()
};
log::info!(
"GPS correlation successful: slope={:.9}, intercept_ms={:.3}, rms_error_ms={:.2}, points={}",
slope,
intercept,
rms_error_ms,
points.len()
);

CorrelationChunkResult::correlated_new(
chunk,
CorrelationFunction {
slope,
intercept_ms: intercept,
},
)
}

struct Point {
x: f64, // log timestamp ms
y: f64, // unix timestamp ms
}

/// Least squares linear regression.
///
/// Fits:
///
/// y = slope * x + intercept
fn linear_regression(points: &[Point]) -> Option<(f64, f64)> {
if points.len() < 2 {
return None;
}

let n = points.len() as f64;

let mut sum_x = 0.0;
let mut sum_y = 0.0;
let mut sum_xy = 0.0;
let mut sum_x2 = 0.0;

for p in points {
sum_x += p.x;
sum_y += p.y;
sum_xy += p.x * p.y;
sum_x2 += p.x * p.x;
}

let denom = n * sum_x2 - sum_x * sum_x;

if denom.abs() < 1e-9 {
return None;
}

let slope = (n * sum_xy - sum_x * sum_y) / denom;
let intercept = (sum_y - slope * sum_x) / n;

Some((slope, intercept))
}
1 change: 1 addition & 0 deletions src/daq_log_parse/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod consts;
pub mod correlate;
pub mod parse;
pub mod table;
57 changes: 38 additions & 19 deletions src/daq_log_parse/table.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::daq_log_parse::consts;
use crate::daq_log_parse::parse;
use crate::daq_log_parse::{consts, correlate};
use can_decode::DecodedSignalValue;

pub struct TableBuilder {
Expand All @@ -15,15 +14,18 @@ pub struct TableBuilder {

impl TableBuilder {
pub fn new() -> Self {
let mut tb = Self {
bus_row: vec!["Bus".to_string()],
node_row: vec!["Node".to_string()],
message_row: vec!["Message".to_string()],
signal_row: vec!["Signal".to_string()],
next_col_idx: 1,
Self {
bus_row: vec!["".to_string(), "".to_string(), "Bus".to_string()],
node_row: vec!["".to_string(), "".to_string(), "Node".to_string()],
message_row: vec!["".to_string(), "".to_string(), "Message".to_string()],
signal_row: vec![
"Real Time".to_string(),
"DAQ Timestamp".to_string(),
"Signal".to_string(),
],
next_col_idx: 3, // real time, daq timestamp, then row headers columns
indexer: std::collections::HashMap::new(),
};
tb
}
}

pub fn create_header(&mut self, parser: &can_decode::Parser, bus_name: &str) {
Expand Down Expand Up @@ -58,20 +60,20 @@ impl TableBuilder {
pub fn create_and_write_tables(
&self,
out_folder: &std::path::Path,
chunked_parsed: Vec<Vec<parse::ParsedMessage>>,
correlated_chunks: Vec<correlate::CorrelationChunkResult>,
) {
std::fs::create_dir_all(out_folder).unwrap();

for (chunk_idx, chunk) in chunked_parsed.iter().enumerate() {
for (chunk_idx, chunk) in correlated_chunks.iter().enumerate() {
let mut csv_table = vec![
self.bus_row.clone(),
self.node_row.clone(),
self.message_row.clone(),
self.signal_row.clone(),
];

let first_time = chunk.first().map(|m| m.timestamp).unwrap_or(0);
let last_time = chunk.last().map(|m| m.timestamp).unwrap_or(0);
let first_time = chunk.parsed_msgs.first().map(|m| m.timestamp).unwrap_or(0);
let last_time = chunk.parsed_msgs.last().map(|m| m.timestamp).unwrap_or(0);

let first_row_time = (first_time / consts::BIN_WIDTH_MS) * consts::BIN_WIDTH_MS;
let last_row_time = last_time.div_ceil(consts::BIN_WIDTH_MS) * consts::BIN_WIDTH_MS;
Expand All @@ -83,11 +85,18 @@ impl TableBuilder {
let row_time = first_row_time + row_idx * consts::BIN_WIDTH_MS;
let row_time_sec = row_time as f32 / 1000.0;
let mut row = vec!["".to_string(); self.bus_row.len()];
row[0] = format!("{:.3}", row_time_sec);
let correlated_time = chunk
.correlation_fn
.as_ref()
.and_then(|cf| cf.correlate(row_time));
if let Some(ct) = correlated_time {
row[0] = ct.format("%H:%M:%S.%3f").to_string();
}
row[1] = format!("{:.3}", row_time_sec);
csv_table.push(row);
}

for msg in chunk {
for msg in chunk.parsed_msgs.iter() {
let decoded = &msg.decoded;
for (sig_name, sig_value) in &decoded.signals {
let key = (msg.bus_name.clone(), decoded.name.clone(), sig_name.clone());
Expand All @@ -104,13 +113,23 @@ impl TableBuilder {
}
}
}
let out_file = out_folder.join(format!("out_{:03}.csv", chunk_idx));
let mut wtr = csv::Writer::from_path(out_file).unwrap();

let first_correlated_time: Option<String> =
chunk.correlation_fn.as_ref().and_then(|cf| {
cf.correlate(first_time)
.map(|dt| dt.format("%Y_%m_%d__%H_%M_%S").to_string())
});
Comment thread
LelsersLasers marked this conversation as resolved.

let out_file = match first_correlated_time {
Some(t) => out_folder.join(format!("out_{:03}_{}.csv", chunk_idx, t)),
None => out_folder.join(format!("out_{:03}.csv", chunk_idx)),
};
let mut wtr = csv::Writer::from_path(out_file.clone()).unwrap();
for row in csv_table {
wtr.write_record(&row).unwrap();
}
wtr.flush().unwrap();
println!("Wrote chunk {} to CSV", chunk_idx);
println!("Wrote chunk {} to CSV ({})", chunk_idx, out_file.display());
}
}
}
3 changes: 2 additions & 1 deletion src/ui/log_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ impl LogParser {
let parsed =
daq_log_parse::parse::parse_log_files(&logs_dir, &parser_bus_0, &parser_bus_1);
let chunked_parsed = daq_log_parse::parse::chunk_parsed(parsed);
let correlated_chunks = daq_log_parse::correlate::time_correlate_chunks(chunked_parsed);

let mut table_builder = daq_log_parse::table::TableBuilder::new();
table_builder.create_header(&parser_bus_0, "VCAN");
table_builder.create_header(&parser_bus_1, "MCAN");
table_builder.create_and_write_tables(&output_dir, chunked_parsed);
table_builder.create_and_write_tables(&output_dir, correlated_chunks);

log::info!("Parsing completed successfully");
let _ = parse_to_ui_tx.send(MsgFromParserThread::SuccessExit(format!(
Expand Down
Loading