diff --git a/models/note.py b/models/note.py index 5f3daf8..9bbaac1 100644 --- a/models/note.py +++ b/models/note.py @@ -299,8 +299,9 @@ class Note(db.Model): @property def file_url(self): """Get the URL to access the uploaded file""" - if self.file_path: - return f'/uploads/notes/{self.file_path}' + if self.file_path and self.id: + from flask import url_for + return url_for('notes_api.serve_note_file', note_id=self.id) return None @property diff --git a/routes/notes.py b/routes/notes.py index c9f07bc..1272cc3 100644 --- a/routes/notes.py +++ b/routes/notes.py @@ -8,8 +8,9 @@ from sqlalchemy import and_, or_ # Local application imports from models import (Note, NoteFolder, NoteLink, NoteVisibility, Project, - Task, db) + Task, UserPreferences, db) from routes.auth import company_required, login_required +from security_utils import sanitize_folder_path, validate_folder_access # Create blueprint notes_bp = Blueprint('notes', __name__, url_prefix='/notes') @@ -30,6 +31,16 @@ def notes_list(): visibility_filter = request.args.get('visibility', '') search_query = request.args.get('search', '') + # Sanitize folder filter if provided + if folder_filter: + try: + folder_filter = sanitize_folder_path(folder_filter) + # Validate folder exists + if not validate_folder_access(folder_filter, g.user.company_id, db.session): + folder_filter = '' # Reset to root if invalid + except ValueError: + folder_filter = '' # Reset to root if invalid + # Base query - only non-archived notes for the user's company query = Note.query.filter_by( company_id=g.user.company_id, @@ -197,6 +208,41 @@ def create_note(): task_id = request.form.get('task_id') is_pinned = request.form.get('is_pinned') == '1' + # Sanitize and validate folder if provided + if folder: + try: + folder = sanitize_folder_path(folder) + # Ensure folder exists or create it + if not validate_folder_access(folder, g.user.company_id, db.session): + # Create folder hierarchy if it doesn't exist + folder_parts = folder.split('/') + current_path = '' + for i, part in enumerate(folder_parts): + if i == 0: + current_path = part + else: + current_path = current_path + '/' + part + + existing = NoteFolder.query.filter_by( + company_id=g.user.company_id, + path=current_path + ).first() + + if not existing: + parent_path = '/'.join(folder_parts[:i]) if i > 0 else None + new_folder = NoteFolder( + name=part, + path=current_path, + parent_path=parent_path, + company_id=g.user.company_id, + created_by_id=g.user.id + ) + db.session.add(new_folder) + db.session.flush() # Ensure folder is created before continuing + except ValueError as e: + flash(f'Invalid folder path: {str(e)}', 'error') + return redirect(url_for('notes.create_note')) + # Validate if not title: flash('Title is required', 'error') @@ -366,6 +412,18 @@ def edit_note(slug): task_id = request.form.get('task_id') is_pinned = request.form.get('is_pinned') == '1' + # Sanitize and validate folder if provided + if folder: + try: + folder = sanitize_folder_path(folder) + # Validate folder exists + if not validate_folder_access(folder, g.user.company_id, db.session): + flash('Invalid folder selected', 'error') + return redirect(url_for('notes.edit_note', slug=slug)) + except ValueError as e: + flash(f'Invalid folder path: {str(e)}', 'error') + return redirect(url_for('notes.edit_note', slug=slug)) + # Validate if not title: flash('Title is required', 'error') @@ -495,4 +553,30 @@ def notes_folders(): folders=folders, folder_tree=folder_tree, folder_counts=folder_counts, - title='Manage Folders') \ No newline at end of file + title='Manage Folders') + + +@notes_bp.route('/preferences', methods=['POST']) +@login_required +@company_required +def update_note_preferences(): + """Update note-related user preferences""" + note_preview_font = request.form.get('note_preview_font', 'system') + + # Get or create user preferences + preferences = UserPreferences.query.filter_by(user_id=g.user.id).first() + if not preferences: + preferences = UserPreferences(user_id=g.user.id) + db.session.add(preferences) + + # Update preferences + preferences.note_preview_font = note_preview_font + + db.session.commit() + + # Return JSON response for AJAX calls + if request.headers.get('X-Requested-With') == 'XMLHttpRequest': + return jsonify({'success': True, 'font': note_preview_font}) + + # Otherwise redirect back + return redirect(request.referrer or url_for('notes.notes_list')) \ No newline at end of file diff --git a/routes/notes_api.py b/routes/notes_api.py index 3ee106a..8648191 100644 --- a/routes/notes_api.py +++ b/routes/notes_api.py @@ -2,12 +2,15 @@ from datetime import datetime, timezone # Third-party imports -from flask import Blueprint, abort, g, jsonify, request +from flask import Blueprint, abort, g, jsonify, request, url_for, current_app, send_file from sqlalchemy import and_, or_ # Local application imports from models import Note, NoteFolder, NoteLink, NoteVisibility, db from routes.auth import company_required, login_required +from security_utils import (sanitize_folder_path, validate_folder_access, + generate_secure_file_path, validate_filename, + ensure_safe_file_path, get_safe_mime_type) # Create blueprint notes_api_bp = Blueprint('notes_api', __name__, url_prefix='/api/notes') @@ -20,6 +23,13 @@ def api_folder_details(): """Get folder details including note count""" folder_path = request.args.get('folder', '') + # Sanitize folder path if provided + if folder_path: + try: + folder_path = sanitize_folder_path(folder_path) + except ValueError: + return jsonify({'success': False, 'message': 'Invalid folder path'}), 400 + # Get note count for this folder note_count = Note.query.filter_by( company_id=g.user.company_id, @@ -62,15 +72,30 @@ def api_create_folder(): return jsonify({'success': False, 'message': 'Folder name is required'}), 400 # Validate folder name - if '/' in folder_name: - return jsonify({'success': False, 'message': 'Folder name cannot contain /'}), 400 + if '/' in folder_name or '..' in folder_name: + return jsonify({'success': False, 'message': 'Folder name cannot contain / or ..'}), 400 - # Build full path + # Sanitize parent path if provided + if parent_path: + try: + parent_path = sanitize_folder_path(parent_path) + # Validate parent exists + if not validate_folder_access(parent_path, g.user.company_id, db.session): + return jsonify({'success': False, 'message': 'Parent folder does not exist'}), 404 + except ValueError: + return jsonify({'success': False, 'message': 'Invalid parent folder path'}), 400 + + # Build full path and sanitize if parent_path: full_path = f"{parent_path}/{folder_name}" else: full_path = folder_name + try: + full_path = sanitize_folder_path(full_path) + except ValueError as e: + return jsonify({'success': False, 'message': str(e)}), 400 + # Check if folder already exists existing = NoteFolder.query.filter_by( company_id=g.user.company_id, @@ -118,9 +143,15 @@ def api_rename_folder(): if not old_path or not new_name: return jsonify({'success': False, 'message': 'Both old path and new name are required'}), 400 + # Sanitize old path + try: + old_path = sanitize_folder_path(old_path) + except ValueError: + return jsonify({'success': False, 'message': 'Invalid folder path'}), 400 + # Validate new name - if '/' in new_name: - return jsonify({'success': False, 'message': 'Folder name cannot contain /'}), 400 + if '/' in new_name or '..' in new_name: + return jsonify({'success': False, 'message': 'Folder name cannot contain / or ..'}), 400 # Find the folder folder = NoteFolder.query.filter_by( @@ -136,6 +167,12 @@ def api_rename_folder(): path_parts[-1] = new_name new_path = '/'.join(path_parts) + # Sanitize new path + try: + new_path = sanitize_folder_path(new_path) + except ValueError as e: + return jsonify({'success': False, 'message': str(e)}), 400 + # Check if new path already exists existing = NoteFolder.query.filter_by( company_id=g.user.company_id, @@ -194,6 +231,12 @@ def api_delete_folder(): if not folder_path: return jsonify({'success': False, 'message': 'Folder path is required'}), 400 + # Sanitize folder path + try: + folder_path = sanitize_folder_path(folder_path) + except ValueError: + return jsonify({'success': False, 'message': 'Invalid folder path'}), 400 + # Check if folder has notes note_count = Note.query.filter_by( company_id=g.user.company_id, @@ -663,10 +706,7 @@ def clean_filename_for_title(filename): def upload_note(): """Upload a file (markdown or image) and create a note from it""" import os - import time - from werkzeug.utils import secure_filename from PIL import Image - from flask import current_app, url_for if 'file' not in request.files: return jsonify({'success': False, 'error': 'No file provided'}), 400 @@ -675,32 +715,38 @@ def upload_note(): if file.filename == '': return jsonify({'success': False, 'error': 'No file selected'}), 400 + # Validate filename + try: + original_filename = validate_filename(file.filename) + except ValueError as e: + return jsonify({'success': False, 'error': str(e)}), 400 + # Check if file type is allowed - if not Note.allowed_file(file.filename): + if not Note.allowed_file(original_filename): return jsonify({'success': False, 'error': 'File type not allowed. Supported: markdown (.md), images (.jpg, .png, .gif, .webp, .svg), text (.txt), documents (.pdf, .doc, .docx)'}), 400 # Get file info - original_filename = secure_filename(file.filename) file_type = Note.get_file_type_from_filename(original_filename) - # Create upload directory in persistent volume - upload_base = os.path.join('/data', 'uploads', 'notes') - if file_type == 'image': - upload_dir = os.path.join(upload_base, 'images') - elif file_type == 'markdown': - upload_dir = os.path.join(upload_base, 'markdown') - elif file_type == 'text': - upload_dir = os.path.join(upload_base, 'text') - else: - upload_dir = os.path.join(upload_base, 'documents') + # Generate secure file path using UUID + try: + relative_path = generate_secure_file_path(file_type, original_filename) + except ValueError as e: + return jsonify({'success': False, 'error': str(e)}), 400 + # Create upload directory in persistent volume + upload_base = '/data/uploads/notes' + file_path = os.path.join(upload_base, relative_path) + upload_dir = os.path.dirname(file_path) + + # Ensure directory exists os.makedirs(upload_dir, exist_ok=True) - # Generate unique filename - timestamp = int(time.time()) - filename = f"{g.user.company_id}_{g.user.id}_{timestamp}_{original_filename}" - file_path = os.path.join(upload_dir, filename) - relative_path = os.path.join(file_type + 's' if file_type != 'markdown' else file_type, filename) + # Ensure the path is safe + try: + file_path = ensure_safe_file_path(upload_base, relative_path) + except ValueError: + return jsonify({'success': False, 'error': 'Invalid file path'}), 400 # Save the file file.save(file_path) @@ -708,6 +754,9 @@ def upload_note(): # Get file size file_size = os.path.getsize(file_path) + # Get safe MIME type + mime_type = get_safe_mime_type(original_filename) + # Create note content based on file type if file_type == 'markdown' or file_type == 'text': # Read text content @@ -723,7 +772,8 @@ def upload_note(): elif file_type == 'image': # For images, create a simple markdown content with the image title = request.form.get('title') or clean_filename_for_title(original_filename) - content = f"![{title}](/uploads/notes/{relative_path})" + # Content will be updated after note is created with its ID + content = f"![{title}](PLACEHOLDER)" # Get image dimensions try: @@ -734,7 +784,8 @@ def upload_note(): else: # For other documents title = request.form.get('title') or clean_filename_for_title(original_filename) - content = f"[Download {original_filename}](/uploads/notes/{relative_path})" + # Content will be updated after note is created with its ID + content = f"[Download {original_filename}](PLACEHOLDER)" image_width = image_height = None # Create the note @@ -749,7 +800,7 @@ def upload_note(): file_type=file_type, original_filename=original_filename, file_size=file_size, - mime_type=file.content_type + mime_type=mime_type ) if file_type == 'image' and image_width: @@ -759,34 +810,38 @@ def upload_note(): # Set folder if provided folder = request.form.get('folder') if folder: - note.folder = folder - - # Ensure folder exists in database - folder_parts = folder.split('/') - current_path = '' - for i, part in enumerate(folder_parts): - if i == 0: - current_path = part - else: - current_path = current_path + '/' + part - - # Check if folder exists - existing_folder = NoteFolder.query.filter_by( - company_id=g.user.company_id, - path=current_path - ).first() - - if not existing_folder: - # Create folder - parent_path = '/'.join(folder_parts[:i]) if i > 0 else None - new_folder = NoteFolder( - name=part, - path=current_path, - parent_path=parent_path, - company_id=g.user.company_id, - created_by_id=g.user.id - ) - db.session.add(new_folder) + try: + folder = sanitize_folder_path(folder) + # Validate folder exists + if not validate_folder_access(folder, g.user.company_id, db.session): + # Create folder if it doesn't exist + folder_parts = folder.split('/') + current_path = '' + for i, part in enumerate(folder_parts): + if i == 0: + current_path = part + else: + current_path = current_path + '/' + part + + existing_folder = NoteFolder.query.filter_by( + company_id=g.user.company_id, + path=current_path + ).first() + + if not existing_folder: + parent_path = '/'.join(folder_parts[:i]) if i > 0 else None + new_folder = NoteFolder( + name=part, + path=current_path, + parent_path=parent_path, + company_id=g.user.company_id, + created_by_id=g.user.id + ) + db.session.add(new_folder) + note.folder = folder + except ValueError: + # Skip invalid folder + pass # Set tags if provided tags = request.form.get('tags') @@ -800,6 +855,14 @@ def upload_note(): db.session.add(note) db.session.commit() + # Update content with proper file URLs now that we have the note ID + if file_type == 'image': + note.content = f"![{note.title}]({url_for('notes_api.serve_note_file', note_id=note.id)})" + elif file_type not in ['markdown', 'text']: + note.content = f"[Download {original_filename}]({url_for('notes_api.serve_note_file', note_id=note.id)})" + + db.session.commit() + return jsonify({ 'success': True, 'note': { @@ -807,7 +870,7 @@ def upload_note(): 'title': note.title, 'slug': note.slug, 'file_type': note.file_type, - 'file_url': note.file_url, + 'file_url': url_for('notes_api.serve_note_file', note_id=note.id), 'url': url_for('notes.view_note', slug=note.slug) } }) @@ -888,16 +951,17 @@ def bulk_delete_notes(): # Delete files if they exist import os - from flask import current_app for note in notes: if note.file_path: - file_path = os.path.join(current_app.root_path, note.file_path.lstrip('/')) - if os.path.exists(file_path): - try: + try: + # Use the safe base path + upload_base = '/data/uploads/notes' + file_path = ensure_safe_file_path(upload_base, note.file_path) + if os.path.exists(file_path): os.remove(file_path) - except: - pass # Continue even if file deletion fails + except: + pass # Continue even if file deletion fails # Delete all notes for note in notes: @@ -913,3 +977,47 @@ def bulk_delete_notes(): except Exception as e: db.session.rollback() return jsonify({'success': False, 'error': str(e)}), 500 + + +@notes_api_bp.route('/file/') +@login_required +@company_required +def serve_note_file(note_id): + """Securely serve uploaded files after validating access permissions""" + import os + + # Get the note and validate access + note = Note.query.filter_by( + id=note_id, + company_id=g.user.company_id + ).first_or_404() + + # Check if user can view the note + if not note.can_user_view(g.user): + abort(403) + + # Check if note has a file + if not note.file_path: + abort(404) + + # Build safe file path + upload_base = '/data/uploads/notes' + try: + safe_path = ensure_safe_file_path(upload_base, note.file_path) + except ValueError: + abort(403) + + # Check if file exists + if not os.path.exists(safe_path): + abort(404) + + # Get safe MIME type + mime_type = get_safe_mime_type(note.original_filename or 'file') + + # Send the file + return send_file( + safe_path, + mimetype=mime_type, + as_attachment=False, # Display inline for images + download_name=note.original_filename + ) diff --git a/routes/notes_download.py b/routes/notes_download.py index ac35e46..480972e 100644 --- a/routes/notes_download.py +++ b/routes/notes_download.py @@ -14,6 +14,7 @@ from flask import (Blueprint, Response, abort, flash, g, redirect, request, from frontmatter_utils import parse_frontmatter from models import Note, db from routes.auth import company_required, login_required +from security_utils import sanitize_folder_path, validate_folder_access # Create blueprint notes_download_bp = Blueprint('notes_download', __name__) @@ -30,8 +31,11 @@ def download_note(slug, format): if not note.can_user_view(g.user): abort(403) - # Prepare filename + # Prepare filename - extra sanitization safe_filename = re.sub(r'[^a-zA-Z0-9_-]', '_', note.title) + # Ensure filename isn't too long + if len(safe_filename) > 100: + safe_filename = safe_filename[:100] timestamp = datetime.now().strftime('%Y%m%d') if format == 'md': @@ -142,6 +146,8 @@ def download_notes_bulk(): if note and note.can_user_view(g.user): # Get content based on format safe_filename = re.sub(r'[^a-zA-Z0-9_-]', '_', note.title) + if len(safe_filename) > 100: + safe_filename = safe_filename[:100] if format == 'md': content = note.content @@ -195,8 +201,15 @@ def download_notes_bulk(): @company_required def download_folder(folder_path, format): """Download all notes in a folder as a zip file""" - # Decode folder path (replace URL encoding) - folder_path = unquote(folder_path) + # Decode and sanitize folder path + try: + folder_path = sanitize_folder_path(unquote(folder_path)) + except ValueError: + abort(400, "Invalid folder path") + + # Validate folder exists and user has access + if not validate_folder_access(folder_path, g.user.company_id, db.session): + abort(404, "Folder not found") # Get all notes in this folder notes = Note.query.filter_by( @@ -220,6 +233,8 @@ def download_folder(folder_path, format): for note in viewable_notes: # Get content based on format safe_filename = re.sub(r'[^a-zA-Z0-9_-]', '_', note.title) + if len(safe_filename) > 100: + safe_filename = safe_filename[:100] if format == 'md': content = note.content diff --git a/security_utils.py b/security_utils.py new file mode 100644 index 0000000..06323ee --- /dev/null +++ b/security_utils.py @@ -0,0 +1,260 @@ +""" +Security utilities for path sanitization and file handling. +""" + +import os +import re +import uuid +from werkzeug.utils import secure_filename + + +def sanitize_folder_path(path): + """ + Sanitize folder path to prevent traversal attacks. + + Args: + path: The folder path to sanitize + + Returns: + Sanitized path string + + Raises: + ValueError: If path contains forbidden patterns or characters + """ + if not path: + return "" + + # Remove any leading/trailing slashes and whitespace + path = path.strip().strip('/') + + # Reject paths containing dangerous patterns + dangerous_patterns = [ + '..', # Parent directory traversal + './', # Current directory reference + '\\', # Windows path separator + '\0', # Null byte + '~', # Home directory reference + '\x00', # Alternative null byte + '%2e%2e', # URL encoded .. + '%252e%252e', # Double URL encoded .. + ] + + # Check both original and lowercase version + path_lower = path.lower() + for pattern in dangerous_patterns: + if pattern in path or pattern in path_lower: + raise ValueError(f"Invalid path: contains forbidden pattern '{pattern}'") + + # Only allow alphanumeric, spaces, hyphens, underscores, and forward slashes + if not re.match(r'^[a-zA-Z0-9\s\-_/]+$', path): + raise ValueError("Invalid path: contains forbidden characters") + + # Normalize path (remove double slashes, etc.) + path_parts = [p for p in path.split('/') if p] + + # Additional check: ensure no part is '..' or '.' or empty + for part in path_parts: + if part in ('.', '..', '') or part.strip() == '': + raise ValueError("Invalid path: contains directory traversal") + + # Check each part doesn't exceed reasonable length + if len(part) > 100: + raise ValueError("Invalid path: folder name too long") + + # Check total depth + if len(path_parts) > 10: + raise ValueError("Invalid path: folder depth exceeds maximum allowed") + + normalized = '/'.join(path_parts) + + # Final length check + if len(normalized) > 500: + raise ValueError("Invalid path: total path length exceeds maximum allowed") + + return normalized + + +def generate_secure_file_path(file_type, original_filename): + """ + Generate secure file path using UUID to prevent predictable paths. + + Args: + file_type: Type of file (image, markdown, text, document) + original_filename: Original uploaded filename + + Returns: + Secure relative path for file storage + + Raises: + ValueError: If file type is not allowed + """ + if not original_filename: + raise ValueError("Filename is required") + + # Extract and validate extension + _, ext = os.path.splitext(original_filename) + ext = ext.lower() + + # Whitelist allowed extensions by type + allowed_extensions = { + 'markdown': {'.md', '.markdown', '.mdown', '.mkd'}, + 'image': {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.svg'}, + 'text': {'.txt'}, + 'document': {'.pdf', '.doc', '.docx'} + } + + # Verify file type matches extension + type_extensions = allowed_extensions.get(file_type, set()) + if ext not in type_extensions: + all_allowed = set() + for exts in allowed_extensions.values(): + all_allowed.update(exts) + if ext not in all_allowed: + raise ValueError(f"File extension '{ext}' is not allowed") + # Find correct file type based on extension + for ftype, exts in allowed_extensions.items(): + if ext in exts: + file_type = ftype + break + + # Generate UUID for filename + file_id = str(uuid.uuid4()) + + # Create secure filename + secure_name = f"{file_id}{ext}" + + # Return path with type subdirectory + return f"{file_type}/{secure_name}" + + +def validate_folder_access(folder_path, company_id, db_session): + """ + Validate folder exists and belongs to company. + + Args: + folder_path: Path to validate + company_id: Company ID to check against + db_session: Database session + + Returns: + True if folder is valid and accessible, False otherwise + """ + if not folder_path: + return True # Root folder is always valid + + try: + # Sanitize the path first + folder_path = sanitize_folder_path(folder_path) + except ValueError: + return False + + # Import here to avoid circular imports + from models import NoteFolder + + # Check if folder exists in database + folder = db_session.query(NoteFolder).filter_by( + path=folder_path, + company_id=company_id + ).first() + + return folder is not None + + +def ensure_safe_file_path(base_path, file_path): + """ + Ensure a file path is within the safe base directory. + + Args: + base_path: The safe base directory + file_path: The file path to check + + Returns: + Absolute safe path + + Raises: + ValueError: If path would escape the base directory + """ + # Get absolute paths + base_abs = os.path.abspath(base_path) + + # Join paths and resolve + full_path = os.path.join(base_abs, file_path) + full_abs = os.path.abspath(full_path) + + # Ensure the resolved path is within the base + if not full_abs.startswith(base_abs + os.sep) and full_abs != base_abs: + raise ValueError("Path traversal detected") + + return full_abs + + +def validate_filename(filename): + """ + Validate and secure a filename. + + Args: + filename: The filename to validate + + Returns: + Secure filename + + Raises: + ValueError: If filename is invalid + """ + if not filename: + raise ValueError("Filename is required") + + # Use werkzeug's secure_filename + secured = secure_filename(filename) + + if not secured or secured == '': + raise ValueError("Invalid filename") + + # Additional checks + if len(secured) > 255: + raise ValueError("Filename too long") + + # Ensure it has an extension + if '.' not in secured: + raise ValueError("Filename must have an extension") + + return secured + + +def get_safe_mime_type(filename): + """ + Get MIME type for a filename, defaulting to safe types. + + Args: + filename: The filename to check + + Returns: + Safe MIME type string + """ + ext = os.path.splitext(filename)[1].lower() + + mime_types = { + # Markdown + '.md': 'text/markdown', + '.markdown': 'text/markdown', + '.mdown': 'text/markdown', + '.mkd': 'text/markdown', + + # Images + '.png': 'image/png', + '.jpg': 'image/jpeg', + '.jpeg': 'image/jpeg', + '.gif': 'image/gif', + '.webp': 'image/webp', + '.svg': 'image/svg+xml', + + # Text + '.txt': 'text/plain', + + # Documents + '.pdf': 'application/pdf', + '.doc': 'application/msword', + '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' + } + + return mime_types.get(ext, 'application/octet-stream') \ No newline at end of file diff --git a/templates/note_view.html b/templates/note_view.html index 7c1c385..1227231 100644 --- a/templates/note_view.html +++ b/templates/note_view.html @@ -2,99 +2,156 @@ {% block content %}
- - + +
+
+
+

Note Preferences

+ +
+
+
+
+ + + Choose the font family for note previews in the list view +
+ +
+ +
+

This is how your note previews will look with the selected font.

+

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.

+
+
+
+ +
+
+
+