Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 54 additions & 42 deletions src/bors/gitops.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::github::{CommitSha, GithubRepoName};
use anyhow::Context;
use secrecy::SecretString;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Command;

/// Represents a git binary.
Expand Down Expand Up @@ -32,63 +32,75 @@ impl Git {
Self { git: path }
}

/// Pushes a commit from the source repository to the target repository.
/// Note: to achieve higher performance, this does not fetch or push any trees!
/// It can be used only to push a single commit between two repositories.
pub async fn transfer_commit_between_repositories(
/// Initialize a local bare repository cache if it hasn't been initialized yet.
/// This clones the repository in treeless mode (`--filter=tree:0`) to seed the cache.
pub async fn init_repository_cache(
&self,
source_repo: &GithubRepoName,
target_repo: &GithubRepoName,
commit: &CommitSha,
target_branch: &str,
token: SecretString,
repo_path: &Path,
repository: &GithubRepoName,
) -> anyhow::Result<()> {
use secrecy::ExposeSecret;

// What we want to do here is to push a commit A from repo R1 (source) to repo R2 (target)
// as quickly as possible, and in a stateless way.
// Previously, we used libgit2 to do essentially `fetch --depth=1` followed by a `git push`.
// However, this is wasteful, because we do not actually need to download any blobs or
// trees.
// For the transfer, we simply need to transfer a single commit between those two
// repositories.
// So we first do a blob/treeless clone of the source repository, and then push a single
// commit to the target repository. git will use its unshallowing logic to lazily download
// the pushed commit from the source repo, and then push it to the target repo.

// Create a temporary directory for the local repository
let temp_dir = tempfile::tempdir()?;
let root_path = temp_dir.path();

let source_repo_url = format!("https://github.com/{source_repo}.git");

if repo_path.join(".git").exists() || repo_path.join("HEAD").exists() {
return Ok(());
}
if repo_path.exists() {
std::fs::remove_dir_all(repo_path)
.context("Cannot reset repository cache directory")?;
}
let repo_url = format!("https://github.com/{repository}.git");
run_command(
tokio::process::Command::new(&self.git)
.kill_on_drop(true)
.current_dir(root_path)
.arg("init")
.arg("--bare"),
.arg("clone")
// --bare is used to avoid checking out the repository on disk, which is not needed
.arg("--bare")
// Treeless clone is used to avoid downloading history of all blobs and trees
.arg("--filter=tree:0")
.arg(&repo_url)
.arg(repo_path),
)
.await
.context("Cannot perform git init")?;
.context("Cannot perform git clone")?;
Ok(())
}

/// Prepare a local bare repository for transferring a commit.
/// This initializes the repository (if needed) and fetches the requested commit.
pub async fn prepare_repository_for_commit(
&self,
repo_path: &Path,
source_repo: &GithubRepoName,
commit: &CommitSha,
) -> anyhow::Result<()> {
self.init_repository_cache(repo_path, source_repo).await?;

// It **should** be much faster to do a partial clone than a fetch with depth=1.
// However, on the production server, the partial clone of rust-lang/rust seems to choke :(
// So we use the fetch as an alternative.
let source_repo_url = format!("https://github.com/{source_repo}.git");

// We reuse a cached bare repository, so we perform a regular fetch.
tracing::debug!("Fetching commit");
run_command(
tokio::process::Command::new(&self.git)
.kill_on_drop(true)
.current_dir(root_path)
.current_dir(repo_path)
.arg("fetch")
.arg("--depth=1")
// Note: using --filter=tree:0 makes the fetch much faster, but the resulting push
// becomes MUCH slower :(
.arg(source_repo_url)
.arg(commit.as_ref()),
)
.await
.context("Cannot perform git clone")?;
.context("Cannot perform git fetch")?;
Ok(())
}

/// Pushes a commit from the prepared local repository to the target repository.
/// Note: the repository at `repo_path` must already contain `commit`.
pub async fn transfer_commit_between_repositories(
&self,
repo_path: &Path,
target_repo: &GithubRepoName,
commit: &CommitSha,
target_branch: &str,
token: SecretString,
) -> anyhow::Result<()> {
use secrecy::ExposeSecret;

let target_branch = format!("refs/heads/{target_branch}");
// Create the refspec: push the commit to the target branch
Expand All @@ -108,7 +120,7 @@ impl Git {
run_command(
tokio::process::Command::new(&self.git)
.kill_on_drop(true)
.current_dir(root_path)
.current_dir(repo_path)
// Do not store the token on disk
.arg("-c")
.arg("credential.helper=")
Expand Down
111 changes: 100 additions & 11 deletions src/bors/gitops_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use secrecy::SecretString;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand All @@ -21,6 +22,10 @@ const GITOPS_QUEUE_CAPACITY: usize = 3;
/// Maximum duration of a local git operation before it times out.
const GITOP_TIMEOUT: Duration = Duration::from_secs(60);

/// Maximum duration of an init git operation before it times out.
/// Init operation (cloning a repo) is performed at the start of bors, so it can take a longer time.
const GITOP_INIT_TIMEOUT: Duration = Duration::from_secs(60 * 5);

#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct PullRequestId {
pub repo: GithubRepoName,
Expand All @@ -30,11 +35,13 @@ pub struct PullRequestId {
#[derive(Debug)]
pub struct GitOpsQueueEntry {
command: GitOpsCommand,
pr: PullRequestId,
pr: Option<PullRequestId>,
}

/// State shared between the sending and receiving halves of the gitops queue.
struct GitOpsSharedState {
/// Local git binary used to run git commands; `None` when local git is not available.
git: Option<Git>,
/// Directory used for caching local repository clones.
cache_dir: PathBuf,
/// Pull requests on which a local git operation is currently queued or in-progress.
pending_prs: HashSet<PullRequestId>,
}
Expand Down Expand Up @@ -64,14 +71,22 @@ impl GitOpsQueueSender {

/// Try to enqueue a git operation.
/// Returns `true` if the operation was enqueued or `false` if the queue is full.
pub fn try_send(&self, id: PullRequestId, command: GitOpsCommand) -> anyhow::Result<bool> {
///
/// If the git operation is associated with a PR, it should be passed in the `pr` parameter.
pub fn try_send(
&self,
pr: Option<PullRequestId>,
command: GitOpsCommand,
) -> anyhow::Result<bool> {
let entry = GitOpsQueueEntry {
command,
pr: id.clone(),
pr: pr.clone(),
};
match self.sender.try_send(entry) {
Ok(_) => {
self.state.write().unwrap().pending_prs.insert(id);
if let Some(id) = pr {
self.state.write().unwrap().pending_prs.insert(id);
}
Ok(true)
}
Err(mpsc::error::TrySendError::Full(_)) => Ok(false),
Expand All @@ -80,13 +95,20 @@ impl GitOpsQueueSender {
}
}
}

/// Enqueue a command that clones `repository` on disk to serve as a cache.
///
/// The command is not associated with any pull request.
/// Returns `true` if the command was enqueued or `false` if the queue is full.
pub fn enqueue_clone_repository(&self, repository: GithubRepoName) -> anyhow::Result<bool> {
    let clone_command = CloneRepositoryCommand { repository };
    self.try_send(None, GitOpsCommand::CloneRepository(clone_command))
}
}

/// Command that can be executed by the gitops queue.
///
/// Commands are enqueued through `GitOpsQueueSender::try_send` and executed by
/// `handle_gitops_entry`.
#[derive(Debug)]
pub enum GitOpsCommand {
/// Push a commit from one repository to another.
Push(PushCommand),
/// Clone a repository on disk to serve as a cache for later operations.
CloneRepository(CloneRepositoryCommand),
}

pub type PushCallback = Box<
Expand All @@ -107,6 +129,19 @@ pub struct PushCommand {
pub on_finish: PushCallback,
}

pub struct CloneRepositoryCommand {
pub repository: GithubRepoName,
}

impl Debug for CloneRepositoryCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let Self { repository } = self;
f.debug_struct("CloneRepositoryCommand")
.field("repository", repository)
.finish()
}
}

impl Debug for PushCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let Self {
Expand All @@ -128,9 +163,16 @@ impl Debug for PushCommand {

pub fn create_gitops_queue(git: Option<Git>) -> (GitOpsQueueSender, GitOpsQueueReceiver) {
let (tx, rx) = mpsc::channel(GITOPS_QUEUE_CAPACITY);
#[cfg(test)]
let cache_dir = std::env::temp_dir().join("bors-gitops-cache");
#[cfg(not(test))]
let cache_dir = std::env::current_dir()
.unwrap_or_else(|_| PathBuf::from("."))
.join("gitops-cache");
let state = Arc::new(RwLock::new(GitOpsSharedState {
pending_prs: Default::default(),
git,
cache_dir,
}));
(
GitOpsQueueSender {
Expand Down Expand Up @@ -164,7 +206,10 @@ pub async fn handle_gitops_entry(
"{source_repo}:{commit} -> {target_repo}:{target_branch}"
);

let git = rx.state.read().unwrap().git.clone();
let (git, _cache_dir) = {
let state = rx.state.read().unwrap();
(state.git.clone(), state.cache_dir.clone())
};
let res = if let Some(_git) = git {
let fut = async move {
use std::time::Instant;
Expand All @@ -173,16 +218,21 @@ pub async fn handle_gitops_entry(
#[cfg(test)]
let res = anyhow::Ok(());
#[cfg(not(test))]
let res = _git
.transfer_commit_between_repositories(
&source_repo,
let res = async {
let repo_path = _cache_dir.join(source_repo.to_string());
_git.prepare_repository_for_commit(&repo_path, &source_repo, &commit)
.await?;
_git.transfer_commit_between_repositories(
&repo_path,
&target_repo,
&commit,
&target_branch,
_token,
)
.await;
tracing::trace!("Push took {:.3}s", start.elapsed().as_secs_f64());
.await
}
.await;
tracing::trace!("Git push took {:.3}s", start.elapsed().as_secs_f64());
res
}
.instrument(span.clone());
Expand All @@ -204,10 +254,49 @@ pub async fn handle_gitops_entry(
}
Ok(())
}
GitOpsCommand::CloneRepository(CloneRepositoryCommand { repository }) => {
let span = tracing::debug_span!("clone repository cache", "{repository}");
let (git, cache_dir) = {
let state = rx.state.read().unwrap();
(state.git.clone(), state.cache_dir.clone())
};
if let Some(_git) = git {
use std::time::Instant;

let start = Instant::now();

let repository2 = repository.clone();
let fut = async move {
let _repo_path = cache_dir.join(repository2.to_string());
#[cfg(test)]
let res = anyhow::Ok(());
#[cfg(not(test))]
let res = _git.init_repository_cache(&_repo_path, &repository2).await;
res
}
.instrument(span.clone());

match tokio::time::timeout(GITOP_INIT_TIMEOUT, fut).await {
Ok(res) => {
tracing::trace!(
"Repository `{}` git init took {:.3}s",
repository,
start.elapsed().as_secs_f64()
);
res
}
Err(_) => Err(anyhow::anyhow!("Clone timeouted")),
}
} else {
Err(anyhow::anyhow!("Local git is not available"))
}
}
}
};

let res = handle.await;
rx.state.write().unwrap().pending_prs.remove(&pr);
if let Some(pr) = pr {
rx.state.write().unwrap().pending_prs.remove(&pr);
}
res
}
2 changes: 1 addition & 1 deletion src/bors/handlers/squash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ pub(super) async fn command_squash(
on_finish,
});

if !gitops_queue.try_send(pr_id, command)? {
if !gitops_queue.try_send(Some(pr_id), command)? {
send_comment(
":hourglass: There are too many git operations in progress at the moment. Please try again a few minutes later."
.to_string(),
Expand Down
23 changes: 23 additions & 0 deletions src/bors/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,29 @@ pub fn create_bors_process(
};
let senders2 = senders.clone();

#[cfg(not(test))]
{
if ctx.local_git_available() {
for repo in ctx.repositories.repositories() {
let repo_name = repo.repository().clone();
let log_repo = repo_name.clone();
match senders.gitops_queue().enqueue_clone_repository(repo_name) {
Ok(true) => {}
Ok(false) => {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, in practice this won't be an issue, as the current queue capacity is 3, and we only work with rust-lang/rust in production, but the capacity shouldn't be used for some commands. Let's not deal with that now.

tracing::warn!(
"Gitops queue is full; cache initialization skipped for {log_repo}"
);
}
Err(error) => {
tracing::warn!(
"Failed to enqueue repository cache initialization for {log_repo}: {error:?}"
);
}
}
}
}
}

let service = async move {
// In tests, we shut down these futures by dropping the channel sender,
// In that case, we need to wait until both of these futures resolve,
Expand Down
Loading