diff --git a/NAMESPACE b/NAMESPACE index cf94e3bd..d858b069 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -233,8 +233,11 @@ importFrom(readr,write_lines) importFrom(readr,write_tsv) importFrom(rentrez,entrez_fetch) importFrom(rlang,.data) +importFrom(rlang,abort) importFrom(rlang,as_string) +importFrom(rlang,inform) importFrom(rlang,sym) +importFrom(rlang,warn) importFrom(sendmailR,mime_part) importFrom(sendmailR,sendmail) importFrom(seqinr,dist.alignment) diff --git a/R/acc2lin.R b/R/acc2lin.R index 5f25afe2..e6a74457 100644 --- a/R/acc2lin.R +++ b/R/acc2lin.R @@ -6,9 +6,12 @@ # suppressPackageStartupMessages(library(tidyverse)) # suppressPackageStartupMessages(library(biomartr)) + # https://stackoverflow.com/questions/18730491/sink-does-not-release-file #' Sink Reset #' +#' @importFrom rlang warn abort inform +#' #' @return No return, but run to close all outstanding `sink()`s #' #' @export @@ -18,9 +21,24 @@ #' sinkReset() #' } sinkReset <- function() { + # Handle all errors and warnings + tryCatch({ for (i in seq_len(sink.number())) { sink(NULL) } + rlang::inform("All sinks closed", class = "sink_reset_info") + }, error = function(e) { + rlang::abort(paste("Error: ", e$message), class = "sink_reset_error") + }, warning = function(w) { + rlang::warn(paste("Warning: ", w$message), class = "sink_reset_warning") + }, finally = { + # If any additional cleanup is needed, it can be done here + if (sink.number() > 0) { + # Additional cleanup if sinks are still open + rlang::inform("Some sinks remain open, ensure proper cleanup.", + class = "sink_cleanup_warning") + } + }) } @@ -41,7 +59,7 @@ sinkReset <- function() { #' #' @importFrom dplyr pull #' @importFrom magrittr %>% -#' @importFrom rlang sym +#' @importFrom rlang sym warn abort inform #' #' @return A `data.frame` that combines the original `df` with the lineage #' information. 
@@ -52,25 +70,65 @@ sinkReset <- function() { #' addLineage() #' } addLineage <- function(df, acc_col = "AccNum", assembly_path, - lineagelookup_path, ipgout_path = NULL, plan = "sequential", ...) { - s_acc_col <- sym(acc_col) + lineagelookup_path, ipgout_path = NULL, + plan = "sequential", ...) { + # check for validate inputs + if (!is.data.frame(df)) { + rlang::abort("Input 'df' must be a data frame.", class = "input_error") + } + + if (!acc_col %in% colnames(df)) { + rlang::abort(paste("Column", acc_col, + "not found in data frame."), class = "column_error") + } + + # Ensure paths are character strings + if (!is.character(assembly_path) || !is.character(lineagelookup_path)) { + rlang::abort("Both 'assembly_path' and + 'lineagelookup_path' must be character strings.", + class = "path_type_error") + } + + # Ensure paths exist + if (!file.exists(assembly_path)) { + rlang::abort(paste("Assembly file not found at:", + assembly_path), class = "file_not_found_error") + } + + if (!file.exists(lineagelookup_path)) { + rlang::abort(paste("Lineage lookup file not found at:", + lineagelookup_path), class = "file_not_found_error") + } + tryCatch({ + # Attempt to add lineages; acc_col must stay a string for pull() and merge(by.x =) + acc_col <- as.character(acc_col) accessions <- df %>% pull(acc_col) - lins <- acc2Lineage(accessions, assembly_path, lineagelookup_path, ipgout_path, plan) - - # Drop a lot of the unimportant columns for now? - # will make merging much easier - lins <- lins[, c( - "Strand", "Start", "Stop", "Nucleotide Accession", "Source", - "Id", "Strain" - ) := NULL] - lins <- unique(lins) - - # dup <- lins %>% group_by(Protein) %>% - # summarize(count = n()) %>% filter(count > 1) %>% - # pull(Protein) - - merged <- merge(df, lins, by.x = acc_col, by.y = "Protein", all.x = TRUE) - return(merged) + lins <- acc2Lineage( + accessions, assembly_path, lineagelookup_path, ipgout_path, plan + ) + + # Drop a lot of the unimportant columns for now? 
+ # will make merging much easier + lins <- lins[, c( + "Strand", "Start", "Stop", "Nucleotide Accession", "Source", + "Id", "Strain" + ) := NULL] + lins <- unique(lins) + + # dup <- lins %>% group_by(Protein) %>% + # summarize(count = n()) %>% filter(count > 1) %>% + # pull(Protein) + + merged <- merge(df, lins, by.x = acc_col, by.y = "Protein", all.x = TRUE) + return(merged) + }, error = function(e) { + rlang::abort(paste("Error during lineage addition:", e$message), + class = "lineage_addition_error") + }, warning = function(w) { + rlang::warn(paste("Warning during lineage addition:", w$message), + class = "lineage_addition_warning") + }) + } @@ -91,31 +149,67 @@ addLineage <- function(df, acc_col = "AccNum", assembly_path, #' @param plan A string specifying the parallelization strategy for the future #' package, such as `"sequential"` or `"multisession"`. #' +#' @importFrom rlang warn abort inform +#' #' @return A `data.table` that contains the lineage information, mapping protein #' accessions to their tax IDs and lineages. #' @export -#' @export #' #' @examples #' \dontrun{ #' acc2Lineage() #' } -acc2Lineage <- function(accessions, assembly_path, lineagelookup_path, ipgout_path = NULL, plan = "sequential", ...) { - tmp_ipg <- F - if (is.null(ipgout_path)) { - tmp_ipg <- T - ipgout_path <- tempfile("ipg", fileext = ".txt") - } +acc2Lineage <- function(accessions, assembly_path, + lineagelookup_path, ipgout_path = NULL, + plan = "sequential", ...) 
{ + tmp_ipg <- F + if (is.null(ipgout_path)) { + tmp_ipg <- TRUE + ipgout_path <- tempfile("ipg", fileext = ".txt") + } + + lins <- NULL + tryCatch({ + # Attempt to fetch IPG efetchIPG(accessions, out_path = ipgout_path, plan) - lins <- IPG2Lineage(accessions, ipgout_path, assembly_path, lineagelookup_path) + # Attempt to process IPG to lineages + lins <- IPG2Lineage(accessions, ipgout_path, + assembly_path, lineagelookup_path) + }, error = function(e) { + rlang::abort( + message = paste("An error occurred during IPG fetching + or lineage processing:", e$message), + class = "lineage_processing_error", + # adding additional context + accessions = accessions, + assembly_path = assembly_path, + lineagelookup_path = lineagelookup_path, + ipgout_path = ipgout_path, + plan = plan + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning during IPG fetching + or lineage processing:", w$message), + class = "lineage_processing_warning", + accessions = accessions, + assembly_path = assembly_path, + lineagelookup_path = lineagelookup_path, + ipgout_path = ipgout_path, + plan = plan + ) + }, finally = { + # Cleanup: delete temporary IPG file if it was created + if (tmp_ipg && file.exists(ipgout_path)) { + unlink(ipgout_path) + } + }) - if (tmp_ipg) { - unlink(tempdir(), recursive = T) - } return(lins) } + #' efetchIPG #' #' @author Samuel Chen, Janani Ravi @@ -130,6 +224,7 @@ acc2Lineage <- function(accessions, assembly_path, lineagelookup_path, ipgout_pa #' @importFrom furrr future_map #' @importFrom future plan #' @importFrom rentrez entrez_fetch +#' @importFrom rlang warn abort inform #' #' @return No return value. The function writes the fetched results to `out_path`. #' @export @@ -139,12 +234,28 @@ acc2Lineage <- function(accessions, assembly_path, lineagelookup_path, ipgout_pa #' efetchIPG() #' } efetchIPG <- function(accnums, out_path, plan = "sequential", ...) 
{ - if (length(accnums) > 0) { - partition <- function(in_data, groups) { - # \\TODO This function should be defined outside of efetchIPG(). It can be non-exported/internal - # Partition data to limit number of queries per second for rentrez fetch: - # limit of 10/second w/ key - l <- length(in_data) + # Argument validation ('plan' may be a strategy name such as "sequential" or a plan function) + if (!is.character(accnums) || length(accnums) == 0) { + rlang::abort("Error: 'accnums' must be a non-empty character vector.", + class = "validation_error") + } + + if (!is.character(out_path) || nchar(out_path) == 0) { + rlang::abort("Error: 'out_path' must be a non-empty string.", + class = "validation_error") + } + + if (!is.function(plan) && !is.character(plan)) { + rlang::abort("Error: 'plan' must be a strategy name or a plan function.", + class = "validation_error") + } + if (length(accnums) > 0) { + partition <- function(in_data, groups) { + # \\TODO This function should be defined outside of efetchIPG(). + # It can be non-exported/internal + # Partition data to limit number of queries per second for rentrez fetch: + # limit of 10/second w/ key + l <- length(in_data) partitioned <- list() for (i in 1:groups){ @@ -153,6 +264,7 @@ efetchIPG <- function(accnums, out_path, plan = "sequential", ...) { return(partitioned) } + tryCatch({ # Set the future plan strategy plan(strategy = plan, .skip = T) @@ -165,22 +277,41 @@ efetchIPG <- function(accnums, out_path, plan = "sequential", ...) { # Open the sink to the output path sink(out_path) - a <- future_map(1:length(partitioned_acc), function(x) { - # Avoid hitting the rate API limit - if (x %% 9 == 0) { - Sys.sleep(1) - } - cat( - entrez_fetch( - id = partitioned_acc[[x]], - db = "ipg", - rettype = "xml", - api_key = "YOUR_KEY_HERE" ## Can this be included in public package? 
+ a <- future_map(1:length(partitioned_acc), function(x) { + # Avoid hitting the rate API limit + if (x %% 9 == 0) { + Sys.sleep(1) + } + cat( + entrez_fetch( + id = partitioned_acc[[x]], + db = "ipg", + rettype = "xml", + api_key = "YOUR_KEY_HERE" ## Can this be included in public package? + ) ) + }) + sink(NULL) + }, error = function(e) { + rlang::abort( + message = paste("An error occurred: ", e$message), + class = "fetch_error", + accnums = accnums, + out_path = out_path, + plan = plan + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning: ", w$message), + class = "fetch_warning", + accnums = accnums, + out_path = out_path, + plan = plan ) + }, finally = { + # Ensure the sink is closed in case of errors + if (sink.number() > 0) sink(NULL) }) - sink(NULL) - } } @@ -204,6 +335,7 @@ efetchIPG <- function(accnums, out_path, plan = "sequential", ...) { #' "createLineageLookup()" function #' #' @importFrom data.table fread +#' @importFrom rlang warn abort inform #' #' @return A `data.table` with the lineage information for the provided protein #' accessions. @@ -214,7 +346,40 @@ efetchIPG <- function(accnums, out_path, plan = "sequential", ...) { #' IPG2Lineage() #' } #' -IPG2Lineage <- function(accessions, ipg_file, assembly_path, lineagelookup_path, ...) { +IPG2Lineage <- function(accessions, ipg_file, + assembly_path, lineagelookup_path, ...) 
{ + # Argument validation for accessions + if (!is.character(accessions) || length(accessions) == 0) { + rlang::abort("Input 'accessions' must be a non-empty + character vector.", class = "validation_error") + } + + # check for validate inputs + if (!is.character(ipg_file)) { + rlang::abort("Input 'ipg_file' must be a + character string.", class = "validation_error") + } + + # Ensure paths are character strings + if (!is.character(assembly_path) || !is.character(lineagelookup_path)) { + rlang::abort("Both 'assembly_path' and 'lineagelookup_path' + must be character strings.", class = "validation_error") + } + + # Ensure paths exist + if (!file.exists(assembly_path)) { + rlang::abort(paste("Assembly file not found at:", assembly_path), + class = "file_error") + } + + if (!file.exists(lineagelookup_path)) { + rlang::abort(paste("Lineage lookup file not found at:", lineagelookup_path), + class = "file_error") + } + + # Process the IPG file (tryCatch, not try(): try() accepts no error/warning handlers) + tryCatch({ + # Attempt to read the IPG file ipg_dt <- fread(ipg_file, sep = "\t", fill = T) # Filter the IPG data table to only include the accessions @@ -223,10 +388,33 @@ IPG2Lineage <- function(accessions, ipg_file, assembly_path, lineagelookup_path, # Rename the 'Assembly' column to 'GCA_ID' ipg_dt <- setnames(ipg_dt, "Assembly", "GCA_ID") + # Convert the IPG data table to a lineage data table lins <- GCA2Lineage(prot_data = ipg_dt, assembly_path, lineagelookup_path) + + # Filter out rows with missing lineage information lins <- lins[!is.na(Lineage)] %>% unique() return(lins) + }, error = function(e) { + rlang::abort( + message = paste("An error occurred: ", e$message), + class = "processing_error", + accessions = accessions, + ipg_file = ipg_file, + assembly_path = assembly_path, + lineagelookup_path = lineagelookup_path + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning: ", w$message), + class = "processing_warning", + accessions = accessions, + ipg_file = ipg_file, + assembly_path = assembly_path, + 
lineagelookup_path = lineagelookup_path + ) + }) + } diff --git a/R/assign_job_queue.R b/R/assign_job_queue.R index 69609417..52af46bf 100644 --- a/R/assign_job_queue.R +++ b/R/assign_job_queue.R @@ -1,3 +1,4 @@ + # for now, we're using an env var, COMMON_SRC_ROOT, to specify this folder since # the working directory is changed in many parts of the current molevolvr # pipeline. @@ -11,18 +12,28 @@ common_root <- Sys.getenv("COMMON_SRC_ROOT") #' @description #' Construct list where names (MolEvolvR advanced options) point to processes #' +#' @importFrom rlang warn abort inform +#' #' @return list where names (MolEvolvR advanced options) point to processes #' #' example: list_opts2procs <- mapOption2Process #' @export mapOption2Process <- function() { - opts2processes <- list( - "homology_search" = c("dblast", "dblast_cleanup"), - "domain_architecture" = c("iprscan", "ipr2lineage", "ipr2da"), - # processes always present agnostic of advanced options - "always" = c("blast_clust", "clust2table") - ) - return(opts2processes) + tryCatch({ + opts2processes <- list( + "homology_search" = c("dblast", "dblast_cleanup"), + "domain_architecture" = c("iprscan", "ipr2lineage", "ipr2da"), + # processes always present agnostic of advanced options + "always" = c("blast_clust", "clust2table") + ) + return(opts2processes) + }, error = function(e) { + rlang::abort(paste("Error: ", e$message), class = "Opts_to_process_error") + }, warning = function(w) { + rlang::warn(paste("Warning: ", w$message), + class = "Opts_to_process_warning") + }) + } #' mapAdvOption2Process @@ -32,6 +43,8 @@ mapOption2Process <- function() { #' #' @param advanced_opts character vector of MolEvolvR advanced options #' +#' @importFrom rlang warn abort inform +#' #' @return character vector of process names that will execute given #' the advanced options #' @@ -40,14 +53,33 @@ mapOption2Process <- function() { #' procs <- mapAdvOption2Process(advanced_opts) #' @export mapAdvOption2Process <- 
function(advanced_opts) { - # append 'always' to add procs that always run - advanced_opts <- c(advanced_opts, "always") - opts2proc <- mapOption2Process() - # setup index for opts2proc based on advanced options - idx <- which(names(opts2proc) %in% advanced_opts) - # extract processes that will run - procs <- opts2proc[idx] |> unlist() - return(procs) + if (!is.character(advanced_opts)) { + rlang::abort("Argument must be a character vector!", + class = "validation_error") + } + tryCatch({ + # append 'always' to add procs that always run + advanced_opts <- c(advanced_opts, "always") + opts2proc <- mapOption2Process() + # setup index for opts2proc based on advanced options + idx <- which(names(opts2proc) %in% advanced_opts) + # extract processes that will run + procs <- opts2proc[idx] |> unlist() + return(procs) + }, error = function(e) { + rlang::abort( + message = paste("Encountered an error: ", e$message), + class = "map_advanced_opts2procs_error", + advanced_opts = advanced_opts + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning: ", w$message), + class = "map_advanced_opts2procs_warning", + advanced_opts = advanced_opts + ) + }) + } #' calculateProcessRuntime @@ -59,6 +91,7 @@ mapAdvOption2Process <- function(advanced_opts) { #' directory #' #' @importFrom dplyr across everything select summarise +#' @importFrom rlang warn abort inform #' #' @return [list] names: processes; values: median runtime (seconds) #' @@ -76,7 +109,20 @@ mapAdvOption2Process <- function(advanced_opts) { #' list_proc_medians <- calculateProcessRuntime(dir_job_results) #' @export calculateProcessRuntime <- function(dir_job_results) { - source(file.path(common_root, "molevol_scripts", "R", "metrics.R")) + tryCatch({ + # Check if dir_job_results is a character string + if (!is.character(dir_job_results) || length(dir_job_results) != 1) { + rlang::abort("Input 'dir_job_results' must be a single character string.", + class = "validation_error") + } + + # Check if 
dir_job_results exists + if (!dir.exists(dir_job_results)) { + rlang::abort(paste("The directory", dir_job_results, "does not exist."), + class = "file_error") + } + + source(file.path(common_root, "molevol_scripts", "R", "metrics.R")) # aggregate logs from path_log_data <- file.path(common_root, @@ -87,30 +133,36 @@ calculateProcessRuntime <- function(dir_job_results) { dir.create(dirname(path_log_data), recursive = TRUE, showWarnings = FALSE) } + # attempt to load pre-generated logdata + if (!file.exists(path_log_data)) { + logs <- aggregate_logs(dir_job_results, latest_date = Sys.Date() - 60) + save(logs, file = path_log_data) + } else { + load(path_log_data) # loads the logs object + } + df_log <- logs$df_log + procs <- c( + "dblast", "dblast_cleanup", "iprscan", + "ipr2lineage", "ipr2da", "blast_clust", + "clust2table" + ) + list_proc_medians <- df_log |> + dplyr::select(dplyr::all_of(procs)) |> + dplyr::summarise( + dplyr::across( + dplyr::everything(), + \(x) median(x, na.rm = TRUE) + ) + ) |> + as.list() + return(list_proc_medians) + }, error = function(e) { + rlang::abort(paste("Encountered an error: ", e$message), + class = "processing_error") + }, warning = function(w) { + rlang::warn(paste("Warning: ", w$message), class = "processing_warning") + }) - # attempt to load pre-generated logdata - if (!file.exists(path_log_data)) { - logs <- aggregate_logs(dir_job_results, latest_date = Sys.Date() - 60) - save(logs, file = path_log_data) - } else { - load(path_log_data) # loads the logs object - } - df_log <- logs$df_log - procs <- c( - "dblast", "dblast_cleanup", "iprscan", - "ipr2lineage", "ipr2da", "blast_clust", - "clust2table" - ) - list_proc_medians <- df_log |> - dplyr::select(dplyr::all_of(procs)) |> - dplyr::summarise( - dplyr::across( - dplyr::everything(), - \(x) median(x, na.rm = TRUE) - ) - ) |> - as.list() - return(list_proc_medians) } #' writeProcessRuntime2TSV @@ -125,6 +177,7 @@ calculateProcessRuntime <- function(dir_job_results) { #' 
@importFrom tibble as_tibble #' @importFrom readr write_tsv #' @importFrom tidyr pivot_longer +#' @importFrom rlang warn abort inform #' #' @return [tbl_df] 2 columns: 1) process and 2) median seconds #' @@ -134,18 +187,50 @@ calculateProcessRuntime <- function(dir_job_results) { #' ) #' @export writeProcessRuntime2TSV <- function(dir_job_results, filepath) { - df_proc_medians <- calculateProcessRuntime(dir_job_results) |> - tibble::as_tibble() |> - tidyr::pivot_longer( - dplyr::everything(), - names_to = "process", - values_to = "median_seconds" - ) |> - dplyr::arrange(dplyr::desc(median_seconds)) - - # Write the resulting tibble to a TSV file - readr::write_tsv(df_proc_medians, file = filepath) - return(df_proc_medians) + tryCatch({ + # Error handling for input arguments + if (!is.character(dir_job_results) || length(dir_job_results) != 1) { + rlang::abort("Input 'dir_job_results' must be a single character string.", + class = "validation_error") + } + + if (!dir.exists(dir_job_results)) { + rlang::abort(paste("The directory", dir_job_results, "does not exist."), + class = "file_error") + } + + if (!is.character(filepath) || length(filepath) != 1) { + rlang::abort("Input 'filepath' must be a single character string.", + class = "validation_error") + } + df_proc_medians <- calculateProcessRuntime(dir_job_results) |> + tibble::as_tibble() |> + tidyr::pivot_longer( + dplyr::everything(), + names_to = "process", + values_to = "median_seconds" + ) |> + dplyr::arrange(dplyr::desc(median_seconds)) + + # Write the resulting tibble to a TSV file + readr::write_tsv(df_proc_medians, file = filepath) + return(df_proc_medians) + }, error = function(e) { + rlang::abort( + message = paste("Encountered an error: ", e$message), + class = "processing_error", + dir_job_results = dir_job_results, + filepath = filepath + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning: ", w$message), + class = "processing_warning", + dir_job_results = dir_job_results, + filepath = 
filepath + ) + }) + } #' writeProcessRuntime2YML @@ -159,10 +244,11 @@ writeProcessRuntime2TSV <- function(dir_job_results, filepath) { #' read location. #' #' @param dir_job_results [chr] path to MolEvolvR job_results directory -#' @param filepath [chr] path to save YAML file; if NULL, +#' @param filepath [chr] path to save YAML file; if NULL, #' uses ./molevol_scripts/log_data/job_proc_weights.yml #' #' @importFrom yaml write_yaml +#' @importFrom rlang warn abort inform #' #' @examples #' \dontrun{ @@ -176,8 +262,55 @@ writeProcessRuntime2YML <- function(dir_job_results, filepath = NULL) { if (is.null(filepath)) { filepath <- file.path(common_root, "molevol_scripts", "log_data", "job_proc_weights.yml") } - medians <- calculateProcessRuntime(dir_job_results) - yaml::write_yaml(medians, filepath) + tryCatch({ + # Error handling for dir_job_results arguments + if (!is.character(dir_job_results) || length(dir_job_results) != 1) { + rlang::abort( + message = "Input 'dir_job_results' must be a single character string.", + class = "validation_error", + dir_job_results = dir_job_results + ) + } + + if (!dir.exists(dir_job_results)) { + rlang::abort( + message = paste("The directory", dir_job_results, "does not exist."), + class = "file_error", + dir_job_results = dir_job_results + ) + } + + if (is.null(filepath)) { + filepath <- file.path(common_root, + "molevol_scripts", + "log_data", + "job_proc_weights.yml") + } + if (!is.character(filepath) || length(filepath) != 1) { + rlang::abort( + message = "Input 'filepath' must be a single character string.", + class = "validation_error", + filepath = filepath + ) + } + + medians <- calculateProcessRuntime(dir_job_results) + yaml::write_yaml(medians, filepath) + }, error = function(e) { + rlang::abort( + message = paste("Encountered an error: ", e$message), + class = "processing_error", + dir_job_results = dir_job_results, + filepath = filepath + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning: ", 
w$message), + class = "processing_warning", + dir_job_results = dir_job_results, + filepath = filepath + ) + }) } #' getProcessRuntimeWeights @@ -190,6 +323,7 @@ writeProcessRuntime2YML <- function(dir_job_results, filepath = NULL) { #' #' @importFrom stringr str_glue str_trim #' @importFrom yaml read_yaml +#' @importFrom rlang warn abort inform #' #' @return [list] names: processes; values: median runtime (seconds) #' @@ -207,13 +341,24 @@ getProcessRuntimeWeights <- function(medians_yml_path = NULL) { # attempt to read the weights from the YAML file produced by # writeProcessRuntime2YML() if (stringr::str_trim(medians_yml_path) == "") { - stop( - stringr::str_glue("medians_yml_path is empty - ({medians_yml_path}), returning default weights") + rlang::abort( + message = stringr::str_glue("medians_yml_path is empty + ({medians_yml_path}), returning default weights"), + class = "input_error", + medians_yml_path = medians_yml_path ) } proc_weights <- yaml::read_yaml(medians_yml_path) + + if (!is.list(proc_weights) || length(proc_weights) == 0) { + rlang::abort( + message = "The loaded YAML file does not + contain valid process weights.", + class = "file_error", + medians_yml_path = medians_yml_path + ) + } }, # to avoid fatal errors in reading the proc weights yaml, # some median process runtimes have been hardcoded based on @@ -246,6 +391,7 @@ getProcessRuntimeWeights <- function(medians_yml_path = NULL) { #' #' @importFrom dplyr if_else #' @importFrom stringr str_glue +#' @importFrom rlang warn abort inform #' #' @return total estimated number of seconds a job will process (walltime) #' @@ -257,13 +403,52 @@ calculateEstimatedWallTimeFromOpts <- function(advanced_opts, n_inputs = 1L, n_hits = NULL, verbose = FALSE) { - # to calculate est walltime for a homology search job, the number of hits - # must be provided - validation_fail <- is.null(n_hits) && "homology_search" %in% advanced_opts - stopifnot(!validation_fail) + + tryCatch({ + # to calculate est walltime for 
a homology search job, the number of hits + # must be provided + validation_fail <- is.null(n_hits) && "homology_search" %in% advanced_opts + stopifnot(!validation_fail) + + # Validate advanced_opts + if (!is.character(advanced_opts)) { + rlang::abort( + message = "Argument 'advanced_opts' must be a character vector.", + class = "validation_error", + advanced_opts = advanced_opts + ) + } + + # Validate n_inputs + if (!is.numeric(n_inputs) || length(n_inputs) != 1 || n_inputs <= 0) { + rlang::abort( + message = "Argument 'n_inputs' + must be a single positive numeric value.", + class = "validation_error", + n_inputs = n_inputs + ) + } + + # Validate n_hits if homology_search is in advanced_opts + if ("homology_search" %in% advanced_opts && + (is.null(n_hits) || !is.numeric(n_hits) || + length(n_hits) != 1 || n_hits < 0)) { + rlang::abort( + message = "Argument 'n_hits' must be a single non-negative numeric + value when 'homology_search' is in 'advanced_opts'.", + class = "validation_error", + n_hits = n_hits + ) + } # Get process weights - proc_weights <- writeProcessRuntime2YML() + proc_weights <- getProcessRuntimeWeights() + if (!is.list(proc_weights)) { + rlang::abort( + message = "Process weights could not be retrieved correctly.", + class = "processing_error" + ) + } # sort process weights by names and convert to vec proc_weights <- proc_weights[order(names(proc_weights))] |> unlist() @@ -300,6 +485,24 @@ calculateEstimatedWallTimeFromOpts <- function(advanced_opts, cat(file = stderr(), msg) } return(est_walltime) + }, error = function(e) { + rlang::abort( + message = paste("Encountered an error: ", e$message), + class = "processing_error", + advanced_opts = advanced_opts, + n_inputs = n_inputs, + n_hits = n_hits + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning: ", w$message), + class = "processing_warning", + advanced_opts = advanced_opts, + n_inputs = n_inputs, + n_hits = n_hits + ) + }) + } @@ -313,6 +516,8 @@ 
calculateEstimatedWallTimeFromOpts <- function(advanced_opts, #' @param t_long threshold value that defines the lower bound for assigning a #' job to the "long queue" #' +#' @importFrom rlang warn abort inform +#' #' @return a string of "short" or "long" #' #' example: @@ -324,8 +529,45 @@ assignJobQueue <- function( t_sec_estimate, t_cutoff = 21600 # 6 hours ) { - queue <- ifelse(t_sec_estimate > t_cutoff, "long", "short") - return(queue) + tryCatch({ + # Validate t_sec_estimate + if (!is.numeric(t_sec_estimate) || length(t_sec_estimate) != 1) { + rlang::abort( + message = "Argument 't_sec_estimate' must be a single numeric value.", + class = "validation_error", + t_sec_estimate = t_sec_estimate + ) + } + + # Validate t_cutoff + if (!is.numeric(t_cutoff) || length(t_cutoff) != 1 || t_cutoff < 0) { + rlang::abort( + message = "Argument 't_cutoff' must be a + single non-negative numeric value.", + class = "validation_error", + t_cutoff = t_cutoff + ) + } + + + queue <- ifelse(t_sec_estimate > t_cutoff, "long", "short") + return(queue) + }, error = function(e) { + rlang::abort( + message = paste("Encountered an error: ", e$message), + class = "processing_error", + t_sec_estimate = t_sec_estimate, + t_cutoff = t_cutoff + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning: ", w$message), + class = "processing_warning", + t_sec_estimate = t_sec_estimate, + t_cutoff = t_cutoff + ) + }) + } #' plotEstimatedWallTimes @@ -339,6 +581,7 @@ assignJobQueue <- function( #' @importFrom dplyr mutate select #' @importFrom ggplot2 aes geom_line ggplot labs #' @importFrom tibble as_tibble +#' @importFrom rlang warn abort inform #' #' @return line plot object #' @@ -348,6 +591,7 @@ assignJobQueue <- function( #' dev/molevol_scripts/docs/estimate_walltimes.png", plot = p) #' @export plotEstimatedWallTimes <- function() { + tryCatch({ opts <- mapOption2Process() |> names() # get all possible submission permutations (powerset) get_powerset <- function(vec) { @@ 
-417,8 +661,8 @@ plotEstimatedWallTimes <- function() { # sec to hrs df_walltimes <- df_walltimes |> dplyr::mutate(est_walltime = est_walltime / 3600) - p <- ggplot2::ggplot(df_walltimes, ggplot2::aes(x = n_inputs, - y = est_walltime, + p <- ggplot2::ggplot(df_walltimes, ggplot2::aes(x = n_inputs, + y = est_walltime, color = advanced_opts)) + ggplot2::geom_line() + ggplot2::labs( @@ -427,4 +671,16 @@ plotEstimatedWallTimes <- function() { y = "Estimated walltime (hours)" ) return(p) + }, error = function(e) { + rlang::abort( + message = paste("Encountered an error:", e$message), + .internal = TRUE + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning:", w$message), + .internal = TRUE + ) + }) + } diff --git a/R/blastWrappers.R b/R/blastWrappers.R index 3c9c4192..257381b4 100755 --- a/R/blastWrappers.R +++ b/R/blastWrappers.R @@ -17,6 +17,8 @@ #' @param num_alignments Number of alignments to report. #' @param num_threads Number of threads to use for the search (default is 1). #' +#' @importFrom rlang warn abort inform +#' #' @return This function does not return a value; it outputs results to the #' specified file. 
#' @export @@ -28,24 +30,71 @@ runDeltaBlast <- function(runDeltaBlast, db_search_path, db = "refseq", query, evalue = "1e-5", out, num_alignments, num_threads = 1) { - start <- Sys.time() - system(paste0("export BLASTDB=/", db_search_path)) + deltablast_path <- runDeltaBlast # first parameter is misnamed in the signature; alias it before validation + if (!file.exists(deltablast_path)) { + rlang::abort(paste("The DELTABLAST executable path is invalid:", + deltablast_path)) + } + if (!dir.exists(db_search_path)) { + rlang::abort(paste("The database search path is invalid:", db_search_path)) + } + if (!file.exists(query)) { + rlang::abort(paste("The query file path is invalid:", query)) + } + if (is.na(suppressWarnings(as.numeric(evalue))) || as.numeric(evalue) <= 0) { + rlang::abort(paste("The evalue must be a positive number:", evalue)) + } + if (!is.numeric(num_alignments) || num_alignments <= 0) { + rlang::abort(paste("The number of alignments must be a positive integer:", + num_alignments)) + } + if (!is.numeric(num_threads) || num_threads <= 0) { + rlang::abort(paste("The number of threads must be a positive integer:", + num_threads)) + } - system2( - command = deltablast_path, - args = c( - "-db", db, - "-query", query, - "-evalue", evalue, - "-out", out, - "-num_threads", num_threads, - "-num_alignments", num_alignments - # ,"-outfmt", outfmt - ) - ) - print(Sys.time() - start) + start <- Sys.time() + + tryCatch({ + system(paste0("export BLASTDB=/", db_search_path)) + system2( + command = deltablast_path, + args = c( + "-db", db, + "-query", query, + "-evalue", evalue, + "-out", out, + "-num_threads", num_threads, + "-num_alignments", num_alignments + # ,"-outfmt", outfmt + ) + ) + print(Sys.time() - start) + }, error = function(e) { + rlang::abort( + message = paste("Error in run_deltablast:", e$message), + class = "processing_error", + deltablast_path = deltablast_path, + db_search_path = db_search_path, + query = query, + out = out, + num_alignments = num_alignments, + num_threads = num_threads + ) + }, warning = function(w) { + rlang::warn( + message 
= paste("Warning in run_deltablast:", w$message), + class = "processing_warning", + deltablast_path = deltablast_path, + db_search_path = db_search_path, + query = query, + out = out, + num_alignments = num_alignments, + num_threads = num_threads + ) + }) } @@ -64,6 +113,8 @@ runDeltaBlast <- function(runDeltaBlast, db_search_path, #' @param out Path to the output file where results will be saved. #' @param num_threads Number of threads to use for the search (default is 1). #' +#' @importFrom rlang warn abort inform +#' #' @return This function does not return a value; it outputs results to the #' specified file. #' @export @@ -75,7 +126,35 @@ runDeltaBlast <- function(runDeltaBlast, db_search_path, runRPSBlast <- function(rpsblast_path, db_search_path, db = "refseq", query, evalue = "1e-5", out, num_threads = 1) { - start <- Sys.time() + + # Argument validation + if (!file.exists(rpsblast_path)) { + rlang::abort(paste("The RPSBLAST executable path is invalid:", + rpsblast_path), + class = "file_error") + } + if (!dir.exists(db_search_path)) { + rlang::abort(paste("The database search path is invalid:", db_search_path), + class = "file_error") + } + if (!file.exists(query)) { + rlang::abort(paste("The query file path is invalid:", query), + class = "file_error") + } + if (!is.numeric(as.numeric(evalue)) || as.numeric(evalue) <= 0) { + rlang::abort(paste("The evalue must be a positive number:", evalue), + class = "validation_error") + } + if (!is.numeric(num_threads) || num_threads <= 0) { + rlang::abort(paste("The number of threads must be a positive integer:", + num_threads), + class = "validation_error") + } + + start <- Sys.time() + + tryCatch({ + system(paste0("export BLASTDB=/", db_search_path)) system2( command = rpsblast_path, @@ -85,8 +164,29 @@ runRPSBlast <- function(rpsblast_path, db_search_path, "-evalue", evalue, "-out", out, "-num_threads", num_threads - # , "-outfmt", outfmt ) ) print(Sys.time() - start) + }, error = function(e) { + rlang::abort( + 
message = paste("Error in run_rpsblast:", e$message), + class = "processing_error", + rpsblast_path = rpsblast_path, + db_search_path = db_search_path, + query = query, + out = out, + num_threads = num_threads + ) + }, warning = function(w) { + rlang::warn( + message = paste("Warning in run_rpsblast:", w$message), + class = "processing_warning", + rpsblast_path = rpsblast_path, + db_search_path = db_search_path, + query = query, + out = out, + num_threads = num_threads + ) + }) + }