diff --git a/src/bors/gitops.rs b/src/bors/gitops.rs index ef047120..f18a2d57 100644 --- a/src/bors/gitops.rs +++ b/src/bors/gitops.rs @@ -1,7 +1,7 @@ use crate::github::{CommitSha, GithubRepoName}; use anyhow::Context; use secrecy::SecretString; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process::Command; /// Represents a git binary. @@ -32,46 +32,36 @@ impl Git { Self { git: path } } - /// Pushes a commit from the source repository to the target repository. - /// Note: to achieve higher performance, this does not fetch or push any trees! - /// It can be used only to push a single commit between two repositories. - pub async fn transfer_commit_between_repositories( + /// Initialize a local bare repository cache if it hasn't been initialized yet. + pub async fn init_repository_cache(&self, repo_path: &Path) -> anyhow::Result<()> { + std::fs::create_dir_all(repo_path).context("Cannot create repository cache directory")?; + let head_path = repo_path.join("HEAD"); + if !head_path.exists() { + run_command( + tokio::process::Command::new(&self.git) + .kill_on_drop(true) + .current_dir(repo_path) + .arg("init") + .arg("--bare"), + ) + .await + .context("Cannot perform git init")?; + } + Ok(()) + } + + /// Prepare a local bare repository for transferring a commit. + /// This initializes the repository (if needed) and fetches the requested commit. + pub async fn prepare_repository_for_commit( &self, + repo_path: &Path, source_repo: &GithubRepoName, - target_repo: &GithubRepoName, commit: &CommitSha, - target_branch: &str, - token: SecretString, ) -> anyhow::Result<()> { - use secrecy::ExposeSecret; - - // What we want to do here is to push a commit A from repo R1 (source) to repo R2 (target) - // as quickly as possible, and in a stateless way. - // Previously, we used libgit2 to do essentially `fetch --depth=1` followed by a `git push`. - // However, this is wasteful, because we do not actually need to download any blobs or - // trees. - // For the transfer, we simply need to transfer a simply commit between those two - // repositories. - // So we first do a blob/treeless clone of the source repository, and then push a single - // commit to the target repository. git will use its unshallowing logic to lazily download - // the pushed commit from the source repo, and then push it to the target repo. - - // Create a temporary directory for the local repository - let temp_dir = tempfile::tempdir()?; - let root_path = temp_dir.path(); + self.init_repository_cache(repo_path).await?; let source_repo_url = format!("https://github.com/{source_repo}.git"); - run_command( - tokio::process::Command::new(&self.git) - .kill_on_drop(true) - .current_dir(root_path) - .arg("init") - .arg("--bare"), - ) - .await - .context("Cannot perform git init")?; - // It **should** be much faster to do a partial clone than a fetch with depth=1. // However, on the production server, the partial clone of rust-lang/rust seems to choke :( // So we use the fetch as an alternative. @@ -79,7 +69,7 @@ impl Git { run_command( tokio::process::Command::new(&self.git) .kill_on_drop(true) - .current_dir(root_path) + .current_dir(repo_path) .arg("fetch") .arg("--depth=1") // Note: using --filter=tree:0 makes the fetch much faster, but the resulting push @@ -88,7 +78,21 @@ impl Git { .arg(commit.as_ref()), ) .await - .context("Cannot perform git clone")?; + .context("Cannot perform git fetch")?; + Ok(()) + } + + /// Pushes a commit from the prepared local repository to the target repository. + /// Note: the repository at `repo_path` must already contain `commit`. + pub async fn transfer_commit_between_repositories( + &self, + repo_path: &Path, + target_repo: &GithubRepoName, + commit: &CommitSha, + target_branch: &str, + token: SecretString, + ) -> anyhow::Result<()> { + use secrecy::ExposeSecret; let target_branch = format!("refs/heads/{target_branch}"); // Create the refspec: push the commit to the target branch @@ -108,7 +112,7 @@ impl Git { run_command( tokio::process::Command::new(&self.git) .kill_on_drop(true) - .current_dir(root_path) + .current_dir(repo_path) // Do not store the token on disk .arg("-c") .arg("credential.helper=") diff --git a/src/bors/gitops_queue.rs b/src/bors/gitops_queue.rs index fd6095b5..4da49b50 100644 --- a/src/bors/gitops_queue.rs +++ b/src/bors/gitops_queue.rs @@ -8,9 +8,12 @@ use secrecy::SecretString; use std::collections::HashSet; use std::fmt::{Debug, Formatter}; use std::future::Future; +#[cfg(not(test))] +use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::time::Duration; +use tempfile::TempDir; use tokio::sync::mpsc; use tracing::Instrument; @@ -21,6 +24,9 @@ const GITOPS_QUEUE_CAPACITY: usize = 3; /// Maximum duration of a local git operation before it times out. const GITOP_TIMEOUT: Duration = Duration::from_secs(60); +/// Special pull request number used for clone operations. +const CLONE_PR_NUMBER: PullRequestNumber = PullRequestNumber(0); + #[derive(Debug, Hash, PartialEq, Eq, Clone)] pub struct PullRequestId { pub repo: GithubRepoName, @@ -35,6 +41,8 @@ pub struct GitOpsQueueEntry { struct GitOpsSharedState { git: Option, + /// Temporary directory used for caching local repository clones. + cache_dir: TempDir, /// Pull requests on which a local git operation is currently queued or in-progress. pending_prs: HashSet, } @@ -80,6 +88,28 @@ impl GitOpsQueueSender { } } } + + pub fn enqueue_clone_repository(&self, repository: GithubRepoName) -> anyhow::Result { + let log_repo = repository.clone(); + let pr_id = PullRequestId { + repo: repository.clone(), + pr: CLONE_PR_NUMBER, + }; + let command = GitOpsCommand::CloneRepository(CloneRepositoryCommand { + repository, + on_finish: Box::new(|result| { + Box::pin(async move { + if let Err(error) = result { + tracing::warn!( + "Repository cache initialization failed for {log_repo}: {error:?}" + ); + } + Ok(()) + }) + }), + }); + self.try_send(pr_id, command) + } } /// Command that can be executed by the gitops queue. @@ -87,6 +117,8 @@ impl GitOpsQueueSender { pub enum GitOpsCommand { /// Push a commit from one repository to another. Push(PushCommand), + /// Clone or initialize a repository cache for later operations. + CloneRepository(CloneRepositoryCommand), } pub type PushCallback = Box< @@ -94,6 +126,11 @@ pub type PushCallback = Box< + Send, >; +pub type CloneCallback = Box< + dyn FnOnce(anyhow::Result<()>) -> Pin> + Send>> + + Send, +>; + /// Force push `commit` from `source_repo` to `target_branch` of `target_repo`. /// Use `token` for authentication. /// @@ -107,6 +144,23 @@ pub struct PushCommand { pub on_finish: PushCallback, } +pub struct CloneRepositoryCommand { + pub repository: GithubRepoName, + pub on_finish: CloneCallback, +} + +impl Debug for CloneRepositoryCommand { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let Self { + repository, + on_finish: _, + } = self; + f.debug_struct("CloneRepositoryCommand") + .field("repository", repository) + .finish() + } +} + impl Debug for PushCommand { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let Self { @@ -128,9 +182,23 @@ impl Debug for PushCommand { pub fn create_gitops_queue(git: Option) -> (GitOpsQueueSender, GitOpsQueueReceiver) { let (tx, rx) = mpsc::channel(GITOPS_QUEUE_CAPACITY); + #[cfg(test)] + let cache_dir = tempfile::Builder::new() + .prefix("bors-gitops-cache-") + .tempdir() + .expect("Cannot create gitops cache temp dir"); + #[cfg(not(test))] + let cache_dir = { + let base_dir = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); + tempfile::Builder::new() + .prefix("gitops-cache-") + .tempdir_in(&base_dir) + .expect("Cannot create gitops cache temp dir") + }; let state = Arc::new(RwLock::new(GitOpsSharedState { pending_prs: Default::default(), git, + cache_dir, })); ( GitOpsQueueSender { @@ -164,7 +232,10 @@ pub async fn handle_gitops_entry( "{source_repo}:{commit} -> {target_repo}:{target_branch}" ); - let git = rx.state.read().unwrap().git.clone(); + let (git, _cache_dir) = { + let state = rx.state.read().unwrap(); + (state.git.clone(), state.cache_dir.path().to_path_buf()) + }; let res = if let Some(_git) = git { let fut = async move { use std::time::Instant; @@ -173,15 +244,20 @@ pub async fn handle_gitops_entry( #[cfg(test)] let res = anyhow::Ok(()); #[cfg(not(test))] - let res = _git - .transfer_commit_between_repositories( - &source_repo, + let res = async { + let repo_path = _cache_dir.join(source_repo.to_string()); + _git.prepare_repository_for_commit(&repo_path, &source_repo, &commit) + .await?; + _git.transfer_commit_between_repositories( + &repo_path, &target_repo, &commit, &target_branch, _token, ) - .await; + .await + } + .await; tracing::trace!("Push took {:.3}s", start.elapsed().as_secs_f64()); res } @@ -204,6 +280,42 @@ pub async fn handle_gitops_entry( } Ok(()) } + GitOpsCommand::CloneRepository(CloneRepositoryCommand { + repository, + on_finish, + }) => { + let span = tracing::debug_span!("clone repository cache", "{repository}"); + let (git, cache_dir) = { + let state = rx.state.read().unwrap(); + (state.git.clone(), state.cache_dir.path().to_path_buf()) + }; + let res = if let Some(_git) = git { + let fut = async move { + let _repo_path = cache_dir.join(repository.to_string()); + #[cfg(test)] + let res = anyhow::Ok(()); + #[cfg(not(test))] + let res = _git.init_repository_cache(&_repo_path).await; + res + } + .instrument(span.clone()); + match tokio::time::timeout(GITOP_TIMEOUT, fut).await { + Ok(res) => res, + Err(_) => Err(anyhow::anyhow!("Clone timeouted")), + } + } else { + Err(anyhow::anyhow!("Local git is not available")) + }; + if let Err(error) = on_finish(res).instrument(span.clone()).await { + span.in_scope(|| { + tracing::error!("Completion callback failed: {error:?}"); + }); + + #[cfg(test)] + return Err(error); + } + Ok(()) + } } }; diff --git a/src/bors/process.rs b/src/bors/process.rs index 69a39cc1..bd2311d0 100644 --- a/src/bors/process.rs +++ b/src/bors/process.rs @@ -61,6 +61,29 @@ pub fn create_bors_process( }; let senders2 = senders.clone(); + #[cfg(not(test))] + { + if ctx.local_git_available() { + for repo in ctx.repositories.repositories() { + let repo_name = repo.repository().clone(); + let log_repo = repo_name.clone(); + match senders.gitops_queue().enqueue_clone_repository(repo_name) { + Ok(true) => {} + Ok(false) => { + tracing::warn!( + "Gitops queue is full; cache initialization skipped for {log_repo}" + ); + } + Err(error) => { + tracing::warn!( + "Failed to enqueue repository cache initialization for {log_repo}: {error:?}" + ); + } + } + } + } + } + let service = async move { // In tests, we shutdown these futures by dropping the channel sender, // In that case, we need to wait until both of these futures resolve,