litey/app.py

250 lines
7.7 KiB
Python
Raw Normal View History

2024-06-14 08:03:49 +00:00
from typing import Callable, Awaitable, Optional, List
from datetime import datetime, timezone, timedelta
2024-06-14 08:03:49 +00:00
from pathlib import Path
from uuid import uuid4
from os import environ
from urllib.parse import urlparse
from base64 import b64encode
from re import escape, compile, IGNORECASE
2024-10-31 14:32:07 +09:00
from pprint import pprint
2025-03-05 23:38:18 +00:00
from math import ceil
2024-06-14 08:03:49 +00:00
from requests import get
from fastapi import FastAPI, Request, Response, status, Depends
2024-06-14 08:03:49 +00:00
from fastapi.responses import JSONResponse, Response, PlainTextResponse, FileResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from pymongo import MongoClient, DESCENDING
2024-10-31 14:32:07 +09:00
from contextlib import asynccontextmanager
from redis.asyncio import from_url
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
2024-06-14 08:03:49 +00:00
# Jinja2
def ip_to_uid(ip: Optional[str]) -> str:
    """Derive a short display identifier from an IP address string.

    Registered as a Jinja2 filter. A missing or empty address yields "-".
    Otherwise the address is UTF-8 encoded, Base64 encoded, and the last
    11 characters of the encoded text are returned.
    """
    if ip:
        encoded = b64encode(ip.encode("utf-8")).decode("utf-8")
        return encoded[-11:]
    return "-"
2024-06-14 08:03:49 +00:00
def replace_ng_words(src: str, ng_words: List[str]) -> str:
    """Mask every occurrence of each NG word in ``src``.

    Matching is literal (the word is regex-escaped) and case-insensitive.
    A one-character word is replaced by "🆖". A longer word keeps its first
    character, has its second character replaced by "🆖", and keeps the rest
    (taken from the NG word itself, not from the matched text). Empty words
    are ignored.

    Args:
        src: Text to filter.
        ng_words: Words to mask, applied in order.

    Returns:
        The filtered text.
    """
    result = src
    for ng_word in ng_words:
        if not ng_word:
            continue
        if len(ng_word) == 1:
            masked = "🆖"
        else:
            masked = ng_word[0] + "🆖" + ng_word[2:]
        pattern = compile(escape(ng_word), IGNORECASE)
        # Pass a callable so the replacement is inserted literally. A plain
        # string is treated as a template, so an NG word containing a
        # backslash (e.g. "\d" or "\1") would raise re.error or be rewritten.
        result = pattern.sub(lambda _match: masked, result)
    return result
def content_to_linksets(content: str) -> str:
    """Return every http(s) URL found in ``content``, one per line.

    A URL is "http://" or "https://" followed by a run of non-whitespace
    characters. The result is an empty string when nothing matches.
    """
    url_pattern = compile(r"https?:\/\/\S+")
    return "\n".join(match.group(0) for match in url_pattern.finditer(content))
def is_over_n_hours(src: datetime, hours: int) -> bool:
    """Tell whether more than ``hours`` hours have passed since ``src``.

    Args:
        src: The reference moment. May be timezone-aware or naive.
        hours: Threshold in hours.

    Returns:
        True when the elapsed time is strictly greater than ``hours`` hours.
    """
    if src.tzinfo is not None and src.utcoffset() is not None:
        # Aware timestamps are compared against an aware "now". Dropping the
        # tzinfo and comparing with local wall-clock time would skew the
        # result by the server's UTC offset.
        elapsed = datetime.now(timezone.utc) - src
    else:
        # Naive timestamps carry no offset; treat them as local time.
        elapsed = datetime.now() - src
    return elapsed > timedelta(hours=hours)
2024-06-14 08:03:49 +00:00
# Initialization
2024-10-31 14:32:07 +09:00
# Process-wide shared state. lifespan() fills in the "templates" and
# "mongo_client" entries at startup and clears the dict at shutdown.
ctx = {}
2024-06-14 08:03:49 +00:00
async def default_identifier(req: Request):
    """Build the key that identifies a client to the rate limiter.

    The first comma-separated entry of the "CF-Connecting-IP" header is used
    when that header is present, otherwise the first entry of
    "X-Forwarded-For". Without either header the key is the peer host and
    the request path joined by ":".
    """
    for header_name in ("CF-Connecting-IP", "X-Forwarded-For"):
        header_value = req.headers.get(header_name)
        if header_value:
            return header_value.split(",")[0]
    return req.client.host + ":" + req.scope["path"]
2024-10-31 14:32:07 +09:00
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared resources at startup and release them at shutdown.

    Startup:
      * builds the Jinja2 template environment for the "templates" directory
        and registers the custom filters;
      * opens the MongoDB client (MONGO_URI, MONGO_USER, MONGO_PASSWORD) and
        ensures the unique index on ``litey.ngs.word``;
      * initialises FastAPILimiter with a Redis connection (REDIS_URI) and
        ``default_identifier`` as the client key function.

    Shutdown (also when startup fails part-way or an exception is thrown in
    at the ``yield``): closes the rate limiter if it was initialised, closes
    the MongoDB client and empties ``ctx``.
    """
    templates = Jinja2Templates("templates")
    templates.env.filters["ip_to_uid"] = ip_to_uid
    templates.env.filters["replace_ng_words"] = replace_ng_words
    templates.env.filters["content_to_linksets"] = content_to_linksets
    templates.env.filters["fromisoformat"] = datetime.fromisoformat
    templates.env.filters["is_over_n_hours"] = is_over_n_hours
    ctx["templates"] = templates

    mongo_client = MongoClient(
        environ.get("MONGO_URI", "mongodb://127.0.0.1:27017/"),
        username=environ.get("MONGO_USER"),
        password=environ.get("MONGO_PASSWORD"),
    )
    ctx["mongo_client"] = mongo_client
    try:
        # A unique index on notes.id is deliberately not created: worrying
        # about UUID collisions was judged to be overthinking.
        # mongo_client.litey.notes.create_index("id", unique=True)
        mongo_client.litey.ngs.create_index("word", unique=True)
        pprint(ctx)
        redis_uri = environ.get("REDIS_URI", "redis://127.0.0.1:6379/")
        redis_connection = from_url(redis_uri, encoding="utf8")
        await FastAPILimiter.init(redis_connection, identifier=default_identifier)
        try:
            yield
        finally:
            # Only reached once init() succeeded, so close() has something
            # to close.
            await FastAPILimiter.close()
    finally:
        # Runs on every exit path so the MongoDB client is never leaked.
        mongo_client.close()
        ctx.clear()
2024-06-14 08:03:49 +00:00
# Snippets (request models and helper functions)
class LiteYItem(BaseModel):
    """Request body accepted by POST /api/litey/post."""
    # Text of the note; stored as the "content" field of the new document.
    content: str
class LiteYDeleteItem(BaseModel):
    """Request body accepted by POST /api/litey/delete."""
    # Value of the "id" field of the note to delete.
    id: str
class NGItem(BaseModel):
    """Request body accepted by POST /api/ng/post and POST /api/ng/delete."""
    # The NG word to register or remove.
    word: str
def fastapi_serve(dir: str, ref: str, indexes: Optional[List[str]] = None) -> Response:
    """Serve a file from directory ``dir`` for the requested path ``ref``.

    Args:
        dir: Root directory of the servable files.
        ref: Requested path; only its URL path component is used. A falsy
            value is treated as "/".
        indexes: File names tried, in order, when the path ends with "/".
            Defaults to ["index.html", "index.htm"].

    Returns:
        A FileResponse for the first candidate that is an existing file
        inside ``dir``, otherwise a 404 plain-text response.

    Candidates are resolved (symlinks and ".." included) and must stay inside
    the resolved root, so a request such as "../secret" or an absolute path
    cannot read files outside ``dir``.
    """
    if indexes is None:
        indexes = ["index.html", "index.htm"]
    url_path = urlparse(ref or "/").path
    root = Path(dir).resolve()
    # Joining an absolute path onto `root` would discard `root` entirely, so
    # the requested path is always made relative first.
    relative = url_path.lstrip("/")
    try_files = []
    if url_path.endswith("/"):
        try_files += [root / relative / i for i in indexes]
    try_files.append(root / relative)
    try_files_tried = []
    for candidate in try_files:
        resolved = candidate.resolve()
        if root not in resolved.parents:
            # Escapes the static root (or is the root itself): never serve.
            continue
        if resolved.is_file():
            try_files_tried.append(resolved)
    print(try_files, try_files_tried)
    if not try_files_tried:
        return PlainTextResponse("指定されたファイルが見つかりません", status.HTTP_404_NOT_FOUND)
    path = try_files_tried[0]
    print(path, "をサーブ中")
    return FileResponse(path)
def get_ip(req: Request) -> str:
    """Return the client address for a request.

    Preference order: the "CF-Connecting-IP" header, the "X-Forwarded-For"
    header, then the peer address of the connection. Header values may be
    comma-separated lists; only the first entry (whitespace-stripped) is
    used, matching ``default_identifier``. An empty string is returned when
    no header is usable and the request has no client information.
    """
    for header_name in ("CF-Connecting-IP", "X-Forwarded-For"):
        header_value = req.headers.get(header_name)
        if header_value:
            first_entry = header_value.split(",")[0].strip()
            if first_entry:
                return first_entry
    # req.client can be None, so guard before reading the host.
    return req.client.host if req.client else ""
2024-06-14 08:03:49 +00:00
2025-03-05 23:38:18 +00:00
def get_max_page() -> int:
    """Return how many 50-note pages are needed to hold every stored note."""
    note_count = ctx["mongo_client"].litey.notes.count_documents({})
    return ceil(note_count / 50)
def get_litey_notes(id: Optional[str] = None, page: int = 0) -> List[dict]:
    """Fetch notes from the ``litey.notes`` collection.

    Args:
        id: When given (non-empty), look up the single note with this "id".
        page: Zero-based page number used when ``id`` is not given. Negative
            values are treated as 0.

    Returns:
        Without ``id``: a list of up to 50 note documents, newest "date"
        first, for the requested page. With ``id``: the matching document, or
        None when there is none (note that this is not a list, despite the
        annotation). The "_id" field is excluded in both cases.
    """
    notes = ctx["mongo_client"].litey.notes
    if id:
        return notes.find_one({"id": id}, {"_id": False})
    # Cursor.skip() raises on a negative offset and `page` comes straight
    # from the query string, so clamp it instead of failing the request.
    offset = max(page, 0) * 50
    cursor = notes.find({}, {"_id": False}).sort("date", DESCENDING).skip(offset).limit(50)
    return list(cursor)
2024-06-14 08:03:49 +00:00
2024-06-14 11:23:09 +00:00
def get_ng_words() -> List[str]:
    """Return the "word" value of every document in ``litey.ngs``.

    Documents without a "word" field are skipped.
    """
    words = []
    for document in ctx["mongo_client"].litey.ngs.find({}, {"_id": False}):
        if "word" in document:
            words.append(document["word"])
    return words
# FastAPI
2024-10-31 14:32:07 +09:00
# The ASGI application; lifespan() handles startup and shutdown of shared resources.
app = FastAPI(lifespan=lifespan)
2024-06-14 08:03:49 +00:00
@app.middleware("http")
async def cors_handler(req: Request, call_next: Callable[[Request], Awaitable[Response]]):
    # Adds permissive CORS headers to every response whose path starts with
    # "/api/" and turns OPTIONS responses into 200 OK.
    res = await call_next(req)
    if req.url.path.startswith("/api/"):
        res.headers["Access-Control-Allow-Origin"] = "*"
        # NOTE(review): browsers reject a wildcard origin on credentialed
        # requests, so this header likely has no effect — confirm intent.
        res.headers["Access-Control-Allow-Credentials"] = "true"
        res.headers["Access-Control-Allow-Methods"] = "*"
        res.headers["Access-Control-Allow-Headers"] = "*"
        # NOTE(review): the flattened source does not show whether this check
        # was nested under the "/api/" branch; nesting is assumed — confirm.
        if req.method == "OPTIONS":
            # Overrides whatever status the downstream handler produced.
            res.status_code = status.HTTP_200_OK
    return res
@app.get("/api/litey/get")
async def api_get(id: str = None, page: int = 0):
    """Return notes as JSON: one page of notes, or a single note when ``id`` is given.

    The response is marked cacheable for 60 seconds.
    """
    payload = get_litey_notes(id, page)
    res = JSONResponse(payload)
    res.headers["Cache-Control"] = "public, max-age=60, s-maxage=60"
    res.headers["CDN-Cache-Control"] = "max-age=60"
    return res
2025-03-05 07:30:18 +00:00
@app.post("/api/litey/post")
async def api_post(item: LiteYItem, req: Request):
    """Store a new note and reply with plain-text "OK".

    The stored document gets a fresh UUID4 "id", the submitted "content",
    the current time as a UTC ISO-8601 "date" string and the client "ip".
    """
    posted_at = datetime.now().astimezone(timezone.utc)
    note = {
        "id": str(uuid4()),
        "content": item.content,
        "date": posted_at.isoformat(),
        "ip": get_ip(req),
    }
    ctx["mongo_client"].litey.notes.insert_one(note)
    return PlainTextResponse("OK")
@app.post("/api/litey/delete", dependencies=[Depends(RateLimiter(times=1, seconds=86400))])
async def api_delete(item: LiteYDeleteItem):
    """Delete the note whose "id" equals ``item.id`` and reply "OK".

    The route carries a RateLimiter(times=1, seconds=86400) dependency.
    """
    notes = ctx["mongo_client"].litey.notes
    notes.delete_one({"id": item.id})
    return PlainTextResponse("OK")
@app.get("/api/litey/image-proxy")
async def api_image_proxy(url: str):
    """Fetch a remote image server-side and relay it to the caller.

    Args:
        url: Absolute http(s) URL of the image.

    Returns:
        The upstream body with its Content-Type, cacheable for one hour.
        400 when the URL scheme is not http/https, 502 when the upstream
        request fails, 415 when the upstream Content-Type is not ``image/*``.

    NOTE(review): ``url`` is untrusted and is fetched from inside the server's
    network. The scheme and content-type checks below do not stop requests to
    internal hosts (SSRF); restrict destinations at the network level or with
    an allow-list if that matters for this deployment.
    """
    # Local import: the module only imports `get` from requests.
    from requests import RequestException

    if urlparse(url).scheme not in ("http", "https"):
        return PlainTextResponse("Unsupported URL scheme", status.HTTP_400_BAD_REQUEST)

    user_agent = Path("user_agent.txt").read_text("UTF-8").rstrip("\n")
    try:
        result = get(url, timeout=5, headers={"User-Agent": user_agent})
    except RequestException:
        # Timeouts, DNS failures, refused connections, malformed URLs, ...
        return PlainTextResponse("Failed to fetch the upstream resource", status.HTTP_502_BAD_GATEWAY)

    media_type = result.headers.get("Content-Type")
    if not media_type or not media_type.lower().startswith("image/"):
        # Relaying arbitrary content (e.g. text/html) from this origin would
        # let any remote page be served under the application's own domain.
        return PlainTextResponse("Upstream resource is not an image", status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)

    res = Response(result.content, media_type=media_type)
    res.headers["Cache-Control"] = "public, max-age=3600, s-maxage=3600"
    res.headers["CDN-Cache-Control"] = "max-age=3600"
    # Keep browsers from second-guessing the declared image type.
    res.headers["X-Content-Type-Options"] = "nosniff"
    return res
@app.get("/api/ng/get")
async def api_ng_get():
    """Return all NG words as plain text, one per line, cacheable for 60 seconds."""
    body = "\n".join(get_ng_words())
    res = PlainTextResponse(body)
    res.headers["Cache-Control"] = "public, max-age=60, s-maxage=60"
    res.headers["CDN-Cache-Control"] = "max-age=60"
    return res
2025-03-05 07:30:18 +00:00
@app.post("/api/ng/post")
async def api_ng_post(item: NGItem):
    """Register an NG word and reply with plain-text "OK".

    ``litey.ngs`` has a unique index on "word", so registering a word that
    already exists is treated as a no-op instead of failing the request.
    """
    # Local import: the module only imports MongoClient/DESCENDING from pymongo.
    from pymongo.errors import DuplicateKeyError

    try:
        ctx["mongo_client"].litey.ngs.insert_one({"word": item.word})
    except DuplicateKeyError:
        # Already registered: the desired end state holds, so report success.
        pass
    return PlainTextResponse("OK")
2025-03-04 00:52:59 +00:00
@app.post("/api/ng/delete", dependencies=[Depends(RateLimiter(times=1, seconds=86400))])
async def api_ng_delete(item: NGItem):
    """Remove the NG word equal to ``item.word`` and reply "OK".

    The route carries a RateLimiter(times=1, seconds=86400) dependency.
    """
    ng_collection = ctx["mongo_client"].litey.ngs
    ng_collection.delete_one({"word": item.word})
    return PlainTextResponse("OK")
@app.get("/")
async def home(req: Request, page: int = 0):
    """Render "index.html" with one page of notes, the NG words and the page count.

    The response is marked cacheable for 60 seconds.
    """
    template_context = {
        "notes": get_litey_notes(page=page),
        "ng_words": get_ng_words(),
        "max_page": get_max_page(),
    }
    res = ctx["templates"].TemplateResponse(req, "index.html", template_context)
    res.headers["Cache-Control"] = "public, max-age=60, s-maxage=60"
    res.headers["CDN-Cache-Control"] = "max-age=60"
    return res
@app.get("/{ref:path}")
async def static(ref: str = None):
    """Catch-all route: serve ``ref`` from the "static" directory.

    Whatever response ``fastapi_serve`` produces is marked cacheable for one
    hour.
    """
    res = fastapi_serve("static", ref)
    cache_headers = (
        ("Cache-Control", "public, max-age=3600, s-maxage=3600"),
        ("CDN-Cache-Control", "max-age=3600"),
    )
    for header_name, header_value in cache_headers:
        res.headers[header_name] = header_value
    return res