from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, UserPreferences, WorkRegion, AccountType, ProjectCategory, Task, SubTask, TaskStatus, TaskPriority, TaskDependency, Sprint, SprintStatus, Announcement, SystemEvent, WidgetType, UserDashboard, DashboardWidget, WidgetTemplate from data_formatting import ( format_duration, prepare_export_data, prepare_team_hours_export_data, format_table_data, format_graph_data, format_team_data, format_burndown_data ) from data_export import ( export_to_csv, export_to_excel, export_team_hours_to_csv, export_team_hours_to_excel, export_analytics_csv, export_analytics_excel ) from time_utils import apply_time_rounding, round_duration_to_interval, get_user_rounding_settings import logging from datetime import datetime, time, timedelta import os import csv import io import pandas as pd from sqlalchemy import func from functools import wraps from flask_mail import Mail, Message from dotenv import load_dotenv from werkzeug.security import check_password_hash # Load environment variables from .env file load_dotenv() # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:////data/timetrack.db') app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_timetrack') app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # Session lasts for 7 days # Configure Flask-Mail app.config['MAIL_SERVER'] = os.environ.get('MAIL_SERVER', 'smtp.example.com') app.config['MAIL_PORT'] = int(os.environ.get('MAIL_PORT') or 587) app.config['MAIL_USE_TLS'] = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1'] app.config['MAIL_USERNAME'] = 
os.environ.get('MAIL_USERNAME', 'your-email@example.com') app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD', 'your-password') app.config['MAIL_DEFAULT_SENDER'] = os.environ.get('MAIL_DEFAULT_SENDER', 'TimeTrack ') # Log mail configuration (without password) logger.info(f"Mail server: {app.config['MAIL_SERVER']}") logger.info(f"Mail port: {app.config['MAIL_PORT']}") logger.info(f"Mail use TLS: {app.config['MAIL_USE_TLS']}") logger.info(f"Mail username: {app.config['MAIL_USERNAME']}") logger.info(f"Mail default sender: {app.config['MAIL_DEFAULT_SENDER']}") mail = Mail(app) # Initialize the database with the app db.init_app(app) # Consolidated migration using migrate_db module def run_migrations(): """Run all database migrations using the consolidated migrate_db module.""" # Check if we're using PostgreSQL or SQLite database_url = app.config['SQLALCHEMY_DATABASE_URI'] print(f"DEBUG: Database URL: {database_url}") is_postgresql = 'postgresql://' in database_url or 'postgres://' in database_url print(f"DEBUG: Is PostgreSQL: {is_postgresql}") if is_postgresql: print("Using PostgreSQL - skipping SQLite migrations, ensuring tables exist...") with app.app_context(): db.create_all() init_system_settings() print("PostgreSQL setup completed successfully!") else: print("Using SQLite - running SQLite migrations...") try: from migrate_db import run_all_migrations run_all_migrations() print("SQLite database migrations completed successfully!") except ImportError as e: print(f"Error importing migrate_db: {e}") print("Falling back to basic table creation...") with app.app_context(): db.create_all() init_system_settings() except Exception as e: print(f"Error during SQLite migration: {e}") print("Falling back to basic table creation...") with app.app_context(): db.create_all() init_system_settings() def migrate_to_company_model(): """Migrate existing data to support company model (stub - handled by migrate_db)""" try: from migrate_db import migrate_to_company_model, 
get_db_path db_path = get_db_path() migrate_to_company_model(db_path) except ImportError: print("migrate_db module not available - skipping company model migration") except Exception as e: print(f"Error during company migration: {e}") raise def init_system_settings(): """Initialize system settings with default values if they don't exist""" if not SystemSettings.query.filter_by(key='registration_enabled').first(): print("Adding registration_enabled system setting...") reg_setting = SystemSettings( key='registration_enabled', value='true', description='Controls whether new user registration is allowed' ) db.session.add(reg_setting) db.session.commit() if not SystemSettings.query.filter_by(key='email_verification_required').first(): print("Adding email_verification_required system setting...") email_setting = SystemSettings( key='email_verification_required', value='true', description='Controls whether email verification is required for new user accounts' ) db.session.add(email_setting) db.session.commit() def migrate_data(): """Handle data migrations and setup (stub - handled by migrate_db)""" try: from migrate_db import migrate_data migrate_data() except ImportError: print("migrate_db module not available - skipping data migration") except Exception as e: print(f"Error during data migration: {e}") raise def migrate_work_config_data(): """Migrate existing WorkConfig data to new architecture (stub - handled by migrate_db)""" try: from migrate_db import migrate_work_config_data, get_db_path db_path = get_db_path() migrate_work_config_data(db_path) except ImportError: print("migrate_db module not available - skipping work config data migration") except Exception as e: print(f"Error during work config migration: {e}") raise def migrate_task_system(): """Create tables for the task management system (stub - handled by migrate_db)""" try: from migrate_db import migrate_task_system, get_db_path db_path = get_db_path() migrate_task_system(db_path) except ImportError: 
print("migrate_db module not available - skipping task system migration") except Exception as e: print(f"Error during task system migration: {e}") raise # Call this function during app initialization @app.before_first_request def initialize_app(): run_migrations() # This handles all migrations including work config data # Add this after initializing the app but before defining routes @app.context_processor def inject_globals(): """Make certain variables available to all templates.""" # Get active announcements for current user active_announcements = [] if g.user: active_announcements = Announcement.get_active_announcements_for_user(g.user) # Get tracking script settings tracking_script_enabled = False tracking_script_code = '' try: tracking_enabled_setting = SystemSettings.query.filter_by(key='tracking_script_enabled').first() if tracking_enabled_setting: tracking_script_enabled = tracking_enabled_setting.value == 'true' tracking_code_setting = SystemSettings.query.filter_by(key='tracking_script_code').first() if tracking_code_setting: tracking_script_code = tracking_code_setting.value except Exception: pass # In case database isn't available yet return { 'Role': Role, 'AccountType': AccountType, 'current_year': datetime.now().year, 'active_announcements': active_announcements, 'tracking_script_enabled': tracking_script_enabled, 'tracking_script_code': tracking_script_code } # Template filters for date/time formatting @app.template_filter('from_json') def from_json_filter(json_str): """Parse JSON string to Python object.""" if not json_str: return [] try: import json return json.loads(json_str) except (json.JSONDecodeError, TypeError): return [] @app.template_filter('format_date') def format_date_filter(dt): """Format date according to user preferences.""" if not dt or not g.user: return dt.strftime('%Y-%m-%d') if dt else '' from time_utils import format_date_by_preference, get_user_format_settings date_format, _ = get_user_format_settings(g.user) return 
format_date_by_preference(dt, date_format) @app.template_filter('format_time') def format_time_filter(dt): """Format time according to user preferences.""" if not dt or not g.user: return dt.strftime('%H:%M:%S') if dt else '' from time_utils import format_time_by_preference, get_user_format_settings _, time_format_24h = get_user_format_settings(g.user) return format_time_by_preference(dt, time_format_24h) @app.template_filter('format_time_short') def format_time_short_filter(dt): """Format time without seconds according to user preferences.""" if not dt or not g.user: return dt.strftime('%H:%M') if dt else '' from time_utils import format_time_short_by_preference, get_user_format_settings _, time_format_24h = get_user_format_settings(g.user) return format_time_short_by_preference(dt, time_format_24h) @app.template_filter('format_datetime') def format_datetime_filter(dt): """Format datetime according to user preferences.""" if not dt or not g.user: return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else '' from time_utils import format_datetime_by_preference, get_user_format_settings date_format, time_format_24h = get_user_format_settings(g.user) return format_datetime_by_preference(dt, date_format, time_format_24h) @app.template_filter('format_duration') def format_duration_filter(duration_seconds): """Format duration in readable format.""" if duration_seconds is None: return '00:00:00' from time_utils import format_duration_readable return format_duration_readable(duration_seconds) # Authentication decorator def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if g.user is None: return redirect(url_for('login', next=request.url)) return f(*args, **kwargs) return decorated_function # Admin-only decorator def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if g.user is None or (g.user.role != Role.ADMIN and g.user.role != Role.SYSTEM_ADMIN): flash('You need administrator privileges to access this page.', 'error') return 
redirect(url_for('home')) return f(*args, **kwargs) return decorated_function # System Admin-only decorator def system_admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if g.user is None or g.user.role != Role.SYSTEM_ADMIN: flash('You need system administrator privileges to access this page.', 'error') return redirect(url_for('home')) return f(*args, **kwargs) return decorated_function def get_system_setting(key, default='false'): """Helper function to get system setting value""" setting = SystemSettings.query.filter_by(key=key).first() return setting.value if setting else default def is_system_admin(user=None): """Helper function to check if user is system admin""" if user is None: user = g.user return user and user.role == Role.SYSTEM_ADMIN def can_access_system_settings(user=None): """Helper function to check if user can access system-wide settings""" return is_system_admin(user) def get_available_roles(): """Get roles available for assignment, excluding SYSTEM_ADMIN unless one already exists""" roles = list(Role) # Only show SYSTEM_ADMIN role if at least one system admin already exists # This prevents accidental creation of system admins system_admin_exists = User.query.filter_by(role=Role.SYSTEM_ADMIN).count() > 0 if not system_admin_exists: roles = [role for role in roles if role != Role.SYSTEM_ADMIN] return roles # Add this decorator function after your existing decorators def role_required(min_role): """ Decorator to restrict access based on user role. 
min_role should be a Role enum value (e.g., Role.TEAM_LEADER) """ def role_decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if g.user is None: return redirect(url_for('login', next=request.url)) # Admin and System Admin always have access if g.user.role == Role.ADMIN or g.user.role == Role.SYSTEM_ADMIN: return f(*args, **kwargs) # Check role hierarchy role_hierarchy = { Role.TEAM_MEMBER: 1, Role.TEAM_LEADER: 2, Role.SUPERVISOR: 3, Role.ADMIN: 4, Role.SYSTEM_ADMIN: 5 } if role_hierarchy.get(g.user.role, 0) < role_hierarchy.get(min_role, 0): flash('You do not have sufficient permissions to access this page.', 'error') return redirect(url_for('home')) return f(*args, **kwargs) return decorated_function return role_decorator def company_required(f): """ Decorator to ensure user has a valid company association and set company context. """ @wraps(f) def decorated_function(*args, **kwargs): if g.user is None: return redirect(url_for('login', next=request.url)) # System admins can access without company association if g.user.role == Role.SYSTEM_ADMIN: return f(*args, **kwargs) if g.user.company_id is None: flash('You must be associated with a company to access this page.', 'error') return redirect(url_for('setup_company')) # Set company context g.company = Company.query.get(g.user.company_id) if not g.company or not g.company.is_active: flash('Your company is not active. 
Please contact support.', 'error') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function @app.before_request def load_logged_in_user(): user_id = session.get('user_id') if user_id is None: g.user = None g.company = None else: g.user = User.query.get(user_id) if g.user: # Set company context if g.user.company_id: g.company = Company.query.get(g.user.company_id) else: g.company = None # Check if user is verified if not g.user.is_verified and request.endpoint not in ['verify_email', 'static', 'logout', 'setup_company']: # Allow unverified users to access only verification and static resources if request.endpoint not in ['login', 'register']: flash('Please verify your email address before accessing this page.', 'warning') session.clear() return redirect(url_for('login')) else: g.company = None @app.route('/') def home(): if g.user: # Get active entry (no departure time) active_entry = TimeEntry.query.filter_by( user_id=g.user.id, departure_time=None ).first() # Get today's completed entries for history today = datetime.now().date() history = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.departure_time.isnot(None), TimeEntry.arrival_time >= datetime.combine(today, time.min), TimeEntry.arrival_time <= datetime.combine(today, time.max) ).order_by(TimeEntry.arrival_time.desc()).all() # Get available projects for this user (company-scoped) available_projects = [] if g.user.company_id: all_projects = Project.query.filter_by( company_id=g.user.company_id, is_active=True ).all() for project in all_projects: if project.is_user_allowed(g.user): available_projects.append(project) return render_template('index.html', title='Home', active_entry=active_entry, history=history, available_projects=available_projects) else: return render_template('about.html', title='Home') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form.get('username') password = 
request.form.get('password') user = User.query.filter_by(username=username).first() if user: # Fix role if it's a string or None if isinstance(user.role, str) or user.role is None: # Map string role values to enum values role_mapping = { 'Team Member': Role.TEAM_MEMBER, 'TEAM_MEMBER': Role.TEAM_MEMBER, 'Team Leader': Role.TEAM_LEADER, 'TEAM_LEADER': Role.TEAM_LEADER, 'Supervisor': Role.SUPERVISOR, 'SUPERVISOR': Role.SUPERVISOR, 'Administrator': Role.ADMIN, 'ADMIN': Role.ADMIN } if isinstance(user.role, str): user.role = role_mapping.get(user.role, Role.TEAM_MEMBER) else: user.role = Role.ADMIN if user.role == Role.ADMIN else Role.TEAM_MEMBER db.session.commit() # Now proceed with password check if user.check_password(password): # Check if user is blocked if user.is_blocked: flash('Your account has been disabled. Please contact an administrator.', 'error') return render_template('login.html') # Check if 2FA is enabled if user.two_factor_enabled: # Store user ID for 2FA verification session['2fa_user_id'] = user.id return redirect(url_for('verify_2fa')) else: # Continue with normal login process session['user_id'] = user.id session['username'] = user.username session['role'] = user.role.value # Log successful login SystemEvent.log_event( 'user_login', f'User {user.username} logged in successfully', 'auth', 'info', user_id=user.id, company_id=user.company_id, ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent') ) flash('Login successful!', 'success') return redirect(url_for('home')) # Log failed login attempt SystemEvent.log_event( 'login_failed', f'Failed login attempt for username: {username}', 'auth', 'warning', ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent') ) flash('Invalid username or password', 'error') return render_template('login.html', title='Login') @app.route('/logout') def logout(): # Log logout event before clearing session if 'user_id' in session: user = User.query.get(session['user_id']) if user: 
SystemEvent.log_event( 'user_logout', f'User {user.username} logged out', 'auth', 'info', user_id=user.id, company_id=user.company_id, ip_address=request.remote_addr, user_agent=request.headers.get('User-Agent') ) session.clear() flash('You have been logged out.', 'info') return redirect(url_for('login')) @app.route('/register', methods=['GET', 'POST']) def register(): # Check if registration is enabled registration_enabled = get_system_setting('registration_enabled', 'true') == 'true' if not registration_enabled: flash('Registration is currently disabled by the administrator.', 'error') return redirect(url_for('login')) # Check if companies exist, if not redirect to company setup if Company.query.count() == 0: flash('No companies exist yet. Please set up your company first.', 'info') return redirect(url_for('setup_company')) if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') confirm_password = request.form.get('confirm_password') company_code = request.form.get('company_code', '').strip() # Validate input error = None if not username: error = 'Username is required' elif not email: error = 'Email is required' elif not password: error = 'Password is required' elif password != confirm_password: error = 'Passwords do not match' elif not company_code: error = 'Company code is required' # Find company by code company = None if company_code: company = Company.query.filter_by(slug=company_code.lower()).first() if not company: error = 'Invalid company code' # Check for existing users within the company if company and not error: if User.query.filter_by(username=username, company_id=company.id).first(): error = 'Username already exists in this company' elif User.query.filter_by(email=email, company_id=company.id).first(): error = 'Email already registered in this company' if error is None and company: try: # Check if this is the first user account in this company 
is_first_user_in_company = User.query.filter_by(company_id=company.id).count() == 0 # Check if email verification is required email_verification_required = get_system_setting('email_verification_required', 'true') == 'true' new_user = User( username=username, email=email, company_id=company.id, is_verified=False ) new_user.set_password(password) # Make first user in company an admin with full privileges if is_first_user_in_company: new_user.role = Role.ADMIN new_user.is_verified = True # Auto-verify first user in company elif not email_verification_required: # If email verification is disabled, auto-verify new users new_user.is_verified = True # Generate verification token (even if not needed, for consistency) token = new_user.generate_verification_token() db.session.add(new_user) db.session.commit() if is_first_user_in_company: # First user in company gets admin privileges and is auto-verified logger.info(f"First user account created in company {company.name}: {username} with admin privileges") flash(f'Welcome! You are the first user in {company.name} and have been granted administrator privileges. You can now log in.', 'success') elif not email_verification_required: # Email verification is disabled, user can log in immediately logger.info(f"User account created with auto-verification in company {company.name}: {username}") flash('Registration successful! You can now log in.', 'success') else: # Send verification email for regular users when verification is required verification_url = url_for('verify_email', token=token, _external=True) msg = Message('Verify your TimeTrack account', recipients=[email]) msg.body = f'''Hello {username}, Thank you for registering with TimeTrack. To complete your registration, please click on the link below: {verification_url} This link will expire in 24 hours. If you did not register for TimeTrack, please ignore this email. 
Best regards, The TimeTrack Team ''' mail.send(msg) logger.info(f"Verification email sent to {email}") flash('Registration initiated! Please check your email to verify your account.', 'success') return redirect(url_for('login')) except Exception as e: db.session.rollback() logger.error(f"Error during registration: {str(e)}") error = f"An error occurred during registration: {str(e)}" flash(error, 'error') return render_template('register.html', title='Register') @app.route('/register/freelancer', methods=['GET', 'POST']) def register_freelancer(): """Freelancer registration route - creates user without company token""" # Check if registration is enabled registration_enabled = get_system_setting('registration_enabled', 'true') == 'true' if not registration_enabled: flash('Registration is currently disabled by the administrator.', 'error') return redirect(url_for('login')) if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') confirm_password = request.form.get('confirm_password') business_name = request.form.get('business_name', '').strip() # Validate input error = None if not username: error = 'Username is required' elif not email: error = 'Email is required' elif not password: error = 'Password is required' elif password != confirm_password: error = 'Passwords do not match' # Check for existing users globally (freelancers get unique usernames/emails) if not error: if User.query.filter_by(username=username).first(): error = 'Username already exists' elif User.query.filter_by(email=email).first(): error = 'Email already registered' if error is None: try: # Create personal company for freelancer company_name = business_name if business_name else f"{username}'s Workspace" # Generate unique company slug import re slug = re.sub(r'[^\w\s-]', '', company_name.lower()) slug = re.sub(r'[-\s]+', '-', slug).strip('-') # Ensure slug uniqueness base_slug = slug counter = 1 while 
Company.query.filter_by(slug=slug).first(): slug = f"{base_slug}-{counter}" counter += 1 # Create personal company personal_company = Company( name=company_name, slug=slug, description=f"Personal workspace for {username}", is_personal=True, max_users=1 # Limit to single user ) db.session.add(personal_company) db.session.flush() # Get company ID # Create freelancer user new_user = User( username=username, email=email, company_id=personal_company.id, account_type=AccountType.FREELANCER, business_name=business_name if business_name else None, role=Role.ADMIN, # Freelancers are admins of their personal company is_verified=True # Auto-verify freelancers ) new_user.set_password(password) db.session.add(new_user) db.session.commit() logger.info(f"Freelancer account created: {username} with personal company: {company_name}") flash(f'Welcome {username}! Your freelancer account has been created successfully. You can now log in.', 'success') return redirect(url_for('login')) except Exception as e: db.session.rollback() logger.error(f"Error during freelancer registration: {str(e)}") error = f"An error occurred during registration: {str(e)}" if error: flash(error, 'error') return render_template('register_freelancer.html', title='Register as Freelancer') @app.route('/setup_company', methods=['GET', 'POST']) def setup_company(): """Company setup route for creating new companies with admin users""" existing_companies = Company.query.count() # Determine access level is_initial_setup = existing_companies == 0 is_super_admin = g.user and g.user.role == Role.ADMIN and existing_companies > 0 is_authorized = is_initial_setup or is_super_admin # Check authorization for non-initial setups if not is_initial_setup and not is_super_admin: flash('You do not have permission to create new companies.', 'error') return redirect(url_for('home') if g.user else url_for('login')) if request.method == 'POST': company_name = request.form.get('company_name') company_description = 
request.form.get('company_description', '') admin_username = request.form.get('admin_username') admin_email = request.form.get('admin_email') admin_password = request.form.get('admin_password') confirm_password = request.form.get('confirm_password') # Validate input error = None if not company_name: error = 'Company name is required' elif not admin_username: error = 'Admin username is required' elif not admin_email: error = 'Admin email is required' elif not admin_password: error = 'Admin password is required' elif admin_password != confirm_password: error = 'Passwords do not match' elif len(admin_password) < 6: error = 'Password must be at least 6 characters long' if error is None: try: # Generate company slug import re slug = re.sub(r'[^\w\s-]', '', company_name.lower()) slug = re.sub(r'[-\s]+', '-', slug).strip('-') # Ensure slug uniqueness base_slug = slug counter = 1 while Company.query.filter_by(slug=slug).first(): slug = f"{base_slug}-{counter}" counter += 1 # Create company company = Company( name=company_name, slug=slug, description=company_description, is_active=True ) db.session.add(company) db.session.flush() # Get company.id without committing # Check if username/email already exists in this company context existing_user_by_username = User.query.filter_by( username=admin_username, company_id=company.id ).first() existing_user_by_email = User.query.filter_by( email=admin_email, company_id=company.id ).first() if existing_user_by_username: error = 'Username already exists in this company' elif existing_user_by_email: error = 'Email already registered in this company' if error is None: # Create admin user admin_user = User( username=admin_username, email=admin_email, company_id=company.id, role=Role.ADMIN, is_verified=True # Auto-verify company admin ) admin_user.set_password(admin_password) db.session.add(admin_user) db.session.commit() if is_initial_setup: # Auto-login the admin user for initial setup session['user_id'] = admin_user.id 
session['username'] = admin_user.username session['role'] = admin_user.role.value flash(f'Company "{company_name}" created successfully! You are now logged in as the administrator.', 'success') return redirect(url_for('home')) else: # For super admin creating additional companies, don't auto-login flash(f'Company "{company_name}" created successfully! Admin user "{admin_username}" has been created with the company code "{slug}".', 'success') return redirect(url_for('admin_company') if g.user else url_for('login')) else: db.session.rollback() except Exception as e: db.session.rollback() logger.error(f"Error during company setup: {str(e)}") error = f"An error occurred during setup: {str(e)}" if error: flash(error, 'error') return render_template('setup_company.html', title='Company Setup', existing_companies=existing_companies, is_initial_setup=is_initial_setup, is_super_admin=is_super_admin) @app.route('/verify_email/') def verify_email(token): user = User.query.filter_by(verification_token=token).first() if not user: flash('Invalid or expired verification link.', 'error') return redirect(url_for('login')) if user.verify_token(token): db.session.commit() flash('Email verified successfully! 
You can now log in.', 'success') else: flash('Invalid or expired verification link.', 'error') return redirect(url_for('login')) @app.route('/dashboard') @role_required(Role.TEAM_MEMBER) @company_required def dashboard(): """User dashboard with configurable widgets.""" return render_template('dashboard.html', title='Dashboard') # Redirect old admin dashboard URL to new dashboard @app.route('/admin/users') @admin_required @company_required def admin_users(): users = User.query.filter_by(company_id=g.user.company_id).all() return render_template('admin_users.html', title='User Management', users=users) @app.route('/admin/users/create', methods=['GET', 'POST']) @admin_required @company_required def create_user(): if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') auto_verify = 'auto_verify' in request.form # Get role and team role_name = request.form.get('role') team_id = request.form.get('team_id') # Validate input error = None if not username: error = 'Username is required' elif not email: error = 'Email is required' elif not password: error = 'Password is required' elif User.query.filter_by(username=username, company_id=g.user.company_id).first(): error = 'Username already exists in your company' elif User.query.filter_by(email=email, company_id=g.user.company_id).first(): error = 'Email already registered in your company' if error is None: # Convert role string to enum try: role = Role[role_name] if role_name else Role.TEAM_MEMBER except KeyError: role = Role.TEAM_MEMBER # Create new user with role and team new_user = User( username=username, email=email, company_id=g.user.company_id, is_verified=auto_verify, role=role, team_id=team_id if team_id else None ) new_user.set_password(password) if not auto_verify: # Generate verification token and send email token = new_user.generate_verification_token() verification_url = url_for('verify_email', token=token, _external=True) msg 
= Message('Verify your TimeTrack account', recipients=[email]) msg.body = f'''Hello {username}, An administrator has created an account for you on TimeTrack. To activate your account, please click on the link below: {verification_url} This link will expire in 24 hours. Best regards, The TimeTrack Team ''' mail.send(msg) db.session.add(new_user) db.session.commit() if auto_verify: flash(f'User {username} created and automatically verified!', 'success') else: flash(f'User {username} created! Verification email sent.', 'success') return redirect(url_for('admin_users')) flash(error, 'error') # Get all teams for the form (company-scoped) teams = Team.query.filter_by(company_id=g.user.company_id).all() roles = get_available_roles() return render_template('create_user.html', title='Create User', teams=teams, roles=roles) @app.route('/admin/users/edit/', methods=['GET', 'POST']) @admin_required @company_required def edit_user(user_id): user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404() if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') # Get role and team role_name = request.form.get('role') team_id = request.form.get('team_id') # Validate input error = None if not username: error = 'Username is required' elif not email: error = 'Email is required' elif username != user.username and User.query.filter_by(username=username, company_id=g.user.company_id).first(): error = 'Username already exists in your company' elif email != user.email and User.query.filter_by(email=email, company_id=g.user.company_id).first(): error = 'Email already registered in your company' if error is None: user.username = username user.email = email # Convert role string to enum try: user.role = Role[role_name] if role_name else Role.TEAM_MEMBER except KeyError: user.role = Role.TEAM_MEMBER user.team_id = team_id if team_id else None if password: user.set_password(password) 
db.session.commit() flash(f'User {username} updated successfully!', 'success') return redirect(url_for('admin_users')) flash(error, 'error') # Get all teams for the form (company-scoped) teams = Team.query.filter_by(company_id=g.user.company_id).all() roles = get_available_roles() return render_template('edit_user.html', title='Edit User', user=user, teams=teams, roles=roles) @app.route('/admin/users/delete/', methods=['POST']) @admin_required @company_required def delete_user(user_id): user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404() # Prevent deleting yourself if user.id == session.get('user_id'): flash('You cannot delete your own account', 'error') return redirect(url_for('admin_users')) username = user.username try: # Handle dependent records before deleting user # Find an alternative admin/supervisor to transfer ownership to alternative_admin = User.query.filter( User.company_id == g.user.company_id, User.role.in_([Role.ADMIN, Role.SUPERVISOR]), User.id != user_id ).first() if alternative_admin: # Transfer ownership of projects to alternative admin Project.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) # Transfer ownership of tasks to alternative admin Task.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) # Transfer ownership of subtasks to alternative admin SubTask.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) # Transfer ownership of project categories to alternative admin ProjectCategory.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) else: # No alternative admin found - redirect to company deletion confirmation flash('No other administrator or supervisor found. 
def _delete_company_records(company):
    """Queue deletion of every row owned by ``company`` plus the company itself.

    Nothing is committed here; the caller owns the transaction. Rows are
    removed child-first so that foreign keys never point at a row that has
    already been deleted.
    """
    company_id = company.id
    user_ids = db.session.query(User.id).filter(User.company_id == company_id)
    project_ids = db.session.query(Project.id).filter(Project.company_id == company_id)
    task_ids = db.session.query(Task.id).filter(Task.project_id.in_(project_ids))

    # 1-3. Per-user records: time entries, preferences, dashboards, work configs
    for model in (TimeEntry, UserPreferences, UserDashboard, WorkConfig):
        model.query.filter(model.user_id.in_(user_ids)).delete(synchronize_session=False)

    # 4. Subtasks (they depend on tasks)
    SubTask.query.filter(SubTask.task_id.in_(task_ids)).delete(synchronize_session=False)

    # 5. Tasks (now safe since subtasks are deleted)
    Task.query.filter(Task.project_id.in_(project_ids)).delete(synchronize_session=False)

    # 6-8. Projects, project categories, company work config
    Project.query.filter_by(company_id=company_id).delete()
    ProjectCategory.query.filter_by(company_id=company_id).delete()
    CompanyWorkConfig.query.filter_by(company_id=company_id).delete()

    # 9. Teams: users carry a team_id, so detach them before the teams go away
    User.query.filter_by(company_id=company_id).update({'team_id': None})
    Team.query.filter_by(company_id=company_id).delete()

    # 10. System events are removed before the users they may reference
    # NOTE(review): events logged without company_id but pointing at one of
    # these users are not covered here - confirm against the SystemEvent model.
    SystemEvent.query.filter_by(company_id=company_id).delete()

    # 11. Users
    User.query.filter_by(company_id=company_id).delete()

    # 12. Finally, the company itself
    db.session.delete(company)


@app.route('/confirm-company-deletion/<int:user_id>', methods=['GET', 'POST'])
@login_required
def confirm_company_deletion(user_id):
    """Confirm and perform company deletion when no alternative admin exists.

    GET shows a summary of everything that would be removed. POST requires
    the company name typed back plus the "I understand" checkbox, then
    deletes the company and all of its data in one transaction.

    Args:
        user_id: Primary key of the user whose deletion triggered this flow.

    Returns:
        A redirect (access denied, validation failure, success, error) or
        the rendered ``confirm_company_deletion.html`` page.
    """
    # Only allow admin or system admin access
    if g.user.role not in [Role.ADMIN, Role.SYSTEM_ADMIN]:
        flash('Access denied: Admin privileges required', 'error')
        return redirect(url_for('index'))

    # Captured up front: g.user may itself be deleted further down.
    is_system_admin = g.user.role == Role.SYSTEM_ADMIN
    users_endpoint = 'system_admin_users' if is_system_admin else 'admin_users'

    user = User.query.get_or_404(user_id)

    # For admin users, ensure they're in the same company
    if g.user.role == Role.ADMIN and user.company_id != g.user.company_id:
        flash('Access denied: You can only delete users in your company', 'error')
        return redirect(url_for('admin_users'))

    # Prevent deleting yourself
    if user.id == g.user.id:
        flash('You cannot delete your own account', 'error')
        return redirect(url_for(users_endpoint))

    company = user.company
    if company is None:
        # Nothing to cascade-delete for a user without a company.
        flash('This user does not belong to a company.', 'error')
        return redirect(url_for(users_endpoint))

    # Verify no alternative admin exists
    alternative_admin = User.query.filter(
        User.company_id == company.id,
        User.role.in_([Role.ADMIN, Role.SUPERVISOR]),
        User.id != user_id
    ).first()

    if alternative_admin:
        flash('Alternative admin found. Regular user deletion should be used instead.', 'error')
        return redirect(url_for(users_endpoint))

    if request.method == 'POST':
        # Verify company name confirmation
        company_name_confirm = request.form.get('company_name_confirm', '').strip()
        understand_deletion = request.form.get('understand_deletion')

        if company_name_confirm != company.name:
            flash('Company name confirmation does not match', 'error')
            return redirect(url_for('confirm_company_deletion', user_id=user_id))

        if not understand_deletion:
            flash('You must confirm that you understand the consequences', 'error')
            return redirect(url_for('confirm_company_deletion', user_id=user_id))

        # Read everything needed after the commit now: the ORM objects are
        # expired (and possibly deleted) once the transaction has finished.
        company_id = company.id
        company_name = company.name
        actor_username = g.user.username
        # If the acting admin belongs to this company their own row is
        # deleted too, so the audit event must not reference it.
        # NOTE(review): assumes SystemEvent.log_event accepts user_id=None - confirm.
        actor_id = None if g.user.company_id == company_id else g.user.id

        try:
            _delete_company_records(company)
            db.session.commit()

            flash(f'Company "{company_name}" and all associated data has been permanently deleted', 'success')

            # Log the deletion
            SystemEvent.log_event(
                event_type='company_deleted',
                description=f'Company "{company_name}" was deleted by {actor_username} due to no alternative admin for user deletion',
                event_category='admin_action',
                severity='warning',
                user_id=actor_id
            )

            return redirect(url_for('system_admin_companies') if is_system_admin else url_for('index'))

        except Exception as e:
            db.session.rollback()
            logger.error(f"Error deleting company {company_id}: {str(e)}")
            flash(f'Error deleting company: {str(e)}', 'error')
            return redirect(url_for('confirm_company_deletion', user_id=user_id))

    # GET request - show confirmation page
    # Gather all data that will be deleted
    users = User.query.filter_by(company_id=company.id).all()
    teams = Team.query.filter_by(company_id=company.id).all()
    projects = Project.query.filter_by(company_id=company.id).all()
    categories = ProjectCategory.query.filter_by(company_id=company.id).all()

    # Get tasks for all projects in the company
    project_ids = [p.id for p in projects]
    tasks = Task.query.filter(Task.project_id.in_(project_ids)).all() if project_ids else []

    # Count time entries and sum the tracked time (stored in seconds)
    user_ids = [u.id for u in users]
    if user_ids:
        time_entries_count = TimeEntry.query.filter(TimeEntry.user_id.in_(user_ids)).count()
        total_duration = db.session.query(func.sum(TimeEntry.duration)).filter(
            TimeEntry.user_id.in_(user_ids)
        ).scalar() or 0
    else:
        time_entries_count = 0
        total_duration = 0
    total_hours_tracked = round(total_duration / 3600, 2) if total_duration else 0

    return render_template('confirm_company_deletion.html',
                           user=user,
                           company=company,
                           users=users,
                           teams=teams,
                           projects=projects,
                           categories=categories,
                           tasks=tasks,
                           time_entries_count=time_entries_count,
                           total_hours_tracked=total_hours_tracked)
@app.route('/2fa/setup', methods=['GET', 'POST'])
@login_required
def setup_2fa():
    """Enable two-factor authentication for the logged-in user.

    POST checks the submitted TOTP code and switches 2FA on when it is
    accepted. GET renders the setup page with the shared secret and a QR
    code (base64-encoded PNG), creating the secret first if there is none.
    """
    setup_url = 'setup_2fa'
    profile_url = 'profile'

    if request.method == 'POST':
        # Verify the TOTP code before enabling 2FA
        submitted_code = request.form.get('totp_code')
        if not submitted_code:
            flash('Please enter the verification code from your authenticator app.', 'error')
            return redirect(url_for(setup_url))

        try:
            code_accepted = g.user.verify_2fa_token(submitted_code, allow_setup=True)
            if not code_accepted:
                flash('Invalid verification code. Please make sure your device time is synchronized and try again.', 'error')
                return redirect(url_for(setup_url))

            g.user.two_factor_enabled = True
            db.session.commit()
            flash('Two-factor authentication has been successfully enabled!', 'success')
            return redirect(url_for(profile_url))
        except Exception as e:
            logger.error(f"2FA setup error: {str(e)}")
            flash('An error occurred during 2FA setup. Please try again.', 'error')
            return redirect(url_for(setup_url))

    # GET request - show setup page
    if g.user.two_factor_enabled:
        flash('Two-factor authentication is already enabled.', 'info')
        return redirect(url_for(profile_url))

    # Generate secret if not exists
    if not g.user.two_factor_secret:
        g.user.generate_2fa_secret()
        db.session.commit()

    # Generate QR code
    import qrcode
    import io
    import base64

    qr_builder = qrcode.QRCode(version=1, box_size=10, border=5)
    qr_builder.add_data(g.user.get_2fa_uri())
    qr_builder.make(fit=True)

    # Render the QR code to an in-memory PNG and base64-encode it for the template
    png_buffer = io.BytesIO()
    qr_builder.make_image(fill_color="black", back_color="white").save(png_buffer, format='PNG')
    png_buffer.seek(0)
    encoded_png = base64.b64encode(png_buffer.getvalue()).decode()

    return render_template(
        'setup_2fa.html',
        title='Setup Two-Factor Authentication',
        secret=g.user.two_factor_secret,
        qr_code=encoded_png,
    )
@app.route('/2fa/verify', methods=['GET', 'POST'])
def verify_2fa():
    """Second login step: check the TOTP code of a user awaiting 2FA.

    The pending user id is read from ``session['2fa_user_id']``. A valid
    code completes the login and redirects to ``home``; an invalid one is
    logged and the form is shown again.
    """
    # Check if user is in 2FA verification state
    pending_id = session.get('2fa_user_id')
    if not pending_id:
        return redirect(url_for('login'))

    user = User.query.get(pending_id)
    if not (user and user.two_factor_enabled):
        session.pop('2fa_user_id', None)
        return redirect(url_for('login'))

    if request.method == 'POST':
        code_is_valid = user.verify_2fa_token(request.form.get('totp_code'))

        # Request/user details recorded with either audit event
        audit_details = {
            'user_id': user.id,
            'company_id': user.company_id,
            'ip_address': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
        }

        if not code_is_valid:
            # Log failed 2FA attempt
            SystemEvent.log_event(
                '2fa_failed',
                f'Failed 2FA verification for user {user.username}',
                'auth',
                'warning',
                **audit_details
            )
            flash('Invalid verification code. Please try again.', 'error')
        else:
            # Complete login process
            session.pop('2fa_user_id', None)
            session['user_id'] = user.id
            session['username'] = user.username
            session['role'] = user.role.value

            # Log successful 2FA login
            SystemEvent.log_event(
                'user_login_2fa',
                f'User {user.username} logged in successfully with 2FA',
                'auth',
                'info',
                **audit_details
            )

            flash('Login successful!', 'success')
            return redirect(url_for('home'))

    return render_template('verify_2fa.html', title='Two-Factor Authentication')
# Add this new route to handle pausing/resuming
@app.route('/api/toggle-pause/<int:entry_id>', methods=['POST'])
@login_required
def toggle_pause(entry_id):
    """Pause or resume one of the current user's running time entries.

    Pausing records the pause start time. Resuming adds the elapsed pause
    to ``total_break_duration`` (seconds) and clears the pause state.

    Args:
        entry_id: Primary key of the time entry (URL variable). Entries of
            other users yield a 404.

    Returns:
        JSON with ``id``, ``is_paused``, ``total_break_duration`` and
        ``message``; a 400 JSON error if the entry is already finished.
    """
    # Find the time entry for the current user
    entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()

    # A finished entry has nothing to pause; toggling it would leave it
    # flagged as paused with a departure time set.
    if entry.departure_time is not None:
        return jsonify({
            'success': False,
            'message': 'Cannot pause or resume an entry that has already ended.'
        }), 400

    now = datetime.now()

    if entry.is_paused:
        # Resuming work - calculate break duration. Same guard as leave():
        # a paused flag without a start time contributes no break time
        # instead of raising a TypeError.
        if entry.pause_start_time:
            break_duration = int((now - entry.pause_start_time).total_seconds())
            entry.total_break_duration = (entry.total_break_duration or 0) + break_duration
        entry.is_paused = False
        entry.pause_start_time = None

        message = "Work resumed"
    else:
        # Pausing work
        entry.is_paused = True
        entry.pause_start_time = now

        message = "Work paused"

    db.session.commit()

    return jsonify({
        'id': entry.id,
        'is_paused': entry.is_paused,
        'total_break_duration': entry.total_break_duration,
        'message': message
    })
def calculate_work_duration(arrival_time, departure_time, total_break_duration, user):
    """
    Calculate work duration considering both configured and actual break times.

    The break that is deducted is the larger of (a) the mandatory breaks the
    company configuration prescribes for the elapsed time and (b) the break
    time that was actually logged.

    Args:
        arrival_time: Datetime of arrival
        departure_time: Datetime of departure
        total_break_duration: Actual logged break duration in seconds
            (``None`` is treated as 0)
        user: User object to get company configuration

    Returns:
        tuple: (work_duration_in_seconds, effective_break_duration_in_seconds).
        The work duration is never negative: if the deducted break exceeds
        the elapsed time, 0 is returned.
    """
    # Calculate raw duration
    raw_duration = (departure_time - arrival_time).total_seconds()

    # max() below cannot compare None with an int
    logged_break_seconds = total_break_duration or 0

    # Get company work configuration for break rules
    company_config = CompanyWorkConfig.query.filter_by(company_id=user.company_id).first()
    if company_config:
        break_threshold_hours = company_config.break_threshold_hours
        mandatory_break_minutes = company_config.mandatory_break_minutes
        additional_break_threshold_hours = company_config.additional_break_threshold_hours
        additional_break_minutes = company_config.additional_break_minutes
    else:
        # Use Germany defaults if no company config exists
        preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
        break_threshold_hours = preset['break_threshold_hours']
        mandatory_break_minutes = preset['mandatory_break_minutes']
        additional_break_threshold_hours = preset['additional_break_threshold_hours']
        additional_break_minutes = preset['additional_break_minutes']

    # Calculate mandatory breaks based on work duration
    work_hours = raw_duration / 3600  # Convert seconds to hours
    configured_break_seconds = 0

    # Apply primary break if work duration exceeds threshold
    if work_hours > break_threshold_hours:
        configured_break_seconds += mandatory_break_minutes * 60

    # Apply additional break if work duration exceeds additional threshold
    if work_hours > additional_break_threshold_hours:
        configured_break_seconds += additional_break_minutes * 60

    # Use the greater of configured breaks or actual logged breaks
    effective_break_duration = max(configured_break_seconds, logged_break_seconds)

    # Calculate final work duration; clamp so that a break longer than the
    # elapsed time cannot produce a negative amount of work.
    work_duration = max(0, int(raw_duration - effective_break_duration))

    return work_duration, effective_break_duration
def _parse_manual_datetime(date_str, time_str):
    """Parse ``YYYY-MM-DD`` plus ``HH:MM[:SS]`` into a datetime (ValueError if neither fits)."""
    combined = f"{date_str} {time_str}"
    for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M'):
        try:
            return datetime.strptime(combined, fmt)
        except ValueError:
            continue
    raise ValueError(f'Invalid date/time: {combined}')


@app.route('/api/manual-entry', methods=['POST'])
@login_required
def manual_entry():
    """Create a completed time entry from manually supplied start/end times.

    Expects a JSON object with ``start_date``, ``start_time``, ``end_date``,
    ``end_time`` and optionally ``project_id``, ``break_minutes`` and
    ``notes``. Times and break are rounded according to the user's rounding
    settings before being stored.

    Returns:
        JSON ``{'success': True, 'message': ..., 'entry_id': ...}`` on
        success; ``{'error': ...}`` with status 400 for invalid input, 403
        for an inaccessible project, 500 for unexpected failures.
    """
    try:
        # silent=True: a missing or malformed body becomes None instead of
        # an exception that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'A JSON object is required'}), 400

        # Extract data from request
        project_id = data.get('project_id')
        start_date = data.get('start_date')
        start_time = data.get('start_time')
        end_date = data.get('end_date')
        end_time = data.get('end_time')
        notes = data.get('notes', '')

        try:
            break_minutes = int(data.get('break_minutes') or 0)
        except (TypeError, ValueError):
            return jsonify({'error': 'Break minutes must be a whole number'}), 400
        if break_minutes < 0:
            return jsonify({'error': 'Break minutes cannot be negative'}), 400

        try:
            project_id = int(project_id) if project_id else None
        except (TypeError, ValueError):
            return jsonify({'error': 'Invalid or unauthorized project'}), 403

        # Validate required fields
        if not all([start_date, start_time, end_date, end_time]):
            return jsonify({'error': 'Start and end date/time are required'}), 400

        # Parse datetime strings; start and end are parsed independently so
        # one may carry seconds while the other does not.
        try:
            arrival_datetime = _parse_manual_datetime(start_date, start_time)
            departure_datetime = _parse_manual_datetime(end_date, end_time)
        except ValueError:
            return jsonify({'error': 'Invalid date/time format'}), 400

        # Validate that end time is after start time
        if departure_datetime <= arrival_datetime:
            return jsonify({'error': 'End time must be after start time'}), 400

        # Apply time rounding if enabled
        rounded_arrival, rounded_departure = apply_time_rounding(arrival_datetime, departure_datetime, g.user)

        # Validate project access if project is specified
        if project_id:
            project = Project.query.get(project_id)
            if not project or not project.is_user_allowed(g.user):
                return jsonify({'error': 'Invalid or unauthorized project'}), 403

        # Check for overlapping entries for this user (using rounded times)
        overlapping_entry = TimeEntry.query.filter(
            TimeEntry.user_id == g.user.id,
            TimeEntry.departure_time.isnot(None),
            TimeEntry.arrival_time < rounded_departure,
            TimeEntry.departure_time > rounded_arrival
        ).first()

        if overlapping_entry:
            return jsonify({
                'error': 'This time entry overlaps with an existing entry'
            }), 400

        # Calculate total duration in seconds (using rounded times)
        total_duration = int((rounded_departure - rounded_arrival).total_seconds())
        break_duration_seconds = break_minutes * 60

        # Apply rounding to break duration if enabled
        interval_minutes, round_to_nearest = get_user_rounding_settings(g.user)
        if interval_minutes > 0:
            break_duration_seconds = round_duration_to_interval(
                break_duration_seconds, interval_minutes, round_to_nearest
            )

        # Validate break duration doesn't exceed total duration
        if break_duration_seconds >= total_duration:
            return jsonify({'error': 'Break duration cannot exceed total work duration'}), 400

        # Calculate work duration (total duration minus breaks)
        work_duration = total_duration - break_duration_seconds

        # Create the manual time entry (using rounded times)
        new_entry = TimeEntry(
            user_id=g.user.id,
            arrival_time=rounded_arrival,
            departure_time=rounded_departure,
            duration=work_duration,
            total_break_duration=break_duration_seconds,
            project_id=project_id,
            notes=notes,
            is_paused=False,
            pause_start_time=None
        )

        db.session.add(new_entry)
        db.session.commit()

        return jsonify({
            'success': True,
            'message': 'Manual time entry added successfully',
            'entry_id': new_entry.id
        })

    except Exception as e:
        # Top-level boundary for this API: log with traceback, undo any
        # partial work and report a generic error to the client.
        logger.exception(f"Error creating manual time entry: {str(e)}")
        db.session.rollback()
        return jsonify({'error': 'An error occurred while creating the time entry'}), 500
in SystemSettings.query.all(): if setting.key == 'registration_enabled': settings['registration_enabled'] = setting.value == 'true' elif setting.key == 'email_verification_required': settings['email_verification_required'] = setting.value == 'true' return render_template('admin_settings.html', title='System Settings', settings=settings) @app.route('/system-admin/dashboard') @system_admin_required def system_admin_dashboard(): """System Administrator Dashboard - view all data across companies""" # Global statistics total_companies = Company.query.count() total_users = User.query.count() total_teams = Team.query.count() total_projects = Project.query.count() total_time_entries = TimeEntry.query.count() # System admin count system_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN).count() regular_admins = User.query.filter_by(role=Role.ADMIN).count() # Recent activity (last 7 days) from datetime import datetime, timedelta week_ago = datetime.now() - timedelta(days=7) recent_users = User.query.filter(User.created_at >= week_ago).count() recent_companies = Company.query.filter(Company.created_at >= week_ago).count() recent_time_entries = TimeEntry.query.filter(TimeEntry.arrival_time >= week_ago).count() # Top companies by user count top_companies = db.session.query( Company.name, Company.id, db.func.count(User.id).label('user_count') ).join(User).group_by(Company.id).order_by(db.func.count(User.id).desc()).limit(5).all() # Recent companies recent_companies_list = Company.query.order_by(Company.created_at.desc()).limit(5).all() # System health checks orphaned_users = User.query.filter_by(company_id=None).count() orphaned_time_entries = TimeEntry.query.filter_by(user_id=None).count() blocked_users = User.query.filter_by(is_blocked=True).count() return render_template('system_admin_dashboard.html', title='System Administrator Dashboard', total_companies=total_companies, total_users=total_users, total_teams=total_teams, total_projects=total_projects, 
@app.route('/system-admin/users/<int:user_id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_edit_user(user_id):
    """System Admin: Edit any user across companies.

    GET renders the edit form. POST validates the submitted values and, if
    they are valid, stores them and redirects to the global user list.

    Args:
        user_id: Primary key of the user to edit (URL variable).

    Returns:
        A redirect to ``system_admin_users`` on success, otherwise the
        rendered ``system_admin_edit_user.html`` form (error flashed first).
    """
    user = User.query.get_or_404(user_id)

    if request.method == 'POST':
        # Get form data
        username = request.form.get('username')
        email = request.form.get('email')
        role_value = request.form.get('role')
        is_blocked = request.form.get('is_blocked') == 'on'
        is_verified = request.form.get('is_verified') == 'on'
        # type=int yields None for empty or non-numeric values
        company_id = request.form.get('company_id', type=int)
        team_id = request.form.get('team_id', type=int)

        # An unknown role value must produce a form error, not a 500.
        try:
            new_role = Role(role_value)
        except ValueError:
            new_role = None

        # Validation: the first failing rule wins
        error = None
        if not username:
            error = 'Username is required.'
        elif not email:
            error = 'Email is required.'
        elif new_role is None:
            error = 'Invalid role selected.'
        elif company_id is None and user.company_id is not None:
            error = 'Please select a company.'
        elif company_id is not None and Company.query.get(company_id) is None:
            error = 'Selected company does not exist.'
        elif team_id is not None and not Team.query.filter_by(
                id=team_id, company_id=company_id).first():
            error = 'Selected team does not belong to the selected company.'
        elif User.query.filter(
                User.username == username,
                User.company_id == company_id,
                User.id != user_id).first():
            # Check if username is unique within the company
            error = f'Username "{username}" is already taken in this company.'
        elif User.query.filter(
                User.email == email,
                User.company_id == company_id,
                User.id != user_id).first():
            # Check if email is unique within the company
            error = f'Email "{email}" is already registered in this company.'
        elif (user.role == Role.SYSTEM_ADMIN
              and new_role != Role.SYSTEM_ADMIN
              and User.query.filter_by(role=Role.SYSTEM_ADMIN).count() <= 1):
            # Mirrors the safety check applied when deleting users.
            error = 'Cannot change the role of the last system administrator.'
        elif user.id == g.user.id and is_blocked:
            error = 'You cannot block your own account.'

        if not error:
            # Update user
            user.username = username
            user.email = email
            user.role = new_role
            user.is_blocked = is_blocked
            user.is_verified = is_verified
            user.company_id = company_id
            user.team_id = team_id

            db.session.commit()

            flash(f'User {username} updated successfully.', 'success')
            return redirect(url_for('system_admin_users'))

        flash(error, 'error')

    # Get all companies and teams for form dropdowns
    companies = Company.query.order_by(Company.name).all()
    teams = Team.query.filter_by(company_id=user.company_id).order_by(Team.name).all()
    roles = get_available_roles()

    return render_template('system_admin_edit_user.html',
                           title=f'Edit User: {user.username}',
                           user=user,
                           companies=companies,
                           teams=teams,
                           roles=roles)
user.username company_name = user.company.name if user.company else 'Unknown' try: # Handle dependent records before deleting user # Find an alternative admin/supervisor in the same company to transfer ownership to alternative_admin = User.query.filter( User.company_id == user.company_id, User.role.in_([Role.ADMIN, Role.SUPERVISOR]), User.id != user_id ).first() if alternative_admin: # Transfer ownership of projects to alternative admin Project.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) # Transfer ownership of tasks to alternative admin Task.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) # Transfer ownership of subtasks to alternative admin SubTask.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) # Transfer ownership of project categories to alternative admin ProjectCategory.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id}) else: # No alternative admin found - redirect to company deletion confirmation flash('No other administrator or supervisor found in the same company. Company deletion required.', 'warning') return redirect(url_for('confirm_company_deletion', user_id=user_id)) # Delete user-specific records that can be safely removed TimeEntry.query.filter_by(user_id=user_id).delete() WorkConfig.query.filter_by(user_id=user_id).delete() UserPreferences.query.filter_by(user_id=user_id).delete() # Delete user dashboards (cascades to widgets) UserDashboard.query.filter_by(user_id=user_id).delete() # Clear task and subtask assignments Task.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None}) SubTask.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None}) # Now safe to delete the user db.session.delete(user) db.session.commit() flash(f'User "{username}" from company "{company_name}" has been deleted. 
@app.route('/system-admin/companies')
@system_admin_required
def system_admin_companies():
    """System Admin: paginated overview of every company.

    Companies are listed newest first, 20 per page. For each company on the
    current page the template also receives its user count and the number of
    users holding the ADMIN or SYSTEM_ADMIN role.
    """
    current_page = request.args.get('page', 1, type=int)
    newest_first = Company.query.order_by(Company.created_at.desc())
    companies = newest_first.paginate(page=current_page, per_page=20, error_out=False)

    privileged_roles = [Role.ADMIN, Role.SYSTEM_ADMIN]
    # Keyed by company id so the template can look the numbers up per row.
    company_stats = {
        listed.id: {
            'user_count': User.query.filter_by(company_id=listed.id).count(),
            'admin_count': User.query.filter(
                User.company_id == listed.id,
                User.role.in_(privileged_roles)
            ).count(),
        }
        for listed in companies.items
    }

    return render_template('system_admin_companies.html',
                           title='System Admin - All Companies',
                           companies=companies,
                           company_stats=company_stats)
@app.route('/system-admin/time-entries')
@system_admin_required
def system_admin_time_entries():
    """System Admin: paginated list of time entries from every company.

    Entries are shown newest first, 50 per page. The optional ``company``
    query argument restricts the list to a single company id.
    """
    args = request.args
    current_page = args.get('page', 1, type=int)
    company_filter = args.get('company', '')

    # Entries are joined to their user and that user's company.
    listing = TimeEntry.query.join(User).join(Company)
    if company_filter:
        listing = listing.filter(Company.id == company_filter)

    # Extra display columns; the project is outer-joined so entries without a
    # project are still listed.
    display_columns = (
        User.username,
        Company.name.label('company_name'),
        Project.name.label('project_name'),
    )
    listing = listing.add_columns(*display_columns).outerjoin(Project)
    listing = listing.order_by(TimeEntry.arrival_time.desc())

    entries = listing.paginate(page=current_page, per_page=50, error_out=False)

    # Choices for the company filter dropdown.
    companies = Company.query.order_by(Company.name).all()

    return render_template('system_admin_time_entries.html',
                           title='System Admin - Time Entries',
                           entries=entries,
                           companies=companies,
                           current_company=company_filter)
db.session.add(reg_setting) email_setting = SystemSettings.query.filter_by(key='email_verification_required').first() if email_setting: email_setting.value = 'true' if email_verification else 'false' else: email_setting = SystemSettings( key='email_verification_required', value='true' if email_verification else 'false', description='Controls whether email verification is required for new accounts' ) db.session.add(email_setting) tracking_enabled_setting = SystemSettings.query.filter_by(key='tracking_script_enabled').first() if tracking_enabled_setting: tracking_enabled_setting.value = 'true' if tracking_script_enabled else 'false' else: tracking_enabled_setting = SystemSettings( key='tracking_script_enabled', value='true' if tracking_script_enabled else 'false', description='Controls whether custom tracking script is enabled' ) db.session.add(tracking_enabled_setting) tracking_code_setting = SystemSettings.query.filter_by(key='tracking_script_code').first() if tracking_code_setting: tracking_code_setting.value = tracking_script_code else: tracking_code_setting = SystemSettings( key='tracking_script_code', value=tracking_script_code, description='Custom tracking script code (HTML/JavaScript)' ) db.session.add(tracking_code_setting) db.session.commit() flash('System settings updated successfully.', 'success') return redirect(url_for('system_admin_settings')) # Get current settings settings = {} all_settings = SystemSettings.query.all() for setting in all_settings: if setting.key == 'registration_enabled': settings['registration_enabled'] = setting.value == 'true' elif setting.key == 'email_verification_required': settings['email_verification_required'] = setting.value == 'true' elif setting.key == 'tracking_script_enabled': settings['tracking_script_enabled'] = setting.value == 'true' elif setting.key == 'tracking_script_code': settings['tracking_script_code'] = setting.value # System statistics total_companies = Company.query.count() total_users = User.query.count() 
@app.route('/system-admin/health')
@system_admin_required
def system_admin_health():
    """System Admin: system health check and event log.

    Collects the health summary and recent events from ``SystemEvent``, probes
    the database with a trivial query, derives an approximate uptime from the
    oldest recorded event, records that the check was performed and renders
    the health page.
    """
    # Imported locally so this view does not rely on a new module-level import.
    from sqlalchemy import text

    # Get system health summary
    health_summary = SystemEvent.get_system_health_summary()

    # Get recent events (last 7 days)
    recent_events = SystemEvent.get_recent_events(days=7, limit=100)

    # Get events by severity for quick stats
    errors = SystemEvent.get_events_by_severity('error', days=7, limit=20)
    warnings = SystemEvent.get_events_by_severity('warning', days=7, limit=20)

    now = datetime.now()

    # Database connection test. The statement is wrapped in text(): newer
    # SQLAlchemy versions reject plain strings passed to execute(), which
    # would make this probe report a healthy database as broken.
    db_healthy = True
    db_error = None
    try:
        db.session.execute(text('SELECT 1'))
    except Exception as e:
        db_healthy = False
        db_error = str(e)
        logger.exception("Database health check failed")
        # Reset the failed transaction so the queries below can still run.
        db.session.rollback()
        SystemEvent.log_event(
            'database_check_failed',
            f'Database health check failed: {str(e)}',
            'system',
            'error'
        )

    # Application uptime (approximate: time since the first recorded event)
    first_event = SystemEvent.query.order_by(SystemEvent.timestamp.asc()).first()
    uptime_start = first_event.timestamp if first_event else now
    uptime_duration = now - uptime_start

    # Number of events recorded today
    today = now.date()
    today_events = SystemEvent.query.filter(
        func.date(SystemEvent.timestamp) == today
    ).count()

    # Log the health check itself
    SystemEvent.log_event(
        'system_health_check',
        f'System health check performed by {session.get("username", "unknown")}',
        'system',
        'info',
        user_id=session.get('user_id'),
        ip_address=request.remote_addr,
        user_agent=request.headers.get('User-Agent')
    )

    return render_template('system_admin_health.html',
                           title='System Health Check',
                           health_summary=health_summary,
                           recent_events=recent_events,
                           errors=errors,
                           warnings=warnings,
                           db_healthy=db_healthy,
                           db_error=db_error,
                           uptime_duration=uptime_duration,
                           today_events=today_events)
def _parse_announcement_datetime(raw):
    """Parse a ``YYYY-MM-DDTHH:MM`` form value; None when blank or malformed."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw, '%Y-%m-%dT%H:%M')
    except ValueError:
        return None


@app.route('/system-admin/announcements/<int:id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_announcement_edit(id):
    """System Admin: edit an existing announcement.

    GET renders the form for the announcement. POST validates the submission
    first and only then copies it onto the announcement, so a rejected request
    leaves the stored announcement untouched.

    Args:
        id: Primary key of the announcement (404 if it does not exist).
    """
    announcement = Announcement.query.get_or_404(id)

    if request.method == 'POST':
        import json

        form = request.form
        title = form.get('title')
        content = form.get('content')
        target_all_users = form.get('target_all_users') == 'on'

        selected_roles = []
        company_ids = []
        error = None

        if not title:
            error = 'Title is required.'
        elif not content:
            error = 'Content is required.'
        elif not target_all_users:
            selected_roles = form.getlist('target_roles')
            # Company ids arrive as strings; a non-numeric value is rejected
            # instead of raising out of the view.
            try:
                company_ids = [int(c) for c in form.getlist('target_companies')]
            except ValueError:
                error = 'Invalid company selection.'

        if error is None:
            announcement.title = title
            announcement.content = content
            announcement.announcement_type = form.get('announcement_type', 'info')
            announcement.is_urgent = form.get('is_urgent') == 'on'
            announcement.is_active = form.get('is_active') == 'on'

            # Blank or unparseable dates clear the corresponding bound.
            announcement.start_date = _parse_announcement_datetime(form.get('start_date'))
            announcement.end_date = _parse_announcement_datetime(form.get('end_date'))

            # Targeting: explicit role/company lists are stored as JSON and
            # only kept when the announcement is not aimed at everyone.
            announcement.target_all_users = target_all_users
            announcement.target_roles = json.dumps(selected_roles) if selected_roles else None
            announcement.target_companies = json.dumps(company_ids) if company_ids else None

            announcement.updated_at = datetime.now()

            db.session.commit()

            flash('Announcement updated successfully.', 'success')
            return redirect(url_for('system_admin_announcements'))

        flash(error, 'error')

    # Get roles and companies for targeting options
    roles = [role.value for role in Role]
    companies = Company.query.order_by(Company.name).all()

    return render_template('system_admin_announcement_form.html',
                           title='Edit Announcement',
                           announcement=announcement,
                           roles=roles,
                           companies=companies)
'apply_preset': region_code = request.form.get('region_preset') if region_code: region = WorkRegion(region_code) preset = CompanyWorkConfig.get_regional_preset(region) work_config.work_hours_per_day = preset['work_hours_per_day'] work_config.mandatory_break_minutes = preset['mandatory_break_minutes'] work_config.break_threshold_hours = preset['break_threshold_hours'] work_config.additional_break_minutes = preset['additional_break_minutes'] work_config.additional_break_threshold_hours = preset['additional_break_threshold_hours'] work_config.region = region work_config.region_name = preset['region_name'] db.session.commit() flash(f'Applied {preset["region_name"]} work policy preset', 'success') return redirect(url_for('admin_work_policies')) # Handle manual configuration update else: work_config.work_hours_per_day = float(request.form.get('work_hours_per_day', 8.0)) work_config.mandatory_break_minutes = int(request.form.get('mandatory_break_minutes', 30)) work_config.break_threshold_hours = float(request.form.get('break_threshold_hours', 6.0)) work_config.additional_break_minutes = int(request.form.get('additional_break_minutes', 15)) work_config.additional_break_threshold_hours = float(request.form.get('additional_break_threshold_hours', 9.0)) work_config.region = WorkRegion.CUSTOM work_config.region_name = 'Custom Configuration' db.session.commit() flash('Work policies updated successfully!', 'success') return redirect(url_for('admin_work_policies')) except ValueError: flash('Please enter valid numbers for all fields', 'error') # Get available regional presets regional_presets = [] for region in WorkRegion: preset = CompanyWorkConfig.get_regional_preset(region) regional_presets.append({ 'code': region.value, 'name': preset['region_name'], 'description': f"{preset['work_hours_per_day']}h/day, {preset['mandatory_break_minutes']}min break after {preset['break_threshold_hours']}h" }) return render_template('admin_work_policies.html', title='Work Policies', 
@app.route('/admin/company/edit', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_company():
    """Edit the current company's name, description, user limit and status.

    GET shows the form. POST validates the submission; on success the company
    is saved and the browser is sent back to the company page, otherwise the
    validation message is flashed and the form is shown again.
    """
    company = g.company

    if request.method == 'POST':
        form = request.form
        new_name = form.get('name')
        new_description = form.get('description', '')
        raw_limit = form.get('max_users')
        active_flag = 'is_active' in form

        problem = None

        # Name: mandatory, and unique unless it is unchanged.
        if not new_name:
            problem = 'Company name is required'
        elif new_name != company.name and Company.query.filter_by(name=new_name).first():
            problem = 'Company name already exists'

        # User limit: optional; when present it must be a positive integer.
        user_limit = None
        if raw_limit:
            try:
                user_limit = int(raw_limit)
            except ValueError:
                problem = 'Maximum users must be a valid number'
            else:
                if user_limit < 1:
                    problem = 'Maximum users must be at least 1'

        if problem is not None:
            flash(problem, 'error')
        else:
            company.name = new_name
            company.description = new_description
            company.max_users = user_limit
            company.is_active = active_flag
            db.session.commit()

            flash('Company details updated successfully!', 'success')
            return redirect(url_for('admin_company'))

    return render_template('edit_company.html', title='Edit Company', company=company)
@app.route('/admin/teams/create', methods=['GET', 'POST'])
@admin_required
@company_required
def create_team():
    """Create a team in the current user's company.

    GET shows the form. POST requires a team name that is not yet used within
    the company; on success the team is stored and the browser is redirected
    to the team list, otherwise the problem is flashed and the form re-shown.
    """
    form_page = lambda: render_template('create_team.html', title='Create Team')

    if request.method != 'POST':
        return form_page()

    team_name = request.form.get('name')
    team_description = request.form.get('description')
    owner_company_id = g.user.company_id

    if not team_name:
        problem = 'Team name is required'
    elif Team.query.filter_by(name=team_name, company_id=owner_company_id).first():
        problem = 'Team name already exists in your company'
    else:
        problem = None

    if problem is not None:
        flash(problem, 'error')
        return form_page()

    db.session.add(Team(name=team_name, description=team_description, company_id=g.user.company_id))
    db.session.commit()

    flash(f'Team "{team_name}" created successfully!', 'success')
    return redirect(url_for('admin_teams'))
@app.route('/admin/teams/<int:team_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def manage_team(team_id):
    """Manage one team of the current company: details and membership.

    POST requests carry an ``action`` form field:

    * ``update_team``   - change the team's name/description.
    * ``add_member``    - assign the user given by ``user_id`` to this team.
    * ``remove_member`` - clear the team of the user given by ``user_id``.

    After handling a POST (or directly on GET) the management page is rendered
    with the current members and the company users that can still be added.

    Args:
        team_id: Primary key of the team; 404 unless it belongs to the
            current user's company.
    """
    company_id = g.user.company_id
    team = Team.query.filter_by(id=team_id, company_id=company_id).first_or_404()

    if request.method == 'POST':
        action = request.form.get('action')

        if action == 'update_team':
            name = request.form.get('name')
            description = request.form.get('description')

            error = None
            if not name:
                error = 'Team name is required'
            elif name != team.name and Team.query.filter_by(name=name, company_id=company_id).first():
                error = 'Team name already exists in your company'

            if error is None:
                team.name = name
                team.description = description
                db.session.commit()
                flash(f'Team "{name}" updated successfully!', 'success')
            else:
                flash(error, 'error')

        elif action in ('add_member', 'remove_member'):
            user_id = request.form.get('user_id')
            if not user_id:
                flash('No user selected', 'error')
            else:
                # Look the user up within the admin's own company only, so a
                # crafted user_id cannot move users of another company.
                member = User.query.filter_by(id=user_id, company_id=company_id).first()

                if action == 'add_member':
                    if member:
                        member.team_id = team.id
                        db.session.commit()
                        flash(f'User {member.username} added to team!', 'success')
                    else:
                        flash('User not found', 'error')
                else:
                    if member and member.team_id == team.id:
                        member.team_id = None
                        db.session.commit()
                        flash(f'User {member.username} removed from team!', 'success')
                    else:
                        flash('User not found or not in this team', 'error')

    # Current members of the team
    team_members = User.query.filter_by(team_id=team.id).all()

    # Company users that are not in this team (including users without a team)
    available_users = User.query.filter(
        User.company_id == company_id,
        (User.team_id != team.id) | (User.team_id.is_(None))
    ).all()

    return render_template(
        'manage_team.html',
        title=f'Manage Team: {team.name}',
        team=team,
        team_members=team_members,
        available_users=available_users
    )
@app.route('/admin/projects/edit/<int:project_id>', methods=['GET', 'POST'])
@role_required(Role.SUPERVISOR)
@company_required
def edit_project(project_id):
    """Edit a project of the current user's company.

    GET shows the form. POST validates the submission; on success the project
    is saved and the browser is redirected to the project list, otherwise the
    first validation problem is flashed and the form is shown again.

    Args:
        project_id: Primary key of the project; 404 unless it belongs to the
            current user's company.
    """
    company_id = g.user.company_id
    project = Project.query.filter_by(id=project_id, company_id=company_id).first_or_404()

    if request.method == 'POST':
        form = request.form
        name = form.get('name')
        description = form.get('description')
        # Codes are stored upper-case, so normalise before comparing; checking
        # the raw input would let "abc" slip past an existing "ABC".
        code = (form.get('code') or '').strip().upper()
        is_active = form.get('is_active') == 'on'

        errors = []

        if not name:
            errors.append('Project name is required')
        elif not code:
            errors.append('Project code is required')
        elif code != project.code and Project.query.filter_by(code=code, company_id=company_id).first():
            errors.append('Project code already exists in your company')

        # Team and category are optional, but when given they must be numeric
        # ids of rows that belong to this company.
        team_id = None
        category_id = None
        try:
            team_id = int(form.get('team_id')) if form.get('team_id') else None
            category_id = int(form.get('category_id')) if form.get('category_id') else None
        except ValueError:
            errors.append('Invalid team or category selection')
        else:
            if team_id is not None and not Team.query.filter_by(
                    id=team_id, company_id=company_id).first():
                errors.append('Selected team does not exist in your company')
            if category_id is not None and not ProjectCategory.query.filter_by(
                    id=category_id, company_id=company_id).first():
                errors.append('Selected category does not exist in your company')

        # Parse the optional dates (YYYY-MM-DD).
        parsed_dates = {'start': None, 'end': None}
        for label, raw in (('start', form.get('start_date')), ('end', form.get('end_date'))):
            if not raw:
                continue
            try:
                parsed_dates[label] = datetime.strptime(raw, '%Y-%m-%d').date()
            except ValueError:
                errors.append(f'Invalid {label} date format')
        start_date = parsed_dates['start']
        end_date = parsed_dates['end']

        if start_date and end_date and start_date > end_date:
            errors.append('Start date cannot be after end date')

        if not errors:
            project.name = name
            project.description = description
            project.code = code
            project.team_id = team_id
            project.category_id = category_id
            project.is_active = is_active
            project.start_date = start_date
            project.end_date = end_date
            db.session.commit()
            flash(f'Project "{name}" updated successfully!', 'success')
            return redirect(url_for('admin_projects'))

        flash(errors[0], 'error')

    # Teams and categories offered by the form (company-scoped)
    teams = Team.query.filter_by(company_id=company_id).order_by(Team.name).all()
    categories = ProjectCategory.query.filter_by(company_id=company_id).order_by(ProjectCategory.name).all()

    return render_template('edit_project.html', title='Edit Project', project=project, teams=teams, categories=categories)
company_id=g.user.company_id).first_or_404() # Check if there are time entries associated with this project time_entries_count = TimeEntry.query.filter_by(project_id=project_id).count() if time_entries_count > 0: flash(f'Cannot delete project "{project.name}" - it has {time_entries_count} time entries associated with it. Deactivate the project instead.', 'error') else: project_name = project.name db.session.delete(project) db.session.commit() flash(f'Project "{project_name}" deleted successfully!', 'success') return redirect(url_for('admin_projects')) @app.route('/api/team/hours_data', methods=['GET']) @login_required @role_required(Role.TEAM_LEADER) # Only team leaders and above can access @company_required def team_hours_data(): # Get the current user's team team = Team.query.get(g.user.team_id) if not team: return jsonify({ 'success': False, 'message': 'You are not assigned to any team.' }), 400 # Get date range from query parameters or use current week as default today = datetime.now().date() start_of_week = today - timedelta(days=today.weekday()) end_of_week = start_of_week + timedelta(days=6) start_date_str = request.args.get('start_date', start_of_week.strftime('%Y-%m-%d')) end_date_str = request.args.get('end_date', end_of_week.strftime('%Y-%m-%d')) include_self = request.args.get('include_self', 'false') == 'true' try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() except ValueError: return jsonify({ 'success': False, 'message': 'Invalid date format.' 
}), 400 # Get all team members team_members = User.query.filter_by(team_id=team.id).all() # Prepare data structure for team members' hours team_data = [] for member in team_members: # Skip if the member is the current user (team leader) and include_self is False if member.id == g.user.id and not include_self: continue # Get time entries for this member in the date range entries = TimeEntry.query.filter( TimeEntry.user_id == member.id, TimeEntry.arrival_time >= datetime.combine(start_date, time.min), TimeEntry.arrival_time <= datetime.combine(end_date, time.max) ).order_by(TimeEntry.arrival_time).all() # Calculate daily and total hours daily_hours = {} total_seconds = 0 for entry in entries: if entry.duration: # Only count completed entries entry_date = entry.arrival_time.date() date_str = entry_date.strftime('%Y-%m-%d') if date_str not in daily_hours: daily_hours[date_str] = 0 daily_hours[date_str] += entry.duration total_seconds += entry.duration # Convert seconds to hours for display for date_str in daily_hours: daily_hours[date_str] = round(daily_hours[date_str] / 3600, 2) # Convert to hours total_hours = round(total_seconds / 3600, 2) # Convert to hours # Format entries for JSON response formatted_entries = [] for entry in entries: formatted_entries.append({ 'id': entry.id, 'arrival_time': entry.arrival_time.isoformat(), 'departure_time': entry.departure_time.isoformat() if entry.departure_time else None, 'duration': entry.duration, 'total_break_duration': entry.total_break_duration }) # Add member data to team data team_data.append({ 'user': { 'id': member.id, 'username': member.username, 'email': member.email }, 'daily_hours': daily_hours, 'total_hours': total_hours, 'entries': formatted_entries }) # Generate a list of dates in the range for the table header date_range = [] current_date = start_date while current_date <= end_date: date_range.append(current_date.strftime('%Y-%m-%d')) current_date += timedelta(days=1) return jsonify({ 'success': True, 'team': { 
def get_date_range(period, start_date_str=None, end_date_str=None):
    """Resolve a named period or explicit date strings to a date range.

    Args:
        period: 'today', 'week', 'month' or 'all'. A falsy value selects the
            custom range described by the two date strings.
        start_date_str: Custom range start, formatted 'YYYY-MM-DD'.
        end_date_str: Custom range end, formatted 'YYYY-MM-DD'.

    Returns:
        A ``(start_date, end_date)`` tuple of ``datetime.date`` objects.

    Raises:
        ValueError: If ``period`` is not recognised, or if a custom range is
            requested and either date string is missing or malformed.
    """
    today = datetime.now().date()

    if not period:
        # Custom date range
        try:
            start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
            end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            # TypeError covers a missing (None) date string.
            raise ValueError('Invalid date format')
        return start_date, end_date

    if period == 'today':
        return today, today
    if period == 'week':
        # weekday() is 0 on Monday, so this lands on the current week's Monday.
        return today - timedelta(days=today.weekday()), today
    if period == 'month':
        return today.replace(day=1), today
    if period == 'all':
        earliest_entry = TimeEntry.query.order_by(TimeEntry.arrival_time).first()
        start_date = earliest_entry.arrival_time.date() if earliest_entry else today
        return start_date, today

    # Previously an unknown period fell through and returned None, which made
    # the caller's tuple unpacking fail with an uncaught TypeError.
    raise ValueError(f'Unknown period: {period!r}')
@app.route('/analytics')
@app.route('/analytics/<mode>')
@login_required
def analytics(mode='personal'):
    """Render the unified analytics page (history, team hours and graphs).

    Args:
        mode: 'personal' or 'team', taken from the URL path. Any other value
            is treated as 'personal'.

    Returns:
        The rendered ``analytics.html`` template, or a redirect to the
        personal view when the user has no team or lacks a team-level role.
    """
    # The second route needs the <mode> variable; without it the view could
    # only ever run with the default and team mode was unreachable.
    if mode not in ('personal', 'team'):
        mode = 'personal'

    # Check team access for team mode
    if mode == 'team':
        if not g.user.team_id:
            flash('You must be assigned to a team to view team analytics.', 'warning')
            return redirect(url_for('analytics', mode='personal'))

        if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
            flash('You do not have permission to view team analytics.', 'error')
            return redirect(url_for('analytics', mode='personal'))

    # Active projects the current user is allowed to see, for the filter list
    available_projects = [
        project
        for project in Project.query.filter_by(is_active=True).all()
        if project.is_user_allowed(g.user)
    ]

    # Team members are only needed in team mode
    team_members = []
    if mode == 'team' and g.user.team_id:
        team_members = User.query.filter_by(team_id=g.user.team_id).all()

    # Default date range: Monday to Sunday of the current week
    today = datetime.now().date()
    start_of_week = today - timedelta(days=today.weekday())
    end_of_week = start_of_week + timedelta(days=6)

    return render_template('analytics.html',
                           title='Time Analytics',
                           mode=mode,
                           available_projects=available_projects,
                           team_members=team_members,
                           default_start_date=start_of_week.strftime('%Y-%m-%d'),
                           default_end_date=end_of_week.strftime('%Y-%m-%d'))
def get_filtered_analytics_data(user, mode, start_date=None, end_date=None, project_filter=None):
    """Return the time entries visible to ``user`` for the analytics views.

    Args:
        user: The requesting user; ``id`` and ``team_id`` are read.
        mode: 'team' selects entries of every member of the user's team.
            Anything else is restricted to the user's own entries.
        start_date: Optional inclusive lower bound on the arrival date.
        end_date: Optional inclusive upper bound on the arrival date.
        project_filter: Optional project id (string or int), or 'none' to
            select entries without a project. A non-numeric value other than
            'none' is ignored.

    Returns:
        A list of ``TimeEntry`` rows ordered by arrival time, newest first.
    """
    query = TimeEntry.query

    if mode == 'team' and user.team_id:
        team_user_ids = [u.id for u in User.query.filter_by(team_id=user.team_id).all()]
        query = query.filter(TimeEntry.user_id.in_(team_user_ids))
    else:
        # Fail closed: an unknown mode, or team mode without a team, used to
        # leave the query unfiltered and return every user's entries.
        query = query.filter(TimeEntry.user_id == user.id)

    # Apply date filters
    if start_date:
        query = query.filter(func.date(TimeEntry.arrival_time) >= start_date)
    if end_date:
        query = query.filter(func.date(TimeEntry.arrival_time) <= end_date)

    # Apply project filter
    if project_filter:
        if project_filter == 'none':
            query = query.filter(TimeEntry.project_id.is_(None))
        else:
            try:
                project_id = int(project_filter)
            except (TypeError, ValueError):
                # Best effort: an unparsable project id means "no project filter".
                pass
            else:
                query = query.filter(TimeEntry.project_id == project_id)

    return query.order_by(TimeEntry.arrival_time.desc()).all()
@app.route('/api/companies/<int:company_id>/teams')
@system_admin_required
def api_company_teams(company_id):
    """API: List the teams of one company (System Admin only).

    Args:
        company_id: Company primary key, taken from the URL path.

    Returns:
        A JSON array of ``{'id', 'name', 'description'}`` objects ordered by
        team name. The array is empty when no team matches the company id.
    """
    # The route must declare <int:company_id>; without it Flask has no value
    # to pass for the view's required argument.
    teams = Team.query.filter_by(company_id=company_id).order_by(Team.name).all()
    return jsonify([
        {'id': team.id, 'name': team.name, 'description': team.description}
        for team in teams
    ])
@app.route('/api/system-admin/users/<int:user_id>/toggle-block', methods=['POST'])
@system_admin_required
def api_toggle_user_block(user_id):
    """API: Toggle a user's blocked status (System Admin only).

    Args:
        user_id: User primary key, taken from the URL path.

    Returns:
        JSON with the user's ``id``, ``username``, new ``is_blocked`` value
        and a ``message``. Responds 404 when the user does not exist, and 400
        when the request would block the caller's own account or the last
        unblocked system administrator.
    """
    # The route must declare <int:user_id>; without it Flask has no value to
    # pass for the view's required argument.
    user = User.query.get_or_404(user_id)

    # Safety check: prevent blocking yourself
    if user.id == g.user.id:
        return jsonify({'error': 'Cannot block your own account'}), 400

    # Safety check: prevent blocking the last system admin
    if user.role == Role.SYSTEM_ADMIN and not user.is_blocked:
        active_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN, is_blocked=False).count()
        if active_admins <= 1:
            return jsonify({'error': 'Cannot block the last system administrator'}), 400

    user.is_blocked = not user.is_blocked
    db.session.commit()

    return jsonify({
        'id': user.id,
        'username': user.username,
        'is_blocked': user.is_blocked,
        'message': f'User {"blocked" if user.is_blocked else "unblocked"} successfully'
    })
@app.route('/api/analytics/export')
@login_required
def analytics_export():
    """Export analytics data as a CSV or Excel download.

    Query parameters:
        format: 'csv' (default) or 'excel'.
        view: View type forwarded to the exporter (default 'table').
        mode: 'personal' (default) or 'team'.
        start_date, end_date: Optional 'YYYY-MM-DD' bounds.
        project_id: Optional project filter.

    Returns:
        The exporter's response, or a redirect to the analytics page with a
        flashed message when the request is invalid or the export fails.
    """
    export_format = request.args.get('format', 'csv')
    view_type = request.args.get('view', 'table')
    mode = request.args.get('mode', 'personal')
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')
    project_filter = request.args.get('project_id')

    # Reject unknown modes up front: they would skip the team permission
    # check below yet still be handed to the data query.
    if mode not in ('personal', 'team'):
        flash('Invalid mode', 'error')
        return redirect(url_for('analytics'))

    # Validate permissions
    if mode == 'team':
        if not g.user.team_id:
            flash('No team assigned', 'error')
            return redirect(url_for('analytics'))
        if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
            flash('Insufficient permissions', 'error')
            return redirect(url_for('analytics'))

    try:
        # Parse dates
        if start_date:
            start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
        if end_date:
            end_date = datetime.strptime(end_date, '%Y-%m-%d').date()

        # Get data
        data = get_filtered_analytics_data(g.user, mode, start_date, end_date, project_filter)

        if export_format == 'csv':
            return export_analytics_csv(data, view_type, mode)
        if export_format == 'excel':
            return export_analytics_excel(data, view_type, mode)

        flash('Invalid export format', 'error')
        return redirect(url_for('analytics'))

    except Exception as e:
        # Top-level boundary for the download: log with traceback, tell the
        # user, and return to the analytics page.
        logger.exception("Error in analytics export: %s", e)
        flash('Error generating export', 'error')
        return redirect(url_for('analytics'))
render_template('manage_project_tasks.html', title=f'Tasks - {project.name}', project=project, tasks=tasks, team_members=team_members) # Unified Task Management Route @app.route('/tasks') @role_required(Role.TEAM_MEMBER) @company_required def unified_task_management(): """Unified task management interface""" # Get all projects the user has access to (for filtering and task creation) if g.user.role in [Role.ADMIN, Role.SUPERVISOR]: # Admins and Supervisors can see all company projects available_projects = Project.query.filter_by( company_id=g.user.company_id, is_active=True ).order_by(Project.name).all() elif g.user.team_id: # Team members see team projects + unassigned projects available_projects = Project.query.filter( Project.company_id == g.user.company_id, Project.is_active == True, db.or_(Project.team_id == g.user.team_id, Project.team_id == None) ).order_by(Project.name).all() # Filter by actual access permissions available_projects = [p for p in available_projects if p.is_user_allowed(g.user)] else: # Unassigned users see only unassigned projects available_projects = Project.query.filter_by( company_id=g.user.company_id, team_id=None, is_active=True ).order_by(Project.name).all() available_projects = [p for p in available_projects if p.is_user_allowed(g.user)] # Get team members for task assignment (company-scoped) if g.user.role in [Role.ADMIN, Role.SUPERVISOR]: # Admins can assign to anyone in the company team_members = User.query.filter_by( company_id=g.user.company_id, is_blocked=False ).order_by(User.username).all() elif g.user.team_id: # Team members can assign to team members + supervisors/admins team_members = User.query.filter( User.company_id == g.user.company_id, User.is_blocked == False, db.or_( User.team_id == g.user.team_id, User.role.in_([Role.ADMIN, Role.SUPERVISOR]) ) ).order_by(User.username).all() else: # Unassigned users can assign to supervisors/admins only team_members = User.query.filter( User.company_id == g.user.company_id, 
@app.route('/api/tasks', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_task():
    """API: Create a task in a project the current user can access.

    Expects a JSON object with ``project_id`` and ``name``. Optional keys are
    ``description``, ``status``, ``priority``, ``estimated_hours``,
    ``assigned_to_id``, ``sprint_id``, ``start_date`` and ``due_date`` (dates
    as 'YYYY-MM-DD').

    Returns:
        JSON ``{'success': bool, 'message': str}``.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed body; anything that is not a JSON object is rejected.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': 'Invalid JSON body'})

        project_id = data.get('project_id')

        # Verify project access
        project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
        if not project or not project.is_user_allowed(g.user):
            return jsonify({'success': False, 'message': 'Project not found or access denied'})

        # Validate required fields
        name = data.get('name')
        if not name:
            return jsonify({'success': False, 'message': 'Task name is required'})

        # Enum members are looked up by name; report bad names explicitly
        # instead of leaking a bare KeyError text.
        try:
            status = TaskStatus[data.get('status') or 'NOT_STARTED']
        except (KeyError, TypeError):
            return jsonify({'success': False, 'message': 'Invalid status value'})
        try:
            priority = TaskPriority[data.get('priority') or 'MEDIUM']
        except (KeyError, TypeError):
            return jsonify({'success': False, 'message': 'Invalid priority value'})

        # Parse dates
        try:
            start_date = (datetime.strptime(data['start_date'], '%Y-%m-%d').date()
                          if data.get('start_date') else None)
            due_date = (datetime.strptime(data['due_date'], '%Y-%m-%d').date()
                        if data.get('due_date') else None)
        except (TypeError, ValueError):
            return jsonify({'success': False, 'message': 'Invalid date format'})

        # Parse numeric fields
        try:
            estimated_hours = float(data['estimated_hours']) if data.get('estimated_hours') else None
            assigned_to_id = int(data['assigned_to_id']) if data.get('assigned_to_id') else None
            sprint_id = int(data['sprint_id']) if data.get('sprint_id') else None
        except (TypeError, ValueError):
            return jsonify({'success': False, 'message': 'Invalid numeric value'})

        # Referenced rows must belong to the caller's company, otherwise a
        # task could point at another tenant's user or sprint.
        if assigned_to_id is not None:
            assignee = User.query.filter_by(id=assigned_to_id, company_id=g.user.company_id).first()
            if not assignee:
                return jsonify({'success': False, 'message': 'Assigned user not found'})
        if sprint_id is not None:
            sprint = Sprint.query.filter_by(id=sprint_id, company_id=g.user.company_id).first()
            if not sprint:
                return jsonify({'success': False, 'message': 'Sprint not found'})

        # Generate task number
        task_number = Task.generate_task_number(g.user.company_id)

        # Create task
        task = Task(
            task_number=task_number,
            name=name,
            description=data.get('description', ''),
            status=status,
            priority=priority,
            estimated_hours=estimated_hours,
            project_id=project_id,
            assigned_to_id=assigned_to_id,
            sprint_id=sprint_id,
            start_date=start_date,
            due_date=due_date,
            created_by_id=g.user.id
        )

        db.session.add(task)
        db.session.commit()

        return jsonify({'success': True, 'message': 'Task created successfully'})

    except Exception as e:
        db.session.rollback()
        logger.exception("Error creating task: %s", e)
        return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_task(task_id):
    """API: Update the fields of an existing task.

    Only keys present in the JSON object are changed. Supported keys are
    ``name``, ``description``, ``status``, ``priority``, ``estimated_hours``,
    ``assigned_to_id``, ``sprint_id``, ``start_date`` and ``due_date`` (dates
    as 'YYYY-MM-DD'; an empty value clears the field).

    Args:
        task_id: Task primary key, taken from the URL path.

    Returns:
        JSON ``{'success': bool, 'message': str}``.
    """
    try:
        task = Task.query.join(Project).filter(
            Task.id == task_id,
            Project.company_id == g.user.company_id
        ).first()

        if not task or not task.can_user_access(g.user):
            return jsonify({'success': False, 'message': 'Task not found or access denied'})

        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': 'Invalid JSON body'})

        # Validate everything into `updates` first so a rejected request
        # never leaves the task half-modified in the session.
        updates = {}

        for field in ('name', 'description'):
            if field in data:
                updates[field] = data[field]

        if 'status' in data:
            try:
                new_status = TaskStatus[data['status']]
            except (KeyError, TypeError):
                return jsonify({'success': False, 'message': 'Invalid status value'})
            updates['status'] = new_status
            if new_status == TaskStatus.COMPLETED:
                # Keep the original completion date when an already
                # completed task is saved again.
                if task.status != TaskStatus.COMPLETED or not task.completed_date:
                    updates['completed_date'] = datetime.now().date()
            else:
                updates['completed_date'] = None

        if 'priority' in data:
            try:
                updates['priority'] = TaskPriority[data['priority']]
            except (KeyError, TypeError):
                return jsonify({'success': False, 'message': 'Invalid priority value'})

        try:
            if 'estimated_hours' in data:
                raw = data['estimated_hours']
                updates['estimated_hours'] = float(raw) if raw else None
            for field in ('assigned_to_id', 'sprint_id'):
                if field in data:
                    raw = data[field]
                    updates[field] = int(raw) if raw else None
        except (TypeError, ValueError):
            return jsonify({'success': False, 'message': 'Invalid numeric value'})

        try:
            for field in ('start_date', 'due_date'):
                if field in data:
                    raw = data[field]
                    updates[field] = datetime.strptime(raw, '%Y-%m-%d').date() if raw else None
        except (TypeError, ValueError):
            return jsonify({'success': False, 'message': 'Invalid date format'})

        # Referenced rows must belong to the caller's company.
        if updates.get('assigned_to_id') is not None:
            assignee = User.query.filter_by(
                id=updates['assigned_to_id'], company_id=g.user.company_id
            ).first()
            if not assignee:
                return jsonify({'success': False, 'message': 'Assigned user not found'})
        if updates.get('sprint_id') is not None:
            sprint = Sprint.query.filter_by(
                id=updates['sprint_id'], company_id=g.user.company_id
            ).first()
            if not sprint:
                return jsonify({'success': False, 'message': 'Sprint not found'})

        for attr, value in updates.items():
            setattr(task, attr, value)

        db.session.commit()

        return jsonify({'success': True, 'message': 'Task updated successfully'})

    except Exception as e:
        db.session.rollback()
        logger.exception("Error updating task: %s", e)
        return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/unified')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_unified_tasks():
    """API: List every task the current user may see, for the unified view.

    Admins and supervisors get all tasks of their company. Other users get
    tasks of the company projects for which ``is_user_allowed`` is true.

    Returns:
        JSON ``{'success': True, 'tasks': [...]}`` ordered newest first, or
        ``{'success': False, 'message': str}`` on error.
    """
    try:
        # Base query for tasks in user's company
        query = Task.query.join(Project).filter(Project.company_id == g.user.company_id)

        # Apply access restrictions based on user role and team
        if g.user.role not in [Role.ADMIN, Role.SUPERVISOR]:
            accessible_project_ids = [
                project.id
                for project in Project.query.filter_by(company_id=g.user.company_id).all()
                if project.is_user_allowed(g.user)
            ]
            if not accessible_project_ids:
                # No accessible projects, return empty list
                return jsonify({'success': True, 'tasks': []})
            query = query.filter(Task.project_id.in_(accessible_project_ids))

        tasks = query.order_by(Task.created_at.desc()).all()

        task_list = []
        for task in tasks:
            project = task.project
            sprint = task.sprint

            # bool() keeps the JSON value a real boolean; the bare and-chain
            # could yield None or a team id.
            is_team_task = bool(
                g.user.team_id and project and project.team_id == g.user.team_id
            )

            # Fallback for existing tasks: also covers the attribute being
            # present but None, which a getattr default alone never catches.
            task_number = getattr(task, 'task_number', None) or f'TSK-{task.id:03d}'

            task_list.append({
                'id': task.id,
                'task_number': task_number,
                'name': task.name,
                'description': task.description,
                'status': task.status.name,
                'priority': task.priority.name,
                'estimated_hours': task.estimated_hours,
                'project_id': task.project_id,
                'project_name': project.name if project else None,
                'project_code': project.code if project else None,
                'assigned_to_id': task.assigned_to_id,
                'assigned_to_name': task.assigned_to.username if task.assigned_to else None,
                'created_by_id': task.created_by_id,
                'created_by_name': task.created_by.username if task.created_by else None,
                'start_date': task.start_date.isoformat() if task.start_date else None,
                'due_date': task.due_date.isoformat() if task.due_date else None,
                'completed_date': task.completed_date.isoformat() if task.completed_date else None,
                'created_at': task.created_at.isoformat(),
                'is_team_task': is_team_task,
                'subtask_count': len(task.subtasks) if task.subtasks else 0,
                'sprint_id': task.sprint_id,
                'sprint_name': sprint.name if sprint else None,
                'is_current_sprint': sprint.is_current if sprint else False
            })

        return jsonify({'success': True, 'tasks': task_list})

    except Exception as e:
        logger.error(f"Error in get_unified_tasks: {str(e)}")
        return jsonify({'success': False, 'message': str(e)})
def _would_create_cycle(blocked_id, blocking_id):
    """Return True if letting ``blocking_id`` block ``blocked_id`` closes a cycle.

    The new edge closes a cycle exactly when ``blocked_id`` is already a
    direct or transitive blocker of ``blocking_id``. The search therefore
    starts at the blocking task and walks its existing blockers upstream,
    looking for the task that is about to become blocked.
    """
    pending = [blocking_id]
    visited = set()
    while pending:
        current = pending.pop()
        if current == blocked_id:
            return True
        if current in visited:
            continue
        visited.add(current)
        upstream = TaskDependency.query.filter_by(blocked_task_id=current).all()
        pending.extend(dep.blocking_task_id for dep in upstream)
    return False


@app.route('/api/tasks/<int:task_id>/dependencies', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def add_task_dependency(task_id):
    """API: Add a dependency between this task and another task.

    Expects a JSON object with ``task_number`` (the other task) and ``type``:
    'blocked_by' makes the other task block this one, 'blocks' makes this
    task block the other one.

    Args:
        task_id: Task primary key, taken from the URL path.

    Returns:
        JSON ``{'success': bool, 'message': str}``.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': 'Invalid JSON body'})

        task_number = data.get('task_number')
        dependency_type = data.get('type')  # 'blocked_by' or 'blocks'

        if not task_number or not dependency_type:
            return jsonify({'success': False, 'message': 'Task number and type are required'})

        # Get the main task
        task = Task.query.filter_by(id=task_id, company_id=g.user.company_id).first()
        if not task:
            return jsonify({'success': False, 'message': 'Task not found'})

        # Find the dependency task by task number
        dependency_task = Task.query.filter_by(
            task_number=task_number,
            company_id=g.user.company_id
        ).first()
        if not dependency_task:
            return jsonify({'success': False, 'message': f'Task {task_number} not found'})

        # Prevent self-dependency
        if dependency_task.id == task_id:
            return jsonify({'success': False, 'message': 'A task cannot depend on itself'})

        # Work out the edge direction from the requested type
        if dependency_type == 'blocked_by':
            blocked_task_id, blocking_task_id = task_id, dependency_task.id
        elif dependency_type == 'blocks':
            blocked_task_id, blocking_task_id = dependency_task.id, task_id
        else:
            return jsonify({'success': False, 'message': 'Invalid dependency type'})

        # Check if dependency already exists
        existing_dep = TaskDependency.query.filter_by(
            blocked_task_id=blocked_task_id,
            blocking_task_id=blocking_task_id
        ).first()
        if existing_dep:
            return jsonify({'success': False, 'message': 'This dependency already exists'})

        # Check for circular dependencies
        if _would_create_cycle(blocked_task_id, blocking_task_id):
            return jsonify({'success': False, 'message': 'This dependency would create a circular dependency'})

        # Create the new dependency
        new_dependency = TaskDependency(
            blocked_task_id=blocked_task_id,
            blocking_task_id=blocking_task_id
        )
        db.session.add(new_dependency)
        db.session.commit()

        return jsonify({'success': True, 'message': 'Dependency added successfully'})

    except Exception as e:
        db.session.rollback()
        logger.error(f"Error adding task dependency: {str(e)}")
        return jsonify({'success': False, 'message': str(e)})
Sprint.query.filter(Sprint.company_id == g.user.company_id) # Apply access restrictions based on user role and team if g.user.role not in [Role.ADMIN, Role.SUPERVISOR]: # Regular users can only see sprints they have access to accessible_sprint_ids = [] sprints = query.all() for sprint in sprints: if sprint.can_user_access(g.user): accessible_sprint_ids.append(sprint.id) if accessible_sprint_ids: query = query.filter(Sprint.id.in_(accessible_sprint_ids)) else: # No accessible sprints, return empty list return jsonify({'success': True, 'sprints': []}) sprints = query.order_by(Sprint.created_at.desc()).all() sprint_list = [] for sprint in sprints: task_summary = sprint.get_task_summary() sprint_data = { 'id': sprint.id, 'name': sprint.name, 'description': sprint.description, 'status': sprint.status.name, 'company_id': sprint.company_id, 'project_id': sprint.project_id, 'project_name': sprint.project.name if sprint.project else None, 'project_code': sprint.project.code if sprint.project else None, 'start_date': sprint.start_date.isoformat(), 'end_date': sprint.end_date.isoformat(), 'goal': sprint.goal, 'capacity_hours': sprint.capacity_hours, 'created_by_id': sprint.created_by_id, 'created_by_name': sprint.created_by.username if sprint.created_by else None, 'created_at': sprint.created_at.isoformat(), 'is_current': sprint.is_current, 'duration_days': sprint.duration_days, 'days_remaining': sprint.days_remaining, 'progress_percentage': sprint.progress_percentage, 'task_summary': task_summary } sprint_list.append(sprint_data) return jsonify({'success': True, 'sprints': sprint_list}) except Exception as e: logger.error(f"Error in get_sprints: {str(e)}") return jsonify({'success': False, 'message': str(e)}) @app.route('/api/sprints', methods=['POST']) @role_required(Role.TEAM_LEADER) # Team leaders and above can create sprints @company_required def create_sprint(): """Create a new sprint""" try: data = request.get_json() # Validate required fields name = data.get('name') 
start_date = data.get('start_date') end_date = data.get('end_date') if not name: return jsonify({'success': False, 'message': 'Sprint name is required'}) if not start_date: return jsonify({'success': False, 'message': 'Start date is required'}) if not end_date: return jsonify({'success': False, 'message': 'End date is required'}) # Parse dates try: start_date = datetime.strptime(start_date, '%Y-%m-%d').date() end_date = datetime.strptime(end_date, '%Y-%m-%d').date() except ValueError: return jsonify({'success': False, 'message': 'Invalid date format'}) if start_date >= end_date: return jsonify({'success': False, 'message': 'End date must be after start date'}) # Verify project access if project is specified project_id = data.get('project_id') if project_id: project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first() if not project or not project.is_user_allowed(g.user): return jsonify({'success': False, 'message': 'Project not found or access denied'}) # Create sprint sprint = Sprint( name=name, description=data.get('description', ''), status=SprintStatus[data.get('status', 'PLANNING')], company_id=g.user.company_id, project_id=int(project_id) if project_id else None, start_date=start_date, end_date=end_date, goal=data.get('goal'), capacity_hours=int(data.get('capacity_hours')) if data.get('capacity_hours') else None, created_by_id=g.user.id ) db.session.add(sprint) db.session.commit() return jsonify({'success': True, 'message': 'Sprint created successfully'}) except Exception as e: db.session.rollback() logger.error(f"Error creating sprint: {str(e)}") return jsonify({'success': False, 'message': str(e)}) @app.route('/api/sprints/', methods=['PUT']) @role_required(Role.TEAM_LEADER) @company_required def update_sprint(sprint_id): """Update an existing sprint""" try: sprint = Sprint.query.filter_by(id=sprint_id, company_id=g.user.company_id).first() if not sprint or not sprint.can_user_access(g.user): return jsonify({'success': False, 
'message': 'Sprint not found or access denied'}) data = request.get_json() # Update sprint fields if 'name' in data: sprint.name = data['name'] if 'description' in data: sprint.description = data['description'] if 'status' in data: sprint.status = SprintStatus[data['status']] if 'goal' in data: sprint.goal = data['goal'] if 'capacity_hours' in data: sprint.capacity_hours = int(data['capacity_hours']) if data['capacity_hours'] else None if 'project_id' in data: project_id = data['project_id'] if project_id: project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first() if not project or not project.is_user_allowed(g.user): return jsonify({'success': False, 'message': 'Project not found or access denied'}) sprint.project_id = int(project_id) else: sprint.project_id = None # Update dates if provided if 'start_date' in data: try: sprint.start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date() except ValueError: return jsonify({'success': False, 'message': 'Invalid start date format'}) if 'end_date' in data: try: sprint.end_date = datetime.strptime(data['end_date'], '%Y-%m-%d').date() except ValueError: return jsonify({'success': False, 'message': 'Invalid end date format'}) # Validate date order if sprint.start_date >= sprint.end_date: return jsonify({'success': False, 'message': 'End date must be after start date'}) db.session.commit() return jsonify({'success': True, 'message': 'Sprint updated successfully'}) except Exception as e: db.session.rollback() logger.error(f"Error updating sprint: {str(e)}") return jsonify({'success': False, 'message': str(e)}) @app.route('/api/sprints/', methods=['DELETE']) @role_required(Role.TEAM_LEADER) @company_required def delete_sprint(sprint_id): """Delete a sprint and remove it from all associated tasks""" try: sprint = Sprint.query.filter_by(id=sprint_id, company_id=g.user.company_id).first() if not sprint or not sprint.can_user_access(g.user): return jsonify({'success': False, 'message': 
'Sprint not found or access denied'}) # Remove sprint assignment from all tasks Task.query.filter_by(sprint_id=sprint_id).update({'sprint_id': None}) # Delete the sprint db.session.delete(sprint) db.session.commit() return jsonify({'success': True, 'message': 'Sprint deleted successfully'}) except Exception as e: db.session.rollback() logger.error(f"Error deleting sprint: {str(e)}") return jsonify({'success': False, 'message': str(e)}) # Subtask API Routes @app.route('/api/subtasks', methods=['POST']) @role_required(Role.TEAM_MEMBER) @company_required def create_subtask(): try: data = request.get_json() task_id = data.get('task_id') # Verify task access task = Task.query.join(Project).filter( Task.id == task_id, Project.company_id == g.user.company_id ).first() if not task or not task.can_user_access(g.user): return jsonify({'success': False, 'message': 'Task not found or access denied'}) # Validate required fields name = data.get('name') if not name: return jsonify({'success': False, 'message': 'Subtask name is required'}) # Parse dates start_date = None due_date = None if data.get('start_date'): start_date = datetime.strptime(data.get('start_date'), '%Y-%m-%d').date() if data.get('due_date'): due_date = datetime.strptime(data.get('due_date'), '%Y-%m-%d').date() # Create subtask subtask = SubTask( name=name, description=data.get('description', ''), status=TaskStatus[data.get('status', 'NOT_STARTED')], priority=TaskPriority[data.get('priority', 'MEDIUM')], estimated_hours=float(data.get('estimated_hours')) if data.get('estimated_hours') else None, task_id=task_id, assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None, start_date=start_date, due_date=due_date, created_by_id=g.user.id ) db.session.add(subtask) db.session.commit() return jsonify({'success': True, 'message': 'Subtask created successfully'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) @app.route('/api/subtasks/', 
methods=['GET']) @role_required(Role.TEAM_MEMBER) @company_required def get_subtask(subtask_id): try: subtask = SubTask.query.join(Task).join(Project).filter( SubTask.id == subtask_id, Project.company_id == g.user.company_id ).first() if not subtask or not subtask.can_user_access(g.user): return jsonify({'success': False, 'message': 'Subtask not found or access denied'}) subtask_data = { 'id': subtask.id, 'name': subtask.name, 'description': subtask.description, 'status': subtask.status.name, 'priority': subtask.priority.name, 'estimated_hours': subtask.estimated_hours, 'assigned_to_id': subtask.assigned_to_id, 'start_date': subtask.start_date.isoformat() if subtask.start_date else None, 'due_date': subtask.due_date.isoformat() if subtask.due_date else None } return jsonify({'success': True, 'subtask': subtask_data}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @app.route('/api/subtasks/', methods=['PUT']) @role_required(Role.TEAM_MEMBER) @company_required def update_subtask(subtask_id): try: subtask = SubTask.query.join(Task).join(Project).filter( SubTask.id == subtask_id, Project.company_id == g.user.company_id ).first() if not subtask or not subtask.can_user_access(g.user): return jsonify({'success': False, 'message': 'Subtask not found or access denied'}) data = request.get_json() # Update subtask fields if 'name' in data: subtask.name = data['name'] if 'description' in data: subtask.description = data['description'] if 'status' in data: subtask.status = TaskStatus[data['status']] if data['status'] == 'COMPLETED': subtask.completed_date = datetime.now().date() else: subtask.completed_date = None if 'priority' in data: subtask.priority = TaskPriority[data['priority']] if 'estimated_hours' in data: subtask.estimated_hours = float(data['estimated_hours']) if data['estimated_hours'] else None if 'assigned_to_id' in data: subtask.assigned_to_id = int(data['assigned_to_id']) if data['assigned_to_id'] else None if 'start_date' in data: 
subtask.start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date() if data['start_date'] else None if 'due_date' in data: subtask.due_date = datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data['due_date'] else None db.session.commit() return jsonify({'success': True, 'message': 'Subtask updated successfully'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) @app.route('/api/subtasks/', methods=['DELETE']) @role_required(Role.TEAM_LEADER) # Only team leaders and above can delete subtasks @company_required def delete_subtask(subtask_id): try: subtask = SubTask.query.join(Task).join(Project).filter( SubTask.id == subtask_id, Project.company_id == g.user.company_id ).first() if not subtask or not subtask.can_user_access(g.user): return jsonify({'success': False, 'message': 'Subtask not found or access denied'}) db.session.delete(subtask) db.session.commit() return jsonify({'success': True, 'message': 'Subtask deleted successfully'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) # Category Management API Routes @app.route('/api/admin/categories', methods=['POST']) @role_required(Role.ADMIN) @company_required def create_category(): try: data = request.get_json() name = data.get('name') description = data.get('description', '') color = data.get('color', '#007bff') icon = data.get('icon', '') if not name: return jsonify({'success': False, 'message': 'Category name is required'}) # Check if category already exists existing = ProjectCategory.query.filter_by( name=name, company_id=g.user.company_id ).first() if existing: return jsonify({'success': False, 'message': 'Category name already exists'}) category = ProjectCategory( name=name, description=description, color=color, icon=icon, company_id=g.user.company_id, created_by_id=g.user.id ) db.session.add(category) db.session.commit() return jsonify({'success': True, 'message': 'Category created 
successfully'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) @app.route('/api/admin/categories/', methods=['PUT']) @role_required(Role.ADMIN) @company_required def update_category(category_id): try: category = ProjectCategory.query.filter_by( id=category_id, company_id=g.user.company_id ).first() if not category: return jsonify({'success': False, 'message': 'Category not found'}) data = request.get_json() name = data.get('name') if not name: return jsonify({'success': False, 'message': 'Category name is required'}) # Check if name conflicts with another category existing = ProjectCategory.query.filter( ProjectCategory.name == name, ProjectCategory.company_id == g.user.company_id, ProjectCategory.id != category_id ).first() if existing: return jsonify({'success': False, 'message': 'Category name already exists'}) category.name = name category.description = data.get('description', '') category.color = data.get('color', category.color) category.icon = data.get('icon', '') db.session.commit() return jsonify({'success': True, 'message': 'Category updated successfully'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) @app.route('/api/admin/categories/', methods=['DELETE']) @role_required(Role.ADMIN) @company_required def delete_category(category_id): try: category = ProjectCategory.query.filter_by( id=category_id, company_id=g.user.company_id ).first() if not category: return jsonify({'success': False, 'message': 'Category not found'}) # Unassign projects from this category projects = Project.query.filter_by(category_id=category_id).all() for project in projects: project.category_id = None db.session.delete(category) db.session.commit() return jsonify({'success': True, 'message': 'Category deleted successfully'}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) # Dashboard API Endpoints @app.route('/api/dashboard') 
@role_required(Role.TEAM_MEMBER) @company_required def get_dashboard(): """Get user's dashboard configuration and widgets.""" try: # Get or create user dashboard dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first() if not dashboard: dashboard = UserDashboard(user_id=g.user.id) db.session.add(dashboard) db.session.commit() logger.info(f"Created new dashboard {dashboard.id} for user {g.user.id}") else: logger.info(f"Using existing dashboard {dashboard.id} for user {g.user.id}") # Get user's widgets widgets = DashboardWidget.query.filter_by(dashboard_id=dashboard.id).order_by(DashboardWidget.grid_y, DashboardWidget.grid_x).all() logger.info(f"Found {len(widgets)} widgets for dashboard {dashboard.id}") # Convert to JSON format widget_data = [] for widget in widgets: # Convert grid size to simple size names if widget.grid_width == 1 and widget.grid_height == 1: size = 'small' elif widget.grid_width == 2 and widget.grid_height == 1: size = 'medium' elif widget.grid_width == 2 and widget.grid_height == 2: size = 'large' elif widget.grid_width == 3 and widget.grid_height == 1: size = 'wide' else: size = 'small' # Parse config JSON try: import json config = json.loads(widget.config) if widget.config else {} except (json.JSONDecodeError, TypeError): config = {} widget_data.append({ 'id': widget.id, 'type': widget.widget_type.value, 'title': widget.title, 'size': size, 'grid_x': widget.grid_x, 'grid_y': widget.grid_y, 'grid_width': widget.grid_width, 'grid_height': widget.grid_height, 'config': config }) return jsonify({ 'success': True, 'dashboard': { 'id': dashboard.id, 'layout_config': dashboard.layout_config, 'grid_columns': dashboard.grid_columns, 'theme': dashboard.theme }, 'widgets': widget_data }) except Exception as e: logger.error(f"Error loading dashboard: {e}") return jsonify({'success': False, 'error': str(e)}) @app.route('/api/dashboard/widgets', methods=['POST']) @role_required(Role.TEAM_MEMBER) @company_required def create_or_update_widget(): 
"""Create or update a dashboard widget.""" try: data = request.get_json() # Get or create user dashboard dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first() if not dashboard: dashboard = UserDashboard(user_id=g.user.id) db.session.add(dashboard) db.session.flush() # Get the ID logger.info(f"Created new dashboard {dashboard.id} for user {g.user.id} in widget creation") else: logger.info(f"Using existing dashboard {dashboard.id} for user {g.user.id} in widget creation") # Check if updating existing widget widget_id = data.get('widget_id') if widget_id: widget = DashboardWidget.query.filter_by( id=widget_id, dashboard_id=dashboard.id ).first() if not widget: return jsonify({'success': False, 'error': 'Widget not found'}) else: # Create new widget widget = DashboardWidget(dashboard_id=dashboard.id) # Find next available position max_y = db.session.query(func.max(DashboardWidget.grid_y)).filter_by( dashboard_id=dashboard.id ).scalar() or 0 widget.grid_y = max_y + 1 widget.grid_x = 0 # Update widget properties widget.widget_type = WidgetType(data['type']) widget.title = data['title'] # Convert size to grid dimensions size = data.get('size', 'small') if size == 'small': widget.grid_width = 1 widget.grid_height = 1 elif size == 'medium': widget.grid_width = 2 widget.grid_height = 1 elif size == 'large': widget.grid_width = 2 widget.grid_height = 2 elif size == 'wide': widget.grid_width = 3 widget.grid_height = 1 # Build config from form data config = {} for key, value in data.items(): if key not in ['type', 'title', 'size', 'widget_id']: config[key] = value # Store config as JSON string if config: import json widget.config = json.dumps(config) else: widget.config = None if not widget_id: db.session.add(widget) logger.info(f"Creating new widget: {widget.widget_type.value} for dashboard {dashboard.id}") else: logger.info(f"Updating existing widget {widget_id}") db.session.commit() logger.info(f"Widget saved successfully with ID: {widget.id}") # Verify the 
widget was actually saved saved_widget = DashboardWidget.query.filter_by(id=widget.id).first() if saved_widget: logger.info(f"Verification: Widget {widget.id} exists in database with dashboard_id: {saved_widget.dashboard_id}") else: logger.error(f"Verification failed: Widget {widget.id} not found in database") # Convert grid size back to simple size name for response if widget.grid_width == 1 and widget.grid_height == 1: size_name = 'small' elif widget.grid_width == 2 and widget.grid_height == 1: size_name = 'medium' elif widget.grid_width == 2 and widget.grid_height == 2: size_name = 'large' elif widget.grid_width == 3 and widget.grid_height == 1: size_name = 'wide' else: size_name = 'small' # Parse config for response try: import json config_dict = json.loads(widget.config) if widget.config else {} except (json.JSONDecodeError, TypeError): config_dict = {} return jsonify({ 'success': True, 'message': 'Widget saved successfully', 'widget': { 'id': widget.id, 'type': widget.widget_type.value, 'title': widget.title, 'size': size_name, 'grid_x': widget.grid_x, 'grid_y': widget.grid_y, 'grid_width': widget.grid_width, 'grid_height': widget.grid_height, 'config': config_dict } }) except Exception as e: db.session.rollback() logger.error(f"Error saving widget: {e}") return jsonify({'success': False, 'error': str(e)}) @app.route('/api/dashboard/widgets/', methods=['DELETE']) @role_required(Role.TEAM_MEMBER) @company_required def delete_widget(widget_id): """Delete a dashboard widget.""" try: # Get user dashboard dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first() if not dashboard: return jsonify({'success': False, 'error': 'Dashboard not found'}) # Find and delete widget widget = DashboardWidget.query.filter_by( id=widget_id, dashboard_id=dashboard.id ).first() if not widget: return jsonify({'success': False, 'error': 'Widget not found'}) # No need to update positions for grid-based layout db.session.delete(widget) db.session.commit() return 
jsonify({'success': True, 'message': 'Widget deleted successfully'}) except Exception as e: db.session.rollback() logger.error(f"Error deleting widget: {e}") return jsonify({'success': False, 'error': str(e)}) @app.route('/api/dashboard/positions', methods=['POST']) @role_required(Role.TEAM_MEMBER) @company_required def update_widget_positions(): """Update widget grid positions after drag and drop.""" try: data = request.get_json() positions = data.get('positions', []) # Get user dashboard dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first() if not dashboard: return jsonify({'success': False, 'error': 'Dashboard not found'}) # Update grid positions for pos_data in positions: widget = DashboardWidget.query.filter_by( id=pos_data['id'], dashboard_id=dashboard.id ).first() if widget: # For now, just assign sequential grid positions # In a more advanced implementation, we'd calculate actual grid coordinates widget.grid_x = pos_data.get('grid_x', 0) widget.grid_y = pos_data.get('grid_y', pos_data.get('position', 0)) db.session.commit() return jsonify({'success': True, 'message': 'Positions updated successfully'}) except Exception as e: db.session.rollback() logger.error(f"Error updating positions: {e}") return jsonify({'success': False, 'error': str(e)}) # Widget data endpoints @app.route('/api/dashboard/widgets//data') @role_required(Role.TEAM_MEMBER) @company_required def get_widget_data(widget_id): """Get data for a specific widget.""" try: # Get user dashboard dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first() if not dashboard: return jsonify({'success': False, 'error': 'Dashboard not found'}) # Find widget widget = DashboardWidget.query.filter_by( id=widget_id, dashboard_id=dashboard.id ).first() if not widget: return jsonify({'success': False, 'error': 'Widget not found'}) # Get widget-specific data based on type widget_data = {} if widget.widget_type == WidgetType.DAILY_SUMMARY: from datetime import datetime, timedelta config = 
widget.config or {} period = config.get('summary_period', 'daily') # Calculate time summaries now = datetime.now() # Today's summary start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0) today_entries = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.arrival_time >= start_of_today, TimeEntry.departure_time.isnot(None) ).all() today_seconds = sum(entry.duration or 0 for entry in today_entries) # This week's summary start_of_week = start_of_today - timedelta(days=start_of_today.weekday()) week_entries = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.arrival_time >= start_of_week, TimeEntry.departure_time.isnot(None) ).all() week_seconds = sum(entry.duration or 0 for entry in week_entries) # This month's summary start_of_month = start_of_today.replace(day=1) month_entries = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.arrival_time >= start_of_month, TimeEntry.departure_time.isnot(None) ).all() month_seconds = sum(entry.duration or 0 for entry in month_entries) widget_data.update({ 'today': f"{today_seconds // 3600}h {(today_seconds % 3600) // 60}m", 'week': f"{week_seconds // 3600}h {(week_seconds % 3600) // 60}m", 'month': f"{month_seconds // 3600}h {(month_seconds % 3600) // 60}m", 'entries_today': len(today_entries), 'entries_week': len(week_entries), 'entries_month': len(month_entries) }) elif widget.widget_type == WidgetType.ACTIVE_PROJECTS: config = widget.config or {} project_filter = config.get('project_filter', 'all') max_projects = int(config.get('max_projects', 5)) # Get user's projects if g.user.role in [Role.ADMIN, Role.SUPERVISOR]: projects = Project.query.filter_by( company_id=g.user.company_id, is_active=True ).limit(max_projects).all() elif g.user.team_id: projects = Project.query.filter( Project.company_id == g.user.company_id, Project.is_active == True, db.or_(Project.team_id == g.user.team_id, Project.team_id == None) ).limit(max_projects).all() else: projects = [] 
widget_data['projects'] = [{ 'id': p.id, 'name': p.name, 'code': p.code, 'description': p.description } for p in projects] elif widget.widget_type == WidgetType.ASSIGNED_TASKS: config = widget.config or {} task_filter = config.get('task_filter', 'assigned') task_status = config.get('task_status', 'active') # Get user's tasks based on filter if task_filter == 'assigned': tasks = Task.query.filter_by(assigned_to_id=g.user.id) elif task_filter == 'created': tasks = Task.query.filter_by(created_by_id=g.user.id) else: # Get tasks from user's projects if g.user.team_id: project_ids = [p.id for p in Project.query.filter( Project.company_id == g.user.company_id, db.or_(Project.team_id == g.user.team_id, Project.team_id == None) ).all()] tasks = Task.query.filter(Task.project_id.in_(project_ids)) else: tasks = Task.query.join(Project).filter(Project.company_id == g.user.company_id) # Filter by status if specified if task_status != 'all': if task_status == 'active': tasks = tasks.filter(Task.status.in_([TaskStatus.PENDING, TaskStatus.IN_PROGRESS])) elif task_status == 'pending': tasks = tasks.filter_by(status=TaskStatus.PENDING) elif task_status == 'completed': tasks = tasks.filter_by(status=TaskStatus.COMPLETED) tasks = tasks.limit(10).all() widget_data['tasks'] = [{ 'id': t.id, 'name': t.name, 'description': t.description, 'status': t.status.value if t.status else 'Pending', 'priority': t.priority.value if t.priority else 'Medium', 'project_name': t.project.name if t.project else 'No Project' } for t in tasks] elif widget.widget_type == WidgetType.WEEKLY_CHART: from datetime import datetime, timedelta # Get weekly data for chart now = datetime.now() start_of_week = now - timedelta(days=now.weekday()) weekly_data = [] for i in range(7): day = start_of_week + timedelta(days=i) day_start = day.replace(hour=0, minute=0, second=0, microsecond=0) day_end = day_start + timedelta(days=1) day_entries = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.arrival_time 
>= day_start, TimeEntry.arrival_time < day_end, TimeEntry.departure_time.isnot(None) ).all() total_seconds = sum(entry.duration or 0 for entry in day_entries) weekly_data.append({ 'day': day.strftime('%A'), 'date': day.strftime('%Y-%m-%d'), 'hours': round(total_seconds / 3600, 2), 'entries': len(day_entries) }) widget_data['weekly_data'] = weekly_data elif widget.widget_type == WidgetType.TASK_PRIORITY: # Get tasks by priority if g.user.team_id: project_ids = [p.id for p in Project.query.filter( Project.company_id == g.user.company_id, db.or_(Project.team_id == g.user.team_id, Project.team_id == None) ).all()] tasks = Task.query.filter( Task.project_id.in_(project_ids), Task.assigned_to_id == g.user.id ).order_by(Task.priority.desc(), Task.created_at.desc()).limit(10).all() else: tasks = Task.query.filter_by(assigned_to_id=g.user.id).order_by( Task.priority.desc(), Task.created_at.desc() ).limit(10).all() widget_data['priority_tasks'] = [{ 'id': t.id, 'name': t.name, 'description': t.description, 'priority': t.priority.value if t.priority else 'Medium', 'status': t.status.value if t.status else 'Pending', 'project_name': t.project.name if t.project else 'No Project' } for t in tasks] elif widget.widget_type == WidgetType.PROJECT_PROGRESS: # Get project progress data if g.user.role in [Role.ADMIN, Role.SUPERVISOR]: projects = Project.query.filter_by( company_id=g.user.company_id, is_active=True ).limit(5).all() elif g.user.team_id: projects = Project.query.filter( Project.company_id == g.user.company_id, Project.is_active == True, db.or_(Project.team_id == g.user.team_id, Project.team_id == None) ).limit(5).all() else: projects = [] project_progress = [] for project in projects: total_tasks = Task.query.filter_by(project_id=project.id).count() completed_tasks = Task.query.filter_by( project_id=project.id, status=TaskStatus.COMPLETED ).count() progress = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0 project_progress.append({ 'id': project.id, 
'name': project.name, 'code': project.code, 'progress': round(progress, 1), 'completed_tasks': completed_tasks, 'total_tasks': total_tasks }) widget_data['project_progress'] = project_progress elif widget.widget_type == WidgetType.PRODUCTIVITY_METRICS: from datetime import datetime, timedelta # Calculate productivity metrics now = datetime.now() today = now.replace(hour=0, minute=0, second=0, microsecond=0) week_ago = today - timedelta(days=7) # This week vs last week comparison this_week_entries = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.arrival_time >= week_ago, TimeEntry.departure_time.isnot(None) ).all() last_week_entries = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.arrival_time >= week_ago - timedelta(days=7), TimeEntry.arrival_time < week_ago, TimeEntry.departure_time.isnot(None) ).all() this_week_hours = sum(entry.duration or 0 for entry in this_week_entries) / 3600 last_week_hours = sum(entry.duration or 0 for entry in last_week_entries) / 3600 productivity_change = ((this_week_hours - last_week_hours) / last_week_hours * 100) if last_week_hours > 0 else 0 widget_data.update({ 'this_week_hours': round(this_week_hours, 1), 'last_week_hours': round(last_week_hours, 1), 'productivity_change': round(productivity_change, 1), 'avg_daily_hours': round(this_week_hours / 7, 1), 'total_entries': len(this_week_entries) }) return jsonify({ 'success': True, 'data': widget_data }) except Exception as e: logger.error(f"Error getting widget data: {e}") return jsonify({'success': False, 'error': str(e)}) @app.route('/api/current-timer-status') @role_required(Role.TEAM_MEMBER) @company_required def get_current_timer_status(): """Get current timer status for dashboard widgets.""" try: # Get the user's current active time entry active_entry = TimeEntry.query.filter_by( user_id=g.user.id, departure_time=None ).first() if active_entry: # Calculate current duration now = datetime.now() elapsed_seconds = int((now - 
active_entry.arrival_time).total_seconds()) return jsonify({ 'success': True, 'isActive': True, 'startTime': active_entry.arrival_time.isoformat(), 'currentDuration': elapsed_seconds, 'entryId': active_entry.id }) else: return jsonify({ 'success': True, 'isActive': False, 'message': 'No active timer' }) except Exception as e: logger.error(f"Error getting timer status: {e}") return jsonify({'success': False, 'error': str(e)}) # Smart Search API Endpoints @app.route('/api/search/users') @role_required(Role.TEAM_MEMBER) @company_required def search_users(): """Search for users for smart search auto-completion""" try: query = request.args.get('q', '').strip() if not query: return jsonify({'success': True, 'users': []}) # Search users in the same company users = User.query.filter( User.company_id == g.user.company_id, User.username.ilike(f'%{query}%') ).limit(10).all() user_list = [ { 'id': user.id, 'username': user.username, 'full_name': f"{user.first_name} {user.last_name}" if user.first_name and user.last_name else user.username } for user in users ] return jsonify({'success': True, 'users': user_list}) except Exception as e: logger.error(f"Error in search_users: {str(e)}") return jsonify({'success': False, 'message': str(e)}) @app.route('/api/search/projects') @role_required(Role.TEAM_MEMBER) @company_required def search_projects(): """Search for projects for smart search auto-completion""" try: query = request.args.get('q', '').strip() if not query: return jsonify({'success': True, 'projects': []}) # Search projects the user has access to projects = Project.query.filter( Project.company_id == g.user.company_id, db.or_( Project.code.ilike(f'%{query}%'), Project.name.ilike(f'%{query}%') ) ).limit(10).all() # Filter projects user has access to accessible_projects = [ project for project in projects if project.is_user_allowed(g.user) ] project_list = [ { 'id': project.id, 'code': project.code, 'name': project.name } for project in accessible_projects ] return 
jsonify({'success': True, 'projects': project_list}) except Exception as e: logger.error(f"Error in search_projects: {str(e)}") return jsonify({'success': False, 'message': str(e)}) @app.route('/api/search/sprints') @role_required(Role.TEAM_MEMBER) @company_required def search_sprints(): """Search for sprints for smart search auto-completion""" try: query = request.args.get('q', '').strip() if not query: return jsonify({'success': True, 'sprints': []}) # Search sprints in the same company sprints = Sprint.query.filter( Sprint.company_id == g.user.company_id, Sprint.name.ilike(f'%{query}%') ).limit(10).all() # Filter sprints user has access to accessible_sprints = [ sprint for sprint in sprints if sprint.can_user_access(g.user) ] sprint_list = [ { 'id': sprint.id, 'name': sprint.name, 'status': sprint.status.value } for sprint in accessible_sprints ] return jsonify({'success': True, 'sprints': sprint_list}) except Exception as e: logger.error(f"Error in search_sprints: {str(e)}") return jsonify({'success': False, 'message': str(e)}) if __name__ == '__main__': port = int(os.environ.get('PORT', 5000)) app.run(debug=True, host='0.0.0.0', port=port)