scufflecloud_big_bin/dataloaders/
organizations.rs

1use std::collections::{HashMap, HashSet};
2use std::time::Duration;
3
4use core_db_types::models::{Organization, OrganizationId, OrganizationMember, UserId};
5use core_db_types::schema::{organization_members, organizations};
6use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
7use diesel_async::pooled_connection::bb8;
8use diesel_async::{AsyncPgConnection, RunQueryDsl};
9use itertools::Itertools;
10use scuffle_batching::{DataLoader, DataLoaderFetcher};
11
12pub(crate) struct OrganizationLoader(bb8::Pool<AsyncPgConnection>);
13
14impl DataLoaderFetcher for OrganizationLoader {
15    type Key = OrganizationId;
16    type Value = Organization;
17
18    async fn load(&self, keys: HashSet<Self::Key>) -> Option<HashMap<Self::Key, Self::Value>> {
19        let mut conn = self
20            .0
21            .get()
22            .await
23            .map_err(|e| tracing::error!(err = %e, "failed to get connection"))
24            .ok()?;
25
26        let organizations = organizations::dsl::organizations
27            .filter(organizations::dsl::id.eq_any(keys))
28            .select(Organization::as_select())
29            .load::<Organization>(&mut conn)
30            .await
31            .map_err(|e| tracing::error!(err = %e, "failed to load organizations"))
32            .ok()?;
33
34        Some(organizations.into_iter().map(|o| (o.id, o)).collect())
35    }
36}
37
38impl OrganizationLoader {
39    pub(crate) fn new(pool: bb8::Pool<AsyncPgConnection>) -> DataLoader<Self> {
40        DataLoader::new(Self(pool), 1000, 500, Duration::from_millis(5))
41    }
42}
43
44pub(crate) struct OrganizationMemberByUserIdLoader(bb8::Pool<AsyncPgConnection>);
45
46impl DataLoaderFetcher for OrganizationMemberByUserIdLoader {
47    type Key = UserId;
48    type Value = Vec<OrganizationMember>;
49
50    async fn load(&self, keys: HashSet<Self::Key>) -> Option<HashMap<Self::Key, Self::Value>> {
51        let mut conn = self
52            .0
53            .get()
54            .await
55            .map_err(|e| tracing::error!(err = %e, "failed to get connection"))
56            .ok()?;
57
58        let organization_members = organization_members::dsl::organization_members
59            .filter(organization_members::dsl::user_id.eq_any(keys))
60            .select(OrganizationMember::as_select())
61            .load::<OrganizationMember>(&mut conn)
62            .await
63            .map_err(|e| tracing::error!(err = %e, "failed to load organization members"))
64            .ok()?;
65
66        Some(organization_members.into_iter().into_group_map_by(|m| m.user_id))
67    }
68}
69
70impl OrganizationMemberByUserIdLoader {
71    pub(crate) fn new(pool: bb8::Pool<AsyncPgConnection>) -> DataLoader<Self> {
72        DataLoader::new(Self(pool), 1000, 500, Duration::from_millis(5))
73    }
74}