feat(csv): add csv source

This commit is contained in:
Matthieu Bessat 2024-01-13 17:51:23 +01:00
parent b658fcd69e
commit e9569ebf20
9 changed files with 275 additions and 23 deletions

54
Cargo.lock generated
View file

@ -149,6 +149,37 @@ version = "1.0.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355"
[[package]]
name = "argh"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7af5ba06967ff7214ce4c7419c7d185be7ecd6cc4965a8f6e1d8ce0398aad219"
dependencies = [
"argh_derive",
"argh_shared",
]
[[package]]
name = "argh_derive"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56df0aeedf6b7a2fc67d06db35b09684c3e8da0c95f8f27685cb17e08413d87a"
dependencies = [
"argh_shared",
"proc-macro2",
"quote",
"syn 2.0.43",
]
[[package]]
name = "argh_shared"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5693f39141bda5760ecc4111ab08da40565d1771038c4a0250f03457ec707531"
dependencies = [
"serde",
]
[[package]]
name = "async-channel"
version = "1.9.0"
@ -586,6 +617,27 @@ dependencies = [
"subtle",
]
[[package]]
name = "csv"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
dependencies = [
"memchr",
]
[[package]]
name = "ctr"
version = "0.6.0"
@ -1484,9 +1536,11 @@ name = "paheko_helloasso_adapter_rs"
version = "0.1.0"
dependencies = [
"anyhow",
"argh",
"base64_light",
"chrono",
"clap",
"csv",
"dotenvy",
"email_address",
"envy",

View file

@ -23,3 +23,5 @@ phonenumber = "0.3.3"
email_address = "0.2"
fully_pub = "0.1.4"
base64_light = "0.1.4"
csv = "1.3.0"
argh = "0.1.12"

10
TODO.md
View file

@ -2,6 +2,16 @@ Following the dev
like rossman said, you need to split up things
# todos
- Normalize cities
- Verify postal code
- Normalize first name and last name
- Check if a user already exists by quering the first and last name with string distance
- use `strsim` lib
- Lint, format code
- Remove uneeded deps
# schedule
## 2023-12-23

View file

@ -14,6 +14,7 @@ use strum::Display;
use serde::{Serialize, Deserialize};
use url::Url;
use fully_pub::fully_pub;
use argh::FromArgs;
/// permanent config to store long-term config
/// used to ingest env settings
@ -125,13 +126,11 @@ async fn get_auth_client_from_cache(
}
}
fn parse_normalize_phone(phone_number_opt: Option<String>) -> Option<String> {
let number_raw = phone_number_opt?;
let parsed = match phonenumber::parse(Some(phonenumber::country::Id::FR), number_raw) {
Ok(r) => {
r
},
fn parse_normalize_phone(inp: String) -> Option<String> {
let parsed = match phonenumber::parse(
Some(phonenumber::country::Id::FR), inp
) {
Ok(r) => r,
Err(_e) => {
return None;
}
@ -160,7 +159,7 @@ fn get_proxy_from_url(proxy_url: &Option<String>) -> Result<Option<reqwest::Prox
})
}
async fn launch_adapter() -> Result<()> {
async fn launch_adapter(source: SourceType) -> Result<()> {
dotenvy::dotenv()?;
let config: Config = envy::from_env().context("Failed to load env vars")?;
@ -181,17 +180,41 @@ async fn launch_adapter() -> Result<()> {
client_secret: config.paheko_client_secret.clone()
};
let paheko_client: paheko::AuthentifiedClient = paheko_client.login(paheko_credentials).await?;
sync_helloasso::sync_helloasso(&paheko_client, &config, &mut user_cache).await?;
// sync_csv::sync(&paheko_client, &config, &mut user_cache).await?;
match source {
SourceType::Csv => sync_csv::sync_csv(&paheko_client, &config, &mut user_cache).await?,
SourceType::Helloasso => sync_helloasso::sync_helloasso(&paheko_client, &config, &mut user_cache).await?
}
Ok(())
}
#[derive(FromArgs)]
/// Members and Membership sync adaper for paheko (support Hellosso and CSV)
struct App {
/// the source of sync (CSV or helloasso)
#[argh(option, short = 'm')]
source: String,
}
enum SourceType {
Helloasso,
Csv
}
#[tokio::main]
async fn main() {
// TODO: add argument parser to have handle config file
let res = launch_adapter().await;
let app: App = argh::from_env();
let source = match app.source.as_ref() {
"helloasso" => SourceType::Helloasso,
"csv" => SourceType::Csv,
_ => {
eprintln!("Must provide a valid source argument.");
return;
}
};
let res = launch_adapter(source).await;
match res {
Err(err) => {
eprintln!("Program failed, details bellow");

View file

@ -285,7 +285,6 @@ impl AuthentifiedClient {
.json(&payload)
.send().await?;
if res.status() != 200 {
dbg!(res);
return Err(APIClientError::InvalidStatusCode.into());
}
res.json().await.context("Sql query")

135
src/sync_csv.rs Normal file
View file

@ -0,0 +1,135 @@
use crate::paheko;
use crate::{
Config, UserCache,
parse_normalize_phone, normalize_str
};
use anyhow::Result;
use crate::utils::parse_datetime_american;
use crate::sync_paheko::{GeneralizedAnswer, sync_paheko};
use email_address::EmailAddress;
use chrono::prelude::Datelike;
use std::io;
// read csv from stdin
pub async fn sync_csv(paheko_client: &paheko::AuthentifiedClient, config: &Config, user_cache: &mut UserCache) -> Result<()> {
// raw row record directly from CSV
#[derive(Debug, serde::Deserialize)]
struct AnswerRecord {
#[serde(rename = "Date")]
date: String,
#[serde(rename = "Email acheteur")]
email: String,
#[serde(rename = "Prénom")]
first_name: String,
#[serde(rename = "Nom")]
last_name: String,
#[serde(rename = "Tarif")]
membership_mode: String,
#[serde(rename = "Montant (€)")]
total_amount: String,
#[serde(rename = "Cotisation (€)")]
subscription_amount: String,
#[serde(rename = "Don (€)")]
donation_amount: String,
#[serde(rename = "Champ complémentaire 1 Prénom conjoint")]
linked_user_first_name: String,
#[serde(rename = "Champ complémentaire 2 ADRESSE")]
address: String,
#[serde(rename = "Champ complémentaire 3 CODE POSTAL")]
postal_code: String,
#[serde(rename = "Champ complémentaire 4 VILLE")]
city: String,
#[serde(rename = "Champ complémentaire 5 TÉLÉPHONE")]
phone: String,
#[serde(rename = "Champ complémentaire 7 PROFESSION")]
job: String,
#[serde(rename = "Champ complémentaire 8 CENTRE D'INTÉRÊTS / COMPÉTENCES")]
skills: String,
#[serde(rename = "Champ complémentaire 9 DATE DE NAISSANCE")]
birth_date: String,
#[serde(rename = "Référence")]
reference: String
}
let mut rdr = csv::Reader::from_reader(io::stdin());
let mut generalized_answers: Vec<GeneralizedAnswer> = vec![];
fn process_csv_value(value: String) -> Option<String> {
let value = normalize_str(value);
if value.is_empty() {
return None
}
return Some(value)
}
fn process_price(value: String) -> f64 {
value
.trim()
.chars().filter(|c| c.is_numeric() || *c == '.')
.collect::<String>()
.parse().unwrap_or(0.0)
}
for result in rdr.deserialize() {
let record: AnswerRecord = result?;
println!("{:?}", record);
let mut generalized_answer = GeneralizedAnswer {
first_name: Some(normalize_str(record.first_name)),
last_name: normalize_str(record.last_name),
email: process_csv_value(record.email).and_then(|s| EmailAddress::is_valid(&s).then(|| s)),
phone: process_csv_value(record.phone).and_then(|s| parse_normalize_phone(s)),
skills: process_csv_value(record.skills),
address: process_csv_value(record.address)
.expect("Expected answer to have address"),
postal_code: process_csv_value(record.postal_code)
.expect("Expected answer to have postalcode"),
city: process_csv_value(record.city)
.expect("Expected answer answer to have city"),
country: "fr".to_string(),
job: process_csv_value(record.job),
birth_year: process_csv_value(record.birth_date)
.and_then(|raw_date| parse_datetime_american(&raw_date))
.map(|d| d.year() as u32),
inception_time: process_csv_value(record.date)
.map(|s|
parse_datetime_american(&s).expect("Record must have a valid date")
)
.expect("Record must have a date"),
reference: format!("BP/{}", process_csv_value(record.reference).expect("Row must have reference")), // BP as Bulletin Papier
donation_amount: process_price(record.donation_amount),
subscription_amount: process_price(record.subscription_amount), // FIXME: get subscription from mode
membership_mode: serde_json::from_value(serde_json::Value::String(record.membership_mode.clone()))
.expect("Expected a membership mode to be valid"),
linked_user_first_name: process_csv_value(record.linked_user_first_name)
};
// apply custom user override
// this particular answer had duplicate phone and email from another answer
// if answer.id == 64756582 {
// generalized_answer.email = None;
// generalized_answer.phone = None;
// }
generalized_answers.push(generalized_answer);
}
println!("Generated GeneralizedAnswers");
sync_paheko(
paheko_client,
config,
user_cache,
generalized_answers,
"530",
"Papier"
).await?;
eprintln!("CSV sync done.");
Ok(())
}

View file

@ -8,6 +8,7 @@ use crate::sync_paheko::{GeneralizedAnswer, sync_paheko};
use anyhow::Result;
use url::Url;
use email_address::EmailAddress;
/// rust how to access inner enum value
#[derive(Debug, PartialEq, Clone, Copy)]
@ -70,7 +71,6 @@ pub async fn sync_helloasso(paheko_client: &paheko::AuthentifiedClient, config:
println!("Got {} answers to the membership form. Processing...", &answers.len());
use email_address::*;
fn choose_email(answer: &helloasso::FormAnswer) -> Option<String> {
read_custom_field(answer, HelloassoCustomFieldType::Email)
.and_then(|x| {
@ -93,8 +93,11 @@ pub async fn sync_helloasso(paheko_client: &paheko::AuthentifiedClient, config:
first_name: Some(normalize_str(answer.user.first_name.clone())),
last_name: normalize_str(answer.user.last_name.clone()),
email,
phone: parse_normalize_phone(read_custom_field(&answer, HelloassoCustomFieldType::Phone)),
skills: read_custom_field(&answer, HelloassoCustomFieldType::Skills).map(normalize_str),
phone: read_custom_field(&answer, HelloassoCustomFieldType::Phone)
.map(normalize_str)
.and_then(parse_normalize_phone),
skills: read_custom_field(&answer, HelloassoCustomFieldType::Skills)
.map(normalize_str),
address: read_custom_field(&answer, HelloassoCustomFieldType::Address)
.map(normalize_str)
.expect("Expected ha answer to have address"),
@ -108,8 +111,8 @@ pub async fn sync_helloasso(paheko_client: &paheko::AuthentifiedClient, config:
birth_year: read_custom_field(&answer, HelloassoCustomFieldType::Birthday).and_then(parse_and_get_birthday_year),
inception_time: answer.order.inception_time,
reference: format!("HA/{}", answer.id),
donation_amount: 0,
subscription_amount: answer.amount,
donation_amount: 0.0,
subscription_amount: f64::from(answer.amount)/100.0,
membership_mode: serde_json::from_value(serde_json::Value::String(answer.mode.clone()))
.expect("Expected a membership mode to be valid"),
linked_user_first_name: read_custom_field(&answer, HelloassoCustomFieldType::LinkedUserFirstName)

View file

@ -43,8 +43,8 @@ struct GeneralizedAnswer {
membership_mode: MembershipMode,
inception_time: DateTime<Utc>,
subscription_amount: u32,
donation_amount: u32,
subscription_amount: f64,
donation_amount: f64,
reference: String,
linked_user_first_name: Option<String>
@ -52,7 +52,6 @@ struct GeneralizedAnswer {
fn get_accounting_year_for_time<'a>(accounting_years: &'a Vec<AccountingYear>, time: &'a DateTime<Utc>) -> Option<&'a AccountingYear> {
let date_ref = time.date_naive().clone();
dbg!("{:?}", date_ref);
accounting_years.iter().find(|year| year.start_date < date_ref && date_ref < year.end_date)
}
@ -66,6 +65,13 @@ pub async fn sync_paheko(
) -> Result<()> {
// FIXME: search existing paheko users using the first name and last name, some ppl don't have
// emails
struct Stats {
subscriptions_created: u32,
users_created: u32
}
let mut stats = Stats { subscriptions_created: 0, users_created: 0 };
let mut pk_memberships: Vec<paheko::Membership> = vec![];
@ -120,6 +126,7 @@ pub async fn sync_paheko(
eprintln!(" Created paheko user");
pk_next_user_id += 1;
existing_users.push(c.clone());
stats.users_created += 1;
c
}
};
@ -150,7 +157,7 @@ pub async fn sync_paheko(
NaiveDate::from_ymd_opt(2024, 12, 31).unwrap().and_hms_opt(23, 59, 59).unwrap(),
Utc
),
payed_amount: f64::from(answer.subscription_amount)/100.0,
payed_amount: answer.subscription_amount,
users_ids: vec![pk_user_summary.id.clone()]
};
@ -231,8 +238,11 @@ pub async fn sync_paheko(
.await.context("Expected to create new paheko transaction")?;
eprintln!(" Created paheko transaction");
stats.subscriptions_created += 1;
pk_memberships.push(pk_membership);
}
eprintln!("{via_name} sync done.");
eprintln!("{} subs created; {} users created", stats.subscriptions_created, stats.users_created);
Ok(())
}

View file

@ -43,3 +43,19 @@ where
.map_err(serde::de::Error::custom)
}
pub fn parse_datetime(inp: &str) -> Option<DateTime<Utc>> {
let date = NaiveDate::parse_from_str(inp, "%d/%m/%Y").ok()?;
Some(DateTime::<Utc>::from_naive_utc_and_offset(
date.and_hms_opt(0, 0, 0).unwrap(),
Utc
))
}
pub fn parse_datetime_american(inp: &str) -> Option<DateTime<Utc>> {
let date = NaiveDate::parse_from_str(inp, "%m/%d/%Y").ok()?;
Some(DateTime::<Utc>::from_naive_utc_and_offset(
date.and_hms_opt(0, 0, 0).unwrap(),
Utc
))
}