helloasso-paheko-adapter/helloasso_paheko_adapter/paheko/__init__.py

80 lines
1.9 KiB
Python

import requests
from datetime import datetime
import json
from requests.auth import HTTPBasicAuth
from requests_oauth2client import ApiClient
class PahekoClient:
    """Small client for the Paheko HTTP API, authenticated with HTTP Basic.

    Every request goes through a ``requests_oauth2client.ApiClient`` rooted
    at ``BASE_URL``.
    """

    BASE_URL = "https://paheko.etoiledebethleem.fr/api"

    def __init__(self, client_id: str, client_secret: str):
        """Store the Basic-auth credentials and build the API client.

        Args:
            client_id: Value used as the Basic-auth username.
            client_secret: Value used as the Basic-auth password.
        """
        self.auth = HTTPBasicAuth(client_id, client_secret)
        self.client = ApiClient(
            self.BASE_URL,
            auth=self.auth
        )

    def sql_query(self, q: str) -> list:
        """POST the SQL text ``q`` to ``/sql`` and return the decoded rows.

        Args:
            q: SQL statement sent as the request body.

        Returns:
            The list obtained by decoding the re-wrapped response body.

        Raises:
            ValueError: If the response status code is not 200.
            json.JSONDecodeError: If the re-wrapped body is not valid JSON.
        """
        res = self.client.post("/sql", data=q)
        if res.status_code != 200:
            raise ValueError("Failed to request data to Paheko API")
        body = res.content.decode()
        # Drop the first three lines of the body and wrap what is left in
        # brackets so it decodes as a JSON array.
        # NOTE(review): an earlier comment here said "skip first 2 lines"
        # while the slice has always skipped three; the slice is kept as-is.
        # Confirm the count against the actual /sql response format.
        new_json = "[" + "\n".join(body.split("\n")[3:]) + "]"
        return json.loads(new_json)

    def get_services(self):
        """Return up to five rows of the ``services`` table."""
        return self.sql_query('SELECT * FROM services LIMIT 5;')

    def get_members(self):
        """Return the rows produced by the query below.

        NOTE(review): this runs exactly the same ``services`` query as
        ``get_services``, which looks like a copy-paste slip. The query is
        left untouched until the intended members table is confirmed.
        """
        return self.sql_query('SELECT * FROM services LIMIT 5;')

    def get_current_membership_period(self):
        """Return the first service whose date range contains the current time.

        Services with a missing or empty ``start_date`` / ``end_date`` are
        skipped instead of aborting the lookup, since they cannot describe a
        dated period.

        Returns:
            A copy of the matching service row in which ``start_date`` and
            ``end_date`` are replaced by parsed ``datetime`` objects, or
            ``None`` when no service matches.
        """
        # Evaluate the clock once so every service is compared to the same
        # instant.
        now = datetime.now()
        for service in self.get_services():
            raw_start = service.get('start_date')
            raw_end = service.get('end_date')
            if not raw_start or not raw_end:
                continue
            sd = datetime.fromisoformat(raw_start)
            ed = datetime.fromisoformat(raw_end)
            # NOTE(review): both bounds are exclusive; if the stored values
            # are date-only strings they parse to midnight, so the final day
            # would be excluded. Confirm this is intended.
            if sd < now < ed:
                return {
                    **service,
                    'start_date': sd,
                    'end_date': ed,
                }
        return None
from dataclasses import dataclass
from typing import List, Literal, Optional, Union
class Membership:
    """Base type for a membership, tagged with a payment id and an order id.

    The ``ha_`` prefix presumably stands for HelloAsso (to be confirmed).
    The fields are bare annotations: they are declared for readers and type
    checkers, but no class or instance attributes are created here.
    """

    ha_payment_id: str
    ha_order_id: str
class IndividualMembership(Membership):
    """Membership variant that adds no fields of its own."""
class CoupleMembership(Membership):
    """Relationship membership: additionally declares the partner's id.

    ``partner_id`` is a bare annotation, so it declares the field without
    creating an attribute.
    """

    partner_id: str
@dataclass
class PahekoMember:
    """Record describing one member together with their memberships.

    The dataclass-generated constructor takes the fields in the order they
    are declared below, all of them required.
    """

    # NOTE(review): presumably a provisional identifier used before a Paheko
    # id is known — confirm against the code that builds these records.
    tmp_id: str
    # NOTE(review): presumably the member's identifier on the Paheko side —
    # confirm.
    paheko_id: str
    first_name: str
    last_name: str
    address: str
    postal_code: str
    city: str
    email: str
    phone: str
    # Kept as a string; the expected date format is not visible from here.
    birth_date: str
    occupation: str
    skills: str
    # Membership objects (base class or any subclass) attached to the member.
    memberships: List[Membership]