Files
flux-releaser/crates/flux-releaser/src/services/git.rs
kjuulh 96f3b52c2a
Some checks failed
continuous-integration/drone/push Build is failing
feat: set interval down to a minute
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-09-08 12:44:05 +02:00

338 lines
9.7 KiB
Rust

use std::{
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Context;
use git2::{
build::{CheckoutBuilder, RepoBuilder},
Cred, FetchOptions, IndexAddOption, PushOptions, RemoteCallbacks, Repository, ResetType,
Signature,
};
use tokio::{io::AsyncWriteExt, sync::Mutex};
use super::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
};
#[derive(Clone)]
pub struct SharedGit {
inner: Arc<Git>,
}
impl From<Git> for SharedGit {
fn from(value: Git) -> Self {
Self {
inner: Arc::new(value),
}
}
}
impl std::ops::Deref for SharedGit {
type Target = Git;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct Git {
location: Mutex<PathBuf>,
registry: String,
cluster_list: ClusterList,
archive: Archive,
}
impl Git {
pub fn new(registry: String, cluster_list: ClusterList, archive: Archive) -> Self {
Self {
registry,
location: Mutex::new(std::env::temp_dir().join("flux_releaser")),
cluster_list,
archive,
}
}
pub async fn publish(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// TODO: implement exponential backoff and 5 attempts
self.publish_attempt(archive, service, namespace, environment)
.await?;
Ok(())
}
async fn publish_attempt(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// 1. Clone repo into location (with lock) or update
let location = self.location.lock().await;
let repo_dir = location.join("repo");
let repo = match self.get_repo(&repo_dir).await? {
//TODO: Possible handle error by just recloning the repo
None => self.clone_repo(&repo_dir).await?,
Some(repo) => {
self.update_repo(repo).await?;
self.get_repo(&repo_dir)
.await?
.ok_or(anyhow::anyhow!("failed to open repository"))?
}
};
// 2. Extract from archive
// TODO: maybe pad archive with hash or something
let unpack_dir = location.join("archive");
self.archive.unpack_archive(archive, &unpack_dir).await?;
// 3. Splat tar over application and cluster
// The archive should always be structured like so:
// - <environment>/
// - * manifests
// 3a. prepare git repo for new files
let clusters = self
.cluster_list
.get(environment)
.await?
.ok_or(anyhow::anyhow!(
"environment is not registered: {} in cluster list",
environment
))?;
for cluster in clusters {
let service_entry = repo_dir
.join("deployments")
.join(&cluster)
.join(namespace)
.join(service);
if service_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&service_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&service_entry).await?;
let cluster_entry = repo_dir
.join("clusters")
.join(&cluster)
.join(namespace)
.join(service);
if cluster_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&cluster_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&cluster_entry).await?;
let archive_dir = unpack_dir.join(environment);
if !archive_dir.exists() {
anyhow::bail!("selected environment is not published for archive");
}
let mut read_dir = tokio::fs::read_dir(archive_dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
if entry.metadata().await?.is_file() {
let entry_path = entry.path();
let dest_path = service_entry.join(entry.file_name());
tokio::fs::copy(entry_path, dest_path).await?;
}
}
let cluster_entry_file = cluster_entry.join(format!("{service}.yaml"));
let mut cluster_entry_file = tokio::fs::File::create(cluster_entry_file).await?;
let file_contents = format!(
r#"
apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
name: {service}
namespace: flux-system
spec:
interval: 1m
retryInterval: 30s
path: ./deployments/{cluster}/{namespace}/{service}
prune: true
sourceRef:
kind: GitRepository
name: flux-system
namespace: flux-system
"#
);
cluster_entry_file
.write_all(file_contents.as_bytes())
.await?;
}
// 4. Commit && Push
self.commit_and_push(repo, environment, service).await?;
// 5. Cleanup
tokio::fs::remove_dir_all(unpack_dir).await?;
Ok(())
}
async fn get_repo(&self, location: &Path) -> anyhow::Result<Option<Repository>> {
match Repository::open(location) {
Ok(r) => Ok(Some(r)),
Err(e) => match e.code() {
git2::ErrorCode::NotFound => Ok(None),
_ => Err(e).context("failed to open git repository"),
},
}
}
async fn clone_repo(&self, location: &Path) -> anyhow::Result<Repository> {
let co = CheckoutBuilder::new();
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
let repo = RepoBuilder::new()
.fetch_options(fo)
.with_checkout(co)
.clone(&self.registry, location)
.context("failed to clone repository")?;
Ok(repo)
}
async fn update_repo(&self, repository: Repository) -> anyhow::Result<()> {
let mut remote = repository.find_remote("origin")?;
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
remote
.fetch(&["main"], Some(&mut fo), None)
.context("failed to update repo")?;
let origin_head = repository.find_reference("refs/remotes/origin/HEAD")?;
let origin_head_commit = origin_head.peel_to_commit()?;
// Perform a hard reset to the origin's HEAD
repository.reset(origin_head_commit.as_object(), ResetType::Hard, None)?;
Ok(())
}
fn get_cred(&self) -> anyhow::Result<RemoteCallbacks> {
let mut cb = RemoteCallbacks::new();
cb.credentials(|_, username, _| {
if let Ok(_sock) = std::env::var("SSH_AUTH_SOCK") {
return Cred::ssh_key_from_agent(username.unwrap_or("git"));
}
let username = std::env::var("GIT_USERNAME").expect("GIT_USERNAME to be set");
let password = std::env::var("GIT_PASSWORD").expect("GIT_PASSWORD to be set");
Cred::userpass_plaintext(&username, &password)
});
cb.certificate_check(|_cert, _| Ok(git2::CertificateCheckStatus::CertificateOk));
Ok(cb)
}
async fn commit_and_push(
&self,
repo: Repository,
environment: &str,
service: &str,
) -> anyhow::Result<()> {
let mut index = repo.index()?;
// Add all files to the index
index.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)?;
index.write()?;
// Create a tree from the index
let oid = index.write_tree()?;
let tree = repo.find_tree(oid)?;
// Get the current HEAD commit
let parent_commit = repo.head()?.peel_to_commit()?;
// Create a signature
let sig = Signature::now("flux_releaser", "operations+flux-releaser@kjuulh.io")?;
// Create the commit
repo.commit(
Some("HEAD"),
&sig,
&sig,
&format!("chore({environment}/{service}): releasing service"),
&tree,
&[&parent_commit],
)?;
let mut remote = repo.find_remote("origin")?;
let mut po = PushOptions::new();
po.remote_callbacks(self.get_cred()?);
remote.push(
&[&format!("refs/heads/{}:refs/heads/{}", "main", "main")],
Some(&mut po),
)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use tokio::io::AsyncReadExt;
use uuid::Uuid;
use crate::services::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
git::Git,
};
#[tokio::test]
async fn can_clone_upstream() -> anyhow::Result<()> {
// FIXME: right now CI doesn't support git
return Ok(());
let random = Uuid::new_v4().to_string();
println!("running test for id: {}", random);
println!("current_dir: {}", std::env::current_dir()?.display());
let mut arch = tokio::fs::File::open("testdata/example.tar").await?;
let mut dest = Vec::new();
arch.read_to_end(&mut dest).await?;
let git = Git::new(
"ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into(),
ClusterList::default(),
Archive::default(),
);
let mut location = git.location.lock().await;
*location = location.join(random);
println!("into: {}", location.display());
drop(location);
git.publish(
&ArchiveFile { content: dest },
"flux-releaser-test",
"dev",
"dev",
)
.await?;
Ok(())
}
}