1
0
mirror of https://github.com/aquatix/digimarks.git synced 2025-12-06 22:05:09 +01:00

Moved more functionality to modules, away from main app file

This commit is contained in:
2025-09-12 19:59:07 +02:00
parent 7facbeb149
commit 99d883d0e9
4 changed files with 174 additions and 157 deletions

View File

@@ -0,0 +1,78 @@
"""Bookmark helper functions, like content scrapers, favicon extractor, updater functions."""
from urllib.parse import urlparse, urlunparse
import bs4
import httpx
from extract_favicon import from_html
from fastapi import Request
from pydantic import AnyUrl
from src.digimarks import tags_helpers
from src.digimarks.models import Bookmark
DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
def get_favicon(html_content: str, root_url: str) -> str:
"""Fetch the favicon from `html_content` using `root_url`."""
favicons = from_html(html_content, root_url=root_url, include_fallbacks=True)
for favicon in favicons:
print(favicon.url, favicon.width, favicon.height)
# TODO: save the preferred image to file and return
async def set_information_from_source(logger, bookmark: Bookmark, request: Request) -> Bookmark:
"""Request the title by requesting the source url."""
logger.info('Extracting information from url %s', bookmark.url)
try:
result = await request.app.requests_client.get(bookmark.url, headers={'User-Agent': DIGIMARKS_USER_AGENT})
bookmark.http_status = result.status_code
except httpx.HTTPError as err:
# For example, "MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?"
logger.error('Exception when trying to retrieve title for %s. Error: %s', bookmark.url, str(err))
bookmark.http_status = 404
bookmark.title = ''
return bookmark
if bookmark.http_status == 200 or bookmark.http_status == 202:
html = bs4.BeautifulSoup(result.text, 'html.parser')
try:
bookmark.title = html.title.text.strip()
except AttributeError:
bookmark.title = ''
url_parts = urlparse(str(bookmark.url))
root_url = url_parts.scheme + '://' + url_parts.netloc
favicon = get_favicon(result.text, root_url)
# filename = os.path.join(settings.media_dir, 'favicons/', domain + file_extension)
# with open(filename, 'wb') as out_file:
# shutil.copyfileobj(response.raw, out_file)
# Extraction was successful
logger.info('Extracting information was successful')
return bookmark
def strip_url_params(url: str) -> str:
"""Strip URL params from URL.
:param url: URL to strip URL params from.
:return: clean URL
:rtype: str
"""
parsed = urlparse(url)
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, '', parsed.fragment))
def update_bookmark_with_info(bookmark: Bookmark, request: Request, strip_params: bool = False):
"""Automatically update title, favicon, etc."""
if not bookmark.title:
# Title was empty, automatically fetch it from the url, will also update the status code
set_information_from_source(bookmark, request)
if strip_params:
# Strip URL parameters, e.g., tracking params
bookmark.url = AnyUrl(strip_url_params(str(bookmark.url)))
# Sort and deduplicate tags
tags_helpers.set_tags(bookmark, bookmark.tags)

View File

@@ -1,32 +1,26 @@
"""digimarks main module.""" """digimarks main module."""
import binascii
import hashlib
import logging import logging
import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import UTC, datetime from datetime import UTC, datetime
from typing import Annotated, Sequence, Type from typing import Annotated, Sequence, Type
from urllib.parse import urlparse, urlunparse
import bs4
import httpx import httpx
from extract_favicon import from_html
from fastapi import Depends, FastAPI, HTTPException, Query, Request from fastapi import Depends, FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from pydantic import AnyUrl, DirectoryPath, FilePath from pydantic import DirectoryPath, FilePath
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlmodel import desc, select from sqlmodel import desc, select
from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel.ext.asyncio.session import AsyncSession
from src.digimarks import bookmarks_helpers, tags_helpers, utils
from src.digimarks.models import DEFAULT_THEME, Bookmark, User, Visibility from src.digimarks.models import DEFAULT_THEME, Bookmark, User, Visibility
DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
DIGIMARKS_VERSION = '2.0.0a1' DIGIMARKS_VERSION = '2.0.0a1'
@@ -77,6 +71,7 @@ app.mount('/static', StaticFiles(directory=settings.static_dir), name='static')
app.mount('/content/favicons', StaticFiles(directory=settings.favicons_dir), name='favicons') app.mount('/content/favicons', StaticFiles(directory=settings.favicons_dir), name='favicons')
templates = Jinja2Templates(directory=settings.template_dir) templates = Jinja2Templates(directory=settings.template_dir)
# Set up logging
logger = logging.getLogger('digimarks') logger = logging.getLogger('digimarks')
if settings.debug: if settings.debug:
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@@ -91,61 +86,6 @@ app.add_middleware(
) )
def i_filter_false(predicate, iterable):
"""Filter an iterable if predicate returns True.
i_filter_false(lambda x: x%2, range(10)) --> 0 2 4 6 8
"""
if predicate is None:
predicate = bool
for x in iterable:
if not predicate(x):
yield x
def unique_ever_seen(iterable, key=None):
"""List unique elements, preserving order. Remember all elements ever seen.
unique_ever_seen('AAAABBBCCDAABBB') --> A B C D
unique_ever_seen('ABBCcAD', str.lower) --> A B C D
"""
seen = set()
seen_add = seen.add
if key is None:
for element in i_filter_false(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def clean_tags(tags_list: list) -> list:
"""Generate a unique list of the tags.
:param list tags_list: List with all tags
:return: deduplicated list of the tags, without leading or trailing whitespace
:rtype: list
"""
tags_res = [x.strip() for x in tags_list]
tags_res = list(unique_ever_seen(tags_res))
tags_res.sort()
if tags_res and tags_res[0] == '':
del tags_res[0]
return tags_res
def list_tags_for_bookmarks(bookmarks: list) -> list:
"""Generate a unique list of the tags from the list of bookmarks."""
tags = []
for bookmark in bookmarks:
tags += bookmark.tags_list
return clean_tags(tags)
def file_type(filename: str) -> str: def file_type(filename: str) -> str:
"""Try to determine the file type for the file in `filename`. """Try to determine the file type for the file in `filename`.
@@ -165,91 +105,6 @@ def file_type(filename: str) -> str:
return 'no match' return 'no match'
def generate_hash(input_text: str) -> str:
"""Generate a hash from string `input`, e.g., for a URL."""
return hashlib.md5(input_text.encode('utf-8')).hexdigest()
def generate_key() -> str:
"""Generate a key to be used for a user or tag."""
return str(binascii.hexlify(os.urandom(24)))
def get_favicon(html_content: str, root_url: str) -> str:
"""Fetch the favicon from `html_content` using `root_url`."""
favicons = from_html(html_content, root_url=root_url, include_fallbacks=True)
for favicon in favicons:
print(favicon.url, favicon.width, favicon.height)
# TODO: save the preferred image to file and return
async def set_information_from_source(bookmark: Bookmark, request: Request) -> Bookmark:
"""Request the title by requesting the source url."""
logger.info('Extracting information from url %s', bookmark.url)
try:
result = await request.app.requests_client.get(bookmark.url, headers={'User-Agent': DIGIMARKS_USER_AGENT})
bookmark.http_status = result.status_code
except httpx.HTTPError as err:
# For example, 'MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?'
logger.error('Exception when trying to retrieve title for %s. Error: %s', bookmark.url, str(err))
bookmark.http_status = 404
bookmark.title = ''
return bookmark
if bookmark.http_status == 200 or bookmark.http_status == 202:
html = bs4.BeautifulSoup(result.text, 'html.parser')
try:
bookmark.title = html.title.text.strip()
except AttributeError:
bookmark.title = ''
url_parts = urlparse(str(bookmark.url))
root_url = url_parts.scheme + '://' + url_parts.netloc
favicon = get_favicon(result.text, root_url)
# filename = os.path.join(settings.media_dir, 'favicons/', domain + file_extension)
# with open(filename, 'wb') as out_file:
# shutil.copyfileobj(response.raw, out_file)
# Extraction was successful
logger.info('Extracting information was successful')
return bookmark
def set_tags(bookmark: Bookmark, new_tags: str) -> None:
"""Set tags from `tags`, strip and sort them.
:param Bookmark bookmark: Bookmark to modify
:param str new_tags: New tags to sort and set.
"""
tags_split = new_tags.split(',')
tags_clean = clean_tags(tags_split)
bookmark.tags = ','.join(tags_clean)
def strip_url_params(url: str) -> str:
"""Strip URL params from URL.
:param url: URL to strip URL params from.
:return: clean URL
:rtype: str
"""
parsed = urlparse(url)
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, '', parsed.fragment))
def update_bookmark_with_info(bookmark: Bookmark, request: Request, strip_params: bool = False):
"""Automatically update title, favicon, etc."""
if not bookmark.title:
# Title was empty, automatically fetch it from the url, will also update the status code
set_information_from_source(bookmark, request)
if strip_params:
# Strip URL parameters, e.g., tracking params
bookmark.url = AnyUrl(strip_url_params(str(bookmark.url)))
# Sort and deduplicate tags
set_tags(bookmark, bookmark.tags)
@app.get('/', response_class=HTMLResponse) @app.get('/', response_class=HTMLResponse)
@app.head('/', response_class=HTMLResponse) @app.head('/', response_class=HTMLResponse)
def index(request: Request): def index(request: Request):
@@ -344,9 +199,9 @@ async def autocomplete_bookmark(
bookmark.userkey = user_key bookmark.userkey = user_key
# Auto-fill title, fix tags etc. # Auto-fill title, fix tags etc.
update_bookmark_with_info(bookmark, request, strip_params) bookmarks_helpers.update_bookmark_with_info(bookmark, request, strip_params)
url_hash = generate_hash(str(bookmark.url)) url_hash = utils.generate_hash(str(bookmark.url))
result = await session.exec( result = await session.exec(
select(Bookmark).where( select(Bookmark).where(
Bookmark.userkey == user_key, Bookmark.url_hash == url_hash, Bookmark.status != Visibility.DELETED Bookmark.userkey == user_key, Bookmark.url_hash == url_hash, Bookmark.status != Visibility.DELETED
@@ -373,8 +228,8 @@ async def add_bookmark(
bookmark.userkey = user_key bookmark.userkey = user_key
# Auto-fill title, fix tags etc. # Auto-fill title, fix tags etc.
update_bookmark_with_info(bookmark, request, strip_params) bookmarks_helpers.update_bookmark_with_info(bookmark, request, strip_params)
bookmark.url_hash = generate_hash(str(bookmark.url)) bookmark.url_hash = utils.generate_hash(str(bookmark.url))
session.add(bookmark) session.add(bookmark)
await session.commit() await session.commit()
@@ -409,11 +264,11 @@ async def update_bookmark(
bookmark_db.sqlmodel_update(bookmark_data) bookmark_db.sqlmodel_update(bookmark_data)
# Autofill title, fix tags, etc. where (still) needed # Autofill title, fix tags, etc. where (still) needed
update_bookmark_with_info(bookmark, request, strip_params) bookmarks_helpers.update_bookmark_with_info(bookmark, request, strip_params)
session.add(bookmark_db) session.add(bookmark_db)
session.commit() await session.commit()
session.refresh(bookmark_db) await session.refresh(bookmark_db)
return bookmark_db return bookmark_db
@@ -477,7 +332,7 @@ async def list_tags_for_user(
tags = [] tags = []
for bookmark in bookmarks: for bookmark in bookmarks:
tags += bookmark.tag_list tags += bookmark.tag_list
return clean_tags(tags) return tags.clean_tags(tags)
@app.get('/api/v1/{user_key}/tags/{tag_key}') @app.get('/api/v1/{user_key}/tags/{tag_key}')
@@ -488,7 +343,7 @@ async def list_tags_for_user(
"""List all tags in use by the user.""" """List all tags in use by the user."""
result = await session.exec(select(Bookmark).where(Bookmark.userkey == user_key)) result = await session.exec(select(Bookmark).where(Bookmark.userkey == user_key))
bookmarks = result.all() bookmarks = result.all()
return list_tags_for_bookmarks(bookmarks) return tags_helpers.list_tags_for_bookmarks(bookmarks)
@app.get('/{user_key}', response_class=HTMLResponse) @app.get('/{user_key}', response_class=HTMLResponse)

View File

@@ -0,0 +1,69 @@
"""Helper functions for tags used with Bookmark models."""
from src.digimarks.models import Bookmark
def i_filter_false(predicate, iterable):
"""Filter an iterable if predicate returns True.
i_filter_false(lambda x: x%2, range(10)) --> 0 2 4 6 8
"""
if predicate is None:
predicate = bool
for x in iterable:
if not predicate(x):
yield x
def unique_ever_seen(iterable, key=None):
"""List unique elements, preserving order. Remember all elements ever seen.
unique_ever_seen('AAAABBBCCDAABBB') --> A B C D
unique_ever_seen('ABBCcAD', str.lower) --> A B C D
"""
seen = set()
seen_add = seen.add
if key is None:
for element in i_filter_false(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def clean_tags(tags_list: list) -> list:
"""Generate a unique list of the tags.
:param list tags_list: List with all tags
:return: deduplicated list of the tags, without leading or trailing whitespace
:rtype: list
"""
tags_res = [x.strip() for x in tags_list]
tags_res = list(unique_ever_seen(tags_res))
tags_res.sort()
if tags_res and tags_res[0] == '':
del tags_res[0]
return tags_res
def list_tags_for_bookmarks(bookmarks: list) -> list:
"""Generate a unique list of the tags from the list of bookmarks."""
tags = []
for bookmark in bookmarks:
tags += bookmark.tags_list
return clean_tags(tags)
def set_tags(bookmark: Bookmark, new_tags: str) -> None:
"""Set tags from `tags`, strip and sort them.
:param Bookmark bookmark: Bookmark to modify
:param str new_tags: New tags to sort and set.
"""
tags_split = new_tags.split(',')
tags_clean = clean_tags(tags_split)
bookmark.tags = ','.join(tags_clean)

15
src/digimarks/utils.py Normal file
View File

@@ -0,0 +1,15 @@
"""General utility functions."""
import binascii
import hashlib
import os
def generate_hash(input_text: str) -> str:
"""Generate a hash from string `input`, e.g., for a URL."""
return hashlib.md5(input_text.encode('utf-8')).hexdigest()
def generate_key() -> str:
"""Generate a key to be used for a user or tag."""
return str(binascii.hexlify(os.urandom(24)))