mirror of
https://github.com/hoshikawa2/rfp_response_automation.git
synced 2026-03-06 18:21:02 +00:00
first commit
This commit is contained in:
92
files/modules/core/security.py
Normal file
92
files/modules/core/security.py
Normal file
@@ -0,0 +1,92 @@
|
||||
from functools import wraps
|
||||
from flask import request, Response, url_for, session, redirect
|
||||
from werkzeug.security import check_password_hash
|
||||
from modules.core.audit import audit_log
|
||||
from modules.users.db import get_pool
|
||||
|
||||
# =========================
|
||||
# Base authentication
|
||||
# =========================
|
||||
|
||||
def authenticate():
|
||||
return redirect(url_for("auth.login_page"))
|
||||
|
||||
def get_current_user():
|
||||
|
||||
email = session.get("user_email")
|
||||
if not email:
|
||||
return None
|
||||
|
||||
sql = """
|
||||
SELECT id, username, email, user_role, active
|
||||
FROM app_users
|
||||
WHERE email = :1 \
|
||||
"""
|
||||
|
||||
pool = get_pool()
|
||||
|
||||
with pool.acquire() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, [email])
|
||||
row = cur.fetchone()
|
||||
|
||||
if not row:
|
||||
return None
|
||||
|
||||
return {
|
||||
"id": row[0],
|
||||
"username": row[1],
|
||||
"email": row[2],
|
||||
"role": row[3],
|
||||
"active": row[4]
|
||||
}
|
||||
|
||||
|
||||
# =========================
|
||||
# Decorators
|
||||
# =========================
|
||||
|
||||
def requires_login(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
user = get_current_user()
|
||||
if not user:
|
||||
return authenticate()
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def requires_app_auth(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
user = get_current_user()
|
||||
|
||||
if not user:
|
||||
return authenticate()
|
||||
|
||||
role = (user.get("role") or "").strip().lower()
|
||||
|
||||
if role not in ["user", "admin"]:
|
||||
return authenticate()
|
||||
|
||||
audit_log("LOGIN_SUCCESS", f"user={user}")
|
||||
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def requires_admin_auth(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
user = get_current_user()
|
||||
|
||||
if not user:
|
||||
return authenticate()
|
||||
|
||||
if user.get("role") != "admin":
|
||||
return authenticate()
|
||||
|
||||
audit_log("LOGIN_ADMIN_SUCCESS", f"user={user}")
|
||||
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
Reference in New Issue
Block a user