diff --git a/requirements.in b/requirements.in index ee96f5e..5f7fb9a 100644 --- a/requirements.in +++ b/requirements.in @@ -1,6 +1,6 @@ # Core application fastapi[all] -sqlalchemy +sqlmodel # Fetch title etc from links beautifulsoup4 diff --git a/src/digimarks/main.py b/src/digimarks/main.py index d66e63b..6f74e17 100644 --- a/src/digimarks/main.py +++ b/src/digimarks/main.py @@ -1,27 +1,22 @@ +"""digimarks main module.""" + import binascii -import datetime -import gzip import hashlib import logging import os -import shutil from contextlib import asynccontextmanager -from typing import Optional -from urllib.parse import urljoin, urlparse, urlunparse +from datetime import datetime, timezone +from http import HTTPStatus +from typing import Annotated, Optional, Type, TypeVar import bs4 import httpx -from dateutil import tz -from fastapi import FastAPI, HTTPException, Request, Response +from fastapi import Depends, FastAPI, HTTPException, Query, Request from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates -from feedgen.feed import FeedGenerator -from pydantic import DirectoryPath, FilePath +from pydantic import AnyUrl, DirectoryPath, FilePath from pydantic_settings import BaseSettings -from sqlalchemy import VARCHAR, Boolean, Column, DateTime, Integer, Text, create_engine, select -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import Mapped, sessionmaker +from sqlmodel import AutoString, Field, Session, SQLModel, create_engine, select DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev' @@ -47,9 +42,16 @@ settings = Settings() print(settings.model_dump()) engine = create_engine(f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False}) -SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) -Base = declarative_base() + +def get_session(): + """SQLAlchemy session factory.""" + with Session(engine) as session: + yield session + + +SessionDep = Annotated[Session, Depends(get_session)] @asynccontextmanager @@ -111,7 +113,7 @@ def unique_ever_seen(iterable, key=None): yield element -def clean_tags(tags_list: list): +def clean_tags(tags_list: list) -> list: """Generate a unique list of the tags. :param list tags_list: List with all tags @@ -126,7 +128,7 @@ def clean_tags(tags_list: list): return tags_res -def file_type(filename: str): +def file_type(filename: str) -> str: """Try to determine the file type for the file in `filename`. :param str filename: path to file to check @@ -145,17 +147,58 @@ def file_type(filename: str): return 'no match' -class User(Base): +def generate_hash(input_text: str) -> str: + """Generate a hash from string `input`, e.g., for a URL.""" + return hashlib.md5(input_text.encode('utf-8')).hexdigest() + + +def generate_key(): + """Generate a key to be used for a user or tag.""" + return binascii.hexlify(os.urandom(24)) + + +T = TypeVar('T') + + +def build_custom_type(internal_type: Type[T]) -> Type[AutoString]: + """Create a type that is compatible with the database. + + Based on https://github.com/fastapi/sqlmodel/discussions/847 + """ + + class CustomType(AutoString): + def process_bind_param(self, value, dialect) -> Optional[str]: + if value is None: + return None + + if isinstance(value, str): + # Test if value is valid to avoid `process_result_value` failing + try: + internal_type(value) # type: ignore[call-arg] + except ValueError as e: + raise ValueError(f'Invalid value for {internal_type.__name__}: {e}') from e + + return str(value) + + def process_result_value(self, value, dialect) -> Optional[T]: + if value is None: + return None + + return internal_type(value) # type: ignore[call-arg] + + return CustomType + + +class User(SQLModel, table=True): """User account.""" __tablename__ = 'user' - id = Column(Integer, primary_key=True) - username = Column(VARCHAR(255)) - key = Column(VARCHAR(255)) - # theme = CharField(default=DEFAULT_THEME) - theme = Column(VARCHAR(20), default=DEFAULT_THEME) - created_date = Column(DateTime, default=datetime.datetime.now) + id: int = Field(primary_key=True) + username: str + key: str + theme: str = Field(default=DEFAULT_THEME) + created_date: datetime def generate_key(self): """Generate user key.""" @@ -163,55 +206,37 @@ class User(Base): return self.key -class Bookmark(Base): - """Bookmark instance, connected to User.""" +class Visibility: + """Options for visibility of an object.""" + + VISIBLE = 0 + DELETED = 1 + + +class Bookmark(SQLModel, table=True): + """Bookmark object.""" __tablename__ = 'bookmark' - id = Column(Integer, primary_key=True) - # Foreign key to User - userkey = Column(VARCHAR(255)) + id: int = Field(primary_key=True) + userkey: str = Field(foreign_key='user.key') + title: str = Field(default='') + url: AnyUrl = Field(default='', sa_type=build_custom_type(AnyUrl)) + note: str = Field(default='') + # image: str = Field(default='') + url_hash: str = Field(default='') + tags: str = Field(default='') + starred: bool = Field(default=False) - title = Column(VARCHAR(255), default='') - url = Column(VARCHAR(255)) - note = Column(Text, default='') - # image = CharField(default='') - url_hash = Column(VARCHAR(255), default='') - tags = Column(VARCHAR(255), default='') - starred = Column(Boolean, default=False) + favicon: str | None = Field(default=None) - # Website (domain) favicon - # favicon = Column(VARCHAR(255), null=True) - # favicon = Column(VARCHAR(255)) - favicon: Mapped[Optional[str]] + http_status: int = Field(default=HTTPStatus.OK) - # Status code: 200 is OK, 404 is not found, for example (showing an error) - HTTP_CONNECTION_ERROR = 0 - HTTP_OK = 200 - HTTP_ACCEPTED = 202 - HTTP_MOVED_TEMPORARILY = 304 - HTTP_NOTFOUND = 404 + created_date: datetime = Field(default=datetime.now(timezone.utc)) + modified_date: datetime = Field(default=None) + deleted_date: datetime = Field(default=None) - http_status = Column(Integer, default=200) - redirect_uri = None - - created_date = Column(DateTime, default=datetime.datetime.now) - # modified_date = Column(DateTime, null=True) - modified_date: Mapped[Optional[datetime.datetime]] - # deleted_date = Column(DateTime, null=True) - deleted_date: Mapped[Optional[datetime.datetime]] - - # Bookmark status; deleting doesn't remove from DB - VISIBLE = 0 - DELETED = 1 - status = Column(Integer, default=VISIBLE) - - class Meta: - ordering = (('created_date', 'desc'),) - - def set_hash(self): - """Generate hash.""" - self.url_hash = hashlib.md5(self.url.encode('utf-8')).hexdigest() + status: int = Field(default=Visibility.VISIBLE) def set_title_from_source(self, request: Request) -> str: """Request the title by requesting the source url.""" @@ -219,7 +244,7 @@ class Bookmark(Base): result = request.app.requests_client.get(self.url, headers={'User-Agent': DIGIMARKS_USER_AGENT}) self.http_status = result.status_code except httpx.HTTPError as err: - # For example 'MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?' + # For example, 'MissingSchema: Invalid URL 'abc': No schema supplied. Perhaps you meant http://abc?' logger.error('Exception when trying to retrieve title for %s. Error: %s', self.url, str(err)) self.http_status = 404 self.title = '' @@ -232,152 +257,6 @@ class Bookmark(Base): self.title = '' return self.title - def set_status_code(self, request: Request) -> int: - """Check the HTTP status of the url, as it might not exist for example.""" - try: - result = request.app.requests_client.head( - self.url, headers={'User-Agent': DIGIMARKS_USER_AGENT}, timeout=30 - ) - self.http_status = result.status_code - except httpx.HTTPError as err: - logger.error('Failed to do head info fetching for %s: %s', self.url, str(err)) - self.http_status = self.HTTP_CONNECTION_ERROR - return self.http_status - - def _set_favicon_with_iconsbetterideaorg(self, request: Request, domain): - """Fetch favicon for the domain.""" - file_extension = '.png' - meta = request.app.requests_client.head( - 'http://icons.better-idea.org/icon?size=60&url=' + domain, - allow_redirects=True, - headers={'User-Agent': DIGIMARKS_USER_AGENT}, - timeout=15, - ) - if meta.url[-3:].lower() == 'ico': - file_extension = '.ico' - response = request.app.requests_client.get( - 'http://icons.better-idea.org/icon?size=60&url=' + domain, - stream=True, - headers={'User-Agent': DIGIMARKS_USER_AGENT}, - timeout=15, - ) - filename = os.path.join(settings.media_dir, 'favicons/', domain + file_extension) - with open(filename, 'wb') as out_file: - shutil.copyfileobj(response.raw, out_file) - del response - filetype = file_type(str(filename)) - if filetype == 'gz': - # decompress - orig = gzip.GzipFile(filename, 'rb') - orig_content = orig.read() - orig.close() - os.remove(filename) - with open(filename, 'wb') as new: - new.write(orig_content) - self.favicon = domain + file_extension - - def _set_favicon_with_realfavicongenerator(self, request: Request, domain: str): - """Fetch favicon for the domain.""" - response = request.app.requests_client.get( - 'https://realfavicongenerator.p.rapidapi.com/favicon/icon?platform=android_chrome&site=' + domain, - stream=True, - headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.mashape_api_key}, - ) - if response.status_code == 404: - # Fall back to desktop favicon - response = request.app.requests_client.get( - 'https://realfavicongenerator.p.rapidapi.com/favicon/icon?platform=desktop&site=' + domain, - stream=True, - headers={'User-Agent': DIGIMARKS_USER_AGENT, 'X-Mashape-Key': settings.mashape_api_key}, - ) - # Debug for the moment - print(domain) - print(response.headers) - if 'Content-Length' in response.headers and response.headers['Content-Length'] == '0': - # No favicon found, likely - print('Skipping this favicon, needs fallback') - return - # Default to 'image/png' - file_extension = '.png' - if response.headers['content-type'] == 'image/jpeg': - file_extension = '.jpg' - if response.headers['content-type'] == 'image/x-icon': - file_extension = '.ico' - filename = os.path.join(settings.media_dir, 'favicons/', domain + file_extension) - with open(filename, 'wb') as out_file: - shutil.copyfileobj(response.raw, out_file) - del response - filetype = file_type(str(filename)) - if filetype == 'gz': - # decompress - orig = gzip.GzipFile(filename, 'rb') - origcontent = orig.read() - orig.close() - os.remove(filename) - with open(filename, 'wb') as new: - new.write(origcontent) - self.favicon = domain + file_extension - - def set_favicon(self, request: Request) -> None: - """Fetch favicon for the domain.""" - u = urlparse(self.url) - domain = u.netloc - if os.path.isfile(os.path.join(settings.media_dir, 'favicons/', domain + '.png')): - # If file exists, don't re-download it - self.favicon = f'{domain}.png' - return - if os.path.isfile(os.path.join(settings.media_dir, 'favicons/', domain + '.ico')): - # If file exists, don't re-download it - self.favicon = f'{domain}.ico' - return - # self._set_favicon_with_iconsbetterideaorg(domain) - self._set_favicon_with_realfavicongenerator(request, domain) - - def set_tags(self, new_tags: str) -> None: - """Set tags from `tags`, strip and sort them. - - :param str new_tags: New tags to sort and set. - """ - tags_split = new_tags.split(',') - tags_clean = clean_tags(tags_split) - self.tags = ','.join(tags_clean) - - def get_redirect_uri(self, request: Request): - """Derive where to redirect to. - - :param Request request: Request object. - """ - if self.redirect_uri: - return self.redirect_uri - if self.http_status in (301, 302): - result = request.app.requests_client.head( - self.url, allow_redirects=True, headers={'User-Agent': DIGIMARKS_USER_AGENT}, timeout=30 - ) - self.http_status = result.status_code - self.redirect_uri = result.url - return result.url - return None - - def get_uri_domain(self) -> str: - """Return the domain of the URL in this bookmark. - - :return: domain of the url - :rtype: str - """ - parsed = urlparse(self.url) - return parsed.hostname - - @classmethod - def strip_url_params(cls, url: str) -> str: - """Strip URL params from URL. - - :param url: URL to strip URL params from. - :return: clean URL - :rtype: str - """ - parsed = urlparse(url) - return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, '', parsed.fragment)) - @property def tags_list(self): """Get the tags as a list, iterable in template.""" @@ -385,99 +264,17 @@ class Bookmark(Base): return self.tags.split(',') return [] - def to_dict(self) -> dict: - """Generate dictionary representation of the object. - :return: Dictionary representation of the object, JSON serialisable. - :rtype: dict - """ - result = { - 'title': self.title, - 'url': self.url, - 'created': self.created_date.strftime('%Y-%m-%d %H:%M:%S'), - 'url_hash': self.url_hash, - 'tags': self.tags, - } - return result - - @property - def serialize(self) -> dict: - return self.to_dict() - - -class PublicTag(Base): - """Publicly shared tag.""" +class PublicTag(SQLModel, table=True): + """Public tag object.""" __tablename__ = 'public_tag' - id = Column(Integer, primary_key=True) - tagkey = Column(VARCHAR(255)) - userkey = Column(VARCHAR(255)) - tag = Column(VARCHAR(255)) - created_date = Column(DateTime, default=datetime.datetime.now) - - def generate_key(self): - """Generate hash-based key for publicly shared tag.""" - self.tagkey = binascii.hexlify(os.urandom(16)) - - -users = select(User) -print(users) -for user in users: - print(user) - - -def get_tags_for_user(user_key: str): - """Extract all tags from the bookmarks.""" - bookmarks = select(Bookmark).filter(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE) - tags = [] - for bookmark in bookmarks: - tags += bookmark.tags_list - return clean_tags(tags) - - -def get_cached_tags(user_key: str): - """Fail-safe way to get the cached tags for `user_key`.""" - try: - return all_tags[user_key] - except KeyError: - return [] - - -def get_theme(user_key: str): - themes = {DEFAULT_THEME: {}} - try: - usertheme = usersettings[user_key]['theme'] - return themes[usertheme] - except KeyError: - return themes[DEFAULT_THEME] # default - - -def make_external(request: Request, url: str): - return urljoin(request.url_root, url) - - -def _find_bookmarks(user_key: str, filter_text) -> list[Bookmark]: - """Look up bookmark for `user_key` which contains `filter_text` in its properties.""" - return ( - Bookmark.select() - .where( - Bookmark.userkey == user_key, - ( - Bookmark.title.contains(filter_text) - | Bookmark.url.contains(filter_text) - | Bookmark.note.contains(filter_text) - ), - Bookmark.status == Bookmark.VISIBLE, - ) - .order_by(Bookmark.created_date.desc()) - ) - - -# @app.errorhandler(404) -# def page_not_found(e): -# theme = themes[DEFAULT_THEME] -# return render_template('404.html', error=e, theme=theme), 404 + id: int = Field(primary_key=True) + tagkey: str + userkey: str = Field(foreign_key='user.key') + tag: str + created_date: datetime = Field(default=datetime.now(timezone.utc)) @app.get('/') @@ -488,599 +285,136 @@ def index(): return {} -def get_bookmarks(request: Request, user_key, filter_method=None, sort_method=None): - """User homepage, list their bookmarks, optionally filtered and/or sorted.""" - # return object_list('bookmarks.html', Bookmark.select()) - # user = User.select(key=userkey) - # if user: - # bookmarks = Bookmark.select(User=user) - # return render_template('bookmarks.html', bookmarks) - # else: - # abort(404) - # message = request.args.get('message') - bookmark_tags = get_cached_tags(user_key) +@app.get('/api/v1/admin/{system_key}/users/{user_id}') +def get_user(session: SessionDep, system_key: str, user_id: int) -> User: + """Show user information.""" + if system_key != settings.system_key: + raise HTTPException(status_code=404) - filter_text = '' - # if request.form: - # filter_text = request.form['filter_text'] - - filter_starred = False - if filter_method and filter_method.lower() == 'starred': - filter_starred = True - - filter_broken = False - if filter_method and filter_method.lower() == 'broken': - filter_broken = True - - filter_note = False - if filter_method and filter_method.lower() == 'note': - filter_note = True - - if filter_text: - bookmarks = _find_bookmarks(user_key, filter_text) - elif filter_starred: - bookmarks = ( - select(Bookmark) - .where(Bookmark.userkey == user_key, Bookmark.starred) - .order_by(Bookmark.created_date.desc()) - ) - elif filter_broken: - bookmarks = ( - select(Bookmark) - .where(Bookmark.userkey == user_key, Bookmark.http_status != 200) - .order_by(Bookmark.created_date.desc()) - ) - elif filter_note: - bookmarks = ( - select(Bookmark) - .where(Bookmark.userkey == user_key, Bookmark.note != '') - .order_by(Bookmark.created_date.desc()) - ) - else: - bookmarks = ( - select(Bookmark) - .where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE) - .order_by(Bookmark.created_date.desc()) - ) - - message = '' - return bookmarks, bookmark_tags, filter_text, message + user = session.get(User, user_id) + if not user: + raise HTTPException(status_code=404, detail='User not found') + return user -@app.get('/{user_key}', response_class=HTMLResponse) -@app.post('/{user_key}', response_class=HTMLResponse) -@app.route('//filter/', methods=['GET', 'POST']) -@app.route('//sort/', methods=['GET', 'POST']) -@app.route('//', methods=['GET', 'POST']) -@app.route('///filter/', methods=['GET', 'POST']) -@app.route('///sort/', methods=['GET', 'POST']) -def bookmarks_page(request: Request, user_key: str, filter_method=None, sort_method=None, show_as='cards'): - bookmarks, bookmarktags, filter_text, message = get_bookmarks(request, user_key, filter_method, sort_method) - theme = get_theme(user_key) - return templates.TemplateResponse( - 'bookmarks.html', - bookmarks=bookmarks, - userkey=user_key, - tags=bookmarktags, - filter_text=filter_text, - message=message, - theme=theme, - editable=True, # bookmarks can be edited - showtags=True, # tags should be shown with the bookmarks - filtermethod=filter_method, - sortmethod=sort_method, - show_as=show_as, # show list of bookmarks instead of cards - ) +# @app.get('/admin/{system_key}/users/', response_model=list[User]) +@app.get('/api/v1/admin/{system_key}/users/') +def list_users( + session: SessionDep, + system_key: str, + offset: int = 0, + limit: Annotated[int, Query(le=100)] = 100, +) -> list[User]: + """List all users in the database. - -@app.get('/{user_key}/js') -def bookmarks_js(user_key): - """Return list of bookmarks with their favicons, to be used for autocompletion.""" - bookmarks = ( - select(Bookmark) - .where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE) - .order_by(Bookmark.created_date.desc()) - ) - result = [] - for bookmark in bookmarks: - result.append({'title': bookmark.title}) - # resp = make_response(render_template( - # 'bookmarks.js', - # bookmarks=bookmarks - # )) - # resp.headers['Content-type'] = 'text/javascript; charset=utf-8' - # return resp - return result - - -@app.get('/r//', response_class=HTMLResponse) -def bookmark_redirect(user_key, url_hash): - """Securely redirect a bookmark to its url, stripping referrer (if browser plays nice).""" - # @TODO: add counter to this bookmark - try: - bookmark = Bookmark.get( - Bookmark.url_hash == url_hash, Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE - ) - except Bookmark.DoesNotExist: - raise HTTPException(status_code=404, detail='Bookmark not found') - return templates.TemplateResponse('redirect.html', url=bookmark.url) - - -@app.get('/api/v1/') -@app.get('/api/v1//filter/') -@app.get('/api/v1//sort/') -def bookmarks_json(request: Request, user_key: str, filtermethod=None, sortmethod=None): - """List bookmarks. - - :param Request request: FastAPI Request object. - :param str user_key: User key. + :param SessionDep session: + :param str system_key: secrit key + :param int offset: [Optional] offset of pagination + :param int limit: [Optional] limits the number of users to return, defaults to 100 + :return: list of users in the system + :rtype: list[User] """ - bookmarks, bookmark_tags, filter_text, message = get_bookmarks(request, user_key, filtermethod, sortmethod) + if system_key != settings.system_key: + raise HTTPException(status_code=404) - bookmarkslist = [i.serialize for i in bookmarks] - - the_data = { - 'bookmarks': bookmarkslist, - 'tags': bookmark_tags, - 'filter_text': filter_text, - 'message': message, - 'userkey': user_key, - } - return the_data + users = session.exec(select(User).offset(offset).limit(limit)).all() + return users -@app.get('/api/v1//') -def bookmark_json(user_key: str, url_hash): - """Serialise bookmark to json.""" - try: - bookmark = Bookmark.get( - Bookmark.url_hash == url_hash, Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE - ) - return bookmark.to_dict() - except Bookmark.DoesNotExist: - raise HTTPException(status_code=404, detail='Bookmark not found') +@app.get('/api/v1/{user_key}/bookmarks/') +def list_bookmarks( + session: SessionDep, + user_key: str, + offset: int = 0, + limit: Annotated[int, Query(le=100)] = 100, +) -> list[Bookmark]: + """List all bookmarks in the database.""" + bookmarks = session.exec(select(Bookmark).where(Bookmark.userkey == user_key).offset(offset).limit(limit)).all() + return bookmarks -@app.route('/api/v1//search/') -def search_bookmark_titles_json(userkey, filter_text): - """Serialise bookmark to json.""" - bookmarks = _find_bookmarks(userkey, filter_text) - result = [] - for bookmark in bookmarks: - result.append(bookmark.to_dict()) - return result - - -@app.get('//', response_class=HTMLResponse) -@app.get('///edit', response_class=HTMLResponse) -def edit_bookmark(request: Request, userkey, urlhash): - """Bookmark edit form.""" - # bookmark = getbyurlhash() - try: - bookmark = Bookmark.get(Bookmark.url_hash == urlhash, Bookmark.userkey == userkey) - except Bookmark.DoesNotExist: - raise HTTPException(status_code=404, detail='Bookmark not found') - message = request.args.get('message') - tags = get_cached_tags(userkey) - if not bookmark.note: - # Workaround for when an existing bookmark has a null note - bookmark.note = '' - theme = get_theme(userkey) - return templates.TemplateResponse( - 'edit.html', - action='Edit bookmark', - userkey=userkey, - bookmark=bookmark, - message=message, - formaction='edit', - tags=tags, - theme=theme, - ) - - -@app.get('//add', response_class=HTMLResponse) -def add_bookmark(request: Request, user_key: str): - """Bookmark add form.""" - url = request.args.get('url') - if not url: - url = '' - if request.args.get('referrer'): - url = request.referrer - bookmark = Bookmark(title='', url=url, tags='') - message = request.args.get('message') - tags = get_cached_tags(user_key) - theme = get_theme(user_key) - return templates.TemplateResponse( - 'edit.html', action='Add bookmark', userkey=user_key, bookmark=bookmark, tags=tags, message=message, theme=theme - ) - - -def update_bookmark(request: Request, user_key: str, url_hash: str = None): - """Add (no urlhash) or edit (urlhash is set) a bookmark.""" - title = request.form.get('title') - url = request.form.get('url') - tags = request.form.get('tags') - note = request.form.get('note') - starred = False - if request.form.get('starred'): - starred = True - strip_params = False - if request.form.get('strip'): - strip_params = True - - if url and not url_hash: - # New bookmark - bookmark, created = Bookmark.get_or_create(url=url, userkey=user_key) - if not created: - message = 'Existing bookmark, did not overwrite with new values' - return RedirectResponse( - request.url_for('editbookmark', userkey=user_key, urlhash=bookmark.url_hash, message=message) - ) - elif url: - # Existing bookmark, get from DB - bookmark = Bookmark.get(Bookmark.userkey == user_key, Bookmark.url_hash == url_hash) - # Editing this bookmark, set modified_date to now - bookmark.modified_date = datetime.datetime.now() - else: - # No url was supplied, abort. @TODO: raise exception? - return None - - bookmark.title = title - if strip_params: - url = Bookmark.strip_url_params(url) - bookmark.url = url - bookmark.starred = starred - bookmark.set_tags(tags) - bookmark.note = note - bookmark.set_hash() - # bookmark.fetch_image() - if not title: - # Title was empty, automatically fetch it from the url, will also update the status code - bookmark.set_title_from_source(request) - else: - bookmark.set_status_code(request) - - if bookmark.http_status in (200, 202): - try: - bookmark.set_favicon() - except IOError: - # Icon file could not be saved possibly, don't bail completely - pass - - bookmark.save() +@app.get('/api/v1/{user_key}/bookmarks/{url_hash}') +def get_bookmark( + session: SessionDep, + user_key: str, + url_hash: str, +) -> Bookmark: + """Show bookmark details.""" + bookmark = session.exec(select(Bookmark).where(Bookmark.userkey == user_key, Bookmark.url_hash == url_hash)).first() + # bookmark = session.get(Bookmark, {'url_hash': url_hash, 'userkey': user_key}) return bookmark -@app.route('//adding', methods=['GET', 'POST']) -# @app.route('//adding') -def adding_bookmark(request: Request, user_key): - """Add the bookmark from form submit by /add.""" - tags = get_cached_tags(user_key) - - if request.method == 'POST': - bookmark = update_bookmark(request, user_key) - if not bookmark: - return RedirectResponse( - request.url_for('addbookmark', userkey=user_key, message='No url provided', tags=tags) - ) - if type(bookmark).__name__ == 'Response': - return bookmark - all_tags[user_key] = get_tags_for_user(user_key) - return RedirectResponse(request.url_for('editbookmark', userkey=user_key, urlhash=bookmark.url_hash)) - return RedirectResponse(request.url_for('addbookmark', userkey=user_key, tags=tags)) +@app.post('/api/v1/{user_key}/bookmarks/', response_model=Bookmark) +def add_bookmark( + session: SessionDep, + request: Request, + user_key: str, + bookmark: Bookmark, +): + """Add new bookmark for user `user_key`.""" + bookmark.userkey = user_key + bookmark.url_hash = generate_hash(str(bookmark.url)) + # if strip_params: + # url = Bookmark.strip_url_params(url) + if not bookmark.title: + # Title was empty, automatically fetch it from the url, will also update the status code + bookmark.set_title_from_source(request) + session.add(bookmark) + session.commit() + session.refresh(bookmark) + return bookmark -@app.route('///editing', methods=['GET', 'POST']) -def editing_bookmark(request: Request, user_key: str, url_hash: str): - """Edit the bookmark from form submit.""" - if request.method == 'POST': - bookmark = update_bookmark(request, user_key, url_hash=url_hash) - all_tags[user_key] = get_tags_for_user(user_key) - return RedirectResponse(request.url_for('editbookmark', userkey=user_key, urlhash=bookmark.url_hash)) - return RedirectResponse(request.url_for('editbookmark', userkey=user_key, urlhash=url_hash)) +@app.patch('/api/v1/{user_key}/bookmarks/{url_hash}', response_model=Bookmark) +def update_bookmark( + session: SessionDep, + request: Request, + user_key: str, + bookmark: Bookmark, + url_hash: str, +): + """Update existing bookmark `bookmark_key` for user `user_key`.""" + bookmark_db = session.get(Bookmark, {'url_hash': url_hash, 'userkey': user_key}) + if not bookmark_db: + raise HTTPException(status_code=404, detail='Bookmark not found') + bookmark_data = bookmark.model_dump(exclude_unset=True) + bookmark_db.sqlmodel_update(bookmark_data) + if not bookmark_db.title: + # Title was empty, automatically fetch it from the url, will also update the status code + bookmark.set_title_from_source(request) + bookmark.modified_date = datetime.now(timezone.utc) + session.add(bookmark_db) + session.commit() + session.refresh(bookmark_db) + return bookmark_db -@app.route('///delete', methods=['GET', 'POST']) -def deleting_bookmark(request: Request, user_key: str, url_hash: str): - """Delete the bookmark from form submit by /delete.""" - query = Bookmark.update(status=Bookmark.DELETED).where(Bookmark.userkey == user_key, Bookmark.url_hash == url_hash) - query.execute() - query = Bookmark.update(deleted_date=datetime.datetime.now()).where( - Bookmark.userkey == user_key, Bookmark.url_hash == url_hash - ) - query.execute() - message = 'Bookmark deleted. Undo deletion'.format( - request.url_for('undeletebookmark', userkey=user_key, urlhash=url_hash) - ) - all_tags[user_key] = get_tags_for_user(user_key) - return RedirectResponse(request.url_for('bookmarks_page', userkey=user_key, message=message)) +@app.delete('/api/v1/{user_key}/bookmarks/{url_hash}', response_model=Bookmark) +def delete_bookmark( + session: SessionDep, + user_key: str, + url_hash: str, +): + """(Soft)Delete bookmark `bookmark_key` for user `user_key`.""" + bookmark = session.get(Bookmark, {'url_hash': url_hash, 'userkey': user_key}) + if not bookmark: + raise HTTPException(status_code=404, detail='Bookmark not found') + bookmark.deleted_date = datetime.now(timezone.utc) + bookmark.status = Visibility.DELETED + session.add(bookmark) + session.commit() + return {'ok': True} -@app.get('///undelete') -def undelete_bookmark(request: Request, user_key: str, url_hash: str): - """Undo deletion of the bookmark identified by urlhash.""" - query = Bookmark.update(status=Bookmark.VISIBLE).where(Bookmark.userkey == user_key, Bookmark.url_hash == url_hash) - query.execute() - message = 'Bookmark restored' - all_tags[user_key] = get_tags_for_user(user_key) - return RedirectResponse(request.url_for('bookmarks_page', userkey=user_key, message=message)) - - -@app.get('//tags', response_class=HTMLResponse) -def tags_page(user_key): - """Overview of all tags used by user.""" - tags = get_cached_tags(user_key) - # public_tags = PublicTag.select().where(Bookmark.userkey == userkey) - alltags = [] - for tag in tags: - try: - public_tag = PublicTag.get(PublicTag.userkey == user_key, PublicTag.tag == tag) - except PublicTag.DoesNotExist: - public_tag = None - - total = ( - Bookmark.select() - .where(Bookmark.userkey == user_key, Bookmark.tags.contains(tag), Bookmark.status == Bookmark.VISIBLE) - .count() - ) - alltags.append({'tag': tag, 'public_tag': public_tag, 'total': total}) - totaltags = len(alltags) - totalbookmarks = Bookmark.select().where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.VISIBLE).count() - totalpublic = PublicTag.select().where(PublicTag.userkey == user_key).count() - totalstarred = Bookmark.select().where(Bookmark.userkey == user_key, Bookmark.starred).count() - totaldeleted = Bookmark.select().where(Bookmark.userkey == user_key, Bookmark.status == Bookmark.DELETED).count() - totalnotes = Bookmark.select().where(Bookmark.userkey == user_key, Bookmark.note != '').count() - totalhttperrorstatus = Bookmark.select().where(Bookmark.userkey == user_key, Bookmark.http_status != 200).count() - theme = get_theme(user_key) - return templates.TemplateResponse( - 'tags.html', - tags=alltags, - totaltags=totaltags, - totalpublic=totalpublic, - totalbookmarks=totalbookmarks, - totaldeleted=totaldeleted, - totalstarred=totalstarred, - totalhttperrorstatus=totalhttperrorstatus, - totalnotes=totalnotes, - userkey=user_key, - theme=theme, - ) - - -@app.get('//tag/', response_class=HTMLResponse) -def tag_page(request: Request, user_key: str, tag: str): - """Overview of all bookmarks with a certain tag.""" - bookmarks = ( - Bookmark.select() - .where(Bookmark.userkey == user_key, Bookmark.tags.contains(tag), Bookmark.status == Bookmark.VISIBLE) - .order_by(Bookmark.created_date.desc()) - ) - tags = get_cached_tags(user_key) - pageheader = 'tag: ' + tag - message = request.args.get('message') - - try: - public_tag = PublicTag.get(PublicTag.userkey == user_key, PublicTag.tag == tag) - except PublicTag.DoesNotExist: - public_tag = None - - theme = get_theme(user_key) - return templates.TemplateResponse( - 'bookmarks.html', - bookmarks=bookmarks, - userkey=user_key, - tags=tags, - tag=tag, - public_tag=public_tag, - action=pageheader, - message=message, - theme=theme, - editable=True, - showtags=True, - ) - - -def get_public_tag(tag_key): - """Return tag and bookmarks in this public tag collection.""" - this_tag = PublicTag.get(PublicTag.tagkey == tag_key) - bookmarks = ( - Bookmark.select() - .where( - Bookmark.userkey == this_tag.userkey, - Bookmark.tags.contains(this_tag.tag), - Bookmark.status == Bookmark.VISIBLE, - ) - .order_by(Bookmark.created_date.desc()) - ) - return this_tag, bookmarks - - -@app.get('/pub/', response_class=HTMLResponse) -def public_tag_page(tag_key): - """Read-only overview of the bookmarks in the user_key/tag of this PublicTag.""" - # this_tag = get_object_or_404(PublicTag.select().where(PublicTag.tagkey == tagkey)) - try: - this_tag, bookmarks = get_public_tag(tag_key) - # theme = themes[DEFAULT_THEME] - theme = {} - return templates.TemplateResponse( - 'publicbookmarks.html', - bookmarks=bookmarks, - tag=this_tag.tag, - action=this_tag.tag, - tagkey=tag_key, - theme=theme, - ) - except PublicTag.DoesNotExist: - raise HTTPException(status_code=404, detail='Public tag not found') - - -@app.route('/api/v1/pub/') -def public_tag_json(tag_key): - """Json representation of the Read-only overview of the bookmarks in the user_key/tag of this PublicTag.""" - try: - this_tag, bookmarks = get_public_tag(tag_key) - result = { - # 'tag': this_tag, - 'tagkey': tag_key, - 'count': len(bookmarks), - 'items': [], - } - for bookmark in bookmarks: - result['items'].append(bookmark.to_dict()) - return result - except PublicTag.DoesNotExist: - raise HTTPException(status_code=404, detail='Public tag not found') - - -@app.get('/pub//feed') -async def public_tag_feed(request: Request, tag_key: str): - """rss/atom representation of the Read-only overview of the bookmarks in the user_key/tag of this PublicTag.""" - try: - this_tag = PublicTag.get(PublicTag.tagkey == tag_key) - bookmarks = Bookmark.select().where( - Bookmark.userkey == this_tag.userkey, - Bookmark.tags.contains(this_tag.tag), - Bookmark.status == Bookmark.VISIBLE, - ) - - feed = FeedGenerator() - feed.title(this_tag.tag) - feed.id(request.url) - feed.link(href=request.url, rel='self') - feed.link(href=make_external(request, app.url_path_for('public_tag_page', tagkey=tag_key))) - - for bookmark in bookmarks: - entry = feed.add_entry() - - updated_date = bookmark.modified_date - if not bookmark.modified_date: - updated_date = bookmark.created_date - bookmarktitle = '{} (no title)'.format(bookmark.url) - if bookmark.title: - bookmarktitle = bookmark.title - - entry.id(bookmark.url) - entry.title(bookmarktitle) - entry.link(href=bookmark.url) - entry.author(name='digimarks') - entry.pubdate(bookmark.created_date.replace(tzinfo=tz.tzlocal())) - entry.published(bookmark.created_date.replace(tzinfo=tz.tzlocal())) - entry.updated(updated_date.replace(tzinfo=tz.tzlocal())) - - response = Response(data=feed.atom_str(pretty=True), media_type='application/xml') - - response.headers.set('Content-Type', 'application/atom+xml') - return response - except PublicTag.DoesNotExist: - raise HTTPException(status_code=404, detail='Tag not found') - - -@app.get('///makepublic') -@app.post('///makepublic') -async def add_public_tag(user_key: str, tag: str): - try: - User.get(User.key == user_key) - except User.DoesNotExist: - raise HTTPException(status_code=404, detail='User not found') - try: - public_tag = PublicTag.get(PublicTag.userkey == user_key, PublicTag.tag == tag) - except PublicTag.DoesNotExist: - public_tag = None - if not public_tag: - new_public_tag = PublicTag() - new_public_tag.generate_key() - new_public_tag.userkey = user_key - new_public_tag.tag = tag - new_public_tag.save() - - message = 'Public link to this tag created' - success = True - # return RedirectResponse(url=url_path_for('tag_page', userkey=userkey, tag=tag, message=message)) - else: - message = 'Public link already existed' - success = False - # return redirect(url_for('tag_page', userkey=userkey, tag=tag, message=message)) - url = app.url_path_for('tag_page', userkey=user_key, tag=tag, message=message) - return {'success': success, 'message': message, 'url': url} - - -@app.route('///removepublic/', methods=['GET', 'POST']) -def remove_public_tag(request: Request, user_key, tag, tag_key: str): - q = PublicTag.delete().where(PublicTag.userkey == user_key, PublicTag.tag == tag, PublicTag.tagkey == tag_key) - q.execute() - message = f'Public link {tag_key} has been deleted' - url = request.url_for('tag_page', userkey=user_key, tag=tag) - return {'message': message, 'url': url} - - -@app.route('//adduser') -def add_user(system_key): - """Add user endpoint, convenience.""" - if system_key == settings.system_key: - new_user = User() - new_user.generate_key() - new_user.username = 'Nomen Nescio' - new_user.save() - all_tags[new_user.key] = [] - return {'user': f'/{new_user.key.decode("utf-8")}'} - raise HTTPException(status_code=404, detail="I can't let you do that Dave") - - -@app.route('//refreshfavicons') -def refresh_favicons(system_key): - """Add user endpoint, convenience.""" - if system_key == settings.system_key: - bookmarks = Bookmark.select() - for bookmark in bookmarks: - if bookmark.favicon: - try: - filename = os.path.join(settings.media_dir, 'favicons', bookmark.favicon) - os.remove(filename) - except OSError as e: - print(e) - bookmark.set_favicon() - return {'message': 'Done refreshing icons'} - raise HTTPException(status_code=404, detail="I can't let you do that Dave") - - -@app.route('//findmissingfavicons') -def find_missing_favicons(request: Request, system_key: str): - """Add user endpoint, convenience.""" - if system_key == settings.system_key: - bookmarks = Bookmark.select() - for bookmark in bookmarks: - try: - if not bookmark.favicon or not os.path.isfile( - os.path.join(settings.media_dir, 'favicons', bookmark.favicon) - ): - # This favicon is missing - # Clear favicon, so fallback can be used instead of showing a broken image - bookmark.favicon = None - bookmark.save() - # Try to fetch and save new favicon - bookmark.set_favicon(request) - bookmark.save() - except OSError as e: - print(e) - return {'message': 'Done finding missing icons'} - raise HTTPException(status_code=404, detail="I can't let you do that Dave") - - -# Initialisation == create the bookmark, user and public tag tables if they do not exist -# TODO: switch to alembic migrations -# Bookmark.create_table(True) -# User.create_table(True) -# PublicTag.create_table(True) - -# users = User.select() -# print('Current user keys:') -# for user in users: -# all_tags[user.key] = get_tags_for_user(user.key) -# usersettings[user.key] = {'theme': user.theme} -# print(user.key) - -# Run when called standalone -# if __name__ == '__main__': -# run the application -# app.run(host='0.0.0.0', port=9999, debug=True) +@app.get('/api/v1/{user_key}/tags/') +def list_tags_for_user( + session: SessionDep, + user_key: str, +) -> list[str]: + """List all tags in use by the user.""" + bookmarks = session.exec(select(Bookmark).where(Bookmark.userkey == user_key)).all() + tags = [] + for bookmark in bookmarks: + tags += bookmark.tags_list + return clean_tags(tags)