notes/app.py
2025-06-06 20:36:23 +00:00

244 lines
7.5 KiB
Python

from flask import Flask, request, session, redirect, url_for, render_template, jsonify, abort
from werkzeug.security import generate_password_hash, check_password_hash
from cryptography.fernet import Fernet
import os
import hashlib
import base64
import dotenv
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import time
import re
from flask_socketio import SocketIO, emit
# Load variables from a local .env file into the process environment before
# anything below reads them.
dotenv.load_dotenv()
app = Flask(__name__)
socketio = SocketIO(app)
# Session secret comes from the SECRET environment variable.
# NOTE(review): os.getenv returns None when SECRET is unset — confirm every
# deployment defines it, since the routes below rely on the session.
app.secret_key = os.getenv("SECRET")
# Account store: register() appends one "username:password_hash" line per user.
USERS_FILE = "users.txt"
# Root directory for stored notes; each user gets a sub-directory under it.
DATA_DIR = "notes"
os.makedirs(DATA_DIR, exist_ok=True)
# Cap request bodies; oversized requests are answered by the 413 handler below.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 # 16 KB
# Rate limiter keyed by the client's remote address.
limiter = Limiter(key_func=get_remote_address)
limiter.init_app(app)
@app.errorhandler(413)
def payload_too_large(e):
    """Answer HTTP 413 (payload too large) errors with a plain-text message."""
    status = 413
    message = "someone call caseoh, he's got competition — I think his number is 413?"
    return message, status
# === UTILS ===
def get_user():
    """Return the username stored in the session, or None if there is none."""
    current_user = session.get("user")
    return current_user
def get_note_path(user, note):
    """Return the file path of *note* for *user*, creating the user's directory.

    Both names come from untrusted input (the registered username and the URL),
    so each is reduced to a single path component before being joined under
    DATA_DIR.

    Args:
        user: Username that owns the note.
        note: Note name; ".txt" is appended to form the file name.

    Returns:
        Path of the note file inside the user's directory under DATA_DIR.
        The directory is created if it does not exist; the file itself is not.
    """

    def _flatten_separators(name):
        # Replace every separator the platform honours ("/" everywhere, plus
        # "\\" on Windows) so the name cannot span several path components.
        for sep in ("/", os.sep, os.altsep):
            if sep:
                name = name.replace(sep, "_")
        return name

    safe_user = _flatten_separators(user)
    # A user literally named "." or ".." would resolve to DATA_DIR itself or
    # to its parent, letting that account write files outside its own
    # directory. Map those names to harmless ones; all others are unchanged.
    if safe_user in {".", ".."}:
        safe_user = safe_user.replace(".", "_")

    # The note name always gets a ".txt" suffix, so it can never be exactly
    # "." or ".."; flattening separators is sufficient there.
    safe_note = _flatten_separators(note)

    user_dir = os.path.join(DATA_DIR, safe_user)
    os.makedirs(user_dir, exist_ok=True)
    return os.path.join(user_dir, safe_note + ".txt")
def get_key_for_user(user, password):
    """Derive the user's encryption key from their username and password.

    The key is PBKDF2-HMAC-SHA256 over the password, salted with a fixed
    prefix followed by the username, and returned urlsafe-base64 encoded
    as bytes.
    """
    # The salt is a constant prefix plus the username, so it differs per user
    # without needing to be stored anywhere.
    salt = b"fixed_salt" + user.encode()
    secret = password.encode()
    derived = hashlib.pbkdf2_hmac("sha256", secret, salt, 100_000)
    # Only the first 32 bytes are used as key material.
    key_material = derived[:32]
    return base64.urlsafe_b64encode(key_material)
# Compiled User-Agent patterns for clients that are refused outright
# (curl and python-requests, each followed by a version number).
banned_user_agent_patterns = [
    re.compile(expression)
    for expression in (
        r'curl/\d+\.\d+(\.\d+)?',
        r'python-requests/\d+\.\d+(\.\d+)?',
    )
]
@app.before_request
def block_banned_user_agents():
    """Abort with 403 when the request's User-Agent matches a banned pattern."""
    # A missing header is treated as an empty string, which matches nothing.
    agent = request.headers.get("User-Agent", "")
    if any(banned.search(agent) for banned in banned_user_agent_patterns):
        abort(403)
# === ROUTES ===
@app.route("/api/<note>", methods=["GET", "POST"])
@limiter.limit("100 per hour", per_method=True, key_func=get_remote_address)
def api(note):
    """Read or write one of the logged-in user's encrypted notes.

    Requires both a username and an encryption key in the session; otherwise
    responds 401.

    POST expects a JSON object with a string ``content`` field. The content is
    encrypted with the session key and written to the note file. Creating a
    note that does not exist yet is limited to 10 per hour, tracked in the
    session. Responds 400 for a malformed body and 429 when the limit is hit.

    Any other method routed here (GET, and the HEAD requests Flask dispatches
    to GET views) returns the decrypted note, an empty body if the note does
    not exist, or 500 if the file cannot be read or decrypted.
    """
    user = get_user()
    key = session.get("key")
    if not user or not key:
        return "Unauthorized", 401

    fernet = Fernet(key.encode())
    path = get_note_path(user, note)

    if request.method == "POST":
        data = request.get_json()
        # Accept only a JSON object carrying a string; anything else (a list,
        # a number, a non-string content) used to crash later with a 500.
        content = data.get("content") if isinstance(data, dict) else None
        if not isinstance(content, str):
            return "Bad request", 400

        # Only the creation of a new note counts against the hourly quota.
        if not os.path.exists(path):
            now = time.time()
            # Keep only creation timestamps from the last hour.
            recent = [t for t in session.get("note_creations", []) if now - t < 3600]
            if len(recent) >= 10:
                return "Slow down, Picasso — max 10 new notes per hour", 429
            recent.append(now)
            session["note_creations"] = recent

        ciphertext = fernet.encrypt(content.encode())
        with open(path, "wb") as f:
            f.write(ciphertext)
        return jsonify({"status": "saved"})

    # Read path. Deliberately not guarded by `method == "GET"`: HEAD requests
    # also reach this view and previously fell through, returning None.
    if not os.path.exists(path):
        return "", 200
    try:
        with open(path, "rb") as f:
            return fernet.decrypt(f.read()).decode(), 200
    except Exception:
        # Best-effort by design: any read/decrypt failure is reported to the
        # client as a corrupted note rather than raised.
        return "Corrupted note or invalid key", 500
@app.route("/n/<note>")
@app.route("/notes/<note>")
def serve_note(note):
    """Render the note page for *note*, or redirect to login if not logged in."""
    username = get_user()
    if username:
        return render_template("note.html", note=note, username=username)
    return redirect(url_for("login"))
@app.route("/public/<username>")
@app.route("/board")
def serve_note_nologin(username: str = ""):
    """Render the note page without requiring a login.

    For /public/<username> the template's ``note`` value is that username;
    for /board it is the empty string. The session user, if any, is passed
    to the template as ``username``.
    """
    viewer = get_user()
    return render_template("note.html", note=username, username=viewer)
@app.route("/publicapi/<username>", methods=["GET", "POST"])
def public_board(username):
    """Read or replace the plain-text public board of *username*.

    POST is allowed only when the session user equals *username* (403
    otherwise). It expects a JSON object with a string ``content`` field
    (400 otherwise), writes it to the user's ``__public.txt`` and emits a
    ``board_update`` Socket.IO event with the new content.

    Any other method routed here (GET, and the HEAD requests Flask dispatches
    to GET views) returns the stored board text, or an empty body if the
    board has never been written.
    """
    safe_user = username.replace("/", "_")
    # "." and ".." would make the path resolve to DATA_DIR itself or to its
    # parent instead of a per-user directory; map them to harmless names.
    if safe_user in {".", ".."}:
        safe_user = safe_user.replace(".", "_")
    path = os.path.join(DATA_DIR, safe_user, "__public.txt")

    if request.method == "POST":
        if get_user() != username:
            return "Forbidden", 403
        data = request.get_json()
        # Accept only a JSON object carrying a string; other shapes used to
        # crash in f.write() with a 500.
        content = data.get("content") if isinstance(data, dict) else None
        if not isinstance(content, str):
            return "Bad request", 400
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        # NOTE(review): this event carries no board identifier and uses the
        # same event name as the shared board — confirm clients viewing other
        # boards are meant to receive it.
        socketio.emit('board_update', {'content': content})
        return jsonify({"status": "saved"})

    # Read path. Deliberately not guarded by `method == "GET"`: HEAD requests
    # also reach this view and previously fell through, returning None.
    if not os.path.exists(path):
        return "", 200
    with open(path, "r", encoding="utf-8") as f:
        return f.read(), 200
@socketio.on('connect')
def handle_connect():
    """Print a line to stdout when a Socket.IO client connects."""
    message = 'Client connected'
    print(message)
@socketio.on('disconnect')
def handle_disconnect():
    """Print a line to stdout when a Socket.IO client disconnects."""
    message = 'Client disconnected'
    print(message)
# Shared board API: a POST to /boardapi saves the new content and broadcasts it to connected clients.
@app.route("/boardapi", methods=["GET", "POST"])
def shared_board():
    """Read or replace the single shared board stored under DATA_DIR.

    POST expects a JSON object with a string ``content`` field (400
    otherwise), writes it to ``__shared_board.txt`` and emits a
    ``board_update`` Socket.IO event with the new content. No login is
    checked on either method.

    Any other method routed here (GET, and the HEAD requests Flask dispatches
    to GET views) returns the stored text, or an empty body if the board has
    never been written.
    """
    path = os.path.join(DATA_DIR, "__shared_board.txt")

    if request.method == "POST":
        data = request.get_json()
        # Accept only a JSON object carrying a string; other shapes used to
        # crash in f.write() with a 500.
        content = data.get("content") if isinstance(data, dict) else None
        if not isinstance(content, str):
            return "Bad request", 400
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        # Broadcast new content to all clients
        socketio.emit('board_update', {'content': content})
        return jsonify({"status": "saved"})

    # Read path. The original if/elif had no fallback, so a HEAD request
    # matched neither branch and the view returned None.
    if not os.path.exists(path):
        return "", 200
    with open(path, "r", encoding="utf-8") as f:
        return f.read(), 200
@app.route("/")
def serve_root():
    """Redirect to the user's "home" note, or to the login page if logged out."""
    if get_user():
        target = url_for("serve_note", note="home")
    else:
        target = url_for("login")
    return redirect(target)
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form, or create an account from the posted form.

    On POST the username and password form fields are required (400 if
    either is missing). The username is validated, checked against the
    existing entries in USERS_FILE, and appended there together with the
    password hash. The new user is then logged in (username and derived
    encryption key stored in the session) and redirected to their "home"
    note.

    Returns 400 with "Invalid username" for names that cannot be stored
    safely, and 400 with "User already exists" for duplicates.
    """
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        if not username or not password:
            return "Missing fields", 400

        # USERS_FILE holds one "username:hash" line per account, and lines are
        # stripped when read back. So:
        #   * a line break inside the name would inject an extra account line;
        #   * leading/trailing whitespace would be stripped on read and make
        #     the entry indistinguishable from another user's;
        #   * ":" would be taken as the end of the name;
        #   * "/", "\\", "." and ".." would alias or escape the per-user
        #     notes directory derived from the name.
        if (
            username != username.strip()
            or username in {".", ".."}
            or any(ch in username for ch in ":/\\\r\n")
        ):
            return "Invalid username", 400

        # "a+" creates the file if needed; seek(0) so existing entries can be
        # scanned before the new one is appended.
        with open(USERS_FILE, "a+") as f:
            f.seek(0)
            for line in f:
                if line.strip().split(":")[0] == username:
                    return "User already exists", 400
            hashed_pw = generate_password_hash(password)
            f.write(f"{username}:{hashed_pw}\n")

        session["user"] = username
        session["key"] = get_key_for_user(username, password).decode()
        return redirect(url_for("serve_note", note="home"))

    return '''
<form method="post">
Username: <input name="username"><br>
Password: <input name="password" type="password"><br>
<input type="submit" value="Register">
</form>
<p>Already have an account? <a href="/login">Login here</a>.</p>
'''
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form, or authenticate the posted credentials.

    On POST the username and password form fields are checked against the
    "username:hash" lines in USERS_FILE. On success the username and the
    derived encryption key are stored in the session and the client is
    redirected to the "home" note; otherwise the response is 401
    "Invalid login".
    """
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        # A missing field can never match an account; answering here also
        # keeps None from reaching check_password_hash.
        if not username or not password:
            return "Invalid login", 401

        try:
            with open(USERS_FILE, "r") as f:
                entries = f.readlines()
        except FileNotFoundError:
            # Nobody has registered yet, so there is nothing to match.
            entries = []

        for line in entries:
            user, sep, stored_hash = line.strip().partition(":")
            if not sep:
                # Blank or malformed line: skip it instead of crashing.
                continue
            if user == username and check_password_hash(stored_hash, password):
                session["user"] = username
                session["key"] = get_key_for_user(username, password).decode()
                return redirect(url_for("serve_note", note="home"))
        return "Invalid login", 401

    return '''
<form method="post">
Username: <input name="username"><br>
Password: <input name="password" type="password"><br>
<input type="submit" value="Login">
</form>
<p>Don't have an account? <a href="/register">Register here</a>.</p>
'''
@app.route("/logout")
def logout():
    """Drop the username and encryption key from the session, then go to login."""
    for field in ("user", "key"):
        session.pop(field, None)
    return redirect(url_for("login"))
# Script entry point: start the Flask server on port 8765 with debug enabled.
# NOTE(review): debug mode is hard-coded on — confirm this entry point is only
# used for local development.
if __name__ == "__main__":
    app.run(port=8765, debug=True)