Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 330 additions & 0 deletions R/generate_matrices_ml.R
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,331 @@ skipImbalancedMatrix <- function(genome_ids,
split
}

#' Build leave-one-out (LOO) merged parquet matrices from drug parquet files.
#'
#' @param path Character. Base directory containing stratified parquet matrices.
#' Expected subdirs: matrix/..
#' @param verbosity Character. "minimal" or "debug"; when "debug", prints detailed steps.
#' @return Invisibly returns a tibble with paths of created LOO parquet files.
#' @import arrow dplyr purrr stringr tibble
.parquet2LOODrugMatrix <- function(
path,
verbosity = c("minimal", "debug")
) {
verbosity <- match.arg(verbosity)
log <- .make_logger(verbosity)

## ------------------------------------------------------------------
## Paths
## ------------------------------------------------------------------
matrix_path <- gsub("\\\\", "/", file.path(path, "matrix"))
LOO_path <- gsub("\\\\", "/", file.path(path, "LOO_matrix_drug"))

if (!dir.exists(matrix_path)) {
log("info", paste0("Matrix directory does not exist: ", matrix_path))
return(invisible(tibble::tibble(created_file = character())))
}
if (!dir.exists(LOO_path)) dir.create(LOO_path, recursive = TRUE)

## ------------------------------------------------------------------
## Locate parquet files
## ------------------------------------------------------------------
files <- list.files(matrix_path, pattern = "\\.parquet$", full.names = TRUE)
files <- files[!grepl("drug_class", basename(files))]
files <- gsub("\\\\", "/", files)

if (length(files) == 0) {
log("info", paste0("No parquet files found in ", matrix_path))
return(invisible(tibble::tibble(created_file = character())))
}

log("debug", paste0("Found ", length(files), " parquet files"))

## ------------------------------------------------------------------
## Parse filenames
## Expected: <prefix>_<drug>_<feature>_sparse.parquet
## ------------------------------------------------------------------
parsed_info <- tibble::tibble(
file = files,
parts = stringr::str_split(basename(files), "_")
) |>
dplyr::mutate(
idx_sparse = purrr::map_int(parts, \(x) {
w <- which(x == "sparse.parquet")
if (length(w) == 0) NA_integer_ else w[1]
}),
feature = purrr::pmap_chr(list(parts, idx_sparse), \(x, i) {
if (is.na(i) || i < 6) {
NA_character_
} else {
paste(x[(i - 2):(i - 1)], collapse = "_")
}
}),
prefix = purrr::pmap_chr(list(parts, idx_sparse), \(x, i) {
if (is.na(i) || i < 3) {
NA_character_
} else {
paste(x[1:(i - 4)], collapse = "_")
}
}),
drug = purrr::pmap_chr(list(parts, idx_sparse), \(x, i) {
if (is.na(i)) NA_character_ else x[i - 3]
})
) |>
dplyr::filter(!is.na(prefix), !is.na(feature), !is.na(drug))

if (nrow(parsed_info) == 0) {
log("info", "No files matched expected naming pattern")
return(invisible(tibble::tibble(created_file = character())))
}

log("debug", paste0("Parsed ", nrow(parsed_info), " files"))

## ------------------------------------------------------------------
## Iterate over (prefix, feature)
## ------------------------------------------------------------------
prefix_feature <- parsed_info |>
dplyr::distinct(prefix, feature)

created <- character(0)

for (i in seq_len(nrow(prefix_feature))) {
sub_prefix <- prefix_feature$prefix[i]
sub_feature <- prefix_feature$feature[i]

group_df <- parsed_info |>
dplyr::filter(prefix == sub_prefix, feature == sub_feature)

if (nrow(group_df) == 0) next

## Split by drug
grouped <- split(group_df, group_df$drug)

## --------------------------------------------------------------
## Leave‑one‑drug‑out with genome blocking
## --------------------------------------------------------------
purrr::walk(names(grouped), function(leave_one_out) {
## Read held‑out drug to get its genome IDs
heldout_tbl <- arrow::read_parquet(grouped[[leave_one_out]]$file)

if (!"genome_id" %in% colnames(heldout_tbl)) {
stop("Column '", "genome_id", "' not found in parquet file")
}

heldout_genomes <- unique(heldout_tbl[["genome_id"]])

## Training drugs
subset <- grouped[names(grouped) != leave_one_out]

## Remove overlapping genomes (STRICT leakage control)
subset <- purrr::map(
subset,
\(df) {
tbl <- arrow::read_parquet(df$file)
dplyr::filter(tbl, !.data[["genome_id"]] %in% heldout_genomes)
}
)

## Drop empty datasets
subset <- subset[purrr::map_int(subset, nrow) > 0]

if (length(subset) == 0) {
log(
"debug",
paste0(
"Skipping LOO drug ", leave_one_out,
": no data left after genome filtering"
)
)
return(NULL)
}

combined <- dplyr::bind_rows(subset)

## Safety check (can comment out once stable)
stopifnot(!any(combined[["genome_id"]] %in% heldout_genomes))

out_file <- gsub("\\\\", "/", file.path(
LOO_path,
paste0(
sub_prefix, "_leaveout_", leave_one_out, "_",
sub_feature, "_sparse.parquet"
)
))

arrow::write_parquet(combined, out_file)
created <<- c(created, out_file)

log(
"debug",
paste0(
"Created LOO file: ", out_file,
" | removed ", length(heldout_genomes), " genomes"
)
)
})
}

log("info", "All LOO‑drug matrices generated with genome‑level blocking")
invisible(tibble::tibble(created_file = created))
}

.parquet2CrossDrugTestMatrix <- function(
path,
verbosity = c("minimal", "debug")
) {
verbosity <- match.arg(verbosity)
log <- .make_logger(verbosity)

## ------------------------------------------------------------------
## Paths
## ------------------------------------------------------------------
matrix_path <- gsub("\\\\", "/", file.path(path, "matrix"))
out_path <- gsub("\\\\", "/", file.path(path, "cross_drug_test"))

if (!dir.exists(matrix_path)) {
log("info", paste0("Matrix directory does not exist: ", matrix_path))
return(invisible(tibble::tibble(created_file = character())))
}
if (!dir.exists(out_path)) dir.create(out_path, recursive = TRUE)

## ------------------------------------------------------------------
## Locate parquet files
## ------------------------------------------------------------------
files <- list.files(matrix_path, pattern = "\\.parquet$", full.names = TRUE)
files <- files[!grepl("drug_class", basename(files))]
files <- gsub("\\\\", "/", files)

if (length(files) == 0) {
log("info", paste0("No parquet files found in ", matrix_path))
return(invisible(tibble::tibble(created_file = character())))
}

log("debug", paste0("Found ", length(files), " parquet files"))

## ------------------------------------------------------------------
## Parse filenames (same logic as LOO)
## ------------------------------------------------------------------
parsed_info <- tibble::tibble(
file = files,
parts = stringr::str_split(basename(files), "_")
) |>
dplyr::mutate(
idx_sparse = purrr::map_int(parts, \(x) {
w <- which(x == "sparse.parquet")
if (length(w) == 0) NA_integer_ else w[1]
}),
feature = purrr::pmap_chr(list(parts, idx_sparse), \(x, i) {
if (is.na(i) || i < 6) {
NA_character_
} else {
paste(x[(i - 2):(i - 1)], collapse = "_")
}
}),
prefix = purrr::pmap_chr(list(parts, idx_sparse), \(x, i) {
if (is.na(i) || i < 3) {
NA_character_
} else {
paste(x[1:(i - 4)], collapse = "_")
}
}),
drug = purrr::pmap_chr(list(parts, idx_sparse), \(x, i) {
if (is.na(i)) NA_character_ else x[i - 3]
})
) |>
dplyr::filter(!is.na(prefix), !is.na(feature), !is.na(drug))

if (nrow(parsed_info) == 0) {
log("info", "No files matched expected naming pattern")
return(invisible(tibble::tibble(created_file = character())))
}

log("debug", paste0("Parsed ", nrow(parsed_info), " files"))

## ------------------------------------------------------------------
## Iterate over (prefix, feature)
## ------------------------------------------------------------------
prefix_feature <- parsed_info |>
dplyr::distinct(prefix, feature)

created <- character(0)

for (i in seq_len(nrow(prefix_feature))) {
sub_prefix <- prefix_feature$prefix[i]
sub_feature <- prefix_feature$feature[i]

group_df <- parsed_info |>
dplyr::filter(prefix == sub_prefix, feature == sub_feature)

if (nrow(group_df) < 2) next

grouped <- split(group_df, group_df$drug)

drugs <- names(grouped)

## --------------------------------------------------------------
## Pairwise cross‑drug testing (A → B)
## --------------------------------------------------------------
for (drugA in drugs) {
for (drugB in drugs) {
if (drugA == drugB) next

## Read training (drug A)
train_tbl <- arrow::read_parquet(grouped[[drugA]]$file)

if (!"genome_id" %in% colnames(train_tbl)) {
stop("Column '", "genome_id", "' not found in parquet file")
}

train_genomes <- unique(train_tbl[["genome_id"]])

## Read test (drug B) and REMOVE overlapping genomes
test_tbl <- arrow::read_parquet(grouped[[drugB]]$file) |>
dplyr::filter(!.data[["genome_id"]] %in% train_genomes)

if (nrow(test_tbl) == 0) {
log(
"debug",
paste0(
"Skipping ", drugA, " → ", drugB,
": no test data after genome filtering"
)
)
next
}

## Safety check
stopifnot(!any(test_tbl[["genome_id"]] %in% train_genomes))

out_file <- gsub("\\\\", "/", file.path(
out_path,
paste0(
sub_prefix, "_", drugA,
"_cross_testing_", drugB, "_",
sub_feature, "_sparse.parquet"
)
))

arrow::write_parquet(test_tbl, out_file)
created <- c(created, out_file)

log(
"debug",
paste0(
"Created cross‑drug test: ",
drugA, " → ", drugB,
" | removed ", length(train_genomes), " genomes"
)
)
}
}
}

log("info", "All cross‑drug testing matrices generated")
invisible(tibble::tibble(created_file = created))
}


#' Generate all ML input matrices
#'
#' a) each drug/drugclass matrices for all data and feature types
Expand Down Expand Up @@ -1022,6 +1347,11 @@ generateMLInputs <- function(parquet_duckdb_path = "results/Cje_parquet.duckdb",
verbosity = verbosity
)

.parquet2CrossDrugTestMatrix(out_path, verbosity = verbosity)

.parquet2LOODrugMatrix(out_path, verbosity = verbosity)


log("info", "All matrices generated and saved.")
invisible(TRUE)
}
Loading