From 2b095687b4c63a4e9c4516bc577cb2aa644a4916 Mon Sep 17 00:00:00 2001
From: kjuulh
Date: Sat, 13 Apr 2024 00:07:46 +0200
Subject: [PATCH] feat: add paging

Signed-off-by: kjuulh
---
 crates/contractor/src/main.rs           |   2 +
 crates/contractor/src/services/gitea.rs | 177 +++++++++++++++++++++----
 2 files changed, 157 insertions(+), 22 deletions(-)

diff --git a/crates/contractor/src/main.rs b/crates/contractor/src/main.rs
index e2f513b..4041ca8 100644
--- a/crates/contractor/src/main.rs
+++ b/crates/contractor/src/main.rs
@@ -81,6 +81,8 @@ async fn main() -> anyhow::Result<()> {
                 .reconciler()
                 .reconcile(user, org, filter, force_refresh)
                 .await?;
+
+            tracing::info!("done running reconcile");
         }
         None => {}
     }
diff --git a/crates/contractor/src/services/gitea.rs b/crates/contractor/src/services/gitea.rs
index c52a085..2a39954 100644
--- a/crates/contractor/src/services/gitea.rs
+++ b/crates/contractor/src/services/gitea.rs
@@ -113,11 +113,18 @@ pub struct CreateGiteaWebhookConfig {
 }
 
 impl DefaultGiteaClient {
-    pub async fn fetch_user_repos(&self) -> anyhow::Result<Vec<Repository>> {
-        //FIXME: We should collect the pages for these queries
+    /// Fetches a single page (up to 50 entries) of `/api/v1/user/repos`.
+    ///
+    /// Returns the entries that convert into [`Repository`] (entries failing
+    /// the conversion are dropped) plus the page numbers still to be fetched.
+    /// The page list is only derived from the `link` header when `page <= 1`.
+    async fn fetch_user_repos_page(
+        &self,
+        page: usize,
+    ) -> anyhow::Result<(Vec<Repository>, Vec<usize>)> {
         let client = reqwest::Client::new();
 
-        let url = format!("{}/api/v1/user/repos", self.url);
+        let url = format!("{}/api/v1/user/repos?page={page}&limit=50", self.url);
 
         tracing::trace!("calling url: {}", &url);
 
@@ -128,34 +135,116 @@ impl DefaultGiteaClient {
             .send()
             .await?;
 
+        let mut pages = Vec::new();
+        if page <= 1 {
+            if let Some(link_header) = response.headers().get("link") {
+                let link_str = link_header.to_str()?;
+                pages = parse_link(page, link_str)?;
+            }
+        }
+
         let repositories = response.json::>().await?;
 
-        Ok(repositories
-            .into_iter()
-            .flat_map(Repository::try_from)
-            .collect())
+        Ok((
+            repositories
+                .into_iter()
+                .flat_map(Repository::try_from)
+                .collect(),
+            pages,
+        ))
+    }
+
+    /// Fetches every page of the user repository listing: page 1 first, then
+    /// the remaining pages concurrently. Results arrive in completion order
+    /// and the first failing page aborts the whole call.
+    pub async fn fetch_user_repos(&self) -> anyhow::Result<Vec<Repository>> {
+        let (mut repos, pages) = self.fetch_user_repos_page(1).await?;
+
+        let mut tasks = FuturesUnordered::new();
+
+        for page in pages {
+            tasks.push(async move {
+                let (new_repos, _) = self.fetch_user_repos_page(page).await?;
+
+                Ok::<Vec<Repository>, anyhow::Error>(new_repos)
+            })
+        }
+
+        while let Some(new_repos) = tasks.next().await {
+            let mut new_repos = new_repos?;
+            repos.append(&mut new_repos);
+        }
+
+        Ok(repos)
+    }
+
+    /// Fetches a single page (up to 50 entries) of `/api/v1/orgs/{org}/repos`.
+    ///
+    /// Returns the entries that convert into [`Repository`] (entries failing
+    /// the conversion are dropped) plus the page numbers still to be fetched.
+    /// The page list is only derived from the `link` header when `page <= 1`.
+    async fn fetch_org_repos_page(
+        &self,
+        org: &str,
+        page: usize,
+    ) -> anyhow::Result<(Vec<Repository>, Vec<usize>)> {
+        let client = reqwest::Client::new();
+
+        let url = format!(
+            "{}/api/v1/orgs/{}/repos?page={page}&limit=50",
+            self.url, org
+        );
+
+        tracing::trace!("calling url: {}", &url);
+
+        let response = client
+            .get(&url)
+            .header("Content-Type", "application/json")
+            .header("Authorization", format!("token {}", self.token))
+            .send()
+            .await?;
+
+        let mut pages = Vec::new();
+        if page <= 1 {
+            if let Some(link_header) = response.headers().get("link") {
+                let link_str = link_header.to_str()?;
+                pages = parse_link(page, link_str)?;
+            }
+        }
+
+        let repositories = response.json::>().await?;
+
+        Ok((
+            repositories
+                .into_iter()
+                .flat_map(Repository::try_from)
+                .collect(),
+            pages,
+        ))
     }
 
+    /// Fetches every page of `org`'s repository listing: page 1 first, then
+    /// the remaining pages concurrently. Results arrive in completion order
+    /// and the first failing page aborts the whole call.
     pub async fn fetch_org_repos(&self, org: &str) -> anyhow::Result<Vec<Repository>> {
-        let client = reqwest::Client::new();
+        let (mut repos, pages) = self.fetch_org_repos_page(org, 1).await?;
 
-        let url = format!("{}/api/v1/orgs/{}/repos", self.url, org);
+        let mut tasks = FuturesUnordered::new();
 
-        tracing::trace!("calling url: {}", &url);
+        for page in pages {
+            tasks.push(async move {
+                let (new_repos, _) = self.fetch_org_repos_page(org, page).await?;
 
-        let response = client
-            .get(&url)
-            .header("Content-Type", "application/json")
-            .header("Authorization", format!("token {}", self.token))
-            .send()
-            .await?;
+                Ok::<Vec<Repository>, anyhow::Error>(new_repos)
+            })
+        }
 
-        let repositories = response.json::>().await?;
+        while let Some(new_repos) = tasks.next().await {
+            let mut new_repos = new_repos?;
+            repos.append(&mut new_repos);
+        }
 
-        Ok(repositories
-            .into_iter()
-            .flat_map(Repository::try_from)
-            .collect())
+        Ok(repos)
     }
 
     async fn fetch_renovate(&self, repo: &Repository) -> anyhow::Result> {
@@ -362,10 +451,54 @@ impl traits::GiteaClient for DefaultGiteaClient {
     }
 }
 
+/// Returns the page numbers that remain to be fetched after `page`.
+///
+/// `link_str` is the value of a paginated response's `link` header, e.g.
+/// `<https://host/repos?page=2>; rel="next",<https://host/repos?page=5>; rel="last"`.
+/// The `page` query parameter of the `rel="last"` entry is read as the final
+/// page number and every page in `page + 1..=last` is returned. An empty list
+/// is returned when no `rel="last"` entry carries a `page` parameter.
+///
+/// # Errors
+///
+/// Fails if the `rel="last"` target is not wrapped in `<...>`, is not a valid
+/// URL, or its `page` parameter does not parse as a `usize`.
+fn parse_link(page: usize, link_str: &str) -> anyhow::Result<Vec<usize>> {
+    for link_section in link_str.split(',') {
+        // Tolerate optional whitespace around each comma-separated entry.
+        let (link, rel) = match link_section.trim().rsplit_once("; ") {
+            Some(parts) => parts,
+            None => continue,
+        };
+        if rel != r#"rel="last""# {
+            continue;
+        }
+
+        // Strip the enclosing `<`/`>` with checked helpers; slicing by index
+        // would panic on an empty target.
+        let actual_link = link
+            .strip_prefix('<')
+            .and_then(|link| link.strip_suffix('>'))
+            .with_context(|| format!("malformed link header entry: {link_section}"))?;
+
+        let url = Url::parse(actual_link)?;
+
+        if let Some((_, last_page)) = url.query_pairs().find(|(name, _)| name == "page") {
+            let last_page: usize = last_page.parse()?;
+
+            // Inclusive upper bound: the last page itself must be fetched too.
+            return Ok((page + 1..=last_page).collect());
+        }
+    }
+
+    Ok(Vec::new())
+}
+
 mod extensions;
 pub mod traits;
 
 use anyhow::Context;
 pub use extensions::*;
-use reqwest::StatusCode;
+use futures::{stream::FuturesUnordered, StreamExt};
+use reqwest::{StatusCode, Url};
 use serde::{Deserialize, Serialize};