from flask import Flask, session, request, render_template_string, render_template import json import os
app = Flask(__name__) app.config['SECRET_KEY'] = os.urandom(32).hex()
@app.route('/', methods=['GET', 'POST']) def store(): if not session.get('name'): session['name'] = ''.join("customer") session['permission'] = 0
error_message = '' if request.method == 'POST': error_message = '<p style="color: red; font-size: 0.8em;">该商品暂时无法购买,请稍后再试!</p>'
products = [ {"id": 1, "name": "美式咖啡", "price": 9.99, "image": "1.png"}, {"id": 2, "name": "橙c美式", "price": 19.99, "image": "2.png"}, {"id": 3, "name": "摩卡", "price": 29.99, "image": "3.png"}, {"id": 4, "name": "卡布奇诺", "price": 19.99, "image": "4.png"}, {"id": 5, "name": "冰拿铁", "price": 29.99, "image": "5.png"} ]
return render_template('index.html', error_message=error_message, session=session, products=products)
def add(): pass
@app.route('/add', methods=['POST', 'GET']) def adddd(): if request.method == 'GET': return ''' <html> <body style="background-image: url('/static/img/7.png'); background-size: cover; background-repeat: no-repeat;"> <h2>添加商品</h2> <form id="productForm"> <p>商品名称: <input type="text" id="name"></p> <p>商品价格: <input type="text" id="price"></p> <button type="button" onclick="submitForm()">添加商品</button> </form> <script> function submitForm() { const nameInput = document.getElementById('name').value; const priceInput = document.getElementById('price').value;
fetch(`/add?price=${encodeURIComponent(priceInput)}`, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: nameInput }) .then(response => response.text()) .then(data => alert(data)) .catch(error => console.error('错误:', error)); } </script> </body> </html> ''' elif request.method == 'POST': if request.data: try: raw_data = request.data.decode('utf-8') if check(raw_data): return "该商品违规,无法上传" json_data = json.loads(raw_data)
if not isinstance(json_data, dict): return "添加失败1"
merge(json_data, add) return "你无法添加商品哦"
except (UnicodeDecodeError, json.JSONDecodeError): return "添加失败2" except TypeError as e: return f"添加失败3" except Exception as e: return f"添加失败4" return "添加失败5"
@app.route('/aaadminnn', methods=['GET', 'POST']) def admin(): if session.get('name') == "admin" and session.get('permission') != 0: permission = session.get('permission') if check1(permission): return "非法权限"
if request.method == 'POST': return '<script>alert("上传成功!");window.location.href="/aaadminnn";</script>'
upload_form = ''' <h2>商品管理系统</h2> <form method=POST enctype=multipart/form-data style="margin:20px;padding:20px;border:1px solid #ccc"> <h3>上传新商品</h3> <input type=file name=file required style="margin:10px"><br> <small>支持格式:jpg/png(最大2MB)</small><br> <input type=submit value="立即上传" style="margin:10px;padding:5px 20px"> </form> '''
original_template = 'Hello admin!!!Your permissions are{}'.format(permission) new_template = original_template + upload_form
return render_template_string(new_template) else: return "<script>alert('You are not an admin');window.location.href='/'</script>"
def merge(src, dst): for k, v in src.items(): if hasattr(dst, '__getitem__'): if dst.get(k) and type(v) == dict: merge(v, dst.get(k)) else: dst[k] = v elif hasattr(dst, k) and type(v) == dict: merge(v, getattr(dst, k)) else: setattr(dst, k, v)
def check(raw_data, forbidden_keywords=None): """ 检查原始数据中是否包含禁止的关键词 如果包含禁止关键词返回 True,否则返回 False """ if forbidden_keywords is None: forbidden_keywords = ["app", "config", "init", "globals", "flag", "SECRET", "pardir", "class", "mro", "subclasses", "builtins", "eval", "os", "open", "file", "import", "cat", "ls", "/", "base", "url", "read"]
return any(keyword in raw_data for keyword in forbidden_keywords)
param_black_list = ['config', 'session', 'url', '\\', '<', '>', '%1c', '%1d', '%1f', '%1e', '%20', '%2b', '%2c', '%3c', '%3e', '%c', '%2f', 'b64decode', 'base64', 'encode', 'chr', '[', ']', 'os', 'cat', 'flag', 'set', 'self', '%', 'file', 'pop(', 'setdefault', 'char', 'lipsum', 'update', '=', 'if', 'print', 'env', 'endfor', 'code', '=' ]
def waf_check(value): for black in param_black_list: if black in value: return False return True
def is_automated_request(): user_agent = request.headers.get('User-Agent', '').lower() automated_agents = ['fenjing', 'curl', 'python', 'bot', 'spider'] return any(agent in user_agent for agent in automated_agents)
def check1(value):
if is_automated_request(): print("Automated tool detected") return True
if not waf_check(value): return True
return False
app.run(host="0.0.0.0",port=5014)
|