1
0
mirror of https://github.com/aquatix/digimarks.git synced 2025-12-06 22:05:09 +01:00

Refactoring to fastapi, reformatting with ruff

This commit is contained in:
2023-10-30 21:51:55 +01:00
parent 30bd835e41
commit f6f129d67c
3 changed files with 221 additions and 190 deletions

View File

@@ -59,5 +59,14 @@ select = [
"W",
]
[tool.ruff.flake8-quotes]
docstring-quotes = "double"
inline-quotes = "single"
multiline-quotes = "double"
[tool.ruff.format]
# Prefer single quotes over double quotes
quote-style = "single"
[tool.ruff.mccabe]
max-complexity = 10

View File

@@ -1,6 +1,6 @@
-r requirements.in
black
# black
pylint
ruff

View File

@@ -5,26 +5,27 @@ import hashlib
import logging
import os
import shutil
from typing import Optional
from urllib.parse import urljoin, urlparse, urlunparse
import bs4
import requests
from dateutil import tz
# from flask import (Flask, abort, jsonify, make_response, redirect,
# render_template, request, url_for)
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.responses import RedirectResponse
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.templating import Jinja2Templates
from feedgen.feed import FeedGenerator
from pydantic import DirectoryPath, FilePath, validator
from pydantic import DirectoryPath, FilePath
from pydantic_settings import BaseSettings
from sqlalchemy import VARCHAR, Boolean, Column, DateTime, ForeignKey, Integer, String, Text, create_engine
from sqlalchemy import VARCHAR, Boolean, Column, DateTime, Integer, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, Mapped
DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
@@ -34,7 +35,8 @@ DEFAULT_THEME = 'freshgreen'
class Settings(BaseSettings):
"""Configuration needed for digimarks to find its database, favicons, API integrations"""
database_file: FilePath = './bookmarks.db'
# database_file: FilePath = './bookmarks.db'
database_file: FilePath
media_dir: DirectoryPath
media_url: str = '/static/'
@@ -44,17 +46,16 @@ class Settings(BaseSettings):
settings = Settings()
print(settings.model_dump())
engine = create_engine(
f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False}
)
engine = create_engine(f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
app = FastAPI()
templates = Jinja2Templates(directory="templates")
templates = Jinja2Templates(directory='templates')
logger = logging.getLogger('digimarks')
if settings.debug:
@@ -63,10 +64,10 @@ if settings.debug:
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow requests from everywhere
allow_origins=['*'], # Allow requests from everywhere
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
allow_methods=['*'],
allow_headers=['*'],
)
@@ -113,25 +114,28 @@ def clean_tags(tags_list):
return tags_res
magic_dict = {
b"\x1f\x8b\x08": "gz",
b"\x42\x5a\x68": "bz2",
b"\x50\x4b\x03\x04": "zip"
}
def file_type(filename):
"""Try to determine the file type for the file in `filename`.
:param str filename: path to file to check
:return: zip file type
:rtype: str
"""
magic_dict = {b'\x1f\x8b\x08': 'gz', b'\x42\x5a\x68': 'bz2', b'\x50\x4b\x03\x04': 'zip'}
max_len = max(len(x) for x in magic_dict)
def file_type(filename):
with open(filename, "rb") as f:
with open(filename, 'rb') as f:
file_start = f.read(max_len)
for magic, filetype in magic_dict.items():
if file_start.startswith(magic):
return filetype
return "no match"
return 'no match'
class User(Base):
"""User account"""
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
@@ -149,6 +153,7 @@ class User(Base):
class Bookmark(Base):
"""Bookmark instance, connected to User"""
__tablename__ = 'bookmark'
id = Column(Integer, primary_key=True)
@@ -164,7 +169,9 @@ class Bookmark(Base):
starred = Column(Boolean, default=False)
# Website (domain) favicon
favicon = Column(VARCHAR(255), null=True)
# favicon = Column(VARCHAR(255), null=True)
# favicon = Column(VARCHAR(255))
favicon: Mapped[Optional[str]]
# Status code: 200 is OK, 404 is not found, for example (showing an error)
HTTP_CONNECTIONERROR = 0
@@ -177,15 +184,16 @@ class Bookmark(Base):
redirect_uri = None
created_date = Column(DateTime, default=datetime.datetime.now)
modified_date = Column(DateTime, null=True)
deleted_date = Column(DateTime, null=True)
# modified_date = Column(DateTime, null=True)
modified_date: Mapped[Optional[datetime.datetime]]
# deleted_date = Column(DateTime, null=True)
deleted_date: Mapped[Optional[datetime.datetime]]
# Bookmark status; deleting doesn't remove from DB
VISIBLE = 0
DELETED = 1
status = Column(Integer, default=VISIBLE)
class Meta:
ordering = (('created_date', 'desc'),)
@@ -225,7 +233,7 @@ class Bookmark(Base):
'http://icons.better-idea.org/icon?size=60&url=' + domain,
allow_redirects=True,
headers={'User-Agent': DIGIMARKS_USER_AGENT},
timeout=15
timeout=15,
)
if meta.url[-3:].lower() == 'ico':
fileextension = '.ico'
@@ -233,7 +241,7 @@ class Bookmark(Base):
'http://icons.better-idea.org/icon?size=60&url=' + domain,
stream=True,
headers={'User-Agent': DIGIMARKS_USER_AGENT},
timeout=15
timeout=15,
)
filename = os.path.join(settings.media_dir, 'favicons/', domain + fileextension)
with open(filename, 'wb') as out_file:
@@ -255,14 +263,14 @@ class Bookmark(Base):
response = requests.get(
'https://realfavicongenerator.p.rapidapi.com/favicon/icon?platform=android_chrome&site=' + domain,
stream=True,
headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY}
headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY},
)
if response.status_code == 404:
# Fall back to desktop favicon
response = requests.get(
'https://realfavicongenerator.p.rapidapi.com/favicon/icon?platform=desktop&site=' + domain,
stream=True,
headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY}
headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY},
)
# Debug for the moment
print(domain)
@@ -356,6 +364,7 @@ class Bookmark(Base):
class PublicTag(Base):
"""Publicly shared tag"""
__tablename__ = 'publictag'
id = Column(Integer, primary_key=True)
@@ -401,15 +410,19 @@ def make_external(request: Request, url):
def _find_bookmarks(userkey, filter_text) -> list[Bookmark]:
"""Look up bookmark for `userkey` which contains `filter_text` in its properties"""
return Bookmark.select().where(
return (
Bookmark.select()
.where(
Bookmark.userkey == userkey,
(
Bookmark.title.contains(filter_text) |
Bookmark.url.contains(filter_text) |
Bookmark.note.contains(filter_text)
Bookmark.title.contains(filter_text)
| Bookmark.url.contains(filter_text)
| Bookmark.note.contains(filter_text)
),
Bookmark.status == Bookmark.VISIBLE
).order_by(Bookmark.created_date.desc())
Bookmark.status == Bookmark.VISIBLE,
)
.order_by(Bookmark.created_date.desc())
)
# @app.errorhandler(404)
@@ -426,7 +439,7 @@ def index():
return {}
def get_bookmarks(request: Request, userkey, filtermethod=None, sortmethod=None):
def get_bookmarks(request: Request, user_key, filter_method=None, sort_method=None):
"""User homepage, list their bookmarks, optionally filtered and/or sorted"""
# return object_list('bookmarks.html', Bookmark.select())
# user = User.select(key=userkey)
@@ -436,77 +449,88 @@ def get_bookmarks(request: Request, userkey, filtermethod=None, sortmethod=None)
# else:
# abort(404)
message = request.args.get('message')
bookmarktags = get_cached_tags(userkey)
bookmarktags = get_cached_tags(user_key)
filter_text = ''
if request.form:
filter_text = request.form['filter_text']
filter_starred = False
if filtermethod and filtermethod.lower() == 'starred':
if filter_method and filter_method.lower() == 'starred':
filter_starred = True
filter_broken = False
if filtermethod and filtermethod.lower() == 'broken':
if filter_method and filter_method.lower() == 'broken':
filter_broken = True
filter_note = False
if filtermethod and filtermethod.lower() == 'note':
if filter_method and filter_method.lower() == 'note':
filter_note = True
if filter_text:
bookmarks = _find_bookmarks(userkey, filter_text)
bookmarks = _find_bookmarks(user_key, filter_text)
elif filter_starred:
bookmarks = Bookmark.select().where(Bookmark.userkey == userkey,
Bookmark.starred).order_by(Bookmark.created_date.desc())
bookmarks = (
Bookmark.select()
.where(Bookmark.userkey == user_key, Bookmark.starred)
.order_by(Bookmark.created_date.desc())
)
elif filter_broken:
bookmarks = Bookmark.select().where(Bookmark.userkey == userkey,
Bookmark.http_status != 200).order_by(Bookmark.created_date.desc())
bookmarks = (
Bookmark.select()
.where(Bookmark.userkey == user_key, Bookmark.http_status != 200)
.order_by(Bookmark.created_date.desc())
)
elif filter_note:
bookmarks = Bookmark.select().where(Bookmark.userkey == userkey,
Bookmark.note != '').order_by(Bookmark.created_date.desc())
bookmarks = (
Bookmark.select()
.where(Bookmark.userkey == user_key, Bookmark.note != '')
.order_by(Bookmark.created_date.desc())
)
else:
bookmarks = Bookmark.select().where(
Bookmark.userkey == userkey,
Bookmark.status == Bookmark.VISIBLE
).order_by(Bookmark.created_date.desc())
bookmarks = (
Bookmark.select()
.where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE)
.order_by(Bookmark.created_date.desc())
)
return bookmarks, bookmarktags, filter_text, message
@app.get('/<userkey>', response_class=HTMLResponse)
@app.post('/<userkey>', response_class=HTMLResponse)
@app.route('/<userkey>/filter/<filtermethod>', methods=['GET', 'POST'])
@app.route('/<userkey>/sort/<sortmethod>', methods=['GET', 'POST'])
@app.route('/<userkey>/<show_as>', methods=['GET', 'POST'])
@app.route('/<userkey>/<show_as>/filter/<filtermethod>', methods=['GET', 'POST'])
@app.route('/<userkey>/<show_as>/sort/<sortmethod>', methods=['GET', 'POST'])
def bookmarks_page(request: Request, userkey, filtermethod=None, sortmethod=None, show_as='cards'):
bookmarks, bookmarktags, filter_text, message = get_bookmarks(request, userkey, filtermethod, sortmethod)
theme = get_theme(userkey)
@app.get('/{user_key}', response_class=HTMLResponse)
@app.post('/{user_key}', response_class=HTMLResponse)
@app.route('/<user_key>/filter/<filtermethod>', methods=['GET', 'POST'])
@app.route('/<user_key>/sort/<sortmethod>', methods=['GET', 'POST'])
@app.route('/<user_key>/<show_as>', methods=['GET', 'POST'])
@app.route('/<user_key>/<show_as>/filter/<filtermethod>', methods=['GET', 'POST'])
@app.route('/<user_key>/<show_as>/sort/<sortmethod>', methods=['GET', 'POST'])
def bookmarks_page(request: Request, user_key, filter_method=None, sort_method=None, show_as='cards'):
bookmarks, bookmarktags, filter_text, message = get_bookmarks(request, user_key, filter_method, sort_method)
theme = get_theme(user_key)
return templates.TemplateResponse(
'bookmarks.html',
bookmarks=bookmarks,
userkey=userkey,
userkey=user_key,
tags=bookmarktags,
filter_text=filter_text,
message=message,
theme=theme,
editable=True, # bookmarks can be edited
showtags=True, # tags should be shown with the bookmarks
filtermethod=filtermethod,
sortmethod=sortmethod,
filtermethod=filter_method,
sortmethod=sort_method,
show_as=show_as, # show list of bookmarks instead of cards
)
@app.get('/<userkey>/js')
def bookmarks_js(userkey):
@app.get('/{user_key}/js')
def bookmarks_js(user_key):
"""Return list of bookmarks with their favicons, to be used for autocompletion"""
bookmarks = Bookmark.select().where(
Bookmark.userkey == userkey,
Bookmark.status == Bookmark.VISIBLE
).order_by(Bookmark.created_date.desc())
bookmarks = (
Bookmark.select()
.where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE)
.order_by(Bookmark.created_date.desc())
)
result = []
for bookmark in bookmarks:
result.append({'title': bookmark.title})
@@ -525,9 +549,7 @@ def bookmark_redirect(userkey, urlhash):
# @TODO: add counter to this bookmark
try:
bookmark = Bookmark.get(
Bookmark.url_hash == urlhash,
Bookmark.userkey == userkey,
Bookmark.status == Bookmark.VISIBLE
Bookmark.url_hash == urlhash, Bookmark.userkey == userkey, Bookmark.status == Bookmark.VISIBLE
)
except Bookmark.DoesNotExist:
raise HTTPException(status_code=404, detail='Bookmark not found')
@@ -557,14 +579,13 @@ def bookmark_json(userkey, urlhash):
"""Serialise bookmark to json"""
try:
bookmark = Bookmark.get(
Bookmark.url_hash == urlhash,
Bookmark.userkey == userkey,
Bookmark.status == Bookmark.VISIBLE
Bookmark.url_hash == urlhash, Bookmark.userkey == userkey, Bookmark.status == Bookmark.VISIBLE
)
return bookmark.to_dict()
except Bookmark.DoesNotExist:
raise HTTPException(status_code=404, detail='Bookmark not found')
@app.route('/api/v1/<userkey>/search/<filter_text>')
def search_bookmark_titles_json(userkey, filter_text):
"""Serialise bookmark to json"""
@@ -598,7 +619,7 @@ def editbookmark(request: Request, userkey, urlhash):
message=message,
formaction='edit',
tags=tags,
theme=theme
theme=theme,
)
@@ -615,13 +636,7 @@ def addbookmark(request: Request, userkey):
tags = get_cached_tags(userkey)
theme = get_theme(userkey)
return templates.TemplateResponse(
'edit.html',
action='Add bookmark',
userkey=userkey,
bookmark=bookmark,
tags=tags,
message=message,
theme=theme
'edit.html', action='Add bookmark', userkey=userkey, bookmark=bookmark, tags=tags, message=message, theme=theme
)
@@ -643,7 +658,9 @@ def updatebookmark(request: Request, userkey, urlhash=None):
bookmark, created = Bookmark.get_or_create(url=url, userkey=userkey)
if not created:
message = 'Existing bookmark, did not overwrite with new values'
return RedirectResponse(request.url_for('editbookmark', userkey=userkey, urlhash=bookmark.url_hash, message=message))
return RedirectResponse(
request.url_for('editbookmark', userkey=userkey, urlhash=bookmark.url_hash, message=message)
)
elif url:
# Existing bookmark, get from DB
bookmark = Bookmark.get(Bookmark.userkey == userkey, Bookmark.url_hash == urlhash)
@@ -681,25 +698,26 @@ def updatebookmark(request: Request, userkey, urlhash=None):
@app.route('/<userkey>/adding', methods=['GET', 'POST'])
# @app.route('/<userkey>/adding')
def addingbookmark(request: Request, userkey):
def adding_bookmark(request: Request, user_key):
"""Add the bookmark from form submit by /add"""
tags = get_cached_tags(userkey)
tags = get_cached_tags(user_key)
if request.method == 'POST':
bookmark = updatebookmark(request, userkey)
bookmark = updatebookmark(request, user_key)
if not bookmark:
return RedirectResponse(request.url_for('addbookmark', userkey=userkey, message='No url provided', tags=tags))
return RedirectResponse(
request.url_for('addbookmark', userkey=user_key, message='No url provided', tags=tags)
)
if type(bookmark).__name__ == 'Response':
return bookmark
all_tags[userkey] = get_tags_for_user(userkey)
return RedirectResponse(request.url_for('editbookmark', userkey=userkey, urlhash=bookmark.url_hash))
return RedirectResponse(request.url_for('addbookmark', userkey=userkey, tags=tags))
all_tags[user_key] = get_tags_for_user(user_key)
return RedirectResponse(request.url_for('editbookmark', userkey=user_key, urlhash=bookmark.url_hash))
return RedirectResponse(request.url_for('addbookmark', userkey=user_key, tags=tags))
@app.route('/<userkey>/<urlhash>/editing', methods=['GET', 'POST'])
def editingbookmark(request: Request, userkey, urlhash):
"""Edit the bookmark from form submit"""
if request.method == 'POST':
bookmark = updatebookmark(request, userkey, urlhash=urlhash)
all_tags[userkey] = get_tags_for_user(userkey)
@@ -713,15 +731,12 @@ def deletingbookmark(request: Request, userkey, urlhash):
query = Bookmark.update(status=Bookmark.DELETED).where(Bookmark.userkey == userkey, Bookmark.url_hash == urlhash)
query.execute()
query = Bookmark.update(deleted_date=datetime.datetime.now()).where(
Bookmark.userkey == userkey,
Bookmark.url_hash == urlhash
Bookmark.userkey == userkey, Bookmark.url_hash == urlhash
)
query.execute()
message = 'Bookmark deleted. <a href="{}">Undo deletion</a>'.format(request.url_for(
'undeletebookmark',
userkey=userkey,
urlhash=urlhash
))
message = 'Bookmark deleted. <a href="{}">Undo deletion</a>'.format(
request.url_for('undeletebookmark', userkey=userkey, urlhash=urlhash)
)
all_tags[userkey] = get_tags_for_user(userkey)
return RedirectResponse(request.url_for('bookmarks_page', userkey=userkey, message=message))
@@ -748,11 +763,11 @@ def tags_page(userkey):
except PublicTag.DoesNotExist:
publictag = None
total = Bookmark.select().where(
Bookmark.userkey == userkey,
Bookmark.tags.contains(tag),
Bookmark.status == Bookmark.VISIBLE
).count()
total = (
Bookmark.select()
.where(Bookmark.userkey == userkey, Bookmark.tags.contains(tag), Bookmark.status == Bookmark.VISIBLE)
.count()
)
alltags.append({'tag': tag, 'publictag': publictag, 'total': total})
totaltags = len(alltags)
totalbookmarks = Bookmark.select().where(Bookmark.userkey == userkey, Bookmark.status == Bookmark.VISIBLE).count()
@@ -773,18 +788,18 @@ def tags_page(userkey):
totalhttperrorstatus=totalhttperrorstatus,
totalnotes=totalnotes,
userkey=userkey,
theme=theme
theme=theme,
)
@app.get('/<userkey>/tag/<tag>', response_class=HTMLResponse)
def tag_page(request: Request, userkey, tag):
"""Overview of all bookmarks with a certain tag"""
bookmarks = Bookmark.select().where(
Bookmark.userkey == userkey,
Bookmark.tags.contains(tag),
Bookmark.status == Bookmark.VISIBLE
).order_by(Bookmark.created_date.desc())
bookmarks = (
Bookmark.select()
.where(Bookmark.userkey == userkey, Bookmark.tags.contains(tag), Bookmark.status == Bookmark.VISIBLE)
.order_by(Bookmark.created_date.desc())
)
tags = get_cached_tags(userkey)
pageheader = 'tag: ' + tag
message = request.args.get('message')
@@ -813,11 +828,15 @@ def tag_page(request: Request, userkey, tag):
def get_publictag(tagkey):
"""Return tag and bookmarks in this public tag collection"""
this_tag = PublicTag.get(PublicTag.tagkey == tagkey)
bookmarks = Bookmark.select().where(
bookmarks = (
Bookmark.select()
.where(
Bookmark.userkey == this_tag.userkey,
Bookmark.tags.contains(this_tag.tag),
Bookmark.status == Bookmark.VISIBLE
).order_by(Bookmark.created_date.desc())
Bookmark.status == Bookmark.VISIBLE,
)
.order_by(Bookmark.created_date.desc())
)
return this_tag, bookmarks
@@ -835,7 +854,7 @@ def publictag_page(tagkey):
tag=this_tag.tag,
action=this_tag.tag,
tagkey=tagkey,
theme=theme
theme=theme,
)
except PublicTag.DoesNotExist:
raise HTTPException(status_code=404, detail='Public tag not found')
@@ -843,7 +862,7 @@ def publictag_page(tagkey):
@app.route('/api/v1/pub/<tagkey>')
def publictag_json(tagkey):
""" json representation of the Read-only overview of the bookmarks in the userkey/tag of this PublicTag """
"""Json representation of the Read-only overview of the bookmarks in the userkey/tag of this PublicTag"""
try:
this_tag, bookmarks = get_publictag(tagkey)
result = {
@@ -867,7 +886,7 @@ async def publictag_feed(request: Request, tagkey: str):
bookmarks = Bookmark.select().where(
Bookmark.userkey == this_tag.userkey,
Bookmark.tags.contains(this_tag.tag),
Bookmark.status == Bookmark.VISIBLE
Bookmark.status == Bookmark.VISIBLE,
)
feed = FeedGenerator()
@@ -951,7 +970,7 @@ def adduser(systemkey):
all_tags[newuser.key] = []
return {'user': f'/{newuser.key.decode("utf-8")}'}
else:
raise HTTPException(status_code=404, detail='I can''t let you do that Dave')
raise HTTPException(status_code=404, detail='I can\'t let you do that Dave')
@app.route('/<systemkey>/refreshfavicons')
@@ -969,7 +988,7 @@ def refreshfavicons(systemkey):
bookmark.set_favicon()
return {'message': 'Done refreshing icons'}
else:
raise HTTPException(status_code=404, detail='I can''t let you do that Dave')
raise HTTPException(status_code=404, detail='I can\'t let you do that Dave')
@app.route('/<systemkey>/findmissingfavicons')
@@ -979,7 +998,9 @@ def findmissingfavicons(systemkey):
bookmarks = Bookmark.select()
for bookmark in bookmarks:
try:
if not bookmark.favicon or not os.path.isfile(os.path.join(settings.media_dir, 'favicons', bookmark.favicon)):
if not bookmark.favicon or not os.path.isfile(
os.path.join(settings.media_dir, 'favicons', bookmark.favicon)
):
# This favicon is missing
# Clear favicon, so fallback can be used instead of showing a broken image
bookmark.favicon = None
@@ -991,22 +1012,23 @@ def findmissingfavicons(systemkey):
print(e)
return {'message': 'Done finding missing icons'}
else:
raise HTTPException(status_code=404, detail='I can''t let you do that Dave')
raise HTTPException(status_code=404, detail='I can\'t let you do that Dave')
# Initialisation == create the bookmark, user and public tag tables if they do not exist
Bookmark.create_table(True)
User.create_table(True)
PublicTag.create_table(True)
# TODO: switch to alembic migrations
# Bookmark.create_table(True)
# User.create_table(True)
# PublicTag.create_table(True)
users = User.select()
print('Current user keys:')
for user in users:
all_tags[user.key] = get_tags_for_user(user.key)
usersettings[user.key] = {'theme': user.theme}
print(user.key)
# users = User.select()
# print('Current user keys:')
# for user in users:
# all_tags[user.key] = get_tags_for_user(user.key)
# usersettings[user.key] = {'theme': user.theme}
# print(user.key)
# Run when called standalone
if __name__ == '__main__':
# if __name__ == '__main__':
# run the application
app.run(host='0.0.0.0', port=9999, debug=True)
# app.run(host='0.0.0.0', port=9999, debug=True)