3540 lines
139 KiB
Python
3540 lines
139 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file
|
|
from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, UserPreferences, WorkRegion, AccountType
|
|
from data_formatting import (
|
|
format_duration, prepare_export_data, prepare_team_hours_export_data,
|
|
format_table_data, format_graph_data, format_team_data
|
|
)
|
|
from data_export import (
|
|
export_to_csv, export_to_excel, export_team_hours_to_csv, export_team_hours_to_excel,
|
|
export_analytics_csv, export_analytics_excel
|
|
)
|
|
from time_utils import apply_time_rounding, round_duration_to_interval, get_user_rounding_settings
|
|
import logging
|
|
from datetime import datetime, time, timedelta
|
|
import os
|
|
import csv
|
|
import io
|
|
import pandas as pd
|
|
from sqlalchemy import func
|
|
from functools import wraps
|
|
from flask_mail import Mail, Message
|
|
from dotenv import load_dotenv
|
|
from werkzeug.security import check_password_hash
|
|
|
|
# Load environment variables from .env file
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = Flask(__name__)
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////data/timetrack.db'
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
|
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_timetrack')
|
|
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # Session lasts for 7 days
|
|
|
|
# Configure Flask-Mail
|
|
app.config['MAIL_SERVER'] = os.environ.get('MAIL_SERVER', 'smtp.example.com')
|
|
app.config['MAIL_PORT'] = int(os.environ.get('MAIL_PORT', 587))
|
|
app.config['MAIL_USE_TLS'] = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
|
|
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME', 'your-email@example.com')
|
|
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD', 'your-password')
|
|
app.config['MAIL_DEFAULT_SENDER'] = os.environ.get('MAIL_DEFAULT_SENDER', 'TimeTrack <noreply@timetrack.com>')
|
|
|
|
# Log mail configuration (without password)
|
|
logger.info(f"Mail server: {app.config['MAIL_SERVER']}")
|
|
logger.info(f"Mail port: {app.config['MAIL_PORT']}")
|
|
logger.info(f"Mail use TLS: {app.config['MAIL_USE_TLS']}")
|
|
logger.info(f"Mail username: {app.config['MAIL_USERNAME']}")
|
|
logger.info(f"Mail default sender: {app.config['MAIL_DEFAULT_SENDER']}")
|
|
|
|
mail = Mail(app)
|
|
|
|
# Initialize the database with the app
|
|
db.init_app(app)
|
|
|
|
# Integrated migration and initialization function
|
|
def run_migrations():
|
|
"""Run all database migrations and initialize system settings."""
|
|
import sqlite3
|
|
|
|
# Determine database path based on environment
|
|
db_path = '/data/timetrack.db' if os.path.exists('/data') else 'timetrack.db'
|
|
|
|
# Check if database exists
|
|
if not os.path.exists(db_path):
|
|
print("Database doesn't exist. Creating new database.")
|
|
with app.app_context():
|
|
db.create_all()
|
|
init_system_settings()
|
|
return
|
|
|
|
print("Running database migrations...")
|
|
|
|
# Connect to the database for raw SQL operations
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
# Check if time_entry table exists first
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='time_entry'")
|
|
if not cursor.fetchone():
|
|
print("time_entry table doesn't exist. Creating all tables...")
|
|
with app.app_context():
|
|
db.create_all()
|
|
init_system_settings()
|
|
conn.commit()
|
|
conn.close()
|
|
return
|
|
|
|
# Migrate time_entry table
|
|
cursor.execute("PRAGMA table_info(time_entry)")
|
|
time_entry_columns = [column[1] for column in cursor.fetchall()]
|
|
|
|
migrations = [
|
|
('is_paused', "ALTER TABLE time_entry ADD COLUMN is_paused BOOLEAN DEFAULT 0"),
|
|
('pause_start_time', "ALTER TABLE time_entry ADD COLUMN pause_start_time TIMESTAMP"),
|
|
('total_break_duration', "ALTER TABLE time_entry ADD COLUMN total_break_duration INTEGER DEFAULT 0"),
|
|
('user_id', "ALTER TABLE time_entry ADD COLUMN user_id INTEGER"),
|
|
('project_id', "ALTER TABLE time_entry ADD COLUMN project_id INTEGER"),
|
|
('notes', "ALTER TABLE time_entry ADD COLUMN notes TEXT")
|
|
]
|
|
|
|
for column_name, sql_command in migrations:
|
|
if column_name not in time_entry_columns:
|
|
print(f"Adding {column_name} column to time_entry...")
|
|
cursor.execute(sql_command)
|
|
|
|
# Create work_config table if it doesn't exist
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='work_config'")
|
|
if not cursor.fetchone():
|
|
print("Creating work_config table...")
|
|
cursor.execute("""
|
|
CREATE TABLE work_config (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
work_hours_per_day FLOAT DEFAULT 8.0,
|
|
mandatory_break_minutes INTEGER DEFAULT 30,
|
|
break_threshold_hours FLOAT DEFAULT 6.0,
|
|
additional_break_minutes INTEGER DEFAULT 15,
|
|
additional_break_threshold_hours FLOAT DEFAULT 9.0,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
user_id INTEGER
|
|
)
|
|
""")
|
|
cursor.execute("""
|
|
INSERT INTO work_config (work_hours_per_day, mandatory_break_minutes, break_threshold_hours)
|
|
VALUES (8.0, 30, 6.0)
|
|
""")
|
|
else:
|
|
# Check and add missing columns to work_config
|
|
cursor.execute("PRAGMA table_info(work_config)")
|
|
work_config_columns = [column[1] for column in cursor.fetchall()]
|
|
|
|
work_config_migrations = [
|
|
('additional_break_minutes', "ALTER TABLE work_config ADD COLUMN additional_break_minutes INTEGER DEFAULT 15"),
|
|
('additional_break_threshold_hours', "ALTER TABLE work_config ADD COLUMN additional_break_threshold_hours FLOAT DEFAULT 9.0"),
|
|
('user_id', "ALTER TABLE work_config ADD COLUMN user_id INTEGER"),
|
|
('time_rounding_minutes', "ALTER TABLE work_config ADD COLUMN time_rounding_minutes INTEGER DEFAULT 0"),
|
|
('round_to_nearest', "ALTER TABLE work_config ADD COLUMN round_to_nearest BOOLEAN DEFAULT 1"),
|
|
('time_format_24h', "ALTER TABLE work_config ADD COLUMN time_format_24h BOOLEAN DEFAULT 1"),
|
|
('date_format', "ALTER TABLE work_config ADD COLUMN date_format VARCHAR(20) DEFAULT 'ISO'")
|
|
]
|
|
|
|
for column_name, sql_command in work_config_migrations:
|
|
if column_name not in work_config_columns:
|
|
print(f"Adding {column_name} column to work_config...")
|
|
cursor.execute(sql_command)
|
|
|
|
# Migrate user table
|
|
cursor.execute("PRAGMA table_info(user)")
|
|
user_columns = [column[1] for column in cursor.fetchall()]
|
|
|
|
user_migrations = [
|
|
('is_verified', "ALTER TABLE user ADD COLUMN is_verified BOOLEAN DEFAULT 0"),
|
|
('verification_token', "ALTER TABLE user ADD COLUMN verification_token VARCHAR(100)"),
|
|
('token_expiry', "ALTER TABLE user ADD COLUMN token_expiry TIMESTAMP"),
|
|
('is_blocked', "ALTER TABLE user ADD COLUMN is_blocked BOOLEAN DEFAULT 0"),
|
|
('role', "ALTER TABLE user ADD COLUMN role VARCHAR(50) DEFAULT 'Team Member'"),
|
|
('team_id', "ALTER TABLE user ADD COLUMN team_id INTEGER"),
|
|
('company_id', "ALTER TABLE user ADD COLUMN company_id INTEGER"),
|
|
('two_factor_enabled', "ALTER TABLE user ADD COLUMN two_factor_enabled BOOLEAN DEFAULT 0"),
|
|
('two_factor_secret', "ALTER TABLE user ADD COLUMN two_factor_secret VARCHAR(32)"),
|
|
('account_type', "ALTER TABLE user ADD COLUMN account_type VARCHAR(20) DEFAULT 'COMPANY_USER'"),
|
|
('business_name', "ALTER TABLE user ADD COLUMN business_name VARCHAR(100)")
|
|
]
|
|
|
|
for column_name, sql_command in user_migrations:
|
|
if column_name not in user_columns:
|
|
print(f"Adding {column_name} column to user...")
|
|
cursor.execute(sql_command)
|
|
|
|
# Remove is_admin column if it exists (migration to role-based system)
|
|
if 'is_admin' in user_columns:
|
|
print("Migrating from is_admin to role-based system...")
|
|
# First ensure all users have roles set based on is_admin
|
|
cursor.execute("UPDATE user SET role = 'Administrator' WHERE is_admin = 1 AND (role IS NULL OR role = '')")
|
|
cursor.execute("UPDATE user SET role = 'Team Member' WHERE is_admin = 0 AND (role IS NULL OR role = '')")
|
|
|
|
# Drop the is_admin column (SQLite requires table recreation)
|
|
print("Removing is_admin column...")
|
|
cursor.execute("PRAGMA foreign_keys=off")
|
|
|
|
# Create new table without is_admin column
|
|
cursor.execute("""
|
|
CREATE TABLE user_new (
|
|
id INTEGER PRIMARY KEY,
|
|
username VARCHAR(80) UNIQUE NOT NULL,
|
|
email VARCHAR(120) UNIQUE NOT NULL,
|
|
password_hash VARCHAR(128),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
is_verified BOOLEAN DEFAULT 0,
|
|
verification_token VARCHAR(100),
|
|
token_expiry TIMESTAMP,
|
|
is_blocked BOOLEAN DEFAULT 0,
|
|
role VARCHAR(50) DEFAULT 'Team Member',
|
|
team_id INTEGER,
|
|
two_factor_enabled BOOLEAN DEFAULT 0,
|
|
two_factor_secret VARCHAR(32),
|
|
account_type VARCHAR(20) DEFAULT 'COMPANY_USER',
|
|
business_name VARCHAR(100),
|
|
company_id INTEGER
|
|
)
|
|
""")
|
|
|
|
# Copy data from old table to new table (excluding is_admin)
|
|
cursor.execute("""
|
|
INSERT INTO user_new (id, username, email, password_hash, created_at, is_verified,
|
|
verification_token, token_expiry, is_blocked, role, team_id,
|
|
two_factor_enabled, two_factor_secret, account_type, business_name, company_id)
|
|
SELECT id, username, email, password_hash, created_at, is_verified,
|
|
verification_token, token_expiry, is_blocked, role, team_id,
|
|
two_factor_enabled, two_factor_secret,
|
|
COALESCE(account_type, 'COMPANY_USER'),
|
|
business_name,
|
|
company_id
|
|
FROM user
|
|
""")
|
|
|
|
# Drop old table and rename new table
|
|
cursor.execute("DROP TABLE user")
|
|
cursor.execute("ALTER TABLE user_new RENAME TO user")
|
|
cursor.execute("PRAGMA foreign_keys=on")
|
|
print("Successfully removed is_admin column")
|
|
|
|
# Create team table if it doesn't exist
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='team'")
|
|
if not cursor.fetchone():
|
|
print("Creating team table...")
|
|
cursor.execute("""
|
|
CREATE TABLE team (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name VARCHAR(100) UNIQUE NOT NULL,
|
|
description VARCHAR(255),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
# Create system_settings table if it doesn't exist
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='system_settings'")
|
|
if not cursor.fetchone():
|
|
print("Creating system_settings table...")
|
|
cursor.execute("""
|
|
CREATE TABLE system_settings (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
key VARCHAR(50) UNIQUE NOT NULL,
|
|
value VARCHAR(255) NOT NULL,
|
|
description VARCHAR(255),
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
""")
|
|
|
|
# Create project table if it doesn't exist
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='project'")
|
|
if not cursor.fetchone():
|
|
print("Creating project table...")
|
|
cursor.execute("""
|
|
CREATE TABLE project (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name VARCHAR(100) NOT NULL,
|
|
description TEXT,
|
|
code VARCHAR(20) NOT NULL,
|
|
is_active BOOLEAN DEFAULT 1,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
created_by_id INTEGER NOT NULL,
|
|
team_id INTEGER,
|
|
start_date DATE,
|
|
end_date DATE,
|
|
company_id INTEGER,
|
|
FOREIGN KEY (created_by_id) REFERENCES user (id),
|
|
FOREIGN KEY (team_id) REFERENCES team (id),
|
|
FOREIGN KEY (company_id) REFERENCES company (id)
|
|
)
|
|
""")
|
|
|
|
# Create company table if it doesn't exist
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='company'")
|
|
if not cursor.fetchone():
|
|
print("Creating company table...")
|
|
cursor.execute("""
|
|
CREATE TABLE company (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name VARCHAR(100) UNIQUE NOT NULL,
|
|
slug VARCHAR(50) UNIQUE NOT NULL,
|
|
description TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
is_personal BOOLEAN DEFAULT 0,
|
|
is_active BOOLEAN DEFAULT 1,
|
|
max_users INTEGER DEFAULT 100
|
|
)
|
|
""")
|
|
else:
|
|
# Check and add missing columns to existing company table
|
|
cursor.execute("PRAGMA table_info(company)")
|
|
company_columns = [column[1] for column in cursor.fetchall()]
|
|
|
|
company_migrations = [
|
|
('is_personal', "ALTER TABLE company ADD COLUMN is_personal BOOLEAN DEFAULT 0")
|
|
]
|
|
|
|
for column_name, sql_command in company_migrations:
|
|
if column_name not in company_columns:
|
|
print(f"Adding {column_name} column to company...")
|
|
cursor.execute(sql_command)
|
|
|
|
# Create company_work_config table if it doesn't exist
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='company_work_config'")
|
|
if not cursor.fetchone():
|
|
print("Creating company_work_config table...")
|
|
cursor.execute("""
|
|
CREATE TABLE company_work_config (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
company_id INTEGER NOT NULL,
|
|
work_hours_per_day FLOAT DEFAULT 8.0,
|
|
mandatory_break_minutes INTEGER DEFAULT 30,
|
|
break_threshold_hours FLOAT DEFAULT 6.0,
|
|
additional_break_minutes INTEGER DEFAULT 15,
|
|
additional_break_threshold_hours FLOAT DEFAULT 9.0,
|
|
region VARCHAR(20) DEFAULT 'DE',
|
|
region_name VARCHAR(50) DEFAULT 'Germany',
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
created_by_id INTEGER,
|
|
FOREIGN KEY (company_id) REFERENCES company (id),
|
|
FOREIGN KEY (created_by_id) REFERENCES user (id),
|
|
UNIQUE(company_id)
|
|
)
|
|
""")
|
|
|
|
# Create user_preferences table if it doesn't exist
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_preferences'")
|
|
if not cursor.fetchone():
|
|
print("Creating user_preferences table...")
|
|
cursor.execute("""
|
|
CREATE TABLE user_preferences (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
time_format_24h BOOLEAN DEFAULT 1,
|
|
date_format VARCHAR(20) DEFAULT 'ISO',
|
|
time_rounding_minutes INTEGER DEFAULT 0,
|
|
round_to_nearest BOOLEAN DEFAULT 1,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES user (id),
|
|
UNIQUE(user_id)
|
|
)
|
|
""")
|
|
|
|
# Fix enum constraint for SYSTEM_ADMIN role
|
|
# SQLite doesn't have native enum support, but SQLAlchemy creates check constraints
|
|
# We need to drop and recreate the table to add the new enum value
|
|
cursor.execute("PRAGMA table_info(user)")
|
|
user_columns = cursor.fetchall()
|
|
|
|
# Check if we need to migrate the role enum constraint
|
|
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='user'")
|
|
create_table_sql = cursor.fetchone()
|
|
|
|
if create_table_sql and 'System Administrator' not in create_table_sql[0]:
|
|
print("Updating role enum constraint to include SYSTEM_ADMIN...")
|
|
|
|
# Check existing role values
|
|
cursor.execute("SELECT DISTINCT role FROM user WHERE role IS NOT NULL")
|
|
existing_roles = [row[0] for row in cursor.fetchall()]
|
|
print(f"Found existing roles: {existing_roles}")
|
|
|
|
# Drop user_new table if it exists from previous failed migration
|
|
cursor.execute("DROP TABLE IF EXISTS user_new")
|
|
|
|
# Create a backup table with the new enum constraint
|
|
cursor.execute("""
|
|
CREATE TABLE user_new (
|
|
id INTEGER PRIMARY KEY,
|
|
username VARCHAR(80) NOT NULL,
|
|
email VARCHAR(120) NOT NULL,
|
|
password_hash VARCHAR(128),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
company_id INTEGER NOT NULL,
|
|
is_verified BOOLEAN DEFAULT 0,
|
|
verification_token VARCHAR(100),
|
|
token_expiry TIMESTAMP,
|
|
is_blocked BOOLEAN DEFAULT 0,
|
|
role VARCHAR(50) DEFAULT 'Team Member' CHECK (role IN ('Team Member', 'Team Leader', 'Supervisor', 'Administrator', 'System Administrator')),
|
|
team_id INTEGER,
|
|
account_type VARCHAR(20) DEFAULT 'Company User' CHECK (account_type IN ('Company User', 'Freelancer')),
|
|
business_name VARCHAR(100),
|
|
two_factor_enabled BOOLEAN DEFAULT 0,
|
|
two_factor_secret VARCHAR(32),
|
|
FOREIGN KEY (company_id) REFERENCES company (id),
|
|
FOREIGN KEY (team_id) REFERENCES team (id)
|
|
)
|
|
""")
|
|
|
|
# Copy all data from old table to new table
|
|
cursor.execute("""
|
|
INSERT INTO user_new SELECT * FROM user
|
|
""")
|
|
|
|
# Drop the old table and rename the new one
|
|
cursor.execute("DROP TABLE user")
|
|
cursor.execute("ALTER TABLE user_new RENAME TO user")
|
|
|
|
print("✓ Role enum constraint updated successfully")
|
|
|
|
# Normalize all enum values before SQLAlchemy operations
|
|
print("Normalizing all enum values...")
|
|
|
|
# Normalize role values
|
|
role_mapping = {
|
|
'TEAM_MEMBER': 'Team Member',
|
|
'TEAM_LEADER': 'Team Leader',
|
|
'SUPERVISOR': 'Supervisor',
|
|
'ADMIN': 'Administrator',
|
|
'SYSTEM_ADMIN': 'System Administrator'
|
|
}
|
|
|
|
for old_role, new_role in role_mapping.items():
|
|
cursor.execute("UPDATE user SET role = ? WHERE role = ?", (new_role, old_role))
|
|
updated_count = cursor.rowcount
|
|
if updated_count > 0:
|
|
print(f"Updated {updated_count} users from role '{old_role}' to '{new_role}'")
|
|
|
|
# Normalize account_type values
|
|
account_type_mapping = {
|
|
'COMPANY_USER': 'Company User',
|
|
'FREELANCER': 'Freelancer'
|
|
}
|
|
|
|
for old_type, new_type in account_type_mapping.items():
|
|
cursor.execute("UPDATE user SET account_type = ? WHERE account_type = ?", (new_type, old_type))
|
|
updated_count = cursor.rowcount
|
|
if updated_count > 0:
|
|
print(f"Updated {updated_count} users account_type from '{old_type}' to '{new_type}'")
|
|
|
|
# Set any NULL values to defaults
|
|
cursor.execute("UPDATE user SET role = 'Team Member' WHERE role IS NULL")
|
|
null_roles = cursor.rowcount
|
|
if null_roles > 0:
|
|
print(f"Set {null_roles} NULL roles to 'Team Member'")
|
|
|
|
cursor.execute("UPDATE user SET account_type = 'Company User' WHERE account_type IS NULL")
|
|
null_accounts = cursor.rowcount
|
|
if null_accounts > 0:
|
|
print(f"Set {null_accounts} NULL account_types to 'Company User'")
|
|
|
|
# Commit all schema changes
|
|
conn.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error during database migration: {e}")
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
# Now use SQLAlchemy for data migrations
|
|
db.create_all() # This will create any remaining tables defined in models
|
|
|
|
# Initialize system settings
|
|
init_system_settings()
|
|
|
|
# Handle company migration and admin user setup
|
|
migrate_to_company_model()
|
|
migrate_data()
|
|
|
|
print("Database migrations completed successfully!")
|
|
|
|
def migrate_to_company_model():
|
|
"""Migrate existing data to support company model"""
|
|
import sqlite3
|
|
|
|
# Determine database path based on environment
|
|
db_path = '/data/timetrack.db' if os.path.exists('/data') else 'timetrack.db'
|
|
|
|
# Connect to the database for raw SQL operations
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
# Check if company_id column exists in user table
|
|
cursor.execute("PRAGMA table_info(user)")
|
|
user_columns = [column[1] for column in cursor.fetchall()]
|
|
|
|
if 'company_id' not in user_columns:
|
|
print("Migrating to company model...")
|
|
|
|
# Add company_id columns to existing tables
|
|
tables_to_migrate = [
|
|
('user', 'ALTER TABLE user ADD COLUMN company_id INTEGER'),
|
|
('team', 'ALTER TABLE team ADD COLUMN company_id INTEGER'),
|
|
('project', 'ALTER TABLE project ADD COLUMN company_id INTEGER')
|
|
]
|
|
|
|
for table_name, sql_command in tables_to_migrate:
|
|
cursor.execute(f"PRAGMA table_info({table_name})")
|
|
columns = [column[1] for column in cursor.fetchall()]
|
|
if 'company_id' not in columns:
|
|
print(f"Adding company_id column to {table_name}...")
|
|
cursor.execute(sql_command)
|
|
|
|
# Check if there are existing users but no companies
|
|
cursor.execute("SELECT COUNT(*) FROM user")
|
|
user_count = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='company'")
|
|
company_table_exists = cursor.fetchone()
|
|
|
|
if user_count > 0 and company_table_exists:
|
|
cursor.execute("SELECT COUNT(*) FROM company")
|
|
company_count = cursor.fetchone()[0]
|
|
|
|
if company_count == 0:
|
|
print("Creating default company for existing data...")
|
|
# Create default company
|
|
cursor.execute("""
|
|
INSERT INTO company (name, slug, description, is_active, max_users)
|
|
VALUES ('Default Organization', 'default', 'Migrated from single-tenant installation', 1, 1000)
|
|
""")
|
|
|
|
# Get the company ID
|
|
cursor.execute("SELECT last_insert_rowid()")
|
|
company_id = cursor.fetchone()[0]
|
|
|
|
# Update all existing records to use the default company
|
|
cursor.execute(f"UPDATE user SET company_id = {company_id} WHERE company_id IS NULL")
|
|
cursor.execute(f"UPDATE team SET company_id = {company_id} WHERE company_id IS NULL")
|
|
cursor.execute(f"UPDATE project SET company_id = {company_id} WHERE company_id IS NULL")
|
|
|
|
print(f"Assigned {user_count} existing users to default company")
|
|
|
|
conn.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error during company migration: {e}")
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
def init_system_settings():
|
|
"""Initialize system settings with default values if they don't exist"""
|
|
if not SystemSettings.query.filter_by(key='registration_enabled').first():
|
|
print("Adding registration_enabled system setting...")
|
|
reg_setting = SystemSettings(
|
|
key='registration_enabled',
|
|
value='true',
|
|
description='Controls whether new user registration is allowed'
|
|
)
|
|
db.session.add(reg_setting)
|
|
db.session.commit()
|
|
|
|
if not SystemSettings.query.filter_by(key='email_verification_required').first():
|
|
print("Adding email_verification_required system setting...")
|
|
email_setting = SystemSettings(
|
|
key='email_verification_required',
|
|
value='true',
|
|
description='Controls whether email verification is required for new user accounts'
|
|
)
|
|
db.session.add(email_setting)
|
|
db.session.commit()
|
|
|
|
def migrate_data():
|
|
"""Handle data migrations and setup"""
|
|
# Only create default admin if no companies exist yet
|
|
if Company.query.count() == 0:
|
|
print("No companies exist, skipping admin user creation. Use company setup instead.")
|
|
return
|
|
|
|
# Check if admin user exists in the first company
|
|
default_company = Company.query.first()
|
|
if default_company:
|
|
admin = User.query.filter_by(username='admin', company_id=default_company.id).first()
|
|
if not admin:
|
|
# Create admin user for the default company
|
|
admin = User(
|
|
username='admin',
|
|
email='admin@timetrack.local',
|
|
company_id=default_company.id,
|
|
is_verified=True,
|
|
role=Role.ADMIN,
|
|
two_factor_enabled=False
|
|
)
|
|
admin.set_password('admin')
|
|
db.session.add(admin)
|
|
db.session.commit()
|
|
print("Created admin user with username 'admin' and password 'admin'")
|
|
print("IMPORTANT: Change the admin password after first login!")
|
|
else:
|
|
# Update existing admin user with new fields
|
|
admin.is_verified = True
|
|
if not admin.role:
|
|
admin.role = Role.ADMIN
|
|
if admin.two_factor_enabled is None:
|
|
admin.two_factor_enabled = False
|
|
db.session.commit()
|
|
|
|
# Update orphaned records
|
|
orphan_entries = TimeEntry.query.filter_by(user_id=None).all()
|
|
for entry in orphan_entries:
|
|
entry.user_id = admin.id
|
|
|
|
orphan_configs = WorkConfig.query.filter_by(user_id=None).all()
|
|
for config in orphan_configs:
|
|
config.user_id = admin.id
|
|
|
|
# Update existing users
|
|
users_to_update = User.query.filter_by(company_id=default_company.id).all()
|
|
for user in users_to_update:
|
|
if user.is_verified is None:
|
|
user.is_verified = True
|
|
if not user.role:
|
|
user.role = Role.TEAM_MEMBER
|
|
if user.two_factor_enabled is None:
|
|
user.two_factor_enabled = False
|
|
|
|
# Check if any system admin users exist
|
|
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
|
|
if system_admin_count == 0:
|
|
print("No system administrators found. SYSTEM_ADMIN role is now available for assignment.")
|
|
print("To promote a user: UPDATE user SET role = 'System Administrator' WHERE username = 'your_username';")
|
|
else:
|
|
print(f"Found {system_admin_count} system administrator(s)")
|
|
|
|
# Create sample projects if none exist for this company
|
|
existing_projects = Project.query.filter_by(company_id=default_company.id).count()
|
|
if existing_projects == 0:
|
|
sample_projects = [
|
|
{
|
|
'name': 'General Administration',
|
|
'code': 'ADMIN001',
|
|
'description': 'General administrative tasks and meetings'
|
|
},
|
|
{
|
|
'name': 'Development Project',
|
|
'code': 'DEV001',
|
|
'description': 'Software development and maintenance tasks'
|
|
},
|
|
{
|
|
'name': 'Customer Support',
|
|
'code': 'SUPPORT001',
|
|
'description': 'Customer service and technical support activities'
|
|
}
|
|
]
|
|
|
|
for proj_data in sample_projects:
|
|
project = Project(
|
|
name=proj_data['name'],
|
|
code=proj_data['code'],
|
|
description=proj_data['description'],
|
|
company_id=default_company.id,
|
|
created_by_id=admin.id,
|
|
is_active=True
|
|
)
|
|
db.session.add(project)
|
|
|
|
print(f"Created {len(sample_projects)} sample projects for {default_company.name}")
|
|
|
|
db.session.commit()
|
|
|
|
def migrate_work_config_data():
|
|
"""Migrate existing WorkConfig data to new architecture"""
|
|
try:
|
|
# Create CompanyWorkConfig for each company that doesn't have one
|
|
companies = Company.query.all()
|
|
for company in companies:
|
|
existing_config = CompanyWorkConfig.query.filter_by(company_id=company.id).first()
|
|
if not existing_config:
|
|
print(f"Creating CompanyWorkConfig for {company.name}")
|
|
|
|
# Use Germany defaults (existing system default)
|
|
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
|
|
|
|
company_config = CompanyWorkConfig(
|
|
company_id=company.id,
|
|
work_hours_per_day=preset['work_hours_per_day'],
|
|
mandatory_break_minutes=preset['mandatory_break_minutes'],
|
|
break_threshold_hours=preset['break_threshold_hours'],
|
|
additional_break_minutes=preset['additional_break_minutes'],
|
|
additional_break_threshold_hours=preset['additional_break_threshold_hours'],
|
|
region=WorkRegion.GERMANY,
|
|
region_name=preset['region_name']
|
|
)
|
|
db.session.add(company_config)
|
|
|
|
# Migrate existing WorkConfig user preferences to UserPreferences
|
|
old_configs = WorkConfig.query.filter(WorkConfig.user_id.isnot(None)).all()
|
|
for old_config in old_configs:
|
|
user = User.query.get(old_config.user_id)
|
|
if user:
|
|
existing_prefs = UserPreferences.query.filter_by(user_id=user.id).first()
|
|
if not existing_prefs:
|
|
print(f"Migrating preferences for user {user.username}")
|
|
|
|
user_prefs = UserPreferences(
|
|
user_id=user.id,
|
|
time_format_24h=getattr(old_config, 'time_format_24h', True),
|
|
date_format=getattr(old_config, 'date_format', 'ISO'),
|
|
time_rounding_minutes=getattr(old_config, 'time_rounding_minutes', 0),
|
|
round_to_nearest=getattr(old_config, 'round_to_nearest', True)
|
|
)
|
|
db.session.add(user_prefs)
|
|
|
|
db.session.commit()
|
|
print("Work config data migration completed successfully")
|
|
|
|
except Exception as e:
|
|
print(f"Error during work config migration: {e}")
|
|
db.session.rollback()
|
|
|
|
# Call this function during app initialization
|
|
@app.before_first_request
|
|
def initialize_app():
|
|
run_migrations()
|
|
migrate_work_config_data()
|
|
|
|
# Add this after initializing the app but before defining routes
|
|
@app.context_processor
|
|
def inject_globals():
|
|
"""Make certain variables available to all templates."""
|
|
return {
|
|
'Role': Role,
|
|
'current_year': datetime.now().year
|
|
}
|
|
|
|
# Template filters for date/time formatting
|
|
@app.template_filter('format_date')
|
|
def format_date_filter(dt):
|
|
"""Format date according to user preferences."""
|
|
if not dt or not g.user:
|
|
return dt.strftime('%Y-%m-%d') if dt else ''
|
|
|
|
from time_utils import format_date_by_preference, get_user_format_settings
|
|
date_format, _ = get_user_format_settings(g.user)
|
|
return format_date_by_preference(dt, date_format)
|
|
|
|
@app.template_filter('format_time')
|
|
def format_time_filter(dt):
|
|
"""Format time according to user preferences."""
|
|
if not dt or not g.user:
|
|
return dt.strftime('%H:%M:%S') if dt else ''
|
|
|
|
from time_utils import format_time_by_preference, get_user_format_settings
|
|
_, time_format_24h = get_user_format_settings(g.user)
|
|
return format_time_by_preference(dt, time_format_24h)
|
|
|
|
@app.template_filter('format_time_short')
|
|
def format_time_short_filter(dt):
|
|
"""Format time without seconds according to user preferences."""
|
|
if not dt or not g.user:
|
|
return dt.strftime('%H:%M') if dt else ''
|
|
|
|
from time_utils import format_time_short_by_preference, get_user_format_settings
|
|
_, time_format_24h = get_user_format_settings(g.user)
|
|
return format_time_short_by_preference(dt, time_format_24h)
|
|
|
|
@app.template_filter('format_datetime')
|
|
def format_datetime_filter(dt):
|
|
"""Format datetime according to user preferences."""
|
|
if not dt or not g.user:
|
|
return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else ''
|
|
|
|
from time_utils import format_datetime_by_preference, get_user_format_settings
|
|
date_format, time_format_24h = get_user_format_settings(g.user)
|
|
return format_datetime_by_preference(dt, date_format, time_format_24h)
|
|
|
|
@app.template_filter('format_duration')
|
|
def format_duration_filter(duration_seconds):
|
|
"""Format duration in readable format."""
|
|
if duration_seconds is None:
|
|
return '00:00:00'
|
|
|
|
from time_utils import format_duration_readable
|
|
return format_duration_readable(duration_seconds)
|
|
|
|
# Authentication decorator
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if g.user is None:
|
|
return redirect(url_for('login', next=request.url))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
# Admin-only decorator
|
|
def admin_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if g.user is None or (g.user.role != Role.ADMIN and g.user.role != Role.SYSTEM_ADMIN):
|
|
flash('You need administrator privileges to access this page.', 'error')
|
|
return redirect(url_for('home'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
# System Admin-only decorator
|
|
def system_admin_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if g.user is None or g.user.role != Role.SYSTEM_ADMIN:
|
|
flash('You need system administrator privileges to access this page.', 'error')
|
|
return redirect(url_for('home'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def get_system_setting(key, default='false'):
|
|
"""Helper function to get system setting value"""
|
|
setting = SystemSettings.query.filter_by(key=key).first()
|
|
return setting.value if setting else default
|
|
|
|
def is_system_admin(user=None):
|
|
"""Helper function to check if user is system admin"""
|
|
if user is None:
|
|
user = g.user
|
|
return user and user.role == Role.SYSTEM_ADMIN
|
|
|
|
def can_access_system_settings(user=None):
|
|
"""Helper function to check if user can access system-wide settings"""
|
|
return is_system_admin(user)
|
|
|
|
def get_available_roles():
|
|
"""Get roles available for assignment, excluding SYSTEM_ADMIN unless one already exists"""
|
|
roles = list(Role)
|
|
|
|
# Only show SYSTEM_ADMIN role if at least one system admin already exists
|
|
# This prevents accidental creation of system admins
|
|
system_admin_exists = User.query.filter_by(role=Role.SYSTEM_ADMIN).count() > 0
|
|
|
|
if not system_admin_exists:
|
|
roles = [role for role in roles if role != Role.SYSTEM_ADMIN]
|
|
|
|
return roles
|
|
|
|
# Add this decorator function after your existing decorators
|
|
def role_required(min_role):
|
|
"""
|
|
Decorator to restrict access based on user role.
|
|
min_role should be a Role enum value (e.g., Role.TEAM_LEADER)
|
|
"""
|
|
def role_decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if g.user is None:
|
|
return redirect(url_for('login', next=request.url))
|
|
|
|
# Admin and System Admin always have access
|
|
if g.user.role == Role.ADMIN or g.user.role == Role.SYSTEM_ADMIN:
|
|
return f(*args, **kwargs)
|
|
|
|
# Check role hierarchy
|
|
role_hierarchy = {
|
|
Role.TEAM_MEMBER: 1,
|
|
Role.TEAM_LEADER: 2,
|
|
Role.SUPERVISOR: 3,
|
|
Role.ADMIN: 4,
|
|
Role.SYSTEM_ADMIN: 5
|
|
}
|
|
|
|
if role_hierarchy.get(g.user.role, 0) < role_hierarchy.get(min_role, 0):
|
|
flash('You do not have sufficient permissions to access this page.', 'error')
|
|
return redirect(url_for('home'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return role_decorator
|
|
|
|
def company_required(f):
|
|
"""
|
|
Decorator to ensure user has a valid company association and set company context.
|
|
"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if g.user is None:
|
|
return redirect(url_for('login', next=request.url))
|
|
|
|
# System admins can access without company association
|
|
if g.user.role == Role.SYSTEM_ADMIN:
|
|
return f(*args, **kwargs)
|
|
|
|
if g.user.company_id is None:
|
|
flash('You must be associated with a company to access this page.', 'error')
|
|
return redirect(url_for('setup_company'))
|
|
|
|
# Set company context
|
|
g.company = Company.query.get(g.user.company_id)
|
|
if not g.company or not g.company.is_active:
|
|
flash('Your company is not active. Please contact support.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
@app.before_request
|
|
def load_logged_in_user():
|
|
user_id = session.get('user_id')
|
|
if user_id is None:
|
|
g.user = None
|
|
g.company = None
|
|
else:
|
|
g.user = User.query.get(user_id)
|
|
if g.user:
|
|
# Set company context
|
|
if g.user.company_id:
|
|
g.company = Company.query.get(g.user.company_id)
|
|
else:
|
|
g.company = None
|
|
|
|
# Check if user is verified
|
|
if not g.user.is_verified and request.endpoint not in ['verify_email', 'static', 'logout', 'setup_company']:
|
|
# Allow unverified users to access only verification and static resources
|
|
if request.endpoint not in ['login', 'register']:
|
|
flash('Please verify your email address before accessing this page.', 'warning')
|
|
session.clear()
|
|
return redirect(url_for('login'))
|
|
else:
|
|
g.company = None
|
|
|
|
@app.route('/')
|
|
def home():
|
|
if g.user:
|
|
# Get active entry (no departure time)
|
|
active_entry = TimeEntry.query.filter_by(
|
|
user_id=g.user.id,
|
|
departure_time=None
|
|
).first()
|
|
|
|
# Get today's completed entries for history
|
|
today = datetime.now().date()
|
|
history = TimeEntry.query.filter(
|
|
TimeEntry.user_id == g.user.id,
|
|
TimeEntry.departure_time.isnot(None),
|
|
TimeEntry.arrival_time >= datetime.combine(today, time.min),
|
|
TimeEntry.arrival_time <= datetime.combine(today, time.max)
|
|
).order_by(TimeEntry.arrival_time.desc()).all()
|
|
|
|
# Get available projects for this user (company-scoped)
|
|
available_projects = []
|
|
if g.user.company_id:
|
|
all_projects = Project.query.filter_by(
|
|
company_id=g.user.company_id,
|
|
is_active=True
|
|
).all()
|
|
for project in all_projects:
|
|
if project.is_user_allowed(g.user):
|
|
available_projects.append(project)
|
|
|
|
return render_template('index.html', title='Home',
|
|
active_entry=active_entry,
|
|
history=history,
|
|
available_projects=available_projects)
|
|
else:
|
|
return render_template('about.html', title='Home')
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
username = request.form.get('username')
|
|
password = request.form.get('password')
|
|
|
|
user = User.query.filter_by(username=username).first()
|
|
|
|
if user:
|
|
# Fix role if it's a string or None
|
|
if isinstance(user.role, str) or user.role is None:
|
|
# Map string role values to enum values
|
|
role_mapping = {
|
|
'Team Member': Role.TEAM_MEMBER,
|
|
'TEAM_MEMBER': Role.TEAM_MEMBER,
|
|
'Team Leader': Role.TEAM_LEADER,
|
|
'TEAM_LEADER': Role.TEAM_LEADER,
|
|
'Supervisor': Role.SUPERVISOR,
|
|
'SUPERVISOR': Role.SUPERVISOR,
|
|
'Administrator': Role.ADMIN,
|
|
'ADMIN': Role.ADMIN
|
|
}
|
|
|
|
if isinstance(user.role, str):
|
|
user.role = role_mapping.get(user.role, Role.TEAM_MEMBER)
|
|
else:
|
|
user.role = Role.ADMIN if user.role == Role.ADMIN else Role.TEAM_MEMBER
|
|
|
|
db.session.commit()
|
|
|
|
# Now proceed with password check
|
|
if user.check_password(password):
|
|
# Check if user is blocked
|
|
if user.is_blocked:
|
|
flash('Your account has been disabled. Please contact an administrator.', 'error')
|
|
return render_template('login.html')
|
|
|
|
# Check if 2FA is enabled
|
|
if user.two_factor_enabled:
|
|
# Store user ID for 2FA verification
|
|
session['2fa_user_id'] = user.id
|
|
return redirect(url_for('verify_2fa'))
|
|
else:
|
|
# Continue with normal login process
|
|
session['user_id'] = user.id
|
|
session['username'] = user.username
|
|
session['role'] = user.role.value
|
|
|
|
flash('Login successful!', 'success')
|
|
return redirect(url_for('home'))
|
|
|
|
flash('Invalid username or password', 'error')
|
|
|
|
return render_template('login.html', title='Login')
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
session.clear()
|
|
flash('You have been logged out.', 'info')
|
|
return redirect(url_for('login'))
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
|
|
def register():
|
|
# Check if registration is enabled
|
|
registration_enabled = get_system_setting('registration_enabled', 'true') == 'true'
|
|
|
|
if not registration_enabled:
|
|
flash('Registration is currently disabled by the administrator.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
# Check if companies exist, if not redirect to company setup
|
|
if Company.query.count() == 0:
|
|
flash('No companies exist yet. Please set up your company first.', 'info')
|
|
return redirect(url_for('setup_company'))
|
|
|
|
if request.method == 'POST':
|
|
username = request.form.get('username')
|
|
email = request.form.get('email')
|
|
password = request.form.get('password')
|
|
confirm_password = request.form.get('confirm_password')
|
|
company_code = request.form.get('company_code', '').strip()
|
|
|
|
# Validate input
|
|
error = None
|
|
if not username:
|
|
error = 'Username is required'
|
|
elif not email:
|
|
error = 'Email is required'
|
|
elif not password:
|
|
error = 'Password is required'
|
|
elif password != confirm_password:
|
|
error = 'Passwords do not match'
|
|
elif not company_code:
|
|
error = 'Company code is required'
|
|
|
|
# Find company by code
|
|
company = None
|
|
if company_code:
|
|
company = Company.query.filter_by(slug=company_code.lower()).first()
|
|
if not company:
|
|
error = 'Invalid company code'
|
|
|
|
# Check for existing users within the company
|
|
if company and not error:
|
|
if User.query.filter_by(username=username, company_id=company.id).first():
|
|
error = 'Username already exists in this company'
|
|
elif User.query.filter_by(email=email, company_id=company.id).first():
|
|
error = 'Email already registered in this company'
|
|
|
|
if error is None and company:
|
|
try:
|
|
# Check if this is the first user account in this company
|
|
is_first_user_in_company = User.query.filter_by(company_id=company.id).count() == 0
|
|
|
|
# Check if email verification is required
|
|
email_verification_required = get_system_setting('email_verification_required', 'true') == 'true'
|
|
|
|
new_user = User(
|
|
username=username,
|
|
email=email,
|
|
company_id=company.id,
|
|
is_verified=False
|
|
)
|
|
new_user.set_password(password)
|
|
|
|
# Make first user in company an admin with full privileges
|
|
if is_first_user_in_company:
|
|
new_user.role = Role.ADMIN
|
|
new_user.is_verified = True # Auto-verify first user in company
|
|
elif not email_verification_required:
|
|
# If email verification is disabled, auto-verify new users
|
|
new_user.is_verified = True
|
|
|
|
# Generate verification token (even if not needed, for consistency)
|
|
|
|
token = new_user.generate_verification_token()
|
|
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
if is_first_user_in_company:
|
|
# First user in company gets admin privileges and is auto-verified
|
|
logger.info(f"First user account created in company {company.name}: {username} with admin privileges")
|
|
flash(f'Welcome! You are the first user in {company.name} and have been granted administrator privileges. You can now log in.', 'success')
|
|
elif not email_verification_required:
|
|
# Email verification is disabled, user can log in immediately
|
|
logger.info(f"User account created with auto-verification in company {company.name}: {username}")
|
|
flash('Registration successful! You can now log in.', 'success')
|
|
else:
|
|
# Send verification email for regular users when verification is required
|
|
verification_url = url_for('verify_email', token=token, _external=True)
|
|
msg = Message('Verify your TimeTrack account', recipients=[email])
|
|
msg.body = f'''Hello {username},
|
|
|
|
Thank you for registering with TimeTrack. To complete your registration, please click on the link below:
|
|
|
|
{verification_url}
|
|
|
|
This link will expire in 24 hours.
|
|
|
|
If you did not register for TimeTrack, please ignore this email.
|
|
|
|
Best regards,
|
|
The TimeTrack Team
|
|
'''
|
|
mail.send(msg)
|
|
logger.info(f"Verification email sent to {email}")
|
|
flash('Registration initiated! Please check your email to verify your account.', 'success')
|
|
|
|
return redirect(url_for('login'))
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
logger.error(f"Error during registration: {str(e)}")
|
|
error = f"An error occurred during registration: {str(e)}"
|
|
|
|
flash(error, 'error')
|
|
|
|
return render_template('register.html', title='Register')
|
|
|
|
@app.route('/register/freelancer', methods=['GET', 'POST'])
|
|
def register_freelancer():
|
|
"""Freelancer registration route - creates user without company token"""
|
|
# Check if registration is enabled
|
|
registration_enabled = get_system_setting('registration_enabled', 'true') == 'true'
|
|
|
|
if not registration_enabled:
|
|
flash('Registration is currently disabled by the administrator.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
if request.method == 'POST':
|
|
username = request.form.get('username')
|
|
email = request.form.get('email')
|
|
password = request.form.get('password')
|
|
confirm_password = request.form.get('confirm_password')
|
|
business_name = request.form.get('business_name', '').strip()
|
|
|
|
# Validate input
|
|
error = None
|
|
if not username:
|
|
error = 'Username is required'
|
|
elif not email:
|
|
error = 'Email is required'
|
|
elif not password:
|
|
error = 'Password is required'
|
|
elif password != confirm_password:
|
|
error = 'Passwords do not match'
|
|
|
|
# Check for existing users globally (freelancers get unique usernames/emails)
|
|
if not error:
|
|
if User.query.filter_by(username=username).first():
|
|
error = 'Username already exists'
|
|
elif User.query.filter_by(email=email).first():
|
|
error = 'Email already registered'
|
|
|
|
if error is None:
|
|
try:
|
|
# Create personal company for freelancer
|
|
company_name = business_name if business_name else f"{username}'s Workspace"
|
|
|
|
# Generate unique company slug
|
|
import re
|
|
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
|
|
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
|
|
|
|
# Ensure slug uniqueness
|
|
base_slug = slug
|
|
counter = 1
|
|
while Company.query.filter_by(slug=slug).first():
|
|
slug = f"{base_slug}-{counter}"
|
|
counter += 1
|
|
|
|
# Create personal company
|
|
personal_company = Company(
|
|
name=company_name,
|
|
slug=slug,
|
|
description=f"Personal workspace for {username}",
|
|
is_personal=True,
|
|
max_users=1 # Limit to single user
|
|
)
|
|
|
|
db.session.add(personal_company)
|
|
db.session.flush() # Get company ID
|
|
|
|
# Create freelancer user
|
|
new_user = User(
|
|
username=username,
|
|
email=email,
|
|
company_id=personal_company.id,
|
|
account_type=AccountType.FREELANCER,
|
|
business_name=business_name if business_name else None,
|
|
role=Role.ADMIN, # Freelancers are admins of their personal company
|
|
is_verified=True # Auto-verify freelancers
|
|
)
|
|
new_user.set_password(password)
|
|
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
logger.info(f"Freelancer account created: {username} with personal company: {company_name}")
|
|
flash(f'Welcome {username}! Your freelancer account has been created successfully. You can now log in.', 'success')
|
|
|
|
return redirect(url_for('login'))
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
logger.error(f"Error during freelancer registration: {str(e)}")
|
|
error = f"An error occurred during registration: {str(e)}"
|
|
|
|
if error:
|
|
flash(error, 'error')
|
|
|
|
return render_template('register_freelancer.html', title='Register as Freelancer')
|
|
|
|
@app.route('/setup_company', methods=['GET', 'POST'])
|
|
def setup_company():
|
|
"""Company setup route for creating new companies with admin users"""
|
|
existing_companies = Company.query.count()
|
|
|
|
# Determine access level
|
|
is_initial_setup = existing_companies == 0
|
|
is_super_admin = g.user and g.user.role == Role.ADMIN and existing_companies > 0
|
|
is_authorized = is_initial_setup or is_super_admin
|
|
|
|
# Check authorization for non-initial setups
|
|
if not is_initial_setup and not is_super_admin:
|
|
flash('You do not have permission to create new companies.', 'error')
|
|
return redirect(url_for('home') if g.user else url_for('login'))
|
|
|
|
if request.method == 'POST':
|
|
company_name = request.form.get('company_name')
|
|
company_description = request.form.get('company_description', '')
|
|
admin_username = request.form.get('admin_username')
|
|
admin_email = request.form.get('admin_email')
|
|
admin_password = request.form.get('admin_password')
|
|
confirm_password = request.form.get('confirm_password')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not company_name:
|
|
error = 'Company name is required'
|
|
elif not admin_username:
|
|
error = 'Admin username is required'
|
|
elif not admin_email:
|
|
error = 'Admin email is required'
|
|
elif not admin_password:
|
|
error = 'Admin password is required'
|
|
elif admin_password != confirm_password:
|
|
error = 'Passwords do not match'
|
|
elif len(admin_password) < 6:
|
|
error = 'Password must be at least 6 characters long'
|
|
|
|
if error is None:
|
|
try:
|
|
# Generate company slug
|
|
import re
|
|
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
|
|
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
|
|
|
|
# Ensure slug uniqueness
|
|
base_slug = slug
|
|
counter = 1
|
|
while Company.query.filter_by(slug=slug).first():
|
|
slug = f"{base_slug}-{counter}"
|
|
counter += 1
|
|
|
|
# Create company
|
|
company = Company(
|
|
name=company_name,
|
|
slug=slug,
|
|
description=company_description,
|
|
is_active=True
|
|
)
|
|
db.session.add(company)
|
|
db.session.flush() # Get company.id without committing
|
|
|
|
# Check if username/email already exists in this company context
|
|
existing_user_by_username = User.query.filter_by(
|
|
username=admin_username,
|
|
company_id=company.id
|
|
).first()
|
|
existing_user_by_email = User.query.filter_by(
|
|
email=admin_email,
|
|
company_id=company.id
|
|
).first()
|
|
|
|
if existing_user_by_username:
|
|
error = 'Username already exists in this company'
|
|
elif existing_user_by_email:
|
|
error = 'Email already registered in this company'
|
|
|
|
if error is None:
|
|
# Create admin user
|
|
admin_user = User(
|
|
username=admin_username,
|
|
email=admin_email,
|
|
company_id=company.id,
|
|
role=Role.ADMIN,
|
|
is_verified=True # Auto-verify company admin
|
|
)
|
|
admin_user.set_password(admin_password)
|
|
db.session.add(admin_user)
|
|
db.session.commit()
|
|
|
|
if is_initial_setup:
|
|
# Auto-login the admin user for initial setup
|
|
session['user_id'] = admin_user.id
|
|
session['username'] = admin_user.username
|
|
session['role'] = admin_user.role.value
|
|
|
|
flash(f'Company "{company_name}" created successfully! You are now logged in as the administrator.', 'success')
|
|
return redirect(url_for('home'))
|
|
else:
|
|
# For super admin creating additional companies, don't auto-login
|
|
flash(f'Company "{company_name}" created successfully! Admin user "{admin_username}" has been created with the company code "{slug}".', 'success')
|
|
return redirect(url_for('admin_company') if g.user else url_for('login'))
|
|
else:
|
|
db.session.rollback()
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
logger.error(f"Error during company setup: {str(e)}")
|
|
error = f"An error occurred during setup: {str(e)}"
|
|
|
|
if error:
|
|
flash(error, 'error')
|
|
|
|
return render_template('setup_company.html',
|
|
title='Company Setup',
|
|
existing_companies=existing_companies,
|
|
is_initial_setup=is_initial_setup,
|
|
is_super_admin=is_super_admin)
|
|
|
|
@app.route('/verify_email/<token>')
|
|
def verify_email(token):
|
|
user = User.query.filter_by(verification_token=token).first()
|
|
|
|
if not user:
|
|
flash('Invalid or expired verification link.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
if user.verify_token(token):
|
|
db.session.commit()
|
|
flash('Email verified successfully! You can now log in.', 'success')
|
|
else:
|
|
flash('Invalid or expired verification link.', 'error')
|
|
|
|
return redirect(url_for('login'))
|
|
|
|
@app.route('/dashboard')
|
|
@role_required(Role.TEAM_LEADER)
|
|
def dashboard():
|
|
# Get dashboard data based on user role
|
|
dashboard_data = {}
|
|
|
|
if g.user.role == Role.ADMIN and g.user.company_id:
|
|
# Admin sees everything within their company
|
|
|
|
dashboard_data.update({
|
|
'total_users': User.query.filter_by(company_id=g.user.company_id).count(),
|
|
'total_teams': Team.query.filter_by(company_id=g.user.company_id).count(),
|
|
'blocked_users': User.query.filter_by(company_id=g.user.company_id, is_blocked=True).count(),
|
|
'unverified_users': User.query.filter_by(company_id=g.user.company_id, is_verified=False).count(),
|
|
'recent_registrations': User.query.filter_by(company_id=g.user.company_id).order_by(User.id.desc()).limit(5).all()
|
|
})
|
|
|
|
if g.user.role in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
|
|
|
|
# Team leaders and supervisors see team-related data
|
|
if g.user.team_id or g.user.role == Role.ADMIN:
|
|
if g.user.role == Role.ADMIN and g.user.company_id:
|
|
# Admin can see all teams in their company
|
|
teams = Team.query.filter_by(company_id=g.user.company_id).all()
|
|
team_members = User.query.filter(
|
|
User.team_id.isnot(None),
|
|
User.company_id == g.user.company_id
|
|
).all()
|
|
else:
|
|
# Team leaders/supervisors see their own team
|
|
teams = [Team.query.get(g.user.team_id)] if g.user.team_id else []
|
|
|
|
team_members = User.query.filter_by(
|
|
team_id=g.user.team_id,
|
|
company_id=g.user.company_id
|
|
).all() if g.user.team_id else []
|
|
|
|
dashboard_data.update({
|
|
'teams': teams,
|
|
'team_members': team_members,
|
|
'team_member_count': len(team_members)
|
|
})
|
|
|
|
# Get recent time entries for the user's oversight
|
|
if g.user.role == Role.ADMIN:
|
|
# Admin sees all recent entries
|
|
recent_entries = TimeEntry.query.order_by(TimeEntry.arrival_time.desc()).limit(10).all()
|
|
elif g.user.team_id:
|
|
# Team leaders see their team's entries
|
|
team_user_ids = [user.id for user in User.query.filter_by(team_id=g.user.team_id).all()]
|
|
recent_entries = TimeEntry.query.filter(TimeEntry.user_id.in_(team_user_ids)).order_by(TimeEntry.arrival_time.desc()).limit(10).all()
|
|
else:
|
|
recent_entries = []
|
|
|
|
dashboard_data['recent_entries'] = recent_entries
|
|
|
|
return render_template('dashboard.html', title='Dashboard', **dashboard_data)
|
|
|
|
# Redirect old admin dashboard URL to new dashboard
|
|
|
|
@app.route('/admin/users')
|
|
@admin_required
|
|
@company_required
|
|
def admin_users():
|
|
users = User.query.filter_by(company_id=g.user.company_id).all()
|
|
return render_template('admin_users.html', title='User Management', users=users)
|
|
|
|
@app.route('/admin/users/create', methods=['GET', 'POST'])
|
|
@admin_required
|
|
@company_required
|
|
def create_user():
|
|
if request.method == 'POST':
|
|
username = request.form.get('username')
|
|
email = request.form.get('email')
|
|
password = request.form.get('password')
|
|
auto_verify = 'auto_verify' in request.form
|
|
|
|
# Get role and team
|
|
role_name = request.form.get('role')
|
|
team_id = request.form.get('team_id')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not username:
|
|
error = 'Username is required'
|
|
elif not email:
|
|
error = 'Email is required'
|
|
elif not password:
|
|
error = 'Password is required'
|
|
elif User.query.filter_by(username=username, company_id=g.user.company_id).first():
|
|
error = 'Username already exists in your company'
|
|
elif User.query.filter_by(email=email, company_id=g.user.company_id).first():
|
|
error = 'Email already registered in your company'
|
|
|
|
if error is None:
|
|
# Convert role string to enum
|
|
try:
|
|
role = Role[role_name] if role_name else Role.TEAM_MEMBER
|
|
except KeyError:
|
|
role = Role.TEAM_MEMBER
|
|
|
|
# Create new user with role and team
|
|
new_user = User(
|
|
username=username,
|
|
email=email,
|
|
company_id=g.user.company_id,
|
|
is_verified=auto_verify,
|
|
role=role,
|
|
team_id=team_id if team_id else None
|
|
)
|
|
new_user.set_password(password)
|
|
|
|
if not auto_verify:
|
|
# Generate verification token and send email
|
|
token = new_user.generate_verification_token()
|
|
verification_url = url_for('verify_email', token=token, _external=True)
|
|
msg = Message('Verify your TimeTrack account', recipients=[email])
|
|
msg.body = f'''Hello {username},
|
|
|
|
An administrator has created an account for you on TimeTrack. To activate your account, please click on the link below:
|
|
|
|
{verification_url}
|
|
|
|
This link will expire in 24 hours.
|
|
|
|
Best regards,
|
|
The TimeTrack Team
|
|
'''
|
|
mail.send(msg)
|
|
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
if auto_verify:
|
|
flash(f'User {username} created and automatically verified!', 'success')
|
|
else:
|
|
flash(f'User {username} created! Verification email sent.', 'success')
|
|
return redirect(url_for('admin_users'))
|
|
|
|
flash(error, 'error')
|
|
|
|
# Get all teams for the form (company-scoped)
|
|
teams = Team.query.filter_by(company_id=g.user.company_id).all()
|
|
roles = get_available_roles()
|
|
|
|
return render_template('create_user.html', title='Create User', teams=teams, roles=roles)
|
|
|
|
@app.route('/admin/users/edit/<int:user_id>', methods=['GET', 'POST'])
|
|
@admin_required
|
|
@company_required
|
|
def edit_user(user_id):
|
|
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
if request.method == 'POST':
|
|
username = request.form.get('username')
|
|
email = request.form.get('email')
|
|
password = request.form.get('password')
|
|
|
|
# Get role and team
|
|
role_name = request.form.get('role')
|
|
team_id = request.form.get('team_id')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not username:
|
|
error = 'Username is required'
|
|
elif not email:
|
|
error = 'Email is required'
|
|
elif username != user.username and User.query.filter_by(username=username, company_id=g.user.company_id).first():
|
|
error = 'Username already exists in your company'
|
|
elif email != user.email and User.query.filter_by(email=email, company_id=g.user.company_id).first():
|
|
error = 'Email already registered in your company'
|
|
if error is None:
|
|
user.username = username
|
|
user.email = email
|
|
|
|
# Convert role string to enum
|
|
try:
|
|
user.role = Role[role_name] if role_name else Role.TEAM_MEMBER
|
|
except KeyError:
|
|
user.role = Role.TEAM_MEMBER
|
|
|
|
user.team_id = team_id if team_id else None
|
|
|
|
if password:
|
|
user.set_password(password)
|
|
|
|
db.session.commit()
|
|
|
|
flash(f'User {username} updated successfully!', 'success')
|
|
return redirect(url_for('admin_users'))
|
|
|
|
flash(error, 'error')
|
|
|
|
# Get all teams for the form (company-scoped)
|
|
teams = Team.query.filter_by(company_id=g.user.company_id).all()
|
|
roles = get_available_roles()
|
|
|
|
return render_template('edit_user.html', title='Edit User', user=user, teams=teams, roles=roles)
|
|
|
|
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
|
|
@admin_required
|
|
@company_required
|
|
def delete_user(user_id):
|
|
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
# Prevent deleting yourself
|
|
if user.id == session.get('user_id'):
|
|
flash('You cannot delete your own account', 'error')
|
|
return redirect(url_for('admin_users'))
|
|
|
|
username = user.username
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
flash(f'User {username} deleted successfully', 'success')
|
|
return redirect(url_for('admin_users'))
|
|
|
|
@app.route('/profile', methods=['GET', 'POST'])
|
|
@login_required
|
|
def profile():
|
|
user = User.query.get(session['user_id'])
|
|
|
|
if request.method == 'POST':
|
|
email = request.form.get('email')
|
|
current_password = request.form.get('current_password')
|
|
new_password = request.form.get('new_password')
|
|
confirm_password = request.form.get('confirm_password')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not email:
|
|
error = 'Email is required'
|
|
elif email != user.email and User.query.filter_by(email=email).first():
|
|
error = 'Email already registered'
|
|
|
|
# Password change validation
|
|
if new_password:
|
|
if not current_password:
|
|
error = 'Current password is required to set a new password'
|
|
elif not user.check_password(current_password):
|
|
error = 'Current password is incorrect'
|
|
elif new_password != confirm_password:
|
|
error = 'New passwords do not match'
|
|
|
|
if error is None:
|
|
user.email = email
|
|
|
|
if new_password:
|
|
user.set_password(new_password)
|
|
|
|
db.session.commit()
|
|
flash('Profile updated successfully!', 'success')
|
|
return redirect(url_for('profile'))
|
|
|
|
flash(error, 'error')
|
|
|
|
return render_template('profile.html', title='My Profile', user=user)
|
|
|
|
@app.route('/2fa/setup', methods=['GET', 'POST'])
|
|
@login_required
|
|
def setup_2fa():
|
|
if request.method == 'POST':
|
|
# Verify the TOTP code before enabling 2FA
|
|
totp_code = request.form.get('totp_code')
|
|
|
|
if not totp_code:
|
|
flash('Please enter the verification code from your authenticator app.', 'error')
|
|
return redirect(url_for('setup_2fa'))
|
|
|
|
try:
|
|
if g.user.verify_2fa_token(totp_code, allow_setup=True):
|
|
g.user.two_factor_enabled = True
|
|
db.session.commit()
|
|
flash('Two-factor authentication has been successfully enabled!', 'success')
|
|
return redirect(url_for('profile'))
|
|
else:
|
|
flash('Invalid verification code. Please make sure your device time is synchronized and try again.', 'error')
|
|
return redirect(url_for('setup_2fa'))
|
|
except Exception as e:
|
|
logger.error(f"2FA setup error: {str(e)}")
|
|
flash('An error occurred during 2FA setup. Please try again.', 'error')
|
|
return redirect(url_for('setup_2fa'))
|
|
|
|
# GET request - show setup page
|
|
if g.user.two_factor_enabled:
|
|
flash('Two-factor authentication is already enabled.', 'info')
|
|
return redirect(url_for('profile'))
|
|
|
|
# Generate secret if not exists
|
|
if not g.user.two_factor_secret:
|
|
g.user.generate_2fa_secret()
|
|
db.session.commit()
|
|
|
|
# Generate QR code
|
|
import qrcode
|
|
import io
|
|
import base64
|
|
|
|
qr_uri = g.user.get_2fa_uri()
|
|
qr = qrcode.QRCode(version=1, box_size=10, border=5)
|
|
qr.add_data(qr_uri)
|
|
qr.make(fit=True)
|
|
|
|
# Create QR code image
|
|
qr_img = qr.make_image(fill_color="black", back_color="white")
|
|
img_buffer = io.BytesIO()
|
|
qr_img.save(img_buffer, format='PNG')
|
|
img_buffer.seek(0)
|
|
qr_code_b64 = base64.b64encode(img_buffer.getvalue()).decode()
|
|
|
|
return render_template('setup_2fa.html',
|
|
title='Setup Two-Factor Authentication',
|
|
secret=g.user.two_factor_secret,
|
|
qr_code=qr_code_b64)
|
|
|
|
@app.route('/2fa/disable', methods=['POST'])
|
|
@login_required
|
|
def disable_2fa():
|
|
password = request.form.get('password')
|
|
|
|
if not password or not g.user.check_password(password):
|
|
flash('Please enter your correct password to disable 2FA.', 'error')
|
|
return redirect(url_for('profile'))
|
|
|
|
g.user.two_factor_enabled = False
|
|
g.user.two_factor_secret = None
|
|
db.session.commit()
|
|
|
|
flash('Two-factor authentication has been disabled.', 'success')
|
|
return redirect(url_for('profile'))
|
|
|
|
@app.route('/2fa/verify', methods=['GET', 'POST'])
|
|
def verify_2fa():
|
|
# Check if user is in 2FA verification state
|
|
user_id = session.get('2fa_user_id')
|
|
if not user_id:
|
|
return redirect(url_for('login'))
|
|
|
|
user = User.query.get(user_id)
|
|
if not user or not user.two_factor_enabled:
|
|
session.pop('2fa_user_id', None)
|
|
return redirect(url_for('login'))
|
|
|
|
if request.method == 'POST':
|
|
totp_code = request.form.get('totp_code')
|
|
|
|
if user.verify_2fa_token(totp_code):
|
|
# Complete login process
|
|
session.pop('2fa_user_id', None)
|
|
session['user_id'] = user.id
|
|
session['username'] = user.username
|
|
session['role'] = user.role.value
|
|
|
|
flash('Login successful!', 'success')
|
|
return redirect(url_for('home'))
|
|
else:
|
|
flash('Invalid verification code. Please try again.', 'error')
|
|
|
|
return render_template('verify_2fa.html', title='Two-Factor Authentication')
|
|
|
|
@app.route('/about')
|
|
def about():
|
|
return render_template('about.html', title='About')
|
|
|
|
@app.route('/contact', methods=['GET', 'POST'])
|
|
@login_required
|
|
def contact():
|
|
# redacted
|
|
return render_template('contact.html', title='Contact')
|
|
|
|
# We can keep this route as a redirect to home for backward compatibility
|
|
@app.route('/timetrack')
|
|
@login_required
|
|
def timetrack():
|
|
return redirect(url_for('home'))
|
|
|
|
@app.route('/api/arrive', methods=['POST'])
|
|
@login_required
|
|
def arrive():
|
|
# Get project and notes from request
|
|
project_id = request.json.get('project_id') if request.json else None
|
|
notes = request.json.get('notes') if request.json else None
|
|
|
|
# Validate project access if project is specified
|
|
if project_id:
|
|
project = Project.query.get(project_id)
|
|
if not project or not project.is_user_allowed(g.user):
|
|
return jsonify({'error': 'Invalid or unauthorized project'}), 403
|
|
|
|
# Create a new time entry with arrival time for the current user
|
|
new_entry = TimeEntry(
|
|
user_id=g.user.id,
|
|
arrival_time=datetime.now(),
|
|
project_id=int(project_id) if project_id else None,
|
|
notes=notes
|
|
)
|
|
db.session.add(new_entry)
|
|
db.session.commit()
|
|
|
|
# Format response with user preferences
|
|
from time_utils import format_datetime_by_preference, get_user_format_settings
|
|
date_format, time_format_24h = get_user_format_settings(g.user)
|
|
|
|
return jsonify({
|
|
'id': new_entry.id,
|
|
'arrival_time': format_datetime_by_preference(new_entry.arrival_time, date_format, time_format_24h),
|
|
'project': {
|
|
'id': new_entry.project.id,
|
|
'code': new_entry.project.code,
|
|
'name': new_entry.project.name
|
|
} if new_entry.project else None,
|
|
'notes': new_entry.notes
|
|
})
|
|
|
|
@app.route('/api/leave/<int:entry_id>', methods=['POST'])
|
|
@login_required
|
|
def leave(entry_id):
|
|
# Find the time entry for the current user
|
|
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
|
|
|
|
# Set the departure time
|
|
departure_time = datetime.now()
|
|
|
|
# Apply time rounding if enabled
|
|
rounded_arrival, rounded_departure = apply_time_rounding(entry.arrival_time, departure_time, g.user)
|
|
entry.arrival_time = rounded_arrival
|
|
entry.departure_time = rounded_departure
|
|
|
|
# If currently paused, add the final break duration
|
|
if entry.is_paused and entry.pause_start_time:
|
|
final_break_duration = int((rounded_departure - entry.pause_start_time).total_seconds())
|
|
entry.total_break_duration += final_break_duration
|
|
entry.is_paused = False
|
|
entry.pause_start_time = None
|
|
|
|
# Apply rounding to break duration if enabled
|
|
interval_minutes, round_to_nearest = get_user_rounding_settings(g.user)
|
|
if interval_minutes > 0:
|
|
entry.total_break_duration = round_duration_to_interval(
|
|
entry.total_break_duration, interval_minutes, round_to_nearest
|
|
)
|
|
|
|
# Calculate work duration considering breaks
|
|
entry.duration, effective_break = calculate_work_duration(
|
|
rounded_arrival,
|
|
rounded_departure,
|
|
entry.total_break_duration,
|
|
g.user
|
|
)
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'id': entry.id,
|
|
'arrival_time': entry.arrival_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'departure_time': entry.departure_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'duration': entry.duration,
|
|
'total_break_duration': entry.total_break_duration,
|
|
'effective_break_duration': effective_break
|
|
})
|
|
|
|
# Add this new route to handle pausing/resuming
|
|
@app.route('/api/toggle-pause/<int:entry_id>', methods=['POST'])
|
|
@login_required
|
|
def toggle_pause(entry_id):
|
|
# Find the time entry for the current user
|
|
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
|
|
|
|
now = datetime.now()
|
|
|
|
if entry.is_paused:
|
|
# Resuming work - calculate break duration
|
|
break_duration = int((now - entry.pause_start_time).total_seconds())
|
|
entry.total_break_duration += break_duration
|
|
entry.is_paused = False
|
|
entry.pause_start_time = None
|
|
|
|
message = "Work resumed"
|
|
else:
|
|
# Pausing work
|
|
entry.is_paused = True
|
|
entry.pause_start_time = now
|
|
|
|
message = "Work paused"
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'id': entry.id,
|
|
'is_paused': entry.is_paused,
|
|
'total_break_duration': entry.total_break_duration,
|
|
'message': message
|
|
})
|
|
|
|
@app.route('/config', methods=['GET', 'POST'])
|
|
@login_required
|
|
def config():
|
|
# Get user preferences or create default if none exists
|
|
preferences = UserPreferences.query.filter_by(user_id=g.user.id).first()
|
|
if not preferences:
|
|
preferences = UserPreferences(user_id=g.user.id)
|
|
db.session.add(preferences)
|
|
db.session.commit()
|
|
|
|
if request.method == 'POST':
|
|
try:
|
|
# Update only user preferences (no company policies)
|
|
preferences.time_format_24h = 'time_format_24h' in request.form
|
|
preferences.date_format = request.form.get('date_format', 'ISO')
|
|
preferences.time_rounding_minutes = int(request.form.get('time_rounding_minutes', 0))
|
|
preferences.round_to_nearest = 'round_to_nearest' in request.form
|
|
|
|
db.session.commit()
|
|
flash('Preferences updated successfully!', 'success')
|
|
return redirect(url_for('config'))
|
|
except ValueError:
|
|
flash('Please enter valid values for all fields', 'error')
|
|
|
|
# Get company work policies for display (read-only)
|
|
company_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first()
|
|
|
|
# Import time utils for display options
|
|
from time_utils import get_available_rounding_options, get_available_date_formats
|
|
rounding_options = get_available_rounding_options()
|
|
date_format_options = get_available_date_formats()
|
|
|
|
return render_template('config.html', title='User Preferences',
|
|
preferences=preferences,
|
|
company_config=company_config,
|
|
rounding_options=rounding_options,
|
|
date_format_options=date_format_options)
|
|
|
|
@app.route('/api/delete/<int:entry_id>', methods=['DELETE'])
|
|
@login_required
|
|
def delete_entry(entry_id):
|
|
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
|
|
db.session.delete(entry)
|
|
db.session.commit()
|
|
return jsonify({'success': True, 'message': 'Entry deleted successfully'})
|
|
|
|
@app.route('/api/update/<int:entry_id>', methods=['PUT'])
|
|
@login_required
|
|
def update_entry(entry_id):
|
|
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
|
|
data = request.json
|
|
|
|
if 'arrival_time' in data:
|
|
try:
|
|
entry.arrival_time = datetime.strptime(data['arrival_time'], '%Y-%m-%d %H:%M:%S')
|
|
except ValueError:
|
|
return jsonify({'success': False, 'message': 'Invalid arrival time format'}), 400
|
|
|
|
if 'departure_time' in data and data['departure_time']:
|
|
try:
|
|
entry.departure_time = datetime.strptime(data['departure_time'], '%Y-%m-%d %H:%M:%S')
|
|
# Recalculate duration if both times are present
|
|
if entry.arrival_time and entry.departure_time:
|
|
# Calculate work duration considering breaks
|
|
entry.duration, _ = calculate_work_duration(
|
|
entry.arrival_time,
|
|
entry.departure_time,
|
|
entry.total_break_duration,
|
|
g.user
|
|
)
|
|
except ValueError:
|
|
return jsonify({'success': False, 'message': 'Invalid departure time format'}), 400
|
|
|
|
db.session.commit()
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Entry updated successfully',
|
|
'entry': {
|
|
'id': entry.id,
|
|
'arrival_time': entry.arrival_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'departure_time': entry.departure_time.strftime('%Y-%m-%d %H:%M:%S') if entry.departure_time else None,
|
|
'duration': entry.duration,
|
|
'is_paused': entry.is_paused,
|
|
'total_break_duration': entry.total_break_duration
|
|
}
|
|
})
|
|
|
|
def calculate_work_duration(arrival_time, departure_time, total_break_duration, user):
|
|
"""
|
|
Calculate work duration considering both configured and actual break times.
|
|
|
|
Args:
|
|
arrival_time: Datetime of arrival
|
|
departure_time: Datetime of departure
|
|
total_break_duration: Actual logged break duration in seconds
|
|
user: User object to get company configuration
|
|
|
|
Returns:
|
|
tuple: (work_duration_in_seconds, effective_break_duration_in_seconds)
|
|
"""
|
|
# Calculate raw duration
|
|
raw_duration = (departure_time - arrival_time).total_seconds()
|
|
|
|
# Get company work configuration for break rules
|
|
company_config = CompanyWorkConfig.query.filter_by(company_id=user.company_id).first()
|
|
if not company_config:
|
|
# Use Germany defaults if no company config exists
|
|
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
|
|
break_threshold_hours = preset['break_threshold_hours']
|
|
mandatory_break_minutes = preset['mandatory_break_minutes']
|
|
additional_break_threshold_hours = preset['additional_break_threshold_hours']
|
|
additional_break_minutes = preset['additional_break_minutes']
|
|
else:
|
|
break_threshold_hours = company_config.break_threshold_hours
|
|
mandatory_break_minutes = company_config.mandatory_break_minutes
|
|
additional_break_threshold_hours = company_config.additional_break_threshold_hours
|
|
additional_break_minutes = company_config.additional_break_minutes
|
|
|
|
# Calculate mandatory breaks based on work duration
|
|
work_hours = raw_duration / 3600 # Convert seconds to hours
|
|
configured_break_seconds = 0
|
|
|
|
# Apply primary break if work duration exceeds threshold
|
|
if work_hours > break_threshold_hours:
|
|
configured_break_seconds += mandatory_break_minutes * 60
|
|
|
|
# Apply additional break if work duration exceeds additional threshold
|
|
if work_hours > additional_break_threshold_hours:
|
|
configured_break_seconds += additional_break_minutes * 60
|
|
|
|
# Use the greater of configured breaks or actual logged breaks
|
|
effective_break_duration = max(configured_break_seconds, total_break_duration)
|
|
|
|
# Calculate final work duration
|
|
work_duration = int(raw_duration - effective_break_duration)
|
|
|
|
return work_duration, effective_break_duration
|
|
|
|
@app.route('/api/resume/<int:entry_id>', methods=['POST'])
|
|
@login_required
|
|
def resume_entry(entry_id):
|
|
# Find the entry to resume for the current user
|
|
entry_to_resume = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
|
|
|
|
# Check if there's already an active entry
|
|
active_entry = TimeEntry.query.filter_by(user_id=session['user_id'], departure_time=None).first()
|
|
if active_entry:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': 'Cannot resume this entry. Another session is already active.'
|
|
}), 400
|
|
|
|
# Clear the departure time to make this entry active again
|
|
entry_to_resume.departure_time = None
|
|
|
|
# Reset pause state if it was paused
|
|
entry_to_resume.is_paused = False
|
|
entry_to_resume.pause_start_time = None
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Work resumed on existing entry',
|
|
'id': entry_to_resume.id,
|
|
'arrival_time': entry_to_resume.arrival_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'total_break_duration': entry_to_resume.total_break_duration
|
|
})
|
|
|
|
@app.route('/api/manual-entry', methods=['POST'])
|
|
@login_required
|
|
def manual_entry():
|
|
try:
|
|
data = request.get_json()
|
|
|
|
# Extract data from request
|
|
project_id = data.get('project_id')
|
|
start_date = data.get('start_date')
|
|
start_time = data.get('start_time')
|
|
end_date = data.get('end_date')
|
|
end_time = data.get('end_time')
|
|
break_minutes = int(data.get('break_minutes', 0))
|
|
notes = data.get('notes', '')
|
|
|
|
# Validate required fields
|
|
if not all([start_date, start_time, end_date, end_time]):
|
|
return jsonify({'error': 'Start and end date/time are required'}), 400
|
|
|
|
# Parse datetime strings
|
|
try:
|
|
arrival_datetime = datetime.strptime(f"{start_date} {start_time}", '%Y-%m-%d %H:%M:%S')
|
|
departure_datetime = datetime.strptime(f"{end_date} {end_time}", '%Y-%m-%d %H:%M:%S')
|
|
except ValueError:
|
|
try:
|
|
# Try without seconds if parsing fails
|
|
arrival_datetime = datetime.strptime(f"{start_date} {start_time}:00", '%Y-%m-%d %H:%M:%S')
|
|
departure_datetime = datetime.strptime(f"{end_date} {end_time}:00", '%Y-%m-%d %H:%M:%S')
|
|
except ValueError:
|
|
return jsonify({'error': 'Invalid date/time format'}), 400
|
|
|
|
# Validate that end time is after start time
|
|
if departure_datetime <= arrival_datetime:
|
|
return jsonify({'error': 'End time must be after start time'}), 400
|
|
|
|
# Apply time rounding if enabled
|
|
rounded_arrival, rounded_departure = apply_time_rounding(arrival_datetime, departure_datetime, g.user)
|
|
|
|
# Validate project access if project is specified
|
|
if project_id:
|
|
project = Project.query.get(project_id)
|
|
if not project or not project.is_user_allowed(g.user):
|
|
return jsonify({'error': 'Invalid or unauthorized project'}), 403
|
|
|
|
# Check for overlapping entries for this user (using rounded times)
|
|
overlapping_entry = TimeEntry.query.filter(
|
|
TimeEntry.user_id == g.user.id,
|
|
TimeEntry.departure_time.isnot(None),
|
|
TimeEntry.arrival_time < rounded_departure,
|
|
TimeEntry.departure_time > rounded_arrival
|
|
).first()
|
|
|
|
if overlapping_entry:
|
|
return jsonify({
|
|
'error': 'This time entry overlaps with an existing entry'
|
|
}), 400
|
|
|
|
# Calculate total duration in seconds (using rounded times)
|
|
total_duration = int((rounded_departure - rounded_arrival).total_seconds())
|
|
break_duration_seconds = break_minutes * 60
|
|
|
|
# Apply rounding to break duration if enabled
|
|
interval_minutes, round_to_nearest = get_user_rounding_settings(g.user)
|
|
if interval_minutes > 0:
|
|
break_duration_seconds = round_duration_to_interval(
|
|
break_duration_seconds, interval_minutes, round_to_nearest
|
|
)
|
|
|
|
# Validate break duration doesn't exceed total duration
|
|
if break_duration_seconds >= total_duration:
|
|
return jsonify({'error': 'Break duration cannot exceed total work duration'}), 400
|
|
|
|
# Calculate work duration (total duration minus breaks)
|
|
work_duration = total_duration - break_duration_seconds
|
|
|
|
# Create the manual time entry (using rounded times)
|
|
new_entry = TimeEntry(
|
|
user_id=g.user.id,
|
|
arrival_time=rounded_arrival,
|
|
departure_time=rounded_departure,
|
|
duration=work_duration,
|
|
total_break_duration=break_duration_seconds,
|
|
project_id=int(project_id) if project_id else None,
|
|
notes=notes,
|
|
is_paused=False,
|
|
pause_start_time=None
|
|
)
|
|
|
|
db.session.add(new_entry)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Manual time entry added successfully',
|
|
'entry_id': new_entry.id
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating manual time entry: {str(e)}")
|
|
db.session.rollback()
|
|
return jsonify({'error': 'An error occurred while creating the time entry'}), 500
|
|
|
|
@app.errorhandler(404)
|
|
def page_not_found(e):
|
|
return render_template('404.html'), 404
|
|
|
|
@app.errorhandler(500)
|
|
def internal_server_error(e):
|
|
return render_template('500.html'), 500
|
|
|
|
@app.route('/test')
|
|
def test():
|
|
return "App is working!"
|
|
|
|
@app.route('/admin/users/toggle-status/<int:user_id>')
|
|
@admin_required
|
|
@company_required
|
|
def toggle_user_status(user_id):
|
|
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
# Prevent blocking yourself
|
|
if user.id == session.get('user_id'):
|
|
flash('You cannot block your own account', 'error')
|
|
return redirect(url_for('admin_users'))
|
|
|
|
# Toggle the blocked status
|
|
user.is_blocked = not user.is_blocked
|
|
db.session.commit()
|
|
|
|
if user.is_blocked:
|
|
flash(f'User {user.username} has been blocked', 'success')
|
|
else:
|
|
flash(f'User {user.username} has been unblocked', 'success')
|
|
|
|
return redirect(url_for('admin_users'))
|
|
|
|
# Add this route to manage system settings
|
|
@app.route('/admin/settings', methods=['GET', 'POST'])
|
|
@admin_required
|
|
def admin_settings():
|
|
if request.method == 'POST':
|
|
# Update registration setting
|
|
registration_enabled = 'registration_enabled' in request.form
|
|
reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first()
|
|
if reg_setting:
|
|
reg_setting.value = 'true' if registration_enabled else 'false'
|
|
|
|
# Update email verification setting
|
|
email_verification_required = 'email_verification_required' in request.form
|
|
email_setting = SystemSettings.query.filter_by(key='email_verification_required').first()
|
|
if email_setting:
|
|
email_setting.value = 'true' if email_verification_required else 'false'
|
|
|
|
db.session.commit()
|
|
flash('System settings updated successfully!', 'success')
|
|
|
|
# Get current settings
|
|
settings = {}
|
|
for setting in SystemSettings.query.all():
|
|
if setting.key == 'registration_enabled':
|
|
settings['registration_enabled'] = setting.value == 'true'
|
|
elif setting.key == 'email_verification_required':
|
|
settings['email_verification_required'] = setting.value == 'true'
|
|
|
|
return render_template('admin_settings.html', title='System Settings', settings=settings)
|
|
|
|
@app.route('/system-admin/dashboard')
|
|
@system_admin_required
|
|
def system_admin_dashboard():
|
|
"""System Administrator Dashboard - view all data across companies"""
|
|
|
|
# Global statistics
|
|
total_companies = Company.query.count()
|
|
total_users = User.query.count()
|
|
total_teams = Team.query.count()
|
|
total_projects = Project.query.count()
|
|
total_time_entries = TimeEntry.query.count()
|
|
|
|
# System admin count
|
|
system_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
|
|
regular_admins = User.query.filter_by(role=Role.ADMIN).count()
|
|
|
|
# Recent activity (last 7 days)
|
|
from datetime import datetime, timedelta
|
|
week_ago = datetime.now() - timedelta(days=7)
|
|
|
|
recent_users = User.query.filter(User.created_at >= week_ago).count()
|
|
recent_companies = Company.query.filter(Company.created_at >= week_ago).count()
|
|
recent_time_entries = TimeEntry.query.filter(TimeEntry.arrival_time >= week_ago).count()
|
|
|
|
# Top companies by user count
|
|
top_companies = db.session.query(
|
|
Company.name,
|
|
Company.id,
|
|
db.func.count(User.id).label('user_count')
|
|
).join(User).group_by(Company.id).order_by(db.func.count(User.id).desc()).limit(5).all()
|
|
|
|
# Recent companies
|
|
recent_companies_list = Company.query.order_by(Company.created_at.desc()).limit(5).all()
|
|
|
|
# System health checks
|
|
orphaned_users = User.query.filter_by(company_id=None).count()
|
|
orphaned_time_entries = TimeEntry.query.filter_by(user_id=None).count()
|
|
blocked_users = User.query.filter_by(is_blocked=True).count()
|
|
|
|
return render_template('system_admin_dashboard.html',
|
|
title='System Administrator Dashboard',
|
|
total_companies=total_companies,
|
|
total_users=total_users,
|
|
total_teams=total_teams,
|
|
total_projects=total_projects,
|
|
total_time_entries=total_time_entries,
|
|
system_admins=system_admins,
|
|
regular_admins=regular_admins,
|
|
recent_users=recent_users,
|
|
recent_companies=recent_companies,
|
|
recent_time_entries=recent_time_entries,
|
|
top_companies=top_companies,
|
|
recent_companies_list=recent_companies_list,
|
|
orphaned_users=orphaned_users,
|
|
orphaned_time_entries=orphaned_time_entries,
|
|
blocked_users=blocked_users)
|
|
|
|
@app.route('/system-admin/users')
|
|
@system_admin_required
|
|
def system_admin_users():
|
|
"""System Admin: View all users across all companies"""
|
|
filter_type = request.args.get('filter', '')
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 50
|
|
|
|
# Build query based on filter
|
|
query = User.query
|
|
|
|
if filter_type == 'blocked':
|
|
query = query.filter_by(is_blocked=True)
|
|
elif filter_type == 'system_admins':
|
|
query = query.filter_by(role=Role.SYSTEM_ADMIN)
|
|
elif filter_type == 'admins':
|
|
query = query.filter_by(role=Role.ADMIN)
|
|
elif filter_type == 'unverified':
|
|
query = query.filter_by(is_verified=False)
|
|
elif filter_type == 'freelancers':
|
|
query = query.filter_by(account_type=AccountType.FREELANCER)
|
|
|
|
# Add company join for display
|
|
query = query.join(Company).add_columns(Company.name.label('company_name'))
|
|
|
|
# Order by creation date (newest first)
|
|
query = query.order_by(User.created_at.desc())
|
|
|
|
# Paginate results
|
|
users = query.paginate(page=page, per_page=per_page, error_out=False)
|
|
|
|
return render_template('system_admin_users.html',
|
|
title='System Admin - All Users',
|
|
users=users,
|
|
current_filter=filter_type)
|
|
|
|
@app.route('/system-admin/users/<int:user_id>/edit', methods=['GET', 'POST'])
|
|
@system_admin_required
|
|
def system_admin_edit_user(user_id):
|
|
"""System Admin: Edit any user across companies"""
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
if request.method == 'POST':
|
|
# Get form data
|
|
username = request.form.get('username')
|
|
email = request.form.get('email')
|
|
role = request.form.get('role')
|
|
is_blocked = request.form.get('is_blocked') == 'on'
|
|
is_verified = request.form.get('is_verified') == 'on'
|
|
company_id = request.form.get('company_id')
|
|
team_id = request.form.get('team_id') or None
|
|
|
|
# Validation
|
|
error = None
|
|
|
|
# Check if username is unique within the company
|
|
existing_user = User.query.filter(
|
|
User.username == username,
|
|
User.company_id == company_id,
|
|
User.id != user_id
|
|
).first()
|
|
|
|
if existing_user:
|
|
error = f'Username "{username}" is already taken in this company.'
|
|
|
|
# Check if email is unique within the company
|
|
existing_email = User.query.filter(
|
|
User.email == email,
|
|
User.company_id == company_id,
|
|
User.id != user_id
|
|
).first()
|
|
|
|
if existing_email:
|
|
error = f'Email "{email}" is already registered in this company.'
|
|
|
|
if not error:
|
|
# Update user
|
|
user.username = username
|
|
user.email = email
|
|
user.role = Role(role)
|
|
user.is_blocked = is_blocked
|
|
user.is_verified = is_verified
|
|
user.company_id = company_id
|
|
user.team_id = team_id
|
|
|
|
db.session.commit()
|
|
flash(f'User {username} updated successfully.', 'success')
|
|
return redirect(url_for('system_admin_users'))
|
|
|
|
flash(error, 'error')
|
|
|
|
# Get all companies and teams for form dropdowns
|
|
companies = Company.query.order_by(Company.name).all()
|
|
teams = Team.query.filter_by(company_id=user.company_id).order_by(Team.name).all()
|
|
roles = get_available_roles()
|
|
|
|
return render_template('system_admin_edit_user.html',
|
|
title=f'Edit User: {user.username}',
|
|
user=user,
|
|
companies=companies,
|
|
teams=teams,
|
|
roles=roles)
|
|
|
|
@app.route('/system-admin/users/<int:user_id>/delete', methods=['POST'])
|
|
@system_admin_required
|
|
def system_admin_delete_user(user_id):
|
|
"""System Admin: Delete any user (with safety checks)"""
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Safety check: prevent deleting the last system admin
|
|
if user.role == Role.SYSTEM_ADMIN:
|
|
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
|
|
if system_admin_count <= 1:
|
|
flash('Cannot delete the last system administrator.', 'error')
|
|
return redirect(url_for('system_admin_users'))
|
|
|
|
# Safety check: prevent deleting yourself
|
|
if user.id == g.user.id:
|
|
flash('Cannot delete your own account.', 'error')
|
|
return redirect(url_for('system_admin_users'))
|
|
|
|
username = user.username
|
|
company_name = user.company.name if user.company else 'Unknown'
|
|
|
|
# Delete related data first
|
|
TimeEntry.query.filter_by(user_id=user.id).delete()
|
|
WorkConfig.query.filter_by(user_id=user.id).delete()
|
|
|
|
# Delete the user
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
flash(f'User "{username}" from company "{company_name}" has been deleted.', 'success')
|
|
return redirect(url_for('system_admin_users'))
|
|
|
|
@app.route('/system-admin/companies')
|
|
@system_admin_required
|
|
def system_admin_companies():
|
|
"""System Admin: View all companies"""
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
companies = Company.query.order_by(Company.created_at.desc()).paginate(
|
|
page=page, per_page=per_page, error_out=False)
|
|
|
|
# Get user counts for each company
|
|
company_stats = {}
|
|
for company in companies.items:
|
|
user_count = User.query.filter_by(company_id=company.id).count()
|
|
admin_count = User.query.filter(
|
|
User.company_id == company.id,
|
|
User.role.in_([Role.ADMIN, Role.SYSTEM_ADMIN])
|
|
).count()
|
|
company_stats[company.id] = {
|
|
'user_count': user_count,
|
|
'admin_count': admin_count
|
|
}
|
|
|
|
return render_template('system_admin_companies.html',
|
|
title='System Admin - All Companies',
|
|
companies=companies,
|
|
company_stats=company_stats)
|
|
|
|
@app.route('/system-admin/companies/<int:company_id>')
|
|
@system_admin_required
|
|
def system_admin_company_detail(company_id):
|
|
"""System Admin: View detailed company information"""
|
|
company = Company.query.get_or_404(company_id)
|
|
|
|
# Get company statistics
|
|
users = User.query.filter_by(company_id=company.id).all()
|
|
teams = Team.query.filter_by(company_id=company.id).all()
|
|
projects = Project.query.filter_by(company_id=company.id).all()
|
|
|
|
# Recent activity
|
|
from datetime import datetime, timedelta
|
|
week_ago = datetime.now() - timedelta(days=7)
|
|
recent_time_entries = TimeEntry.query.join(User).filter(
|
|
User.company_id == company.id,
|
|
TimeEntry.arrival_time >= week_ago
|
|
).count()
|
|
|
|
# Role distribution
|
|
role_counts = {}
|
|
for role in Role:
|
|
count = User.query.filter_by(company_id=company.id, role=role).count()
|
|
if count > 0:
|
|
role_counts[role.value] = count
|
|
|
|
return render_template('system_admin_company_detail.html',
|
|
title=f'Company: {company.name}',
|
|
company=company,
|
|
users=users,
|
|
teams=teams,
|
|
projects=projects,
|
|
recent_time_entries=recent_time_entries,
|
|
role_counts=role_counts)
|
|
|
|
@app.route('/system-admin/time-entries')
|
|
@system_admin_required
|
|
def system_admin_time_entries():
|
|
"""System Admin: View time entries across all companies"""
|
|
page = request.args.get('page', 1, type=int)
|
|
company_filter = request.args.get('company', '')
|
|
per_page = 50
|
|
|
|
# Build query
|
|
query = TimeEntry.query.join(User).join(Company)
|
|
|
|
if company_filter:
|
|
query = query.filter(Company.id == company_filter)
|
|
|
|
# Add columns for display
|
|
query = query.add_columns(
|
|
User.username,
|
|
Company.name.label('company_name'),
|
|
Project.name.label('project_name')
|
|
).outerjoin(Project)
|
|
|
|
# Order by arrival time (newest first)
|
|
query = query.order_by(TimeEntry.arrival_time.desc())
|
|
|
|
# Paginate
|
|
entries = query.paginate(page=page, per_page=per_page, error_out=False)
|
|
|
|
# Get companies for filter dropdown
|
|
companies = Company.query.order_by(Company.name).all()
|
|
|
|
return render_template('system_admin_time_entries.html',
|
|
title='System Admin - Time Entries',
|
|
entries=entries,
|
|
companies=companies,
|
|
current_company=company_filter)
|
|
|
|
@app.route('/system-admin/settings', methods=['GET', 'POST'])
|
|
@system_admin_required
|
|
def system_admin_settings():
|
|
"""System Admin: Global system settings"""
|
|
if request.method == 'POST':
|
|
# Update system settings
|
|
registration_enabled = request.form.get('registration_enabled') == 'on'
|
|
email_verification = request.form.get('email_verification_required') == 'on'
|
|
|
|
# Update or create settings
|
|
reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first()
|
|
if reg_setting:
|
|
reg_setting.value = 'true' if registration_enabled else 'false'
|
|
else:
|
|
reg_setting = SystemSettings(
|
|
key='registration_enabled',
|
|
value='true' if registration_enabled else 'false',
|
|
description='Controls whether new user registration is allowed'
|
|
)
|
|
db.session.add(reg_setting)
|
|
|
|
email_setting = SystemSettings.query.filter_by(key='email_verification_required').first()
|
|
if email_setting:
|
|
email_setting.value = 'true' if email_verification else 'false'
|
|
else:
|
|
email_setting = SystemSettings(
|
|
key='email_verification_required',
|
|
value='true' if email_verification else 'false',
|
|
description='Controls whether email verification is required for new accounts'
|
|
)
|
|
db.session.add(email_setting)
|
|
|
|
db.session.commit()
|
|
flash('System settings updated successfully.', 'success')
|
|
return redirect(url_for('system_admin_settings'))
|
|
|
|
# Get current settings
|
|
settings = {}
|
|
all_settings = SystemSettings.query.all()
|
|
for setting in all_settings:
|
|
if setting.key == 'registration_enabled':
|
|
settings['registration_enabled'] = setting.value == 'true'
|
|
elif setting.key == 'email_verification_required':
|
|
settings['email_verification_required'] = setting.value == 'true'
|
|
|
|
# System statistics
|
|
total_companies = Company.query.count()
|
|
total_users = User.query.count()
|
|
total_system_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
|
|
|
|
return render_template('system_admin_settings.html',
|
|
title='System Administrator Settings',
|
|
settings=settings,
|
|
total_companies=total_companies,
|
|
total_users=total_users,
|
|
total_system_admins=total_system_admins)
|
|
|
|
@app.route('/admin/work-policies', methods=['GET', 'POST'])
|
|
@admin_required
|
|
@company_required
|
|
def admin_work_policies():
|
|
# Get or create company work config
|
|
work_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first()
|
|
if not work_config:
|
|
# Create default config for the company
|
|
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
|
|
work_config = CompanyWorkConfig(
|
|
company_id=g.user.company_id,
|
|
work_hours_per_day=preset['work_hours_per_day'],
|
|
mandatory_break_minutes=preset['mandatory_break_minutes'],
|
|
break_threshold_hours=preset['break_threshold_hours'],
|
|
additional_break_minutes=preset['additional_break_minutes'],
|
|
additional_break_threshold_hours=preset['additional_break_threshold_hours'],
|
|
region=WorkRegion.GERMANY,
|
|
region_name=preset['region_name'],
|
|
created_by_id=g.user.id
|
|
)
|
|
db.session.add(work_config)
|
|
db.session.commit()
|
|
|
|
if request.method == 'POST':
|
|
try:
|
|
# Handle regional preset selection
|
|
if request.form.get('action') == 'apply_preset':
|
|
region_code = request.form.get('region_preset')
|
|
if region_code:
|
|
region = WorkRegion(region_code)
|
|
preset = CompanyWorkConfig.get_regional_preset(region)
|
|
|
|
work_config.work_hours_per_day = preset['work_hours_per_day']
|
|
work_config.mandatory_break_minutes = preset['mandatory_break_minutes']
|
|
work_config.break_threshold_hours = preset['break_threshold_hours']
|
|
work_config.additional_break_minutes = preset['additional_break_minutes']
|
|
work_config.additional_break_threshold_hours = preset['additional_break_threshold_hours']
|
|
work_config.region = region
|
|
work_config.region_name = preset['region_name']
|
|
|
|
db.session.commit()
|
|
flash(f'Applied {preset["region_name"]} work policy preset', 'success')
|
|
return redirect(url_for('admin_work_policies'))
|
|
|
|
# Handle manual configuration update
|
|
else:
|
|
work_config.work_hours_per_day = float(request.form.get('work_hours_per_day', 8.0))
|
|
work_config.mandatory_break_minutes = int(request.form.get('mandatory_break_minutes', 30))
|
|
work_config.break_threshold_hours = float(request.form.get('break_threshold_hours', 6.0))
|
|
work_config.additional_break_minutes = int(request.form.get('additional_break_minutes', 15))
|
|
work_config.additional_break_threshold_hours = float(request.form.get('additional_break_threshold_hours', 9.0))
|
|
work_config.region = WorkRegion.CUSTOM
|
|
work_config.region_name = 'Custom Configuration'
|
|
|
|
db.session.commit()
|
|
flash('Work policies updated successfully!', 'success')
|
|
return redirect(url_for('admin_work_policies'))
|
|
|
|
except ValueError:
|
|
flash('Please enter valid numbers for all fields', 'error')
|
|
|
|
# Get available regional presets
|
|
regional_presets = []
|
|
for region in WorkRegion:
|
|
preset = CompanyWorkConfig.get_regional_preset(region)
|
|
regional_presets.append({
|
|
'code': region.value,
|
|
'name': preset['region_name'],
|
|
'description': f"{preset['work_hours_per_day']}h/day, {preset['mandatory_break_minutes']}min break after {preset['break_threshold_hours']}h"
|
|
})
|
|
|
|
return render_template('admin_work_policies.html',
|
|
title='Work Policies',
|
|
work_config=work_config,
|
|
regional_presets=regional_presets,
|
|
WorkRegion=WorkRegion)
|
|
|
|
# Company Management Routes
|
|
@app.route('/admin/company')
|
|
@admin_required
|
|
@company_required
|
|
def admin_company():
|
|
"""View and manage company settings"""
|
|
company = g.company
|
|
|
|
# Get company statistics
|
|
stats = {
|
|
'total_users': User.query.filter_by(company_id=company.id).count(),
|
|
'total_teams': Team.query.filter_by(company_id=company.id).count(),
|
|
'total_projects': Project.query.filter_by(company_id=company.id).count(),
|
|
'active_projects': Project.query.filter_by(company_id=company.id, is_active=True).count(),
|
|
}
|
|
|
|
return render_template('admin_company.html', title='Company Management', company=company, stats=stats)
|
|
|
|
@app.route('/admin/company/edit', methods=['GET', 'POST'])
|
|
@admin_required
|
|
@company_required
|
|
def edit_company():
|
|
"""Edit company details"""
|
|
company = g.company
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name')
|
|
description = request.form.get('description', '')
|
|
max_users = request.form.get('max_users')
|
|
is_active = 'is_active' in request.form
|
|
|
|
# Validate input
|
|
error = None
|
|
if not name:
|
|
error = 'Company name is required'
|
|
elif name != company.name and Company.query.filter_by(name=name).first():
|
|
error = 'Company name already exists'
|
|
|
|
if max_users:
|
|
try:
|
|
max_users = int(max_users)
|
|
if max_users < 1:
|
|
error = 'Maximum users must be at least 1'
|
|
except ValueError:
|
|
error = 'Maximum users must be a valid number'
|
|
else:
|
|
max_users = None
|
|
|
|
if error is None:
|
|
company.name = name
|
|
company.description = description
|
|
company.max_users = max_users
|
|
company.is_active = is_active
|
|
db.session.commit()
|
|
|
|
flash('Company details updated successfully!', 'success')
|
|
return redirect(url_for('admin_company'))
|
|
else:
|
|
flash(error, 'error')
|
|
|
|
return render_template('edit_company.html', title='Edit Company', company=company)
|
|
|
|
@app.route('/admin/company/users')
|
|
@admin_required
|
|
@company_required
|
|
def company_users():
|
|
"""List all users in the company with detailed information"""
|
|
users = User.query.filter_by(company_id=g.company.id).order_by(User.created_at.desc()).all()
|
|
|
|
# Calculate user statistics
|
|
user_stats = {
|
|
'total': len(users),
|
|
'verified': len([u for u in users if u.is_verified]),
|
|
'unverified': len([u for u in users if not u.is_verified]),
|
|
'blocked': len([u for u in users if u.is_blocked]),
|
|
'active': len([u for u in users if not u.is_blocked and u.is_verified]),
|
|
'admins': len([u for u in users if u.role == Role.ADMIN]),
|
|
'supervisors': len([u for u in users if u.role == Role.SUPERVISOR]),
|
|
'team_leaders': len([u for u in users if u.role == Role.TEAM_LEADER]),
|
|
'team_members': len([u for u in users if u.role == Role.TEAM_MEMBER]),
|
|
}
|
|
|
|
return render_template('company_users.html', title='Company Users',
|
|
users=users, stats=user_stats, company=g.company)
|
|
|
|
# Add these routes for team management
|
|
@app.route('/admin/teams')
|
|
@admin_required
|
|
@company_required
|
|
def admin_teams():
|
|
teams = Team.query.filter_by(company_id=g.user.company_id).all()
|
|
return render_template('admin_teams.html', title='Team Management', teams=teams)
|
|
|
|
@app.route('/admin/teams/create', methods=['GET', 'POST'])
|
|
@admin_required
|
|
@company_required
|
|
def create_team():
|
|
if request.method == 'POST':
|
|
name = request.form.get('name')
|
|
description = request.form.get('description')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not name:
|
|
error = 'Team name is required'
|
|
elif Team.query.filter_by(name=name, company_id=g.user.company_id).first():
|
|
error = 'Team name already exists in your company'
|
|
|
|
if error is None:
|
|
new_team = Team(name=name, description=description, company_id=g.user.company_id)
|
|
db.session.add(new_team)
|
|
db.session.commit()
|
|
|
|
flash(f'Team "{name}" created successfully!', 'success')
|
|
return redirect(url_for('admin_teams'))
|
|
|
|
flash(error, 'error')
|
|
|
|
return render_template('create_team.html', title='Create Team')
|
|
|
|
@app.route('/admin/teams/edit/<int:team_id>', methods=['GET', 'POST'])
|
|
@admin_required
|
|
@company_required
|
|
def edit_team(team_id):
|
|
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name')
|
|
description = request.form.get('description')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not name:
|
|
error = 'Team name is required'
|
|
elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first():
|
|
error = 'Team name already exists in your company'
|
|
|
|
if error is None:
|
|
team.name = name
|
|
team.description = description
|
|
db.session.commit()
|
|
|
|
flash(f'Team "{name}" updated successfully!', 'success')
|
|
return redirect(url_for('admin_teams'))
|
|
|
|
flash(error, 'error')
|
|
|
|
return render_template('edit_team.html', title='Edit Team', team=team)
|
|
|
|
@app.route('/admin/teams/delete/<int:team_id>', methods=['POST'])
|
|
@admin_required
|
|
@company_required
|
|
def delete_team(team_id):
|
|
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
# Check if team has members
|
|
if team.users:
|
|
flash('Cannot delete team with members. Remove all members first.', 'error')
|
|
return redirect(url_for('admin_teams'))
|
|
|
|
team_name = team.name
|
|
db.session.delete(team)
|
|
db.session.commit()
|
|
|
|
flash(f'Team "{team_name}" deleted successfully!', 'success')
|
|
return redirect(url_for('admin_teams'))
|
|
|
|
@app.route('/admin/teams/<int:team_id>', methods=['GET', 'POST'])
|
|
@admin_required
|
|
@company_required
|
|
def manage_team(team_id):
|
|
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
if request.method == 'POST':
|
|
action = request.form.get('action')
|
|
|
|
if action == 'update_team':
|
|
# Update team details
|
|
name = request.form.get('name')
|
|
description = request.form.get('description')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not name:
|
|
error = 'Team name is required'
|
|
elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first():
|
|
error = 'Team name already exists in your company'
|
|
|
|
if error is None:
|
|
team.name = name
|
|
team.description = description
|
|
db.session.commit()
|
|
flash(f'Team "{name}" updated successfully!', 'success')
|
|
else:
|
|
flash(error, 'error')
|
|
|
|
elif action == 'add_member':
|
|
# Add user to team
|
|
user_id = request.form.get('user_id')
|
|
if user_id:
|
|
user = User.query.get(user_id)
|
|
if user:
|
|
user.team_id = team.id
|
|
db.session.commit()
|
|
flash(f'User {user.username} added to team!', 'success')
|
|
else:
|
|
flash('User not found', 'error')
|
|
else:
|
|
flash('No user selected', 'error')
|
|
|
|
elif action == 'remove_member':
|
|
# Remove user from team
|
|
user_id = request.form.get('user_id')
|
|
if user_id:
|
|
user = User.query.get(user_id)
|
|
if user and user.team_id == team.id:
|
|
user.team_id = None
|
|
db.session.commit()
|
|
flash(f'User {user.username} removed from team!', 'success')
|
|
else:
|
|
flash('User not found or not in this team', 'error')
|
|
else:
|
|
flash('No user selected', 'error')
|
|
|
|
# Get team members
|
|
team_members = User.query.filter_by(team_id=team.id).all()
|
|
|
|
# Get users not in this team for the add member form (company-scoped)
|
|
|
|
available_users = User.query.filter(
|
|
User.company_id == g.user.company_id,
|
|
(User.team_id != team.id) | (User.team_id == None)
|
|
).all()
|
|
|
|
return render_template(
|
|
'manage_team.html',
|
|
title=f'Manage Team: {team.name}',
|
|
team=team,
|
|
team_members=team_members,
|
|
available_users=available_users
|
|
)
|
|
|
|
# Project Management Routes
|
|
@app.route('/admin/projects')
|
|
@role_required(Role.SUPERVISOR) # Supervisors and Admins can manage projects
|
|
@company_required
|
|
def admin_projects():
|
|
projects = Project.query.filter_by(company_id=g.user.company_id).order_by(Project.created_at.desc()).all()
|
|
return render_template('admin_projects.html', title='Project Management', projects=projects)
|
|
|
|
@app.route('/admin/projects/create', methods=['GET', 'POST'])
|
|
@role_required(Role.SUPERVISOR)
|
|
@company_required
|
|
def create_project():
|
|
if request.method == 'POST':
|
|
name = request.form.get('name')
|
|
description = request.form.get('description')
|
|
code = request.form.get('code')
|
|
team_id = request.form.get('team_id') or None
|
|
start_date_str = request.form.get('start_date')
|
|
end_date_str = request.form.get('end_date')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not name:
|
|
error = 'Project name is required'
|
|
elif not code:
|
|
error = 'Project code is required'
|
|
elif Project.query.filter_by(code=code, company_id=g.user.company_id).first():
|
|
error = 'Project code already exists in your company'
|
|
|
|
# Parse dates
|
|
start_date = None
|
|
end_date = None
|
|
if start_date_str:
|
|
try:
|
|
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
|
|
except ValueError:
|
|
error = 'Invalid start date format'
|
|
|
|
if end_date_str:
|
|
try:
|
|
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
|
|
except ValueError:
|
|
error = 'Invalid end date format'
|
|
|
|
if start_date and end_date and start_date > end_date:
|
|
error = 'Start date cannot be after end date'
|
|
|
|
if error is None:
|
|
project = Project(
|
|
name=name,
|
|
description=description,
|
|
code=code.upper(),
|
|
company_id=g.user.company_id,
|
|
team_id=int(team_id) if team_id else None,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
created_by_id=g.user.id
|
|
)
|
|
db.session.add(project)
|
|
db.session.commit()
|
|
flash(f'Project "{name}" created successfully!', 'success')
|
|
return redirect(url_for('admin_projects'))
|
|
else:
|
|
flash(error, 'error')
|
|
|
|
# Get available teams for the form (company-scoped)
|
|
teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all()
|
|
return render_template('create_project.html', title='Create Project', teams=teams)
|
|
|
|
@app.route('/admin/projects/edit/<int:project_id>', methods=['GET', 'POST'])
|
|
@role_required(Role.SUPERVISOR)
|
|
@company_required
|
|
def edit_project(project_id):
|
|
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
if request.method == 'POST':
|
|
name = request.form.get('name')
|
|
description = request.form.get('description')
|
|
code = request.form.get('code')
|
|
team_id = request.form.get('team_id') or None
|
|
is_active = request.form.get('is_active') == 'on'
|
|
start_date_str = request.form.get('start_date')
|
|
end_date_str = request.form.get('end_date')
|
|
|
|
# Validate input
|
|
error = None
|
|
if not name:
|
|
error = 'Project name is required'
|
|
elif not code:
|
|
error = 'Project code is required'
|
|
elif code != project.code and Project.query.filter_by(code=code, company_id=g.user.company_id).first():
|
|
error = 'Project code already exists in your company'
|
|
|
|
# Parse dates
|
|
start_date = None
|
|
end_date = None
|
|
if start_date_str:
|
|
try:
|
|
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
|
|
except ValueError:
|
|
error = 'Invalid start date format'
|
|
|
|
if end_date_str:
|
|
try:
|
|
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
|
|
except ValueError:
|
|
error = 'Invalid end date format'
|
|
|
|
if start_date and end_date and start_date > end_date:
|
|
error = 'Start date cannot be after end date'
|
|
|
|
if error is None:
|
|
project.name = name
|
|
project.description = description
|
|
project.code = code.upper()
|
|
project.team_id = int(team_id) if team_id else None
|
|
project.is_active = is_active
|
|
project.start_date = start_date
|
|
project.end_date = end_date
|
|
db.session.commit()
|
|
flash(f'Project "{name}" updated successfully!', 'success')
|
|
return redirect(url_for('admin_projects'))
|
|
else:
|
|
flash(error, 'error')
|
|
|
|
# Get available teams for the form (company-scoped)
|
|
teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all()
|
|
|
|
return render_template('edit_project.html', title='Edit Project', project=project, teams=teams)
|
|
|
|
@app.route('/admin/projects/delete/<int:project_id>', methods=['POST'])
|
|
@role_required(Role.ADMIN) # Only admins can delete projects
|
|
@company_required
|
|
def delete_project(project_id):
|
|
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
|
|
|
|
# Check if there are time entries associated with this project
|
|
time_entries_count = TimeEntry.query.filter_by(project_id=project_id).count()
|
|
|
|
if time_entries_count > 0:
|
|
flash(f'Cannot delete project "{project.name}" - it has {time_entries_count} time entries associated with it. Deactivate the project instead.', 'error')
|
|
else:
|
|
project_name = project.name
|
|
db.session.delete(project)
|
|
db.session.commit()
|
|
flash(f'Project "{project_name}" deleted successfully!', 'success')
|
|
|
|
return redirect(url_for('admin_projects'))
|
|
|
|
@app.route('/api/team/hours_data', methods=['GET'])
|
|
@login_required
|
|
@role_required(Role.TEAM_LEADER) # Only team leaders and above can access
|
|
@company_required
|
|
def team_hours_data():
|
|
# Get the current user's team
|
|
team = Team.query.get(g.user.team_id)
|
|
|
|
if not team:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': 'You are not assigned to any team.'
|
|
}), 400
|
|
|
|
# Get date range from query parameters or use current week as default
|
|
today = datetime.now().date()
|
|
start_of_week = today - timedelta(days=today.weekday())
|
|
end_of_week = start_of_week + timedelta(days=6)
|
|
|
|
start_date_str = request.args.get('start_date', start_of_week.strftime('%Y-%m-%d'))
|
|
end_date_str = request.args.get('end_date', end_of_week.strftime('%Y-%m-%d'))
|
|
include_self = request.args.get('include_self', 'false') == 'true'
|
|
|
|
try:
|
|
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
|
|
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
|
|
except ValueError:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': 'Invalid date format.'
|
|
}), 400
|
|
|
|
# Get all team members
|
|
team_members = User.query.filter_by(team_id=team.id).all()
|
|
|
|
# Prepare data structure for team members' hours
|
|
team_data = []
|
|
|
|
for member in team_members:
|
|
# Skip if the member is the current user (team leader) and include_self is False
|
|
if member.id == g.user.id and not include_self:
|
|
continue
|
|
|
|
# Get time entries for this member in the date range
|
|
entries = TimeEntry.query.filter(
|
|
TimeEntry.user_id == member.id,
|
|
TimeEntry.arrival_time >= datetime.combine(start_date, time.min),
|
|
TimeEntry.arrival_time <= datetime.combine(end_date, time.max)
|
|
).order_by(TimeEntry.arrival_time).all()
|
|
|
|
# Calculate daily and total hours
|
|
daily_hours = {}
|
|
total_seconds = 0
|
|
|
|
for entry in entries:
|
|
if entry.duration: # Only count completed entries
|
|
entry_date = entry.arrival_time.date()
|
|
date_str = entry_date.strftime('%Y-%m-%d')
|
|
|
|
if date_str not in daily_hours:
|
|
daily_hours[date_str] = 0
|
|
|
|
daily_hours[date_str] += entry.duration
|
|
total_seconds += entry.duration
|
|
|
|
# Convert seconds to hours for display
|
|
for date_str in daily_hours:
|
|
daily_hours[date_str] = round(daily_hours[date_str] / 3600, 2) # Convert to hours
|
|
|
|
total_hours = round(total_seconds / 3600, 2) # Convert to hours
|
|
|
|
# Format entries for JSON response
|
|
formatted_entries = []
|
|
for entry in entries:
|
|
formatted_entries.append({
|
|
'id': entry.id,
|
|
'arrival_time': entry.arrival_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'departure_time': entry.departure_time.strftime('%Y-%m-%d %H:%M:%S') if entry.departure_time else None,
|
|
'duration': entry.duration,
|
|
'total_break_duration': entry.total_break_duration
|
|
})
|
|
|
|
# Add member data to team data
|
|
team_data.append({
|
|
'user': {
|
|
'id': member.id,
|
|
'username': member.username,
|
|
'email': member.email
|
|
},
|
|
'daily_hours': daily_hours,
|
|
'total_hours': total_hours,
|
|
'entries': formatted_entries
|
|
})
|
|
|
|
# Generate a list of dates in the range for the table header
|
|
date_range = []
|
|
current_date = start_date
|
|
while current_date <= end_date:
|
|
date_range.append(current_date.strftime('%Y-%m-%d'))
|
|
current_date += timedelta(days=1)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'team': {
|
|
'id': team.id,
|
|
'name': team.name,
|
|
'description': team.description
|
|
},
|
|
'team_data': team_data,
|
|
'date_range': date_range,
|
|
'start_date': start_date.strftime('%Y-%m-%d'),
|
|
'end_date': end_date.strftime('%Y-%m-%d')
|
|
})
|
|
|
|
@app.route('/export')
|
|
def export():
|
|
return render_template('export.html', title='Export Data')
|
|
|
|
def get_date_range(period, start_date_str=None, end_date_str=None):
|
|
"""Get start and end date based on period or custom date range."""
|
|
today = datetime.now().date()
|
|
|
|
if period:
|
|
if period == 'today':
|
|
return today, today
|
|
elif period == 'week':
|
|
start_date = today - timedelta(days=today.weekday())
|
|
return start_date, today
|
|
elif period == 'month':
|
|
start_date = today.replace(day=1)
|
|
return start_date, today
|
|
elif period == 'all':
|
|
earliest_entry = TimeEntry.query.order_by(TimeEntry.arrival_time).first()
|
|
start_date = earliest_entry.arrival_time.date() if earliest_entry else today
|
|
return start_date, today
|
|
else:
|
|
# Custom date range
|
|
try:
|
|
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
|
|
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
|
|
return start_date, end_date
|
|
except (ValueError, TypeError):
|
|
raise ValueError('Invalid date format')
|
|
|
|
@app.route('/download_export')
|
|
def download_export():
|
|
"""Handle export download requests."""
|
|
export_format = request.args.get('format', 'csv')
|
|
period = request.args.get('period')
|
|
|
|
try:
|
|
start_date, end_date = get_date_range(
|
|
period,
|
|
request.args.get('start_date'),
|
|
request.args.get('end_date')
|
|
)
|
|
except ValueError:
|
|
flash('Invalid date format. Please use YYYY-MM-DD format.')
|
|
return redirect(url_for('export'))
|
|
|
|
# Query entries within the date range
|
|
start_datetime = datetime.combine(start_date, time.min)
|
|
end_datetime = datetime.combine(end_date, time.max)
|
|
|
|
entries = TimeEntry.query.filter(
|
|
TimeEntry.arrival_time >= start_datetime,
|
|
TimeEntry.arrival_time <= end_datetime
|
|
).order_by(TimeEntry.arrival_time).all()
|
|
|
|
if not entries:
|
|
flash('No entries found for the selected date range.')
|
|
return redirect(url_for('export'))
|
|
|
|
# Prepare data and filename
|
|
data = prepare_export_data(entries)
|
|
filename = f"timetrack_export_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}"
|
|
|
|
# Export based on format
|
|
if export_format == 'csv':
|
|
return export_to_csv(data, filename)
|
|
elif export_format == 'excel':
|
|
return export_to_excel(data, filename)
|
|
else:
|
|
flash('Invalid export format.')
|
|
return redirect(url_for('export'))
|
|
|
|
|
|
@app.route('/analytics')
|
|
@app.route('/analytics/<mode>')
|
|
@login_required
|
|
def analytics(mode='personal'):
|
|
"""Unified analytics view combining history, team hours, and graphs"""
|
|
# Validate mode parameter
|
|
if mode not in ['personal', 'team']:
|
|
mode = 'personal'
|
|
|
|
# Check team access for team mode
|
|
if mode == 'team':
|
|
if not g.user.team_id:
|
|
flash('You must be assigned to a team to view team analytics.', 'warning')
|
|
return redirect(url_for('analytics', mode='personal'))
|
|
|
|
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
|
|
flash('You do not have permission to view team analytics.', 'error')
|
|
return redirect(url_for('analytics', mode='personal'))
|
|
|
|
# Get available projects for filtering
|
|
available_projects = []
|
|
all_projects = Project.query.filter_by(is_active=True).all()
|
|
for project in all_projects:
|
|
if project.is_user_allowed(g.user):
|
|
available_projects.append(project)
|
|
|
|
# Get team members if in team mode
|
|
team_members = []
|
|
if mode == 'team' and g.user.team_id:
|
|
team_members = User.query.filter_by(team_id=g.user.team_id).all()
|
|
|
|
# Default date range (current week)
|
|
today = datetime.now().date()
|
|
start_of_week = today - timedelta(days=today.weekday())
|
|
end_of_week = start_of_week + timedelta(days=6)
|
|
|
|
return render_template('analytics.html',
|
|
title='Time Analytics',
|
|
mode=mode,
|
|
available_projects=available_projects,
|
|
team_members=team_members,
|
|
default_start_date=start_of_week.strftime('%Y-%m-%d'),
|
|
default_end_date=end_of_week.strftime('%Y-%m-%d'))
|
|
|
|
@app.route('/api/analytics/data')
|
|
@login_required
|
|
def analytics_data():
|
|
"""API endpoint for analytics data"""
|
|
mode = request.args.get('mode', 'personal')
|
|
view_type = request.args.get('view', 'table')
|
|
start_date = request.args.get('start_date')
|
|
end_date = request.args.get('end_date')
|
|
project_filter = request.args.get('project_id')
|
|
granularity = request.args.get('granularity', 'daily')
|
|
|
|
# Validate mode
|
|
if mode not in ['personal', 'team']:
|
|
return jsonify({'error': 'Invalid mode'}), 400
|
|
|
|
# Check permissions for team mode
|
|
if mode == 'team':
|
|
if not g.user.team_id:
|
|
return jsonify({'error': 'No team assigned'}), 403
|
|
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
|
|
return jsonify({'error': 'Insufficient permissions'}), 403
|
|
|
|
try:
|
|
# Parse dates
|
|
if start_date:
|
|
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
|
|
if end_date:
|
|
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
|
|
|
|
# Get filtered data
|
|
data = get_filtered_analytics_data(g.user, mode, start_date, end_date, project_filter)
|
|
|
|
# Format data based on view type
|
|
if view_type == 'graph':
|
|
formatted_data = format_graph_data(data, granularity)
|
|
elif view_type == 'team':
|
|
formatted_data = format_team_data(data, granularity)
|
|
else:
|
|
formatted_data = format_table_data(data)
|
|
|
|
return jsonify(formatted_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in analytics_data: {str(e)}")
|
|
return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
def get_filtered_analytics_data(user, mode, start_date=None, end_date=None, project_filter=None):
|
|
"""Get filtered time entry data for analytics"""
|
|
# Base query
|
|
query = TimeEntry.query
|
|
|
|
# Apply user/team filter
|
|
if mode == 'personal':
|
|
query = query.filter(TimeEntry.user_id == user.id)
|
|
elif mode == 'team' and user.team_id:
|
|
team_user_ids = [u.id for u in User.query.filter_by(team_id=user.team_id).all()]
|
|
query = query.filter(TimeEntry.user_id.in_(team_user_ids))
|
|
|
|
# Apply date filters
|
|
if start_date:
|
|
query = query.filter(func.date(TimeEntry.arrival_time) >= start_date)
|
|
if end_date:
|
|
query = query.filter(func.date(TimeEntry.arrival_time) <= end_date)
|
|
|
|
# Apply project filter
|
|
if project_filter:
|
|
if project_filter == 'none':
|
|
query = query.filter(TimeEntry.project_id.is_(None))
|
|
else:
|
|
try:
|
|
project_id = int(project_filter)
|
|
query = query.filter(TimeEntry.project_id == project_id)
|
|
except ValueError:
|
|
pass
|
|
|
|
return query.order_by(TimeEntry.arrival_time.desc()).all()
|
|
|
|
|
|
@app.route('/api/companies/<int:company_id>/teams')
|
|
@system_admin_required
|
|
def api_company_teams(company_id):
|
|
"""API: Get teams for a specific company (System Admin only)"""
|
|
teams = Team.query.filter_by(company_id=company_id).order_by(Team.name).all()
|
|
return jsonify([{
|
|
'id': team.id,
|
|
'name': team.name,
|
|
'description': team.description
|
|
} for team in teams])
|
|
|
|
@app.route('/api/system-admin/stats')
|
|
@system_admin_required
|
|
def api_system_admin_stats():
|
|
"""API: Get real-time system statistics for dashboard"""
|
|
from datetime import datetime, timedelta
|
|
|
|
# Get basic counts
|
|
total_companies = Company.query.count()
|
|
total_users = User.query.count()
|
|
total_teams = Team.query.count()
|
|
total_projects = Project.query.count()
|
|
total_time_entries = TimeEntry.query.count()
|
|
|
|
# Active sessions
|
|
active_sessions = TimeEntry.query.filter_by(departure_time=None, is_paused=False).count()
|
|
paused_sessions = TimeEntry.query.filter_by(is_paused=True).count()
|
|
|
|
# Recent activity (last 24 hours)
|
|
yesterday = datetime.now() - timedelta(days=1)
|
|
recent_users = User.query.filter(User.created_at >= yesterday).count()
|
|
recent_companies = Company.query.filter(Company.created_at >= yesterday).count()
|
|
recent_time_entries = TimeEntry.query.filter(TimeEntry.arrival_time >= yesterday).count()
|
|
|
|
# System health
|
|
orphaned_users = User.query.filter_by(company_id=None).count()
|
|
orphaned_time_entries = TimeEntry.query.filter_by(user_id=None).count()
|
|
blocked_users = User.query.filter_by(is_blocked=True).count()
|
|
unverified_users = User.query.filter_by(is_verified=False).count()
|
|
|
|
return jsonify({
|
|
'totals': {
|
|
'companies': total_companies,
|
|
'users': total_users,
|
|
'teams': total_teams,
|
|
'projects': total_projects,
|
|
'time_entries': total_time_entries
|
|
},
|
|
'active': {
|
|
'sessions': active_sessions,
|
|
'paused_sessions': paused_sessions
|
|
},
|
|
'recent': {
|
|
'users': recent_users,
|
|
'companies': recent_companies,
|
|
'time_entries': recent_time_entries
|
|
},
|
|
'health': {
|
|
'orphaned_users': orphaned_users,
|
|
'orphaned_time_entries': orphaned_time_entries,
|
|
'blocked_users': blocked_users,
|
|
'unverified_users': unverified_users
|
|
}
|
|
})
|
|
|
|
@app.route('/api/system-admin/companies/<int:company_id>/users')
|
|
@system_admin_required
|
|
def api_company_users(company_id):
|
|
"""API: Get users for a specific company (System Admin only)"""
|
|
company = Company.query.get_or_404(company_id)
|
|
users = User.query.filter_by(company_id=company.id).order_by(User.username).all()
|
|
|
|
return jsonify({
|
|
'company': {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'is_personal': company.is_personal
|
|
},
|
|
'users': [{
|
|
'id': user.id,
|
|
'username': user.username,
|
|
'email': user.email,
|
|
'role': user.role.value,
|
|
'is_blocked': user.is_blocked,
|
|
'is_verified': user.is_verified,
|
|
'created_at': user.created_at.isoformat(),
|
|
'team_id': user.team_id
|
|
} for user in users]
|
|
})
|
|
|
|
@app.route('/api/system-admin/users/<int:user_id>/toggle-block', methods=['POST'])
|
|
@system_admin_required
|
|
def api_toggle_user_block(user_id):
|
|
"""API: Toggle user blocked status (System Admin only)"""
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
# Safety check: prevent blocking yourself
|
|
if user.id == g.user.id:
|
|
return jsonify({'error': 'Cannot block your own account'}), 400
|
|
|
|
# Safety check: prevent blocking the last system admin
|
|
if user.role == Role.SYSTEM_ADMIN and not user.is_blocked:
|
|
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN, is_blocked=False).count()
|
|
if system_admin_count <= 1:
|
|
return jsonify({'error': 'Cannot block the last system administrator'}), 400
|
|
|
|
user.is_blocked = not user.is_blocked
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'id': user.id,
|
|
'username': user.username,
|
|
'is_blocked': user.is_blocked,
|
|
'message': f'User {"blocked" if user.is_blocked else "unblocked"} successfully'
|
|
})
|
|
|
|
@app.route('/api/system-admin/companies/<int:company_id>/stats')
|
|
@system_admin_required
|
|
def api_company_stats(company_id):
|
|
"""API: Get detailed statistics for a specific company"""
|
|
company = Company.query.get_or_404(company_id)
|
|
|
|
# User counts by role
|
|
role_counts = {}
|
|
for role in Role:
|
|
count = User.query.filter_by(company_id=company.id, role=role).count()
|
|
if count > 0:
|
|
role_counts[role.value] = count
|
|
|
|
# Team and project counts
|
|
team_count = Team.query.filter_by(company_id=company.id).count()
|
|
project_count = Project.query.filter_by(company_id=company.id).count()
|
|
active_projects = Project.query.filter_by(company_id=company.id, is_active=True).count()
|
|
|
|
# Time entries statistics
|
|
from datetime import datetime, timedelta
|
|
week_ago = datetime.now() - timedelta(days=7)
|
|
month_ago = datetime.now() - timedelta(days=30)
|
|
|
|
weekly_entries = TimeEntry.query.join(User).filter(
|
|
User.company_id == company.id,
|
|
TimeEntry.arrival_time >= week_ago
|
|
).count()
|
|
|
|
monthly_entries = TimeEntry.query.join(User).filter(
|
|
User.company_id == company.id,
|
|
TimeEntry.arrival_time >= month_ago
|
|
).count()
|
|
|
|
# Active sessions
|
|
active_sessions = TimeEntry.query.join(User).filter(
|
|
User.company_id == company.id,
|
|
TimeEntry.departure_time == None,
|
|
TimeEntry.is_paused == False
|
|
).count()
|
|
|
|
return jsonify({
|
|
'company': {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'is_personal': company.is_personal,
|
|
'is_active': company.is_active
|
|
},
|
|
'users': {
|
|
'total': sum(role_counts.values()),
|
|
'by_role': role_counts
|
|
},
|
|
'structure': {
|
|
'teams': team_count,
|
|
'projects': project_count,
|
|
'active_projects': active_projects
|
|
},
|
|
'activity': {
|
|
'weekly_entries': weekly_entries,
|
|
'monthly_entries': monthly_entries,
|
|
'active_sessions': active_sessions
|
|
}
|
|
})
|
|
|
|
@app.route('/api/analytics/export')
|
|
@login_required
|
|
def analytics_export():
|
|
"""Export analytics data in various formats"""
|
|
export_format = request.args.get('format', 'csv')
|
|
view_type = request.args.get('view', 'table')
|
|
mode = request.args.get('mode', 'personal')
|
|
start_date = request.args.get('start_date')
|
|
end_date = request.args.get('end_date')
|
|
project_filter = request.args.get('project_id')
|
|
|
|
# Validate permissions
|
|
if mode == 'team':
|
|
if not g.user.team_id:
|
|
flash('No team assigned', 'error')
|
|
return redirect(url_for('analytics'))
|
|
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
|
|
flash('Insufficient permissions', 'error')
|
|
return redirect(url_for('analytics'))
|
|
|
|
try:
|
|
# Parse dates
|
|
if start_date:
|
|
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
|
|
if end_date:
|
|
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
|
|
|
|
# Get data
|
|
data = get_filtered_analytics_data(g.user, mode, start_date, end_date, project_filter)
|
|
|
|
if export_format == 'csv':
|
|
return export_analytics_csv(data, view_type, mode)
|
|
elif export_format == 'excel':
|
|
return export_analytics_excel(data, view_type, mode)
|
|
else:
|
|
flash('Invalid export format', 'error')
|
|
return redirect(url_for('analytics'))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in analytics export: {str(e)}")
|
|
flash('Error generating export', 'error')
|
|
return redirect(url_for('analytics'))
|
|
|
|
if __name__ == '__main__':
|
|
port = int(os.environ.get('PORT', 5000))
|
|
app.run(debug=False, host='0.0.0.0', port=port) |