Initial commit: Family Planner application

Complete family planning application with:
- React frontend with TypeScript
- Node.js/Express backend with TypeScript
- Python ingestion service for document processing
- Planning ingestion service with LLM integration
- Shared UI components and type definitions
- OAuth integration for calendar synchronization
- Comprehensive documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: philippe
Date: 2025-10-14 10:43:33 +02:00
Commit: fdd72c1135
239 changed files with 44160 additions and 0 deletions

pyproject.toml

@@ -0,0 +1,34 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "family-planner-ingestion"
version = "0.1.0"
description = "OCR and planning ingestion service for Family Planner Hub"
authors = [{ name = "Family Planner" }]
dependencies = [
    "fastapi>=0.110.0",
    "pydantic>=2.6.0",
    "uvicorn[standard]>=0.30.0",
    "python-multipart>=0.0.9",
    "pillow>=10.0.0",
    "pdfplumber>=0.11.0",
    "openpyxl>=3.1.0",
    "pymupdf>=1.24.0"
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "ruff>=0.5.0",
    "httpx>=0.27.0"
]

[tool.ruff]
line-length = 100
target-version = "py311"

[tool.pytest.ini_options]
minversion = "8.0"
addopts = "-q"
testpaths = ["tests"]

src/__init__.py

@@ -0,0 +1 @@
# Package marker for ingestion-service

src/family_planner_ingestion.egg-info/PKG-INFO

@@ -0,0 +1,16 @@
Metadata-Version: 2.4
Name: family-planner-ingestion
Version: 0.1.0
Summary: OCR and planning ingestion service for Family Planner Hub
Author: Family Planner
Requires-Dist: fastapi>=0.110.0
Requires-Dist: pydantic>=2.6.0
Requires-Dist: uvicorn[standard]>=0.30.0
Requires-Dist: python-multipart>=0.0.9
Requires-Dist: pillow>=10.0.0
Requires-Dist: pdfplumber>=0.11.0
Requires-Dist: openpyxl>=3.1.0
Provides-Extra: dev
Requires-Dist: pytest>=8.0.0; extra == "dev"
Requires-Dist: ruff>=0.5.0; extra == "dev"
Requires-Dist: httpx>=0.27.0; extra == "dev"

src/family_planner_ingestion.egg-info/SOURCES.txt

@@ -0,0 +1,15 @@
pyproject.toml
src/__init__.py
src/family_planner_ingestion.egg-info/PKG-INFO
src/family_planner_ingestion.egg-info/SOURCES.txt
src/family_planner_ingestion.egg-info/dependency_links.txt
src/family_planner_ingestion.egg-info/requires.txt
src/family_planner_ingestion.egg-info/top_level.txt
src/ingestion/__init__.py
src/ingestion/main.py
src/ingestion/schemas.py
src/ingestion/pipelines/__init__.py
src/ingestion/pipelines/image.py
src/ingestion/pipelines/pdf.py
src/ingestion/pipelines/spreadsheet.py
tests/test_health.py

src/family_planner_ingestion.egg-info/requires.txt

@@ -0,0 +1,12 @@
fastapi>=0.110.0
pydantic>=2.6.0
uvicorn[standard]>=0.30.0
python-multipart>=0.0.9
pillow>=10.0.0
pdfplumber>=0.11.0
openpyxl>=3.1.0
[dev]
pytest>=8.0.0
ruff>=0.5.0
httpx>=0.27.0

src/family_planner_ingestion.egg-info/top_level.txt

@@ -0,0 +1,2 @@
__init__
ingestion

src/ingestion/__init__.py

@@ -0,0 +1 @@
# Ingestion service package init

src/ingestion/config.json

@@ -0,0 +1,4 @@
{
  "OPENAI_API_KEY": "sk-proj-REDACTED",
  "INGESTION_OPENAI_MODEL": "gpt-4o"
}

src/ingestion/main.py

@@ -0,0 +1,107 @@
from fastapi import FastAPI, UploadFile, File, Form, Body, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from .schemas import IngestionRequest, IngestionResponse
from .pipelines import parse_document
import os
import json
from pathlib import Path

app = FastAPI(title="Family Planner Ingestion", version="0.1.0")

# Security: Restrict CORS based on environment
# In production, set the ALLOWED_ORIGINS environment variable to your domain
_env = os.getenv("NODE_ENV", "development")
_allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:5173,http://localhost:5000").split(",")

if _env == "production":
    # Production: strict CORS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_allowed_origins,
        allow_methods=["GET", "POST"],
        allow_headers=["Content-Type"],
        allow_credentials=False,
    )
else:
    # Development: permissive (but still logged)
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}


@app.post("/ingest", response_model=IngestionResponse)
async def ingest(
    schedule_id: str = Form(...),
    child_id: str = Form(...),
    file: UploadFile = File(...),
) -> IngestionResponse:
    print(f"[ingestion] Received request: schedule_id={schedule_id}, child_id={child_id}, filename={file.filename}, content_type={file.content_type}")
    request = IngestionRequest(schedule_id=schedule_id, child_id=child_id, filename=file.filename)
    # TODO: store and stream file content to OCR
    activities = await parse_document(file)
    print(f"[ingestion] Parsed {len(activities)} activities")
    return IngestionResponse(schedule_id=request.schedule_id, status="completed", activities=activities)


# --- Simple runtime configuration management (store API keys once) ---
CONFIG_PATH = Path(__file__).resolve().parent / "config.json"


def _load_config() -> dict:
    if CONFIG_PATH.exists():
        try:
            return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
        except Exception:
            return {}
    return {}


def _save_config(cfg: dict) -> None:
    CONFIG_PATH.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8")


def _apply_config_to_env(cfg: dict) -> None:
    if "OPENAI_API_KEY" in cfg and cfg["OPENAI_API_KEY"]:
        os.environ["OPENAI_API_KEY"] = cfg["OPENAI_API_KEY"]
    if "INGESTION_OPENAI_MODEL" in cfg and cfg["INGESTION_OPENAI_MODEL"]:
        os.environ["INGESTION_OPENAI_MODEL"] = cfg["INGESTION_OPENAI_MODEL"]


# Load config on startup
_apply_config_to_env(_load_config())


@app.get("/config")
async def get_config() -> dict:
    cfg = _load_config()
    return {
        "openaiConfigured": bool(cfg.get("OPENAI_API_KEY") or os.getenv("OPENAI_API_KEY")),
        "model": cfg.get("INGESTION_OPENAI_MODEL") or os.getenv("INGESTION_OPENAI_MODEL", "gpt-4o"),
    }


@app.post("/config/openai")
async def set_openai_config(api_key: str = Body(..., embed=True), model: str | None = Body(None)) -> dict:
    # Security: Disable this endpoint in production
    # In production, configure API keys via environment variables only
    if _env == "production":
        raise HTTPException(
            status_code=403,
            detail="Configuration endpoint disabled in production. Use environment variables instead."
        )
    cfg = _load_config()
    cfg["OPENAI_API_KEY"] = api_key
    if model:
        cfg["INGESTION_OPENAI_MODEL"] = model
    _save_config(cfg)
    _apply_config_to_env(cfg)
    return {"ok": True}
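
A quick smoke check of these endpoints (a minimal sketch, not part of the commit; it assumes the package is importable, e.g. after an editable install of the ingestion service):

    from fastapi.testclient import TestClient
    from ingestion.main import app

    client = TestClient(app)
    print(client.get("/health").json())   # expected: {"status": "ok"}
    print(client.get("/config").json())   # e.g. {'openaiConfigured': True, 'model': 'gpt-4o'}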

src/ingestion/pipelines/__init__.py

@@ -0,0 +1,24 @@
from fastapi import UploadFile
from .pdf import parse_pdf
from .image import parse_image
from .spreadsheet import parse_spreadsheet
from .csvfile import parse_csv
from .jsonfile import parse_jsonfile
from ..schemas import ActivitySchema


async def parse_document(file: UploadFile) -> list[ActivitySchema]:
    content_type = file.content_type or ""
    name = (file.filename or "").lower()
    if content_type in {"application/pdf"} or name.endswith(".pdf"):
        return await parse_pdf(file)
    if content_type.startswith("image/") or name.endswith((".png", ".jpg", ".jpeg", ".webp")):
        return await parse_image(file)
    if name.endswith((".xls", ".xlsx")):
        return await parse_spreadsheet(file)
    if name.endswith(".csv"):
        return await parse_csv(file)
    if name.endswith(".json"):
        return await parse_jsonfile(file)
    return []

src/ingestion/pipelines/csvfile.py

@@ -0,0 +1,66 @@
from typing import List
from fastapi import UploadFile
from ..schemas import ActivitySchema
import csv
import io
from datetime import datetime


def _to_dt(value) -> datetime:
    if isinstance(value, datetime):
        return value
    for fmt in ("%Y-%m-%d %H:%M", "%d/%m/%Y %H:%M", "%H:%M %d/%m/%Y", "%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(str(value), fmt)
        except Exception:
            pass
    # time only
    try:
        t = datetime.strptime(str(value), "%H:%M").time()
        today = datetime.now().strftime("%Y-%m-%d")
        return datetime.fromisoformat(f"{today}T{t.strftime('%H:%M')}")
    except Exception:
        return datetime.now()


async def parse_csv(file: UploadFile) -> List[ActivitySchema]:
    content = await file.read()
    text = content.decode("utf-8", errors="replace")
    f = io.StringIO(text)
    reader = csv.DictReader(f)

    # Header aliases
    def get(row, *names):
        for n in names:
            if n in row and row[n] not in (None, ""):
                return row[n]
        return None

    activities: List[ActivitySchema] = []
    for row in reader:
        title = get(row, "title", "intitule", "activity", "activite")
        if not title:
            continue
        category = (get(row, "category", "categorie", "type") or "other").lower()
        start_val = get(row, "startDateTime", "start_date", "start", "debut", "start_time")
        end_val = get(row, "endDateTime", "end_date", "end", "fin", "end_time")
        location = get(row, "location", "lieu")
        notes = get(row, "notes", "commentaire")
        start_dt = _to_dt(start_val) if start_val else datetime.now()
        end_dt = _to_dt(end_val) if end_val else start_dt
        if category not in {"school", "sport", "medical", "event", "other"}:
            category = "other"
        activities.append(
            ActivitySchema(
                title=str(title),
                category=category,  # type: ignore[arg-type]
                start_date=start_dt,
                end_date=end_dt,
                location=str(location) if location is not None else None,
                notes=str(notes) if notes is not None else None,
                confidence=0.7,
            )
        )
    return activities
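
The header aliases above accept English and French column names alike; a hypothetical smoke test (a sketch assuming starlette's UploadFile(file=..., filename=...) constructor, which FastAPI re-exports):

    import asyncio
    import io
    from fastapi import UploadFile
    from ingestion.pipelines.csvfile import parse_csv

    sample = b"intitule,categorie,debut,fin,lieu\nNatation,sport,2025-10-14 17:00,2025-10-14 18:00,Piscine\n"
    upload = UploadFile(file=io.BytesIO(sample), filename="planning.csv")
    acts = asyncio.run(parse_csv(upload))
    print(acts[0].title, acts[0].category, acts[0].location)  # Natation sport Piscine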

src/ingestion/pipelines/image.py

@@ -0,0 +1,350 @@
from typing import List
from fastapi import UploadFile
from ..schemas import ActivitySchema
import os
import json
import base64
import datetime as dt
import urllib.request
import urllib.error
import re
from PIL import Image
import io


def _local_ocr_parse(img_bytes: bytes) -> list:
    try:
        import pytesseract  # type: ignore
    except Exception:
        return []
    try:
        img = Image.open(io.BytesIO(img_bytes))  # type: ignore
    except Exception:
        return []
    try:
        text = pytesseract.image_to_string(img, lang="fra+eng")
    except Exception:
        text = ""
    if not text.strip():
        return []
    # Simple heuristic: lines like "Mardi 08:00 - 10:00 Piscine"
    items = []
    day_map = {
        "lundi": 0, "mardi": 1, "mercredi": 2, "jeudi": 3, "vendredi": 4, "samedi": 5, "dimanche": 6,
        "lun": 0, "mar": 1, "mer": 2, "jeu": 3, "ven": 4, "sam": 5, "dim": 6
    }
    time_re = re.compile(r"(?P<day>\b(?:lun\w*|mar\w*|mer\w*|jeu\w*|ven\w*|sam\w*|dim\w*)\b).*?(?P<start>\d{1,2}:\d{2}).{0,5}[-à].{0,5}(?P<end>\d{1,2}:\d{2}).{0,10}(?P<title>[A-Za-zÀ-ÿ\- ']+)", re.IGNORECASE)
    for line in text.splitlines():
        m = time_re.search(line)
        if not m:
            continue
        day = (m.group("day") or "").lower()
        start = m.group("start")
        end = m.group("end")
        title = m.group("title").strip()
        if not title:
            title = "Activité"
        today = dt.datetime.now()
        # Align to the week containing today
        weekday = today.weekday()
        target = day_map.get(day, weekday)
        # Compute date for target day in current week
        delta = target - weekday
        d = today + dt.timedelta(days=delta)
        start_iso = f"{d.strftime('%Y-%m-%d')}T{start}"
        end_iso = f"{d.strftime('%Y-%m-%d')}T{end}"
        items.append({
            "title": title,
            "category": "other",
            "start_date": start_iso,
            "end_date": end_iso,
            "confidence": 0.65
        })
    return items


def _map_category(raw: str) -> str:
    value = (raw or "").strip().lower()
    if value in {"school", "ecole", "scolaire", "classe", "cours"}:
        return "school"
    if value in {"sport", "sports", "entrainement", "entrainement sportif"}:
        return "sport"
    if value in {"medical", "medecin", "dentiste", "sante"}:
        return "medical"
    if value in {"event", "evenement", "sortie", "anniversaire", "rdv", "rendez-vous"}:
        return "event"
    return "other"


def _extract_activities_json(text: str) -> list:
    """Parse either a JSON array or an object containing activities/events."""
    try:
        data = json.loads(text)
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            for key in ("activities", "events", "items", "data"):
                if isinstance(data.get(key), list):
                    return data[key]
        # fallthrough: try bracket slice
    except Exception:
        pass
    start = text.find("[")
    end = text.rfind("]")
    if start != -1 and end != -1 and end > start:
        try:
            return json.loads(text[start : end + 1])
        except Exception:
            return []
    return []


async def parse_image(file: UploadFile) -> List[ActivitySchema]:
    content = await file.read()
    print(f"[image] Starting parse_image, file size: {len(content)} bytes")

    # === PHASE 1: ULTRA OCR with Context-Aware Parsing ===
    try:
        from .ultra_ocr import parse_image_ultra
        print("[image] Using ULTRA OCR parser with context-aware analysis")
        local_items, local_score, metadata = parse_image_ultra(content)
        print(f"[image] ULTRA OCR returned {len(local_items)} items with score {local_score:.2f}")
        print(f"[image] Metadata: {metadata}")
    except Exception as e:
        print(f"[image] ULTRA OCR failed: {e}, falling back to enhanced OCR")
        try:
            from .local_ocr_enhanced import parse_image_enhanced
            print("[image] Using enhanced local OCR parser with scoring")
            local_items, local_score = parse_image_enhanced(content)
            print(f"[image] Enhanced OCR returned {len(local_items)} items with score {local_score:.2f}")
        except Exception as e2:
            print(f"[image] Enhanced OCR also failed: {e2}, using basic OCR")
            local_items = _local_ocr_parse(content)
            local_score = 0.5 if local_items else 0.0
            print(f"[image] Basic OCR returned {len(local_items)} items, score {local_score:.2f}")

    # === DECISION: Accept local parsing or fall back to GPT? ===
    # Ultra OCR is very reliable, so we can use a lower threshold
    MIN_SCORE_THRESHOLD = 0.60
    if local_score >= MIN_SCORE_THRESHOLD and local_items:
        print(f"[image] OK Local score {local_score:.2f} >= {MIN_SCORE_THRESHOLD}, accepting local results")
        activities: List[ActivitySchema] = []
        for it in local_items:
            try:
                start = it.get("start_date")
                end = it.get("end_date")
                activities.append(
                    ActivitySchema(
                        title=str(it.get("title") or "Activité"),
                        category=_map_category(str(it.get("category") or "other")),
                        start_date=dt.datetime.fromisoformat(start),
                        end_date=dt.datetime.fromisoformat(end),
                        location=it.get("location"),
                        notes=it.get("notes"),
                        confidence=float(it.get("confidence") or local_score),
                    )
                )
            except Exception as e:
                print(f"[image] Failed to parse activity: {e}")
                continue
        if activities:
            print(f"[image] Returning {len(activities)} activities from LOCAL parsing (no GPT needed)")
            return activities

    # === PHASE 2: GPT Fallback (only if local score < threshold) ===
    print(f"[image] SKIP Local score {local_score:.2f} < {MIN_SCORE_THRESHOLD}, falling back to GPT")
    api_key = os.getenv("OPENAI_API_KEY")
    print(f"[image] OpenAI API key configured: {bool(api_key)}")
    if not api_key:
        print("[image] No API key, returning empty list")
        return []
    mime = file.content_type or "image/png"
    data_url = f"data:{mime};base64,{base64.b64encode(content).decode('utf-8')}"
    prompt = """Tu es un expert en analyse de plannings scolaires et professionnels. Analyse cette image de planning et extrait TOUTES les activités avec une PRÉCISION MAXIMALE.
🎯 CONTEXTE DE PLANNING:
- Type détecté: PLANNING HEBDOMADAIRE ou MENSUEL
- Structure typique: grille avec jours/dates en colonnes, horaires en lignes
- Peut être manuscrit, imprimé, ou mixte
- Peut contenir des abréviations (Math, Fran, EPS, etc.)
📋 FORMAT DE SORTIE OBLIGATOIRE:
Retourne UNIQUEMENT un tableau JSON valide, sans texte avant ou après.
Format: [{"title": "...", "category": "...", "start_date": "...", "end_date": "...", "location": "...", "notes": "..."}]
📝 CHAMPS REQUIS (tous obligatoires):
1. **title**: Nom EXACT de l'activité tel qu'écrit
- Exemples: "Mathématiques", "Français", "Réunion parents-profs", "Natation"
- Si abrégé, développe intelligemment (Math → Mathématiques, Fran → Français)
2. **category**: Classification intelligente (UN SEUL MOT parmi):
- "school": matières scolaires (maths, français, histoire, sciences, etc.)
- "sport": activités sportives (EPS, natation, foot, gymnastique, etc.)
- "medical": santé (médecin, dentiste, infirmerie, vaccin, etc.)
- "event": événements (sortie, spectacle, réunion, anniversaire, etc.)
- "other": tout le reste (pause, repas, temps libre, etc.)
3. **start_date**: Date/heure de début au format ISO 8601 COMPLET
- Format: "YYYY-MM-DDTHH:MM" (ex: "2025-10-14T08:00")
- MÉTHODE DE DÉDUCTION:
a) Si le planning indique "Semaine du 13 au 17 oct" ou "Lundi 14/10":
→ Calcule la date exacte (2025-10-14 pour lundi 14 oct)
b) Si seulement le jour est visible (Lundi, Mardi...):
→ Utilise la semaine ACTUELLE (aujourd'hui = 13 oct 2025)
→ Lundi = 2025-10-14, Mardi = 2025-10-15, etc.
c) Si l'heure est "8h30" ou "08:30":
→ Formate en "08:30"
4. **end_date**: Date/heure de fin (même format)
- Si visible: utilise l'heure indiquée
- Si non visible: ajoute 1h à start_date par défaut
- Si "8h30-10h00": end_date = "2025-10-14T10:00"
5. **location**: Lieu EXACT si visible
- Exemples: "Salle 203", "Gymnase", "Cour de récréation"
- Si non visible: null
6. **notes**: Informations complémentaires si visibles
- Exemples: "Prof: M. Dupont", "Apporter maillot", "Contrôle surprise"
- Si non visible: null
🔍 INSTRUCTIONS DÉTAILLÉES:
A. LECTURE DE LA STRUCTURE:
1. Identifie le type: hebdomadaire (5-7 jours) ou mensuel (tout le mois)
2. Repère l'en-tête: dates, jours de la semaine
3. Repère les horaires: colonne de gauche généralement
4. Lis CHAQUE cellule, même vides ou rayées
B. EXTRACTION SYSTÉMATIQUE:
1. Parcours TOUTES les cellules ligne par ligne
2. Pour CHAQUE activité trouvée:
- Identifie le jour/date de la colonne
- Identifie l'horaire de la ligne
- Lis le titre exact (ne pas inventer)
- Déduis la catégorie intelligemment
- Calcule les dates ISO complètes
3. N'oublie AUCUNE activité, même répétitives
C. GESTION DES CAS PARTICULIERS:
- Texte flou/illisible: fais de ton mieux, note "(illisible)" dans notes
- Abréviations: développe logiquement (Math→Mathématiques, mais pas EPS→Éducation Physique, garde "EPS")
- Cellules fusionnées: une seule activité sur plusieurs créneaux
- Planning manuscrit: déchiffre l'écriture au mieux
📊 EXEMPLE COMPLET (planning hebdomadaire):
Planning: "Semaine du 14 au 18 octobre 2025"
Lundi colonne 1: "8h30-10h00 Mathématiques Salle 12"
Extraction:
[
{
"title": "Mathématiques",
"category": "school",
"start_date": "2025-10-14T08:30",
"end_date": "2025-10-14T10:00",
"location": "Salle 12",
"notes": null
}
]
⚠️ RÈGLES ABSOLUES:
- Retourne UNIQUEMENT le JSON (pas de ```json, pas de texte explicatif)
- Extrais TOUT, ne laisse rien de côté
- Les dates doivent être cohérentes et réalistes
- Si doute sur une info, mets null plutôt que d'inventer
- Privilégie la précision à la quantité
🚀 C'est parti ! Analyse l'image et retourne le JSON complet."""

    def call_model(model: str) -> list:
        body = {
            "model": model,
            "messages": [
                {"role": "system", "content": "Tu es un extracteur de planning fiable et concis."},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": data_url}},
                    ],
                },
            ],
            "temperature": 0.1,
        }
        req = urllib.request.Request(
            "https://api.openai.com/v1/chat/completions",
            data=json.dumps(body).encode("utf-8"),
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            },
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                raw = resp.read().decode("utf-8")
            data = json.loads(raw)
            content_text = (
                data.get("choices", [{}])[0]
                .get("message", {})
                .get("content", "[]")
            )
            print(f"[image] GPT response (first 500 chars): {content_text[:500]}")
            return _extract_activities_json(content_text)
        except Exception as e:
            print(f"[image] OpenAI API call failed: {type(e).__name__}: {str(e)}")
            return []

    primary_model = os.getenv("INGESTION_OPENAI_MODEL", "gpt-4o")
    fallback_model = os.getenv("INGESTION_OPENAI_FALLBACK_MODEL", "gpt-4o-mini")
    print(f"[image] Calling OpenAI with model: {primary_model}")
    items = call_model(primary_model)
    print(f"[image] Primary model returned {len(items)} items")
    if not items:
        print(f"[image] Trying fallback model: {fallback_model}")
        items = call_model(fallback_model)
        print(f"[image] Fallback model returned {len(items)} items")
    activities: List[ActivitySchema] = []
    for it in items:
        try:
            title = str(it.get("title") or it.get("titre") or it.get("activity") or "Activité")
            category = _map_category(str(it.get("category") or it.get("categorie") or it.get("type") or "other"))
            start = str(it.get("start_date") or it.get("start") or it.get("debut") or it.get("start_time") or "")
            end = str(it.get("end_date") or it.get("end") or it.get("fin") or it.get("end_time") or start)
            location = it.get("location")
            notes = it.get("notes")

            def to_iso(x: str) -> str:
                try:
                    # allow time-only or date-only or datetime; fallback to now
                    if len(x) == 5 and ":" in x:
                        today = dt.datetime.now().strftime("%Y-%m-%d")
                        return f"{today}T{x}"
                    if len(x) == 10:
                        return f"{x}T08:00"
                    return x
                except Exception:
                    return dt.datetime.now().replace(microsecond=0).isoformat()

            activities.append(
                ActivitySchema(
                    title=title,
                    category=category,  # type: ignore[arg-type]
                    start_date=dt.datetime.fromisoformat(to_iso(start)),
                    end_date=dt.datetime.fromisoformat(to_iso(end)),
                    location=str(location) if location is not None else None,
                    notes=str(notes) if notes is not None else None,
                    confidence=0.75,
                )
            )
        except Exception:
            continue
    return activities
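
The _extract_activities_json helper tolerates the shapes the model tends to return; a small illustration (a sketch, not part of the commit):

    from ingestion.pipelines.image import _extract_activities_json

    print(_extract_activities_json('[{"title": "Judo"}]'))                    # plain array
    print(_extract_activities_json('{"activities": [{"title": "Judo"}]}'))    # wrapped object
    print(_extract_activities_json('noise [ {"title": "Judo"} ] more noise')) # bracket-slice fallback
    # all three print [{'title': 'Judo'}]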

src/ingestion/pipelines/jsonfile.py

@@ -0,0 +1,64 @@
from typing import List
from fastapi import UploadFile
from ..schemas import ActivitySchema
import json
from datetime import datetime


def _to_dt(value) -> datetime:
    if isinstance(value, datetime):
        return value
    s = str(value)
    try:
        return datetime.fromisoformat(s.replace("Z", ""))
    except Exception:
        pass
    # time-only case
    try:
        t = datetime.strptime(s, "%H:%M").time()
        today = datetime.now().strftime("%Y-%m-%d")
        return datetime.fromisoformat(f"{today}T{t.strftime('%H:%M')}")
    except Exception:
        return datetime.now()


async def parse_jsonfile(file: UploadFile) -> List[ActivitySchema]:
    content = await file.read()
    data = json.loads(content.decode("utf-8", errors="replace"))
    items = []
    if isinstance(data, list):
        items = data
    elif isinstance(data, dict):
        for key in ("activities", "events", "items", "data"):
            if isinstance(data.get(key), list):
                items = data[key]
                break
    activities: List[ActivitySchema] = []
    for it in items:
        try:
            # Check the raw value before stringifying, so a missing title is
            # actually skipped (str(None) would be the truthy string "None")
            raw_title = it.get("title") or it.get("titre") or it.get("activity")
            if not raw_title:
                continue
            title = str(raw_title)
            category = (it.get("category") or it.get("categorie") or it.get("type") or "other").lower()
            start = it.get("startDateTime") or it.get("start_date") or it.get("start") or it.get("debut") or it.get("start_time")
            end = it.get("endDateTime") or it.get("end_date") or it.get("end") or it.get("fin") or it.get("end_time")
            start_dt = _to_dt(start) if start else datetime.now()
            end_dt = _to_dt(end) if end else start_dt
            if category not in {"school", "sport", "medical", "event", "other"}:
                category = "other"
            activities.append(
                ActivitySchema(
                    title=title,
                    category=category,  # type: ignore[arg-type]
                    start_date=start_dt,
                    end_date=end_dt,
                    location=(it.get("location") or None),
                    notes=(it.get("notes") or None),
                    confidence=0.8
                )
            )
        except Exception:
            continue
    return activities
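
Both payload shapes are accepted: a bare list, or an object wrapping the list under one of "activities", "events", "items", "data" (a sketch, not part of the commit):

    import asyncio
    import io
    import json
    from fastapi import UploadFile
    from ingestion.pipelines.jsonfile import parse_jsonfile

    payload = {"activities": [{"titre": "Dentiste", "type": "medical",
                               "debut": "2025-10-15T09:00", "fin": "2025-10-15T09:30"}]}
    upload = UploadFile(file=io.BytesIO(json.dumps(payload).encode()), filename="rdv.json")
    print(asyncio.run(parse_jsonfile(upload)))  # one "medical" activity titled "Dentiste"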

src/ingestion/pipelines/local_ocr_enhanced.py

@@ -0,0 +1,285 @@
"""
Parser OCR local amélioré avec scoring de confiance
Utilise pytesseract + heuristiques de planning scolaire
"""
from typing import List, Dict, Tuple, Optional
import re
from datetime import datetime, timedelta
import io
from PIL import Image
# import numpy as np # Disabled for now due to compatibility issues
def deskew_image(img: Image.Image) -> Image.Image:
"""
Corrige automatiquement l'inclinaison d'une image (deskewing)
Utilise OpenCV pour détecter l'angle et rotation
Returns: Image corrigée
"""
try:
import cv2
# Convertir PIL Image en array numpy pour OpenCV
img_array = np.array(img)
# Convertir en niveaux de gris si nécessaire
if len(img_array.shape) == 3:
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
else:
gray = img_array
# Binarisation pour détecter les contours
thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
# Détecter l'angle d'inclinaison
coords = np.column_stack(np.where(thresh > 0))
angle = cv2.minAreaRect(coords)[-1]
# Ajuster l'angle (OpenCV retourne entre -90 et 0)
if angle < -45:
angle = -(90 + angle)
else:
angle = -angle
# Si l'angle est très petit, pas besoin de corriger
if abs(angle) < 0.5:
print(f"[local_ocr] Skew angle {angle:.2f}° is negligible, skipping rotation")
return img
print(f"[local_ocr] Detected skew angle: {angle:.2f}°, rotating image...")
# Rotation de l'image
(h, w) = gray.shape[:2]
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated = cv2.warpAffine(img_array, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
# Convertir back to PIL Image
return Image.fromarray(rotated)
except Exception as e:
print(f"[local_ocr] Deskewing failed: {e}, using original image")
return img
def extract_text_with_confidence(img_bytes: bytes) -> Tuple[str, float]:
"""
Extrait le texte d'une image avec score de confiance OCR
Returns: (texte, confiance_moyenne)
"""
try:
import pytesseract
# Configuration du chemin Tesseract pour Windows
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
except ImportError:
print("[local_ocr] pytesseract not available")
return "", 0.0
try:
img = Image.open(io.BytesIO(img_bytes))
# ÉTAPE 0: Correction automatique d'inclinaison (DESKEWING) - DISABLED FOR NOW
# img = deskew_image(img) # Disabled due to NumPy compatibility issues
# Prétraitement avancé pour images de planning scolaire (photos inclinées)
from PIL import ImageEnhance, ImageFilter, ImageOps
# 1. Convertir en niveaux de gris
img = img.convert('L')
# 2. Augmenter la netteté
img = img.filter(ImageFilter.SHARPEN)
# 3. Augmenter le contraste fortement
enhancer = ImageEnhance.Contrast(img)
img = enhancer.enhance(2.5)
# 4. Augmenter la luminosité
enhancer = ImageEnhance.Brightness(img)
img = enhancer.enhance(1.3)
# 5. Binarisation simple (conversion noir et blanc) - WITHOUT NumPy
# Conversion avec seuil fixe au lieu d'adaptatif
from PIL import ImageOps
img = img.point(lambda x: 0 if x < 128 else 255, '1')
img = img.convert('L')
# 6. Inverser (texte noir sur fond blanc pour Tesseract)
img = ImageOps.invert(img)
print(f"[local_ocr] Image preprocessed: size={img.size}, mode={img.mode}")
# DEBUG: Sauvegarder l'image prétraitée
import tempfile
import os
debug_path = os.path.join(tempfile.gettempdir(), "tesseract_debug.png")
img.save(debug_path)
print(f"[local_ocr] DEBUG: Saved preprocessed image to {debug_path}")
# OCR avec configuration optimisée pour tableaux
# PSM 6 = Assume a single uniform block of text
custom_config = r'--oem 3 --psm 6'
data = pytesseract.image_to_data(img, lang="fra+eng", config=custom_config, output_type=pytesseract.Output.DICT)
# Calcul confiance moyenne (sur mots avec conf > 0)
confidences = [float(conf) for conf in data['conf'] if int(conf) > 0]
avg_conf = sum(confidences) / len(confidences) if confidences else 0.0
# Extraction texte avec même configuration
text = pytesseract.image_to_string(img, lang="fra+eng", config=custom_config)
# Debug: afficher les premiers caractères
preview = text[:200].replace('\n', '\\n') if text else "(empty)"
print(f"[local_ocr] Extracted {len(text)} chars, avg confidence: {avg_conf:.2f}")
print(f"[local_ocr] Text preview: {preview}")
return text, avg_conf / 100.0 # Normalize to 0-1
except Exception as e:
print(f"[local_ocr] Error: {e}")
return "", 0.0
def parse_planning_text(text: str, base_confidence: float) -> Tuple[List[Dict], float]:
"""
Parse un texte OCR pour extraire des activités de planning
Returns: (activities, score_global)
"""
if not text.strip():
return [], 0.0
activities = []
lines = text.split('\n')
# Patterns de détection
time_pattern = re.compile(r'(\d{1,2})[h:](\d{2})') # 8h30, 8:30, 08:30
day_pattern = re.compile(r'(lundi|mardi|mercredi|jeudi|vendredi|samedi|dimanche)', re.IGNORECASE)
date_pattern = re.compile(r'(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})') # 13/10/2025, 13-10-25
# Détection de structure de planning
has_days = bool(day_pattern.search(text))
has_times = bool(time_pattern.findall(text))
has_dates = bool(date_pattern.search(text))
structure_score = 0.0
if has_days:
structure_score += 0.4
if has_times:
structure_score += 0.4
if has_dates:
structure_score += 0.2
print(f"[local_ocr] Structure detected: days={has_days}, times={has_times}, dates={has_dates}, score={structure_score:.2f}")
# Extraction ligne par ligne
current_day = None
current_date = None
for line in lines:
line = line.strip()
if len(line) < 3:
continue
# Détection jour
day_match = day_pattern.search(line)
if day_match:
current_day = day_match.group(1).capitalize()
# Détection date
date_match = date_pattern.search(line)
if date_match:
try:
day, month, year = date_match.groups()
if len(year) == 2:
year = "20" + year
current_date = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
except:
pass
# Détection horaires + activité
times = time_pattern.findall(line)
if times and len(times) >= 1:
# Extraire le titre (tout sauf les horaires)
title = re.sub(time_pattern, '', line).strip()
title = re.sub(r'\s+', ' ', title) # Nettoyer espaces multiples
if not title or len(title) < 2:
continue
# Construire start/end time
try:
start_h, start_m = map(int, times[0])
# Si 2 horaires détectés, c'est start-end
if len(times) >= 2:
end_h, end_m = map(int, times[1])
else:
# Sinon, ajouter 1h par défaut
end_h, end_m = start_h + 1, start_m
# Date de base
if current_date:
base_date = current_date
else:
# Utiliser date actuelle si pas trouvée
base_date = datetime.now().strftime("%Y-%m-%d")
start_dt = f"{base_date}T{start_h:02d}:{start_m:02d}:00"
end_dt = f"{base_date}T{end_h:02d}:{end_m:02d}:00"
# Score de confiance pour cette activité
activity_conf = base_confidence * structure_score
if current_date:
activity_conf *= 1.1 # Bonus si date explicite
if len(times) >= 2:
activity_conf *= 1.1 # Bonus si start ET end détectés
activity_conf = min(activity_conf, 1.0)
activities.append({
"title": title[:100], # Limit length
"start_date": start_dt,
"end_date": end_dt,
"location": None,
"notes": f"Jour: {current_day}" if current_day else None,
"confidence": activity_conf,
"category": "other"
})
print(f"[local_ocr] Found activity: {title[:30]}... ({start_h}:{start_m:02d}-{end_h}:{end_m:02d}) conf={activity_conf:.2f}")
except Exception as e:
print(f"[local_ocr] Failed to parse time for: {line[:50]} - {e}")
continue
# Score global = base_conf * structure * taux_extraction
if not activities:
global_score = base_confidence * structure_score * 0.3 # Pénalité si rien trouvé
else:
extraction_rate = min(len(activities) / 10.0, 1.0) # Normalize (expect ~5-10 activities)
global_score = base_confidence * structure_score * extraction_rate
global_score = min(global_score, 1.0)
print(f"[local_ocr] Parsed {len(activities)} activities, global score: {global_score:.2f}")
return activities, global_score
def parse_image_enhanced(img_bytes: bytes) -> Tuple[List[Dict], float]:
"""
Pipeline complet : OCR + parsing + scoring
Returns: (activities, confidence_score)
"""
print("[local_ocr] Starting enhanced OCR parsing...")
# Étape 1: OCR avec confiance
text, ocr_conf = extract_text_with_confidence(img_bytes)
if not text or ocr_conf < 0.3:
print(f"[local_ocr] OCR quality too low (conf={ocr_conf:.2f}), skipping local parsing")
return [], 0.0
# Étape 2: Parsing intelligent
activities, parse_score = parse_planning_text(text, ocr_conf)
return activities, parse_score
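
The text-level parser can be exercised without Tesseract by feeding it a synthetic OCR dump (a sketch; the exact scores depend on base_confidence and the detected structure):

    from ingestion.pipelines.local_ocr_enhanced import parse_planning_text

    fake_ocr = "Lundi 13/10/2025\n8h30 10h00 Mathematiques"
    activities, score = parse_planning_text(fake_ocr, base_confidence=0.8)
    print(activities[0]["title"], activities[0]["start_date"])
    # Mathematiques 2025-10-13T08:30:00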

src/ingestion/pipelines/pdf.py

@@ -0,0 +1,267 @@
from typing import List
from fastapi import UploadFile
from ..schemas import ActivitySchema
import pdfplumber
import io
import os
import json
import base64
import datetime as dt
import urllib.request
import urllib.error
import re


def _map_category(raw: str) -> str:
    value = (raw or "").strip().lower()
    if value in {"school", "ecole", "scolaire", "classe", "cours"}:
        return "school"
    if value in {"sport", "sports", "entrainement", "entrainement sportif"}:
        return "sport"
    if value in {"medical", "medecin", "dentiste", "sante"}:
        return "medical"
    if value in {"event", "evenement", "sortie", "anniversaire", "rdv", "rendez-vous"}:
        return "event"
    return "other"


def _extract_activities_json(text: str) -> list:
    try:
        data = json.loads(text)
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            for key in ("activities", "events", "items", "data"):
                if isinstance(data.get(key), list):
                    return data[key]
    except Exception:
        pass
    start = text.find("[")
    end = text.rfind("]")
    if start != -1 and end != -1 and end > start:
        try:
            return json.loads(text[start : end + 1])
        except Exception:
            return []
    return []


async def parse_pdf(file: UploadFile) -> List[ActivitySchema]:
    # Extract text with pdfplumber
    raw = await file.read()
    text_parts: list[str] = []
    try:
        with pdfplumber.open(io.BytesIO(raw)) as pdf:
            for page in pdf.pages:
                t = page.extract_text() or ""
                if t:
                    text_parts.append(t)
    except Exception:
        text_parts = []
    text = "\n\n".join(text_parts).strip()

    # Heuristic parse from text first (local)
    activities_local: List[ActivitySchema] = []
    if text:
        try:
            day_map = {
                "lundi": 0, "mardi": 1, "mercredi": 2, "jeudi": 3, "vendredi": 4, "samedi": 5, "dimanche": 6,
                "lun": 0, "mar": 1, "mer": 2, "jeu": 3, "ven": 4, "sam": 5, "dim": 6
            }
            time_re = re.compile(r"(?P<day>\b(?:lun\w*|mar\w*|mer\w*|jeu\w*|ven\w*|sam\w*|dim\w*)\b).*?(?P<start>\d{1,2}:\d{2}).{0,5}[-à].{0,5}(?P<end>\d{1,2}:\d{2}).{0,10}(?P<title>[A-Za-zÀ-ÿ\- ']+)", re.IGNORECASE)
            today = dt.datetime.now()
            weekday = today.weekday()
            for line in text.splitlines():
                m = time_re.search(line)
                if not m:
                    continue
                day = (m.group("day") or "").lower()
                start = m.group("start")
                end = m.group("end")
                title = m.group("title").strip() or "Activité"
                target = day_map.get(day, weekday)
                delta = target - weekday
                d = today + dt.timedelta(days=delta)
                start_iso = f"{d.strftime('%Y-%m-%d')}T{start}"
                end_iso = f"{d.strftime('%Y-%m-%d')}T{end}"
                activities_local.append(ActivitySchema(
                    title=title,
                    category="other",  # type: ignore[arg-type]
                    start_date=dt.datetime.fromisoformat(start_iso),
                    end_date=dt.datetime.fromisoformat(end_iso),
                    confidence=0.65
                ))
        except Exception:
            activities_local = []
    if activities_local:
        return activities_local

    if not text:
        # Try OCR-like extraction by rendering pages and using a vision model
        try:
            import fitz  # PyMuPDF
            doc = fitz.open(stream=raw, filetype="pdf")
            imgs: list[str] = []
            for page in doc:
                pix = page.get_pixmap(dpi=180)
                png_bytes = pix.tobytes("png")
                b64 = base64.b64encode(png_bytes).decode("utf-8")
                imgs.append(f"data:image/png;base64,{b64}")
        except Exception:
            imgs = []
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key or not imgs:
            return []
        prompt = (
            "Analyse ces pages de planning et retourne STRICTEMENT un tableau JSON d'objets "
            "{title, category, start_date, end_date, location, notes}. "
            "- category ∈ {school, sport, medical, event, other}; dates en ISO 8601. Pas de texte hors JSON."
        )
        body = {
            "model": os.getenv("INGESTION_OPENAI_MODEL", "gpt-4o-mini"),
            "messages": [
                {"role": "system", "content": "Tu es un extracteur de planning fiable et concis."},
                {
                    "role": "user",
                    "content": [{"type": "text", "text": prompt}] + [{"type": "image_url", "image_url": {"url": u}} for u in imgs],
                },
            ],
            "temperature": 0.1,
            "response_format": {"type": "json_object"},
        }
        req = urllib.request.Request(
            "https://api.openai.com/v1/chat/completions",
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                resp_raw = resp.read().decode("utf-8")
            data = json.loads(resp_raw)
            content_text = (
                data.get("choices", [{}])[0].get("message", {}).get("content", "[]")
            )
        except Exception:
            return []
        items = _extract_activities_json(content_text)
        activities: List[ActivitySchema] = []
        for it in items:
            try:
                title = str(it.get("title") or it.get("titre") or "Activité")
                category = _map_category(str(it.get("category") or it.get("categorie") or "other"))
                start = str(it.get("start_date") or it.get("start") or it.get("debut") or "")
                end = str(it.get("end_date") or it.get("end") or it.get("fin") or start)
                location = it.get("location")
                notes = it.get("notes")

                def to_iso(x: str) -> str:
                    try:
                        if len(x) == 10:
                            return f"{x}T08:00"
                        return x
                    except Exception:
                        return dt.datetime.now().replace(microsecond=0).isoformat()

                activities.append(
                    ActivitySchema(
                        title=title,
                        category=category,  # type: ignore[arg-type]
                        start_date=dt.datetime.fromisoformat(to_iso(start)),
                        end_date=dt.datetime.fromisoformat(to_iso(end)),
                        location=str(location) if location is not None else None,
                        notes=str(notes) if notes is not None else None,
                        confidence=0.7,
                    )
                )
            except Exception:
                continue
        return activities

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        return []
    prompt = (
        "Extrait les evenements de ce planning en texte et retourne STRICTEMENT un tableau JSON "
        "d'objets avec: title, category, start_date, end_date, location, notes. "
        "- category ∈ {school, sport, medical, event, other}. "
        "- start_date/end_date en ISO 8601 (YYYY-MM-DDTHH:MM). "
        "- Déduis les dates si range/semaine mentionnee. "
        "- Langue d'origine pour title/notes. "
        "- Pas de texte hors JSON.\n\n"
        "Texte du document:\n" + text[:15000]
    )

    def call_model(model: str) -> list:
        body = {
            "model": model,
            "messages": [
                {"role": "system", "content": "Tu es un extracteur de planning fiable et concis."},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.1,
        }
        req = urllib.request.Request(
            "https://api.openai.com/v1/chat/completions",
            data=json.dumps(body).encode("utf-8"),
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            },
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                resp_raw = resp.read().decode("utf-8")
            data = json.loads(resp_raw)
            content_text = (
                data.get("choices", [{}])[0]
                .get("message", {})
                .get("content", "[]")
            )
            return _extract_activities_json(content_text)
        except Exception:
            return []

    primary_model = os.getenv("INGESTION_OPENAI_MODEL", "gpt-4o")
    fallback_model = os.getenv("INGESTION_OPENAI_FALLBACK_MODEL", "gpt-4o-mini")
    items = call_model(primary_model)
    if not items:
        items = call_model(fallback_model)
    activities: List[ActivitySchema] = []
    for it in items:
        try:
            title = str(it.get("title") or it.get("titre") or it.get("activity") or "Activité")
            category = _map_category(str(it.get("category") or it.get("categorie") or it.get("type") or "other"))
            start = str(it.get("start_date") or it.get("start") or it.get("debut") or it.get("start_time") or "")
            end = str(it.get("end_date") or it.get("end") or it.get("fin") or it.get("end_time") or start)
            location = it.get("location")
            notes = it.get("notes")

            def to_iso(x: str) -> str:
                try:
                    if len(x) == 5 and ":" in x:
                        today = dt.datetime.now().strftime("%Y-%m-%d")
                        return f"{today}T{x}"
                    if len(x) == 10:
                        return f"{x}T08:00"
                    return x
                except Exception:
                    return dt.datetime.now().replace(microsecond=0).isoformat()

            activities.append(
                ActivitySchema(
                    title=title,
                    category=category,  # type: ignore[arg-type]
                    start_date=dt.datetime.fromisoformat(to_iso(start)),
                    end_date=dt.datetime.fromisoformat(to_iso(end)),
                    location=str(location) if location is not None else None,
                    notes=str(notes) if notes is not None else None,
                    confidence=0.7,
                )
            )
        except Exception:
            continue
    return activities

src/ingestion/pipelines/spreadsheet.py

@@ -0,0 +1,183 @@
from typing import List, Optional
from fastapi import UploadFile
from ..schemas import ActivitySchema
from openpyxl import load_workbook
from datetime import datetime, timedelta
from io import BytesIO
import re


def _to_dt(value) -> datetime:
    if isinstance(value, datetime):
        return value
    # try parse simple strings
    try:
        # common formats: 2025-10-11 17:00, 11/10/2025 17:00
        for fmt in ("%Y-%m-%d %H:%M", "%d/%m/%Y %H:%M", "%Y-%m-%d", "%d/%m/%Y"):
            try:
                return datetime.strptime(str(value), fmt)
            except Exception:
                pass
    except Exception:
        pass
    # fallback to now to avoid crash
    return datetime.now()


def _try_grid(ws) -> List[ActivitySchema]:
    # Detect a header row with day names and a time column
    day_aliases = {
        "lundi": 0, "mardi": 1, "mercredi": 2, "jeudi": 3, "vendredi": 4, "samedi": 5, "dimanche": 6,
        "lun": 0, "mar": 1, "mer": 2, "jeu": 3, "ven": 4, "sam": 5, "dim": 6,
        "monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3, "friday": 4, "saturday": 5, "sunday": 6,
    }

    def norm(s: Optional[str]) -> str:
        return (s or "").strip().lower()

    # Find header row
    header_row = None
    day_cols: List[int] = []
    for r in range(1, min(ws.max_row, 10) + 1):
        cols = []
        for c in range(1, ws.max_column + 1):
            v = norm(str(ws.cell(row=r, column=c).value or ""))
            if v in day_aliases:
                cols.append(c)
        if len(cols) >= 2:
            header_row = r
            day_cols = cols
            break
    if not header_row:
        return []
    # Find time column: look for many HH:MM below header
    time_re = re.compile(r"^\d{1,2}:\d{2}$")
    time_col = None
    best_count = 0
    for c in range(1, min(ws.max_column, 6) + 1):
        cnt = 0
        for r in range(header_row + 1, ws.max_row + 1):
            v = str(ws.cell(row=r, column=c).value or "").strip()
            if time_re.match(v):
                cnt += 1
        if cnt > best_count and cnt >= 3:
            best_count = cnt
            time_col = c
    if not time_col:
        return []
    # Compose base week dates starting from current week (Mon..Sun)
    today = datetime.now()
    weekday = today.weekday()
    monday = today - timedelta(days=weekday)

    def day_date(idx: int) -> datetime:
        return monday + timedelta(days=idx)

    activities: List[ActivitySchema] = []
    # Map day col -> day index
    for dcol in day_cols:
        header_val = norm(str(ws.cell(row=header_row, column=dcol).value or ""))
        day_idx = day_aliases.get(header_val)
        if day_idx is None:
            continue
        for r in range(header_row + 1, ws.max_row + 1):
            time_cell = str(ws.cell(row=r, column=time_col).value or "").strip()
            if not time_re.match(time_cell):
                continue
            cell_val = str(ws.cell(row=r, column=dcol).value or "").strip()
            if not cell_val or cell_val.lower() == "none":
                continue
            # Determine start/end
            start_time = time_cell
            # If value contains explicit range, prefer it
            m = re.search(r"(\d{1,2}:\d{2}).{0,5}[-à].{0,5}(\d{1,2}:\d{2})", cell_val)
            if m:
                start_time = m.group(1)
                end_time = m.group(2)
            else:
                # end is next time slot (if any)
                next_time = None
                if r + 1 <= ws.max_row:
                    nxt = str(ws.cell(row=r + 1, column=time_col).value or "").strip()
                    if time_re.match(nxt):
                        next_time = nxt
                end_time = next_time or start_time
            day_date_obj = day_date(day_idx)
            start_iso = f"{day_date_obj.strftime('%Y-%m-%d')} {start_time}"
            end_iso = f"{day_date_obj.strftime('%Y-%m-%d')} {end_time}"
            activities.append(
                ActivitySchema(
                    title=cell_val,
                    category="other",  # type: ignore[arg-type]
                    start_date=_to_dt(start_iso),
                    end_date=_to_dt(end_iso),
                    confidence=0.7
                )
            )
    return activities


async def parse_spreadsheet(file: UploadFile) -> List[ActivitySchema]:
    content = await file.read()
    wb = load_workbook(filename=BytesIO(content), data_only=True)
    ws = wb.active
    # Expect a simple header row with common fields
    headers = {}
    for col in range(1, ws.max_column + 1):
        key = str(ws.cell(row=1, column=col).value or "").strip().lower()
        if key:
            headers[key] = col

    # Known header aliases
    def col(*names):
        for n in names:
            if n in headers:
                return headers[n]
        return None

    title_col = col("title", "intitule", "activite")
    start_col = col("start", "debut", "startdatetime", "date debut", "debut date", "start time")
    end_col = col("end", "fin", "enddatetime", "date fin", "fin date", "end time")
    cat_col = col("category", "categorie")
    loc_col = col("location", "lieu")
    notes_col = col("notes", "commentaire")
    activities: List[ActivitySchema] = []
    for row in range(2, ws.max_row + 1):
        title = str(ws.cell(row=row, column=title_col).value) if title_col else None
        if not title or title.lower() == "none":
            continue
        start_val = ws.cell(row=row, column=start_col).value if start_col else None
        end_val = ws.cell(row=row, column=end_col).value if end_col else None
        category = str(ws.cell(row=row, column=cat_col).value).lower() if cat_col else "other"
        location = (
            str(ws.cell(row=row, column=loc_col).value)
            if loc_col and ws.cell(row=row, column=loc_col).value is not None
            else None
        )
        notes = (
            str(ws.cell(row=row, column=notes_col).value)
            if notes_col and ws.cell(row=row, column=notes_col).value is not None
            else None
        )
        start_dt = _to_dt(start_val) if start_val else datetime.now()
        end_dt = _to_dt(end_val) if end_val else start_dt
        if category not in {"school", "sport", "medical", "event", "other"}:
            category = "other"
        activities.append(
            ActivitySchema(
                title=str(title),
                category=category,
                start_date=start_dt,
                end_date=end_dt,
                location=location,
                notes=notes,
                confidence=0.6,
            )
        )
    if activities:
        return activities
    # Try grid parser
    grid_acts = _try_grid(ws)
    return grid_acts
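
The grid fallback kicks in when row 1 has no recognizable field headers but contains at least two weekday names; a hypothetical check (a sketch, not part of the commit; the dates land in the current week):

    import asyncio
    import io
    from openpyxl import Workbook
    from fastapi import UploadFile
    from ingestion.pipelines.spreadsheet import parse_spreadsheet

    wb = Workbook()
    ws = wb.active
    ws.append([None, "lundi", "mardi"])
    ws.append(["08:00", "Piscine", None])
    ws.append(["09:00", None, "Judo"])
    ws.append(["10:00", None, None])
    buf = io.BytesIO()
    wb.save(buf)
    buf.seek(0)
    acts = asyncio.run(parse_spreadsheet(UploadFile(file=buf, filename="planning.xlsx")))
    print([(a.title, a.start_date.strftime("%a %H:%M")) for a in acts])
    # e.g. [('Piscine', 'Mon 08:00'), ('Judo', 'Tue 09:00')]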

src/ingestion/pipelines/ultra_ocr.py

@@ -0,0 +1,498 @@
"""
Ultra-performant OCR parser for weekly/monthly planning images
Combines advanced preprocessing, context-aware parsing, and multi-pass validation
"""
from typing import List, Dict, Tuple, Optional, Literal
import re
from datetime import datetime, timedelta
import io
from PIL import Image, ImageEnhance, ImageFilter, ImageOps
import json
PlanningType = Literal["weekly", "monthly", "unknown"]
def detect_planning_type(text: str) -> Tuple[PlanningType, float]:
"""
Détecte le type de planning (hebdomadaire/mensuel) avec score de confiance
Returns: (type, confidence)
"""
text_lower = text.lower()
# Indicateurs de planning hebdomadaire
weekly_indicators = [
r'semaine\s+(?:du\s+)?\d{1,2}', # "Semaine du 13"
r'(?:lundi|mardi|mercredi|jeudi|vendredi|samedi|dimanche)', # Jours de la semaine
r'du\s+\d{1,2}\s+au\s+\d{1,2}', # "du 13 au 17"
r'planning\s+hebdomadaire',
r'emploi\s+du\s+temps'
]
# Indicateurs de planning mensuel
monthly_indicators = [
r'(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)',
r'planning\s+mensuel',
r'calendrier\s+mensuel',
r'mois\s+de\s+\w+'
]
weekly_score = sum(1 for pattern in weekly_indicators if re.search(pattern, text_lower))
monthly_score = sum(1 for pattern in monthly_indicators if re.search(pattern, text_lower))
total = weekly_score + monthly_score
if total == 0:
return "unknown", 0.0
if weekly_score > monthly_score:
return "weekly", weekly_score / (total * 1.0)
elif monthly_score > weekly_score:
return "monthly", monthly_score / (total * 1.0)
else:
return "weekly", 0.5 # Default to weekly with low confidence
def extract_period_from_text(text: str, planning_type: PlanningType) -> Optional[Tuple[str, str]]:
"""
Extrait la période (date début/fin) du texte
Returns: (start_date, end_date) au format YYYY-MM-DD, ou None
"""
# Pattern: "du 13 au 17 octobre" ou "Semaine du 13 au 17/10"
period_pattern = re.compile(
r'(?:du\s+)?(\d{1,2})\s*(?:au|[-])\s*(\d{1,2})\s*(?:/)?(\d{1,2})?(?:\s+)?'
r'(janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre|\d{1,2})?',
re.IGNORECASE
)
match = period_pattern.search(text)
if not match:
return None
start_day = int(match.group(1))
end_day = int(match.group(2))
month_str = match.group(4) if match.group(4) else match.group(3)
# Map French month names
month_map = {
'janvier': 1, 'février': 2, 'mars': 3, 'avril': 4,
'mai': 5, 'juin': 6, 'juillet': 7, 'août': 8,
'septembre': 9, 'octobre': 10, 'novembre': 11, 'décembre': 12
}
if month_str and month_str.lower() in month_map:
month = month_map[month_str.lower()]
elif month_str and month_str.isdigit():
month = int(month_str)
else:
month = datetime.now().month
year = datetime.now().year
# Handle year transition (December -> January)
if planning_type == "weekly" and start_day > end_day:
# Week crosses month boundary
try:
start_date = datetime(year, month, start_day)
# End date is in next month
if month == 12:
end_date = datetime(year + 1, 1, end_day)
else:
end_date = datetime(year, month + 1, end_day)
except ValueError:
return None
else:
try:
start_date = datetime(year, month, start_day)
end_date = datetime(year, month, end_day)
except ValueError:
return None
return start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")
def infer_date_from_day(day_name: str, base_date: Optional[str] = None) -> str:
"""
Infère une date complète à partir d'un nom de jour
Si base_date fournie, trouve le jour le plus proche
"""
day_map = {
"lundi": 0, "mardi": 1, "mercredi": 2, "jeudi": 3,
"vendredi": 4, "samedi": 5, "dimanche": 6
}
target_weekday = day_map.get(day_name.lower())
if target_weekday is None:
return datetime.now().strftime("%Y-%m-%d")
if base_date:
try:
base = datetime.fromisoformat(base_date)
except:
base = datetime.now()
else:
base = datetime.now()
# Find next occurrence of target weekday
current_weekday = base.weekday()
days_ahead = target_weekday - current_weekday
if days_ahead < 0: # Day has already passed this week
days_ahead += 7
target_date = base + timedelta(days=days_ahead)
return target_date.strftime("%Y-%m-%d")
def advanced_image_preprocessing(img_bytes: bytes) -> Tuple[Image.Image, float]:
"""
Prétraitement d'image ultra-performant avec scoring qualité
Returns: (preprocessed_image, quality_score)
"""
try:
img = Image.open(io.BytesIO(img_bytes))
original_size = img.size
quality_score = 1.0
print(f"[ultra_ocr] Original image: {img.size}, mode={img.mode}")
# STEP 1: Resize if too large (optimal OCR: 300-600 DPI equivalent)
max_dimension = 3000
if max(img.size) > max_dimension:
ratio = max_dimension / max(img.size)
new_size = tuple(int(dim * ratio) for dim in img.size)
img = img.resize(new_size, Image.Resampling.LANCZOS)
print(f"[ultra_ocr] Resized to {img.size}")
quality_score *= 0.95
# STEP 2: Convert to grayscale
if img.mode != 'L':
img = img.convert('L')
# STEP 3: Enhance sharpness (critical for photos)
enhancer = ImageEnhance.Sharpness(img)
img = enhancer.enhance(2.0)
# STEP 4: Adaptive contrast enhancement
enhancer = ImageEnhance.Contrast(img)
img = enhancer.enhance(2.5)
# STEP 5: Brightness adjustment
enhancer = ImageEnhance.Brightness(img)
img = enhancer.enhance(1.2)
# STEP 6: Noise reduction with adaptive filtering
img = img.filter(ImageFilter.MedianFilter(size=3))
# STEP 7: Binarization (Otsu-like threshold)
# Calculate histogram to find optimal threshold
histogram = img.histogram()
pixels = sum(histogram)
# Simple Otsu's method approximation
threshold = 128 # Default
max_variance = 0
for t in range(0, 256):
w0 = sum(histogram[:t])
w1 = pixels - w0
if w0 == 0 or w1 == 0:
continue
mu0 = sum(i * histogram[i] for i in range(t)) / w0 if w0 > 0 else 0
mu1 = sum(i * histogram[i] for i in range(t, 256)) / w1 if w1 > 0 else 0
variance = w0 * w1 * (mu0 - mu1) ** 2
if variance > max_variance:
max_variance = variance
threshold = t
print(f"[ultra_ocr] Calculated optimal threshold: {threshold}")
# Apply threshold
img = img.point(lambda x: 0 if x < threshold else 255, '1')
img = img.convert('L')
# STEP 8: Morphological operations to clean up
img = img.filter(ImageFilter.MaxFilter(3)) # Dilate
img = img.filter(ImageFilter.MinFilter(3)) # Erode
# STEP 9: Invert for Tesseract (black text on white background)
img = ImageOps.invert(img)
print(f"[ultra_ocr] Preprocessing complete, quality score: {quality_score:.2f}")
return img, quality_score
except Exception as e:
print(f"[ultra_ocr] Preprocessing error: {e}")
# Return original as grayscale
img = Image.open(io.BytesIO(img_bytes)).convert('L')
return img, 0.5
def extract_text_ultra(img_bytes: bytes) -> Tuple[str, float, Image.Image]:
"""
OCR ultra-performant avec multi-pass et scoring
Returns: (text, confidence, preprocessed_image)
"""
try:
import pytesseract
# Windows path
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
except ImportError:
print("[ultra_ocr] pytesseract not available")
return "", 0.0, None
# Prétraitement avancé
img, quality = advanced_image_preprocessing(img_bytes)
# Multi-pass OCR with different PSM modes
results = []
# Pass 1: PSM 6 (uniform block of text - good for tables)
try:
config1 = r'--oem 3 --psm 6 -c preserve_interword_spaces=1'
data1 = pytesseract.image_to_data(img, lang="fra+eng", config=config1, output_type=pytesseract.Output.DICT)
text1 = pytesseract.image_to_string(img, lang="fra+eng", config=config1)
confidences1 = [float(conf) for conf in data1['conf'] if int(conf) > 0]
conf1 = (sum(confidences1) / len(confidences1) / 100.0) if confidences1 else 0.0
results.append((text1, conf1, "PSM6"))
print(f"[ultra_ocr] Pass 1 (PSM 6): {len(text1)} chars, conf={conf1:.2f}")
except Exception as e:
print(f"[ultra_ocr] Pass 1 failed: {e}")
# Pass 2: PSM 3 (fully automatic page segmentation)
try:
config2 = r'--oem 3 --psm 3'
data2 = pytesseract.image_to_data(img, lang="fra+eng", config=config2, output_type=pytesseract.Output.DICT)
text2 = pytesseract.image_to_string(img, lang="fra+eng", config=config2)
confidences2 = [float(conf) for conf in data2['conf'] if int(conf) > 0]
conf2 = (sum(confidences2) / len(confidences2) / 100.0) if confidences2 else 0.0
results.append((text2, conf2, "PSM3"))
print(f"[ultra_ocr] Pass 2 (PSM 3): {len(text2)} chars, conf={conf2:.2f}")
except Exception as e:
print(f"[ultra_ocr] Pass 2 failed: {e}")
# Select best result
if not results:
return "", 0.0, img
# Combine quality score with OCR confidence
best_text, best_conf, best_mode = max(results, key=lambda x: x[1])
final_conf = best_conf * quality
print(f"[ultra_ocr] Selected best: {best_mode}, final_conf={final_conf:.2f}")
print(f"[ultra_ocr] Text preview: {best_text[:200]}")
# Save debug image
try:
import tempfile
import os
debug_path = os.path.join(tempfile.gettempdir(), "ultra_ocr_debug.png")
img.save(debug_path)
print(f"[ultra_ocr] Saved debug image: {debug_path}")
except:
pass
return best_text, final_conf, img
def parse_planning_ultra(text: str, base_conf: float) -> Tuple[List[Dict], float, Dict]:
"""
Parser ultra-intelligent avec reconnaissance de contexte
Returns: (activities, global_score, metadata)
"""
if not text.strip():
return [], 0.0, {}
print("[ultra_ocr] Starting ultra parsing...")
# Detect planning type and period
planning_type, type_conf = detect_planning_type(text)
period = extract_period_from_text(text, planning_type)
metadata = {
"planning_type": planning_type,
"type_confidence": type_conf,
"period": period
}
print(f"[ultra_ocr] Planning type: {planning_type} (conf={type_conf:.2f})")
if period:
print(f"[ultra_ocr] Detected period: {period[0]} to {period[1]}")
activities = []
lines = text.split('\n')
# Enhanced regex patterns
time_pattern = re.compile(r'(\d{1,2})[h:.](\d{2})')
day_pattern = re.compile(r'\b(lundi|mardi|mercredi|jeudi|vendredi|samedi|dimanche)\b', re.IGNORECASE)
date_pattern = re.compile(r'(\d{1,2})[/\-.](\d{1,2})(?:[/\-.](\d{2,4}))?')
# Activity category detection (enhanced)
category_keywords = {
"school": ["math", "français", "anglais", "histoire", "géo", "sciences", "physique", "chimie", "svt", "eps", "cours", "classe", "devoir", "examen"],
"sport": ["sport", "foot", "basket", "natation", "piscine", "gymnase", "tennis", "danse", "judo", "karaté", "athlétisme"],
"medical": ["médecin", "docteur", "dentiste", "rdv médical", "consultation", "vaccin", "infirmerie"],
"event": ["anniversaire", "sortie", "spectacle", "théâtre", "concert", "visite", "excursion", "réunion"]
}
def detect_category(title: str) -> str:
title_lower = title.lower()
for category, keywords in category_keywords.items():
if any(kw in title_lower for kw in keywords):
return category
return "other"
# Context tracking
current_day = None
current_date = None
base_date = period[0] if period else None
# Structure scoring
has_structure = {
"days": bool(day_pattern.search(text)),
"times": bool(time_pattern.search(text)),
"dates": bool(date_pattern.search(text))
}
structure_score = sum(0.33 for v in has_structure.values() if v)
print(f"[ultra_ocr] Structure: days={has_structure['days']}, times={has_structure['times']}, dates={has_structure['dates']}, score={structure_score:.2f}")
# Parse line by line with context
for line_idx, line in enumerate(lines):
line = line.strip()
if len(line) < 3:
continue
# Track context
day_match = day_pattern.search(line)
if day_match:
current_day = day_match.group(1).capitalize()
if base_date:
current_date = infer_date_from_day(current_day, base_date)
print(f"[ultra_ocr] Day context: {current_day} -> {current_date}")
date_match = date_pattern.search(line)
if date_match:
try:
day = int(date_match.group(1))
month = int(date_match.group(2))
year = int(date_match.group(3)) if date_match.group(3) else datetime.now().year
if year < 100:
year = 2000 + year
current_date = f"{year:04d}-{month:02d}-{day:02d}"
print(f"[ultra_ocr] Explicit date: {current_date}")
except:
pass
# Extract activities
times = time_pattern.findall(line)
if not times:
continue
# Extract title (remove time patterns)
title = re.sub(time_pattern, '', line)
title = re.sub(day_pattern, '', title)
title = re.sub(date_pattern, '', title)
title = re.sub(r'[|\-]+', ' ', title) # Remove separators
title = re.sub(r'\s+', ' ', title).strip()
if len(title) < 2:
continue
try:
# Parse times
start_h, start_m = map(int, times[0])
if len(times) >= 2:
end_h, end_m = map(int, times[1])
else:
# Default: 1 hour duration
end_h, end_m = start_h + 1, start_m
# Determine date
if current_date:
activity_date = current_date
elif base_date:
activity_date = base_date
else:
activity_date = datetime.now().strftime("%Y-%m-%d")
start_dt = f"{activity_date}T{start_h:02d}:{start_m:02d}:00"
end_dt = f"{activity_date}T{end_h:02d}:{end_m:02d}:00"
# Validate times
try:
datetime.fromisoformat(start_dt)
datetime.fromisoformat(end_dt)
except:
print(f"[ultra_ocr] Invalid datetime: {start_dt}")
continue
# Detect category
category = detect_category(title)
# Calculate confidence
activity_conf = base_conf * structure_score
if current_date:
activity_conf *= 1.15
if len(times) >= 2:
activity_conf *= 1.1
if category != "other":
activity_conf *= 1.05
activity_conf = min(activity_conf, 1.0)
activities.append({
"title": title[:100],
"category": category,
"start_date": start_dt,
"end_date": end_dt,
"location": None,
"notes": f"Jour: {current_day}" if current_day else None,
"confidence": activity_conf
})
print(f"[ultra_ocr] Activity: {title[:40]:<40} | {start_h:02d}:{start_m:02d}-{end_h:02d}:{end_m:02d} | {category:<8} | conf={activity_conf:.2f}")
except Exception as e:
print(f"[ultra_ocr] Failed to parse: {line[:60]} - {e}")
continue
# Global score calculation
if activities:
extraction_quality = min(len(activities) / 8.0, 1.0) # Expect 5-10 activities
global_score = base_conf * structure_score * extraction_quality * (1 + type_conf * 0.2)
else:
global_score = base_conf * structure_score * 0.2
global_score = min(global_score, 1.0)
print(f"[ultra_ocr] Final: {len(activities)} activities, score={global_score:.2f}")
return activities, global_score, metadata
def parse_image_ultra(img_bytes: bytes) -> Tuple[List[Dict], float, Dict]:
"""
Pipeline ultra-performant complet
Returns: (activities, confidence_score, metadata)
"""
print("[ultra_ocr] ========== ULTRA OCR PIPELINE START ==========")
# Step 1: Ultra OCR
text, ocr_conf, img = extract_text_ultra(img_bytes)
if not text or ocr_conf < 0.25:
print(f"[ultra_ocr] OCR quality too low ({ocr_conf:.2f}), aborting")
return [], 0.0, {"error": "OCR quality too low"}
# Step 2: Ultra parsing
activities, parse_score, metadata = parse_planning_ultra(text, ocr_conf)
metadata["ocr_confidence"] = ocr_conf
metadata["raw_text_length"] = len(text)
print(f"[ultra_ocr] ========== PIPELINE COMPLETE: {len(activities)} activities, score={parse_score:.2f} ==========")
return activities, parse_score, metadata
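
A quick check of the type detector and period extractor on a typical French header line (a sketch; with this input the weekly indicators outnumber the single month-name hit, and the year defaults to the current one):

    from ingestion.pipelines.ultra_ocr import detect_planning_type, extract_period_from_text

    header = "Planning hebdomadaire - Semaine du 13 au 17 octobre"
    ptype, conf = detect_planning_type(header)        # ("weekly", 0.75)
    print(extract_period_from_text(header, ptype))    # e.g. ("2025-10-13", "2025-10-17") when run in 2025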

src/ingestion/schemas.py

@@ -0,0 +1,32 @@
from datetime import datetime, time
from typing import Literal, Optional
from pydantic import BaseModel, Field


class ActivitySchema(BaseModel):
    title: str
    category: Literal["school", "sport", "medical", "event", "other"] = "other"
    start_date: datetime
    end_date: datetime
    location: Optional[str] = None
    notes: Optional[str] = None
    confidence: float = Field(default=0.5, ge=0.0, le=1.0)


class ReminderSchema(BaseModel):
    offset_minutes: int = Field(default=60, ge=0, le=1440)
    channel: Literal["push", "email", "sms", "device"] = "push"
    send_time: Optional[time] = None


class IngestionRequest(BaseModel):
    schedule_id: str
    child_id: str
    filename: str


class IngestionResponse(BaseModel):
    schedule_id: str
    status: Literal["completed", "failed"]
    activities: list[ActivitySchema] = Field(default_factory=list)
    warnings: list[str] = Field(default_factory=list)
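
Pydantic v2 coerces ISO 8601 strings into the datetime fields, and the Field constraints reject a confidence outside [0, 1] with a ValidationError; a small sketch:

    from ingestion.schemas import ActivitySchema

    act = ActivitySchema(
        title="Natation",
        category="sport",
        start_date="2025-10-14T17:00",
        end_date="2025-10-14T18:00",
        confidence=0.9,
    )
    print(act.model_dump_json())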

tests/test_health.py

@@ -0,0 +1,9 @@
from fastapi.testclient import TestClient
from ingestion.main import app


def test_health():
    client = TestClient(app)
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"
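
A possible companion test for the /ingest endpoint, exercising the CSV pipeline end to end (a sketch, not part of the commit; it relies on python-multipart, which is already a dependency):

    def test_ingest_csv():
        client = TestClient(app)
        files = {"file": ("planning.csv",
                          b"title,start,end\nJudo,2025-10-14 17:00,2025-10-14 18:00\n",
                          "text/csv")}
        data = {"schedule_id": "sched-1", "child_id": "child-1"}
        response = client.post("/ingest", files=files, data=data)
        assert response.status_code == 200
        body = response.json()
        assert body["status"] == "completed"
        assert body["activities"][0]["title"] == "Judo"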