From e9569ebf20dc619044b18664663808f21429c8dd Mon Sep 17 00:00:00 2001 From: Matthieu Bessat Date: Sat, 13 Jan 2024 17:51:23 +0100 Subject: [PATCH] feat(csv): add csv source --- Cargo.lock | 54 +++++++++++++++++ Cargo.toml | 2 + TODO.md | 10 ++++ src/main.rs | 49 +++++++++++---- src/paheko.rs | 1 - src/sync_csv.rs | 135 ++++++++++++++++++++++++++++++++++++++++++ src/sync_helloasso.rs | 13 ++-- src/sync_paheko.rs | 18 ++++-- src/utils.rs | 16 +++++ 9 files changed, 275 insertions(+), 23 deletions(-) create mode 100644 src/sync_csv.rs diff --git a/Cargo.lock b/Cargo.lock index 3b6dcd4..ea3b0ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,6 +149,37 @@ version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" +[[package]] +name = "argh" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7af5ba06967ff7214ce4c7419c7d185be7ecd6cc4965a8f6e1d8ce0398aad219" +dependencies = [ + "argh_derive", + "argh_shared", +] + +[[package]] +name = "argh_derive" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56df0aeedf6b7a2fc67d06db35b09684c3e8da0c95f8f27685cb17e08413d87a" +dependencies = [ + "argh_shared", + "proc-macro2", + "quote", + "syn 2.0.43", +] + +[[package]] +name = "argh_shared" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5693f39141bda5760ecc4111ab08da40565d1771038c4a0250f03457ec707531" +dependencies = [ + "serde", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -586,6 +617,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = 
"0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.6.0" @@ -1484,9 +1536,11 @@ name = "paheko_helloasso_adapter_rs" version = "0.1.0" dependencies = [ "anyhow", + "argh", "base64_light", "chrono", "clap", + "csv", "dotenvy", "email_address", "envy", diff --git a/Cargo.toml b/Cargo.toml index ce0afa8..36d865a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,5 @@ phonenumber = "0.3.3" email_address = "0.2" fully_pub = "0.1.4" base64_light = "0.1.4" +csv = "1.3.0" +argh = "0.1.12" diff --git a/TODO.md b/TODO.md index bd4ef1d..eda1acb 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,16 @@ Following the dev like rossman said, you need to split up things +# todos + +- Normalize cities +- Verify postal code +- Normalize first name and last name +- Check if a user already exists by querying the first and last name with string distance + - use `strsim` lib +- Lint, format code +- Remove unneeded deps + # schedule ## 2023-12-23 diff --git a/src/main.rs b/src/main.rs index acda87e..98d0407 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use strum::Display; use serde::{Serialize, Deserialize}; use url::Url; use fully_pub::fully_pub; +use argh::FromArgs; /// permanent config to store long-term config /// used to ingest env settings @@ -125,13 +126,11 @@ async fn get_auth_client_from_cache( } } -fn parse_normalize_phone(phone_number_opt: Option) -> Option { - let number_raw = phone_number_opt?; - - let parsed = match phonenumber::parse(Some(phonenumber::country::Id::FR), number_raw) { - Ok(r) => { - r - }, +fn parse_normalize_phone(inp: String) -> Option { + let parsed = match phonenumber::parse( + Some(phonenumber::country::Id::FR), inp + ) { + Ok(r) => r, Err(_e) => { return None; } @@ -160,7 +159,7 @@ fn get_proxy_from_url(proxy_url: &Option) -> Result Result<()> { +async fn 
launch_adapter(source: SourceType) -> Result<()> { dotenvy::dotenv()?; let config: Config = envy::from_env().context("Failed to load env vars")?; @@ -181,17 +180,41 @@ async fn launch_adapter() -> Result<()> { client_secret: config.paheko_client_secret.clone() }; let paheko_client: paheko::AuthentifiedClient = paheko_client.login(paheko_credentials).await?; - - sync_helloasso::sync_helloasso(&paheko_client, &config, &mut user_cache).await?; - // sync_csv::sync(&paheko_client, &config, &mut user_cache).await?; + + match source { + SourceType::Csv => sync_csv::sync_csv(&paheko_client, &config, &mut user_cache).await?, + SourceType::Helloasso => sync_helloasso::sync_helloasso(&paheko_client, &config, &mut user_cache).await? + } Ok(()) } + +#[derive(FromArgs)] +/// Members and Membership sync adapter for paheko (supports HelloAsso and CSV) +struct App { + /// the source of sync (CSV or helloasso) + #[argh(option, short = 'm')] + source: String, +} + +enum SourceType { + Helloasso, + Csv +} + #[tokio::main] async fn main() { - // TODO: add argument parser to have handle config file - let res = launch_adapter().await; + let app: App = argh::from_env(); + let source = match app.source.as_ref() { + "helloasso" => SourceType::Helloasso, + "csv" => SourceType::Csv, + _ => { + eprintln!("Must provide a valid source argument."); + return; + } + }; + let res = launch_adapter(source).await; match res { Err(err) => { eprintln!("Program failed, details bellow"); diff --git a/src/paheko.rs b/src/paheko.rs index fdd9c22..d48c753 100644 --- a/src/paheko.rs +++ b/src/paheko.rs @@ -285,7 +285,6 @@ impl AuthentifiedClient { .json(&payload) .send().await?; if res.status() != 200 { - dbg!(res); return Err(APIClientError::InvalidStatusCode.into()); } res.json().await.context("Sql query") diff --git a/src/sync_csv.rs b/src/sync_csv.rs new file mode 100644 index 0000000..290ab1e --- /dev/null +++ b/src/sync_csv.rs @@ -0,0 +1,135 @@ +use crate::paheko; +use crate::{ + Config, UserCache, + 
parse_normalize_phone, normalize_str +}; + +use anyhow::Result; +use crate::utils::parse_datetime_american; +use crate::sync_paheko::{GeneralizedAnswer, sync_paheko}; +use email_address::EmailAddress; +use chrono::prelude::Datelike; + +use std::io; + +// read csv from stdin +pub async fn sync_csv(paheko_client: &paheko::AuthentifiedClient, config: &Config, user_cache: &mut UserCache) -> Result<()> { + // raw row record directly from CSV + #[derive(Debug, serde::Deserialize)] + struct AnswerRecord { + #[serde(rename = "Date")] + date: String, + #[serde(rename = "Email acheteur")] + email: String, + #[serde(rename = "Prénom")] + first_name: String, + #[serde(rename = "Nom")] + last_name: String, + #[serde(rename = "Tarif")] + membership_mode: String, + + #[serde(rename = "Montant (€)")] + total_amount: String, + #[serde(rename = "Cotisation (€)")] + subscription_amount: String, + #[serde(rename = "Don (€)")] + donation_amount: String, + + #[serde(rename = "Champ complémentaire 1 Prénom conjoint")] + linked_user_first_name: String, + + #[serde(rename = "Champ complémentaire 2 ADRESSE")] + address: String, + #[serde(rename = "Champ complémentaire 3 CODE POSTAL")] + postal_code: String, + #[serde(rename = "Champ complémentaire 4 VILLE")] + city: String, + #[serde(rename = "Champ complémentaire 5 TÉLÉPHONE")] + phone: String, + + #[serde(rename = "Champ complémentaire 7 PROFESSION")] + job: String, + #[serde(rename = "Champ complémentaire 8 CENTRE D'INTÉRÊTS / COMPÉTENCES")] + skills: String, + #[serde(rename = "Champ complémentaire 9 DATE DE NAISSANCE")] + birth_date: String, + + #[serde(rename = "Référence")] + reference: String + } + let mut rdr = csv::Reader::from_reader(io::stdin()); + let mut generalized_answers: Vec = vec![]; + + fn process_csv_value(value: String) -> Option { + let value = normalize_str(value); + if value.is_empty() { + return None + } + return Some(value) + } + + fn process_price(value: String) -> f64 { + value + .trim() + .chars().filter(|c| 
c.is_numeric() || *c == '.') + .collect::() + .parse().unwrap_or(0.0) + } + + for result in rdr.deserialize() { + let record: AnswerRecord = result?; + println!("{:?}", record); + + let mut generalized_answer = GeneralizedAnswer { + first_name: Some(normalize_str(record.first_name)), + last_name: normalize_str(record.last_name), + email: process_csv_value(record.email).and_then(|s| EmailAddress::is_valid(&s).then(|| s)), + phone: process_csv_value(record.phone).and_then(|s| parse_normalize_phone(s)), + skills: process_csv_value(record.skills), + address: process_csv_value(record.address) + .expect("Expected answer to have address"), + postal_code: process_csv_value(record.postal_code) + .expect("Expected answer to have postalcode"), + city: process_csv_value(record.city) + .expect("Expected answer answer to have city"), + country: "fr".to_string(), + job: process_csv_value(record.job), + birth_year: process_csv_value(record.birth_date) + .and_then(|raw_date| parse_datetime_american(&raw_date)) + .map(|d| d.year() as u32), + inception_time: process_csv_value(record.date) + .map(|s| + parse_datetime_american(&s).expect("Record must have a valid date") + ) + .expect("Record must have a date"), + reference: format!("BP/{}", process_csv_value(record.reference).expect("Row must have reference")), // BP as Bulletin Papier + donation_amount: process_price(record.donation_amount), + subscription_amount: process_price(record.subscription_amount), // FIXME: get subscription from mode + membership_mode: serde_json::from_value(serde_json::Value::String(record.membership_mode.clone())) + .expect("Expected a membership mode to be valid"), + linked_user_first_name: process_csv_value(record.linked_user_first_name) + }; + + // apply custom user override + // this particular answer had duplicate phone and email from another answer + // if answer.id == 64756582 { + // generalized_answer.email = None; + // generalized_answer.phone = None; + // } + + 
generalized_answers.push(generalized_answer); + } + println!("Generated GeneralizedAnswers"); + sync_paheko( + paheko_client, + config, + user_cache, + generalized_answers, + "530", + "Papier" + ).await?; + eprintln!("CSV sync done."); + Ok(()) +} + + diff --git a/src/sync_helloasso.rs b/src/sync_helloasso.rs index 53fee44..ad9451d 100644 --- a/src/sync_helloasso.rs +++ b/src/sync_helloasso.rs @@ -8,6 +8,7 @@ use crate::sync_paheko::{GeneralizedAnswer, sync_paheko}; use anyhow::Result; use url::Url; +use email_address::EmailAddress; /// rust how to access inner enum value #[derive(Debug, PartialEq, Clone, Copy)] @@ -70,7 +71,6 @@ pub async fn sync_helloasso(paheko_client: &paheko::AuthentifiedClient, config: println!("Got {} answers to the membership form. Processing...", &answers.len()); - use email_address::*; fn choose_email(answer: &helloasso::FormAnswer) -> Option { read_custom_field(answer, HelloassoCustomFieldType::Email) .and_then(|x| { @@ -93,8 +93,11 @@ pub async fn sync_helloasso(paheko_client: &paheko::AuthentifiedClient, config: first_name: Some(normalize_str(answer.user.first_name.clone())), last_name: normalize_str(answer.user.last_name.clone()), email, - phone: parse_normalize_phone(read_custom_field(&answer, HelloassoCustomFieldType::Phone)), - skills: read_custom_field(&answer, HelloassoCustomFieldType::Skills).map(normalize_str), + phone: read_custom_field(&answer, HelloassoCustomFieldType::Phone) + .map(normalize_str) + .and_then(parse_normalize_phone), + skills: read_custom_field(&answer, HelloassoCustomFieldType::Skills) + .map(normalize_str), address: read_custom_field(&answer, HelloassoCustomFieldType::Address) .map(normalize_str) .expect("Expected ha answer to have address"), @@ -108,8 +111,8 @@ pub async fn sync_helloasso(paheko_client: &paheko::AuthentifiedClient, config: birth_year: read_custom_field(&answer, HelloassoCustomFieldType::Birthday).and_then(parse_and_get_birthday_year), inception_time: answer.order.inception_time, reference: 
format!("HA/{}", answer.id), - donation_amount: 0, - subscription_amount: answer.amount, + donation_amount: 0.0, + subscription_amount: f64::from(answer.amount)/100.0, membership_mode: serde_json::from_value(serde_json::Value::String(answer.mode.clone())) .expect("Expected a membership mode to be valid"), linked_user_first_name: read_custom_field(&answer, HelloassoCustomFieldType::LinkedUserFirstName) diff --git a/src/sync_paheko.rs b/src/sync_paheko.rs index 65384c2..83bfb53 100644 --- a/src/sync_paheko.rs +++ b/src/sync_paheko.rs @@ -43,8 +43,8 @@ struct GeneralizedAnswer { membership_mode: MembershipMode, inception_time: DateTime, - subscription_amount: u32, - donation_amount: u32, + subscription_amount: f64, + donation_amount: f64, reference: String, linked_user_first_name: Option @@ -52,7 +52,6 @@ struct GeneralizedAnswer { fn get_accounting_year_for_time<'a>(accounting_years: &'a Vec, time: &'a DateTime) -> Option<&'a AccountingYear> { let date_ref = time.date_naive().clone(); - dbg!("{:?}", date_ref); accounting_years.iter().find(|year| year.start_date < date_ref && date_ref < year.end_date) } @@ -66,6 +65,13 @@ pub async fn sync_paheko( ) -> Result<()> { // FIXME: search existing paheko users using the first name and last name, some ppl don't have // emails + + struct Stats { + subscriptions_created: u32, + users_created: u32 + } + + let mut stats = Stats { subscriptions_created: 0, users_created: 0 }; let mut pk_memberships: Vec = vec![]; @@ -120,6 +126,7 @@ pub async fn sync_paheko( eprintln!(" Created paheko user"); pk_next_user_id += 1; existing_users.push(c.clone()); + stats.users_created += 1; c } }; @@ -150,7 +157,7 @@ pub async fn sync_paheko( NaiveDate::from_ymd_opt(2024, 12, 31).unwrap().and_hms_opt(23, 59, 59).unwrap(), Utc ), - payed_amount: f64::from(answer.subscription_amount)/100.0, + payed_amount: answer.subscription_amount, users_ids: vec![pk_user_summary.id.clone()] }; @@ -231,8 +238,11 @@ pub async fn sync_paheko( .await.context("Expected 
to create new paheko transaction")?; eprintln!(" Created paheko transaction"); + stats.subscriptions_created += 1; + pk_memberships.push(pk_membership); } eprintln!("{via_name} sync done."); + eprintln!("{} subs created; {} users created", stats.subscriptions_created, stats.users_created); Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index 456d1d6..4354e7e 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -43,3 +43,19 @@ where .map_err(serde::de::Error::custom) } + +pub fn parse_datetime(inp: &str) -> Option> { + let date = NaiveDate::parse_from_str(inp, "%d/%m/%Y").ok()?; + Some(DateTime::::from_naive_utc_and_offset( + date.and_hms_opt(0, 0, 0).unwrap(), + Utc + )) +} + +pub fn parse_datetime_american(inp: &str) -> Option> { + let date = NaiveDate::parse_from_str(inp, "%m/%d/%Y").ok()?; + Some(DateTime::::from_naive_utc_and_offset( + date.and_hms_opt(0, 0, 0).unwrap(), + Utc + )) +}