Files
TimeTrack/routes/notes_api.py

916 lines
28 KiB
Python

# Standard library imports
from datetime import datetime, timezone
# Third-party imports
from flask import Blueprint, abort, g, jsonify, request
from sqlalchemy import and_, or_
# Local application imports
from models import Note, NoteFolder, NoteLink, NoteVisibility, db
from routes.auth import company_required, login_required
# Blueprint for the notes JSON API; every route registered on it below is
# served under the /api/notes URL prefix.
notes_api_bp = Blueprint('notes_api', __name__, url_prefix='/api/notes')
@notes_api_bp.route('/folder-details')
@login_required
@company_required
def api_folder_details():
    """Return metadata for one folder of the current user's company.

    The folder path is read from the ``folder`` query parameter (empty
    string when absent).  The response reports how many non-archived notes
    are filed directly under that path and whether a ``NoteFolder`` row with
    that path exists.
    """
    company_id = g.user.company_id
    path = request.args.get('folder', '')

    # Non-archived notes whose folder equals this exact path.
    active_notes = Note.query.filter_by(
        company_id=company_id,
        folder=path,
        is_archived=False,
    ).count()

    # The NoteFolder row is optional; 'exists' reports whether one was found.
    record = NoteFolder.query.filter_by(
        company_id=company_id,
        path=path,
    ).first()

    if path:
        display_name = path.split('/')[-1]
    else:
        display_name = 'Root'

    if record:
        created = record.created_at.isoformat()
    else:
        created = None

    details = {
        'path': path,
        'name': display_name,
        'exists': record is not None,
        'note_count': active_notes,
        'created_at': created,
    }
    return jsonify({'success': True, 'folder': details})
@notes_api_bp.route('/folders', methods=['POST'])
@login_required
@company_required
def api_create_folder():
    """Create a folder for the current user's company.

    Expects a JSON object with a required ``name`` and optional ``parent``
    and ``description`` strings.  The stored path is ``parent/name`` when a
    parent is given, otherwise just ``name``.

    Returns:
        A JSON response with ``success`` and ``message``; on success it also
        carries the new folder's ``path`` and ``name``.  A missing or
        non-object body, an empty name, a ``/`` in the name, or an already
        existing path all answer with HTTP 400.
    """
    data = request.get_json()
    # A JSON array or scalar has no .get(); treat it like a missing body.
    if not data or not isinstance(data, dict):
        return jsonify({'success': False, 'message': 'No data provided'}), 400

    def _text(key):
        # JSON null or a non-string value counts as "not provided" instead of
        # raising AttributeError on .strip().
        value = data.get(key)
        return value.strip() if isinstance(value, str) else ''

    folder_name = _text('name')
    parent_path = _text('parent')
    description = _text('description')

    if not folder_name:
        return jsonify({'success': False, 'message': 'Folder name is required'}), 400

    # '/' is the path separator, so it cannot appear inside a single name.
    if '/' in folder_name:
        return jsonify({'success': False, 'message': 'Folder name cannot contain /'}), 400

    full_path = f"{parent_path}/{folder_name}" if parent_path else folder_name

    existing = NoteFolder.query.filter_by(
        company_id=g.user.company_id,
        path=full_path,
    ).first()
    if existing:
        return jsonify({'success': False, 'message': 'Folder already exists'}), 400

    folder = NoteFolder(
        name=folder_name,
        path=full_path,
        parent_path=parent_path or None,
        description=description or None,
        company_id=g.user.company_id,
        created_by_id=g.user.id,
    )
    db.session.add(folder)
    db.session.commit()

    return jsonify({
        'success': True,
        'message': 'Folder created successfully',
        'folder': {
            'path': folder.path,
            'name': folder_name,
        },
    })
@notes_api_bp.route('/folders', methods=['PUT'])
@login_required
@company_required
def api_rename_folder():
    """Rename a folder and re-path everything stored beneath it.

    Expects a JSON object with ``old_path`` (the folder's current full path)
    and ``new_name`` (the replacement for its last path segment).  The
    folder row, the notes filed directly in it, all descendant folders and
    the notes in those descendants are updated in one commit.

    Returns:
        A JSON response with ``success`` and ``message``; on success it also
        carries ``old_path`` and ``new_path``.  HTTP 400 for invalid input or
        a name collision, HTTP 404 when no folder row has ``old_path``.
    """
    data = request.get_json()
    # A JSON array or scalar has no .get(); treat it like a missing body.
    if not data or not isinstance(data, dict):
        return jsonify({'success': False, 'message': 'No data provided'}), 400

    raw_old = data.get('old_path')
    raw_new = data.get('new_name')
    # JSON null / non-string values count as missing rather than crashing.
    old_path = raw_old.strip() if isinstance(raw_old, str) else ''
    new_name = raw_new.strip() if isinstance(raw_new, str) else ''

    if not old_path or not new_name:
        return jsonify({'success': False, 'message': 'Both old path and new name are required'}), 400

    if '/' in new_name:
        return jsonify({'success': False, 'message': 'Folder name cannot contain /'}), 400

    company_id = g.user.company_id

    folder = NoteFolder.query.filter_by(company_id=company_id, path=old_path).first()
    if not folder:
        return jsonify({'success': False, 'message': 'Folder not found'}), 404

    # Swap only the last path segment; everything before it is kept as is.
    parent, sep, _ = old_path.rpartition('/')
    new_path = f"{parent}{sep}{new_name}"

    existing = NoteFolder.query.filter_by(company_id=company_id, path=new_path).first()
    if existing:
        return jsonify({'success': False, 'message': 'A folder with this name already exists'}), 400

    # Keep the display name in step with the path's last segment.
    folder.path = new_path
    folder.name = new_name

    # Notes filed directly in the renamed folder.
    Note.query.filter_by(
        company_id=company_id,
        folder=old_path,
    ).update({Note.folder: new_path})

    old_prefix = f"{old_path}/"
    new_prefix = f"{new_path}/"

    def _rebase(path):
        # Replace the leading old prefix only; never an inner occurrence.
        return new_prefix + path[len(old_prefix):]

    # LIKE treats '%' and '_' in old_path as wildcards (and may ignore case
    # on some backends), so the query can return rows that are not real
    # descendants; the startswith() checks below filter those out.
    subfolders = NoteFolder.query.filter(
        NoteFolder.company_id == company_id,
        NoteFolder.path.like(f"{old_path}/%"),
    ).all()
    for subfolder in subfolders:
        if not subfolder.path.startswith(old_prefix):
            continue
        subfolder.path = _rebase(subfolder.path)
        # parent_path embeds the old path as well and must follow the rename.
        if subfolder.parent_path == old_path:
            subfolder.parent_path = new_path
        elif subfolder.parent_path and subfolder.parent_path.startswith(old_prefix):
            subfolder.parent_path = _rebase(subfolder.parent_path)

    notes_in_subfolders = Note.query.filter(
        Note.company_id == company_id,
        Note.folder.like(f"{old_path}/%"),
    ).all()
    for note in notes_in_subfolders:
        if note.folder.startswith(old_prefix):
            note.folder = _rebase(note.folder)

    db.session.commit()

    return jsonify({
        'success': True,
        'message': 'Folder renamed successfully',
        'folder': {
            'old_path': old_path,
            'new_path': new_path,
        },
    })
@notes_api_bp.route('/folders', methods=['DELETE'])
@login_required
@company_required
def api_delete_folder():
    """Delete the folder named by the ``path`` query parameter.

    Refuses with HTTP 400 when the path is empty or when non-archived notes
    are still filed directly under it.  If no ``NoteFolder`` row matches the
    path, nothing is deleted and the response still reports success.
    """
    target = request.args.get('path', '').strip()
    if not target:
        return jsonify({'success': False, 'message': 'Folder path is required'}), 400

    company_id = g.user.company_id

    # Only non-archived notes block the deletion.
    remaining = Note.query.filter_by(
        company_id=company_id,
        folder=target,
        is_archived=False,
    ).count()
    if remaining > 0:
        refusal = f'Cannot delete folder with {remaining} notes. Please move or delete the notes first.'
        return jsonify({'success': False, 'message': refusal}), 400

    record = NoteFolder.query.filter_by(company_id=company_id, path=target).first()
    if record:
        db.session.delete(record)
        db.session.commit()

    return jsonify({'success': True, 'message': 'Folder deleted successfully'})
@notes_api_bp.route('/<slug>/folder', methods=['PUT'])
@login_required
@company_required
def update_note_folder(slug):
    """Move the note identified by ``slug`` into another folder.

    Expects a JSON object whose optional ``folder`` string is the target
    path; an empty, missing or null value moves the note to the root
    (``folder`` stored as ``None``).

    Returns:
        A JSON ``success``/``message`` response.  404 when the slug does not
        match a note of the user's company, 403 when the user may not edit
        the note, 400 when the body is not a JSON object or ``folder`` is
        not a string.
    """
    note = Note.query.filter_by(slug=slug, company_id=g.user.company_id).first_or_404()

    if not note.can_user_edit(g.user):
        return jsonify({'success': False, 'message': 'Permission denied'}), 403

    data = request.get_json()
    # get_json() may yield None or a non-object value; neither has .get().
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': 'No data provided'}), 400

    raw_folder = data.get('folder')
    if raw_folder is not None and not isinstance(raw_folder, str):
        return jsonify({'success': False, 'message': 'Folder must be a string'}), 400
    new_folder = (raw_folder or '').strip()

    note.folder = new_folder or None
    note.updated_at = datetime.now(timezone.utc)
    db.session.commit()

    return jsonify({
        'success': True,
        'message': 'Note moved successfully',
    })
@notes_api_bp.route('/<int:note_id>/move', methods=['POST'])
@login_required
@company_required
def move_note_to_folder(note_id):
    """Move the note with ``note_id`` into another folder (drag and drop).

    Expects a JSON object whose optional ``folder`` string is the target
    path; an empty, missing or null value moves the note to the root
    (``folder`` stored as ``None``).

    Returns:
        A JSON response with ``success``, ``message`` and the resulting
        ``folder`` (empty string for the root).  404 when the note is not
        found in the user's company, 403 when the user may not edit it, 400
        when the body is not a JSON object or ``folder`` is not a string.
    """
    note = Note.query.filter_by(id=note_id, company_id=g.user.company_id).first()
    if not note:
        return jsonify({'success': False, 'error': 'Note not found'}), 404

    if not note.can_user_edit(g.user):
        return jsonify({'success': False, 'error': 'Permission denied'}), 403

    data = request.get_json()
    # get_json() may yield None or a non-object value; neither has .get().
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'No data provided'}), 400

    raw_folder = data.get('folder')
    if raw_folder is not None and not isinstance(raw_folder, str):
        return jsonify({'success': False, 'error': 'Folder must be a string'}), 400
    new_folder = (raw_folder or '').strip()

    note.folder = new_folder or None
    note.updated_at = datetime.now(timezone.utc)
    db.session.commit()

    return jsonify({
        'success': True,
        'message': 'Note moved successfully',
        'folder': note.folder or '',
    })
@notes_api_bp.route('/<int:note_id>/tags', methods=['POST'])
@login_required
@company_required
def add_tags_to_note(note_id):
    """Merge additional tags into a note's existing tags.

    Expects a JSON object whose ``tags`` value is a comma-separated string.
    The new tags are combined with ``note.get_tags_list()``, de-duplicated,
    sorted, and stored back as a ``', '``-joined string.

    Returns:
        A JSON response with ``success``, ``message`` and the merged ``tags``
        list in the same sorted order that is stored.  404 when the note is
        not found, 403 when the user may not edit it, 400 when no tags are
        supplied.
    """
    note = Note.query.filter_by(id=note_id, company_id=g.user.company_id).first()
    if not note:
        return jsonify({'success': False, 'message': 'Note not found'}), 404

    if not note.can_user_edit(g.user):
        return jsonify({'success': False, 'message': 'Permission denied'}), 403

    data = request.get_json()
    # A missing/non-object body or a non-string "tags" value is treated as
    # "no tags" instead of raising AttributeError.
    raw_tags = data.get('tags') if isinstance(data, dict) else None
    new_tags = raw_tags.strip() if isinstance(raw_tags, str) else ''
    if not new_tags:
        return jsonify({'success': False, 'message': 'No tags provided'}), 400

    existing_tags = note.get_tags_list()
    new_tag_list = [tag.strip() for tag in new_tags.split(',') if tag.strip()]

    # Sort once so the stored string and the response agree on ordering.
    all_tags = sorted(set(existing_tags + new_tag_list))

    note.tags = ', '.join(all_tags)
    note.updated_at = datetime.now(timezone.utc)
    db.session.commit()

    return jsonify({
        'success': True,
        'message': 'Tags added successfully',
        'tags': all_tags,
    })
@notes_api_bp.route('/<int:note_id>/link', methods=['POST'])
@login_required
@company_required
def link_notes(note_id):
    """Create a link from the note ``note_id`` to another note.

    Expects a JSON object with ``target_note_id`` (required) and an optional
    ``link_type`` (defaults to ``'related'``).  The caller must be able to
    edit the source note and view the target note; both must belong to the
    caller's company.

    Returns:
        A JSON ``success``/``message`` response.  404 when either note is not
        found, 403 on a permission failure, 400 when the body is not a JSON
        object, the target id is missing, or a link from source to target
        already exists.
    """
    source_note = Note.query.filter_by(id=note_id, company_id=g.user.company_id).first()
    if not source_note:
        return jsonify({'success': False, 'message': 'Source note not found'}), 404

    if not source_note.can_user_edit(g.user):
        return jsonify({'success': False, 'message': 'Permission denied'}), 403

    data = request.get_json()
    # get_json() may yield None or a non-object value; neither has .get().
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': 'No data provided'}), 400

    target_note_id = data.get('target_note_id')
    link_type = data.get('link_type', 'related')

    if not target_note_id:
        return jsonify({'success': False, 'message': 'Target note ID is required'}), 400

    target_note = Note.query.filter_by(id=target_note_id, company_id=g.user.company_id).first()
    if not target_note:
        return jsonify({'success': False, 'message': 'Target note not found'}), 404

    if not target_note.can_user_view(g.user):
        return jsonify({'success': False, 'message': 'You cannot link to a note you cannot view'}), 403

    # Only the source -> target direction is checked for duplicates here.
    existing_link = NoteLink.query.filter_by(
        source_note_id=source_note.id,
        target_note_id=target_note.id,
    ).first()
    if existing_link:
        return jsonify({'success': False, 'message': 'Link already exists'}), 400

    link = NoteLink(
        source_note_id=source_note.id,
        target_note_id=target_note.id,
        link_type=link_type,
        created_by_id=g.user.id,
    )
    db.session.add(link)
    db.session.commit()

    return jsonify({
        'success': True,
        'message': 'Notes linked successfully',
    })
@notes_api_bp.route('/<int:note_id>/link', methods=['DELETE'])
@login_required
@company_required
def unlink_notes(note_id):
    """Remove a link between the note ``note_id`` and another note.

    Expects a JSON object with ``target_note_id``.  A link is matched in
    either direction (this note as source or as target) and the first match
    is deleted.

    Returns:
        A JSON ``success``/``message`` response.  404 when the note or the
        link is not found, 403 when the user may not edit the note, 400 when
        the body is not a JSON object or the target id is missing.
    """
    source_note = Note.query.filter_by(id=note_id, company_id=g.user.company_id).first()
    if not source_note:
        return jsonify({'success': False, 'message': 'Source note not found'}), 404

    if not source_note.can_user_edit(g.user):
        return jsonify({'success': False, 'message': 'Permission denied'}), 403

    data = request.get_json()
    # get_json() may yield None or a non-object value; neither has .get().
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': 'No data provided'}), 400

    target_note_id = data.get('target_note_id')
    if not target_note_id:
        return jsonify({'success': False, 'message': 'Target note ID is required'}), 400

    # Match the pair regardless of which note is stored as the source.
    forward = and_(
        NoteLink.source_note_id == source_note.id,
        NoteLink.target_note_id == target_note_id,
    )
    backward = and_(
        NoteLink.source_note_id == target_note_id,
        NoteLink.target_note_id == source_note.id,
    )
    link = NoteLink.query.filter(or_(forward, backward)).first()
    if not link:
        return jsonify({'success': False, 'message': 'Link not found'}), 404

    db.session.delete(link)
    db.session.commit()

    return jsonify({
        'success': True,
        'message': 'Link removed successfully',
    })
@notes_api_bp.route('/<slug>/shares', methods=['POST'])
@login_required
@company_required
def create_note_share(slug):
    """Create a share link for the note identified by ``slug``.

    Expects a JSON object; the optional keys ``expires_in_days``,
    ``password`` and ``max_views`` are forwarded to
    ``note.create_share_link``.  An empty object is accepted.

    Returns:
        A JSON response with ``success`` and the new share's ``to_dict()``
        output.  404 when the note is not found, 403 when the user may not
        edit it, 400 when the body is not a JSON object, 500 (with the
        exception text) when creating or committing the share fails.
    """
    note = Note.query.filter_by(slug=slug, company_id=g.user.company_id).first()
    if not note:
        return jsonify({'success': False, 'error': 'Note not found'}), 404

    # Only users who can edit the note may create shares for it.
    if not note.can_user_edit(g.user):
        return jsonify({'success': False, 'error': 'Permission denied'}), 403

    data = request.get_json()
    # Reject a missing/non-object body up front.  Silently falling back to
    # defaults could create a share without the password the client meant
    # to set.
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'No data provided'}), 400

    try:
        share = note.create_share_link(
            expires_in_days=data.get('expires_in_days'),
            password=data.get('password'),
            max_views=data.get('max_views'),
        )
        db.session.commit()
        return jsonify({
            'success': True,
            'share': share.to_dict(),
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
@notes_api_bp.route('/<slug>/shares', methods=['GET'])
@login_required
@company_required
def list_note_shares(slug):
    """Return every share link of the note identified by ``slug``.

    Responds with 404 when the slug matches no note of the user's company
    and 403 when the user may not view the note.  Otherwise the result of
    ``note.get_all_shares()`` is serialised with ``to_dict()``.
    """
    lookup = Note.query.filter_by(slug=slug, company_id=g.user.company_id)
    note = lookup.first()
    if not note:
        return jsonify({'success': False, 'error': 'Note not found'}), 404

    # Viewing rights are enough to list shares.
    if not note.can_user_view(g.user):
        return jsonify({'success': False, 'error': 'Permission denied'}), 403

    # get_all_shares() is used rather than an active-only lookup.
    serialised = []
    for share in note.get_all_shares():
        serialised.append(share.to_dict())

    return jsonify({'success': True, 'shares': serialised})
@notes_api_bp.route('/shares/<int:share_id>', methods=['DELETE'])
@login_required
@company_required
def delete_note_share(share_id):
    """Delete the share link with the given id.

    Responds with 404 when the share does not exist and 403 when its note
    belongs to another company or the user may not edit that note.  A
    failure while deleting is rolled back and reported as 500 with the
    exception text.
    """
    from models import NoteShare

    share = NoteShare.query.get(share_id)
    if not share:
        return jsonify({'success': False, 'error': 'Share not found'}), 404

    # Company check first, then the per-note edit permission.
    owner_note = share.note
    foreign = owner_note.company_id != g.user.company_id
    if foreign or not owner_note.can_user_edit(g.user):
        return jsonify({'success': False, 'error': 'Permission denied'}), 403

    try:
        db.session.delete(share)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500

    return jsonify({'success': True, 'message': 'Share link deleted successfully'})
@notes_api_bp.route('/shares/<int:share_id>', methods=['PUT'])
@login_required
@company_required
def update_note_share(share_id):
    """Update the settings of an existing share link.

    Expects a JSON object.  Only the keys that are present are applied:

    * ``expires_in_days`` - a number of days from now, or ``null`` to remove
      the expiry;
    * ``password`` - passed to ``share.set_password``;
    * ``max_views`` - stored as ``share.max_views``.

    Returns:
        A JSON response with ``success`` and the share's ``to_dict()``
        output.  404 when the share does not exist, 403 on a permission
        failure, 400 when the body is not a JSON object or
        ``expires_in_days`` is neither a number nor null, 500 (with the
        exception text) when applying or committing the change fails.
    """
    from datetime import datetime, timedelta
    from models import NoteShare

    share = NoteShare.query.get(share_id)
    if not share:
        return jsonify({'success': False, 'error': 'Share not found'}), 404

    if share.note.company_id != g.user.company_id:
        return jsonify({'success': False, 'error': 'Permission denied'}), 403
    if not share.note.can_user_edit(g.user):
        return jsonify({'success': False, 'error': 'Permission denied'}), 403

    data = request.get_json()
    # get_json() may yield None or a non-object value; report that as a
    # client error instead of a 500 from the generic handler below.
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'No data provided'}), 400

    days = data.get('expires_in_days')
    # bool is a subclass of int, so exclude it explicitly.
    if days is not None and (isinstance(days, bool) or not isinstance(days, (int, float))):
        return jsonify({'success': False, 'error': 'expires_in_days must be a number or null'}), 400

    try:
        if 'expires_in_days' in data:
            if days is None:
                share.expires_at = None
            else:
                # NOTE(review): naive local time, unlike the timezone-aware
                # timestamps used for Note.updated_at in this file.  Left
                # unchanged because the code that compares expires_at is not
                # visible here - confirm before switching to UTC.
                share.expires_at = datetime.now() + timedelta(days=days)

        if 'password' in data:
            share.set_password(data['password'])

        if 'max_views' in data:
            share.max_views = data['max_views']

        db.session.commit()
        return jsonify({
            'success': True,
            'share': share.to_dict(),
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
@notes_api_bp.route('/autocomplete')
@login_required
@company_required
def autocomplete_notes():
    """Return notes matching a search string for autocomplete widgets.

    Query parameters:
        q: Optional search text, matched case-insensitively as a substring
            of the note title or slug.
        limit: Optional maximum number of results (default 10).  Values are
            clamped to the range 0-50; a non-numeric value falls back to the
            default.

    Only non-archived notes of the user's company are considered, and only
    those that are private and created by the user, team-visible and created
    by a member of the user's team, or company-visible.

    Returns:
        A JSON response with ``success``, the ``results`` list, its
        ``count`` and the echoed ``query``.
    """
    query = request.args.get('q', '').strip()

    # type=int makes a non-numeric value fall back to the default instead of
    # raising ValueError; the lower clamp stops a negative LIMIT, which some
    # databases interpret as "no limit".
    requested_limit = request.args.get('limit', 10, type=int)
    limit = max(0, min(requested_limit, 50))

    visible_to_user = or_(
        # Private notes created by user
        and_(Note.visibility == NoteVisibility.PRIVATE, Note.created_by_id == g.user.id),
        # Team notes from user's team
        and_(Note.visibility == NoteVisibility.TEAM, Note.created_by.has(team_id=g.user.team_id)),
        # Company notes
        Note.visibility == NoteVisibility.COMPANY,
    )
    notes_query = Note.query.filter(
        Note.company_id == g.user.company_id,
        Note.is_archived == False,  # noqa: E712 - SQL expression, not a Python comparison
    ).filter(visible_to_user)

    if query:
        search_pattern = f'%{query}%'
        notes_query = notes_query.filter(
            or_(
                Note.title.ilike(search_pattern),
                Note.slug.ilike(search_pattern),
            )
        )

    # Exact title matches first, then exact slug matches, then by title.
    notes = notes_query.order_by(
        (Note.title == query).desc(),
        (Note.slug == query).desc(),
        Note.title,
    ).limit(limit).all()

    results = [
        {
            'id': note.id,
            'title': note.title,
            'slug': note.slug,
            'folder': note.folder or '',
            'tags': note.get_tags_list(),
            'visibility': note.visibility.value,
            'preview': note.get_preview(100),
            'updated_at': note.updated_at.isoformat() if note.updated_at else None,
        }
        for note in notes
    ]

    return jsonify({
        'success': True,
        'results': results,
        'count': len(results),
        'query': query,
    })
def clean_filename_for_title(filename):
    """Derive a readable note title from an uploaded file's name.

    The text after the last ``.`` is dropped, then at most one well-known
    folder-like prefix (``Documents_``, ``Images_``, ...) is removed, then a
    leading ``YYYY-MM-DD_`` style date and a leading run of digits followed
    by ``-`` or ``_``.  Underscores become spaces and runs of whitespace are
    collapsed.

    Args:
        filename: The file name, with or without an extension.

    Returns:
        The cleaned title.  If cleaning leaves nothing, the name without its
        extension is returned; if that is empty too (e.g. ``.hidden``), the
        original ``filename`` is returned so the title is not blank.

    >>> clean_filename_for_title('2024-01-15_Document_Name.pdf')
    'Document Name'
    """
    import re

    # Everything before the last dot; the whole name when there is no dot.
    stem = filename.rsplit('.', 1)[0]
    name = stem

    # Common prefixes that might be folder names.
    prefixes_to_remove = (
        'Webpages_', 'Documents_', 'Files_', 'Downloads_',
        'Images_', 'Photos_', 'Pictures_', 'Uploads_',
        'Docs_', 'PDFs_', 'Attachments_', 'Archive_',
    )
    # Strip only the first prefix that matches.
    for prefix in prefixes_to_remove:
        if name.startswith(prefix):
            name = name[len(prefix):]
            break

    # e.g. "2024-01-15_Document_Name" -> "Document_Name"
    name = re.sub(r'^\d{4}[-_]\d{2}[-_]\d{2}[-_]', '', name)
    # e.g. "123_report" -> "report"
    name = re.sub(r'^\d+[-_]', '', name)

    # Underscores to spaces, then collapse repeated whitespace.
    name = ' '.join(name.replace('_', ' ').split())

    # Fall back step by step so the title is empty only for an empty input.
    return name or stem or filename
@notes_api_bp.route('/upload', methods=['POST'])
@login_required
@company_required
def upload_note():
    """Store an uploaded file and create a private note that refers to it.

    Form fields:
        file: The upload (required).  Its name must pass
            ``Note.allowed_file``.
        title: Optional note title; defaults to a title derived from the
            file name via ``clean_filename_for_title``.
        folder: Optional folder path.  Missing ``NoteFolder`` rows for the
            path and each of its ancestors are created.
        tags: Optional tag string stored on the note as given.

    The file is written below ``/data/uploads/notes/<subdir>/`` where
    ``<subdir>`` is ``images``, ``markdown``, ``text`` or ``documents``
    depending on the detected file type.  Markdown and text files become the
    note content; images and other documents produce a markdown link to the
    stored file.

    Returns:
        A JSON response with ``success`` and a ``note`` summary.  400 for a
        missing, unnamed or disallowed file; 500 (with the exception text)
        when anything fails after the file has been written, in which case
        the stored file is removed again.
    """
    import os
    import time
    from werkzeug.utils import secure_filename
    from PIL import Image
    from flask import url_for

    if 'file' not in request.files:
        return jsonify({'success': False, 'error': 'No file provided'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'success': False, 'error': 'No file selected'}), 400

    if not Note.allowed_file(file.filename):
        return jsonify({'success': False, 'error': 'File type not allowed. Supported: markdown (.md), images (.jpg, .png, .gif, .webp, .svg), text (.txt), documents (.pdf, .doc, .docx)'}), 400

    original_filename = secure_filename(file.filename)
    file_type = Note.get_file_type_from_filename(original_filename)

    # One sub-directory name drives both the on-disk location and the stored
    # relative path, so the two cannot disagree (text files used to be saved
    # under "text/" but recorded as "texts/").
    subdir_by_type = {'image': 'images', 'markdown': 'markdown', 'text': 'text'}
    subdir = subdir_by_type.get(file_type, 'documents')

    # Uploads live in the persistent volume.
    upload_dir = os.path.join('/data', 'uploads', 'notes', subdir)
    os.makedirs(upload_dir, exist_ok=True)

    # Company id, user id and a timestamp make the stored name unique.
    timestamp = int(time.time())
    filename = f"{g.user.company_id}_{g.user.id}_{timestamp}_{original_filename}"
    file_path = os.path.join(upload_dir, filename)
    relative_path = os.path.join(subdir, filename)

    file.save(file_path)

    # From here on the file exists on disk: any failure must remove it again
    # so no orphan is left behind.
    try:
        file_size = os.path.getsize(file_path)
        title = request.form.get('title') or clean_filename_for_title(original_filename)
        image_width = image_height = None

        if file_type in ('markdown', 'text'):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this fallback cannot fail to decode.
                with open(file_path, 'r', encoding='latin-1') as f:
                    content = f.read()
        elif file_type == 'image':
            content = f"![{title}](/uploads/notes/{relative_path})"
            try:
                with Image.open(file_path) as img:
                    image_width, image_height = img.size
            except Exception:
                # Dimensions are optional; an unreadable image still becomes
                # a note.
                image_width = image_height = None
        else:
            content = f"[Download {original_filename}](/uploads/notes/{relative_path})"

        note = Note(
            title=title,
            content=content,
            visibility=NoteVisibility.PRIVATE,
            created_by_id=g.user.id,
            company_id=g.user.company_id,
            is_file_based=True,
            file_path=relative_path,
            file_type=file_type,
            original_filename=original_filename,
            file_size=file_size,
            mime_type=file.content_type,
        )

        if file_type == 'image' and image_width:
            note.image_width = image_width
            note.image_height = image_height

        folder = request.form.get('folder')
        if folder:
            note.folder = folder

            # Walk the path segment by segment and create every missing level.
            parent_path = None
            for part in folder.split('/'):
                current_path = part if parent_path is None else f"{parent_path}/{part}"
                existing_folder = NoteFolder.query.filter_by(
                    company_id=g.user.company_id,
                    path=current_path,
                ).first()
                if not existing_folder:
                    db.session.add(NoteFolder(
                        name=part,
                        path=current_path,
                        parent_path=parent_path,
                        company_id=g.user.company_id,
                        created_by_id=g.user.id,
                    ))
                parent_path = current_path

        tags = request.form.get('tags')
        if tags:
            note.tags = tags

        note.generate_slug()

        db.session.add(note)
        db.session.commit()

        return jsonify({
            'success': True,
            'note': {
                'id': note.id,
                'title': note.title,
                'slug': note.slug,
                'file_type': note.file_type,
                'file_url': note.file_url,
                'url': url_for('notes.view_note', slug=note.slug),
            },
        })
    except Exception as e:
        db.session.rollback()
        # Delete uploaded file on error
        if os.path.exists(file_path):
            os.remove(file_path)
        return jsonify({'success': False, 'error': str(e)}), 500
@notes_api_bp.route('/bulk-move', methods=['POST'])
@login_required
@company_required
def bulk_move_notes():
    """Move several notes into one folder.

    Expects a JSON object with ``note_ids`` (a non-empty list) and an
    optional ``folder`` string; an empty, missing or null folder moves the
    notes to the root (``folder`` stored as ``None``).

    Every requested note must belong to the user's company and be editable
    by the user, otherwise nothing is changed.

    Returns:
        A JSON response with ``success`` and a ``message``.  400 for a
        missing/invalid body or a non-string folder, 403 when any note is
        missing or not editable, 500 (with the exception text) on a database
        failure.
    """
    data = request.get_json()
    if not isinstance(data, dict) or 'note_ids' not in data:
        return jsonify({'success': False, 'error': 'No notes specified'}), 400

    note_ids = data.get('note_ids')
    if not isinstance(note_ids, list) or not note_ids:
        return jsonify({'success': False, 'error': 'No notes specified'}), 400

    raw_folder = data.get('folder')
    if raw_folder is not None and not isinstance(raw_folder, str):
        return jsonify({'success': False, 'error': 'Folder must be a string'}), 400
    folder = (raw_folder or '').strip()

    try:
        # De-duplicate (order preserved) so a repeated id is not mistaken
        # for a note that could not be found.
        unique_ids = list(dict.fromkeys(note_ids))

        notes = Note.query.filter(
            Note.id.in_(unique_ids),
            Note.company_id == g.user.company_id,
        ).all()

        # Belonging to the same company is not enough: apply the same edit
        # permission the single-note move endpoint enforces.
        all_found = len(notes) == len(unique_ids)
        if not all_found or not all(note.can_user_edit(g.user) for note in notes):
            return jsonify({'success': False, 'error': 'Some notes not found or access denied'}), 403

        moved_at = datetime.now(timezone.utc)
        for note in notes:
            note.folder = folder or None
            note.updated_at = moved_at

        db.session.commit()

        return jsonify({
            'success': True,
            'message': f'Moved {len(notes)} note(s) to {"folder: " + folder if folder else "root"}'
        })
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
@notes_api_bp.route('/bulk-delete', methods=['POST'])
@login_required
@company_required
def bulk_delete_notes():
    """Delete several notes and, best-effort, their uploaded files.

    Expects a JSON object with ``note_ids`` (a non-empty list).  Every
    requested note must belong to the user's company and be editable by the
    user, otherwise nothing is deleted.

    Files are removed only after the database commit succeeded, so a failed
    commit cannot leave notes pointing at files that no longer exist.  A
    file that cannot be removed does not fail the request.

    Returns:
        A JSON response with ``success`` and a ``message``.  400 for a
        missing/invalid body, 403 when any note is missing or not editable,
        500 (with the exception text) on a database failure.
    """
    import os
    from flask import current_app

    data = request.get_json()
    if not isinstance(data, dict) or 'note_ids' not in data:
        return jsonify({'success': False, 'error': 'No notes specified'}), 400

    note_ids = data.get('note_ids')
    if not isinstance(note_ids, list) or not note_ids:
        return jsonify({'success': False, 'error': 'No notes specified'}), 400

    try:
        # De-duplicate (order preserved) so a repeated id is not mistaken
        # for a note that could not be found.
        unique_ids = list(dict.fromkeys(note_ids))

        notes = Note.query.filter(
            Note.id.in_(unique_ids),
            Note.company_id == g.user.company_id,
        ).all()

        # Belonging to the same company is not enough: require the per-note
        # edit permission used by the other mutating endpoints.
        all_found = len(notes) == len(unique_ids)
        if not all_found or not all(note.can_user_edit(g.user) for note in notes):
            return jsonify({'success': False, 'error': 'Some notes not found or access denied'}), 403

        # Resolve file locations before the rows are deleted.  The upload
        # endpoint stores files below /data/uploads/notes; the application
        # root is kept as a second candidate because that is where this
        # endpoint looked before.
        upload_base = os.path.join('/data', 'uploads', 'notes')
        file_candidates = []
        for note in notes:
            if note.file_path:
                relative = note.file_path.lstrip('/')
                file_candidates.append((
                    os.path.join(upload_base, relative),
                    os.path.join(current_app.root_path, relative),
                ))

        deleted_count = len(notes)
        for note in notes:
            db.session.delete(note)

        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500

    # The rows are gone; now remove the files.
    for candidates in file_candidates:
        for path in candidates:
            if not os.path.exists(path):
                continue
            try:
                os.remove(path)
            except OSError:
                pass  # Best effort: continue even if file deletion fails
            break

    return jsonify({
        'success': True,
        'message': f'Deleted {deleted_count} note(s)'
    })