1
0
mirror of https://codeberg.org/diginaut/digimarks.git synced 2026-03-22 06:20:49 +01:00
Files
digimarks/src/digimarks/bookmarks_service.py

232 lines
8.1 KiB
Python

"""Bookmark helper functions, like content scrapers, favicon extractor, updater functions."""
import logging
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import Annotated
from urllib.parse import urlparse, urlunparse
import bs4
import httpx
from extract_favicon import from_html
from fastapi import Query, Request
from fastapi.exceptions import HTTPException
from pydantic import AnyUrl, ValidationError
from sqlmodel import select
from digimarks import tags_service, utils
from digimarks.exceptions import BookmarkNotFound
from digimarks.models import Bookmark, Visibility
# Value sent as the User-Agent header when a bookmarked page is requested
DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
# Module logger, registered under the name 'digimarks'
logger = logging.getLogger('digimarks')
def get_favicon(html_content: str, root_url: str) -> str:
    """Find the preferred favicon referenced by `html_content`.

    :param html_content: HTML of the page to scan for favicon references.
    :param root_url: scheme and host of the page, handed to `from_html` as `root_url`.
    :return: URL of the candidate with the largest declared dimensions, or an empty string when no
        candidate was found.
    :rtype: str
    """
    favicons = list(from_html(html_content, root_url=root_url, include_fallbacks=True))
    for favicon in favicons:
        # Goes to the application log rather than stdout
        logger.debug('Favicon candidate %s (%sx%s)', favicon.url, favicon.width, favicon.height)

    if not favicons:
        return ''

    # Candidates without a declared size count as 0x0, so they only win when nothing better is available
    preferred = max(favicons, key=lambda icon: (icon.width or 0) * (icon.height or 0))
    # TODO: save the preferred image to file and return
    return str(preferred.url)
async def set_information_from_source(bookmark: Bookmark, request: Request) -> Bookmark:
    """Request the bookmark's URL and fill in its HTTP status and title.

    :param bookmark: bookmark to update in place; its `url` is the address that is requested.
    :param request: current request, used to reach the HTTP client stored on
        `request.app.state.requests_client`.
    :return: the same `bookmark` object. When the request itself fails, the status is set to 404 and the
        title to an empty string.
    :raises HTTPException: with status 400 when the page was retrieved (status 200 or 202) but no title
        could be extracted from it.
    """
    logger.info('Extracting information from url %s', bookmark.url)
    try:
        result = await request.app.state.requests_client.get(
            str(bookmark.url), headers={'User-Agent': DIGIMARKS_USER_AGENT}
        )
    except (httpx.HTTPError, httpx.InvalidURL) as err:
        # Transport-level failures (unsupported scheme, DNS, timeout, ...) derive from httpx.HTTPError;
        # httpx.InvalidURL does not, so it has to be listed explicitly.
        logger.error('Exception when trying to retrieve title for %s. Error: %s', bookmark.url, str(err))
        bookmark.http_status = 404
        bookmark.title = ''
        return bookmark

    bookmark.http_status = result.status_code
    logger.info('HTTP status code %s for %s', bookmark.http_status, bookmark.url)

    if bookmark.http_status in (200, 202):
        html_content = bs4.BeautifulSoup(result.text, 'html.parser')
        try:
            bookmark.title = html_content.title.text.strip()
        except AttributeError as exc:
            # `html_content.title` is None when the document has no <title> element
            logger.error('Error while trying to extract title from URL %s: %s', str(bookmark.url), str(exc))
            raise HTTPException(status_code=400, detail='Error while trying to extract title') from exc

    url_parts = urlparse(str(bookmark.url))
    root_url = f'{url_parts.scheme}://{url_parts.netloc}'
    # The result is not stored yet.
    # TODO: write the favicon to settings.media_dir/favicons/ and link it to the bookmark
    get_favicon(result.text, root_url)

    # Extraction was successful
    logger.info('Extracting information was successful')
    return bookmark
def strip_url_params(url: str) -> str:
    """Return `url` without its query string.

    Scheme, host, path, path parameters and fragment are carried over untouched.

    :param url: URL to remove the query string from.
    :return: URL without query string
    :rtype: str
    """
    without_query = urlparse(url)._replace(query='')
    return urlunparse(without_query)
async def update_bookmark_with_info(bookmark: Bookmark, request: Request, strip_params: bool = False):
    """Complete the automatically derived parts of `bookmark`: URL type, title and tags.

    :param bookmark: bookmark to update in place.
    :param request: current request, passed on to `set_information_from_source`.
    :param strip_params: when true, the URL is run through `strip_url_params` after the title lookup.
    """
    current_url = bookmark.url
    if isinstance(current_url, str):
        # A plain string is promoted to an AnyUrl
        bookmark.url = AnyUrl(current_url)

    title_missing = not bookmark.title
    if title_missing:
        # No title yet: look it up at the source, which also records the HTTP status code
        await set_information_from_source(bookmark, request)

    if strip_params:
        # Drop URL parameters such as tracking params
        cleaned_url = strip_url_params(str(bookmark.url))
        bookmark.url = AnyUrl(cleaned_url)

    # Sort and deduplicate tags
    tags_service.set_tags(bookmark, bookmark.tags)
async def list_bookmarks_for_user(
    session,
    user_key: str,
    offset: int = 0,
    limit: Annotated[int, Query(le=10000)] = 100,
) -> Sequence[Bookmark]:
    """Return one page of the non-deleted bookmarks owned by `user_key`.

    A page holds 100 items unless `limit` says otherwise; `limit` is capped at 10000.
    """
    statement = select(Bookmark).where(
        Bookmark.user_key == user_key,
        Bookmark.status != Visibility.DELETED,
    )
    page = statement.offset(offset).limit(limit)
    rows = await session.exec(page)
    return rows.all()
async def get_bookmark_for_user_with_url_hash(session, user_key: str, url_hash: str) -> Bookmark:
    """Get a non-deleted bookmark of `user_key` from the database by its URL hash.

    :param session: database session the query is executed on.
    :param user_key: key of the user that owns the bookmark.
    :param url_hash: hash of the bookmark's URL.
    :return: the matching bookmark.
    :raises BookmarkNotFound: when the user has no non-deleted bookmark with this hash.
    """
    result = await session.exec(
        select(Bookmark).where(
            Bookmark.user_key == user_key, Bookmark.url_hash == url_hash, Bookmark.status != Visibility.DELETED
        )
    )
    # Read the row exactly once: `first()` consumes and closes the result, so a second call cannot
    # hand back the same row.
    bookmark = result.first()
    if not bookmark:
        raise BookmarkNotFound(f'url_hash: {url_hash}')
    return bookmark
async def autocomplete_bookmark(
    session,
    request: Request,
    user_key: str,
    bookmark: Bookmark,
    strip_params: bool = False,
) -> Bookmark:
    """Autofill some fields for this (new) bookmark for user `user_key`.

    Nothing is written to the database; the bookmark is only completed and returned.

    :param session: database session, used to check for an existing bookmark with the same URL.
    :param request: current request, passed on to `update_bookmark_with_info`.
    :param user_key: key of the user the bookmark is for.
    :param bookmark: bookmark to complete in place.
    :param strip_params: when true, URL parameters are stripped from the URL.
    :return: the completed bookmark; `url_hash` is filled in only when the user already has a non-deleted
        bookmark for the same URL.
    :raises HTTPException: with status 400 when autocompleting raised a `ValidationError`.
    """
    bookmark.user_key = user_key

    # Auto-fill title, fix tags etc.
    try:
        await update_bookmark_with_info(bookmark, request, strip_params)
    except ValidationError as exc:
        logger.error('ValidationError while autocompleting bookmark with URL %s', bookmark.url)
        logger.error('Error was: %s', str(exc))
        # Chain the original error so the traceback shows what failed validation
        raise HTTPException(
            status_code=400, detail='Error while autocompleting, likely the URL contained an error'
        ) from exc

    url_hash = utils.generate_hash(str(bookmark.url))
    result = await session.exec(
        select(Bookmark).where(
            Bookmark.user_key == user_key, Bookmark.url_hash == url_hash, Bookmark.status != Visibility.DELETED
        )
    )
    bookmark_db = result.first()
    if bookmark_db:
        # Bookmark with this URL already exists, provide the hash so the frontend can look it up and the user can
        # merge them if so wanted
        bookmark.url_hash = url_hash
    return bookmark
async def add_bookmark(
    session,
    request: Request,
    user_key: str,
    bookmark: Bookmark,
    strip_params: bool = False,
) -> Bookmark:
    """Store `bookmark` as a new bookmark owned by user `user_key`.

    The bookmark is completed by `update_bookmark_with_info`, gets its `url_hash` from the resulting URL, and
    is then committed and refreshed from the database before it is returned.
    """
    bookmark.user_key = user_key

    # Complete title, tags etc. before the hash is derived from the (possibly stripped) URL
    await update_bookmark_with_info(bookmark, request, strip_params)
    hashed_url = utils.generate_hash(str(bookmark.url))
    bookmark.url_hash = hashed_url

    logger.info('Adding bookmark %s for user %s', bookmark.url_hash, user_key)

    session.add(bookmark)
    await session.commit()
    await session.refresh(bookmark)
    return bookmark
async def update_bookmark(
    session,
    request: Request,
    user_key: str,
    bookmark: Bookmark,
    url_hash: str,
    strip_params: bool = False,
) -> Bookmark:
    """Update the existing bookmark with hash `url_hash` for user `user_key`.

    :param session: database session used for the lookup and the commit.
    :param request: current request, passed on to `update_bookmark_with_info`.
    :param user_key: key of the user that owns the bookmark.
    :param bookmark: the changes to apply; only fields that were explicitly set are merged.
    :param url_hash: hash identifying the stored bookmark to update.
    :param strip_params: when true, URL parameters are stripped from the URL.
    :return: the stored bookmark, refreshed after the commit.
    :raises BookmarkNotFound: when the user has no non-deleted bookmark with this hash.
    """
    result = await session.exec(
        select(Bookmark).where(
            Bookmark.user_key == user_key, Bookmark.url_hash == url_hash, Bookmark.status != Visibility.DELETED
        )
    )
    bookmark_db = result.first()
    if not bookmark_db:
        raise BookmarkNotFound(message=f'Bookmark with hash {url_hash} not found')

    # Set before dumping, so the new modification date is part of the 'set' fields that get merged
    bookmark.modified_date = datetime.now(UTC)

    # 'patch' endpoint, which means that you can send only the data that you want to update, leaving the rest intact
    bookmark_data = bookmark.model_dump(exclude_unset=True)
    # Merge the changed fields into the existing object
    bookmark_db.sqlmodel_update(bookmark_data)

    # Autofill title, fix tags, etc. where (still) needed. This has to run on the merged database object:
    # running it on the incoming `bookmark` would discard the results, as only `bookmark_db` is saved.
    # NOTE(review): `url_hash` is not recalculated when the URL changes here - confirm that is intended.
    await update_bookmark_with_info(bookmark_db, request, strip_params)

    session.add(bookmark_db)
    await session.commit()
    await session.refresh(bookmark_db)
    return bookmark_db
async def delete_bookmark(
    session,
    user_key: str,
    url_hash: str,
) -> None:
    """(Soft)Delete the bookmark with hash `url_hash` for user `user_key`.

    The row stays in the database; it gets a `deleted_date` and its status becomes `Visibility.DELETED`.

    :param session: database session used for the lookup and the commit.
    :param user_key: key of the user that owns the bookmark.
    :param url_hash: hash identifying the bookmark to delete.
    :raises BookmarkNotFound: when the user has no non-deleted bookmark with this hash.
    """
    # Same lookup as the get/update services: it filters on plain columns, so it does not depend on how
    # the primary key of Bookmark is composed, and it skips bookmarks that were deleted before.
    result = await session.exec(
        select(Bookmark).where(
            Bookmark.user_key == user_key, Bookmark.url_hash == url_hash, Bookmark.status != Visibility.DELETED
        )
    )
    bookmark = result.first()
    if not bookmark:
        raise BookmarkNotFound(message=f'Bookmark with hash {url_hash} not found')

    bookmark.deleted_date = datetime.now(UTC)
    bookmark.status = Visibility.DELETED
    session.add(bookmark)
    await session.commit()