Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 41 additions & 37 deletions src/bors/gitops.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::github::{CommitSha, GithubRepoName};
use anyhow::Context;
use secrecy::SecretString;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Command;

/// Represents a git binary.
Expand Down Expand Up @@ -32,54 +32,44 @@ impl Git {
Self { git: path }
}

/// Pushes a commit from the source repository to the target repository.
/// Note: to achieve higher performance, this does not fetch or push any trees!
/// It can be used only to push a single commit between two repositories.
pub async fn transfer_commit_between_repositories(
/// Ensures that a bare git repository exists at `repo_path`.
///
/// The directory (including missing parents) is created first. `git init --bare` is then
/// executed inside it, unless a `HEAD` file is already present, in which case nothing else
/// is done.
///
/// # Errors
/// Fails if the directory cannot be created or if the `git init` command fails.
pub async fn init_repository_cache(&self, repo_path: &Path) -> anyhow::Result<()> {
    std::fs::create_dir_all(repo_path).context("Cannot create repository cache directory")?;

    // A `HEAD` file is used as the marker of an already initialized repository.
    if repo_path.join("HEAD").exists() {
        return Ok(());
    }

    let mut init_cmd = tokio::process::Command::new(&self.git);
    init_cmd
        .kill_on_drop(true)
        .current_dir(repo_path)
        .arg("init")
        .arg("--bare");
    run_command(&mut init_cmd)
        .await
        .context("Cannot perform git init")?;
    Ok(())
}

/// Prepare a local bare repository for transferring a commit.
/// This initializes the repository (if needed) and fetches the requested commit.
pub async fn prepare_repository_for_commit(
&self,
repo_path: &Path,
source_repo: &GithubRepoName,
target_repo: &GithubRepoName,
commit: &CommitSha,
target_branch: &str,
token: SecretString,
) -> anyhow::Result<()> {
use secrecy::ExposeSecret;

// What we want to do here is to push a commit A from repo R1 (source) to repo R2 (target)
// as quickly as possible, and in a stateless way.
// Previously, we used libgit2 to do essentially `fetch --depth=1` followed by a `git push`.
// However, this is wasteful, because we do not actually need to download any blobs or
// trees.
// For the transfer, we simply need to transfer a single commit between those two
// repositories.
// So we first do a blob/treeless clone of the source repository, and then push a single
// commit to the target repository. git will use its unshallowing logic to lazily download
// the pushed commit from the source repo, and then push it to the target repo.

// Create a temporary directory for the local repository
let temp_dir = tempfile::tempdir()?;
let root_path = temp_dir.path();
self.init_repository_cache(repo_path).await?;

let source_repo_url = format!("https://github.com/{source_repo}.git");

run_command(
tokio::process::Command::new(&self.git)
.kill_on_drop(true)
.current_dir(root_path)
.arg("init")
.arg("--bare"),
)
.await
.context("Cannot perform git init")?;

// It **should** be much faster to do a partial clone than a fetch with depth=1.
// However, on the production server, the partial clone of rust-lang/rust seems to choke :(
// So we use the fetch as an alternative.
tracing::debug!("Fetching commit");
run_command(
tokio::process::Command::new(&self.git)
.kill_on_drop(true)
.current_dir(root_path)
.current_dir(repo_path)
.arg("fetch")
.arg("--depth=1")
// Note: using --filter=tree:0 makes the fetch much faster, but the resulting push
Expand All @@ -88,7 +78,21 @@ impl Git {
.arg(commit.as_ref()),
)
.await
.context("Cannot perform git clone")?;
.context("Cannot perform git fetch")?;
Ok(())
}

/// Pushes a commit from the prepared local repository to the target repository.
/// Note: the repository at `repo_path` must already contain `commit`.
pub async fn transfer_commit_between_repositories(
&self,
repo_path: &Path,
target_repo: &GithubRepoName,
commit: &CommitSha,
target_branch: &str,
token: SecretString,
) -> anyhow::Result<()> {
use secrecy::ExposeSecret;

let target_branch = format!("refs/heads/{target_branch}");
// Create the refspec: push the commit to the target branch
Expand All @@ -108,7 +112,7 @@ impl Git {
run_command(
tokio::process::Command::new(&self.git)
.kill_on_drop(true)
.current_dir(root_path)
.current_dir(repo_path)
// Do not store the token on disk
.arg("-c")
.arg("credential.helper=")
Expand Down
122 changes: 117 additions & 5 deletions src/bors/gitops_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use secrecy::SecretString;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::future::Future;
#[cfg(not(test))]
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::mpsc;
use tracing::Instrument;

Expand All @@ -21,6 +24,9 @@ const GITOPS_QUEUE_CAPACITY: usize = 3;
/// Maximum duration of a local git operation before it times out.
const GITOP_TIMEOUT: Duration = Duration::from_secs(60);

/// Synthetic pull request number used for repository clone operations.
///
/// Clone commands are enqueued under a [`PullRequestId`] like any other gitops command;
/// this value fills the `pr` field of that ID.
/// NOTE(review): assumes that no real pull request is ever numbered 0 — confirm.
const CLONE_PR_NUMBER: PullRequestNumber = PullRequestNumber(0);
Comment on lines +27 to +28
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use CLONE_PR_NUMBER = 0 as a synthetic PR ID for clone tasks. Is this OK?


#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct PullRequestId {
pub repo: GithubRepoName,
Expand All @@ -35,6 +41,8 @@ pub struct GitOpsQueueEntry {

struct GitOpsSharedState {
git: Option<Git>,
/// Temporary directory used for caching local repository clones.
cache_dir: TempDir,
/// Pull requests on which a local git operation is currently queued or in-progress.
pending_prs: HashSet<PullRequestId>,
}
Expand Down Expand Up @@ -80,20 +88,49 @@ impl GitOpsQueueSender {
}
}
}

pub fn enqueue_clone_repository(&self, repository: GithubRepoName) -> anyhow::Result<bool> {
let log_repo = repository.clone();
let pr_id = PullRequestId {
repo: repository.clone(),
pr: CLONE_PR_NUMBER,
};
let command = GitOpsCommand::CloneRepository(CloneRepositoryCommand {
repository,
on_finish: Box::new(|result| {
Box::pin(async move {
if let Err(error) = result {
tracing::warn!(
"Repository cache initialization failed for {log_repo}: {error:?}"
);
}
Ok(())
})
}),
});
self.try_send(pr_id, command)
}
}

/// Command that can be executed by the gitops queue.
///
/// Each variant wraps a dedicated command struct that carries the inputs of the operation.
#[derive(Debug)]
pub enum GitOpsCommand {
/// Push a commit from one repository to another.
Push(PushCommand),
/// Clone or initialize a repository cache for later operations.
CloneRepository(CloneRepositoryCommand),
}

/// Callback invoked with the result of a finished push operation.
///
/// It consumes the operation's outcome and returns a boxed, `Send` future that resolves to
/// the callback's own result.
pub type PushCallback = Box<
    dyn FnOnce(anyhow::Result<()>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
        + Send,
>;

/// Callback invoked with the result of a finished repository clone operation.
///
/// It has exactly the same shape as [`PushCallback`], so it is defined in terms of it
/// instead of repeating the full boxed closure type.
pub type CloneCallback = PushCallback;

/// Force push `commit` from `source_repo` to `target_branch` of `target_repo`.
/// Use `token` for authentication.
///
Expand All @@ -107,6 +144,23 @@ pub struct PushCommand {
pub on_finish: PushCallback,
}

/// Request to initialize the local repository cache for `repository`.
///
/// `on_finish` is called with the result of the operation once it has been handled.
pub struct CloneRepositoryCommand {
/// Repository whose local cache should be initialized.
pub repository: GithubRepoName,
/// Callback that receives the outcome of the operation.
pub on_finish: CloneCallback,
}

impl Debug for CloneRepositoryCommand {
    // Prints only the `repository` field; the `on_finish` callback is left out of the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("CloneRepositoryCommand");
        builder.field("repository", &self.repository);
        builder.finish()
    }
}

impl Debug for PushCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let Self {
Expand All @@ -128,9 +182,23 @@ impl Debug for PushCommand {

pub fn create_gitops_queue(git: Option<Git>) -> (GitOpsQueueSender, GitOpsQueueReceiver) {
let (tx, rx) = mpsc::channel(GITOPS_QUEUE_CAPACITY);
#[cfg(test)]
let cache_dir = tempfile::Builder::new()
.prefix("bors-gitops-cache-")
.tempdir()
.expect("Cannot create gitops cache temp dir");
#[cfg(not(test))]
let cache_dir = {
let base_dir = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
tempfile::Builder::new()
.prefix("gitops-cache-")
.tempdir_in(&base_dir)
.expect("Cannot create gitops cache temp dir")
};
Comment on lines +185 to +197
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I create the cache TempDir internally. Would it be more reasonable to pass the cache directory in as a parameter instead?

let state = Arc::new(RwLock::new(GitOpsSharedState {
pending_prs: Default::default(),
git,
cache_dir,
}));
(
GitOpsQueueSender {
Expand Down Expand Up @@ -164,7 +232,10 @@ pub async fn handle_gitops_entry(
"{source_repo}:{commit} -> {target_repo}:{target_branch}"
);

let git = rx.state.read().unwrap().git.clone();
let (git, _cache_dir) = {
let state = rx.state.read().unwrap();
(state.git.clone(), state.cache_dir.path().to_path_buf())
};
let res = if let Some(_git) = git {
let fut = async move {
use std::time::Instant;
Expand All @@ -173,15 +244,20 @@ pub async fn handle_gitops_entry(
#[cfg(test)]
let res = anyhow::Ok(());
#[cfg(not(test))]
let res = _git
.transfer_commit_between_repositories(
&source_repo,
let res = async {
let repo_path = _cache_dir.join(source_repo.to_string());
_git.prepare_repository_for_commit(&repo_path, &source_repo, &commit)
.await?;
_git.transfer_commit_between_repositories(
&repo_path,
&target_repo,
&commit,
&target_branch,
_token,
)
.await;
.await
}
.await;
tracing::trace!("Push took {:.3}s", start.elapsed().as_secs_f64());
res
}
Expand All @@ -204,6 +280,42 @@ pub async fn handle_gitops_entry(
}
Ok(())
}
GitOpsCommand::CloneRepository(CloneRepositoryCommand {
repository,
on_finish,
}) => {
let span = tracing::debug_span!("clone repository cache", "{repository}");
let (git, cache_dir) = {
let state = rx.state.read().unwrap();
(state.git.clone(), state.cache_dir.path().to_path_buf())
};
let res = if let Some(_git) = git {
let fut = async move {
let _repo_path = cache_dir.join(repository.to_string());
#[cfg(test)]
let res = anyhow::Ok(());
#[cfg(not(test))]
let res = _git.init_repository_cache(&_repo_path).await;
res
}
.instrument(span.clone());
match tokio::time::timeout(GITOP_TIMEOUT, fut).await {
Ok(res) => res,
Err(_) => Err(anyhow::anyhow!("Clone timeouted")),
}
} else {
Err(anyhow::anyhow!("Local git is not available"))
};
if let Err(error) = on_finish(res).instrument(span.clone()).await {
span.in_scope(|| {
tracing::error!("Completion callback failed: {error:?}");
});

#[cfg(test)]
return Err(error);
}
Ok(())
}
}
};

Expand Down
23 changes: 23 additions & 0 deletions src/bors/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,29 @@ pub fn create_bors_process(
};
let senders2 = senders.clone();

#[cfg(not(test))]
{
if ctx.local_git_available() {
for repo in ctx.repositories.repositories() {
let repo_name = repo.repository().clone();
let log_repo = repo_name.clone();
match senders.gitops_queue().enqueue_clone_repository(repo_name) {
Ok(true) => {}
Ok(false) => {
tracing::warn!(
"Gitops queue is full; cache initialization skipped for {log_repo}"
);
}
Err(error) => {
tracing::warn!(
"Failed to enqueue repository cache initialization for {log_repo}: {error:?}"
);
}
}
}
}
}

let service = async move {
// In tests, we shut down these futures by dropping the channel sender,
// In that case, we need to wait until both of these futures resolve,
Expand Down
Loading