1
0
mirror of https://github.com/aquatix/digimarks.git synced 2025-12-06 22:05:09 +01:00

Public tags, cleanups, config for templates

This commit is contained in:
2025-05-04 21:38:58 +02:00
parent 3369ee3b89
commit 85639b128f

View File

@@ -5,20 +5,24 @@ import hashlib
import logging import logging
import os import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import datetime, timezone from datetime import UTC, datetime
from http import HTTPStatus from http import HTTPStatus
from typing import Annotated, Optional, Type, TypeVar from typing import Annotated, Optional, Sequence, Type, TypeVar
from urllib.parse import urlparse, urlunparse
import bs4 import bs4
import httpx import httpx
from fastapi import Depends, FastAPI, HTTPException, Query, Request from fastapi import Depends, FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from pydantic import AnyUrl, DirectoryPath, FilePath from pydantic import AnyUrl, DirectoryPath, FilePath
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
from sqlmodel import AutoString, Field, Session, SQLModel, create_engine, select from sqlmodel import AutoString, Field, Session, SQLModel, create_engine, select
DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev' DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
DIGIMARKS_VERSION = '2.0.0a1'
DEFAULT_THEME = 'freshgreen' DEFAULT_THEME = 'freshgreen'
@@ -31,6 +35,9 @@ class Settings(BaseSettings):
media_dir: DirectoryPath media_dir: DirectoryPath
media_url: str = '/static/' media_url: str = '/static/'
static_dir: DirectoryPath = 'static'
template_dir: DirectoryPath = 'templates'
mashape_api_key: str mashape_api_key: str
system_key: str system_key: str
@@ -42,7 +49,6 @@ settings = Settings()
print(settings.model_dump()) print(settings.model_dump())
engine = create_engine(f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False}) engine = create_engine(f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False})
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_session(): def get_session():
@@ -63,8 +69,8 @@ async def lifespan(the_app: FastAPI):
app = FastAPI(lifespan=lifespan) app = FastAPI(lifespan=lifespan)
app.mount('/static', StaticFiles(directory=settings.static_dir), name='static')
templates = Jinja2Templates(directory='templates') templates = Jinja2Templates(directory=settings.template_dir)
logger = logging.getLogger('digimarks') logger = logging.getLogger('digimarks')
if settings.debug: if settings.debug:
@@ -79,13 +85,12 @@ app.add_middleware(
allow_headers=['*'], allow_headers=['*'],
) )
# Temporary
all_tags = {}
usersettings = {}
def i_filter_false(predicate, iterable):
"""Filter an iterable if predicate returns True.
def ifilterfalse(predicate, iterable): i_filter_false(lambda x: x%2, range(10)) --> 0 2 4 6 8
# ifilterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8 """
if predicate is None: if predicate is None:
predicate = bool predicate = bool
for x in iterable: for x in iterable:
@@ -102,7 +107,7 @@ def unique_ever_seen(iterable, key=None):
seen = set() seen = set()
seen_add = seen.add seen_add = seen.add
if key is None: if key is None:
for element in ifilterfalse(seen.__contains__, iterable): for element in i_filter_false(seen.__contains__, iterable):
seen_add(element) seen_add(element)
yield element yield element
else: else:
@@ -152,11 +157,12 @@ def generate_hash(input_text: str) -> str:
return hashlib.md5(input_text.encode('utf-8')).hexdigest() return hashlib.md5(input_text.encode('utf-8')).hexdigest()
def generate_key(): def generate_key() -> str:
"""Generate a key to be used for a user or tag.""" """Generate a key to be used for a user or tag."""
return binascii.hexlify(os.urandom(24)) return str(binascii.hexlify(os.urandom(24)))
# Type var used for building custom types for the DB
T = TypeVar('T') T = TypeVar('T')
@@ -200,11 +206,6 @@ class User(SQLModel, table=True):
theme: str = Field(default=DEFAULT_THEME) theme: str = Field(default=DEFAULT_THEME)
created_date: datetime created_date: datetime
def generate_key(self):
"""Generate user key."""
self.key = binascii.hexlify(os.urandom(24))
return self.key
class Visibility: class Visibility:
"""Options for visibility of an object.""" """Options for visibility of an object."""
@@ -232,16 +233,16 @@ class Bookmark(SQLModel, table=True):
http_status: int = Field(default=HTTPStatus.OK) http_status: int = Field(default=HTTPStatus.OK)
created_date: datetime = Field(default=datetime.now(timezone.utc)) created_date: datetime = Field(default=datetime.now(UTC))
modified_date: datetime = Field(default=None) modified_date: datetime = Field(default=None)
deleted_date: datetime = Field(default=None) deleted_date: datetime = Field(default=None)
status: int = Field(default=Visibility.VISIBLE) status: int = Field(default=Visibility.VISIBLE)
def set_title_from_source(self, request: Request) -> str: async def set_title_from_source(self, request: Request) -> str:
"""Request the title by requesting the source url.""" """Request the title by requesting the source url."""
try: try:
result = request.app.requests_client.get(self.url, headers={'User-Agent': DIGIMARKS_USER_AGENT}) result = await request.app.requests_client.get(self.url, headers={'User-Agent': DIGIMARKS_USER_AGENT})
self.http_status = result.status_code self.http_status = result.status_code
except httpx.HTTPError as err: except httpx.HTTPError as err:
# For example, 'MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?' # For example, 'MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?'
@@ -257,13 +258,46 @@ class Bookmark(SQLModel, table=True):
self.title = '' self.title = ''
return self.title return self.title
def set_tags(self, new_tags: str) -> None:
"""Set tags from `tags`, strip and sort them.
:param str new_tags: New tags to sort and set.
"""
tags_split = new_tags.split(',')
tags_clean = clean_tags(tags_split)
self.tags = ','.join(tags_clean)
@property @property
def tags_list(self): def tags_list(self) -> list[str]:
"""Get the tags as a list, iterable in template.""" """Get the tags as a list, iterable in template."""
if self.tags: if self.tags:
return self.tags.split(',') return self.tags.split(',')
return [] return []
@classmethod
def strip_url_params(cls, url: str) -> str:
"""Strip URL params from URL.
:param url: URL to strip URL params from.
:return: clean URL
:rtype: str
"""
parsed = urlparse(url)
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, '', parsed.fragment))
def update(self, request: Request, strip_params: bool = False):
"""Automatically update title etc."""
if not self.title:
# Title was empty, automatically fetch it from the url, will also update the status code
self.set_title_from_source(request)
if strip_params:
# Strip URL parameters, e.g., tracking params
self.url = self.strip_url_params(str(self.url))
# Sort and deduplicate tags
self.set_tags(self.tags)
class PublicTag(SQLModel, table=True): class PublicTag(SQLModel, table=True):
"""Public tag object.""" """Public tag object."""
@@ -274,19 +308,21 @@ class PublicTag(SQLModel, table=True):
tagkey: str tagkey: str
userkey: str = Field(foreign_key='user.key') userkey: str = Field(foreign_key='user.key')
tag: str tag: str
created_date: datetime = Field(default=datetime.now(timezone.utc)) created_date: datetime = Field(default=datetime.now(UTC))
@app.get('/') @app.get('/', response_class=HTMLResponse)
def index(): def index(request: Request):
"""Homepage, point visitors to project page.""" """Homepage, point visitors to project page."""
# theme = themes[DEFAULT_THEME] return templates.TemplateResponse(
# return render_template('index.html', theme=theme) request=request,
return {} name='index.html',
context={'language': 'en', 'version': DIGIMARKS_VERSION, 'theme': DEFAULT_THEME},
)
@app.get('/api/v1/admin/{system_key}/users/{user_id}') @app.get('/api/v1/admin/{system_key}/users/{user_id}', response_model=User)
def get_user(session: SessionDep, system_key: str, user_id: int) -> User: def get_user(session: SessionDep, system_key: str, user_id: int) -> Type[User]:
"""Show user information.""" """Show user information."""
if system_key != settings.system_key: if system_key != settings.system_key:
raise HTTPException(status_code=404) raise HTTPException(status_code=404)
@@ -304,7 +340,7 @@ def list_users(
system_key: str, system_key: str,
offset: int = 0, offset: int = 0,
limit: Annotated[int, Query(le=100)] = 100, limit: Annotated[int, Query(le=100)] = 100,
) -> list[User]: ) -> Sequence[User]:
"""List all users in the database. """List all users in the database.
:param SessionDep session: :param SessionDep session:
@@ -351,15 +387,15 @@ def add_bookmark(
request: Request, request: Request,
user_key: str, user_key: str,
bookmark: Bookmark, bookmark: Bookmark,
strip_params: bool = False,
): ):
"""Add new bookmark for user `user_key`.""" """Add new bookmark for user `user_key`."""
bookmark.userkey = user_key bookmark.userkey = user_key
bookmark.url_hash = generate_hash(str(bookmark.url)) bookmark.url_hash = generate_hash(str(bookmark.url))
# if strip_params:
# url = Bookmark.strip_url_params(url) # Auto-fill title, fix tags etc.
if not bookmark.title: bookmark.update(request, strip_params)
# Title was empty, automatically fetch it from the url, will also update the status code
bookmark.set_title_from_source(request)
session.add(bookmark) session.add(bookmark)
session.commit() session.commit()
session.refresh(bookmark) session.refresh(bookmark)
@@ -373,17 +409,19 @@ def update_bookmark(
user_key: str, user_key: str,
bookmark: Bookmark, bookmark: Bookmark,
url_hash: str, url_hash: str,
strip_params: bool = False,
): ):
"""Update existing bookmark `bookmark_key` for user `user_key`.""" """Update existing bookmark `bookmark_key` for user `user_key`."""
bookmark_db = session.get(Bookmark, {'url_hash': url_hash, 'userkey': user_key}) bookmark_db = session.get(Bookmark, {'url_hash': url_hash, 'userkey': user_key})
if not bookmark_db: if not bookmark_db:
raise HTTPException(status_code=404, detail='Bookmark not found') raise HTTPException(status_code=404, detail='Bookmark not found')
# Auto-fill title, fix tags etc.
bookmark.update(request, strip_params)
bookmark.modified_date = datetime.now(UTC)
bookmark_data = bookmark.model_dump(exclude_unset=True) bookmark_data = bookmark.model_dump(exclude_unset=True)
bookmark_db.sqlmodel_update(bookmark_data) bookmark_db.sqlmodel_update(bookmark_data)
if not bookmark_db.title:
# Title was empty, automatically fetch it from the url, will also update the status code
bookmark.set_title_from_source(request)
bookmark.modified_date = datetime.now(timezone.utc)
session.add(bookmark_db) session.add(bookmark_db)
session.commit() session.commit()
session.refresh(bookmark_db) session.refresh(bookmark_db)
@@ -400,7 +438,7 @@ def delete_bookmark(
bookmark = session.get(Bookmark, {'url_hash': url_hash, 'userkey': user_key}) bookmark = session.get(Bookmark, {'url_hash': url_hash, 'userkey': user_key})
if not bookmark: if not bookmark:
raise HTTPException(status_code=404, detail='Bookmark not found') raise HTTPException(status_code=404, detail='Bookmark not found')
bookmark.deleted_date = datetime.now(timezone.utc) bookmark.deleted_date = datetime.now(UTC)
bookmark.status = Visibility.DELETED bookmark.status = Visibility.DELETED
session.add(bookmark) session.add(bookmark)
session.commit() session.commit()
@@ -418,3 +456,31 @@ def list_tags_for_user(
for bookmark in bookmarks: for bookmark in bookmarks:
tags += bookmark.tags_list tags += bookmark.tags_list
return clean_tags(tags) return clean_tags(tags)
@app.get('/api/v1/{user_key}/tags/{tag_key}')
def list_tags_for_user(
session: SessionDep,
user_key: str,
) -> list[str]:
"""List all tags in use by the user."""
bookmarks = session.exec(select(Bookmark).where(Bookmark.userkey == user_key)).all()
tags = []
for bookmark in bookmarks:
tags += bookmark.tags_list
return clean_tags(tags)
@app.get('/{user_key}', response_class=HTMLResponse)
def page_user_landing(
session: SessionDep,
request: Request,
user_key: str,
):
user = session.exec(select(User).where(User.key == user_key)).first()
if not user:
raise HTTPException(status_code=404, detail='User not found')
language = 'en'
return templates.TemplateResponse(
request=request, name='user_index.html', context={'language': language, 'version': DIGIMARKS_VERSION}
)