commit with legacy python

This commit is contained in:
Matthieu Bessat 2023-11-06 11:14:16 +01:00
commit d719975061
14 changed files with 1085 additions and 0 deletions

View file

View file

@ -0,0 +1,141 @@
import uuid
import csv
import fileinput
from datetime import datetime
from dataclasses import dataclass
import json
INPUT_FIELDS_MAPPING = {
"last_name": "Nom",
"first_name_1": "Prénom",
"date": "\ufeffDate",
"email_1": "Email acheteur",
"first_name_2": "Champ complémentaire 1 Prénom conjoint",
"address": "Champ complémentaire 2 ADRESSE",
"postal_code": "Champ complémentaire 3 CODE POSTAL",
"city": "Champ complémentaire 4 VILLE",
"birth_date": "Champ complémentaire 9 DATE DE NAISSANCE",
"email_2": "Champ complémentaire 6 EMAIL",
"job": "Champ complémentaire 7 PROFESSION",
"skills": "Champ complémentaire 8 CENTRE D'INTÉRÊTS / COMPÉTENCES",
"payment_method": "Moyen de paiement",
"status": "Status",
"tarif": "Tarif",
}
def get_matching_keys(src_dict, value):
return [k for k,v in src_dict.items() if v == value]
def load_csv_row_to_dict(field_mapping, first_row, row):
final_dict = {}
for i, cell in enumerate(row):
keys = get_matching_keys(field_mapping, first_row[i])
if len(keys) == 0: continue
final_dict[keys[0]] = cell
return final_dict
def get_id():
return str(uuid.uuid4()).split("-")[0]
@dataclass
class PahekoMember:
name: str
email: str
phone: str
status: str
def from_helloasso_members_csv_to_paheko_normalized():
reader = csv.reader([i for i in fileinput.input()], delimiter=";")
column_line = []
paheko_users = []
for i, line in enumerate(reader):
if i == 0:
column_line = line
continue
ha_membership: dict = load_csv_row_to_dict(INPUT_FIELDS_MAPPING, column_line, line)
def get_email(ha_ms):
if ha_ms["email_1"] == None: return ha_ms["email_2"]
if ha_ms["email_2"] == None: return ha_ms["email_1"]
if ha_ms['email_2'] != ha_ms['email_2']:
return ha_ms["email_2"]
return ha_ms["email_1"]
def format_date_time_french(raw_date):
return datetime.strptime(raw_date, "%d/%m/%Y %H:%M:%S")
def format_date_french(raw_date):
return datetime.strptime(raw_date, "%d/%m/%Y")
def format_string(subj):
return subj.strip()
def format_name(subj):
return subj[0:1].upper() + subj[1:].lower()
def format_mode(subj):
return subj
if format_string(ha_membership['status']) != "Validé":
continue
# then normalize dict
paheko_user: dict = {
'id': get_id(),
'first_name': format_name(format_string(ha_membership['first_name_1'])),
'last_name': format_name(format_string(ha_membership['last_name'])),
'mode_adhesion': format_mode(ha_membership['tarif']),
'email': format_string(get_email(ha_membership)),
'date': format_date_time_french(ha_membership['date']),
'birth_date': format_date_french(ha_membership['birth_date']) if ha_membership['birth_date'] and ha_membership['birth_date'].strip() != '' else None,
'linked_users': []
}
keys_to_copy = ['job', 'skills', 'address', 'postal_code', 'city']
for key in keys_to_copy:
if ha_membership[key].strip() == '':
paheko_user[key] = None
continue
paheko_user[key] = format_string(ha_membership[key])
linked_user = None
if ha_membership["first_name_2"].strip() != '':
# we count as two membership
linked_user = {
'id': get_id(),
'first_name': format_name(format_string(ha_membership['first_name_2'])),
'linked_users': [paheko_user['id']]
}
copy_from_parent_user = ['last_name', 'address', 'postal_code', 'city', 'date']
for k in copy_from_parent_user:
linked_user[k] = paheko_user[k]
paheko_user["linked_users"].append(linked_user['id'])
paheko_users.append(paheko_user)
if linked_user:
paheko_users.append(linked_user)
# pprint(paheko_users, sort_dicts=False)
print(json.dumps(paheko_users, sort_keys=True, default=str, indent=4))
from pprint import pprint
from helloasso_paheko_adapter.helloasso import HelloassoClient, Organization
from helloasso_paheko_adapter.paheko import PahekoClient
import vcr
def from_helloasso_payments_api_to_paheko(env, org_slug):
ha_client = HelloassoClient(env.HELLOASSO_API_CLIENT_ID, env.HELLOASSO_API_CLIENT_SECRET)
pk_client = PahekoClient(env.PAHEKO_API_CLIENT_ID, env.PAHEKO_API_CLIENT_SECRET)
ha_org = Organization(ha_client, org_slug)
# 1. get latest adhesion periode
period_id = "1" # or "fee_id"
print(pk_client.get_current_membership_period())
# 2. list payments
with vcr.use_cassette('tmp/vcr_cassettes/list_payments.yaml'):
payments = ha_org.list_payments()['data']
pprint(payments)

View file

@ -0,0 +1,9 @@
from helloasso_paheko_adapter.adapter import from_helloasso_members_csv_to_paheko_normalized
from helloasso_paheko_adapter.adapter import from_helloasso_payments_api_to_paheko
from helloasso_paheko_adapter.env import Env
def main():
# from_helloasso_members_csv_to_paheko_normalized()
env = Env()
org_slug = "l-etoile-de-bethleem-association-des-amis-de-la-chapelle-de-bethleem-d-aubevoye"
from_helloasso_payments_api_to_paheko(env, org_slug)

View file

@ -0,0 +1,19 @@
import os
class Env:
HELLOASSO_API_CLIENT_ID: str
HELLOASSO_API_CLIENT_SECRET: str
PAHEKO_API_CLIENT_ID: str
PAHEKO_API_CLIENT_SECRET: str
def __init__(self):
self.load_from_process_env()
def load_from_process_env(self):
attr_keys = list(self.__class__.__dict__.keys()) + list(self.__annotations__.keys())
for k in attr_keys:
if k.startswith('__'): continue
from_env = None
if k in os.environ:
from_env = os.environ[k]
setattr(self, k, from_env)

View file

@ -0,0 +1,43 @@
from requests_oauth2client import OAuth2Client, OAuth2ClientCredentialsAuth, ApiClient
class HelloassoAppClient:
def __init__(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret
self.oauth2client = OAuth2Client(
token_endpoint="https://api.helloasso.com/oauth2/token",
client_id=self.client_id,
client_secret=self.client_secret
)
self.auth = OAuth2ClientCredentialsAuth(self.oauth2client)
self.client = ApiClient(
"https://api.helloasso.com/v5",
auth=self.auth
)
class HelloassoUserClient:
def __init__(self, email, password):
self.email = email
self.password = password
self.client = ApiClient(
"https://api.helloasso.com/v5",
headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"}
)
def login(self):
self.client.post("/auth/login", data={})
class Organization():
client: HelloassoClient
slug: str
def __init__(self, client, slug):
self.client = client
self.slug = slug
def list_payments(self):
res = self.client.client.get(f"/organizations/{self.slug}/payments")
# FIXME : ahndle pagination , cannot test right now because of not enought ppl
return res.json()

View file

@ -0,0 +1,80 @@
import requests
from datetime import datetime
import json
from requests.auth import HTTPBasicAuth
from requests_oauth2client import ApiClient
class PahekoClient:
BASE_URL = "https://paheko.etoiledebethleem.fr/api"
def __init__(self, client_id: str, client_secret: str):
self.auth = HTTPBasicAuth(client_id, client_secret)
self.client = ApiClient(
self.BASE_URL,
auth=self.auth
)
def sql_query(self, q):
res = self.client.post("/sql", data=q)
if res.status_code != 200:
raise ValueError("Failed to request data to Paheko API")
c = res.content.decode()
# skip first 2 lines
new_json = "[" + "\n".join(c.split("\n")[3:]) + "]"
return json.loads(new_json)
def get_services(self):
return self.sql_query('SELECT * FROM services LIMIT 5;')
def get_members(self):
return self.sql_query('SELECT * FROM services LIMIT 5;')
def get_current_membership_period(self):
services = self.get_services()
for s in services:
sd = datetime.fromisoformat(s['start_date'])
ed = datetime.fromisoformat(s['end_date'])
if sd < datetime.now() < ed:
return {
**s,
'start_date': sd,
'end_date': ed
}
return None
from dataclasses import dataclass
from typing import List, Literal, Optional, Union
class Membership:
ha_payment_id: str
ha_order_id: str
pass
class IndividualMembership(Membership):
pass
class CoupleMembership(Membership):
# relationship membership
partner_id: str
@dataclass
class PahekoMember:
tmp_id: str
paheko_id: str
first_name: str
last_name: str
address: str
postal_code: str
city: str
email: str
phone: str
birth_date: str
occupation: str
skills: str
memberships: List[Membership]

View file

@ -0,0 +1,43 @@
import os
from paheko_helloasso_adapter.helloasso import HelloassoClient, Organization
from pprint import pprint
class Env:
HELLOASSO_API_CLIENT_ID: str
HELLOASSO_API_CLIENT_SECRET: str
def __init__(self):
attr_keys = list(self.__class__.__dict__.keys()) + list(self.__annotations__.keys())
for k in attr_keys:
if k.startswith('__'): continue
from_env = None
if k in os.environ:
from_env = os.environ[k]
setattr(self, k, from_env)
def main():
env = Env()
# api_client = ApiV5Client(
# api_base='api.helloasso.com',
# client_id=env.HELLOASSO_API_CLIENT_ID,
# client_secret=env.HELLOASSO_API_CLIENT_SECRET,
# timeout=60
# )
# class OrganizationApi(object):
# def __init__(self, client):
# self._client = client
# def list(self) -> dict:
# return self._client.call(f"/organizations").json()
# def get_by_slug(self, slug: str) -> dict:
# return self._client.call(f"/organizations/{slug}").json()
# org_api = OrganizationApi(api_client)
org_slug = "l-etoile-de-bethleem-association-des-amis-de-la-chapelle-de-bethleem-d-aubevoye"
client = HelloassoClient(env.HELLOASSO_API_CLIENT_ID, env.HELLOASSO_API_CLIENT_SECRET)
o = Organization(client, org_slug)
pprint(o.list_payments())