Sql_Injection/backend/admin_login.py
2025-11-25 14:30:38 +05:30

142 lines
4.4 KiB
Python

# admin_login.py
import os
from datetime import datetime, timedelta
from flask import Blueprint, request, jsonify
import mysql.connector
from werkzeug.security import generate_password_hash, check_password_hash
# -------------------------
# Config from env (adjust if needed)
# -------------------------
# Connection settings: each value is read from the environment and falls
# back to a local-development default when the variable is unset.
DB_HOST = os.environ.get("MYSQL_HOST", "127.0.0.1")
DB_USER = os.environ.get("MYSQL_USER", "root")
DB_PASS = os.environ.get("MYSQL_PASSWORD", "12345!")
DB_NAME = os.environ.get("MYSQL_DB", "sqlilab")
# Environment values are strings, so the port is normalised to int here.
DB_PORT = int(os.environ.get("MYSQL_PORT", "3306"))
# Blueprint on which this module's admin signup/login routes are registered.
auth_bp = Blueprint(name="auth", import_name=__name__)
# -------------------------
# DB helper
# -------------------------
def get_db():
    """
    Open and return a new MySQL connection built from the DB_* settings.

    Autocommit is turned off, so callers must commit explicitly; the
    connection is returned open and the caller is responsible for closing it.
    """
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASS,
        database=DB_NAME,
        port=DB_PORT,
        autocommit=False,
    )
# -------------------------
# Probe admins_cred for the refresh columns (best-effort, read-only)
# -------------------------
def ensure_admin_refresh_columns():
    """
    Best-effort, read-only probe for the refresh columns on ``admins_cred``.

    Queries INFORMATION_SCHEMA.COLUMNS for the ``refresh_token`` and
    ``refresh_expires_at`` columns of ``admins_cred`` in the configured
    schema (DB_NAME). The rows are read and discarded: this function does
    not create the table or alter it.

    Every error raised while connecting or querying is swallowed on purpose
    so that calling this at import time can never make the import fail.

    Returns:
        None
    """
    cnx = None
    cur = None
    try:
        cnx = get_db()
        cur = cnx.cursor()
        cur.execute(
            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'admins_cred' AND COLUMN_NAME IN ('refresh_token','refresh_expires_at')",
            (DB_NAME,)
        )
        # Consume the rows; the values are intentionally unused.
        # If you want ALTER logic, paste here — keeping minimal to avoid accidental schema changes.
        cur.fetchall()
    except Exception:
        # Deliberate best-effort: connection/query failures are ignored.
        pass
    finally:
        # Close the cursor first, then the connection. Cleanup failures are
        # ignored too, but only ordinary exceptions: KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        for resource in (cur, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                pass


# Run the probe once at import time; it swallows its own errors, so the
# import succeeds even when the database is unreachable.
ensure_admin_refresh_columns()
# -------------------------
# Signup & Login endpoints (simple)
# -------------------------
@auth_bp.route("/admin_signup", methods=["POST"])
def admin_signup():
    """
    Create an admin account from a JSON body holding ``username`` and ``password``.

    Leading/trailing whitespace is stripped from both fields, and the
    password is stored as a werkzeug password hash, never in plain text.

    Responses:
        201 {"message": "admin signed up", "id": <new row id>} on success.
        400 {"error": "expected JSON body"} when the request is not JSON,
            the JSON is malformed, or the payload is not a JSON object.
        400 {"error": "username and password required"} when either field
            is missing, blank, or not a string.
        400 {"error": "username already exists"} when the INSERT raises an
            IntegrityError.
        500 {"error": <exception text>} on any other failure.
    """
    if not request.is_json:
        return jsonify({"error": "expected JSON body"}), 400
    # silent=True turns a malformed payload into None instead of an
    # exception, so it is answered with the same JSON 400 as below.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, null, ...) has no
        # fields to read; reject it rather than crash on .get().
        return jsonify({"error": "expected JSON body"}), 400

    username = body.get("username")
    password = body.get("password")
    # Non-string values (numbers, lists, ...) cannot be stripped or hashed;
    # treat them as missing credentials instead of failing with a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"error": "username and password required"}), 400
    username = username.strip()
    password = password.strip()
    if not username or not password:
        return jsonify({"error": "username and password required"}), 400

    hashed = generate_password_hash(password)
    cnx = None
    cur = None
    try:
        cnx = get_db()
        cur = cnx.cursor()
        cur.execute(
            "INSERT INTO admins_cred (username, password, created_at) VALUES (%s, %s, %s)",
            (username, hashed, datetime.utcnow()),
        )
        cnx.commit()
        new_id = cur.lastrowid
        return jsonify({"message": "admin signed up", "id": new_id}), 201
    except mysql.connector.IntegrityError:
        if cnx:
            cnx.rollback()
        return jsonify({"error": "username already exists"}), 400
    except Exception as e:
        if cnx:
            cnx.rollback()
        # NOTE(review): str(e) exposes driver/server error text to the
        # client; consider logging it and returning a generic message.
        return jsonify({"error": str(e)}), 500
    finally:
        # Cursor first, then connection. Cleanup errors are ignored so they
        # never mask the response being returned.
        for resource in (cur, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                pass
@auth_bp.route("/admin_login", methods=["POST"])
def admin_login():
    """
    Verify admin credentials supplied as a JSON body with ``username`` and ``password``.

    Leading/trailing whitespace is stripped from both fields. The row is
    looked up by username and the supplied password is checked against the
    stored hash with ``check_password_hash``.

    Responses:
        200 {"message": "login success", "admin": {"id": ..., "username": ...}}
            when the credentials match.
        400 {"error": "expected JSON body"} when the request is not JSON,
            the JSON is malformed, or the payload is not a JSON object.
        400 {"error": "username and password required"} when either field
            is missing, blank, or not a string.
        401 {"error": "invalid credentials"} when the user is unknown or the
            password does not match.
        500 {"error": <exception text>} on any other failure.
    """
    if not request.is_json:
        return jsonify({"error": "expected JSON body"}), 400
    # silent=True turns a malformed payload into None instead of an
    # exception, so it is answered with the same JSON 400 as below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, string, null, ...) has no
        # fields to read; reject it rather than crash on .get().
        return jsonify({"error": "expected JSON body"}), 400

    username = data.get("username")
    password = data.get("password")
    # Non-string values (numbers, lists, ...) cannot be stripped or
    # verified; treat them as missing credentials instead of a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"error": "username and password required"}), 400
    username = username.strip()
    password = password.strip()
    if not username or not password:
        return jsonify({"error": "username and password required"}), 400

    cnx = None
    cur = None
    try:
        cnx = get_db()
        # dictionary=True so the fetched row is addressed by column name.
        cur = cnx.cursor(dictionary=True)
        cur.execute(
            "SELECT id, username, password FROM admins_cred WHERE username = %s",
            (username,),
        )
        admin = cur.fetchone()
        # Same 401 for "no such user" and "wrong password" so the response
        # does not reveal which usernames exist.
        if not admin or not check_password_hash(admin["password"], password):
            return jsonify({"error": "invalid credentials"}), 401
        return jsonify({"message": "login success", "admin": {"id": int(admin["id"]), "username": admin["username"]}}), 200
    except Exception as e:
        # NOTE(review): str(e) exposes driver/server error text to the
        # client; consider logging it and returning a generic message.
        return jsonify({"error": str(e)}), 500
    finally:
        # Cursor first, then connection. Cleanup errors are ignored so they
        # never mask the response being returned.
        for resource in (cur, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                pass