From 7f732207537d75884e18e627f35688436e764ae6 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 13 Apr 2024 00:19:43 +0200 Subject: [PATCH] feat: make into stream instead Signed-off-by: kjuulh --- crates/contractor/src/services/gitea.rs | 41 +++++++++++-------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/crates/contractor/src/services/gitea.rs b/crates/contractor/src/services/gitea.rs index 2a39954..b531e94 100644 --- a/crates/contractor/src/services/gitea.rs +++ b/crates/contractor/src/services/gitea.rs @@ -150,24 +150,21 @@ impl DefaultGiteaClient { } pub async fn fetch_user_repos(&self) -> anyhow::Result> { - let (mut repos, pages) = self.fetch_user_repos_page(1).await?; + let (repos, pages) = self.fetch_user_repos_page(1).await?; - let mut tasks = FuturesUnordered::new(); - - for page in pages { - tasks.push(async move { + let tasks = pages + .into_iter() + .map(|page| async move { let (new_repos, _) = self.fetch_user_repos_page(page).await?; Ok::, anyhow::Error>(new_repos) }) - } + .collect::>(); - while let Some(new_repos) = tasks.next().await { - let mut new_repos = new_repos?; - repos.append(&mut new_repos); - } + let res: Result>, anyhow::Error> = tasks.try_collect().await; + let res = res?.into_iter().flatten(); - Ok(repos) + Ok(repos.into_iter().chain(res).collect()) } async fn fetch_org_repos_page( @@ -211,24 +208,21 @@ impl DefaultGiteaClient { } pub async fn fetch_org_repos(&self, org: &str) -> anyhow::Result> { - let (mut repos, pages) = self.fetch_org_repos_page(org, 1).await?; + let (repos, pages) = self.fetch_org_repos_page(org, 1).await?; - let mut tasks = FuturesUnordered::new(); - - for page in pages { - tasks.push(async move { + let tasks = pages + .into_iter() + .map(|page| async move { let (new_repos, _) = self.fetch_org_repos_page(org, page).await?; Ok::, anyhow::Error>(new_repos) }) - } + .collect::>(); - while let Some(new_repos) = tasks.next().await { - let mut new_repos = new_repos?; - repos.append(&mut new_repos); - } + let res: Result>, anyhow::Error> = tasks.try_collect().await; + let res = res?.into_iter().flatten(); - Ok(repos) + Ok(repos.into_iter().chain(res).collect()) } async fn fetch_renovate(&self, repo: &Repository) -> anyhow::Result> { @@ -470,6 +464,7 @@ pub mod traits; use anyhow::Context; pub use extensions::*; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use itertools::Itertools; use reqwest::{StatusCode, Url}; use serde::{Deserialize, Serialize};