litey/app.py
2024-06-15 03:36:08 +00:00

202 lines
5.8 KiB
Python

from typing import Callable, Awaitable, Optional, List
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4
from os import environ
from urllib.parse import urlparse
from base64 import b64encode
from re import escape, compile, IGNORECASE
from requests import get
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import JSONResponse, Response, PlainTextResponse, FileResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from pymongo import MongoClient, DESCENDING
# Jinja2
def ip_to_uid(ip: Optional[str]) -> str:
if not ip:
return "-"
return b64encode(ip.encode("utf-8")).decode("utf-8")[:11]
def replace_ng_words(src: str, ng_words: List[str]) -> str:
result = src
for ng_word in ng_words:
pattern = compile(escape(ng_word), IGNORECASE)
if len(ng_word) == 1:
result = pattern.sub("🆖", result)
elif len(ng_word) >= 2:
result = pattern.sub(ng_word[0] + "🆖" + ng_word[2:], result)
return result
def content_to_linksets(content: str) -> str:
pattern = compile(r"https?:\/\/\S+")
groups = pattern.findall(content)
return "\n".join(groups)
# 初期化
templates = Jinja2Templates("templates")
templates.env.filters["ip_to_uid"] = ip_to_uid
templates.env.filters["replace_ng_words"] = replace_ng_words
templates.env.filters["content_to_linksets"] = content_to_linksets
mongo_client = MongoClient(
environ.get("MONGO_URI", "mongodb://127.0.0.1:27017/"),
username=environ.get("MONGO_USER"),
password=environ.get("MONGO_PASSWORD")
)
mongo_client.litey.ngs.create_index("word", unique=True)
# スニペット
class LiteYItem(BaseModel):
content: str
class LiteYDeleteItem(BaseModel):
id: str
class NGItem(BaseModel):
word: str
def fastapi_serve(dir: str, ref: str, indexes: List[str] = ["index.html", "index.htm"]) -> Response:
url_path = urlparse(ref or "/").path
root = Path(dir)
try_files = []
if url_path.endswith("/"):
try_files += [root / url_path.lstrip("/") / i for i in indexes]
try_files += [root / url_path]
try_files_tried = [t for t in try_files if t.is_file()]
print(try_files, try_files_tried)
if not try_files_tried:
return PlainTextResponse("指定されたファイルが見つかりません", status.HTTP_404_NOT_FOUND)
path = try_files_tried[0]
print(path, "をサーブ中")
return FileResponse(path)
def get_ip(req: Request) -> str:
return req.headers.get("CF-Connecting-IP") or req.client.host
def get_litey_notes(id: str = None) -> List[dict]:
if not id:
cursor = mongo_client.litey.notes.find({}, { "_id": False }).sort("date", DESCENDING)
return list(cursor)
return mongo_client.litey.notes.find_one({ "id": id }, { "_id": False })
def get_ng_words() -> List[str]:
cursor = mongo_client.litey.ngs.find({}, { "_id": False })
return [ng["word"] for ng in list(cursor) if "word" in ng]
# FastAPI
app = FastAPI()
@app.middleware("http")
async def cors_handler(req: Request, call_next: Callable[[Request], Awaitable[Response]]):
res = await call_next(req)
if req.url.path.startswith("/api/"):
res.headers["Access-Control-Allow-Origin"] = "*"
res.headers["Access-Control-Allow-Credentials"] = "true"
res.headers["Access-Control-Allow-Methods"] = "*"
res.headers["Access-Control-Allow-Headers"] = "*"
if req.method == "OPTIONS":
res.status_code = status.HTTP_200_OK
return res
@app.get("/api/litey/get")
async def api_get(id: str = None):
res = JSONResponse(get_litey_notes(id))
res.headers["Cache-Control"] = f"public, max-age=0, s-maxage=0"
res.headers["CDN-Cache-Control"] = f"max-age=0"
return res
@app.post("/api/litey/post")
async def api_post(item: LiteYItem, req: Request):
mongo_client.litey.notes.insert_one({
"id": str(uuid4()),
"content": item.content,
"date": datetime.now().astimezone(timezone.utc).isoformat(),
"ip": get_ip(req)
})
return PlainTextResponse("OK")
@app.post("/api/litey/delete")
async def api_delete(item: LiteYDeleteItem):
mongo_client.litey.notes.delete_one({ "id": item.id })
return PlainTextResponse("OK")
@app.get("/api/litey/image-proxy")
async def api_image_proxy(url: str):
result = get(url, timeout=5, headers={
"User-Agent": Path("user_agent.txt").read_text("UTF-8").rstrip("\n")
})
content = result.content
media_type = result.headers.get("Content-Type")
res = Response(content, media_type=media_type)
res.headers["Cache-Control"] = f"public, max-age=3600, s-maxage=3600"
res.headers["CDN-Cache-Control"] = f"max-age=3600"
return res
@app.get("/api/ng/get")
async def api_ng_get():
res = PlainTextResponse("\n".join(get_ng_words()))
res.headers["Cache-Control"] = f"public, max-age=0, s-maxage=0"
res.headers["CDN-Cache-Control"] = f"max-age=0"
return res
@app.post("/api/ng/post")
async def api_ng_post(item: NGItem):
mongo_client.litey.ngs.insert_one({
"word": item.word
})
return PlainTextResponse("OK")
@app.post("/api/ng/delete")
async def api_ng_delete(item: NGItem):
mongo_client.litey.ngs.delete_one({
"word": item.word
})
return PlainTextResponse("OK")
@app.get("/")
async def home(req: Request):
res = templates.TemplateResponse(req, "index.html", {
"notes": get_litey_notes(),
"ng_words": get_ng_words()
})
res.headers["Cache-Control"] = f"public, max-age=0, s-maxage=0"
res.headers["CDN-Cache-Control"] = f"max-age=0"
return res
@app.get("/{ref:path}")
async def static(ref: str = None):
res = fastapi_serve("static", ref)
res.headers["Cache-Control"] = f"public, max-age=3600, s-maxage=3600"
res.headers["CDN-Cache-Control"] = f"max-age=3600"
return res