diff --git a/DESCRIPTION b/DESCRIPTION index 1fcdb5c..afc7d4a 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -42,7 +42,8 @@ Imports: reshape2, stringr, tibble, - tidyr + tidyr, + treemapify biocViews: GenomeAssembly, Annotation, Sequencing VignetteBuilder: knitr Suggests: diff --git a/R/amRdataPlots.R b/R/amRdataPlots.R new file mode 100644 index 0000000..44ae8a7 --- /dev/null +++ b/R/amRdataPlots.R @@ -0,0 +1,456 @@ +#' Generate a summary report for AMR metadata +#' +#' @param metadata_parquet Character string. Path to a Parquet file containing +#' standardized AMR metadata. +#' @param out_path Character string. Directory where the Markdown report is written. +#' +#' @return Writes a structured, human‑readable summary report to +#' "/amr_metadata_summary.md". +#' +#' @import dplyr +#' @import arrow +#' @import kableExtra +#' +#' @examples +#' generateSummary( +#' metadata_parquet = "results/metadata.parquet", +#' out_path = "results/" +#' ) +#' +#' @export +generateSummary <- function(metadata_parquet, out_path) { + # Little helper to apply distinct + non-empty + sorted vector + clean_distinct <- function(df, col) { + df |> + dplyr::distinct({{ col }}) |> + dplyr::filter(!is.na({{ col }}), {{ col }} != "") |> + dplyr::arrange({{ col }}) |> + dplyr::pull({{ col }}) + } + + # Format for Markdown + md_tbl <- function(df) { + knitr::kable(df, format = "pipe") + } + + # Create a file + write_new <- function(path, lines) { + con <- file(path, open = "w", encoding = "UTF-8") + on.exit(close(con), add = TRUE) + writeLines(lines, con = con, sep = "\n", useBytes = TRUE) + } + + # Add lines to file + append_lines <- function(path, lines) { + con <- file(path, open = "a", encoding = "UTF-8") + on.exit(close(con), add = TRUE) + writeLines(lines, con = con, sep = "\n", useBytes = TRUE) + } + + # Setting paths + if (!dir.exists(out_path)) dir.create(out_path, recursive = TRUE, showWarnings = FALSE) + metadata_parquet <- normalizePath(metadata_parquet) + metadata <- arrow::read_parquet(metadata_parquet) + md_path <- file.path(out_path, "amr_metadata_summary.md") + + # Validation (got any data?) + if (nrow(metadata) == 0) { + stop("The output table is empty. Please check your query or input data.") + } + + # Core summaries + TotalEntryCount <- metadata |> dplyr::count() + CleanEntryCount <- metadata |> + dplyr::distinct(genome.genome_id) |> + dplyr::count() + + Antibiotics <- clean_distinct(metadata, genome_drug.antibiotic) + AntibioticClasses <- clean_distinct(metadata, drug_class) + + LabMethods <- metadata |> + dplyr::group_by(genome_drug.laboratory_typing_method) |> + dplyr::count() |> + dplyr::ungroup() + + PubMed_ids <- clean_distinct(metadata, genome_drug.pmid) + + PhenotypeCount <- metadata |> + dplyr::group_by(genome_drug.resistant_phenotype) |> + dplyr::count() |> + dplyr::ungroup() + + PhenotypebyDrugCount <- metadata |> + dplyr::group_by(genome_drug.resistant_phenotype, genome_drug.antibiotic) |> + dplyr::count() |> + dplyr::ungroup() + + ResPropbyDrug <- metadata |> + dplyr::group_by(genome_drug.antibiotic) |> + dplyr::count(genome_drug.resistant_phenotype) |> + dplyr::mutate(prop = n / sum(n)) |> + dplyr::filter(genome_drug.resistant_phenotype == "Resistant") |> + dplyr::transmute(genome_drug.antibiotic, res_prop = round(prop, 3)) |> + dplyr::ungroup() + + PhenotypebyDrugClassCount <- metadata |> + dplyr::group_by(genome.genome_id, drug_class) |> + dplyr::filter(!(any(genome_drug.resistant_phenotype == "Resistant") & + genome_drug.resistant_phenotype == "Susceptible")) |> + dplyr::ungroup() |> + dplyr::group_by(genome.genome_id, drug_class) |> + dplyr::slice_head(n = 1) |> + dplyr::ungroup() |> + dplyr::group_by(genome_drug.resistant_phenotype, drug_class) |> + dplyr::count() |> + dplyr::ungroup() + + ResPropbyDrugClass <- metadata |> + dplyr::group_by(drug_class) |> + dplyr::count(genome_drug.resistant_phenotype) |> + dplyr::mutate(prop = n / sum(n)) |> + dplyr::filter(genome_drug.resistant_phenotype == "Resistant") |> + dplyr::transmute(drug_class, res_prop = round(prop, 3)) |> + dplyr::ungroup() + + Year <- metadata |> + dplyr::distinct(genome.collection_year) |> + dplyr::filter(!is.na(genome.collection_year)) |> + dplyr::pull() |> + sort() + + YearCount <- metadata |> + dplyr::group_by(genome.collection_year) |> + dplyr::filter(!is.na(genome.collection_year)) |> + dplyr::count() |> + dplyr::ungroup() + + Country <- clean_distinct(metadata, genome.isolation_country) + CountryCount <- metadata |> + dplyr::group_by(genome.isolation_country) |> + dplyr::filter(!is.na(genome.isolation_country), genome.isolation_country != "") |> + dplyr::count() |> + dplyr::ungroup() + + Source <- clean_distinct(metadata, genome.isolation_source) + SourceCount <- metadata |> + dplyr::group_by(genome.isolation_source) |> + dplyr::filter(!is.na(genome.isolation_source), genome.isolation_source != "") |> + dplyr::count() |> + dplyr::ungroup() + + Host <- clean_distinct(metadata, genome.host_common_name) + + # Header + write_new(md_path, "# AMR summary report\n") + + # Basic stats + append_lines( + md_path, + c( + sprintf("- **Entries**: %s", TotalEntryCount[[1]]), + sprintf("- **Unique genome IDs**: %s", CleanEntryCount[[1]]), + "", + sprintf( + "- **Publications** (%d): %s", + length(PubMed_ids), + if (length(PubMed_ids)) paste(PubMed_ids, collapse = ", ") else "None" + ), + "" + ) + ) + + # Lists + append_lines( + md_path, + c( + sprintf("## Antibiotics (%d)", length(Antibiotics)), + "", + paste(Antibiotics, collapse = ", "), + "", + sprintf("## Antibiotic classes (%d)", length(AntibioticClasses)), + "", + paste(AntibioticClasses, collapse = ", "), + "" + ) + ) + + # Tables! + append_lines(md_path, c("## Phenotype counts", "", md_tbl(PhenotypeCount), "", "")) + append_lines(md_path, c("## Phenotype x antibiotic", "", md_tbl(PhenotypebyDrugCount), "", "")) + append_lines(md_path, c("## Resistant proportion per antibiotic", "", md_tbl(ResPropbyDrug), "", "")) + append_lines(md_path, c("## Phenotype x antibiotic class", "", md_tbl(PhenotypebyDrugClassCount), "", "")) + append_lines(md_path, c("## Resistant proportion per antibiotic class", "", md_tbl(ResPropbyDrugClass), "", "")) + append_lines(md_path, c("## Laboratory methods", "", md_tbl(LabMethods), "", "")) + append_lines(md_path, c("## Year counts", "", md_tbl(YearCount), "", "")) + append_lines(md_path, c("## Country counts", "", md_tbl(CountryCount), "", "")) + append_lines(md_path, c("## Isolation sources", "", md_tbl(SourceCount), "", "")) + + # Hosts as a simple list + if (length(Host)) { + append_lines(md_path, c("## Hosts", "", paste0("- ", Host), "", "")) + } +} + + +#' Write all summary plots to file(s) +#' +#' @param metadata_parquet Character. Path to the Parquet metadata file. +#' @param out_path Character. Output directory for plot files. +#' +#' @return Invisibly returns a vector of pdf file paths written. +#' @export +generatePlots <- function(metadata_parquet, + out_path) { + device <- "pdf" + if (!dir.exists(out_path)) { + dir.create(out_path, showWarnings = FALSE, recursive = TRUE) + } + + metadata <- arrow::read_parquet(normalizePath(metadata_parquet)) + + # --------- Build plots (same visuals as your generatePlots) ---------- + # 1) Phenotypes across antibiotics and time + df_year <- metadata |> + dplyr::filter(!is.na(genome.collection_year)) |> + dplyr::select( + genome.genome_id, + drug_abbr, + genome_drug.resistant_phenotype, + genome.isolation_country, + genome.collection_year + ) + + summary_year <- df_year |> + dplyr::group_by( + drug_abbr, + genome_drug.resistant_phenotype, + genome.collection_year + ) |> + dplyr::summarise(count = dplyr::n(), .groups = "drop") + + p1 <- ggplot2::ggplot( + summary_year, + ggplot2::aes( + x = genome.collection_year, + y = count, + colour = genome_drug.resistant_phenotype + ) + ) + + ggplot2::geom_line() + + ggplot2::geom_point() + + ggplot2::facet_wrap(~drug_abbr, scales = "free_y") + + ggplot2::labs( + title = "Resistant phenotypes across antibiotics and time", + x = "Year", y = "Number of isolates", + colour = "Phenotype" + ) + + ggplot2::scale_color_brewer(palette = "Pastel1") + + ggplot2::theme_minimal(base_size = 12) + + ggplot2::theme( + text = ggplot2::element_text(colour = "black"), + legend.position = "bottom", + axis.text.x = ggplot2::element_text(angle = 45, hjust = 1, colour = "black"), + axis.text.y = ggplot2::element_text(colour = "black"), + axis.title = ggplot2::element_text(colour = "black"), + panel.grid.minor = element_blank() + ) + + p1 + # 2) Resistance only over time + + + # 1) Levels for antibiotics (from the same subset used in p2) + abx_levels <- summary_year |> + dplyr::filter(genome_drug.resistant_phenotype == "Resistant") |> + dplyr::distinct(drug_abbr) |> + dplyr::arrange(drug_abbr) |> + dplyr::pull(drug_abbr) + + # 2) Base Okabe–Ito (CVD-friendly) and pastelizer + okabe_ito_base <- c( + "#000000", # black + "#E69F00", # orange + "#56B4E9", # sky blue + "#009E73", # bluish green + "#F0E442", # yellow + "#0072B2", # blue + "#D55E00", # vermillion + "#CC79A7" # reddish purple + ) + + okabe_ito_pastel <- function(n, lighten = 0.15) { + # Interpolate if more than 8 needed + cols <- if (n <= length(okabe_ito_base)) { + okabe_ito_base[seq_len(n)] + } else { + grDevices::colorRampPalette(okabe_ito_base)(n) + } + to_rgb <- function(hex) grDevices::col2rgb(hex) / 255 + blend_with_white <- function(hex, a = lighten) { + rgb <- to_rgb(hex) + out <- (1 - a) * rgb + a * c(1, 1, 1) + grDevices::rgb(out[1], out[2], out[3]) + } + vapply(cols, blend_with_white, character(1), a = lighten) + } + + # 3) Build a NAMED palette aligned to factor levels + pal_vals <- okabe_ito_pastel(length(abx_levels), lighten = 0.15) + pal_named <- stats::setNames(pal_vals, abx_levels) + + # 4) Plot with factor levels + named palette (no warnings, distinct colors) + p2 <- ggplot2::ggplot( + summary_year |> + dplyr::filter(genome_drug.resistant_phenotype == "Resistant") |> + dplyr::mutate( + antibiotic_fac = factor(drug_abbr, levels = abx_levels) + ), + ggplot2::aes( + x = genome.collection_year, y = count, + colour = antibiotic_fac + ) + ) + + ggplot2::geom_line() + + ggplot2::geom_point() + + ggplot2::labs( + title = "Distribution of resistance data over time", + x = "Year", + y = "Number of resistant isolates", + colour = "Antibiotic" + ) + + ggplot2::scale_color_manual(values = pal_named, drop = TRUE) + # <- named palette + ggplot2::theme_minimal(base_size = 12) + + ggplot2::theme( + text = ggplot2::element_text(colour = "#2D2D2D"), + legend.position = "bottom" + ) + + # 3) Time × geography × phenotype + df_country <- metadata |> + dplyr::filter(genome.isolation_country != "") |> + dplyr::select( + genome.genome_id, + drug_abbr, + genome_drug.resistant_phenotype, + genome.isolation_country, + genome.collection_year + ) + + summary_country_year <- df_country |> + dplyr::group_by( + genome.collection_year, + genome_drug.resistant_phenotype, + genome.isolation_country + ) |> + dplyr::summarise(count = dplyr::n(), .groups = "drop") + + p3 <- ggplot2::ggplot( + summary_country_year, + ggplot2::aes( + x = genome.collection_year, + y = genome.isolation_country, + size = count, + color = genome_drug.resistant_phenotype + ) + ) + + ggplot2::geom_point(alpha = 0.75) + + ggplot2::scale_size(range = c(3, 15)) + + ggplot2::scale_color_viridis_d(option = "C", begin = 0.25, end = 0.95) + + ggplot2::labs( + title = "AMR isolates across time and geography", + x = "Year", y = "Country", + size = "Count", color = "Phenotype" + ) + + ggplot2::theme_minimal(base_size = 12) + + ggplot2::theme( + text = ggplot2::element_text(colour = "black"), + legend.position = "right" + ) + + # 4) Phenotype proportion per antibiotic (stacked, normalized) + p4 <- ggplot2::ggplot( + metadata, + ggplot2::aes( + x = drug_abbr, + fill = genome_drug.resistant_phenotype + ) + ) + + ggplot2::geom_bar(position = "fill") + + ggplot2::coord_flip() + + ggplot2::labs( + title = "Phenotype proportion per antibiotic", + x = "Antibiotic", y = "Proportion", fill = "Phenotype" + ) + + ggplot2::scale_fill_brewer(palette = "Pastel1") + # <- keep pastel + ggplot2::theme_minimal(base_size = 12) + + ggplot2::theme( + text = ggplot2::element_text(colour = "black"), + legend.position = "bottom", + axis.text.x = ggplot2::element_text(angle = 45, hjust = 1) + ) + + # 5) Treemap of isolation sources + summary_isolation_source <- metadata |> + dplyr::filter(genome.isolation_source != "") |> + dplyr::group_by(genome.isolation_source, genome_drug.resistant_phenotype) |> + dplyr::summarise(count = dplyr::n(), .groups = "drop") |> + dplyr::filter(genome_drug.resistant_phenotype == "Resistant") + + p5 <- ggplot2::ggplot( + summary_isolation_source, + ggplot2::aes(area = count, fill = genome.isolation_source) + ) + + treemapify::geom_treemap() + + # treemapify::geom_treemap_text( + # ggplot2::aes(label = genome_drug.resistant_phenotype), + # color = "grey15", grow = FALSE + # ) + + treemapify::geom_treemap_text( + ggplot2::aes(label = genome.isolation_source), + color = "grey15", grow = FALSE + ) + + ggplot2::labs( + title = "Distribution of Resistant isolates by isolation source", + fill = "Isolation source" + ) + + ggplot2::scale_fill_manual( + values = colorRampPalette(RColorBrewer::brewer.pal(8, "Pastel2"))(n_distinct(summary_isolation_source$genome.isolation_source)) + ) + + ggplot2::theme_minimal(base_size = 12) + + ggplot2::theme( + text = ggplot2::element_text(colour = "black"), + legend.position = "none" + ) + + # 6) Histogram of resistant classes per genome + p6 <- ggplot2::ggplot(metadata, ggplot2::aes(num_resistant_classes)) + + ggplot2::geom_histogram(binwidth = 1, fill = "steelblue") + + ggplot2::labs( + title = "Distribution of resistant classes per genome", + x = "# Resistant Classes", y = "Count" + ) + + ggplot2::theme_minimal(base_size = 12) + + ggplot2::theme( + text = ggplot2::element_text(colour = "black"), + legend.position = "bottom", + axis.text = ggplot2::element_text(colour = "black"), + axis.title = ggplot2::element_text(colour = "black"), + panel.grid.minor = element_blank() + ) + + plots <- list(p1 = p1, p2 = p2, p3 = p3, p4 = p4, p5 = p5, p6 = p6) + + # --------- Write to device ---------- + paths <- character(0) + + pdf_path <- file.path(out_path, paste0("amRdata_exploratory_plots.pdf")) + grDevices::pdf(pdf_path, onefile = TRUE) + on.exit(grDevices::dev.off(), add = TRUE) + for (nm in names(plots)) { + print(plots[[nm]]) + } + paths <- c(paths, pdf_path) + + + invisible(paths) +} diff --git a/R/data_curation.R b/R/data_curation.R index 5ca98bb..1694568 100644 --- a/R/data_curation.R +++ b/R/data_curation.R @@ -45,8 +45,10 @@ } # Parse - df <- utils::read.table(text = raw_data, sep = "\t", header = TRUE, fill = TRUE, - quote = "", check.names = FALSE, comment.char = "") + df <- utils::read.table( + text = raw_data, sep = "\t", header = TRUE, fill = TRUE, + quote = "", check.names = FALSE, comment.char = "" + ) df <- tibble::as_tibble(df) |> dplyr::mutate(dplyr::across(dplyr::everything(), ~ iconv(.x, from = "", to = "UTF-8", sub = ""))) @@ -61,7 +63,7 @@ # Make sure the BV-BRC metadata live where they're supposed to .ensure_bvbrc_cache <- function(base_dir = ".", verbose = TRUE, - cache_rel = file.path("data", "bvbrc", "bvbrcData.duckdb"), + cache_rel = file.path("data", "bvbrc", "bvbrcData.duckdb"), cache_table = "bvbrc_bac_data") { base_dir <- normalizePath(base_dir, mustWork = FALSE) cache_db <- file.path(base_dir, cache_rel) @@ -115,12 +117,12 @@ base_dir <- normalizePath(base_dir, mustWork = FALSE) data_dir <- file.path(base_dir, "data") bvbrc_dir <- file.path(data_dir, "bvbrc") - logs_dir <- file.path(data_dir, "logs") + logs_dir <- file.path(data_dir, "logs") dir.create(bvbrc_dir, recursive = TRUE, showWarnings = FALSE) - dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) + dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) - db_path <- file.path(bvbrc_dir, "bvbrcData.duckdb") + db_path <- file.path(bvbrc_dir, "bvbrcData.duckdb") table_name <- "bvbrc_bac_data" meta_table <- "__meta" @@ -128,10 +130,10 @@ DBI::dbExecute( con, - glue::glue('CREATE TABLE IF NOT EXISTS {meta_table} ( + glue::glue("CREATE TABLE IF NOT EXISTS {meta_table} ( table_name TEXT PRIMARY KEY, last_updated TIMESTAMP - )') + )") ) # Tiny update check helper @@ -139,12 +141,14 @@ res <- tryCatch( DBI::dbGetQuery( con, - glue::glue('SELECT last_updated FROM {meta_table} - WHERE table_name = {DBI::dbQuoteString(con, table_name)}') + glue::glue("SELECT last_updated FROM {meta_table} + WHERE table_name = {DBI::dbQuoteString(con, table_name)}") ), error = function(e) NULL ) - if (is.null(res) || nrow(res) == 0L) return(NA) + if (is.null(res) || nrow(res) == 0L) { + return(NA) + } as.POSIXct(res$last_updated[[1]], origin = "1970-01-01", tz = "UTC") } @@ -277,7 +281,9 @@ #' If nothing suitable is found, throw a fit and an error. #' @keywords internal .resolveQueryValue <- function(query_type, query_value, user_bacs) { - if (!is.null(query_value) && nzchar(query_value)) return(query_value) + if (!is.null(query_value) && nzchar(query_value)) { + return(query_value) + } if (missing(user_bacs) || length(user_bacs) == 0) { stop("Provide query_value or user_bacs for the selected query_type.") } @@ -359,7 +365,7 @@ stringr::str_replace_all("\\s+", "_") |> stringr::str_replace_all("[^A-Za-z0-9._-]", "") - db_dir <- file.path(data_dir, bug_dirname) + db_dir <- file.path(data_dir, bug_dirname) dir.create(db_dir, recursive = TRUE, showWarnings = FALSE) db_file <- paste0(.generateDBname(user_bacs), ".duckdb") @@ -397,14 +403,13 @@ overwrite = FALSE, image = "danylmb/bvbrc:5.3", verbose = TRUE) { - - query_type <- match.arg(query_type) + query_type <- match.arg(query_type) query_value <- .resolveQueryValue(query_type, query_value, user_bacs) - + if (isTRUE(verbose)) { message("Querying BV-BRC: ", query_type, " == ", query_value) } - + # Count count_cmd <- paste0( "docker run --rm ", image, @@ -414,10 +419,10 @@ " --in genome_status,WGS,Complete", " --count" ) - count_lines <- tryCatch(system(count_cmd, intern = TRUE), error = function(e) character()) + count_lines <- tryCatch(system(count_cmd, intern = TRUE), error = function(e) character()) count_result <- suppressWarnings(as.integer(if (length(count_lines) >= 2) count_lines[2] else NA_integer_)) if (isTRUE(verbose) && !is.na(count_result)) message("Count returned: ", count_result) - + # Details data_cmd <- paste0( "docker run --rm ", image, @@ -429,7 +434,7 @@ ) data_raw <- tryCatch(system(data_cmd, intern = TRUE), error = function(e) character()) if (length(data_raw) == 0L) stop("BV-BRC returned no data for: ", query_type, " = ", query_value) - + data_result <- tibble::as_tibble( utils::read.table( text = data_raw, sep = "\t", header = TRUE, fill = TRUE, @@ -443,16 +448,16 @@ `genome.species` = as.character(`genome.species`), `genome.strain` = as.character(`genome.strain`) ) - + # Per-bug DB path - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) db_path <- paths$db_path - + con <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path) DBI::dbWriteTable(con, "bac_data", data_result, overwrite = TRUE) - + if (isTRUE(verbose)) message("Wrote table 'bac_data' to: ", db_path) - + list(count_result = count_result, duckdbConnection = con, table_name = "bac_data") } @@ -474,28 +479,28 @@ overwrite = FALSE, verbose = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) - + if (isTRUE(verbose)) message("Resolving input taxa.") bac_input_data <- .retrieveCustomQuery(base_dir = base_dir, user_bacs = user_bacs) - + if (is.null(bac_input_data) || nrow(bac_input_data) == 0) { message("No valid input provided or no matches found.") return(NULL) } - + # Resolve per-bug DB path - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) db_path <- paths$db_path - - bac_data <- tibble::tibble() + + bac_data <- tibble::tibble() taxon_ids <- unique(bac_input_data$genome.taxon_id) - + if (isTRUE(verbose)) message("Querying BV-BRC for ", length(taxon_ids), " taxon IDs.") - + for (i in seq_along(taxon_ids)) { tax <- taxon_ids[i] if (isTRUE(verbose)) message("Taxon ", i, "/", length(taxon_ids), ": ", tax) - + res <- .getGenomeIDs( base_dir = base_dir, query_type = "taxon_id", @@ -504,22 +509,22 @@ overwrite = TRUE, verbose = verbose ) - + con <- res$duckdbConnection tbl <- res$table_name each_bac_data <- tibble::as_tibble(DBI::dbReadTable(con, tbl)) bac_data <- dplyr::bind_rows(bac_data, each_bac_data) - + # Close per-iteration connection to avoid open handles piling up try(DBI::dbDisconnect(con, shutdown = TRUE), silent = TRUE) } - + if (nrow(bac_data) > 0) { genome_ids <- bac_data |> dplyr::distinct(`genome.genome_id`) |> dplyr::pull(`genome.genome_id`) genome_ids <- genome_ids[!is.na(genome_ids)] - + if (length(genome_ids) > 0) { if (isTRUE(verbose)) message("Collected ", length(genome_ids), " distinct genome IDs.") return(genome_ids) @@ -578,18 +583,18 @@ verbose = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) data_dir <- file.path(base_dir, "data") - tmp_dir <- file.path(data_dir, "tmp") + tmp_dir <- file.path(data_dir, "tmp") dir.create(tmp_dir, recursive = TRUE, showWarnings = FALSE) - + if (isTRUE(verbose)) { message("Preparing AMR query input for ", length(batch_genome_IDs), " genomes.") } - + docker_path <- Sys.which("docker") if (!nzchar(docker_path)) { stop("Docker is not available on your PATH but is required.") } - + # Generate genome list with p3-echo echo_args <- c( "run", "--rm", @@ -599,12 +604,12 @@ genome_ids_output <- suppressWarnings( system2("docker", args = echo_args, stdout = TRUE, stderr = TRUE) ) - + # Write a temporary file in data/tmp/ tmp_in <- tempfile(tmpdir = tmp_dir, pattern = "genome_drug_ids_", fileext = ".tsv") writeLines(genome_ids_output, con = tmp_in) tmp_in_mounted <- file.path("/data", "tmp", basename(tmp_in)) - + # Allow abx_filter to be a single string with spaces OR a vector of args abx_args <- if (length(abx_filter) == 1L) { @@ -612,7 +617,7 @@ } else { abx_filter } - + # Query drug data drug_args <- c( "run", "--rm", @@ -622,15 +627,15 @@ abx_args, "--attr", drug_fields ) - + if (isTRUE(verbose)) { message("Running AMR query.") } drug_data <- suppressWarnings(system2("docker", args = drug_args, stdout = TRUE, stderr = TRUE)) - + # Clean up after yourself try(unlink(tmp_in), silent = TRUE) - + return(drug_data) } @@ -655,18 +660,18 @@ verbose = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) data_dir <- file.path(base_dir, "data") - tmp_dir <- file.path(data_dir, "tmp") + tmp_dir <- file.path(data_dir, "tmp") dir.create(tmp_dir, recursive = TRUE, showWarnings = FALSE) - + if (isTRUE(verbose)) { message("Preparing genome metadata input for ", length(batch_genome_IDs), " genomes.") } - + docker_path <- Sys.which("docker") if (!nzchar(docker_path)) { stop("Docker is not available on your PATH but is required.") } - + # Generate genome list with p3-echo echo_args <- c( "run", "--rm", @@ -676,14 +681,14 @@ genome_ids_output <- suppressWarnings( system2("docker", args = echo_args, stdout = TRUE, stderr = TRUE) ) - + tmp_in <- tempfile(tmpdir = tmp_dir, pattern = "genome_ids_", fileext = ".tsv") writeLines(genome_ids_output, con = tmp_in) tmp_in_mounted <- file.path("/data", "tmp", basename(tmp_in)) - + # Choose attributes (AMR for this pipeline) chosen_fields <- if (identical(filter_type, "AMR")) amr_fields else microtrait_fields - + get_args <- c( "run", "--rm", "-v", paste0(data_dir, ":/data"), @@ -691,15 +696,15 @@ "--input", tmp_in_mounted, "--attr", chosen_fields ) - + if (isTRUE(verbose)) { message("Running genome metadata query.") } genome_data <- suppressWarnings(system2("docker", args = get_args, stdout = TRUE, stderr = TRUE)) - + # Cleaning up try(unlink(tmp_in), silent = TRUE) - + return(genome_data) } @@ -735,8 +740,10 @@ retrieveMetadata <- function(user_bacs, base_dir <- normalizePath(base_dir, mustWork = FALSE) if (isTRUE(verbose)) message("Resolving genome IDs for user inputs.") - genome_ids <- .retrieveQueryIDs(base_dir = base_dir, user_bacs = user_bacs, - overwrite = overwrite, verbose = verbose) + genome_ids <- .retrieveQueryIDs( + base_dir = base_dir, user_bacs = user_bacs, + overwrite = overwrite, verbose = verbose + ) if (length(genome_ids) == 0) { message("No genome IDs available for the specified inputs.") return(NULL) @@ -753,8 +760,11 @@ retrieveMetadata <- function(user_bacs, "pmid,resistant_phenotype,", "source,taxon_id,testing_standard" ) - abx_filter <- if (identical(abx, "All")) "--required antibiotic" - else paste0("--in antibiotic,", paste(abx, collapse = ",")) + abx_filter <- if (identical(abx, "All")) { + "--required antibiotic" + } else { + paste0("--in antibiotic,", paste(abx, collapse = ",")) + } amr_fields <- paste0( "assembly_accession,assembly_method,", "bioproject_accession,biosample_accession,", @@ -808,7 +818,10 @@ retrieveMetadata <- function(user_bacs, ), envir = environment() ) - parallel::clusterEvalQ(cluster, { library(tibble); library(dplyr) }) + parallel::clusterEvalQ(cluster, { + library(tibble) + library(dplyr) + }) if (isTRUE(verbose)) message("Retrieving AMR phenotype data in batches.") batch_drug_data <- parallel::parLapply(cluster, genome_batches, function(batch) { @@ -822,7 +835,10 @@ retrieveMetadata <- function(user_bacs, ) }) combined_drug_data <- unlist(batch_drug_data, use.names = FALSE) - if (length(combined_drug_data) == 0) { message("No drug data returned."); return(NULL) } + if (length(combined_drug_data) == 0) { + message("No drug data returned.") + return(NULL) + } combined_drug_data_tbl <- tibble::as_tibble( utils::read.table( @@ -847,7 +863,10 @@ retrieveMetadata <- function(user_bacs, ) }) combined_genome_data <- unlist(batch_genome_data, use.names = FALSE) - if (length(combined_genome_data) == 0) { message("No genome data returned."); return(NULL) } + if (length(combined_genome_data) == 0) { + message("No genome data returned.") + return(NULL) + } combined_genome_data_tbl <- tibble::as_tibble( utils::read.table( @@ -860,13 +879,14 @@ retrieveMetadata <- function(user_bacs, dplyr::mutate(`genome.genome_id` = as.character(`genome.genome_id`)) # Per-bug DB path - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) db_path <- paths$db_path logs_dir <- file.path(base_dir, "data", "logs") dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) cat(sprintf("[%s] Writing metadata DuckDB: %s\n", Sys.time(), db_path), - file = file.path(logs_dir, "bvbrc.log"), append = TRUE) + file = file.path(logs_dir, "bvbrc.log"), append = TRUE + ) con <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path) DBI::dbWriteTable(con, "amr_phenotype", combined_drug_data_tbl, overwrite = TRUE) @@ -892,10 +912,14 @@ retrieveMetadata <- function(user_bacs, # FASTA sanitizer to ensure Panaroo compatibility with BV-BRC CLI downloads .strip_fasta_preamble <- function(fna_path) { - if (!file.exists(fna_path)) return(invisible(FALSE)) + if (!file.exists(fna_path)) { + return(invisible(FALSE)) + } txt <- readLines(fna_path, warn = FALSE) first <- which(grepl("^\\s*>", txt))[1] - if (is.na(first)) return(invisible(FALSE)) + if (is.na(first)) { + return(invisible(FALSE)) + } if (first > 1L) { txt <- txt[first:length(txt)] txt[1] <- sub("^\\ufeff", "", txt[1]) @@ -907,14 +931,20 @@ retrieveMetadata <- function(user_bacs, # GFF sanitizer to ensure Panaroo compatibility with BV-BRC CLI downloads .sanitize_gff <- function(gff_path) { - if (!file.exists(gff_path)) return(invisible(FALSE)) + if (!file.exists(gff_path)) { + return(invisible(FALSE)) + } lines <- readLines(gff_path, warn = FALSE) - if (length(lines) == 0L) return(invisible(FALSE)) + if (length(lines) == 0L) { + return(invisible(FALSE)) + } if (!grepl("^##gff-version\\s*3", lines[1])) { lines <- c("##gff-version 3", lines) } out <- vapply(lines, function(line) { - if (grepl("^#", line)) return(line) + if (grepl("^#", line)) { + return(line) + } parts <- strsplit(line, "[\t ]", perl = TRUE)[[1]] if (length(parts) >= 9) { paste(c(parts[1:8], paste(parts[9:length(parts)], collapse = " ")), collapse = "\t") @@ -943,16 +973,19 @@ retrieveMetadata <- function(user_bacs, verbose = TRUE, fallback_to_bvbrc_cache = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) - db_path <- paths$db_path - + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) + db_path <- paths$db_path + con <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path) - - on.exit({ - # Keep connection open for caller - NULL - }, add = TRUE) - + + on.exit( + { + # Keep connection open for caller + NULL + }, + add = TRUE + ) + # Happy path: metadata present -> apply AMR-aware filters if ("metadata" %in% DBI::dbListTables(con)) { if (isTRUE(verbose)) message("Loading metadata for filtering.") @@ -962,7 +995,7 @@ retrieveMetadata <- function(user_bacs, message("No data available in 'metadata'.") return(NULL) } - + # Normalize evidence labels initial_metadata <- tibble::as_tibble(initial_metadata) |> dplyr::mutate( @@ -973,13 +1006,13 @@ retrieveMetadata <- function(user_bacs, TRUE ~ `genome_drug.evidence` ) ) - + # AMR and quality filtering filtered_metadata <- initial_metadata |> dplyr::filter(`genome_drug.evidence` == "Laboratory Method") |> dplyr::filter(`genome.genome_quality` == "Good") |> dplyr::filter(`genome_drug.resistant_phenotype` %in% c("Resistant", "Susceptible", "Intermediate")) - + DBI::dbWriteTable(con, "filtered", filtered_metadata, overwrite = TRUE) if (isTRUE(verbose)) { message("Post-filter rows: ", nrow(filtered_metadata)) @@ -987,33 +1020,33 @@ retrieveMetadata <- function(user_bacs, } return(list(duckdbConnection = con, table_name = "filtered")) } - + # Attempt BV-BRC cache location if (!isTRUE(fallback_to_bvbrc_cache)) { DBI::dbDisconnect(con, shutdown = TRUE) stop("No 'metadata' table found in ", db_path, ". Run retrieveMetadata() first.") } - + if (isTRUE(verbose)) { message("No 'metadata' in per-selection DB. Falling back to BV-BRC cache at data/bvbrc/.") } - + cache_db <- file.path(base_dir, "data", "bvbrc", "bvbrcData.duckdb") if (!file.exists(cache_db)) { DBI::dbDisconnect(con, shutdown = TRUE) stop("BV-BRC cache not found at: ", cache_db, ". Run .updateBVBRCdata() first.") } - + con_cache <- DBI::dbConnect(duckdb::duckdb(), dbdir = cache_db) on.exit(try(DBI::dbDisconnect(con_cache, shutdown = TRUE), silent = TRUE), add = TRUE) - + if (!"bvbrc_bac_data" %in% DBI::dbListTables(con_cache)) { DBI::dbDisconnect(con, shutdown = TRUE) stop("Table 'bvbrc_bac_data' not found in BV-BRC cache: ", cache_db) } - + bv <- tibble::as_tibble(DBI::dbReadTable(con_cache, "bvbrc_bac_data")) - + # Derive matches from user_bacs (taxon IDs or species) sel <- tibble::tibble(`genome.genome_id` = character()) for (v in user_bacs) { @@ -1029,22 +1062,22 @@ retrieveMetadata <- function(user_bacs, } } sel <- dplyr::distinct(sel) - + if (nrow(sel) == 0L) { DBI::dbDisconnect(con, shutdown = TRUE) stop("No genomes matched user_bacs in BV-BRC cache. (Cache present but no hits.)") } - + # Minimal 'filtered' for downstream steps (downloaders & genomeList) minimal_filtered <- sel # Ensure expected column name used downstream: # retrieveGenomes() reads "filtered" and expects `genome.genome_id` to exist. DBI::dbWriteTable(con, "filtered", minimal_filtered, overwrite = TRUE) - + if (isTRUE(verbose)) { message("Wrote table 'filtered' to: ", db_path) } - + list(duckdbConnection = con, table_name = "filtered") } @@ -1056,9 +1089,12 @@ retrieveMetadata <- function(user_bacs, # Prefer bash .pick_shell <- function(image) { chk <- suppressWarnings(system2("docker", - c("run", "--rm", image, "sh", "-lc", - "command -v bash >/dev/null || echo NOBASH"), - stdout = TRUE, stderr = TRUE)) + c( + "run", "--rm", image, "sh", "-lc", + "command -v bash >/dev/null || echo NOBASH" + ), + stdout = TRUE, stderr = TRUE + )) if (length(chk) && any(grepl("NOBASH", chk))) "sh" else "bash" } @@ -1105,21 +1141,28 @@ retrieveMetadata <- function(user_bacs, .ftp_download_one <- function(genomeID, out_dir, tries = 3L, min_bytes = 100) { files <- c(".fna", ".PATRIC.faa", ".PATRIC.gff") dests <- file.path(out_dir, paste0(genomeID, files)) - if (.is_complete_set(out_dir, genomeID, min_bytes)) return(TRUE) + if (.is_complete_set(out_dir, genomeID, min_bytes)) { + return(TRUE) + } get_one <- function(url, dest) { if (nzchar(Sys.which("wget"))) { res <- suppressWarnings(system2("wget", - c("-q", "-O", shQuote(dest), shQuote(url)), - stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if (is.null(st)) st <- 0L + c("-q", "-O", shQuote(dest), shQuote(url)), + stdout = TRUE, stderr = TRUE + )) + st <- attr(res, "status") + if (is.null(st)) st <- 0L return(st == 0L && file.exists(dest) && file.info(dest)$size > min_bytes) } else if (nzchar(Sys.which("curl"))) { - curl_args <- if (startsWith(url, "ftps://")) - c("--ssl-reqd", "-L", "-o", shQuote(dest), shQuote(url)) else - c("-L", "-o", shQuote(dest), shQuote(url)) + curl_args <- if (startsWith(url, "ftps://")) { + c("--ssl-reqd", "-L", "-o", shQuote(dest), shQuote(url)) + } else { + c("-L", "-o", shQuote(dest), shQuote(url)) + } res <- suppressWarnings(system2("curl", curl_args, stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if (is.null(st)) st <- 0L + st <- attr(res, "status") + if (is.null(st)) st <- 0L return(st == 0L && file.exists(dest) && file.info(dest)$size > min_bytes) } FALSE @@ -1128,13 +1171,27 @@ retrieveMetadata <- function(user_bacs, for (ext_i in seq_along(files)) { dest <- dests[ext_i] if (file.exists(dest) && file.info(dest)$size > min_bytes) next - ftps <- sprintf("ftps://ftp.bv-brc.org/genomes/%s/%s%s", genomeID, genomeID, files[ext_i]) + ftps <- sprintf("ftps://ftp.bv-brc.org/genomes/%s/%s%s", genomeID, genomeID, files[ext_i]) https <- sprintf("https://ftp.bv-brc.org/genomes/%s/%s%s", genomeID, genomeID, files[ext_i]) ok <- FALSE - for (k in 1:tries) { if (get_one(ftps, dest)) { ok <- TRUE; break } } - if (!ok) for (k in 1:2) { if (get_one(https, dest)) { ok <- TRUE; break } } - if (!ok) return(FALSE) + for (k in 1:tries) { + if (get_one(ftps, dest)) { + ok <- TRUE + break + } + } + if (!ok) { + for (k in 1:2) { + if (get_one(https, dest)) { + ok <- TRUE + break + } + } + } + if (!ok) { + return(FALSE) + } } .is_complete_set(out_dir, genomeID, min_bytes) } @@ -1143,7 +1200,7 @@ retrieveMetadata <- function(user_bacs, # p3-dump-genomes to fetch FASTA and .gto files .cli_dump_fastas_gto_chunk <- function(image, out_dir, genome_ids, tag, tries = 3L) { - out_dir <- normalizePath(out_dir, mustWork = FALSE) + out_dir <- normalizePath(out_dir, mustWork = FALSE) dir.create(out_dir, recursive = TRUE, showWarnings = FALSE) ids_file <- file.path(out_dir, paste0("ids_", tag, ".txt")) writeLines(genome_ids, ids_file) @@ -1152,12 +1209,15 @@ retrieveMetadata <- function(user_bacs, mount <- .docker_path(out_dir) # Safety against Windows-specific CRLF lines before `p3-dump-genomes` - sh_cmd <- sprintf('tr -d "\\r" < /out/%s | p3-dump-genomes --outDir /out --fasta --prot --gto -', - basename(ids_file)) + sh_cmd <- sprintf( + 'tr -d "\\r" < /out/%s | p3-dump-genomes --outDir /out --fasta --prot --gto -', + basename(ids_file) + ) - args <- c("run", "--rm", "-v", paste0(mount, ":/out"), image, shell, "-lc", shQuote(sh_cmd)) + args <- c("run", "--rm", "-v", paste0(mount, ":/out"), image, shell, "-lc", shQuote(sh_cmd)) res <- suppressWarnings(system2("docker", args = args, stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if (is.null(st)) st <- 0L + st <- attr(res, "status") + if (is.null(st)) st <- 0L if (st != 0L && tries > 1L) { Sys.sleep(1) @@ -1185,57 +1245,57 @@ retrieveMetadata <- function(user_bacs, # Export GFF from GTO per genomes in each chunk .cli_export_gff_chunk <- function(image, out_dir, genome_ids, tag, tries = 3L) { - out_dir <- normalizePath(out_dir, mustWork = FALSE) + out_dir <- normalizePath(out_dir, mustWork = FALSE) ids_file <- file.path(out_dir, paste0("ids_", tag, ".txt")) writeLines(genome_ids, ids_file) - exporter <- "/usr/bin/rast-export-genome" - shell <- .pick_shell(image) - mount <- .docker_path(out_dir) + exporter <- "/usr/bin/rast-export-genome" + shell <- .pick_shell(image) + mount <- .docker_path(out_dir) stderr_file <- file.path(out_dir, paste0("gff_chunk_", tag, ".stderr.txt")) # GFFs from BV-BRC .gto export are not directly compatible in Panaroo # This block reformats the contig IDs and ensures .gff/.fna pairs work together sh_cmd <- paste0( - 'set -euo pipefail; ', - 'fail_n=0; : > /out/', basename(stderr_file), '; ', + "set -euo pipefail; ", + "fail_n=0; : > /out/", basename(stderr_file), "; ", 'while IFS= read -r b || [ -n "$b" ]; do ', ' b=${b%$\'\\r\'}; [ -n "$b" ] || continue; ', ' gto="/out/${b}.gto"; gff="/out/${b}.PATRIC.gff"; map="/out/${b}.orig2id.tsv"; ', - ' if [ ! -s "$gto" ]; then echo "MISSING_GTO $b" >>/out/', basename(stderr_file), '; continue; fi; ', + ' if [ ! -s "$gto" ]; then echo "MISSING_GTO $b" >>/out/', basename(stderr_file), "; continue; fi; ", # Export GFF ' [ -s "$gff" ] || ', exporter, ' -i "$gto" -o "$gff" gff ', - ' || { echo "EXPORT_FAIL_GFF $b" >>/out/', basename(stderr_file), '; fail_n=$((fail_n+1)); continue; }; ', + ' || { echo "EXPORT_FAIL_GFF $b" >>/out/', basename(stderr_file), "; fail_n=$((fail_n+1)); continue; }; ", # Build mapping original_id -> id, and default to id if original_id missing - ' if command -v jq >/dev/null 2>&1; then ', + " if command -v jq >/dev/null 2>&1; then ", ' jq -r \'.contigs[] | [(.original_id // .id), .id] | @tsv\' "$gto" > "$map"; ', - ' else ', + " else ", ' python3 - "$gto" > "$map" <<\'PY\'\n', - 'import sys, json\n', - 'g = json.load(open(sys.argv[1]))\n', + "import sys, json\n", + "g = json.load(open(sys.argv[1]))\n", 'for c in g.get("contigs", []):\n', ' o = c.get("original_id") or c.get("id")\n', ' i = c.get("id")\n', - ' if o and i:\n', + " if o and i:\n", ' print(f"{o}\\t{i}")\n', - 'PY\n', - ' fi; ', + "PY\n", + " fi; ", # Relabel GFF sequence IDs: original_id -> id - ' awk \'FNR==NR{m[$1]=$2; next} ', - ' /^##sequence-region/ { if ($2 in m) {$2=m[$2]} print; next } ', - ' /^#/ { print; next } ', + " awk 'FNR==NR{m[$1]=$2; next} ", + " /^##sequence-region/ { if ($2 in m) {$2=m[$2]} print; next } ", + " /^#/ { print; next } ", ' { if ($1 in m) $1=m[$1]; print }\' "$map" "$gff" > "${gff}.tmp" && mv "${gff}.tmp" "$gff"; ', - - 'done < /out/', basename(ids_file), '; ', - 'exit 0' + "done < /out/", basename(ids_file), "; ", + "exit 0" ) args <- c("run", "--rm", "-v", paste0(mount, ":/out"), image, shell, "-lc", shQuote(sh_cmd)) - res <- suppressWarnings(system2("docker", args = args, stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if (is.null(st)) st <- 0L + res <- suppressWarnings(system2("docker", args = args, stdout = TRUE, stderr = TRUE)) + st <- attr(res, "status") + if (is.null(st)) st <- 0L if (st != 0L && tries > 1L) { Sys.sleep(1) @@ -1278,14 +1338,15 @@ retrieveGenomes <- function(base_dir = ".", cli_gff_workers = 4L, chunk_size = 50L, verbose = TRUE) { - - method <- match.arg(method) + method <- match.arg(method) base_dir <- normalizePath(base_dir, mustWork = FALSE) # IDs from .filterGenomes() if (isTRUE(verbose)) message("Filtering genomes before download.") f_out <- .filterGenomes(base_dir = base_dir, user_bacs = user_bacs, verbose = verbose) - if (is.null(f_out)) return(character()) + if (is.null(f_out)) { + return(character()) + } con <- f_out$duckdbConnection tbl <- f_out$table_name @@ -1294,12 +1355,12 @@ retrieveGenomes <- function(base_dir = ".", dplyr::pull(`genome.genome_id`) # Set up the paths and such - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) - bug_dir <- dirname(paths$db_path) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) + bug_dir <- dirname(paths$db_path) genome_path <- file.path(bug_dir, "genomes") - logs_dir <- file.path(base_dir, "data", "logs") + logs_dir <- file.path(base_dir, "data", "logs") dir.create(genome_path, recursive = TRUE, showWarnings = FALSE) - dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) + dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) # Adding support for resuming a download if (isTRUE(skip_existing)) { @@ -1310,10 +1371,12 @@ retrieveGenomes <- function(base_dir = ".", if (length(ids) == 0L) { if (isTRUE(verbose)) message("All genomes already complete.") - all_complete <- .list_complete(genome_path, - tibble::as_tibble(DBI::dbReadTable(con, tbl)) |> - dplyr::distinct(`genome.genome_id`) |> - dplyr::pull(`genome.genome_id`)) + all_complete <- .list_complete( + genome_path, + tibble::as_tibble(DBI::dbReadTable(con, tbl)) |> + dplyr::distinct(`genome.genome_id`) |> + dplyr::pull(`genome.genome_id`) + ) return(all_complete) } @@ -1322,18 +1385,23 @@ retrieveGenomes <- function(base_dir = ".", if (isTRUE(verbose)) message("Trying FTPS download. Workers=", ftp_workers) future::plan(future::multisession, workers = max(1, ftp_workers)) ft_ok <- future.apply::future_lapply(ids, function(gid) .ftp_download_one(gid, genome_path), - future.seed = TRUE) + future.seed = TRUE + ) ok_ids <- ids[unlist(ft_ok)] if (isTRUE(verbose)) message("Complete file sets for ", length(ok_ids), " genomes (FTP).") return(c(ok_ids, .list_complete(genome_path, setdiff(ids, ok_ids)))) } # CLI for FASTA, FAA, and GTO, then GFF from GTO - chunks <- split(ids, ceiling(seq_along(ids)/chunk_size)) + chunks <- split(ids, ceiling(seq_along(ids) / chunk_size)) # Parallel chunk containers - if (isTRUE(verbose)) message("CLI being run in parallel for ", length(chunks), - " data chunks.") + if (isTRUE(verbose)) { + message( + "CLI being run in parallel for ", length(chunks), + " data chunks." + ) + } future::plan(future::multisession, workers = max(1, cli_fasta_workers)) fa_res <- future.apply::future_mapply( FUN = function(vec, tag) .cli_dump_fastas_gto_chunk(image, genome_path, vec, tag), @@ -1343,8 +1411,12 @@ retrieveGenomes <- function(base_dir = ".", if (!all(fa_res) && isTRUE(verbose)) warning(sum(!fa_res), " data chunks failed.") # GFF extraction in parallel containers - if (isTRUE(verbose)) message("GFF extraction being run in parallel for ", - length(chunks), " data chunks.") + if (isTRUE(verbose)) { + message( + "GFF extraction being run in parallel for ", + length(chunks), " data chunks." + ) + } future::plan(future::multisession, workers = max(1, cli_gff_workers)) g_res <- future.apply::future_mapply( FUN = function(vec, tag) .cli_export_gff_chunk(image, genome_path, vec, tag), @@ -1355,8 +1427,12 @@ retrieveGenomes <- function(base_dir = ".", # Success set: .fna + .PATRIC.faa + .PATRIC.gff all present per isolate ok_ids <- ids[vapply(ids, .is_complete_set, logical(1), dir = genome_path)] - if (isTRUE(verbose)) message("Complete file sets downloaded for ", - length(ok_ids), " genomes.") + if (isTRUE(verbose)) { + message( + "Complete file sets downloaded for ", + length(ok_ids), " genomes." + ) + } ok_ids } @@ -1375,15 +1451,14 @@ retrieveGenomes <- function(base_dir = ".", genomeList <- function(base_dir = ".", user_bacs, verbose = TRUE) { - base_dir <- normalizePath(base_dir, mustWork = FALSE) - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) - db_path <- paths$db_path - bug_dir <- dirname(db_path) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) + db_path <- paths$db_path + bug_dir <- dirname(db_path) genome_path <- file.path(bug_dir, "genomes") - files_all <- list.files(genome_path, full.names = TRUE) - files_all <- files_all[file.info(files_all)$size > 100] + files_all <- list.files(genome_path, full.names = TRUE) + files_all <- files_all[file.info(files_all)$size > 100] # Separate by type gff_files <- files_all[grepl("\\.PATRIC\\.gff$", files_all)] @@ -1402,14 +1477,18 @@ genomeList <- function(base_dir = ".", faa_path <- file.path(genome_path, paste0(genomeID, ".PATRIC.faa")) data.frame( - genome_id = genomeID, - gff_path = if (file.exists(gff_path) && file.info(gff_path)$size > 100) gff_path else NA, - fna_path = if (file.exists(fna_path) && file.info(fna_path)$size > 100) fna_path else NA, - faa_path = if (file.exists(faa_path) && file.info(faa_path)$size > 100) faa_path else NA, + genome_id = genomeID, + gff_path = if (file.exists(gff_path) && file.info(gff_path)$size > 100) gff_path else NA, + fna_path = if (file.exists(fna_path) && file.info(fna_path)$size > 100) fna_path else NA, + faa_path = if (file.exists(faa_path) && file.info(faa_path)$size > 100) faa_path else NA, panaroo_input = if ( file.exists(gff_path) && file.exists(fna_path) && file.exists(faa_path) && - file.info(gff_path)$size > 100 && file.info(fna_path)$size > 100 && file.info(faa_path)$size > 100 - ) paste(gff_path, fna_path) else NA, + file.info(gff_path)$size > 100 && file.info(fna_path)$size > 100 && file.info(faa_path)$size > 100 + ) { + paste(gff_path, fna_path) + } else { + NA + }, stringsAsFactors = FALSE ) }) @@ -1468,7 +1547,7 @@ prepareGenomes <- function(user_bacs, method = c("ftp", "cli"), overwrite = FALSE, verbose = TRUE) { - method <- match.arg(method) + method <- match.arg(method) base_dir <- normalizePath(base_dir, mustWork = FALSE) # Ensure the BV-BRC metadata cache exists, fetch if missing diff --git a/R/data_processing.R b/R/data_processing.R index 76b4dca..d9ff231 100644 --- a/R/data_processing.R +++ b/R/data_processing.R @@ -13,7 +13,7 @@ # Map host paths under mounted root to container path #' .to_container() #' -#' Used for OS-agnostic mapping of Docker directories and mount paths +#' Used for OS-agnostic mapping of Docker directories and mount paths #' #' @keywords internal #' @examples NULL @@ -26,7 +26,7 @@ # Launch Panaroo to build a pangenome (per batch) #' processPanaroo() -#' +#' #' See Panaroo's documentation for details on how the parameters affect your #' pangenome output: https://gthlab.au/panaroo/#/gettingstarted/params #' @@ -51,12 +51,12 @@ panaroo_threads_per_job) { output_path <- .docker_path(output_path) dir.create(output_path, recursive = TRUE, showWarnings = FALSE) - + # Fail fast if Docker is missing if (!nzchar(Sys.which("docker"))) { stop("Docker is not available on your PATH but is required to run Panaroo.") } - + # Host mount root = bug directory mount_host <- output_path mount_cont <- "/work" @@ -81,7 +81,7 @@ # Convert to container-visible paths genome_filepath_cont <- .to_container(genome_filepath_host, host_root = mount_host, container_root = mount_cont) - output_dir_cont <- .to_container(output_dir_host, host_root = mount_host, container_root = mount_cont) + output_dir_cont <- .to_container(output_dir_host, host_root = mount_host, container_root = mount_cont) # Run Panaroo in Docker cmd_args <- c( @@ -99,14 +99,17 @@ "--remove-invalid-genes", "--core_threshold", as.character(core_threshold), "--len_dif_percent", as.character(len_dif_percent), - "--threshold", as.character(cluster_threshold), - "-f", as.character(family_seq_identity), - "-t", as.character(panaroo_threads_per_job) + "--threshold", as.character(cluster_threshold), + "-f", as.character(family_seq_identity), + "-t", as.character(panaroo_threads_per_job) ) - res <- tryCatch({ - system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) - }, error = function(e) e) + res <- tryCatch( + { + system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) + }, + error = function(e) e + ) if (inherits(res, "error")) { stop(sprintf("Docker/Panaroo failed to launch: %s", res$message)) @@ -179,7 +182,6 @@ family_seq_identity = 0.5, threads = 8, split_jobs = FALSE) { - duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) @@ -188,17 +190,17 @@ output_path <- dirname(duckdb_path) } output_path <- normalizePath(output_path) - + genome_query_output <- DBI::dbReadTable(con, "files") - + panaroo_input_files <- genome_query_output |> dplyr::pull(panaroo_input) - + # Drop true NAs panaroo_input_files <- panaroo_input_files[!is.na(panaroo_input_files)] - + split_files <- strsplit(panaroo_input_files, " ") - + # Plan for filtering .with_future_plan(workers = threads) valid_entries <- furrr::future_map(split_files, function(paths) { @@ -213,7 +215,7 @@ filtered_panaroo_input <- sapply(split_files[unlist(valid_entries)], paste, collapse = " ") total_lines <- length(filtered_panaroo_input) - batch_size <- if (isTRUE(split_jobs)) ceiling(total_lines / 5) else total_lines + batch_size <- if (isTRUE(split_jobs)) ceiling(total_lines / 5) else total_lines panaroo_batches <- split(filtered_panaroo_input, ceiling(seq_along(filtered_panaroo_input) / batch_size)) n_jobs <- length(panaroo_batches) @@ -256,39 +258,38 @@ #' @param cluster_threshold Numeric. Sequence identity threshold (`--threshold`). Default `0.95`. #' @param family_seq_identity Numeric. Gene family clustering identity (`-f`). Default `0.5`. #' @param threads Integer. Number of threads for Panaroo and parallel execution. Default `8`. -#' +#' #' @returns A a single combined pangenome. -#' +#' #' @keywords internal .mergePanaroo <- function(input_path, core_threshold = 0.90, len_dif_percent = 0.95, cluster_threshold = 0.95, family_seq_identity = 0.5, - threads = 8){ - + threads = 8) { input_path <- .docker_path(input_path) - + # Fail fast if Docker is missing if (!nzchar(Sys.which("docker"))) { stop("Docker is not available on your PATH but is required to run panaroo-merge.") } - + merge_dir <- file.path(input_path, "merge_output") dir.create(merge_dir, recursive = TRUE, showWarnings = FALSE) - + all_dirs <- list.dirs(input_path, recursive = FALSE, full.names = TRUE) all_dirs <- all_dirs[grepl("^panaroo_out_", basename(all_dirs))] - + valid_dirs <- all_dirs[file.exists(file.path(all_dirs, "final_graph.gml"))] - + if (length(valid_dirs) > 1) { mount_host <- input_path mount_cont <- "/work" - + # Provide each dir as a separate argv token after "-d" dir_args <- as.vector(t(.to_container(valid_dirs, host_root = mount_host, container_root = mount_cont))) - + cmd_args <- c( "run", "--platform", "linux/amd64", @@ -302,11 +303,11 @@ "--merge_paralogs", "--core_threshold", as.character(core_threshold), "--len_dif_percent", as.character(len_dif_percent), - "--threshold", as.character(cluster_threshold), - "-f", as.character(family_seq_identity), - "-t", as.character(threads) + "--threshold", as.character(cluster_threshold), + "-f", as.character(family_seq_identity), + "-t", as.character(threads) ) - + system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) } else { stop("No valid Panaroo batch directories found (need >= 2 with final_graph.gml).") @@ -314,7 +315,6 @@ } - #' Load Panaroo gene presence/absence table into DuckDB #' #' Reads `gene_presence_absence.csv` and constructs a genome-by-gene count @@ -326,13 +326,13 @@ #' @return A tibble containing the gene count matrix. #' #' @keywords internal -.panaroo2geneTable <- function(panaroo_output_path, duckdb_path){ +.panaroo2geneTable <- function(panaroo_output_path, duckdb_path) { filepath <- file.path(normalizePath(panaroo_output_path), "gene_presence_absence.csv") duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - gene_count <- read.table(filepath, sep=",", header=TRUE, fill=TRUE, quote="") |> + gene_count <- read.table(filepath, sep = ",", header = TRUE, fill = TRUE, quote = "") |> tibble::as_tibble() |> dplyr::select(-c(Non.unique.Gene.name, Annotation)) |> tidyr::pivot_longer(cols = -1) |> @@ -356,13 +356,13 @@ #' @return A tibble with `Gene` and `Annotation` columns. #' #' @keywords internal -.panaroo2geneNames <- function(panaroo_output_path, duckdb_path){ +.panaroo2geneNames <- function(panaroo_output_path, duckdb_path) { filepath <- file.path(normalizePath(panaroo_output_path), "gene_presence_absence.csv") duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - gene_names <- read.table(filepath, sep=",", header=TRUE, fill=TRUE, quote="") |> + gene_names <- read.table(filepath, sep = ",", header = TRUE, fill = TRUE, quote = "") |> tibble::as_tibble() |> dplyr::select(c(Gene, Annotation)) @@ -381,15 +381,15 @@ #' @return A tibble containing the struct matrix. #' #' @keywords internal -.panaroo2StructTable <- function(panaroo_output_path, duckdb_path){ +.panaroo2StructTable <- function(panaroo_output_path, duckdb_path) { struct_filepath <- file.path(normalizePath(panaroo_output_path), "struct_presence_absence.Rtab") duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - gene_struct <- read.table(struct_filepath, sep="\t", header=TRUE, fill=TRUE, quote="") |> + gene_struct <- read.table(struct_filepath, sep = "\t", header = TRUE, fill = TRUE, quote = "") |> tibble::as_tibble() |> - tidyr::pivot_longer(cols= -1) |> + tidyr::pivot_longer(cols = -1) |> tidyr::pivot_wider(names_from = Gene, values_from = value) |> dplyr::rename("genome_id" = "name") |> dplyr::mutate(genome_id = stringr::str_replace_all(genome_id, c("^X" = "", "\\.PATRIC$" = ""))) @@ -409,7 +409,7 @@ #' @return Invisibly returns TRUE. #' #' @keywords internal -.panaroo2OtherTables <- function(panaroo_output_path, duckdb_path){ +.panaroo2OtherTables <- function(panaroo_output_path, duckdb_path) { panaroo_output_path <- normalizePath(panaroo_output_path) duckdb_path <- normalizePath(duckdb_path) fasta_filepath <- file.path(panaroo_output_path, "pan_genome_reference.fa") @@ -418,15 +418,19 @@ gene_fasta <- Biostrings::readDNAStringSet(filepath = fasta_filepath) DBI::dbWriteTable(con, "gene_ref_seq", - tibble::tibble(name = names(gene_fasta), - sequence = as.character(gene_fasta)), - overwrite = TRUE) + tibble::tibble( + name = names(gene_fasta), + sequence = as.character(gene_fasta) + ), + overwrite = TRUE + ) readr::read_csv(file.path(panaroo_output_path, "gene_presence_absence.csv")) |> dplyr::select(-`Non-unique Gene name`) |> tidyr::pivot_longer(-c("Gene", "Annotation"), - names_to = "genome_ids", - values_to = "protein_ids") |> + names_to = "genome_ids", + values_to = "protein_ids" + ) |> dplyr::mutate(genome_ids = gsub(".PATRIC", "", genome_ids)) |> dplyr::select(genome_ids, Gene, protein_ids) |> dplyr::distinct() |> @@ -447,9 +451,9 @@ #' @return Invisibly returns TRUE. #' #' @keywords internal -.panaroo2duckdb <- function(panaroo_output_path, duckdb_path){ +.panaroo2duckdb <- function(panaroo_output_path, duckdb_path) { panaroo_output_path <- normalizePath(panaroo_output_path) - duckdb_path <- normalizePath(duckdb_path) + duckdb_path <- normalizePath(duckdb_path) .panaroo2geneTable(panaroo_output_path, duckdb_path) .panaroo2geneNames(panaroo_output_path, duckdb_path) @@ -484,7 +488,6 @@ threads = 0, memory = 0, extra_args = c("-g", "1")) { - # Fail fast if Docker is missing if (!nzchar(Sys.which("docker"))) { stop("Docker is not available on your PATH but is required to run CD-HIT.") @@ -530,7 +533,7 @@ "weizhongli1987/cdhit:4.8.1", "cd-hit", "-i", .to_container(cdhit_input_faa, mount_host, mount_cont), - "-o", .to_container(clustered_faa, mount_host, mount_cont), + "-o", .to_container(clustered_faa, mount_host, mount_cont), "-c", as.character(identity), "-n", as.character(word_length), "-T", as.character(threads), @@ -540,21 +543,26 @@ ) message("Running cd-hit via Docker...") - output <- tryCatch({ - system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) - }, error = function(e) { - stop("cd-hit execution failed: ", e$message) - }) - + output <- tryCatch( + { + system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) + }, + error = function(e) { + stop("cd-hit execution failed: ", e$message) + } + ) + if (!file.exists(clustered_faa)) { stop("cd-hit failed: output file not found. Check stderr:\n", paste(output, collapse = "\n")) } # Ensure .clstr exists (used downstream) if (!file.exists(paste0(clustered_faa, ".clstr"))) { - stop("cd-hit did not produce the expected .clstr file at: ", paste0(clustered_faa, ".clstr"), - "\nFull output:\n", paste(output, collapse = "\n")) + stop( + "cd-hit did not produce the expected .clstr file at: ", paste0(clustered_faa, ".clstr"), + "\nFull output:\n", paste(output, collapse = "\n") + ) } - + message("cd-hit completed successfully.") list( cdhit_input_faa = cdhit_input_faa, @@ -568,7 +576,7 @@ #' `runPanaroo2Duckdb()` executes Panaroo on the genomes registered in a #' per-selection DuckDB (created earlier by `prepareGenomes()`), optionally in #' multiple batches, and imports all resulting pangenome tables into the same -#' DuckDB database. +#' DuckDB database. #' #' It acts as a high-level wrapper around: #' * **`.runPanaroo()`** — runs Panaroo (single or multi-batch) @@ -633,10 +641,10 @@ #' and modeling steps in `amRdata` and `amRml`. #' #' @seealso -#' * `.runPanaroo()` — core Panaroo execution -#' * `.mergePanaroo()` — merge multiple Panaroo batches -#' * `.panaroo2duckdb()` — import Panaroo results into DuckDB -#' * [runDataProcessing()] — full pipeline including CD-HIT & InterProScan +#' * `.runPanaroo()` — core Panaroo execution +#' * `.mergePanaroo()` — merge multiple Panaroo batches +#' * `.panaroo2duckdb()` — import Panaroo results into DuckDB +#' * [runDataProcessing()] — full pipeline including CD-HIT & InterProScan #' #' @examples #' \dontrun{ @@ -672,14 +680,14 @@ runPanaroo2Duckdb <- function(duckdb_path, if (isTRUE(verbose)) message("Launching Panaroo.") .runPanaroo( - duckdb_path = duckdb_path, - output_path = out_dir, - core_threshold = core_threshold, - len_dif_percent = len_dif_percent, + duckdb_path = duckdb_path, + output_path = out_dir, + core_threshold = core_threshold, + len_dif_percent = len_dif_percent, cluster_threshold = cluster_threshold, family_seq_identity = family_seq_identity, - threads = threads, - split_jobs = split_jobs + threads = threads, + split_jobs = split_jobs ) # Identify Panaroo outputs that contain a final_graph.gml file @@ -730,17 +738,19 @@ runPanaroo2Duckdb <- function(duckdb_path, .parseProteinClusters <- function(clustered_faa) { clstr <- paste0(clustered_faa, ".clstr") if (!file.exists(clstr)) { - stop("CD-HIT cluster file not found: ", clstr, - "\nEnsure .runCDHIT() completed successfully and produced the .clstr file.") + stop( + "CD-HIT cluster file not found: ", clstr, + "\nEnsure .runCDHIT() completed successfully and produced the .clstr file." + ) } - + lines <- data.table::fread(clstr, sep = "\n", header = FALSE)$V1 cluster_ids <- grep("^>Cluster", lines) cluster_map <- data.table::data.table() for (i in seq_along(cluster_ids)) { start <- cluster_ids[i] + 1 - end <- if (i < length(cluster_ids)) cluster_ids[i + 1] - 1 else length(lines) + end <- if (i < length(cluster_ids)) cluster_ids[i + 1] - 1 else length(lines) cluster_lines <- lines[start:end] # This finds the reference cluster ID and names the cluster with it @@ -752,8 +762,10 @@ runPanaroo2Duckdb <- function(duckdb_path, } # Pull genome IDs - genome_matches <- stringr::str_match(cluster_lines, - "fig\\|([0-9]+\\.[0-9]+)\\.peg\\.[0-9]+")[, 2] + genome_matches <- stringr::str_match( + cluster_lines, + "fig\\|([0-9]+\\.[0-9]+)\\.peg\\.[0-9]+" + )[, 2] genome_matches <- genome_matches[!is.na(genome_matches)] if (length(genome_matches) > 0) { @@ -810,9 +822,9 @@ buildMatrices <- function(cluster_map) .buildProtMatrices(cluster_map) names_faa <- names(cdhit_output_faa) |> tibble::as_tibble() |> dplyr::mutate( - proteinID = stringr::str_extract(value, "^fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), - locus_tag = stringr::str_match(value, "peg\\.[0-9]+\\|([^\\s]+)")[, 2], - proteinName= stringr::str_trim(stringr::str_match(value, "\\|[^\\s]+\\s+(.*?)\\s+\\[")[, 2]) + proteinID = stringr::str_extract(value, "^fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), + locus_tag = stringr::str_match(value, "peg\\.[0-9]+\\|([^\\s]+)")[, 2], + proteinName = stringr::str_trim(stringr::str_match(value, "\\|[^\\s]+\\s+(.*?)\\s+\\[")[, 2]) ) |> dplyr::select(-value) @@ -828,47 +840,47 @@ CDHIT2duckdb <- function(duckdb_path, word_length = 5, threads = 0, memory = 0, - extra_args = c("-g", "1")){ - + extra_args = c("-g", "1")) { duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) if (missing(output_path) || output_path %in% c(".", "results", "results/")) { - output_path <- dirname(duckdb_path) # e.g., ./results/ + output_path <- dirname(duckdb_path) # e.g., ./results/ } output_path <- normalizePath(output_path) cdhit_outputs <- .runCDHIT(duckdb_path, - output_path, - output_prefix = output_prefix, - identity = identity, - word_length = word_length, - threads = threads, - memory = memory, - extra_args = extra_args) - - cluster_map <- .parseProteinClusters(cdhit_outputs$clustered_faa) + output_path, + output_prefix = output_prefix, + identity = identity, + word_length = word_length, + threads = threads, + memory = memory, + extra_args = extra_args + ) + + cluster_map <- .parseProteinClusters(cdhit_outputs$clustered_faa) cluster_count <- .buildProtMatrices(cluster_map) DBI::dbWriteTable(con, "protein_count", cluster_count, overwrite = TRUE) cluster_fasta <- cdhit_outputs$cdhit_input_faa - cluster_name <- .clusterNames(cluster_map, cluster_fasta) + cluster_name <- .clusterNames(cluster_map, cluster_fasta) DBI::dbWriteTable(con, "protein_names", cluster_name, overwrite = TRUE) clustered_faa <- Biostrings::readAAStringSet(cdhit_outputs$clustered_faa) DBI::dbWriteTable(con, "protein_cluster_seq", - tibble::tibble( - name = names(clustered_faa) |> stringr::str_extract("fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), - sequence = as.character(clustered_faa) - ), - overwrite = TRUE) + tibble::tibble( + name = names(clustered_faa) |> stringr::str_extract("fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), + sequence = as.character(clustered_faa) + ), + overwrite = TRUE + ) invisible(TRUE) } - #' Check or install InterProScan data bundle #' #' Ensures that the InterProScan data directory exists locally, downloading @@ -885,51 +897,55 @@ CDHIT2duckdb <- function(duckdb_path, #' #' @keywords internal .checkInterProData <- function( - version = "5.76-107.0", - dest_dir = "inst/extdata/interpro", - docker_image = sprintf("interpro/interproscan:%s", version), - platform = "linux/amd64", - curl_bin = "curl", - verbose = TRUE + version = "5.76-107.0", + dest_dir = "inst/extdata/interpro", + docker_image = sprintf("interpro/interproscan:%s", version), + platform = "linux/amd64", + curl_bin = "curl", + verbose = TRUE ) { msg <- function(...) if (verbose) message(sprintf(...)) - + if (!dir.exists(dest_dir)) dir.create(dest_dir, recursive = TRUE, showWarnings = FALSE) dest_dir <- normalizePath(dest_dir, mustWork = TRUE) - + root_dir <- file.path(dest_dir, sprintf("interproscan-%s", version)) data_dir <- file.path(root_dir, "data") - + # Simple existence check if (dir.exists(data_dir) && length(list.files(data_dir, recursive = TRUE)) > 0) { msg("InterProScan data already present at: %s", data_dir) return(list(data_dir = normalizePath(data_dir), ready = TRUE)) } - + # Download bundle if needed - tar_url <- sprintf("http://ftp.ebi.ac.uk/pub/software/unix/iprscan/5/%s/alt/interproscan-data-%s.tar.gz", - version, version) - md5_url <- paste0(tar_url, ".md5") + tar_url <- sprintf( + "http://ftp.ebi.ac.uk/pub/software/unix/iprscan/5/%s/alt/interproscan-data-%s.tar.gz", + version, version + ) + md5_url <- paste0(tar_url, ".md5") tar_path <- file.path(dest_dir, basename(tar_url)) md5_path <- paste0(tar_path, ".md5") - + if (!file.exists(tar_path)) { msg("Downloading InterProScan data bundle.") - status_tar <- system2(curl_bin, c("-L", "-o", tar_path, tar_url)) - status_md5 <- system2(curl_bin, c("-L", "-o", md5_path, md5_url)) - if (status_tar != 0 || status_md5 != 0) + status_tar <- system2(curl_bin, c("-L", "-o", tar_path, tar_url)) + status_md5 <- system2(curl_bin, c("-L", "-o", md5_path, md5_url)) + if (status_tar != 0 || status_md5 != 0) { stop("Failed to download InterProScan data bundle.") + } } - + msg("Verifying MD5 checksum.") md5_expected <- sub("\\s+.*$", "", readLines(md5_path)[1]) - md5_actual <- tools::md5sum(tar_path)[[1]] - if (!identical(tolower(md5_expected), tolower(md5_actual))) + md5_actual <- tools::md5sum(tar_path)[[1]] + if (!identical(tolower(md5_expected), tolower(md5_actual))) { stop("MD5 checksum mismatch for InterProScan data bundle.") - + } + msg("Extracting InterProScan data bundle.") utils::untar(tar_path, exdir = dest_dir, tar = "internal") - + msg("Data unpacked successfully.") return(list(data_dir = normalizePath(data_dir), ready = TRUE)) } @@ -946,9 +962,11 @@ CDHIT2duckdb <- function(duckdb_path, #' #' @keywords internal .getDfIPRColNames <- function() { - c("AccNum", "SeqMD5Digest", "SLength", "Analysis", + c( + "AccNum", "SeqMD5Digest", "SLength", "Analysis", "DB.ID", "SignDesc", "StartLoc", "StopLoc", "Score", - "Status", "RunDate", "IPRAcc", "IPRDesc", "placeholder") + "Status", "RunDate", "IPRAcc", "IPRDesc", "placeholder" + ) } #' Internal helpers for reading InterProScan TSV outputs @@ -993,8 +1011,9 @@ CDHIT2duckdb <- function(duckdb_path, #' @keywords internal .readIPRscanTsv <- function(filepath) { readr::read_tsv(filepath, - col_types = .getDfIPRColTypes(), - col_names = .getDfIPRColNames()) + col_types = .getDfIPRColTypes(), + col_names = .getDfIPRColNames() + ) } @@ -1026,7 +1045,7 @@ CDHIT2duckdb <- function(duckdb_path, file_format, docker_image = sprintf("interpro/interproscan:%s", "5.76-107.0")) { # Normalize and mount paths - path <- .docker_path(path) + path <- .docker_path(path) bind_data <- .docker_path(ipr_data_path) dir.create(file.path(path, "tmp", "iprscan"), recursive = TRUE, showWarnings = FALSE) @@ -1050,39 +1069,42 @@ CDHIT2duckdb <- function(duckdb_path, "-v", paste0(bind_data, ":/opt/interproscan/data"), "-w", "/work", docker_image, - "--input", .to_container(temp_fasta_file, path, "/work"), + "--input", .to_container(temp_fasta_file, path, "/work"), "--cpu", as.character(threads), "-f", file_format, "--appl", appl_str, "-b", chunk_out_file_base_cont ) - - status <- tryCatch({ - system2( - "docker", - args = c( - "run", - "--rm", - "--platform", "linux/amd64", # force amd64 for ARM hosts - "-v", paste0(path, ":", "/work"), - "-v", paste0(bind_data, ":/opt/interproscan/data"), - "-w", "/work", - docker_image, - "--input", .to_container(temp_fasta_file, path, "/work"), - "--cpu", as.character(threads), - "-f", file_format, - "--appl", appl_str, - "-b", chunk_out_file_base_cont - ), - stdout = TRUE, - stderr = TRUE - ) - }, error = function(e) { - stop(sprintf("InterProScan execution failed for chunk %d: %s", chunk_id, e$message)) - }) - out_tsv <- paste0(chunk_out_file_base_host, ".tsv") + status <- tryCatch( + { + system2( + "docker", + args = c( + "run", + "--rm", + "--platform", "linux/amd64", # force amd64 for ARM hosts + "-v", paste0(path, ":", "/work"), + "-v", paste0(bind_data, ":/opt/interproscan/data"), + "-w", "/work", + docker_image, + "--input", .to_container(temp_fasta_file, path, "/work"), + "--cpu", as.character(threads), + "-f", file_format, + "--appl", appl_str, + "-b", chunk_out_file_base_cont + ), + stdout = TRUE, + stderr = TRUE + ) + }, + error = function(e) { + stop(sprintf("InterProScan execution failed for chunk %d: %s", chunk_id, e$message)) + } + ) + + out_tsv <- paste0(chunk_out_file_base_host, ".tsv") out_tsvgz <- paste0(chunk_out_file_base_host, ".tsv.gz") if (file.exists(out_tsv)) { @@ -1090,8 +1112,10 @@ CDHIT2duckdb <- function(duckdb_path, } else if (file.exists(out_tsvgz)) { return(out_tsvgz) } else { - stop(sprintf("InterProScan produced no output for chunk %d. Checked: %s and %s.\nLast message:\n%s", - chunk_id, out_tsv, out_tsvgz, paste(status, collapse = "\n"))) + stop(sprintf( + "InterProScan produced no output for chunk %d. Checked: %s and %s.\nLast message:\n%s", + chunk_id, out_tsv, out_tsvgz, paste(status, collapse = "\n") + )) } } @@ -1100,22 +1124,21 @@ domainFromIPR <- function(duckdb_path, path, out_file_base = "iprscan", appl = c("Pfam"), - ipr_version = "5.76-107.0", + ipr_version = "5.76-107.0", ipr_dest_dir = "inst/extdata/interpro", ipr_platform = "linux/amd64", auto_prepare_data = TRUE, threads = 8, file_format = "TSV", docker_repo = "interpro/interproscan") { - duckdb_path <- normalizePath(duckdb_path) if (missing(path) || path %in% c(".", "results", "results/")) { path <- dirname(duckdb_path) } path <- normalizePath(path) - + ipr_image <- sprintf("%s:%s", docker_repo, ipr_version) - + # Prepare data if needed ipr_info <- if (isTRUE(auto_prepare_data)) { .checkInterProData( @@ -1126,35 +1149,38 @@ domainFromIPR <- function(duckdb_path, verbose = TRUE ) } else { - list(data_dir = file.path(ipr_dest_dir, sprintf("interproscan-%s", ipr_version), "data"), - ready = NA) + list( + data_dir = file.path(ipr_dest_dir, sprintf("interproscan-%s", ipr_version), "data"), + ready = NA + ) } ipr_data_path <- ipr_info$data_dir - + # Pull image once try(suppressWarnings(system2("docker", args = c("pull", ipr_image))), silent = TRUE) - + con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - + sequences_df <- dplyr::tbl(con, "protein_cluster_seq") |> tibble::as_tibble() - if (nrow(sequences_df) == 0L) + if (nrow(sequences_df) == 0L) { stop("No sequences found in 'protein_cluster_seq'. Please run CDHIT2duckdb() first.") - + } + # Chunking for parallel (not currently implemented due to memory limits) - chunks <- list(sequences_df) # Force 1 chunk for RAM limits - + chunks <- list(sequences_df) # Force 1 chunk for RAM limits + # Forcing 1 container operation for RAM limits workers <- 1 cpu_per_container <- threads - + message(sprintf( "InterPro: running in single-container mode with %d CPU(s).", cpu_per_container )) - + .with_future_plan(workers = 1) - + results <- future.apply::future_lapply(seq_along(chunks), function(i) { res <- try( .process_chunk( @@ -1176,55 +1202,60 @@ domainFromIPR <- function(duckdb_path, } res }) - + # Combine results tsvs <- Filter(function(x) !is.null(x) && file.exists(x), results) - if (length(tsvs) == 0L) + if (length(tsvs) == 0L) { stop("InterProScan produced no usable outputs. Check Docker logs above.") - + } + df_iprscan <- do.call(rbind, lapply(tsvs, .readIPRscanTsv)) - + # Load processed tables (unchanged) DBI::dbWriteTable(con, "domain_names", - df_iprscan |> - dplyr::select(AccNum, DB.ID, SignDesc, IPRAcc, IPRDesc, StartLoc, StopLoc), - overwrite = TRUE) - + df_iprscan |> + dplyr::select(AccNum, DB.ID, SignDesc, IPRAcc, IPRDesc, StartLoc, StopLoc), + overwrite = TRUE + ) + df_protein_domain_pa <- df_iprscan |> dplyr::select(AccNum, DB.ID, IPRAcc, placeholder) |> dplyr::mutate(domain_ID = stringr::str_glue("{DB.ID}_{IPRAcc}")) |> dplyr::distinct() |> dplyr::mutate(placeholder = stringr::str_replace_all(placeholder, "-", "1")) |> - tidyr::pivot_wider(id_cols = AccNum, names_from = domain_ID, values_from = placeholder, - values_fill = "0") |> + tidyr::pivot_wider( + id_cols = AccNum, names_from = domain_ID, values_from = placeholder, + values_fill = "0" + ) |> dplyr::group_by(AccNum) |> dplyr::summarize(across(everything(), ~ ifelse(any(. == "1"), "1", "0")), .groups = "drop") |> dplyr::mutate(across(-AccNum, as.numeric)) - + protein_filter <- dplyr::tbl(con, "protein_count") |> tibble::as_tibble() accs <- unique(df_protein_domain_pa$AccNum) accs_in_matrix <- intersect(accs, colnames(protein_filter)) - if (length(accs_in_matrix) == 0L) + if (length(accs_in_matrix) == 0L) { stop("No InterPro accessions match protein_count columns.") - + } + protein_filter <- protein_filter |> dplyr::select(genome_id, dplyr::all_of(accs_in_matrix)) df_protein_domain_pa <- df_protein_domain_pa |> dplyr::filter(AccNum %in% accs_in_matrix) |> dplyr::arrange(match(AccNum, accs_in_matrix)) - + domain_count <- as.matrix(protein_filter |> dplyr::select(-genome_id)) %*% as.matrix(df_protein_domain_pa |> dplyr::select(-AccNum)) |> tibble::as_tibble() |> dplyr::mutate(genome_id = protein_filter |> dplyr::pull(genome_id)) |> dplyr::relocate(genome_id, .before = dplyr::everything()) - + DBI::dbWriteTable(conn = con, name = "domain_count", domain_count, overwrite = TRUE) invisible(TRUE) } # Clean BV-BRC metadata, then save as Parquet files -cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ - duckdb_path <- normalizePath(duckdb_path) +cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/") { + duckdb_path <- normalizePath(duckdb_path) # If no explicit path is provided (or a generic one), choose results// when # the DuckDB lives under data//, or else fall back to the DuckDB directory. if (missing(path) || path %in% c(".", "results", "results/")) { @@ -1247,20 +1278,22 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ clean_drug <- readr::read_tsv(file.path(ref_file_path, "clean_drug.tsv")) drug_class <- readr::read_tsv(file.path(ref_file_path, "drug_class.tsv")) - drug_abbr <- readr::read_tsv(file.path(ref_file_path, "drug_abbr.tsv")) + drug_abbr <- readr::read_tsv(file.path(ref_file_path, "drug_abbr.tsv")) class_abbr <- readr::read_tsv(file.path(ref_file_path, "class_abbr.tsv")) clean_countries <- readr::read_tsv(file.path(ref_file_path, "cleaned_bvbrc_countries.tsv")) |> - dplyr::select("raw_entry", "clean_name", "short_name")|> + dplyr::select("raw_entry", "clean_name", "short_name") |> dplyr::distinct() dplyr::tbl(con, "filtered") |> tibble::as_tibble() |> - dplyr::select("genome_drug.genome_id", "genome_drug.antibiotic", - "genome_drug.genome_name", "genome_drug.laboratory_typing_method", - "genome_drug.resistant_phenotype", "genome_drug.taxon_id", - "genome_drug.pmid", "genome.collection_year", - "genome.isolation_country", "genome.host_common_name", - "genome.isolation_source", "genome.species") |> + dplyr::select( + "genome_drug.genome_id", "genome_drug.antibiotic", + "genome_drug.genome_name", "genome_drug.laboratory_typing_method", + "genome_drug.resistant_phenotype", "genome_drug.taxon_id", + "genome_drug.pmid", "genome.collection_year", + "genome.isolation_country", "genome.host_common_name", + "genome.isolation_source", "genome.species" + ) |> dplyr::left_join(clean_drug, by = c("genome_drug.antibiotic" = "original_drug")) |> dplyr::filter(!is.na(cleaned_drug)) |> dplyr::left_join(drug_class, by = c("cleaned_drug" = "drug")) |> @@ -1269,7 +1302,7 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ DBI::dbWriteTable(conn = con, name = "filtered", overwrite = TRUE) resistance_summary <- dplyr::tbl(con, "filtered") |> - tibble::as_tibble() |> + tibble::as_tibble() |> dplyr::filter(genome_drug.resistant_phenotype == "Resistant") |> dplyr::group_by(genome_drug.genome_id) |> dplyr::summarise( @@ -1283,7 +1316,7 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ dplyr::mutate(genome_drug.antibiotic = cleaned_drug) |> dplyr::select(-cleaned_drug) |> dplyr::left_join(clean_countries, by = c("genome.isolation_country" = "raw_entry")) |> - dplyr::rename("cleaned_country"="clean_name", "country_abbr"="short_name") |> + dplyr::rename("cleaned_country" = "clean_name", "country_abbr" = "short_name") |> dplyr::mutate(genome.isolation_country = cleaned_country) |> dplyr::select(-cleaned_country) |> dplyr::left_join(resistance_summary, by = "genome_drug.genome_id") |> @@ -1291,38 +1324,42 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ is.na(resistant_classes) ~ genome_drug.resistant_phenotype, TRUE ~ resistant_classes )) |> - dplyr::mutate(num_resistant_classes= dplyr::case_when( + dplyr::mutate(num_resistant_classes = dplyr::case_when( is.na(num_resistant_classes) ~ 0, TRUE ~ num_resistant_classes )) |> dplyr::mutate(genome.collection_year = as.numeric(genome.collection_year)) |> - dplyr::mutate(year_bin = cut(genome.collection_year, breaks = year_breaks, - right = FALSE, include.lowest = TRUE, - labels = paste(year_breaks[-length(year_breaks)], - year_breaks[-1] - 1, sep = "-"))) |> + dplyr::mutate(year_bin = cut(genome.collection_year, + breaks = year_breaks, + right = FALSE, include.lowest = TRUE, + labels = paste(year_breaks[-length(year_breaks)], + year_breaks[-1] - 1, + sep = "-" + ) + )) |> DBI::dbWriteTable(conn = con, name = "cleaned_metadata", overwrite = TRUE) # Parquet output paths - genes_parquet <- file.path(path, "gene_count.parquet") - gene_names_parquet <- file.path(path, "gene_names.parquet") - gene_ref_seq_parquet <- file.path(path, "gene_seqs.parquet") - genome_gene_protein_parquet <- file.path(path, "genome_gene_protein.parquet") - struct_parquet <- file.path(path, "struct.parquet") + genes_parquet <- file.path(path, "gene_count.parquet") + gene_names_parquet <- file.path(path, "gene_names.parquet") + gene_ref_seq_parquet <- file.path(path, "gene_seqs.parquet") + genome_gene_protein_parquet <- file.path(path, "genome_gene_protein.parquet") + struct_parquet <- file.path(path, "struct.parquet") - proteins_parquet <- file.path(path, "protein_count.parquet") - domains_parquet <- file.path(path, "domain_count.parquet") + proteins_parquet <- file.path(path, "protein_count.parquet") + domains_parquet <- file.path(path, "domain_count.parquet") - metadata_parquet <- file.path(path, "metadata.parquet") # cleaned_metadata exported as 'metadata' + metadata_parquet <- file.path(path, "metadata.parquet") # cleaned_metadata exported as 'metadata' - domain_names_parquet <- file.path(path, "domain_names.parquet") - protein_names_parquet <- file.path(path, "protein_names.parquet") + domain_names_parquet <- file.path(path, "domain_names.parquet") + protein_names_parquet <- file.path(path, "protein_names.parquet") - protein_cluster_seq_parquet <- file.path(path, "protein_seqs.parquet") + protein_cluster_seq_parquet <- file.path(path, "protein_seqs.parquet") # Also export AMR/genome/original metadata - amr_phenotype_parquet <- file.path(path, "amr_phenotype.parquet") - genome_data_parquet <- file.path(path, "genome_data.parquet") - original_metadata_parquet <- file.path(path, "original_metadata.parquet") + amr_phenotype_parquet <- file.path(path, "amr_phenotype.parquet") + genome_data_parquet <- file.path(path, "genome_data.parquet") + original_metadata_parquet <- file.path(path, "original_metadata.parquet") writeCompressedParquet <- function(df, path) { arrow::write_parquet( @@ -1334,7 +1371,9 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ ) } - db_name <- duckdb_path |> stringr::str_split_i(".duckdb", i = 1) |> paste0("_parquet.duckdb") + db_name <- duckdb_path |> + stringr::str_split_i(".duckdb", i = 1) |> + paste0("_parquet.duckdb") con_new <- DBI::dbConnect(duckdb::duckdb(), db_name) on.exit(try(DBI::dbDisconnect(con_new, shutdown = FALSE), silent = TRUE), add = TRUE) @@ -1399,8 +1438,8 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ # debug/complete views: amr_phenotype, genome_data, original_metadata DBI::dbReadTable(con, "amr_phenotype") |> writeCompressedParquet(amr_phenotype_parquet) - DBI::dbReadTable(con, "genome_data") |> writeCompressedParquet(genome_data_parquet) - DBI::dbReadTable(con, "metadata") |> writeCompressedParquet(original_metadata_parquet) + DBI::dbReadTable(con, "genome_data") |> writeCompressedParquet(genome_data_parquet) + DBI::dbReadTable(con, "metadata") |> writeCompressedParquet(original_metadata_parquet) DBI::dbExecute(con_new, sprintf("CREATE OR REPLACE VIEW amr_phenotype AS SELECT * FROM read_parquet('%s')", amr_phenotype_parquet)) DBI::dbExecute(con_new, sprintf("CREATE OR REPLACE VIEW genome_data AS SELECT * FROM read_parquet('%s')", genome_data_parquet)) @@ -1515,16 +1554,16 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ #' #' **Input Requirements** #' * The `duckdb_path` must reference a per-selection DuckDB that contains: -#' `files` (paths to `.gff`, `.fna`, `.PATRIC.faa`), -#' `filtered` (genomes selected for download/filtering), and +#' `files` (paths to `.gff`, `.fna`, `.PATRIC.faa`), +#' `filtered` (genomes selected for download/filtering), and #' BV-BRC metadata tables written by earlier steps. #' #' **Outputs & Side Effects** #' * Writes tool-specific intermediate outputs under `output_path` (e.g., `panaroo_out_*`, CD-HIT files). #' * Writes Parquet files to `output_path`: -#' `gene_count.parquet`, `protein_count.parquet`, `domain_count.parquet`, `struct.parquet`, -#' `gene_names.parquet`, `protein_names.parquet`, `domain_names.parquet`, -#' `gene_seqs.parquet`, `protein_seqs.parquet`, `genome_gene_protein.parquet`, +#' `gene_count.parquet`, `protein_count.parquet`, `domain_count.parquet`, `struct.parquet`, +#' `gene_names.parquet`, `protein_names.parquet`, `domain_names.parquet`, +#' `gene_seqs.parquet`, `protein_seqs.parquet`, `genome_gene_protein.parquet`, #' `metadata.parquet`, `amr_phenotype.parquet`, `genome_data.parquet`, `original_metadata.parquet`. #' * Creates a new Parquet-backed DuckDB (`*_parquet.duckdb`) with read-only views pointing to those Parquets. #' @@ -1565,7 +1604,7 @@ runDataProcessing <- function(duckdb_path, cdhit_identity = 0.9, cdhit_word_length = 5, cdhit_memory = 0, - cdhit_extra_args = c("-g","1"), + cdhit_extra_args = c("-g", "1"), cdhit_output_prefix = "cdhit_out", # InterPro ipr_appl = c("Pfam"), @@ -1651,4 +1690,3 @@ runDataProcessing <- function(duckdb_path, parquet_duckdb_path = normalizePath(parquet_duckdb_path) )) } -