scufflecloud_big_bin/dataloaders/
organizations.rs1use std::collections::{HashMap, HashSet};
2use std::time::Duration;
3
4use core_db_types::models::{Organization, OrganizationId, OrganizationMember, UserId};
5use core_db_types::schema::{organization_members, organizations};
6use diesel::{ExpressionMethods, QueryDsl, SelectableHelper};
7use diesel_async::pooled_connection::bb8;
8use diesel_async::{AsyncPgConnection, RunQueryDsl};
9use itertools::Itertools;
10use scuffle_batching::{DataLoader, DataLoaderFetcher};
11
12pub(crate) struct OrganizationLoader(bb8::Pool<AsyncPgConnection>);
13
14impl DataLoaderFetcher for OrganizationLoader {
15 type Key = OrganizationId;
16 type Value = Organization;
17
18 async fn load(&self, keys: HashSet<Self::Key>) -> Option<HashMap<Self::Key, Self::Value>> {
19 let mut conn = self
20 .0
21 .get()
22 .await
23 .map_err(|e| tracing::error!(err = %e, "failed to get connection"))
24 .ok()?;
25
26 let organizations = organizations::dsl::organizations
27 .filter(organizations::dsl::id.eq_any(keys))
28 .select(Organization::as_select())
29 .load::<Organization>(&mut conn)
30 .await
31 .map_err(|e| tracing::error!(err = %e, "failed to load organizations"))
32 .ok()?;
33
34 Some(organizations.into_iter().map(|o| (o.id, o)).collect())
35 }
36}
37
38impl OrganizationLoader {
39 pub(crate) fn new(pool: bb8::Pool<AsyncPgConnection>) -> DataLoader<Self> {
40 DataLoader::new(Self(pool), 1000, 500, Duration::from_millis(5))
41 }
42}
43
44pub(crate) struct OrganizationMemberByUserIdLoader(bb8::Pool<AsyncPgConnection>);
45
46impl DataLoaderFetcher for OrganizationMemberByUserIdLoader {
47 type Key = UserId;
48 type Value = Vec<OrganizationMember>;
49
50 async fn load(&self, keys: HashSet<Self::Key>) -> Option<HashMap<Self::Key, Self::Value>> {
51 let mut conn = self
52 .0
53 .get()
54 .await
55 .map_err(|e| tracing::error!(err = %e, "failed to get connection"))
56 .ok()?;
57
58 let organization_members = organization_members::dsl::organization_members
59 .filter(organization_members::dsl::user_id.eq_any(keys))
60 .select(OrganizationMember::as_select())
61 .load::<OrganizationMember>(&mut conn)
62 .await
63 .map_err(|e| tracing::error!(err = %e, "failed to load organization members"))
64 .ok()?;
65
66 Some(organization_members.into_iter().into_group_map_by(|m| m.user_id))
67 }
68}
69
70impl OrganizationMemberByUserIdLoader {
71 pub(crate) fn new(pool: bb8::Pool<AsyncPgConnection>) -> DataLoader<Self> {
72 DataLoader::new(Self(pool), 1000, 500, Duration::from_millis(5))
73 }
74}