202 lines
5.8 KiB
Python
202 lines
5.8 KiB
Python
from typing import Callable, Awaitable, Optional, List
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
from os import environ
|
|
from urllib.parse import urlparse
|
|
from base64 import b64encode
|
|
from re import escape, compile, IGNORECASE
|
|
|
|
from requests import get
|
|
from fastapi import FastAPI, Request, Response, status
|
|
from fastapi.responses import JSONResponse, Response, PlainTextResponse, FileResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel
|
|
from pymongo import MongoClient, DESCENDING
|
|
|
|
# Jinja2
|
|
|
|
def ip_to_uid(ip: Optional[str]) -> str:
    """Turn an IP address string into a short display identifier.

    Args:
        ip: The address text, or ``None``/empty when unknown.

    Returns:
        ``"-"`` when ``ip`` is falsy; otherwise the first 11 characters of
        the Base64 encoding of the UTF-8 bytes of ``ip``.

    NOTE(review): Base64 is an encoding, not a hash, so the leading part of
    the address can be decoded back from the identifier.
    """
    if ip:
        encoded = b64encode(ip.encode("utf-8"))
        return encoded.decode("utf-8")[:11]

    return "-"
|
|
|
|
def replace_ng_words(src: str, ng_words: List[str]) -> str:
    """Mask every occurrence of the given NG words in ``src``.

    Matching is literal (the word is regex-escaped) and case-insensitive.
    A one-character word is replaced by "🆖". A longer word is replaced by
    its first character, "🆖", and everything after its second character,
    using the spelling from ``ng_words`` rather than the matched text.
    Empty words are ignored. Words are applied in list order, each one on
    the output of the previous replacement.

    Args:
        src: Text to filter.
        ng_words: Words to mask.

    Returns:
        The filtered text.
    """
    result = src

    for ng_word in ng_words:
        if not ng_word:
            continue

        if len(ng_word) == 1:
            masked = "🆖"
        else:
            masked = ng_word[0] + "🆖" + ng_word[2:]

        pattern = compile(escape(ng_word), IGNORECASE)
        # A function replacement inserts the text verbatim. Passing the string
        # directly would make `sub` parse backslashes in the NG word as
        # template escapes or group references, raising or corrupting output.
        result = pattern.sub(lambda _match, masked=masked: masked, result)

    return result
|
|
|
|
def content_to_linksets(content: str) -> str:
    """Collect the http/https URLs that appear in ``content``.

    A URL is "http://" or "https://" followed by a run of non-whitespace
    characters, so trailing punctuation directly after a link is included.

    Args:
        content: Free text to scan.

    Returns:
        The matched URLs in order of appearance, joined by newlines; an
        empty string when there are none.
    """
    url_pattern = compile(r"https?:\/\/\S+")
    return "\n".join(found.group(0) for found in url_pattern.finditer(content))
|
|
|
|
# Initialization

# Template loader rooted at the "templates" directory.
templates = Jinja2Templates("templates")

# Make the helper functions defined above available as template filters.
templates.env.filters.update({
    "ip_to_uid": ip_to_uid,
    "replace_ng_words": replace_ng_words,
    "content_to_linksets": content_to_linksets,
})

# Connection settings come from the environment; the URI falls back to a
# local MongoDB, and the credentials are None when the variables are unset.
mongo_client = MongoClient(
    environ.get("MONGO_URI", "mongodb://127.0.0.1:27017/"),
    username=environ.get("MONGO_USER"),
    password=environ.get("MONGO_PASSWORD"),
)

# Unique index so the same NG word cannot be stored twice.
mongo_client.litey.ngs.create_index("word", unique=True)
|
|
|
|
# Snippets: request models and shared helper functions
|
|
|
|
class LiteYItem(BaseModel):
    """Request body accepted by the note-posting endpoint."""

    # Text of the note; stored as the note's "content" field.
    content: str
|
|
|
|
class LiteYDeleteItem(BaseModel):
    """Request body accepted by the note-deletion endpoint."""

    # Value of the "id" field of the note to delete.
    id: str
|
|
|
|
class NGItem(BaseModel):
    """Request body accepted by the NG-word add and delete endpoints."""

    # The NG word to add or remove.
    word: str
|
|
|
|
def fastapi_serve(dir: str, ref: str, indexes: Optional[List[str]] = None) -> Response:
    """Serve a file from ``dir`` for the requested path ``ref``.

    When the URL path ends with "/", each name in ``indexes`` is tried inside
    that directory first; the path itself is tried last. The first candidate
    that is an existing regular file is served.

    Candidates are only accepted when their resolved location is inside the
    resolved ``dir``. This blocks ``..`` segments and absolute paths; as a
    consequence, symlinks that resolve outside ``dir`` are not served either.

    Args:
        dir: Root directory of the static files.
        ref: Requested path (may be empty or ``None``, which is treated as "/").
        indexes: Index file names to try for directory requests. Defaults to
            ``["index.html", "index.htm"]``.

    Returns:
        A ``FileResponse`` for the first matching file, or a 404
        ``PlainTextResponse`` when nothing matches.
    """
    if indexes is None:
        indexes = ["index.html", "index.htm"]

    url_path = urlparse(ref or "/").path
    root = Path(dir)

    # Joining an absolute path onto `root` would discard `root` entirely, so
    # the leading slashes are stripped for every candidate.
    relative = url_path.lstrip("/")

    try_files = []

    if url_path.endswith("/"):
        try_files += [root / relative / i for i in indexes]

    try_files += [root / relative]

    root_resolved = root.resolve()

    # Containment is checked before touching the file so that paths escaping
    # the root are never probed.
    try_files_tried = [
        t for t in try_files
        if root_resolved in t.resolve().parents and t.is_file()
    ]

    print(try_files, try_files_tried)

    if not try_files_tried:
        return PlainTextResponse("指定されたファイルが見つかりません", status.HTTP_404_NOT_FOUND)

    path = try_files_tried[0]

    print(path, "をサーブ中")

    return FileResponse(path)
|
|
|
|
def get_ip(req: Request) -> str:
    """Return the client address for ``req``.

    The ``CF-Connecting-IP`` header wins when it is present and non-empty;
    otherwise the peer host reported by the server is used.

    NOTE(review): the header is taken at face value, so it is only
    trustworthy when every request really arrives through the proxy that
    sets it. Confirm the deployment.

    Returns:
        The address text, or an empty string when neither source is
        available.
    """
    forwarded = req.headers.get("CF-Connecting-IP")
    if forwarded:
        return forwarded

    # `req.client` can be None when the server supplies no peer address;
    # dereferencing it unconditionally raised AttributeError.
    client = req.client
    return client.host if client else ""
|
|
|
|
def get_litey_notes(id: str = None) -> List[dict]:
    """Fetch notes from the ``litey.notes`` collection.

    Only MongoDB's ``_id`` is excluded; every other stored field is returned.

    Args:
        id: Value of the note's "id" field. When falsy, all notes are fetched.

    Returns:
        Without ``id``: a list of all notes sorted by "date", newest first.
        With ``id``: the single matching note as a dict, or ``None`` when no
        note matches (this differs from the annotated return type).
    """
    notes = mongo_client.litey.notes
    projection = { "_id": False }

    if id:
        return notes.find_one({ "id": id }, projection)

    newest_first = notes.find({}, projection).sort("date", DESCENDING)
    return list(newest_first)
|
|
|
|
def get_ng_words():
    """Return every registered NG word as a list of strings.

    Reads the ``litey.ngs`` collection; documents without a "word" key are
    skipped.
    """
    words = []

    for doc in mongo_client.litey.ngs.find({}, { "_id": False }):
        if "word" in doc:
            words.append(doc["word"])

    return words
|
|
|
|
# FastAPI
|
|
|
|
# Application object; the middleware and route handlers below register on it.
app = FastAPI()
|
|
|
|
@app.middleware("http")
async def cors_handler(req: Request, call_next: Callable[[Request], Awaitable[Response]]):
    """Add permissive CORS headers to responses for paths under ``/api/``.

    The downstream handler always runs first. For ``/api/`` paths the four
    ``Access-Control-Allow-*`` headers are set on its response, and for
    OPTIONS requests the status code is overwritten with 200 (the body
    produced downstream is left as it is).

    NOTE(review): the original indentation was lost in this copy of the
    file; the OPTIONS override is assumed to apply to ``/api/`` paths only.
    NOTE(review): browsers do not honour a wildcard origin together with
    ``Access-Control-Allow-Credentials: true``. Confirm whether credentialed
    cross-origin requests are actually needed.
    """
    res = await call_next(req)

    # Everything outside the API is returned untouched.
    if not req.url.path.startswith("/api/"):
        return res

    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Credentials", "true"),
        ("Access-Control-Allow-Methods", "*"),
        ("Access-Control-Allow-Headers", "*"),
    )
    for header_name, header_value in cors_headers:
        res.headers[header_name] = header_value

    if req.method == "OPTIONS":
        res.status_code = status.HTTP_200_OK

    return res
|
|
|
|
@app.get("/api/litey/get")
async def api_get(id: str = None):
    """Return notes as JSON with caching disabled.

    Without ``id`` the body is the list of all notes; with ``id`` it is the
    single matching note, or JSON ``null`` when none matches.

    NOTE(review): notes are returned with every stored field except ``_id``,
    which includes the poster's "ip" value. Confirm that is intended.
    """
    payload = get_litey_notes(id)

    res = JSONResponse(payload)
    res.headers["Cache-Control"] = "public, max-age=0, s-maxage=0"
    res.headers["CDN-Cache-Control"] = "max-age=0"
    return res
|
|
|
|
@app.post("/api/litey/post")
async def api_post(item: LiteYItem, req: Request):
    """Store a new note and reply with the plain text "OK".

    The stored document holds a freshly generated UUID string as "id", the
    submitted content, the current time as a UTC ISO-8601 string, and the
    client address obtained from ``get_ip``.
    """
    note = {
        "id": str(uuid4()),
        "content": item.content,
        "date": datetime.now().astimezone(timezone.utc).isoformat(),
        "ip": get_ip(req),
    }
    mongo_client.litey.notes.insert_one(note)

    return PlainTextResponse("OK")
|
|
|
|
@app.post("/api/litey/delete")
async def api_delete(item: LiteYDeleteItem):
    """Delete the note whose "id" equals ``item.id`` and reply "OK".

    The reply is "OK" whether or not a note was actually removed.

    NOTE(review): no caller check is visible here, so any client that knows
    a note id can delete it. Confirm this is intended.
    """
    selector = { "id": item.id }
    mongo_client.litey.notes.delete_one(selector)

    return PlainTextResponse("OK")
|
|
|
|
@app.get("/api/litey/image-proxy")
async def api_image_proxy(url: str):
    """Fetch ``url`` server-side and relay the body to the client.

    The request is sent with the User-Agent read from ``user_agent.txt`` and
    a 5 second timeout. The upstream body and ``Content-Type`` are returned
    with one-hour cache headers.

    Returns:
        400 when ``url`` is not a well-formed http/https URL, 502 when the
        upstream request fails, otherwise the relayed content.

    NOTE(review): the target is caller-controlled, so this endpoint can be
    pointed at internal or loopback addresses (SSRF) and at arbitrarily large
    or non-image resources. Consider an allow-list, address filtering, and a
    size/content-type limit.
    NOTE(review): the blocking HTTP call runs inside an async handler and
    stalls the event loop while it waits.
    """
    # Imported locally: the module-level import only brings in `get`.
    from requests import RequestException

    # Reject anything that is not plain http(s) up front; previously such
    # input surfaced as an unhandled exception (HTTP 500).
    try:
        parsed = urlparse(url)
    except ValueError:
        parsed = None

    if parsed is None or parsed.scheme not in ("http", "https") or not parsed.netloc:
        return PlainTextResponse("URL は http または https で指定してください", status.HTTP_400_BAD_REQUEST)

    user_agent = Path("user_agent.txt").read_text("UTF-8").rstrip("\n")

    try:
        result = get(url, timeout=5, headers={
            "User-Agent": user_agent
        })
    except RequestException:
        # Timeouts, DNS failures, refused connections and similar errors.
        return PlainTextResponse("画像の取得に失敗しました", status.HTTP_502_BAD_GATEWAY)

    content = result.content
    media_type = result.headers.get("Content-Type")

    res = Response(content, media_type=media_type)
    res.headers["Cache-Control"] = "public, max-age=3600, s-maxage=3600"
    res.headers["CDN-Cache-Control"] = "max-age=3600"
    return res
|
|
|
|
@app.get("/api/ng/get")
async def api_ng_get():
    """Return the registered NG words as plain text, one per line, uncached."""
    body = "\n".join(get_ng_words())

    res = PlainTextResponse(body)
    res.headers["Cache-Control"] = "public, max-age=0, s-maxage=0"
    res.headers["CDN-Cache-Control"] = "max-age=0"
    return res
|
|
|
|
@app.post("/api/ng/post")
async def api_ng_post(item: NGItem):
    """Register ``item.word`` as an NG word.

    Returns:
        Plain text "OK" on success, or 409 Conflict when the word is already
        registered.
    """
    # Imported locally: the module-level import does not include it.
    from pymongo.errors import DuplicateKeyError

    try:
        mongo_client.litey.ngs.insert_one({
            "word": item.word
        })
    except DuplicateKeyError:
        # The collection has a unique index on "word"; a repeated word used
        # to escape as an unhandled exception (HTTP 500).
        return PlainTextResponse("この NG ワードは既に登録されています", status.HTTP_409_CONFLICT)

    return PlainTextResponse("OK")
|
|
|
|
@app.post("/api/ng/delete")
async def api_ng_delete(item: NGItem):
    """Remove ``item.word`` from the NG words and reply "OK".

    The reply is "OK" whether or not the word was registered.

    This handler was previously also named ``api_ng_post``, which redefined
    the add handler's module-level name; it now follows the ``api_delete``
    naming used for notes. The route itself is unchanged.
    """
    mongo_client.litey.ngs.delete_one({
        "word": item.word
    })

    return PlainTextResponse("OK")
|
|
|
|
@app.get("/")
async def home(req: Request):
    """Render ``index.html`` with all notes and NG words, uncached."""
    context = {
        "notes": get_litey_notes(),
        "ng_words": get_ng_words(),
    }

    res = templates.TemplateResponse(req, "index.html", context)
    res.headers["Cache-Control"] = "public, max-age=0, s-maxage=0"
    res.headers["CDN-Cache-Control"] = "max-age=0"
    return res
|
|
|
|
@app.get("/{ref:path}")
async def static(ref: str = None):
    """Catch-all route: serve ``ref`` from the "static" directory.

    The response from ``fastapi_serve`` is returned with one-hour cache
    headers. The headers are set on whatever it returns, so its 404 reply is
    cacheable for an hour as well.
    """
    res = fastapi_serve("static", ref)

    one_hour_cache = (
        ("Cache-Control", "public, max-age=3600, s-maxage=3600"),
        ("CDN-Cache-Control", "max-age=3600"),
    )
    for header_name, header_value in one_hour_cache:
        res.headers[header_name] = header_value

    return res
|