From f6f129d67c958fd6ecee1eb20edfaa2ab36b8868 Mon Sep 17 00:00:00 2001 From: Michiel Scholten Date: Mon, 30 Oct 2023 21:51:55 +0100 Subject: [PATCH] Refactoring to fastapi, reformatting with ruff --- pyproject.toml | 9 + requirements-dev.in | 2 +- src/digimarks/main.py | 400 ++++++++++++++++++++++-------------------- 3 files changed, 221 insertions(+), 190 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 85d9787..8896b9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,5 +59,14 @@ select = [ "W", ] +[tool.ruff.flake8-quotes] +docstring-quotes = "double" +inline-quotes = "single" +multiline-quotes = "double" + +[tool.ruff.format] +# Prefer single quotes over double quotes +quote-style = "single" + [tool.ruff.mccabe] max-complexity = 10 diff --git a/requirements-dev.in b/requirements-dev.in index 2536ded..03ad5e5 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -1,6 +1,6 @@ -r requirements.in -black +# black pylint ruff diff --git a/src/digimarks/main.py b/src/digimarks/main.py index 9305519..07de207 100644 --- a/src/digimarks/main.py +++ b/src/digimarks/main.py @@ -5,26 +5,27 @@ import hashlib import logging import os import shutil +from typing import Optional from urllib.parse import urljoin, urlparse, urlunparse import bs4 import requests from dateutil import tz -#from flask import (Flask, abort, jsonify, make_response, redirect, + +# from flask import (Flask, abort, jsonify, make_response, redirect, # render_template, request, url_for) -from fastapi import Depends, FastAPI, HTTPException, Request, Response +from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import RedirectResponse from fastapi.responses import HTMLResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.templating import Jinja2Templates from feedgen.feed import FeedGenerator -from pydantic import DirectoryPath, FilePath, validator +from pydantic import DirectoryPath, FilePath from pydantic_settings import BaseSettings -from sqlalchemy import VARCHAR, Boolean, Column, DateTime, ForeignKey, Integer, String, Text, create_engine +from sqlalchemy import VARCHAR, Boolean, Column, DateTime, Integer, Text, create_engine from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker - +from sqlalchemy.orm import sessionmaker, Mapped DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev' @@ -34,7 +35,8 @@ DEFAULT_THEME = 'freshgreen' class Settings(BaseSettings): """Configuration needed for digimarks to find its database, favicons, API integrations""" - database_file: FilePath = './bookmarks.db' + # database_file: FilePath = './bookmarks.db' + database_file: FilePath media_dir: DirectoryPath media_url: str = '/static/' @@ -44,17 +46,16 @@ class Settings(BaseSettings): settings = Settings() +print(settings.model_dump()) -engine = create_engine( - f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False} -) +engine = create_engine(f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() app = FastAPI() -templates = Jinja2Templates(directory="templates") +templates = Jinja2Templates(directory='templates') logger = logging.getLogger('digimarks') if settings.debug: @@ -63,10 +64,10 @@ if settings.debug: # CORS configuration app.add_middleware( CORSMiddleware, - allow_origins=["*"], # Allow requests from everywhere + allow_origins=['*'], # Allow requests from everywhere allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], + allow_methods=['*'], + allow_headers=['*'], ) @@ -113,25 +114,28 @@ def clean_tags(tags_list): return tags_res -magic_dict = { - b"\x1f\x8b\x08": "gz", - b"\x42\x5a\x68": "bz2", - b"\x50\x4b\x03\x04": "zip" - } - -max_len = max(len(x) for x in magic_dict) - def file_type(filename): - with open(filename, "rb") as f: + """Try to determine the file type for the file in `filename`. + + :param str filename: path to file to check + :return: zip file type + :rtype: str + """ + magic_dict = {b'\x1f\x8b\x08': 'gz', b'\x42\x5a\x68': 'bz2', b'\x50\x4b\x03\x04': 'zip'} + + max_len = max(len(x) for x in magic_dict) + + with open(filename, 'rb') as f: file_start = f.read(max_len) for magic, filetype in magic_dict.items(): if file_start.startswith(magic): return filetype - return "no match" + return 'no match' class User(Base): - """ User account """ + """User account""" + __tablename__ = 'user' id = Column(Integer, primary_key=True) @@ -142,13 +146,14 @@ class User(Base): created_date = Column(DateTime, default=datetime.datetime.now) def generate_key(self): - """ Generate userkey """ + """Generate userkey""" self.key = binascii.hexlify(os.urandom(24)) return self.key class Bookmark(Base): - """ Bookmark instance, connected to User """ + """Bookmark instance, connected to User""" + __tablename__ = 'bookmark' id = Column(Integer, primary_key=True) @@ -158,13 +163,15 @@ class Bookmark(Base): title = Column(VARCHAR(255), default='') url = Column(VARCHAR(255)) note = Column(Text, default='') - #image = CharField(default='') - url_hash = Column(VARCHAR(255) , default='') + # image = CharField(default='') + url_hash = Column(VARCHAR(255), default='') tags = Column(VARCHAR(255), default='') starred = Column(Boolean, default=False) # Website (domain) favicon - favicon = Column(VARCHAR(255), null=True) + # favicon = Column(VARCHAR(255), null=True) + # favicon = Column(VARCHAR(255)) + favicon: Mapped[Optional[str]] # Status code: 200 is OK, 404 is not found, for example (showing an error) HTTP_CONNECTIONERROR = 0 @@ -177,24 +184,25 @@ class Bookmark(Base): redirect_uri = None created_date = Column(DateTime, default=datetime.datetime.now) - modified_date = Column(DateTime, null=True) - deleted_date = Column(DateTime, null=True) + # modified_date = Column(DateTime, null=True) + modified_date: Mapped[Optional[datetime.datetime]] + # deleted_date = Column(DateTime, null=True) + deleted_date: Mapped[Optional[datetime.datetime]] # Bookmark status; deleting doesn't remove from DB VISIBLE = 0 DELETED = 1 status = Column(Integer, default=VISIBLE) - class Meta: ordering = (('created_date', 'desc'),) def set_hash(self): - """ Generate hash """ + """Generate hash""" self.url_hash = hashlib.md5(self.url.encode('utf-8')).hexdigest() def set_title_from_source(self) -> str: - """ Request the title by requesting the source url """ + """Request the title by requesting the source url""" try: result = requests.get(self.url, headers={'User-Agent': DIGIMARKS_USER_AGENT}) self.http_status = result.status_code @@ -210,7 +218,7 @@ class Bookmark(Base): return self.title def set_status_code(self) -> int: - """ Check the HTTP status of the url, as it might not exist for example """ + """Check the HTTP status of the url, as it might not exist for example""" try: result = requests.head(self.url, headers={'User-Agent': DIGIMARKS_USER_AGENT}, timeout=30) self.http_status = result.status_code @@ -219,13 +227,13 @@ class Bookmark(Base): return self.http_status def _set_favicon_with_iconsbetterideaorg(self, domain): - """ Fetch favicon for the domain """ + """Fetch favicon for the domain""" fileextension = '.png' meta = requests.head( 'http://icons.better-idea.org/icon?size=60&url=' + domain, allow_redirects=True, headers={'User-Agent': DIGIMARKS_USER_AGENT}, - timeout=15 + timeout=15, ) if meta.url[-3:].lower() == 'ico': fileextension = '.ico' @@ -233,7 +241,7 @@ class Bookmark(Base): 'http://icons.better-idea.org/icon?size=60&url=' + domain, stream=True, headers={'User-Agent': DIGIMARKS_USER_AGENT}, - timeout=15 + timeout=15, ) filename = os.path.join(settings.media_dir, 'favicons/', domain + fileextension) with open(filename, 'wb') as out_file: @@ -251,18 +259,18 @@ class Bookmark(Base): self.favicon = domain + fileextension def _set_favicon_with_realfavicongenerator(self, domain): - """ Fetch favicon for the domain """ + """Fetch favicon for the domain""" response = requests.get( 'https://realfavicongenerator.p.rapidapi.com/favicon/icon?platform=android_chrome&site=' + domain, stream=True, - headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY} + headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY}, ) if response.status_code == 404: # Fall back to desktop favicon response = requests.get( 'https://realfavicongenerator.p.rapidapi.com/favicon/icon?platform=desktop&site=' + domain, stream=True, - headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY} + headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.MASHAPE_API_KEY}, ) # Debug for the moment print(domain) @@ -293,7 +301,7 @@ class Bookmark(Base): self.favicon = domain + fileextension def set_favicon(self): - """ Fetch favicon for the domain """ + """Fetch favicon for the domain""" u = urlparse(self.url) domain = u.netloc if os.path.isfile(os.path.join(settings.media_dir, 'favicons/', domain + '.png')): @@ -304,11 +312,11 @@ class Bookmark(Base): # If file exists, don't re-download it self.favicon = domain + '.ico' return - #self._set_favicon_with_iconsbetterideaorg(domain) + # self._set_favicon_with_iconsbetterideaorg(domain) self._set_favicon_with_realfavicongenerator(domain) def set_tags(self, newtags): - """ Set tags from `tags`, strip and sort them """ + """Set tags from `tags`, strip and sort them""" tags_split = newtags.split(',') tags_clean = clean_tags(tags_split) self.tags = ','.join(tags_clean) @@ -334,7 +342,7 @@ class Bookmark(Base): @property def tags_list(self): - """ Get the tags as a list, iterable in template """ + """Get the tags as a list, iterable in template""" if self.tags: return self.tags.split(',') return [] @@ -343,7 +351,7 @@ class Bookmark(Base): result = { 'title': self.title, 'url': self.url, - 'created': self.created_date.strftime('%Y-%m-%d %H:%M:%S'), + 'created': self.created_date.strftime('%Y-%m-%d %H:%M:%S'), 'url_hash': self.url_hash, 'tags': self.tags, } @@ -355,7 +363,8 @@ class Bookmark(Base): class PublicTag(Base): - """ Publicly shared tag """ + """Publicly shared tag""" + __tablename__ = 'publictag' id = Column(Integer, primary_key=True) @@ -365,12 +374,12 @@ class PublicTag(Base): created_date = Column(DateTime, default=datetime.datetime.now) def generate_key(self): - """ Generate hash-based key for publicly shared tag """ + """Generate hash-based key for publicly shared tag""" self.tagkey = binascii.hexlify(os.urandom(16)) def get_tags_for_user(userkey): - """ Extract all tags from the bookmarks """ + """Extract all tags from the bookmarks""" bookmarks = Bookmark.select().filter(Bookmark.userkey == userkey, Bookmark.status == Bookmark.VISIBLE) tags = [] for bookmark in bookmarks: @@ -379,7 +388,7 @@ def get_tags_for_user(userkey): def get_cached_tags(userkey): - """ Fail-safe way to get the cached tags for `userkey` """ + """Fail-safe way to get the cached tags for `userkey`""" try: return all_tags[userkey] except KeyError: @@ -401,15 +410,19 @@ def make_external(request: Request, url): def _find_bookmarks(userkey, filter_text) -> list[Bookmark]: """Look up bookmark for `userkey` which contains `filter_text` in its properties""" - return Bookmark.select().where( - Bookmark.userkey == userkey, - ( - Bookmark.title.contains(filter_text) | - Bookmark.url.contains(filter_text) | - Bookmark.note.contains(filter_text) - ), - Bookmark.status == Bookmark.VISIBLE - ).order_by(Bookmark.created_date.desc()) + return ( + Bookmark.select() + .where( + Bookmark.userkey == userkey, + ( + Bookmark.title.contains(filter_text) + | Bookmark.url.contains(filter_text) + | Bookmark.note.contains(filter_text) + ), + Bookmark.status == Bookmark.VISIBLE, + ) + .order_by(Bookmark.created_date.desc()) + ) # @app.errorhandler(404) @@ -420,93 +433,104 @@ def _find_bookmarks(userkey, filter_text) -> list[Bookmark]: @app.get('/') def index(): - """ Homepage, point visitors to project page """ + """Homepage, point visitors to project page""" # theme = themes[DEFAULT_THEME] # return render_template('index.html', theme=theme) return {} -def get_bookmarks(request: Request, userkey, filtermethod=None, sortmethod=None): - """ User homepage, list their bookmarks, optionally filtered and/or sorted """ - #return object_list('bookmarks.html', Bookmark.select()) - #user = User.select(key=userkey) - #if user: +def get_bookmarks(request: Request, user_key, filter_method=None, sort_method=None): + """User homepage, list their bookmarks, optionally filtered and/or sorted""" + # return object_list('bookmarks.html', Bookmark.select()) + # user = User.select(key=userkey) + # if user: # bookmarks = Bookmark.select(User=user) # return render_template('bookmarks.html', bookmarks) - #else: + # else: # abort(404) message = request.args.get('message') - bookmarktags = get_cached_tags(userkey) + bookmarktags = get_cached_tags(user_key) filter_text = '' if request.form: filter_text = request.form['filter_text'] filter_starred = False - if filtermethod and filtermethod.lower() == 'starred': + if filter_method and filter_method.lower() == 'starred': filter_starred = True filter_broken = False - if filtermethod and filtermethod.lower() == 'broken': + if filter_method and filter_method.lower() == 'broken': filter_broken = True filter_note = False - if filtermethod and filtermethod.lower() == 'note': + if filter_method and filter_method.lower() == 'note': filter_note = True if filter_text: - bookmarks = _find_bookmarks(userkey, filter_text) + bookmarks = _find_bookmarks(user_key, filter_text) elif filter_starred: - bookmarks = Bookmark.select().where(Bookmark.userkey == userkey, - Bookmark.starred).order_by(Bookmark.created_date.desc()) + bookmarks = ( + Bookmark.select() + .where(Bookmark.userkey == user_key, Bookmark.starred) + .order_by(Bookmark.created_date.desc()) + ) elif filter_broken: - bookmarks = Bookmark.select().where(Bookmark.userkey == userkey, - Bookmark.http_status != 200).order_by(Bookmark.created_date.desc()) + bookmarks = ( + Bookmark.select() + .where(Bookmark.userkey == user_key, Bookmark.http_status != 200) + .order_by(Bookmark.created_date.desc()) + ) elif filter_note: - bookmarks = Bookmark.select().where(Bookmark.userkey == userkey, - Bookmark.note != '').order_by(Bookmark.created_date.desc()) + bookmarks = ( + Bookmark.select() + .where(Bookmark.userkey == user_key, Bookmark.note != '') + .order_by(Bookmark.created_date.desc()) + ) else: - bookmarks = Bookmark.select().where( - Bookmark.userkey == userkey, - Bookmark.status == Bookmark.VISIBLE - ).order_by(Bookmark.created_date.desc()) + bookmarks = ( + Bookmark.select() + .where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE) + .order_by(Bookmark.created_date.desc()) + ) return bookmarks, bookmarktags, filter_text, message -@app.get('/', response_class=HTMLResponse) -@app.post('/', response_class=HTMLResponse) -@app.route('//filter/', methods=['GET', 'POST']) -@app.route('//sort/', methods=['GET', 'POST']) -@app.route('//', methods=['GET', 'POST']) -@app.route('///filter/', methods=['GET', 'POST']) -@app.route('///sort/', methods=['GET', 'POST']) -def bookmarks_page(request: Request, userkey, filtermethod=None, sortmethod=None, show_as='cards'): - bookmarks, bookmarktags, filter_text, message = get_bookmarks(request, userkey, filtermethod, sortmethod) - theme = get_theme(userkey) +@app.get('/{user_key}', response_class=HTMLResponse) +@app.post('/{user_key}', response_class=HTMLResponse) +@app.route('//filter/', methods=['GET', 'POST']) +@app.route('//sort/', methods=['GET', 'POST']) +@app.route('//', methods=['GET', 'POST']) +@app.route('///filter/', methods=['GET', 'POST']) +@app.route('///sort/', methods=['GET', 'POST']) +def bookmarks_page(request: Request, user_key, filter_method=None, sort_method=None, show_as='cards'): + bookmarks, bookmarktags, filter_text, message = get_bookmarks(request, user_key, filter_method, sort_method) + theme = get_theme(user_key) return templates.TemplateResponse( 'bookmarks.html', bookmarks=bookmarks, - userkey=userkey, + userkey=user_key, tags=bookmarktags, filter_text=filter_text, message=message, theme=theme, editable=True, # bookmarks can be edited showtags=True, # tags should be shown with the bookmarks - filtermethod=filtermethod, - sortmethod=sortmethod, + filtermethod=filter_method, + sortmethod=sort_method, show_as=show_as, # show list of bookmarks instead of cards ) -@app.get('//js') -def bookmarks_js(userkey): - """ Return list of bookmarks with their favicons, to be used for autocompletion """ - bookmarks = Bookmark.select().where( - Bookmark.userkey == userkey, - Bookmark.status == Bookmark.VISIBLE - ).order_by(Bookmark.created_date.desc()) +@app.get('/{user_key}/js') +def bookmarks_js(user_key): + """Return list of bookmarks with their favicons, to be used for autocompletion""" + bookmarks = ( + Bookmark.select() + .where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE) + .order_by(Bookmark.created_date.desc()) + ) result = [] for bookmark in bookmarks: result.append({'title': bookmark.title}) @@ -521,13 +545,11 @@ def bookmarks_js(userkey): @app.get('/r//', response_class=HTMLResponse) def bookmark_redirect(userkey, urlhash): - """ Securely redirect a bookmark to its url, stripping referrer (if browser plays nice) """ + """Securely redirect a bookmark to its url, stripping referrer (if browser plays nice)""" # @TODO: add counter to this bookmark try: bookmark = Bookmark.get( - Bookmark.url_hash == urlhash, - Bookmark.userkey == userkey, - Bookmark.status == Bookmark.VISIBLE + Bookmark.url_hash == urlhash, Bookmark.userkey == userkey, Bookmark.status == Bookmark.VISIBLE ) except Bookmark.DoesNotExist: raise HTTPException(status_code=404, detail='Bookmark not found') @@ -554,20 +576,19 @@ def bookmarks_json(request: Request, userkey, filtermethod=None, sortmethod=None @app.route('/api/v1//') def bookmark_json(userkey, urlhash): - """ Serialise bookmark to json """ + """Serialise bookmark to json""" try: bookmark = Bookmark.get( - Bookmark.url_hash == urlhash, - Bookmark.userkey == userkey, - Bookmark.status == Bookmark.VISIBLE + Bookmark.url_hash == urlhash, Bookmark.userkey == userkey, Bookmark.status == Bookmark.VISIBLE ) return bookmark.to_dict() except Bookmark.DoesNotExist: raise HTTPException(status_code=404, detail='Bookmark not found') + @app.route('/api/v1//search/') def search_bookmark_titles_json(userkey, filter_text): - """ Serialise bookmark to json """ + """Serialise bookmark to json""" bookmarks = _find_bookmarks(userkey, filter_text) result = [] for bookmark in bookmarks: @@ -578,7 +599,7 @@ def search_bookmark_titles_json(userkey, filter_text): @app.get('//', response_class=HTMLResponse) @app.get('///edit', response_class=HTMLResponse) def editbookmark(request: Request, userkey, urlhash): - """ Bookmark edit form """ + """Bookmark edit form""" # bookmark = getbyurlhash() try: bookmark = Bookmark.get(Bookmark.url_hash == urlhash, Bookmark.userkey == userkey) @@ -598,13 +619,13 @@ def editbookmark(request: Request, userkey, urlhash): message=message, formaction='edit', tags=tags, - theme=theme + theme=theme, ) @app.get('//add', response_class=HTMLResponse) def addbookmark(request: Request, userkey): - """ Bookmark add form """ + """Bookmark add form""" url = request.args.get('url') if not url: url = '' @@ -615,18 +636,12 @@ def addbookmark(request: Request, userkey): tags = get_cached_tags(userkey) theme = get_theme(userkey) return templates.TemplateResponse( - 'edit.html', - action='Add bookmark', - userkey=userkey, - bookmark=bookmark, - tags=tags, - message=message, - theme=theme + 'edit.html', action='Add bookmark', userkey=userkey, bookmark=bookmark, tags=tags, message=message, theme=theme ) def updatebookmark(request: Request, userkey, urlhash=None): - """ Add (no urlhash) or edit (urlhash is set) a bookmark """ + """Add (no urlhash) or edit (urlhash is set) a bookmark""" title = request.form.get('title') url = request.form.get('url') tags = request.form.get('tags') @@ -643,7 +658,9 @@ def updatebookmark(request: Request, userkey, urlhash=None): bookmark, created = Bookmark.get_or_create(url=url, userkey=userkey) if not created: message = 'Existing bookmark, did not overwrite with new values' - return RedirectResponse(request.url_for('editbookmark', userkey=userkey, urlhash=bookmark.url_hash, message=message)) + return RedirectResponse( + request.url_for('editbookmark', userkey=userkey, urlhash=bookmark.url_hash, message=message) + ) elif url: # Existing bookmark, get from DB bookmark = Bookmark.get(Bookmark.userkey == userkey, Bookmark.url_hash == urlhash) @@ -661,7 +678,7 @@ def updatebookmark(request: Request, userkey, urlhash=None): bookmark.set_tags(tags) bookmark.note = note bookmark.set_hash() - #bookmark.fetch_image() + # bookmark.fetch_image() if not title: # Title was empty, automatically fetch it from the url, will also update the status code bookmark.set_title_from_source() @@ -680,26 +697,27 @@ def updatebookmark(request: Request, userkey, urlhash=None): @app.route('//adding', methods=['GET', 'POST']) -#@app.route('//adding') -def addingbookmark(request: Request, userkey): - """ Add the bookmark from form submit by /add """ - tags = get_cached_tags(userkey) +# @app.route('//adding') +def adding_bookmark(request: Request, user_key): + """Add the bookmark from form submit by /add""" + tags = get_cached_tags(user_key) if request.method == 'POST': - bookmark = updatebookmark(request, userkey) + bookmark = updatebookmark(request, user_key) if not bookmark: - return RedirectResponse(request.url_for('addbookmark', userkey=userkey, message='No url provided', tags=tags)) + return RedirectResponse( + request.url_for('addbookmark', userkey=user_key, message='No url provided', tags=tags) + ) if type(bookmark).__name__ == 'Response': return bookmark - all_tags[userkey] = get_tags_for_user(userkey) - return RedirectResponse(request.url_for('editbookmark', userkey=userkey, urlhash=bookmark.url_hash)) - return RedirectResponse(request.url_for('addbookmark', userkey=userkey, tags=tags)) + all_tags[user_key] = get_tags_for_user(user_key) + return RedirectResponse(request.url_for('editbookmark', userkey=user_key, urlhash=bookmark.url_hash)) + return RedirectResponse(request.url_for('addbookmark', userkey=user_key, tags=tags)) @app.route('///editing', methods=['GET', 'POST']) def editingbookmark(request: Request, userkey, urlhash): - """ Edit the bookmark from form submit """ - + """Edit the bookmark from form submit""" if request.method == 'POST': bookmark = updatebookmark(request, userkey, urlhash=urlhash) all_tags[userkey] = get_tags_for_user(userkey) @@ -709,26 +727,23 @@ def editingbookmark(request: Request, userkey, urlhash): @app.route('///delete', methods=['GET', 'POST']) def deletingbookmark(request: Request, userkey, urlhash): - """ Delete the bookmark from form submit by /delete """ + """Delete the bookmark from form submit by /delete""" query = Bookmark.update(status=Bookmark.DELETED).where(Bookmark.userkey == userkey, Bookmark.url_hash == urlhash) query.execute() query = Bookmark.update(deleted_date=datetime.datetime.now()).where( - Bookmark.userkey == userkey, - Bookmark.url_hash == urlhash + Bookmark.userkey == userkey, Bookmark.url_hash == urlhash ) query.execute() - message = 'Bookmark deleted. Undo deletion'.format(request.url_for( - 'undeletebookmark', - userkey=userkey, - urlhash=urlhash - )) + message = 'Bookmark deleted. Undo deletion'.format( + request.url_for('undeletebookmark', userkey=userkey, urlhash=urlhash) + ) all_tags[userkey] = get_tags_for_user(userkey) return RedirectResponse(request.url_for('bookmarks_page', userkey=userkey, message=message)) @app.get('///undelete') def undeletebookmark(request: Request, userkey, urlhash): - """ Undo deletion of the bookmark identified by urlhash """ + """Undo deletion of the bookmark identified by urlhash""" query = Bookmark.update(status=Bookmark.VISIBLE).where(Bookmark.userkey == userkey, Bookmark.url_hash == urlhash) query.execute() message = 'Bookmark restored' @@ -738,9 +753,9 @@ def undeletebookmark(request: Request, userkey, urlhash): @app.get('//tags', response_class=HTMLResponse) def tags_page(userkey): - """ Overview of all tags used by user """ + """Overview of all tags used by user""" tags = get_cached_tags(userkey) - #publictags = PublicTag.select().where(Bookmark.userkey == userkey) + # publictags = PublicTag.select().where(Bookmark.userkey == userkey) alltags = [] for tag in tags: try: @@ -748,11 +763,11 @@ def tags_page(userkey): except PublicTag.DoesNotExist: publictag = None - total = Bookmark.select().where( - Bookmark.userkey == userkey, - Bookmark.tags.contains(tag), - Bookmark.status == Bookmark.VISIBLE - ).count() + total = ( + Bookmark.select() + .where(Bookmark.userkey == userkey, Bookmark.tags.contains(tag), Bookmark.status == Bookmark.VISIBLE) + .count() + ) alltags.append({'tag': tag, 'publictag': publictag, 'total': total}) totaltags = len(alltags) totalbookmarks = Bookmark.select().where(Bookmark.userkey == userkey, Bookmark.status == Bookmark.VISIBLE).count() @@ -773,18 +788,18 @@ def tags_page(userkey): totalhttperrorstatus=totalhttperrorstatus, totalnotes=totalnotes, userkey=userkey, - theme=theme + theme=theme, ) @app.get('//tag/', response_class=HTMLResponse) def tag_page(request: Request, userkey, tag): - """ Overview of all bookmarks with a certain tag """ - bookmarks = Bookmark.select().where( - Bookmark.userkey == userkey, - Bookmark.tags.contains(tag), - Bookmark.status == Bookmark.VISIBLE - ).order_by(Bookmark.created_date.desc()) + """Overview of all bookmarks with a certain tag""" + bookmarks = ( + Bookmark.select() + .where(Bookmark.userkey == userkey, Bookmark.tags.contains(tag), Bookmark.status == Bookmark.VISIBLE) + .order_by(Bookmark.created_date.desc()) + ) tags = get_cached_tags(userkey) pageheader = 'tag: ' + tag message = request.args.get('message') @@ -811,20 +826,24 @@ def tag_page(request: Request, userkey, tag): def get_publictag(tagkey): - """ Return tag and bookmarks in this public tag collection """ + """Return tag and bookmarks in this public tag collection""" this_tag = PublicTag.get(PublicTag.tagkey == tagkey) - bookmarks = Bookmark.select().where( - Bookmark.userkey == this_tag.userkey, - Bookmark.tags.contains(this_tag.tag), - Bookmark.status == Bookmark.VISIBLE - ).order_by(Bookmark.created_date.desc()) + bookmarks = ( + Bookmark.select() + .where( + Bookmark.userkey == this_tag.userkey, + Bookmark.tags.contains(this_tag.tag), + Bookmark.status == Bookmark.VISIBLE, + ) + .order_by(Bookmark.created_date.desc()) + ) return this_tag, bookmarks @app.get('/pub/', response_class=HTMLResponse) def publictag_page(tagkey): - """ Read-only overview of the bookmarks in the userkey/tag of this PublicTag """ - #this_tag = get_object_or_404(PublicTag.select().where(PublicTag.tagkey == tagkey)) + """Read-only overview of the bookmarks in the userkey/tag of this PublicTag""" + # this_tag = get_object_or_404(PublicTag.select().where(PublicTag.tagkey == tagkey)) try: this_tag, bookmarks = get_publictag(tagkey) # theme = themes[DEFAULT_THEME] @@ -835,7 +854,7 @@ def publictag_page(tagkey): tag=this_tag.tag, action=this_tag.tag, tagkey=tagkey, - theme=theme + theme=theme, ) except PublicTag.DoesNotExist: raise HTTPException(status_code=404, detail='Public tag not found') @@ -843,7 +862,7 @@ def publictag_page(tagkey): @app.route('/api/v1/pub/') def publictag_json(tagkey): - """ json representation of the Read-only overview of the bookmarks in the userkey/tag of this PublicTag """ + """Json representation of the Read-only overview of the bookmarks in the userkey/tag of this PublicTag""" try: this_tag, bookmarks = get_publictag(tagkey) result = { @@ -861,13 +880,13 @@ def publictag_json(tagkey): @app.get('/pub//feed') async def publictag_feed(request: Request, tagkey: str): - """ rss/atom representation of the Read-only overview of the bookmarks in the userkey/tag of this PublicTag """ + """rss/atom representation of the Read-only overview of the bookmarks in the userkey/tag of this PublicTag""" try: this_tag = PublicTag.get(PublicTag.tagkey == tagkey) bookmarks = Bookmark.select().where( Bookmark.userkey == this_tag.userkey, Bookmark.tags.contains(this_tag.tag), - Bookmark.status == Bookmark.VISIBLE + Bookmark.status == Bookmark.VISIBLE, ) feed = FeedGenerator() @@ -942,7 +961,7 @@ def removepublictag(request: Request, userkey, tag, tagkey): @app.route('//adduser') def adduser(systemkey): - """ Add user endpoint, convenience """ + """Add user endpoint, convenience""" if systemkey == settings.SYSTEMKEY: newuser = User() newuser.generate_key() @@ -951,12 +970,12 @@ def adduser(systemkey): all_tags[newuser.key] = [] return {'user': f'/{newuser.key.decode("utf-8")}'} else: - raise HTTPException(status_code=404, detail='I can''t let you do that Dave') + raise HTTPException(status_code=404, detail='I can\'t let you do that Dave') @app.route('//refreshfavicons') def refreshfavicons(systemkey): - """ Add user endpoint, convenience """ + """Add user endpoint, convenience""" if systemkey == settings.SYSTEMKEY: bookmarks = Bookmark.select() for bookmark in bookmarks: @@ -969,17 +988,19 @@ def refreshfavicons(systemkey): bookmark.set_favicon() return {'message': 'Done refreshing icons'} else: - raise HTTPException(status_code=404, detail='I can''t let you do that Dave') + raise HTTPException(status_code=404, detail='I can\'t let you do that Dave') @app.route('//findmissingfavicons') def findmissingfavicons(systemkey): - """ Add user endpoint, convenience """ + """Add user endpoint, convenience""" if systemkey == settings.SYSTEMKEY: bookmarks = Bookmark.select() for bookmark in bookmarks: try: - if not bookmark.favicon or not os.path.isfile(os.path.join(settings.media_dir, 'favicons', bookmark.favicon)): + if not bookmark.favicon or not os.path.isfile( + os.path.join(settings.media_dir, 'favicons', bookmark.favicon) + ): # This favicon is missing # Clear favicon, so fallback can be used instead of showing a broken image bookmark.favicon = None @@ -991,22 +1012,23 @@ def findmissingfavicons(systemkey): print(e) return {'message': 'Done finding missing icons'} else: - raise HTTPException(status_code=404, detail='I can''t let you do that Dave') + raise HTTPException(status_code=404, detail='I can\'t let you do that Dave') # Initialisation == create the bookmark, user and public tag tables if they do not exist -Bookmark.create_table(True) -User.create_table(True) -PublicTag.create_table(True) +# TODO: switch to alembic migrations +# Bookmark.create_table(True) +# User.create_table(True) +# PublicTag.create_table(True) -users = User.select() -print('Current user keys:') -for user in users: - all_tags[user.key] = get_tags_for_user(user.key) - usersettings[user.key] = {'theme': user.theme} - print(user.key) +# users = User.select() +# print('Current user keys:') +# for user in users: +# all_tags[user.key] = get_tags_for_user(user.key) +# usersettings[user.key] = {'theme': user.theme} +# print(user.key) # Run when called standalone -if __name__ == '__main__': - # run the application - app.run(host='0.0.0.0', port=9999, debug=True) +# if __name__ == '__main__': +# run the application +# app.run(host='0.0.0.0', port=9999, debug=True)