#!/usr/bin/env python
"""
Cleanup unverified user accounts older than 24 hours.
This script can be run manually or scheduled via cron.
"""

import os
import sys
from datetime import datetime, timedelta
from sqlalchemy import and_

# Add the application path to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from app import app, db
from models import User, Company, SystemEvent
from models.enums import Role


def _count_company_users(company_id, excluded_ids, roles=None):
    """
    Count users of a company, ignoring the given user IDs.

    Must be called inside an application context.

    Args:
        company_id: Value matched against ``User.company_id``.
        excluded_ids (list): Non-empty list of user IDs to leave out of the count.
        roles (list | None): If given, only users whose role is in this list are counted.

    Returns:
        int: Number of matching users.
    """
    conditions = [
        User.company_id == company_id,
        User.id.notin_(excluded_ids),
    ]
    if roles is not None:
        conditions.append(User.role.in_(roles))
    return User.query.filter(and_(*conditions)).count()


def cleanup_unverified_accounts(dry_run=False, quiet=False):
    """
    Delete unverified user accounts that are older than 24 hours.

    A user holding an admin role is skipped when no other admin would remain
    in their company. When the deleted user was the last user of an existing
    company, that company is deleted as well.

    Args:
        dry_run (bool): If True, only show what would be deleted without actually deleting.
        quiet (bool): If True, suppress progress output on stdout. Warnings are
            still written to stderr.

    Returns:
        int: Number of accounts deleted (or would be deleted in dry run mode)

    Raises:
        Exception: Whatever the final commit raises; the session is rolled
            back before the exception propagates.
    """
    def report(message):
        # Single choke point so ``quiet`` silences all progress output.
        if not quiet:
            print(message)

    admin_roles = [Role.ADMIN, Role.SYSTEM_ADMIN]

    with app.app_context():
        # Find unverified accounts older than 24 hours.
        # NOTE(review): utcnow() is naive; assumes created_at is stored as naive UTC - confirm.
        cutoff_time = datetime.utcnow() - timedelta(hours=24)
        unverified_users = User.query.filter(
            and_(
                User.is_verified == False,  # noqa: E712 - SQL expression, not a Python comparison
                User.created_at < cutoff_time,
            )
        ).all()

        deleted_count = 0
        # Users already handled in this run, grouped by company. Excluding them
        # from the counts below makes a dry run (where nothing is actually
        # deleted) report the same outcome as a real run.
        removed_by_company = {}

        for user in unverified_users:
            removed_here = removed_by_company.setdefault(user.company_id, [])
            excluded_ids = removed_here + [user.id]

            # Check if this user is the only admin in their company.
            # We shouldn't delete them if they are, as it would orphan the company.
            if user.role in admin_roles:
                other_admins = _count_company_users(
                    user.company_id, excluded_ids, roles=admin_roles
                )
                if other_admins == 0:
                    report(f"Skipping {user.username} (ID: {user.id}) - only admin in company {user.company_id}")
                    continue

            details = f"{user.username} (ID: {user.id}, Email: {user.email}, Created: {user.created_at})"

            if dry_run:
                report(f"Would delete unverified user: {details}")
            else:
                report(f"Deleting unverified user: {details}")

                # Log the deletion as a system event. Logging is best-effort:
                # a failure here must not abort the cleanup, but it should be visible.
                try:
                    SystemEvent.log_event(
                        event_type='user_deleted',
                        event_category='system',
                        description=f'Unverified user {user.username} deleted after 24 hours',
                        user_id=None,  # No user context for system cleanup
                        company_id=user.company_id
                    )
                except Exception as exc:
                    print(
                        f"Warning: could not log deletion of user {user.id}: {exc}",
                        file=sys.stderr,
                    )

            # The company goes too if it exists and no other user would remain in it.
            company = Company.query.get(user.company_id)
            company_is_empty = (
                company is not None
                and _count_company_users(user.company_id, excluded_ids) == 0
            )

            removed_here.append(user.id)
            if not dry_run:
                db.session.delete(user)
            deleted_count += 1

            if company_is_empty:
                if dry_run:
                    report(f"  Would also delete empty company: {company.name} (ID: {company.id})")
                else:
                    report(f"  Also deleting empty company: {company.name} (ID: {company.id})")
                    db.session.delete(company)

        if not dry_run and deleted_count > 0:
            try:
                db.session.commit()
            except Exception:
                # Leave the session clean before reporting the failure upward.
                db.session.rollback()
                raise
            report(f"\nSuccessfully deleted {deleted_count} unverified account(s)")
        elif dry_run:
            report(f"\nDry run: Would delete {deleted_count} unverified account(s)")
        else:
            report("No unverified accounts older than 24 hours found")

        return deleted_count


def main():
    """
    Parse command line arguments and run the cleanup.

    Exits with status 0 on success and 1 if the cleanup raised an exception.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Cleanup unverified user accounts older than 24 hours')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be deleted without actually deleting')
    parser.add_argument('--quiet', action='store_true',
                        help='Suppress output except for errors')

    args = parser.parse_args()

    if not args.quiet:
        print(f"Starting cleanup of unverified accounts at {datetime.utcnow()}")
        print("-" * 60)

    try:
        cleanup_unverified_accounts(dry_run=args.dry_run, quiet=args.quiet)

        if not args.quiet:
            print("-" * 60)
            print(f"Cleanup completed at {datetime.utcnow()}")

        # Exit with 0 for success
        sys.exit(0)

    except Exception as e:
        print(f"Error during cleanup: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()