From 99d883d0e9a7bed0a2b151f3845e35c3fd8d3c01 Mon Sep 17 00:00:00 2001
From: Michiel Scholten
Date: Fri, 12 Sep 2025 19:59:07 +0200
Subject: [PATCH] Moved more functionality to modules, away from main app file

---
Review notes (this area is not part of the commit message):
 * bookmarks_helpers.update_bookmark_with_info() is now a coroutine that takes
   the logger as first argument and awaits set_information_from_source();
   the three call sites in main.py pass `logger` and await it.
 * list_tags_for_user() calls tags_helpers.clean_tags() rather than
   `tags.clean_tags()` on the local list.
 * utils.generate_key() returns plain hex instead of the "b'...'" bytes repr.
 * Line breaks and indentation were reconstructed from a whitespace-flattened
   copy of this patch; verify context lines against the tree before applying.
   The commit id and index hashes still refer to the original revision.

 src/digimarks/bookmarks_helpers.py |  78 +++++++++++++
 src/digimarks/main.py              | 169 ++---------------------------
 src/digimarks/tags_helpers.py      |  69 ++++++++++++
 src/digimarks/utils.py             |  15 +++
 4 files changed, 174 insertions(+), 157 deletions(-)
 create mode 100644 src/digimarks/bookmarks_helpers.py
 create mode 100644 src/digimarks/tags_helpers.py
 create mode 100644 src/digimarks/utils.py

diff --git a/src/digimarks/bookmarks_helpers.py b/src/digimarks/bookmarks_helpers.py
new file mode 100644
index 0000000..bc1451e
--- /dev/null
+++ b/src/digimarks/bookmarks_helpers.py
@@ -0,0 +1,78 @@
+"""Bookmark helper functions, like content scrapers, favicon extractor, updater functions."""
+
+from urllib.parse import urlparse, urlunparse
+
+import bs4
+import httpx
+from extract_favicon import from_html
+from fastapi import Request
+from pydantic import AnyUrl
+
+from src.digimarks import tags_helpers
+from src.digimarks.models import Bookmark
+
+DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
+
+
+def get_favicon(html_content: str, root_url: str) -> str:
+    """Fetch the favicon from `html_content` using `root_url`."""
+    favicons = from_html(html_content, root_url=root_url, include_fallbacks=True)
+    for favicon in favicons:
+        print(favicon.url, favicon.width, favicon.height)
+    # TODO: save the preferred image to file and return
+
+
+async def set_information_from_source(logger, bookmark: Bookmark, request: Request) -> Bookmark:
+    """Request the title by requesting the source url."""
+    logger.info('Extracting information from url %s', bookmark.url)
+    try:
+        result = await request.app.requests_client.get(bookmark.url, headers={'User-Agent': DIGIMARKS_USER_AGENT})
+        bookmark.http_status = result.status_code
+    except httpx.HTTPError as err:
+        # For example, "MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?"
+        logger.error('Exception when trying to retrieve title for %s. Error: %s', bookmark.url, str(err))
+        bookmark.http_status = 404
+        bookmark.title = ''
+        return bookmark
+    if bookmark.http_status == 200 or bookmark.http_status == 202:
+        html = bs4.BeautifulSoup(result.text, 'html.parser')
+        try:
+            bookmark.title = html.title.text.strip()
+        except AttributeError:
+            bookmark.title = ''
+
+        url_parts = urlparse(str(bookmark.url))
+        root_url = url_parts.scheme + '://' + url_parts.netloc
+        favicon = get_favicon(result.text, root_url)
+        # filename = os.path.join(settings.media_dir, 'favicons/', domain + file_extension)
+        # with open(filename, 'wb') as out_file:
+        #     shutil.copyfileobj(response.raw, out_file)
+
+    # Extraction was successful
+    logger.info('Extracting information was successful')
+    return bookmark
+
+
+def strip_url_params(url: str) -> str:
+    """Strip URL params from URL.
+
+    :param url: URL to strip URL params from.
+    :return: clean URL
+    :rtype: str
+    """
+    parsed = urlparse(url)
+    return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, '', parsed.fragment))
+
+
+async def update_bookmark_with_info(logger, bookmark: Bookmark, request: Request, strip_params: bool = False):
+    """Automatically update title, status code and tags of `bookmark`; this is a coroutine and must be awaited."""
+    if not bookmark.title:
+        # Title was empty, automatically fetch it from the url, will also update the status code
+        await set_information_from_source(logger, bookmark, request)
+
+    if strip_params:
+        # Strip URL parameters, e.g., tracking params
+        bookmark.url = AnyUrl(strip_url_params(str(bookmark.url)))
+
+    # Sort and deduplicate tags
+    tags_helpers.set_tags(bookmark, bookmark.tags)
diff --git a/src/digimarks/main.py b/src/digimarks/main.py
index 69b426e..73234f5 100644
--- a/src/digimarks/main.py
+++ b/src/digimarks/main.py
@@ -1,32 +1,26 @@
 """digimarks main module."""
 
-import binascii
-import hashlib
 import logging
-import os
 from contextlib import asynccontextmanager
 from datetime import UTC, datetime
 from typing import Annotated, Sequence, Type
-from urllib.parse import urlparse, urlunparse
 
-import bs4
 import httpx
-from extract_favicon import from_html
 from fastapi import Depends, FastAPI, HTTPException, Query, Request
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import HTMLResponse
 from fastapi.staticfiles import StaticFiles
 from fastapi.templating import Jinja2Templates
-from pydantic import AnyUrl, DirectoryPath, FilePath
+from pydantic import DirectoryPath, FilePath
 from pydantic_settings import BaseSettings
 from sqlalchemy.ext.asyncio import create_async_engine
 from sqlalchemy.orm import sessionmaker
 from sqlmodel import desc, select
 from sqlmodel.ext.asyncio.session import AsyncSession
 
+from src.digimarks import bookmarks_helpers, tags_helpers, utils
 from src.digimarks.models import DEFAULT_THEME, Bookmark, User, Visibility
 
-DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
 DIGIMARKS_VERSION = '2.0.0a1'
 
 
@@ -77,6 +71,7 @@ app.mount('/static', StaticFiles(directory=settings.static_dir), name='static')
 app.mount('/content/favicons', StaticFiles(directory=settings.favicons_dir), name='favicons')
 templates = Jinja2Templates(directory=settings.template_dir)
 
+# Set up logging
 logger = logging.getLogger('digimarks')
 if settings.debug:
     logger.setLevel(logging.DEBUG)
@@ -91,61 +86,6 @@ app.add_middleware(
 )
 
 
-def i_filter_false(predicate, iterable):
-    """Filter an iterable if predicate returns True.
-
-    i_filter_false(lambda x: x%2, range(10)) --> 0 2 4 6 8
-    """
-    if predicate is None:
-        predicate = bool
-    for x in iterable:
-        if not predicate(x):
-            yield x
-
-
-def unique_ever_seen(iterable, key=None):
-    """List unique elements, preserving order. Remember all elements ever seen.
-
-    unique_ever_seen('AAAABBBCCDAABBB') --> A B C D
-    unique_ever_seen('ABBCcAD', str.lower) --> A B C D
-    """
-    seen = set()
-    seen_add = seen.add
-    if key is None:
-        for element in i_filter_false(seen.__contains__, iterable):
-            seen_add(element)
-            yield element
-    else:
-        for element in iterable:
-            k = key(element)
-            if k not in seen:
-                seen_add(k)
-                yield element
-
-
-def clean_tags(tags_list: list) -> list:
-    """Generate a unique list of the tags.
-
-    :param list tags_list: List with all tags
-    :return: deduplicated list of the tags, without leading or trailing whitespace
-    :rtype: list
-    """
-    tags_res = [x.strip() for x in tags_list]
-    tags_res = list(unique_ever_seen(tags_res))
-    tags_res.sort()
-    if tags_res and tags_res[0] == '':
-        del tags_res[0]
-    return tags_res
-
-
-def list_tags_for_bookmarks(bookmarks: list) -> list:
-    """Generate a unique list of the tags from the list of bookmarks."""
-    tags = []
-    for bookmark in bookmarks:
-        tags += bookmark.tags_list
-    return clean_tags(tags)
-
-
 def file_type(filename: str) -> str:
     """Try to determine the file type for the file in `filename`.
 
@@ -165,91 +105,6 @@ def file_type(filename: str) -> str:
     return 'no match'
 
 
-def generate_hash(input_text: str) -> str:
-    """Generate a hash from string `input`, e.g., for a URL."""
-    return hashlib.md5(input_text.encode('utf-8')).hexdigest()
-
-
-def generate_key() -> str:
-    """Generate a key to be used for a user or tag."""
-    return str(binascii.hexlify(os.urandom(24)))
-
-
-def get_favicon(html_content: str, root_url: str) -> str:
-    """Fetch the favicon from `html_content` using `root_url`."""
-    favicons = from_html(html_content, root_url=root_url, include_fallbacks=True)
-    for favicon in favicons:
-        print(favicon.url, favicon.width, favicon.height)
-    # TODO: save the preferred image to file and return
-
-
-async def set_information_from_source(bookmark: Bookmark, request: Request) -> Bookmark:
-    """Request the title by requesting the source url."""
-    logger.info('Extracting information from url %s', bookmark.url)
-    try:
-        result = await request.app.requests_client.get(bookmark.url, headers={'User-Agent': DIGIMARKS_USER_AGENT})
-        bookmark.http_status = result.status_code
-    except httpx.HTTPError as err:
-        # For example, 'MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?'
-        logger.error('Exception when trying to retrieve title for %s. Error: %s', bookmark.url, str(err))
-        bookmark.http_status = 404
-        bookmark.title = ''
-        return bookmark
-    if bookmark.http_status == 200 or bookmark.http_status == 202:
-        html = bs4.BeautifulSoup(result.text, 'html.parser')
-        try:
-            bookmark.title = html.title.text.strip()
-        except AttributeError:
-            bookmark.title = ''
-
-        url_parts = urlparse(str(bookmark.url))
-        root_url = url_parts.scheme + '://' + url_parts.netloc
-        favicon = get_favicon(result.text, root_url)
-        # filename = os.path.join(settings.media_dir, 'favicons/', domain + file_extension)
-        # with open(filename, 'wb') as out_file:
-        #     shutil.copyfileobj(response.raw, out_file)
-
-    # Extraction was successful
-    logger.info('Extracting information was successful')
-    return bookmark
-
-
-def set_tags(bookmark: Bookmark, new_tags: str) -> None:
-    """Set tags from `tags`, strip and sort them.
-
-    :param Bookmark bookmark: Bookmark to modify
-    :param str new_tags: New tags to sort and set.
-    """
-    tags_split = new_tags.split(',')
-    tags_clean = clean_tags(tags_split)
-    bookmark.tags = ','.join(tags_clean)
-
-
-def strip_url_params(url: str) -> str:
-    """Strip URL params from URL.
-
-    :param url: URL to strip URL params from.
-    :return: clean URL
-    :rtype: str
-    """
-    parsed = urlparse(url)
-    return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, '', parsed.fragment))
-
-
-def update_bookmark_with_info(bookmark: Bookmark, request: Request, strip_params: bool = False):
-    """Automatically update title, favicon, etc."""
-    if not bookmark.title:
-        # Title was empty, automatically fetch it from the url, will also update the status code
-        set_information_from_source(bookmark, request)
-
-    if strip_params:
-        # Strip URL parameters, e.g., tracking params
-        bookmark.url = AnyUrl(strip_url_params(str(bookmark.url)))
-
-    # Sort and deduplicate tags
-    set_tags(bookmark, bookmark.tags)
-
-
 @app.get('/', response_class=HTMLResponse)
 @app.head('/', response_class=HTMLResponse)
 def index(request: Request):
@@ -344,9 +199,9 @@ async def autocomplete_bookmark(
     bookmark.userkey = user_key
 
     # Auto-fill title, fix tags etc.
-    update_bookmark_with_info(bookmark, request, strip_params)
+    await bookmarks_helpers.update_bookmark_with_info(logger, bookmark, request, strip_params)
 
-    url_hash = generate_hash(str(bookmark.url))
+    url_hash = utils.generate_hash(str(bookmark.url))
     result = await session.exec(
         select(Bookmark).where(
             Bookmark.userkey == user_key, Bookmark.url_hash == url_hash, Bookmark.status != Visibility.DELETED
@@ -373,8 +228,8 @@ async def add_bookmark(
     bookmark.userkey = user_key
 
     # Auto-fill title, fix tags etc.
-    update_bookmark_with_info(bookmark, request, strip_params)
-    bookmark.url_hash = generate_hash(str(bookmark.url))
+    await bookmarks_helpers.update_bookmark_with_info(logger, bookmark, request, strip_params)
+    bookmark.url_hash = utils.generate_hash(str(bookmark.url))
 
     session.add(bookmark)
     await session.commit()
@@ -409,11 +264,11 @@ async def update_bookmark(
     bookmark_db.sqlmodel_update(bookmark_data)
 
     # Autofill title, fix tags, etc. where (still) needed
-    update_bookmark_with_info(bookmark, request, strip_params)
+    await bookmarks_helpers.update_bookmark_with_info(logger, bookmark, request, strip_params)
 
     session.add(bookmark_db)
-    session.commit()
-    session.refresh(bookmark_db)
+    await session.commit()
+    await session.refresh(bookmark_db)
     return bookmark_db
 
 
@@ -477,7 +332,7 @@ async def list_tags_for_user(
     tags = []
     for bookmark in bookmarks:
         tags += bookmark.tag_list
-    return clean_tags(tags)
+    return tags_helpers.clean_tags(tags)
 
 
 @app.get('/api/v1/{user_key}/tags/{tag_key}')
@@ -488,7 +343,7 @@ async def list_tags_for_user(
     """List all tags in use by the user."""
     result = await session.exec(select(Bookmark).where(Bookmark.userkey == user_key))
     bookmarks = result.all()
-    return list_tags_for_bookmarks(bookmarks)
+    return tags_helpers.list_tags_for_bookmarks(bookmarks)
 
 
 @app.get('/{user_key}', response_class=HTMLResponse)
diff --git a/src/digimarks/tags_helpers.py b/src/digimarks/tags_helpers.py
new file mode 100644
index 0000000..22cd0b0
--- /dev/null
+++ b/src/digimarks/tags_helpers.py
@@ -0,0 +1,69 @@
+"""Helper functions for tags used with Bookmark models."""
+
+from src.digimarks.models import Bookmark
+
+
+def i_filter_false(predicate, iterable):
+    """Yield the items of `iterable` for which `predicate` is false (`bool` is used when `predicate` is None).
+
+    i_filter_false(lambda x: x%2, range(10)) --> 0 2 4 6 8
+    """
+    if predicate is None:
+        predicate = bool
+    for x in iterable:
+        if not predicate(x):
+            yield x
+
+
+def unique_ever_seen(iterable, key=None):
+    """List unique elements, preserving order. Remember all elements ever seen.
+
+    unique_ever_seen('AAAABBBCCDAABBB') --> A B C D
+    unique_ever_seen('ABBCcAD', str.lower) --> A B C D
+    """
+    seen = set()
+    seen_add = seen.add
+    if key is None:
+        for element in i_filter_false(seen.__contains__, iterable):
+            seen_add(element)
+            yield element
+    else:
+        for element in iterable:
+            k = key(element)
+            if k not in seen:
+                seen_add(k)
+                yield element
+
+
+def clean_tags(tags_list: list) -> list:
+    """Generate a unique list of the tags.
+
+    :param list tags_list: List with all tags
+    :return: deduplicated list of the tags, without leading or trailing whitespace
+    :rtype: list
+    """
+    tags_res = [x.strip() for x in tags_list]
+    tags_res = list(unique_ever_seen(tags_res))
+    tags_res.sort()
+    if tags_res and tags_res[0] == '':
+        del tags_res[0]
+    return tags_res
+
+
+def list_tags_for_bookmarks(bookmarks: list) -> list:
+    """Generate a unique list of the tags from the list of bookmarks."""
+    tags = []
+    for bookmark in bookmarks:
+        tags += bookmark.tags_list
+    return clean_tags(tags)
+
+
+def set_tags(bookmark: Bookmark, new_tags: str) -> None:
+    """Set tags from the comma-separated `new_tags`; strip, deduplicate and sort them.
+
+    :param Bookmark bookmark: Bookmark to modify
+    :param str new_tags: New tags to sort and set.
+    """
+    tags_split = new_tags.split(',')
+    tags_clean = clean_tags(tags_split)
+    bookmark.tags = ','.join(tags_clean)
diff --git a/src/digimarks/utils.py b/src/digimarks/utils.py
new file mode 100644
index 0000000..c59c8ec
--- /dev/null
+++ b/src/digimarks/utils.py
@@ -0,0 +1,15 @@
+"""General utility functions."""
+
+import binascii
+import hashlib
+import os
+
+
+def generate_hash(input_text: str) -> str:
+    """Generate a hash from string `input`, e.g., for a URL."""
+    return hashlib.md5(input_text.encode('utf-8')).hexdigest()
+
+
+def generate_key() -> str:
+    """Generate a random 48-character hex key to be used for a user or tag."""
+    return binascii.hexlify(os.urandom(24)).decode('ascii')