from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company import logging from datetime import datetime, time, timedelta import os import csv import io import pandas as pd from sqlalchemy import func from functools import wraps from flask_mail import Mail, Message from dotenv import load_dotenv from werkzeug.security import check_password_hash # Load environment variables from .env file load_dotenv() # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////data/timetrack.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_timetrack') app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # Session lasts for 7 days # Configure Flask-Mail app.config['MAIL_SERVER'] = os.environ.get('MAIL_SERVER', 'smtp.example.com') app.config['MAIL_PORT'] = int(os.environ.get('MAIL_PORT', 587)) app.config['MAIL_USE_TLS'] = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1'] app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME', 'your-email@example.com') app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD', 'your-password') app.config['MAIL_DEFAULT_SENDER'] = os.environ.get('MAIL_DEFAULT_SENDER', 'TimeTrack ') # Log mail configuration (without password) logger.info(f"Mail server: {app.config['MAIL_SERVER']}") logger.info(f"Mail port: {app.config['MAIL_PORT']}") logger.info(f"Mail use TLS: {app.config['MAIL_USE_TLS']}") logger.info(f"Mail username: {app.config['MAIL_USERNAME']}") logger.info(f"Mail default sender: {app.config['MAIL_DEFAULT_SENDER']}") mail = Mail(app) # Initialize the database with the app db.init_app(app) # Integrated migration and initialization function def run_migrations(): """Run all database migrations and initialize system settings.""" import sqlite3 # Determine database path based on environment db_path = '/data/timetrack.db' if os.path.exists('/data') else 'timetrack.db' # Check if database exists if not os.path.exists(db_path): print("Database doesn't exist. Creating new database.") with app.app_context(): db.create_all() init_system_settings() return print("Running database migrations...") # Connect to the database for raw SQL operations conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # Check if time_entry table exists first cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='time_entry'") if not cursor.fetchone(): print("time_entry table doesn't exist. Creating all tables...") with app.app_context(): db.create_all() init_system_settings() conn.commit() conn.close() return # Migrate time_entry table cursor.execute("PRAGMA table_info(time_entry)") time_entry_columns = [column[1] for column in cursor.fetchall()] migrations = [ ('is_paused', "ALTER TABLE time_entry ADD COLUMN is_paused BOOLEAN DEFAULT 0"), ('pause_start_time', "ALTER TABLE time_entry ADD COLUMN pause_start_time TIMESTAMP"), ('total_break_duration', "ALTER TABLE time_entry ADD COLUMN total_break_duration INTEGER DEFAULT 0"), ('user_id', "ALTER TABLE time_entry ADD COLUMN user_id INTEGER"), ('project_id', "ALTER TABLE time_entry ADD COLUMN project_id INTEGER"), ('notes', "ALTER TABLE time_entry ADD COLUMN notes TEXT") ] for column_name, sql_command in migrations: if column_name not in time_entry_columns: print(f"Adding {column_name} column to time_entry...") cursor.execute(sql_command) # Create work_config table if it doesn't exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='work_config'") if not cursor.fetchone(): print("Creating work_config table...") cursor.execute(""" CREATE TABLE work_config ( id INTEGER PRIMARY KEY AUTOINCREMENT, work_hours_per_day FLOAT DEFAULT 8.0, mandatory_break_minutes INTEGER DEFAULT 30, break_threshold_hours FLOAT DEFAULT 6.0, additional_break_minutes INTEGER DEFAULT 15, additional_break_threshold_hours FLOAT DEFAULT 9.0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, user_id INTEGER ) """) cursor.execute(""" INSERT INTO work_config (work_hours_per_day, mandatory_break_minutes, break_threshold_hours) VALUES (8.0, 30, 6.0) """) else: # Check and add missing columns to work_config cursor.execute("PRAGMA table_info(work_config)") work_config_columns = [column[1] for column in cursor.fetchall()] work_config_migrations = [ ('additional_break_minutes', "ALTER TABLE work_config ADD COLUMN additional_break_minutes INTEGER DEFAULT 15"), ('additional_break_threshold_hours', "ALTER TABLE work_config ADD COLUMN additional_break_threshold_hours FLOAT DEFAULT 9.0"), ('user_id', "ALTER TABLE work_config ADD COLUMN user_id INTEGER") ] for column_name, sql_command in work_config_migrations: if column_name not in work_config_columns: print(f"Adding {column_name} column to work_config...") cursor.execute(sql_command) # Migrate user table cursor.execute("PRAGMA table_info(user)") user_columns = [column[1] for column in cursor.fetchall()] user_migrations = [ ('is_verified', "ALTER TABLE user ADD COLUMN is_verified BOOLEAN DEFAULT 0"), ('verification_token', "ALTER TABLE user ADD COLUMN verification_token VARCHAR(100)"), ('token_expiry', "ALTER TABLE user ADD COLUMN token_expiry TIMESTAMP"), ('is_blocked', "ALTER TABLE user ADD COLUMN is_blocked BOOLEAN DEFAULT 0"), ('role', "ALTER TABLE user ADD COLUMN role VARCHAR(50) DEFAULT 'Team Member'"), ('team_id', "ALTER TABLE user ADD COLUMN team_id INTEGER"), ('two_factor_enabled', "ALTER TABLE user ADD COLUMN two_factor_enabled BOOLEAN DEFAULT 0"), ('two_factor_secret', "ALTER TABLE user ADD COLUMN two_factor_secret VARCHAR(32)") ] for column_name, sql_command in user_migrations: if column_name not in user_columns: print(f"Adding {column_name} column to user...") cursor.execute(sql_command) # Remove is_admin column if it exists (migration to role-based system) if 'is_admin' in user_columns: print("Migrating from is_admin to role-based system...") # First ensure all users have roles set based on is_admin cursor.execute("UPDATE user SET role = 'Administrator' WHERE is_admin = 1 AND (role IS NULL OR role = '')") cursor.execute("UPDATE user SET role = 'Team Member' WHERE is_admin = 0 AND (role IS NULL OR role = '')") # Drop the is_admin column (SQLite requires table recreation) print("Removing is_admin column...") cursor.execute("PRAGMA foreign_keys=off") # Create new table without is_admin column cursor.execute(""" CREATE TABLE user_new ( id INTEGER PRIMARY KEY, username VARCHAR(80) UNIQUE NOT NULL, email VARCHAR(120) UNIQUE NOT NULL, password_hash VARCHAR(128), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_verified BOOLEAN DEFAULT 0, verification_token VARCHAR(100), token_expiry TIMESTAMP, is_blocked BOOLEAN DEFAULT 0, role VARCHAR(50) DEFAULT 'Team Member', team_id INTEGER, two_factor_enabled BOOLEAN DEFAULT 0, two_factor_secret VARCHAR(32) ) """) # Copy data from old table to new table (excluding is_admin) cursor.execute(""" INSERT INTO user_new (id, username, email, password_hash, created_at, is_verified, verification_token, token_expiry, is_blocked, role, team_id, two_factor_enabled, two_factor_secret) SELECT id, username, email, password_hash, created_at, is_verified, verification_token, token_expiry, is_blocked, role, team_id, two_factor_enabled, two_factor_secret FROM user """) # Drop old table and rename new table cursor.execute("DROP TABLE user") cursor.execute("ALTER TABLE user_new RENAME TO user") cursor.execute("PRAGMA foreign_keys=on") print("Successfully removed is_admin column") # Create team table if it doesn't exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='team'") if not cursor.fetchone(): print("Creating team table...") cursor.execute(""" CREATE TABLE team ( id INTEGER PRIMARY KEY AUTOINCREMENT, name VARCHAR(100) UNIQUE NOT NULL, description VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Create system_settings table if it doesn't exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='system_settings'") if not cursor.fetchone(): print("Creating system_settings table...") cursor.execute(""" CREATE TABLE system_settings ( id INTEGER PRIMARY KEY AUTOINCREMENT, key VARCHAR(50) UNIQUE NOT NULL, value VARCHAR(255) NOT NULL, description VARCHAR(255), updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Create project table if it doesn't exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='project'") if not cursor.fetchone(): print("Creating project table...") cursor.execute(""" CREATE TABLE project ( id INTEGER PRIMARY KEY AUTOINCREMENT, name VARCHAR(100) NOT NULL, description TEXT, code VARCHAR(20) NOT NULL, is_active BOOLEAN DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_by_id INTEGER NOT NULL, team_id INTEGER, start_date DATE, end_date DATE, company_id INTEGER, FOREIGN KEY (created_by_id) REFERENCES user (id), FOREIGN KEY (team_id) REFERENCES team (id), FOREIGN KEY (company_id) REFERENCES company (id) ) """) # Create company table if it doesn't exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='company'") if not cursor.fetchone(): print("Creating company table...") cursor.execute(""" CREATE TABLE company ( id INTEGER PRIMARY KEY AUTOINCREMENT, name VARCHAR(100) UNIQUE NOT NULL, slug VARCHAR(50) UNIQUE NOT NULL, description TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT 1, max_users INTEGER DEFAULT 100 ) """) # Commit all schema changes conn.commit() except Exception as e: print(f"Error during database migration: {e}") conn.rollback() raise finally: conn.close() # Now use SQLAlchemy for data migrations db.create_all() # This will create any remaining tables defined in models # Initialize system settings init_system_settings() # Handle company migration and admin user setup migrate_to_company_model() migrate_data() print("Database migrations completed successfully!") def migrate_to_company_model(): """Migrate existing data to support company model""" import sqlite3 # Determine database path based on environment db_path = '/data/timetrack.db' if os.path.exists('/data') else 'timetrack.db' # Connect to the database for raw SQL operations conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # Check if company_id column exists in user table cursor.execute("PRAGMA table_info(user)") user_columns = [column[1] for column in cursor.fetchall()] if 'company_id' not in user_columns: print("Migrating to company model...") # Add company_id columns to existing tables tables_to_migrate = [ ('user', 'ALTER TABLE user ADD COLUMN company_id INTEGER'), ('team', 'ALTER TABLE team ADD COLUMN company_id INTEGER'), ('project', 'ALTER TABLE project ADD COLUMN company_id INTEGER') ] for table_name, sql_command in tables_to_migrate: cursor.execute(f"PRAGMA table_info({table_name})") columns = [column[1] for column in cursor.fetchall()] if 'company_id' not in columns: print(f"Adding company_id column to {table_name}...") cursor.execute(sql_command) # Check if there are existing users but no companies cursor.execute("SELECT COUNT(*) FROM user") user_count = cursor.fetchone()[0] cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='company'") company_table_exists = cursor.fetchone() if user_count > 0 and company_table_exists: cursor.execute("SELECT COUNT(*) FROM company") company_count = cursor.fetchone()[0] if company_count == 0: print("Creating default company for existing data...") # Create default company cursor.execute(""" INSERT INTO company (name, slug, description, is_active, max_users) VALUES ('Default Organization', 'default', 'Migrated from single-tenant installation', 1, 1000) """) # Get the company ID cursor.execute("SELECT last_insert_rowid()") company_id = cursor.fetchone()[0] # Update all existing records to use the default company cursor.execute(f"UPDATE user SET company_id = {company_id} WHERE company_id IS NULL") cursor.execute(f"UPDATE team SET company_id = {company_id} WHERE company_id IS NULL") cursor.execute(f"UPDATE project SET company_id = {company_id} WHERE company_id IS NULL") print(f"Assigned {user_count} existing users to default company") conn.commit() except Exception as e: print(f"Error during company migration: {e}") conn.rollback() raise finally: conn.close() def init_system_settings(): """Initialize system settings with default values if they don't exist""" if not SystemSettings.query.filter_by(key='registration_enabled').first(): print("Adding registration_enabled system setting...") reg_setting = SystemSettings( key='registration_enabled', value='true', description='Controls whether new user registration is allowed' ) db.session.add(reg_setting) db.session.commit() if not SystemSettings.query.filter_by(key='email_verification_required').first(): print("Adding email_verification_required system setting...") email_setting = SystemSettings( key='email_verification_required', value='true', description='Controls whether email verification is required for new user accounts' ) db.session.add(email_setting) db.session.commit() def migrate_data(): """Handle data migrations and setup""" # Only create default admin if no companies exist yet if Company.query.count() == 0: print("No companies exist, skipping admin user creation. Use company setup instead.") return # Check if admin user exists in the first company default_company = Company.query.first() if default_company: admin = User.query.filter_by(username='admin', company_id=default_company.id).first() if not admin: # Create admin user for the default company admin = User( username='admin', email='admin@timetrack.local', company_id=default_company.id, is_verified=True, role=Role.ADMIN, two_factor_enabled=False ) admin.set_password('admin') db.session.add(admin) db.session.commit() print("Created admin user with username 'admin' and password 'admin'") print("IMPORTANT: Change the admin password after first login!") else: # Update existing admin user with new fields admin.is_verified = True if not admin.role: admin.role = Role.ADMIN if admin.two_factor_enabled is None: admin.two_factor_enabled = False db.session.commit() # Update orphaned records orphan_entries = TimeEntry.query.filter_by(user_id=None).all() for entry in orphan_entries: entry.user_id = admin.id orphan_configs = WorkConfig.query.filter_by(user_id=None).all() for config in orphan_configs: config.user_id = admin.id # Update existing users users_to_update = User.query.filter_by(company_id=default_company.id).all() for user in users_to_update: if user.is_verified is None: user.is_verified = True if not user.role: user.role = Role.TEAM_MEMBER if user.two_factor_enabled is None: user.two_factor_enabled = False # Create sample projects if none exist for this company existing_projects = Project.query.filter_by(company_id=default_company.id).count() if existing_projects == 0: sample_projects = [ { 'name': 'General Administration', 'code': 'ADMIN001', 'description': 'General administrative tasks and meetings' }, { 'name': 'Development Project', 'code': 'DEV001', 'description': 'Software development and maintenance tasks' }, { 'name': 'Customer Support', 'code': 'SUPPORT001', 'description': 'Customer service and technical support activities' } ] for proj_data in sample_projects: project = Project( name=proj_data['name'], code=proj_data['code'], description=proj_data['description'], company_id=default_company.id, created_by_id=admin.id, is_active=True ) db.session.add(project) print(f"Created {len(sample_projects)} sample projects for {default_company.name}") db.session.commit() # Call this function during app initialization @app.before_first_request def initialize_app(): run_migrations() # Add this after initializing the app but before defining routes @app.context_processor def inject_globals(): """Make certain variables available to all templates.""" return { 'Role': Role, 'current_year': datetime.now().year } # Authentication decorator def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if g.user is None: return redirect(url_for('login', next=request.url)) return f(*args, **kwargs) return decorated_function # Admin-only decorator def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if g.user is None or g.user.role != Role.ADMIN: flash('You need administrator privileges to access this page.', 'error') return redirect(url_for('home')) return f(*args, **kwargs) return decorated_function def get_system_setting(key, default='false'): """Helper function to get system setting value""" setting = SystemSettings.query.filter_by(key=key).first() return setting.value if setting else default # Add this decorator function after your existing decorators def role_required(min_role): """ Decorator to restrict access based on user role. min_role should be a Role enum value (e.g., Role.TEAM_LEADER) """ def role_decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if g.user is None: return redirect(url_for('login', next=request.url)) # Admin always has access if g.user.role == Role.ADMIN: return f(*args, **kwargs) # Check role hierarchy role_hierarchy = { Role.TEAM_MEMBER: 1, Role.TEAM_LEADER: 2, Role.SUPERVISOR: 3, Role.ADMIN: 4 } if role_hierarchy.get(g.user.role, 0) < role_hierarchy.get(min_role, 0): flash('You do not have sufficient permissions to access this page.', 'error') return redirect(url_for('home')) return f(*args, **kwargs) return decorated_function return role_decorator def company_required(f): """ Decorator to ensure user has a valid company association and set company context. """ @wraps(f) def decorated_function(*args, **kwargs): if g.user is None: return redirect(url_for('login', next=request.url)) if g.user.company_id is None: flash('You must be associated with a company to access this page.', 'error') return redirect(url_for('setup_company')) # Set company context g.company = Company.query.get(g.user.company_id) if not g.company or not g.company.is_active: flash('Your company is not active. Please contact support.', 'error') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function @app.before_request def load_logged_in_user(): user_id = session.get('user_id') if user_id is None: g.user = None g.company = None else: g.user = User.query.get(user_id) if g.user: # Set company context if g.user.company_id: g.company = Company.query.get(g.user.company_id) else: g.company = None # Check if user is verified if not g.user.is_verified and request.endpoint not in ['verify_email', 'static', 'logout', 'setup_company']: # Allow unverified users to access only verification and static resources if request.endpoint not in ['login', 'register']: flash('Please verify your email address before accessing this page.', 'warning') session.clear() return redirect(url_for('login')) else: g.company = None @app.route('/') def home(): if g.user: # Get active entry (no departure time) active_entry = TimeEntry.query.filter_by( user_id=g.user.id, departure_time=None ).first() # Get today's completed entries for history today = datetime.now().date() history = TimeEntry.query.filter( TimeEntry.user_id == g.user.id, TimeEntry.departure_time.isnot(None), TimeEntry.arrival_time >= datetime.combine(today, time.min), TimeEntry.arrival_time <= datetime.combine(today, time.max) ).order_by(TimeEntry.arrival_time.desc()).all() # Get available projects for this user (company-scoped) available_projects = [] if g.user.company_id: all_projects = Project.query.filter_by( company_id=g.user.company_id, is_active=True ).all() for project in all_projects: if project.is_user_allowed(g.user): available_projects.append(project) return render_template('index.html', title='Home', active_entry=active_entry, history=history, available_projects=available_projects) else: return render_template('index.html', title='Home') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') user = User.query.filter_by(username=username).first() if user: # Fix role if it's a string or None if isinstance(user.role, str) or user.role is None: # Map string role values to enum values role_mapping = { 'Team Member': Role.TEAM_MEMBER, 'TEAM_MEMBER': Role.TEAM_MEMBER, 'Team Leader': Role.TEAM_LEADER, 'TEAM_LEADER': Role.TEAM_LEADER, 'Supervisor': Role.SUPERVISOR, 'SUPERVISOR': Role.SUPERVISOR, 'Administrator': Role.ADMIN, 'ADMIN': Role.ADMIN } if isinstance(user.role, str): user.role = role_mapping.get(user.role, Role.TEAM_MEMBER) else: user.role = Role.ADMIN if user.role == Role.ADMIN else Role.TEAM_MEMBER db.session.commit() # Now proceed with password check if user.check_password(password): # Check if user is blocked if user.is_blocked: flash('Your account has been disabled. Please contact an administrator.', 'error') return render_template('login.html') # Check if 2FA is enabled if user.two_factor_enabled: # Store user ID for 2FA verification session['2fa_user_id'] = user.id return redirect(url_for('verify_2fa')) else: # Continue with normal login process session['user_id'] = user.id session['username'] = user.username session['role'] = user.role.value flash('Login successful!', 'success') return redirect(url_for('home')) flash('Invalid username or password', 'error') return render_template('login.html', title='Login') @app.route('/logout') def logout(): session.clear() flash('You have been logged out.', 'info') return redirect(url_for('login')) @app.route('/register', methods=['GET', 'POST']) def register(): # Check if registration is enabled registration_enabled = get_system_setting('registration_enabled', 'true') == 'true' if not registration_enabled: flash('Registration is currently disabled by the administrator.', 'error') return redirect(url_for('login')) # Check if companies exist, if not redirect to company setup if Company.query.count() == 0: flash('No companies exist yet. Please set up your company first.', 'info') return redirect(url_for('setup_company')) if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') confirm_password = request.form.get('confirm_password') company_code = request.form.get('company_code', '').strip() # Validate input error = None if not username: error = 'Username is required' elif not email: error = 'Email is required' elif not password: error = 'Password is required' elif password != confirm_password: error = 'Passwords do not match' elif not company_code: error = 'Company code is required' # Find company by code company = None if company_code: company = Company.query.filter_by(slug=company_code.lower()).first() if not company: error = 'Invalid company code' # Check for existing users within the company if company and not error: if User.query.filter_by(username=username, company_id=company.id).first(): error = 'Username already exists in this company' elif User.query.filter_by(email=email, company_id=company.id).first(): error = 'Email already registered in this company' if error is None and company: try: # Check if this is the first user account in this company is_first_user_in_company = User.query.filter_by(company_id=company.id).count() == 0 # Check if email verification is required email_verification_required = get_system_setting('email_verification_required', 'true') == 'true' new_user = User( username=username, email=email, company_id=company.id, is_verified=False ) new_user.set_password(password) # Make first user in company an admin with full privileges if is_first_user_in_company: new_user.role = Role.ADMIN new_user.is_verified = True # Auto-verify first user in company elif not email_verification_required: # If email verification is disabled, auto-verify new users new_user.is_verified = True # Generate verification token (even if not needed, for consistency) token = new_user.generate_verification_token() db.session.add(new_user) db.session.commit() if is_first_user_in_company: # First user in company gets admin privileges and is auto-verified logger.info(f"First user account created in company {company.name}: {username} with admin privileges") flash(f'Welcome! You are the first user in {company.name} and have been granted administrator privileges. You can now log in.', 'success') elif not email_verification_required: # Email verification is disabled, user can log in immediately logger.info(f"User account created with auto-verification in company {company.name}: {username}") flash('Registration successful! You can now log in.', 'success') else: # Send verification email for regular users when verification is required verification_url = url_for('verify_email', token=token, _external=True) msg = Message('Verify your TimeTrack account', recipients=[email]) msg.body = f'''Hello {username}, Thank you for registering with TimeTrack. To complete your registration, please click on the link below: {verification_url} This link will expire in 24 hours. If you did not register for TimeTrack, please ignore this email. Best regards, The TimeTrack Team ''' mail.send(msg) logger.info(f"Verification email sent to {email}") flash('Registration initiated! Please check your email to verify your account.', 'success') return redirect(url_for('login')) except Exception as e: db.session.rollback() logger.error(f"Error during registration: {str(e)}") error = f"An error occurred during registration: {str(e)}" flash(error, 'error') return render_template('register.html', title='Register') @app.route('/setup_company', methods=['GET', 'POST']) def setup_company(): """Company setup route for creating new companies with admin users""" existing_companies = Company.query.count() # Determine access level is_initial_setup = existing_companies == 0 is_super_admin = g.user and g.user.role == Role.ADMIN and existing_companies > 0 is_authorized = is_initial_setup or is_super_admin # Check authorization for non-initial setups if not is_initial_setup and not is_super_admin: flash('You do not have permission to create new companies.', 'error') return redirect(url_for('home') if g.user else url_for('login')) if request.method == 'POST': company_name = request.form.get('company_name') company_description = request.form.get('company_description', '') admin_username = request.form.get('admin_username') admin_email = request.form.get('admin_email') admin_password = request.form.get('admin_password') confirm_password = request.form.get('confirm_password') # Validate input error = None if not company_name: error = 'Company name is required' elif not admin_username: error = 'Admin username is required' elif not admin_email: error = 'Admin email is required' elif not admin_password: error = 'Admin password is required' elif admin_password != confirm_password: error = 'Passwords do not match' elif len(admin_password) < 6: error = 'Password must be at least 6 characters long' if error is None: try: # Generate company slug import re slug = re.sub(r'[^\w\s-]', '', company_name.lower()) slug = re.sub(r'[-\s]+', '-', slug).strip('-') # Ensure slug uniqueness base_slug = slug counter = 1 while Company.query.filter_by(slug=slug).first(): slug = f"{base_slug}-{counter}" counter += 1 # Create company company = Company( name=company_name, slug=slug, description=company_description, is_active=True ) db.session.add(company) db.session.flush() # Get company.id without committing # Check if username/email already exists in this company context existing_user_by_username = User.query.filter_by( username=admin_username, company_id=company.id ).first() existing_user_by_email = User.query.filter_by( email=admin_email, company_id=company.id ).first() if existing_user_by_username: error = 'Username already exists in this company' elif existing_user_by_email: error = 'Email already registered in this company' if error is None: # Create admin user admin_user = User( username=admin_username, email=admin_email, company_id=company.id, role=Role.ADMIN, is_verified=True # Auto-verify company admin ) admin_user.set_password(admin_password) db.session.add(admin_user) db.session.commit() if is_initial_setup: # Auto-login the admin user for initial setup session['user_id'] = admin_user.id session['username'] = admin_user.username session['role'] = admin_user.role.value flash(f'Company "{company_name}" created successfully! You are now logged in as the administrator.', 'success') return redirect(url_for('home')) else: # For super admin creating additional companies, don't auto-login flash(f'Company "{company_name}" created successfully! Admin user "{admin_username}" has been created with the company code "{slug}".', 'success') return redirect(url_for('admin_company') if g.user else url_for('login')) else: db.session.rollback() except Exception as e: db.session.rollback() logger.error(f"Error during company setup: {str(e)}") error = f"An error occurred during setup: {str(e)}" if error: flash(error, 'error') return render_template('setup_company.html', title='Company Setup', existing_companies=existing_companies, is_initial_setup=is_initial_setup, is_super_admin=is_super_admin) @app.route('/verify_email/') def verify_email(token): user = User.query.filter_by(verification_token=token).first() if not user: flash('Invalid or expired verification link.', 'error') return redirect(url_for('login')) if user.verify_token(token): db.session.commit() flash('Email verified successfully! You can now log in.', 'success') else: flash('Invalid or expired verification link.', 'error') return redirect(url_for('login')) @app.route('/dashboard') @role_required(Role.TEAM_LEADER) def dashboard(): # Get dashboard data based on user role dashboard_data = {} if g.user.role == Role.ADMIN and g.user.company_id: # Admin sees everything within their company dashboard_data.update({ 'total_users': User.query.filter_by(company_id=g.user.company_id).count(), 'total_teams': Team.query.filter_by(company_id=g.user.company_id).count(), 'blocked_users': User.query.filter_by(company_id=g.user.company_id, is_blocked=True).count(), 'unverified_users': User.query.filter_by(company_id=g.user.company_id, is_verified=False).count(), 'recent_registrations': User.query.filter_by(company_id=g.user.company_id).order_by(User.id.desc()).limit(5).all() }) if g.user.role in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]: # Team leaders and supervisors see team-related data if g.user.team_id or g.user.role == Role.ADMIN: if g.user.role == Role.ADMIN and g.user.company_id: # Admin can see all teams in their company teams = Team.query.filter_by(company_id=g.user.company_id).all() team_members = User.query.filter( User.team_id.isnot(None), User.company_id == g.user.company_id ).all() else: # Team leaders/supervisors see their own team teams = [Team.query.get(g.user.team_id)] if g.user.team_id else [] team_members = User.query.filter_by( team_id=g.user.team_id, company_id=g.user.company_id ).all() if g.user.team_id else [] dashboard_data.update({ 'teams': teams, 'team_members': team_members, 'team_member_count': len(team_members) }) # Get recent time entries for the user's oversight if g.user.role == Role.ADMIN: # Admin sees all recent entries recent_entries = TimeEntry.query.order_by(TimeEntry.arrival_time.desc()).limit(10).all() elif g.user.team_id: # Team leaders see their team's entries team_user_ids = [user.id for user in User.query.filter_by(team_id=g.user.team_id).all()] recent_entries = TimeEntry.query.filter(TimeEntry.user_id.in_(team_user_ids)).order_by(TimeEntry.arrival_time.desc()).limit(10).all() else: recent_entries = [] dashboard_data['recent_entries'] = recent_entries return render_template('dashboard.html', title='Dashboard', **dashboard_data) # Redirect old admin dashboard URL to new dashboard @app.route('/admin/users') @admin_required @company_required def admin_users(): users = User.query.filter_by(company_id=g.user.company_id).all() return render_template('admin_users.html', title='User Management', users=users) @app.route('/admin/users/create', methods=['GET', 'POST']) @admin_required @company_required def create_user(): if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') auto_verify = 'auto_verify' in request.form # Get role and team role_name = request.form.get('role') team_id = request.form.get('team_id') # Validate input error = None if not username: error = 'Username is required' elif not email: error = 'Email is required' elif not password: error = 'Password is required' elif User.query.filter_by(username=username, company_id=g.user.company_id).first(): error = 'Username already exists in your company' elif User.query.filter_by(email=email, company_id=g.user.company_id).first(): error = 'Email already registered in your company' if error is None: # Convert role string to enum try: role = Role[role_name] if role_name else Role.TEAM_MEMBER except KeyError: role = Role.TEAM_MEMBER # Create new user with role and team new_user = User( username=username, email=email, company_id=g.user.company_id, is_verified=auto_verify, role=role, team_id=team_id if team_id else None ) new_user.set_password(password) if not auto_verify: # Generate verification token and send email token = new_user.generate_verification_token() verification_url = url_for('verify_email', token=token, _external=True) msg = Message('Verify your TimeTrack account', recipients=[email]) msg.body = f'''Hello {username}, An administrator has created an account for you on TimeTrack. To activate your account, please click on the link below: {verification_url} This link will expire in 24 hours. Best regards, The TimeTrack Team ''' mail.send(msg) db.session.add(new_user) db.session.commit() if auto_verify: flash(f'User {username} created and automatically verified!', 'success') else: flash(f'User {username} created! Verification email sent.', 'success') return redirect(url_for('admin_users')) flash(error, 'error') # Get all teams for the form (company-scoped) teams = Team.query.filter_by(company_id=g.user.company_id).all() roles = [role for role in Role] return render_template('create_user.html', title='Create User', teams=teams, roles=roles) @app.route('/admin/users/edit/', methods=['GET', 'POST']) @admin_required @company_required def edit_user(user_id): user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404() if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') # Get role and team role_name = request.form.get('role') team_id = request.form.get('team_id') # Validate input error = None if not username: error = 'Username is required' elif not email: error = 'Email is required' elif username != user.username and User.query.filter_by(username=username, company_id=g.user.company_id).first(): error = 'Username already exists in your company' elif email != user.email and User.query.filter_by(email=email, company_id=g.user.company_id).first(): error = 'Email already registered in your company' if error is None: user.username = username user.email = email # Convert role string to enum try: user.role = Role[role_name] if role_name else Role.TEAM_MEMBER except KeyError: user.role = Role.TEAM_MEMBER user.team_id = team_id if team_id else None if password: user.set_password(password) db.session.commit() flash(f'User {username} updated successfully!', 'success') return redirect(url_for('admin_users')) flash(error, 'error') # Get all teams for the form (company-scoped) teams = Team.query.filter_by(company_id=g.user.company_id).all() roles = [role for role in Role] return render_template('edit_user.html', title='Edit User', user=user, teams=teams, roles=roles) @app.route('/admin/users/delete/', methods=['POST']) @admin_required @company_required def delete_user(user_id): user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404() # Prevent deleting yourself if user.id == session.get('user_id'): flash('You cannot delete your own account', 'error') return redirect(url_for('admin_users')) username = user.username db.session.delete(user) db.session.commit() flash(f'User {username} deleted successfully', 'success') return redirect(url_for('admin_users')) @app.route('/profile', methods=['GET', 'POST']) @login_required def profile(): user = User.query.get(session['user_id']) if request.method == 'POST': email = request.form.get('email') current_password = request.form.get('current_password') new_password = request.form.get('new_password') confirm_password = request.form.get('confirm_password') # Validate input error = None if not email: error = 'Email is required' elif email != user.email and User.query.filter_by(email=email).first(): error = 'Email already registered' # Password change validation if new_password: if not current_password: error = 'Current password is required to set a new password' elif not user.check_password(current_password): error = 'Current password is incorrect' elif new_password != confirm_password: error = 'New passwords do not match' if error is None: user.email = email if new_password: user.set_password(new_password) db.session.commit() flash('Profile updated successfully!', 'success') return redirect(url_for('profile')) flash(error, 'error') return render_template('profile.html', title='My Profile', user=user) @app.route('/2fa/setup', methods=['GET', 'POST']) @login_required def setup_2fa(): if request.method == 'POST': # Verify the TOTP code before enabling 2FA totp_code = request.form.get('totp_code') if not totp_code: flash('Please enter the verification code from your authenticator app.', 'error') return redirect(url_for('setup_2fa')) try: if g.user.verify_2fa_token(totp_code, allow_setup=True): g.user.two_factor_enabled = True db.session.commit() flash('Two-factor authentication has been successfully enabled!', 'success') return redirect(url_for('profile')) else: flash('Invalid verification code. Please make sure your device time is synchronized and try again.', 'error') return redirect(url_for('setup_2fa')) except Exception as e: logger.error(f"2FA setup error: {str(e)}") flash('An error occurred during 2FA setup. Please try again.', 'error') return redirect(url_for('setup_2fa')) # GET request - show setup page if g.user.two_factor_enabled: flash('Two-factor authentication is already enabled.', 'info') return redirect(url_for('profile')) # Generate secret if not exists if not g.user.two_factor_secret: g.user.generate_2fa_secret() db.session.commit() # Generate QR code import qrcode import io import base64 qr_uri = g.user.get_2fa_uri() qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(qr_uri) qr.make(fit=True) # Create QR code image qr_img = qr.make_image(fill_color="black", back_color="white") img_buffer = io.BytesIO() qr_img.save(img_buffer, format='PNG') img_buffer.seek(0) qr_code_b64 = base64.b64encode(img_buffer.getvalue()).decode() return render_template('setup_2fa.html', title='Setup Two-Factor Authentication', secret=g.user.two_factor_secret, qr_code=qr_code_b64) @app.route('/2fa/disable', methods=['POST']) @login_required def disable_2fa(): password = request.form.get('password') if not password or not g.user.check_password(password): flash('Please enter your correct password to disable 2FA.', 'error') return redirect(url_for('profile')) g.user.two_factor_enabled = False g.user.two_factor_secret = None db.session.commit() flash('Two-factor authentication has been disabled.', 'success') return redirect(url_for('profile')) @app.route('/2fa/verify', methods=['GET', 'POST']) def verify_2fa(): # Check if user is in 2FA verification state user_id = session.get('2fa_user_id') if not user_id: return redirect(url_for('login')) user = User.query.get(user_id) if not user or not user.two_factor_enabled: session.pop('2fa_user_id', None) return redirect(url_for('login')) if request.method == 'POST': totp_code = request.form.get('totp_code') if user.verify_2fa_token(totp_code): # Complete login process session.pop('2fa_user_id', None) session['user_id'] = user.id session['username'] = user.username session['role'] = user.role.value flash('Login successful!', 'success') return redirect(url_for('home')) else: flash('Invalid verification code. Please try again.', 'error') return render_template('verify_2fa.html', title='Two-Factor Authentication') @app.route('/about') @login_required def about(): return render_template('about.html', title='About') @app.route('/contact', methods=['GET', 'POST']) @login_required def contact(): # redacted return render_template('contact.html', title='Contact') # We can keep this route as a redirect to home for backward compatibility @app.route('/timetrack') @login_required def timetrack(): return redirect(url_for('home')) @app.route('/api/arrive', methods=['POST']) @login_required def arrive(): # Get project and notes from request project_id = request.json.get('project_id') if request.json else None notes = request.json.get('notes') if request.json else None # Validate project access if project is specified if project_id: project = Project.query.get(project_id) if not project or not project.is_user_allowed(g.user): return jsonify({'error': 'Invalid or unauthorized project'}), 403 # Create a new time entry with arrival time for the current user new_entry = TimeEntry( user_id=g.user.id, arrival_time=datetime.now(), project_id=int(project_id) if project_id else None, notes=notes ) db.session.add(new_entry) db.session.commit() return jsonify({ 'id': new_entry.id, 'arrival_time': new_entry.arrival_time.strftime('%Y-%m-%d %H:%M:%S'), 'project': { 'id': new_entry.project.id, 'code': new_entry.project.code, 'name': new_entry.project.name } if new_entry.project else None, 'notes': new_entry.notes }) @app.route('/api/leave/', methods=['POST']) @login_required def leave(entry_id): # Find the time entry for the current user entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404() # Set the departure time departure_time = datetime.now() entry.departure_time = departure_time # If currently paused, add the final break duration if entry.is_paused and entry.pause_start_time: final_break_duration = int((departure_time - entry.pause_start_time).total_seconds()) entry.total_break_duration += final_break_duration entry.is_paused = False entry.pause_start_time = None # Calculate work duration considering breaks entry.duration, effective_break = calculate_work_duration( entry.arrival_time, departure_time, entry.total_break_duration ) db.session.commit() return jsonify({ 'id': entry.id, 'arrival_time': entry.arrival_time.strftime('%Y-%m-%d %H:%M:%S'), 'departure_time': entry.departure_time.strftime('%Y-%m-%d %H:%M:%S'), 'duration': entry.duration, 'total_break_duration': entry.total_break_duration, 'effective_break_duration': effective_break }) # Add this new route to handle pausing/resuming @app.route('/api/toggle-pause/', methods=['POST']) @login_required def toggle_pause(entry_id): # Find the time entry for the current user entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404() now = datetime.now() if entry.is_paused: # Resuming work - calculate break duration break_duration = int((now - entry.pause_start_time).total_seconds()) entry.total_break_duration += break_duration entry.is_paused = False entry.pause_start_time = None message = "Work resumed" else: # Pausing work entry.is_paused = True entry.pause_start_time = now message = "Work paused" db.session.commit() return jsonify({ 'id': entry.id, 'is_paused': entry.is_paused, 'total_break_duration': entry.total_break_duration, 'message': message }) @app.route('/config', methods=['GET', 'POST']) @login_required def config(): # Get current configuration or create default if none exists config = WorkConfig.query.order_by(WorkConfig.id.desc()).first() if not config: config = WorkConfig() db.session.add(config) db.session.commit() if request.method == 'POST': try: # Update configuration with form data config.work_hours_per_day = float(request.form.get('work_hours_per_day', 8.0)) config.mandatory_break_minutes = int(request.form.get('mandatory_break_minutes', 30)) config.break_threshold_hours = float(request.form.get('break_threshold_hours', 6.0)) config.additional_break_minutes = int(request.form.get('additional_break_minutes', 15)) config.additional_break_threshold_hours = float(request.form.get('additional_break_threshold_hours', 9.0)) db.session.commit() flash('Configuration updated successfully!', 'success') return redirect(url_for('config')) except ValueError: flash('Please enter valid numbers for all fields', 'error') return render_template('config.html', title='Configuration', config=config) @app.route('/api/delete/', methods=['DELETE']) @login_required def delete_entry(entry_id): entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404() db.session.delete(entry) db.session.commit() return jsonify({'success': True, 'message': 'Entry deleted successfully'}) @app.route('/api/update/', methods=['PUT']) @login_required def update_entry(entry_id): entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404() data = request.json if 'arrival_time' in data: try: entry.arrival_time = datetime.strptime(data['arrival_time'], '%Y-%m-%d %H:%M:%S') except ValueError: return jsonify({'success': False, 'message': 'Invalid arrival time format'}), 400 if 'departure_time' in data and data['departure_time']: try: entry.departure_time = datetime.strptime(data['departure_time'], '%Y-%m-%d %H:%M:%S') # Recalculate duration if both times are present if entry.arrival_time and entry.departure_time: # Calculate work duration considering breaks entry.duration, _ = calculate_work_duration( entry.arrival_time, entry.departure_time, entry.total_break_duration ) except ValueError: return jsonify({'success': False, 'message': 'Invalid departure time format'}), 400 db.session.commit() return jsonify({ 'success': True, 'message': 'Entry updated successfully', 'entry': { 'id': entry.id, 'arrival_time': entry.arrival_time.strftime('%Y-%m-%d %H:%M:%S'), 'departure_time': entry.departure_time.strftime('%Y-%m-%d %H:%M:%S') if entry.departure_time else None, 'duration': entry.duration, 'is_paused': entry.is_paused, 'total_break_duration': entry.total_break_duration } }) @app.route('/team/hours') @login_required @role_required(Role.TEAM_LEADER) # Only team leaders and above can access @company_required def team_hours(): # Get the current user's team team = Team.query.get(g.user.team_id) if not team: flash('You are not assigned to any team.', 'error') return redirect(url_for('home')) # Get date range from query parameters or use current week as default today = datetime.now().date() start_of_week = today - timedelta(days=today.weekday()) end_of_week = start_of_week + timedelta(days=6) start_date_str = request.args.get('start_date', start_of_week.strftime('%Y-%m-%d')) end_date_str = request.args.get('end_date', end_of_week.strftime('%Y-%m-%d')) try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() except ValueError: flash('Invalid date format. Using current week instead.', 'warning') start_date = start_of_week end_date = end_of_week # Generate a list of dates in the range for the table header date_range = [] current_date = start_date while current_date <= end_date: date_range.append(current_date) current_date += timedelta(days=1) return render_template( 'team_hours.html', title=f'Team Hours', start_date=start_date, end_date=end_date, date_range=date_range ) @app.route('/history') @login_required def history(): # Get project filter from query parameters project_filter = request.args.get('project_id') # Base query for user's time entries query = TimeEntry.query.filter_by(user_id=g.user.id) # Apply project filter if specified if project_filter: if project_filter == 'none': # Show entries with no project assigned query = query.filter(TimeEntry.project_id.is_(None)) else: # Show entries for specific project try: project_id = int(project_filter) query = query.filter_by(project_id=project_id) except ValueError: # Invalid project ID, ignore filter pass # Get filtered entries ordered by most recent first all_entries = query.order_by(TimeEntry.arrival_time.desc()).all() # Get available projects for the filter dropdown (company-scoped) available_projects = [] if g.user.company_id: all_projects = Project.query.filter_by(company_id=g.user.company_id, is_active=True).all() for project in all_projects: if project.is_user_allowed(g.user): available_projects.append(project) return render_template('history.html', title='Time Entry History', entries=all_entries, available_projects=available_projects) def calculate_work_duration(arrival_time, departure_time, total_break_duration): """ Calculate work duration considering both configured and actual break times. Args: arrival_time: Datetime of arrival departure_time: Datetime of departure total_break_duration: Actual logged break duration in seconds Returns: tuple: (work_duration_in_seconds, effective_break_duration_in_seconds) """ # Calculate raw duration raw_duration = (departure_time - arrival_time).total_seconds() # Get work configuration for break rules config = WorkConfig.query.order_by(WorkConfig.id.desc()).first() if not config: config = WorkConfig() # Use default values if no config exists # Ensure configuration values are not None, use defaults if they are break_threshold_hours = config.break_threshold_hours if config.break_threshold_hours is not None else 6.0 mandatory_break_minutes = config.mandatory_break_minutes if config.mandatory_break_minutes is not None else 30 additional_break_threshold_hours = config.additional_break_threshold_hours if config.additional_break_threshold_hours is not None else 9.0 additional_break_minutes = config.additional_break_minutes if config.additional_break_minutes is not None else 15 # Calculate mandatory breaks based on work duration work_hours = raw_duration / 3600 # Convert seconds to hours configured_break_seconds = 0 # Apply primary break if work duration exceeds threshold if work_hours > break_threshold_hours: configured_break_seconds += mandatory_break_minutes * 60 # Apply additional break if work duration exceeds additional threshold if work_hours > additional_break_threshold_hours: configured_break_seconds += additional_break_minutes * 60 # Use the greater of configured breaks or actual logged breaks effective_break_duration = max(configured_break_seconds, total_break_duration) # Calculate final work duration work_duration = int(raw_duration - effective_break_duration) return work_duration, effective_break_duration @app.route('/api/resume/', methods=['POST']) @login_required def resume_entry(entry_id): # Find the entry to resume for the current user entry_to_resume = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404() # Check if there's already an active entry active_entry = TimeEntry.query.filter_by(user_id=session['user_id'], departure_time=None).first() if active_entry: return jsonify({ 'success': False, 'message': 'Cannot resume this entry. Another session is already active.' }), 400 # Clear the departure time to make this entry active again entry_to_resume.departure_time = None # Reset pause state if it was paused entry_to_resume.is_paused = False entry_to_resume.pause_start_time = None db.session.commit() return jsonify({ 'success': True, 'message': 'Work resumed on existing entry', 'id': entry_to_resume.id, 'arrival_time': entry_to_resume.arrival_time.strftime('%Y-%m-%d %H:%M:%S'), 'total_break_duration': entry_to_resume.total_break_duration }) @app.errorhandler(404) def page_not_found(e): return render_template('404.html'), 404 @app.errorhandler(500) def internal_server_error(e): return render_template('500.html'), 500 @app.route('/test') def test(): return "App is working!" @app.route('/admin/users/toggle-status/') @admin_required @company_required def toggle_user_status(user_id): user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404() # Prevent blocking yourself if user.id == session.get('user_id'): flash('You cannot block your own account', 'error') return redirect(url_for('admin_users')) # Toggle the blocked status user.is_blocked = not user.is_blocked db.session.commit() if user.is_blocked: flash(f'User {user.username} has been blocked', 'success') else: flash(f'User {user.username} has been unblocked', 'success') return redirect(url_for('admin_users')) # Add this route to manage system settings @app.route('/admin/settings', methods=['GET', 'POST']) @admin_required def admin_settings(): if request.method == 'POST': # Update registration setting registration_enabled = 'registration_enabled' in request.form reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first() if reg_setting: reg_setting.value = 'true' if registration_enabled else 'false' # Update email verification setting email_verification_required = 'email_verification_required' in request.form email_setting = SystemSettings.query.filter_by(key='email_verification_required').first() if email_setting: email_setting.value = 'true' if email_verification_required else 'false' db.session.commit() flash('System settings updated successfully!', 'success') # Get current settings settings = {} for setting in SystemSettings.query.all(): if setting.key == 'registration_enabled': settings['registration_enabled'] = setting.value == 'true' elif setting.key == 'email_verification_required': settings['email_verification_required'] = setting.value == 'true' return render_template('admin_settings.html', title='System Settings', settings=settings) # Company Management Routes @app.route('/admin/company') @admin_required @company_required def admin_company(): """View and manage company settings""" company = g.company # Get company statistics stats = { 'total_users': User.query.filter_by(company_id=company.id).count(), 'total_teams': Team.query.filter_by(company_id=company.id).count(), 'total_projects': Project.query.filter_by(company_id=company.id).count(), 'active_projects': Project.query.filter_by(company_id=company.id, is_active=True).count(), } return render_template('admin_company.html', title='Company Management', company=company, stats=stats) @app.route('/admin/company/edit', methods=['GET', 'POST']) @admin_required @company_required def edit_company(): """Edit company details""" company = g.company if request.method == 'POST': name = request.form.get('name') description = request.form.get('description', '') max_users = request.form.get('max_users') is_active = 'is_active' in request.form # Validate input error = None if not name: error = 'Company name is required' elif name != company.name and Company.query.filter_by(name=name).first(): error = 'Company name already exists' if max_users: try: max_users = int(max_users) if max_users < 1: error = 'Maximum users must be at least 1' except ValueError: error = 'Maximum users must be a valid number' else: max_users = None if error is None: company.name = name company.description = description company.max_users = max_users company.is_active = is_active db.session.commit() flash('Company details updated successfully!', 'success') return redirect(url_for('admin_company')) else: flash(error, 'error') return render_template('edit_company.html', title='Edit Company', company=company) @app.route('/admin/company/users') @admin_required @company_required def company_users(): """List all users in the company with detailed information""" users = User.query.filter_by(company_id=g.company.id).order_by(User.created_at.desc()).all() # Calculate user statistics user_stats = { 'total': len(users), 'verified': len([u for u in users if u.is_verified]), 'unverified': len([u for u in users if not u.is_verified]), 'blocked': len([u for u in users if u.is_blocked]), 'active': len([u for u in users if not u.is_blocked and u.is_verified]), 'admins': len([u for u in users if u.role == Role.ADMIN]), 'supervisors': len([u for u in users if u.role == Role.SUPERVISOR]), 'team_leaders': len([u for u in users if u.role == Role.TEAM_LEADER]), 'team_members': len([u for u in users if u.role == Role.TEAM_MEMBER]), } return render_template('company_users.html', title='Company Users', users=users, stats=user_stats, company=g.company) # Add these routes for team management @app.route('/admin/teams') @admin_required @company_required def admin_teams(): teams = Team.query.filter_by(company_id=g.user.company_id).all() return render_template('admin_teams.html', title='Team Management', teams=teams) @app.route('/admin/teams/create', methods=['GET', 'POST']) @admin_required @company_required def create_team(): if request.method == 'POST': name = request.form.get('name') description = request.form.get('description') # Validate input error = None if not name: error = 'Team name is required' elif Team.query.filter_by(name=name, company_id=g.user.company_id).first(): error = 'Team name already exists in your company' if error is None: new_team = Team(name=name, description=description, company_id=g.user.company_id) db.session.add(new_team) db.session.commit() flash(f'Team "{name}" created successfully!', 'success') return redirect(url_for('admin_teams')) flash(error, 'error') return render_template('create_team.html', title='Create Team') @app.route('/admin/teams/edit/', methods=['GET', 'POST']) @admin_required @company_required def edit_team(team_id): team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404() if request.method == 'POST': name = request.form.get('name') description = request.form.get('description') # Validate input error = None if not name: error = 'Team name is required' elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first(): error = 'Team name already exists in your company' if error is None: team.name = name team.description = description db.session.commit() flash(f'Team "{name}" updated successfully!', 'success') return redirect(url_for('admin_teams')) flash(error, 'error') return render_template('edit_team.html', title='Edit Team', team=team) @app.route('/admin/teams/delete/', methods=['POST']) @admin_required @company_required def delete_team(team_id): team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404() # Check if team has members if team.users: flash('Cannot delete team with members. Remove all members first.', 'error') return redirect(url_for('admin_teams')) team_name = team.name db.session.delete(team) db.session.commit() flash(f'Team "{team_name}" deleted successfully!', 'success') return redirect(url_for('admin_teams')) @app.route('/admin/teams/', methods=['GET', 'POST']) @admin_required @company_required def manage_team(team_id): team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404() if request.method == 'POST': action = request.form.get('action') if action == 'update_team': # Update team details name = request.form.get('name') description = request.form.get('description') # Validate input error = None if not name: error = 'Team name is required' elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first(): error = 'Team name already exists in your company' if error is None: team.name = name team.description = description db.session.commit() flash(f'Team "{name}" updated successfully!', 'success') else: flash(error, 'error') elif action == 'add_member': # Add user to team user_id = request.form.get('user_id') if user_id: user = User.query.get(user_id) if user: user.team_id = team.id db.session.commit() flash(f'User {user.username} added to team!', 'success') else: flash('User not found', 'error') else: flash('No user selected', 'error') elif action == 'remove_member': # Remove user from team user_id = request.form.get('user_id') if user_id: user = User.query.get(user_id) if user and user.team_id == team.id: user.team_id = None db.session.commit() flash(f'User {user.username} removed from team!', 'success') else: flash('User not found or not in this team', 'error') else: flash('No user selected', 'error') # Get team members team_members = User.query.filter_by(team_id=team.id).all() # Get users not in this team for the add member form (company-scoped) available_users = User.query.filter( User.company_id == g.user.company_id, (User.team_id != team.id) | (User.team_id == None) ).all() return render_template( 'manage_team.html', title=f'Manage Team: {team.name}', team=team, team_members=team_members, available_users=available_users ) # Project Management Routes @app.route('/admin/projects') @role_required(Role.SUPERVISOR) # Supervisors and Admins can manage projects @company_required def admin_projects(): projects = Project.query.filter_by(company_id=g.user.company_id).order_by(Project.created_at.desc()).all() return render_template('admin_projects.html', title='Project Management', projects=projects) @app.route('/admin/projects/create', methods=['GET', 'POST']) @role_required(Role.SUPERVISOR) @company_required def create_project(): if request.method == 'POST': name = request.form.get('name') description = request.form.get('description') code = request.form.get('code') team_id = request.form.get('team_id') or None start_date_str = request.form.get('start_date') end_date_str = request.form.get('end_date') # Validate input error = None if not name: error = 'Project name is required' elif not code: error = 'Project code is required' elif Project.query.filter_by(code=code, company_id=g.user.company_id).first(): error = 'Project code already exists in your company' # Parse dates start_date = None end_date = None if start_date_str: try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() except ValueError: error = 'Invalid start date format' if end_date_str: try: end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() except ValueError: error = 'Invalid end date format' if start_date and end_date and start_date > end_date: error = 'Start date cannot be after end date' if error is None: project = Project( name=name, description=description, code=code.upper(), company_id=g.user.company_id, team_id=int(team_id) if team_id else None, start_date=start_date, end_date=end_date, created_by_id=g.user.id ) db.session.add(project) db.session.commit() flash(f'Project "{name}" created successfully!', 'success') return redirect(url_for('admin_projects')) else: flash(error, 'error') # Get available teams for the form (company-scoped) teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all() return render_template('create_project.html', title='Create Project', teams=teams) @app.route('/admin/projects/edit/', methods=['GET', 'POST']) @role_required(Role.SUPERVISOR) @company_required def edit_project(project_id): project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404() if request.method == 'POST': name = request.form.get('name') description = request.form.get('description') code = request.form.get('code') team_id = request.form.get('team_id') or None is_active = request.form.get('is_active') == 'on' start_date_str = request.form.get('start_date') end_date_str = request.form.get('end_date') # Validate input error = None if not name: error = 'Project name is required' elif not code: error = 'Project code is required' elif code != project.code and Project.query.filter_by(code=code, company_id=g.user.company_id).first(): error = 'Project code already exists in your company' # Parse dates start_date = None end_date = None if start_date_str: try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() except ValueError: error = 'Invalid start date format' if end_date_str: try: end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() except ValueError: error = 'Invalid end date format' if start_date and end_date and start_date > end_date: error = 'Start date cannot be after end date' if error is None: project.name = name project.description = description project.code = code.upper() project.team_id = int(team_id) if team_id else None project.is_active = is_active project.start_date = start_date project.end_date = end_date db.session.commit() flash(f'Project "{name}" updated successfully!', 'success') return redirect(url_for('admin_projects')) else: flash(error, 'error') # Get available teams for the form (company-scoped) teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all() return render_template('edit_project.html', title='Edit Project', project=project, teams=teams) @app.route('/admin/projects/delete/', methods=['POST']) @role_required(Role.ADMIN) # Only admins can delete projects @company_required def delete_project(project_id): project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404() # Check if there are time entries associated with this project time_entries_count = TimeEntry.query.filter_by(project_id=project_id).count() if time_entries_count > 0: flash(f'Cannot delete project "{project.name}" - it has {time_entries_count} time entries associated with it. Deactivate the project instead.', 'error') else: project_name = project.name db.session.delete(project) db.session.commit() flash(f'Project "{project_name}" deleted successfully!', 'success') return redirect(url_for('admin_projects')) @app.route('/api/team/hours_data', methods=['GET']) @login_required @role_required(Role.TEAM_LEADER) # Only team leaders and above can access @company_required def team_hours_data(): # Get the current user's team team = Team.query.get(g.user.team_id) if not team: return jsonify({ 'success': False, 'message': 'You are not assigned to any team.' }), 400 # Get date range from query parameters or use current week as default today = datetime.now().date() start_of_week = today - timedelta(days=today.weekday()) end_of_week = start_of_week + timedelta(days=6) start_date_str = request.args.get('start_date', start_of_week.strftime('%Y-%m-%d')) end_date_str = request.args.get('end_date', end_of_week.strftime('%Y-%m-%d')) include_self = request.args.get('include_self', 'false') == 'true' try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() except ValueError: return jsonify({ 'success': False, 'message': 'Invalid date format.' }), 400 # Get all team members team_members = User.query.filter_by(team_id=team.id).all() # Prepare data structure for team members' hours team_data = [] for member in team_members: # Skip if the member is the current user (team leader) and include_self is False if member.id == g.user.id and not include_self: continue # Get time entries for this member in the date range entries = TimeEntry.query.filter( TimeEntry.user_id == member.id, TimeEntry.arrival_time >= datetime.combine(start_date, time.min), TimeEntry.arrival_time <= datetime.combine(end_date, time.max) ).order_by(TimeEntry.arrival_time).all() # Calculate daily and total hours daily_hours = {} total_seconds = 0 for entry in entries: if entry.duration: # Only count completed entries entry_date = entry.arrival_time.date() date_str = entry_date.strftime('%Y-%m-%d') if date_str not in daily_hours: daily_hours[date_str] = 0 daily_hours[date_str] += entry.duration total_seconds += entry.duration # Convert seconds to hours for display for date_str in daily_hours: daily_hours[date_str] = round(daily_hours[date_str] / 3600, 2) # Convert to hours total_hours = round(total_seconds / 3600, 2) # Convert to hours # Format entries for JSON response formatted_entries = [] for entry in entries: formatted_entries.append({ 'id': entry.id, 'arrival_time': entry.arrival_time.strftime('%Y-%m-%d %H:%M:%S'), 'departure_time': entry.departure_time.strftime('%Y-%m-%d %H:%M:%S') if entry.departure_time else None, 'duration': entry.duration, 'total_break_duration': entry.total_break_duration }) # Add member data to team data team_data.append({ 'user': { 'id': member.id, 'username': member.username, 'email': member.email }, 'daily_hours': daily_hours, 'total_hours': total_hours, 'entries': formatted_entries }) # Generate a list of dates in the range for the table header date_range = [] current_date = start_date while current_date <= end_date: date_range.append(current_date.strftime('%Y-%m-%d')) current_date += timedelta(days=1) return jsonify({ 'success': True, 'team': { 'id': team.id, 'name': team.name, 'description': team.description }, 'team_data': team_data, 'date_range': date_range, 'start_date': start_date.strftime('%Y-%m-%d'), 'end_date': end_date.strftime('%Y-%m-%d') }) @app.route('/export') def export(): return render_template('export.html', title='Export Data') def get_date_range(period, start_date_str=None, end_date_str=None): """Get start and end date based on period or custom date range.""" today = datetime.now().date() if period: if period == 'today': return today, today elif period == 'week': start_date = today - timedelta(days=today.weekday()) return start_date, today elif period == 'month': start_date = today.replace(day=1) return start_date, today elif period == 'all': earliest_entry = TimeEntry.query.order_by(TimeEntry.arrival_time).first() start_date = earliest_entry.arrival_time.date() if earliest_entry else today return start_date, today else: # Custom date range try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() return start_date, end_date except (ValueError, TypeError): raise ValueError('Invalid date format') def format_duration(seconds): """Format duration in seconds to HH:MM:SS format.""" if seconds is None: return '00:00:00' hours = seconds // 3600 minutes = (seconds % 3600) // 60 seconds = seconds % 60 return f"{hours:d}:{minutes:02d}:{seconds:02d}" def prepare_export_data(entries): """Prepare time entries data for export.""" data = [] for entry in entries: row = { 'Date': entry.arrival_time.strftime('%Y-%m-%d'), 'Project Code': entry.project.code if entry.project else '', 'Project Name': entry.project.name if entry.project else '', 'Arrival Time': entry.arrival_time.strftime('%H:%M:%S'), 'Departure Time': entry.departure_time.strftime('%H:%M:%S') if entry.departure_time else 'Active', 'Work Duration (HH:MM:SS)': format_duration(entry.duration) if entry.duration is not None else 'In progress', 'Break Duration (HH:MM:SS)': format_duration(entry.total_break_duration), 'Work Duration (seconds)': entry.duration if entry.duration is not None else 0, 'Break Duration (seconds)': entry.total_break_duration if entry.total_break_duration is not None else 0, 'Notes': entry.notes if entry.notes else '' } data.append(row) return data def export_to_csv(data, filename): """Export data to CSV format.""" output = io.StringIO() writer = csv.DictWriter(output, fieldnames=data[0].keys()) writer.writeheader() writer.writerows(data) return Response( output.getvalue(), mimetype='text/csv', headers={'Content-Disposition': f'attachment;filename={filename}.csv'} ) def export_to_excel(data, filename): """Export data to Excel format with formatting.""" df = pd.DataFrame(data) output = io.BytesIO() with pd.ExcelWriter(output, engine='xlsxwriter') as writer: df.to_excel(writer, sheet_name='TimeTrack Data', index=False) # Auto-adjust columns' width worksheet = writer.sheets['TimeTrack Data'] for i, col in enumerate(df.columns): column_width = max(df[col].astype(str).map(len).max(), len(col)) + 2 worksheet.set_column(i, i, column_width) output.seek(0) return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"{filename}.xlsx" ) def prepare_team_hours_export_data(team, team_data, date_range): """Prepare team hours data for export.""" export_data = [] for member_data in team_data: user = member_data['user'] daily_hours = member_data['daily_hours'] # Create base row with member info row = { 'Team': team['name'], 'Member': user['username'], 'Email': user['email'], 'Total Hours': member_data['total_hours'] } # Add daily hours columns for date_str in date_range: formatted_date = datetime.strptime(date_str, '%Y-%m-%d').strftime('%m/%d/%Y') row[formatted_date] = daily_hours.get(date_str, 0.0) export_data.append(row) return export_data def export_team_hours_to_csv(data, filename): """Export team hours data to CSV format.""" if not data: return None output = io.StringIO() writer = csv.DictWriter(output, fieldnames=data[0].keys()) writer.writeheader() writer.writerows(data) return Response( output.getvalue(), mimetype='text/csv', headers={'Content-Disposition': f'attachment;filename={filename}.csv'} ) def export_team_hours_to_excel(data, filename, team_name): """Export team hours data to Excel format with formatting.""" if not data: return None df = pd.DataFrame(data) output = io.BytesIO() with pd.ExcelWriter(output, engine='xlsxwriter') as writer: df.to_excel(writer, sheet_name=f'{team_name} Hours', index=False) # Get the workbook and worksheet objects workbook = writer.book worksheet = writer.sheets[f'{team_name} Hours'] # Create formats header_format = workbook.add_format({ 'bold': True, 'text_wrap': True, 'valign': 'top', 'fg_color': '#4CAF50', 'font_color': 'white', 'border': 1 }) # Auto-adjust columns' width and apply formatting for i, col in enumerate(df.columns): column_width = max(df[col].astype(str).map(len).max(), len(col)) + 2 worksheet.set_column(i, i, column_width) # Apply header formatting worksheet.write(0, i, col, header_format) output.seek(0) return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f"{filename}.xlsx" ) @app.route('/download_export') def download_export(): """Handle export download requests.""" export_format = request.args.get('format', 'csv') period = request.args.get('period') try: start_date, end_date = get_date_range( period, request.args.get('start_date'), request.args.get('end_date') ) except ValueError: flash('Invalid date format. Please use YYYY-MM-DD format.') return redirect(url_for('export')) # Query entries within the date range start_datetime = datetime.combine(start_date, time.min) end_datetime = datetime.combine(end_date, time.max) entries = TimeEntry.query.filter( TimeEntry.arrival_time >= start_datetime, TimeEntry.arrival_time <= end_datetime ).order_by(TimeEntry.arrival_time).all() if not entries: flash('No entries found for the selected date range.') return redirect(url_for('export')) # Prepare data and filename data = prepare_export_data(entries) filename = f"timetrack_export_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}" # Export based on format if export_format == 'csv': return export_to_csv(data, filename) elif export_format == 'excel': return export_to_excel(data, filename) else: flash('Invalid export format.') return redirect(url_for('export')) @app.route('/download_team_hours_export') @login_required @role_required(Role.TEAM_LEADER) @company_required def download_team_hours_export(): """Handle team hours export download requests.""" export_format = request.args.get('format', 'csv') # Get the current user's team team = Team.query.get(g.user.team_id) if not team: flash('You are not assigned to any team.') return redirect(url_for('team_hours')) # Get date range from query parameters or use current week as default today = datetime.now().date() start_of_week = today - timedelta(days=today.weekday()) end_of_week = start_of_week + timedelta(days=6) start_date_str = request.args.get('start_date', start_of_week.strftime('%Y-%m-%d')) end_date_str = request.args.get('end_date', end_of_week.strftime('%Y-%m-%d')) include_self = request.args.get('include_self', 'false') == 'true' try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() except ValueError: flash('Invalid date format.') return redirect(url_for('team_hours')) # Get all team members team_members = User.query.filter_by(team_id=team.id).all() # Prepare data structure for team members' hours team_data = [] for member in team_members: # Skip if the member is the current user (team leader) and include_self is False if member.id == g.user.id and not include_self: continue # Get time entries for this member in the date range entries = TimeEntry.query.filter( TimeEntry.user_id == member.id, TimeEntry.arrival_time >= datetime.combine(start_date, time.min), TimeEntry.arrival_time <= datetime.combine(end_date, time.max) ).order_by(TimeEntry.arrival_time).all() # Calculate daily and total hours daily_hours = {} total_seconds = 0 for entry in entries: if entry.duration: # Only count completed entries entry_date = entry.arrival_time.date() date_str = entry_date.strftime('%Y-%m-%d') if date_str not in daily_hours: daily_hours[date_str] = 0 daily_hours[date_str] += entry.duration total_seconds += entry.duration # Convert seconds to hours for display for date_str in daily_hours: daily_hours[date_str] = round(daily_hours[date_str] / 3600, 2) # Convert to hours total_hours = round(total_seconds / 3600, 2) # Convert to hours # Add member data to team data team_data.append({ 'user': { 'id': member.id, 'username': member.username, 'email': member.email }, 'daily_hours': daily_hours, 'total_hours': total_hours }) if not team_data: flash('No team member data found for the selected date range.') return redirect(url_for('team_hours')) # Generate a list of dates in the range date_range = [] current_date = start_date while current_date <= end_date: date_range.append(current_date.strftime('%Y-%m-%d')) current_date += timedelta(days=1) # Prepare data for export team_info = { 'id': team.id, 'name': team.name, 'description': team.description } export_data = prepare_team_hours_export_data(team_info, team_data, date_range) # Generate filename filename = f"{team.name.replace(' ', '_')}_hours_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}" # Export based on format if export_format == 'csv': response = export_team_hours_to_csv(export_data, filename) if response: return response else: flash('Error generating CSV export.') return redirect(url_for('team_hours')) elif export_format == 'excel': response = export_team_hours_to_excel(export_data, filename, team.name) if response: return response else: flash('Error generating Excel export.') return redirect(url_for('team_hours')) else: flash('Invalid export format.') return redirect(url_for('team_hours')) if __name__ == '__main__': port = int(os.environ.get('PORT', 5000)) app.run(debug=False, host='0.0.0.0', port=port)