From f2d749ba3fde18d7bc157a44393a5f27a9592a65 Mon Sep 17 00:00:00 2001 From: Sebastian Serfling Date: Tue, 21 Apr 2026 15:10:35 +0200 Subject: [PATCH] feat: initial ImapSync Manager setup --- README.md | 94 +++ backend/Dockerfile | 16 + backend/main.py | 444 +++++++++++++ backend/requirements.txt | 8 + backend/static/index.html | 1258 +++++++++++++++++++++++++++++++++++++ docker-compose.yml | 37 ++ worker/Dockerfile | 15 + worker/worker.py | 215 +++++++ 8 files changed, 2087 insertions(+) create mode 100644 README.md create mode 100644 backend/Dockerfile create mode 100644 backend/main.py create mode 100644 backend/requirements.txt create mode 100644 backend/static/index.html create mode 100644 docker-compose.yml create mode 100644 worker/Dockerfile create mode 100644 worker/worker.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..cb084ef --- /dev/null +++ b/README.md @@ -0,0 +1,94 @@ +# ImapSync Manager + +Docker-basiertes System für IMAP-E-Mail-Migrationen mit Web-GUI. + +## Features + +- **Web-GUI** – Dashboard, Job-Verwaltung, Log-Viewer, Statistiken +- **Authentifizierung** – MD5-Passwörter in SQLite, JWT-Token +- **Rollensystem** – Admin / Operator / Viewer +- **Job-Scheduler** – Manuell oder per Cron-Zeitplan +- **Auftragsverwalter (Worker)** – Führt imapsync-Jobs sequenziell aus +- **Log-Archiv** – Vollständige imapsync-Logs pro Ausführung + +## Schnellstart + +```bash +# Starten +docker compose up -d + +# Logs verfolgen +docker compose logs -f + +# Zugriff +http://localhost:8080 +``` + +**Standard-Login:** `admin` / `admin` +→ Passwort sofort nach dem ersten Login ändern! + +## Verzeichnisstruktur + +``` +imapsync-docker/ +├── docker-compose.yml +├── backend/ ← FastAPI + Web-GUI +│ ├── Dockerfile +│ ├── main.py +│ ├── requirements.txt +│ └── static/ +│ └── index.html +├── worker/ ← imapsync-Ausführung +│ ├── Dockerfile +│ └── worker.py +└── data/ ← Persistente Daten (auto-erstellt) + ├── imapsync.db ← SQLite-Datenbank + └── logs/ ← imapsync-Logdateien +``` + +## Sicherheit + +- Passwörter werden als MD5-Hash in SQLite gespeichert +- JWT-Token läuft nach 12h ab +- SECRET_KEY in docker-compose.yml anpassen! + +## Cron-Beispiele + +| Ausdruck | Bedeutung | +|---------------|-------------------------| +| `0 2 * * *` | Täglich um 02:00 Uhr | +| `0 */6 * * *` | Alle 6 Stunden | +| `0 2 * * 0` | Wöchentlich, So 02:00 | +| `30 1 * * 1-5`| Mo-Fr um 01:30 Uhr | + +## Rollen + +| Rolle | Benutzer | Jobs | Start/Stop | Logs | +|----------|----------|------|-----------|------| +| admin | ✅ | ✅ | ✅ | ✅ | +| operator | ✗ | ✅ | ✅ | ✅ | +| viewer | ✗ | ✗ | ✗ | ✅ | + +## Umgebungsvariablen + +### Web-Container +| Variable | Standard | Beschreibung | +|--------------|--------------------|-----------------------| +| DB_PATH | /data/imapsync.db | Pfad zur SQLite-DB | +| LOG_DIR | /data/logs | Log-Verzeichnis | +| SECRET_KEY | (Pflicht ändern!) | JWT-Signaturschlüssel | + +### Worker-Container +| Variable | Standard | Beschreibung | +|---------------|----------|------------------------------| +| POLL_INTERVAL | 15 | Sekunden zwischen DB-Abfragen | + +## Datensicherung + +```bash +# DB sichern +cp data/imapsync.db data/imapsync.db.bak + +# Komplettes Backup +tar -czf imapsync-backup-$(date +%Y%m%d).tar.gz data/ +``` diff --git a/backend/Dockerfile b/backend/Dockerfile new file mode 100644 index 0000000..fca5485 --- /dev/null +++ b/backend/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +RUN mkdir -p /data/logs + +EXPOSE 8080 + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/backend/main.py b/backend/main.py new file mode 100644 index 0000000..cfbf6b8 --- /dev/null +++ b/backend/main.py @@ -0,0 +1,444 @@ +import os +import sqlite3 +import hashlib +import secrets +from datetime import datetime, timedelta +from typing import Optional +from contextlib import asynccontextmanager + +from fastapi import FastAPI, HTTPException, Depends, status, Request +from fastapi.responses import HTMLResponse, JSONResponse +from fastapi.staticfiles import StaticFiles +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from pydantic import BaseModel +from jose import JWTError, jwt + +# ── Config ────────────────────────────────────────────────────────────────── +DB_PATH = os.environ.get("DB_PATH", "/data/imapsync.db") +LOG_DIR = os.environ.get("LOG_DIR", "/data/logs") +SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-change-me") +ALGORITHM = "HS256" +TOKEN_EXPIRE_HOURS = 12 + +os.makedirs(LOG_DIR, exist_ok=True) + +# ── DB ─────────────────────────────────────────────────────────────────────── +def get_db(): + conn = sqlite3.connect(DB_PATH, check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + return conn + +def init_db(): + conn = get_db() + conn.executescript(""" + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT UNIQUE NOT NULL, + password_md5 TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'viewer', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS sync_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + src_host TEXT NOT NULL, + src_port INTEGER DEFAULT 993, + src_ssl INTEGER DEFAULT 1, + src_user TEXT NOT NULL, + src_password TEXT NOT NULL, + dst_host TEXT NOT NULL, + dst_port INTEGER DEFAULT 993, + dst_ssl INTEGER DEFAULT 1, + dst_user TEXT NOT NULL, + dst_password TEXT NOT NULL, + extra_args TEXT DEFAULT '', + schedule TEXT DEFAULT NULL, + enabled INTEGER DEFAULT 1, + status TEXT DEFAULT 'idle', + last_run DATETIME, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + created_by TEXT + ); + + CREATE TABLE IF NOT EXISTS job_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER NOT NULL, + started_at DATETIME DEFAULT CURRENT_TIMESTAMP, + finished_at DATETIME, + status TEXT DEFAULT 'running', + log_file TEXT, + messages_synced INTEGER DEFAULT 0, + messages_skipped INTEGER DEFAULT 0, + errors INTEGER DEFAULT 0, + duration_sec INTEGER DEFAULT 0, + FOREIGN KEY (job_id) REFERENCES sync_jobs(id) + ); + + CREATE TABLE IF NOT EXISTS sessions ( + token TEXT PRIMARY KEY, + username TEXT NOT NULL, + expires_at DATETIME NOT NULL + ); + """) + # Create default admin if no users exist + cur = conn.execute("SELECT COUNT(*) FROM users") + if cur.fetchone()[0] == 0: + pw_md5 = hashlib.md5("admin".encode()).hexdigest() + conn.execute( + "INSERT INTO users (username, password_md5, role) VALUES (?, ?, ?)", + ("admin", pw_md5, "admin") + ) + conn.commit() + conn.close() + +# ── Auth ────────────────────────────────────────────────────────────────────── +def md5_hash(pw: str) -> str: + return hashlib.md5(pw.encode()).hexdigest() + +def create_token(username: str, role: str) -> str: + expire = datetime.utcnow() + timedelta(hours=TOKEN_EXPIRE_HOURS) + payload = {"sub": username, "role": role, "exp": expire} + return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) + +def decode_token(token: str) -> dict: + try: + return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + except JWTError: + raise HTTPException(status_code=401, detail="Token ungültig oder abgelaufen") + +security = HTTPBearer() + +def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): + return decode_token(credentials.credentials) + +def require_admin(user=Depends(get_current_user)): + if user.get("role") != "admin": + raise HTTPException(status_code=403, detail="Nur Admins erlaubt") + return user + +# ── App ─────────────────────────────────────────────────────────────────────── +@asynccontextmanager +async def lifespan(app: FastAPI): + init_db() + yield + +app = FastAPI(title="ImapSync Manager", lifespan=lifespan) + +# ── Schemas ─────────────────────────────────────────────────────────────────── +class LoginRequest(BaseModel): + username: str + password: str + +class UserCreate(BaseModel): + username: str + password: str + role: str = "viewer" # admin | operator | viewer + +class UserUpdate(BaseModel): + password: Optional[str] = None + role: Optional[str] = None + +class SyncJobCreate(BaseModel): + name: str + src_host: str + src_port: int = 993 + src_ssl: bool = True + src_user: str + src_password: str + dst_host: str + dst_port: int = 993 + dst_ssl: bool = True + dst_user: str + dst_password: str + extra_args: str = "" + schedule: Optional[str] = None + enabled: bool = True + +class SyncJobUpdate(BaseModel): + name: Optional[str] = None + src_host: Optional[str] = None + src_port: Optional[int] = None + src_ssl: Optional[bool] = None + src_user: Optional[str] = None + src_password: Optional[str] = None + dst_host: Optional[str] = None + dst_port: Optional[int] = None + dst_ssl: Optional[bool] = None + dst_user: Optional[str] = None + dst_password: Optional[str] = None + extra_args: Optional[str] = None + schedule: Optional[str] = None + enabled: Optional[bool] = None + +# ── Health ──────────────────────────────────────────────────────────────────── +@app.get("/api/health") +def health(): + return {"status": "ok", "time": datetime.utcnow().isoformat()} + +# ── Auth Endpoints ──────────────────────────────────────────────────────────── +@app.post("/api/auth/login") +def login(req: LoginRequest): + conn = get_db() + row = conn.execute( + "SELECT * FROM users WHERE username = ? AND password_md5 = ?", + (req.username, md5_hash(req.password)) + ).fetchone() + conn.close() + if not row: + raise HTTPException(status_code=401, detail="Ungültige Zugangsdaten") + token = create_token(row["username"], row["role"]) + return {"token": token, "username": row["username"], "role": row["role"]} + +@app.get("/api/auth/me") +def me(user=Depends(get_current_user)): + return user + +# ── User Endpoints ──────────────────────────────────────────────────────────── +@app.get("/api/users") +def list_users(user=Depends(require_admin)): + conn = get_db() + rows = conn.execute( + "SELECT id, username, role, created_at FROM users ORDER BY created_at DESC" + ).fetchall() + conn.close() + return [dict(r) for r in rows] + +@app.post("/api/users", status_code=201) +def create_user(req: UserCreate, user=Depends(require_admin)): + conn = get_db() + try: + conn.execute( + "INSERT INTO users (username, password_md5, role) VALUES (?, ?, ?)", + (req.username, md5_hash(req.password), req.role) + ) + conn.commit() + except sqlite3.IntegrityError: + raise HTTPException(status_code=409, detail="Benutzername bereits vergeben") + finally: + conn.close() + return {"message": f"Benutzer '{req.username}' erstellt"} + +@app.put("/api/users/{user_id}") +def update_user(user_id: int, req: UserUpdate, user=Depends(require_admin)): + conn = get_db() + row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") + updates = {} + if req.password: + updates["password_md5"] = md5_hash(req.password) + if req.role: + updates["role"] = req.role + if updates: + set_clause = ", ".join(f"{k} = ?" for k in updates) + conn.execute( + f"UPDATE users SET {set_clause} WHERE id = ?", + (*updates.values(), user_id) + ) + conn.commit() + conn.close() + return {"message": "Benutzer aktualisiert"} + +@app.delete("/api/users/{user_id}") +def delete_user(user_id: int, user=Depends(require_admin)): + conn = get_db() + row = conn.execute("SELECT username FROM users WHERE id = ?", (user_id,)).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") + if row["username"] == user["sub"]: + raise HTTPException(status_code=400, detail="Eigenen Account nicht löschbar") + conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) + conn.commit() + conn.close() + return {"message": "Benutzer gelöscht"} + +# ── Sync Job Endpoints ──────────────────────────────────────────────────────── +@app.get("/api/jobs") +def list_jobs(user=Depends(get_current_user)): + conn = get_db() + rows = conn.execute(""" + SELECT j.*, + (SELECT COUNT(*) FROM job_runs r WHERE r.job_id = j.id) as run_count, + (SELECT SUM(messages_synced) FROM job_runs r WHERE r.job_id = j.id) as total_synced + FROM sync_jobs j ORDER BY j.created_at DESC + """).fetchall() + conn.close() + result = [] + for r in rows: + d = dict(r) + d.pop("src_password", None) + d.pop("dst_password", None) + result.append(d) + return result + +@app.post("/api/jobs", status_code=201) +def create_job(req: SyncJobCreate, user=Depends(get_current_user)): + if user.get("role") == "viewer": + raise HTTPException(status_code=403, detail="Keine Berechtigung") + conn = get_db() + cur = conn.execute(""" + INSERT INTO sync_jobs + (name,src_host,src_port,src_ssl,src_user,src_password, + dst_host,dst_port,dst_ssl,dst_user,dst_password, + extra_args,schedule,enabled,created_by) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) + """, (req.name, req.src_host, req.src_port, int(req.src_ssl), req.src_user, req.src_password, + req.dst_host, req.dst_port, int(req.dst_ssl), req.dst_user, req.dst_password, + req.extra_args, req.schedule, int(req.enabled), user["sub"])) + conn.commit() + job_id = cur.lastrowid + conn.close() + return {"message": "Job erstellt", "id": job_id} + +@app.get("/api/jobs/{job_id}") +def get_job(job_id: int, user=Depends(get_current_user)): + conn = get_db() + row = conn.execute("SELECT * FROM sync_jobs WHERE id = ?", (job_id,)).fetchone() + conn.close() + if not row: + raise HTTPException(status_code=404, detail="Job nicht gefunden") + d = dict(row) + if user.get("role") == "viewer": + d.pop("src_password", None) + d.pop("dst_password", None) + return d + +@app.put("/api/jobs/{job_id}") +def update_job(job_id: int, req: SyncJobUpdate, user=Depends(get_current_user)): + if user.get("role") == "viewer": + raise HTTPException(status_code=403, detail="Keine Berechtigung") + conn = get_db() + row = conn.execute("SELECT id FROM sync_jobs WHERE id = ?", (job_id,)).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Job nicht gefunden") + fields = {k: v for k, v in req.model_dump().items() if v is not None} + if "src_ssl" in fields: + fields["src_ssl"] = int(fields["src_ssl"]) + if "dst_ssl" in fields: + fields["dst_ssl"] = int(fields["dst_ssl"]) + if "enabled" in fields: + fields["enabled"] = int(fields["enabled"]) + if fields: + set_clause = ", ".join(f"{k} = ?" for k in fields) + conn.execute( + f"UPDATE sync_jobs SET {set_clause} WHERE id = ?", + (*fields.values(), job_id) + ) + conn.commit() + conn.close() + return {"message": "Job aktualisiert"} + +@app.delete("/api/jobs/{job_id}") +def delete_job(job_id: int, user=Depends(get_current_user)): + if user.get("role") == "viewer": + raise HTTPException(status_code=403, detail="Keine Berechtigung") + conn = get_db() + conn.execute("DELETE FROM job_runs WHERE job_id = ?", (job_id,)) + conn.execute("DELETE FROM sync_jobs WHERE id = ?", (job_id,)) + conn.commit() + conn.close() + return {"message": "Job gelöscht"} + +@app.post("/api/jobs/{job_id}/trigger") +def trigger_job(job_id: int, user=Depends(get_current_user)): + if user.get("role") == "viewer": + raise HTTPException(status_code=403, detail="Keine Berechtigung") + conn = get_db() + row = conn.execute("SELECT status FROM sync_jobs WHERE id = ?", (job_id,)).fetchone() + if not row: + raise HTTPException(status_code=404, detail="Job nicht gefunden") + if row["status"] in ("queued", "running"): + raise HTTPException(status_code=409, detail="Job läuft bereits oder ist in der Warteschlange") + conn.execute("UPDATE sync_jobs SET status = 'queued' WHERE id = ?", (job_id,)) + conn.commit() + conn.close() + return {"message": "Job in Warteschlange eingereiht"} + +@app.post("/api/jobs/{job_id}/stop") +def stop_job(job_id: int, user=Depends(get_current_user)): + if user.get("role") == "viewer": + raise HTTPException(status_code=403, detail="Keine Berechtigung") + conn = get_db() + conn.execute( + "UPDATE sync_jobs SET status = 'idle' WHERE id = ? AND status = 'queued'", + (job_id,) + ) + conn.commit() + conn.close() + return {"message": "Job aus Warteschlange entfernt (wenn möglich)"} + +# ── Run / Log Endpoints ─────────────────────────────────────────────────────── +@app.get("/api/jobs/{job_id}/runs") +def get_job_runs(job_id: int, limit: int = 50, user=Depends(get_current_user)): + conn = get_db() + rows = conn.execute(""" + SELECT * FROM job_runs WHERE job_id = ? + ORDER BY started_at DESC LIMIT ? + """, (job_id, limit)).fetchall() + conn.close() + return [dict(r) for r in rows] + +@app.get("/api/runs/{run_id}/log") +def get_run_log(run_id: int, user=Depends(get_current_user)): + conn = get_db() + row = conn.execute("SELECT log_file FROM job_runs WHERE id = ?", (run_id,)).fetchone() + conn.close() + if not row or not row["log_file"]: + raise HTTPException(status_code=404, detail="Kein Log gefunden") + log_path = os.path.join(LOG_DIR, row["log_file"]) + if not os.path.exists(log_path): + raise HTTPException(status_code=404, detail="Logdatei nicht vorhanden") + with open(log_path, "r", errors="replace") as f: + return {"content": f.read()} + +# ── Stats Endpoint ──────────────────────────────────────────────────────────── +@app.get("/api/stats") +def get_stats(user=Depends(get_current_user)): + conn = get_db() + total_jobs = conn.execute("SELECT COUNT(*) FROM sync_jobs").fetchone()[0] + active_jobs = conn.execute("SELECT COUNT(*) FROM sync_jobs WHERE enabled=1").fetchone()[0] + running = conn.execute("SELECT COUNT(*) FROM sync_jobs WHERE status='running'").fetchone()[0] + queued = conn.execute("SELECT COUNT(*) FROM sync_jobs WHERE status='queued'").fetchone()[0] + total_runs = conn.execute("SELECT COUNT(*) FROM job_runs").fetchone()[0] + failed_runs = conn.execute("SELECT COUNT(*) FROM job_runs WHERE status='failed'").fetchone()[0] + total_synced = conn.execute("SELECT COALESCE(SUM(messages_synced),0) FROM job_runs").fetchone()[0] + total_errors = conn.execute("SELECT COALESCE(SUM(errors),0) FROM job_runs").fetchone()[0] + + # Last 14 days activity + daily = conn.execute(""" + SELECT DATE(started_at) as day, + COUNT(*) as runs, + COALESCE(SUM(messages_synced),0) as synced, + COALESCE(SUM(errors),0) as errors + FROM job_runs + WHERE started_at >= DATE('now', '-14 days') + GROUP BY day ORDER BY day + """).fetchall() + + # Recent runs + recent = conn.execute(""" + SELECT r.id, r.job_id, j.name as job_name, r.started_at, r.finished_at, + r.status, r.messages_synced, r.errors, r.duration_sec + FROM job_runs r + JOIN sync_jobs j ON j.id = r.job_id + ORDER BY r.started_at DESC LIMIT 10 + """).fetchall() + + conn.close() + return { + "jobs": {"total": total_jobs, "active": active_jobs, "running": running, "queued": queued}, + "runs": {"total": total_runs, "failed": failed_runs}, + "messages": {"synced": total_synced, "errors": total_errors}, + "daily": [dict(d) for d in daily], + "recent_runs": [dict(r) for r in recent], + } + +# ── Static Frontend ─────────────────────────────────────────────────────────── +app.mount("/static", StaticFiles(directory="static"), name="static") + +@app.get("/{full_path:path}", response_class=HTMLResponse) +async def serve_spa(full_path: str): + with open("static/index.html", "r") as f: + return HTMLResponse(f.read()) diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..710dbb2 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,8 @@ +fastapi==0.115.0 +uvicorn[standard]==0.32.0 +python-multipart==0.0.12 +pydantic==2.9.2 +python-jose[cryptography]==3.3.0 +passlib==1.7.4 +croniter==3.0.3 +aiofiles==24.1.0 diff --git a/backend/static/index.html b/backend/static/index.html new file mode 100644 index 0000000..d093981 --- /dev/null +++ b/backend/static/index.html @@ -0,0 +1,1258 @@ + + + + + +ImapSync Manager + + + + + +
+ +
+ + +
+ + +
+
+ ◈ DASHBOARD +
+
+
+
+
+ + + + + +
+ + + + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..af460af --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,37 @@ +version: '3.8' + +services: + web: + build: ./backend + container_name: imapsync-web + ports: + - "8080:8080" + volumes: + - ./data:/data + environment: + - DB_PATH=/data/imapsync.db + - LOG_DIR=/data/logs + - SECRET_KEY=change-me-in-production + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/api/health"] + interval: 30s + timeout: 10s + retries: 3 + + worker: + build: ./worker + container_name: imapsync-worker + volumes: + - ./data:/data + environment: + - DB_PATH=/data/imapsync.db + - LOG_DIR=/data/logs + - POLL_INTERVAL=15 + restart: unless-stopped + depends_on: + web: + condition: service_healthy + +volumes: + data: diff --git a/worker/Dockerfile b/worker/Dockerfile new file mode 100644 index 0000000..b5ea63a --- /dev/null +++ b/worker/Dockerfile @@ -0,0 +1,15 @@ +FROM debian:bookworm-slim + +# Install imapsync and dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + imapsync \ + python3 \ + python3-pip \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +COPY worker.py . + +RUN mkdir -p /data/logs + +CMD ["python3", "-u", "worker.py"] diff --git a/worker/worker.py b/worker/worker.py new file mode 100644 index 0000000..698c068 --- /dev/null +++ b/worker/worker.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +""" +ImapSync Worker +--------------- +Polls SQLite for queued jobs, executes imapsync, parses output, +updates run records and job status. Also handles cron schedules. +""" + +import os +import re +import sqlite3 +import subprocess +import time +import logging +from datetime import datetime + +DB_PATH = os.environ.get("DB_PATH", "/data/imapsync.db") +LOG_DIR = os.environ.get("LOG_DIR", "/data/logs") +POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15")) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [WORKER] %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger(__name__) + +os.makedirs(LOG_DIR, exist_ok=True) + + +def get_db(): + conn = sqlite3.connect(DB_PATH, timeout=30, check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + return conn + + +def parse_imapsync_output(text: str) -> dict: + """Extract stats from imapsync stdout/stderr.""" + stats = {"messages_synced": 0, "messages_skipped": 0, "errors": 0} + m = re.search(r"Messages transferred:\s+(\d+)", text) + if m: + stats["messages_synced"] = int(m.group(1)) + m = re.search(r"Messages skipped:\s+(\d+)", text) + if m: + stats["messages_skipped"] = int(m.group(1)) + # Count error lines + stats["errors"] = len(re.findall(r"(?i)^\s*(error|err)\b", text, re.MULTILINE)) + return stats + + +def check_due_schedules(): + """Queue jobs whose cron schedule is due (within last POLL_INTERVAL seconds).""" + try: + from croniter import croniter + except ImportError: + return # croniter not installed in this image, skip + + conn = get_db() + try: + now = datetime.utcnow() + rows = conn.execute( + "SELECT id, schedule, last_run FROM sync_jobs " + "WHERE enabled=1 AND schedule IS NOT NULL AND schedule != '' " + "AND status NOT IN ('running', 'queued')" + ).fetchall() + + for row in rows: + try: + cron = croniter(row["schedule"]) + last_run = datetime.fromisoformat(row["last_run"]) if row["last_run"] else datetime(2000, 1, 1) + prev_due = cron.get_prev(datetime) + # If last scheduled run is after last actual run, queue it + if prev_due > last_run: + conn.execute( + "UPDATE sync_jobs SET status='queued' WHERE id=?", + (row["id"],) + ) + log.info(f"Job {row['id']} scheduled run queued (cron: {row['schedule']})") + except Exception as e: + log.warning(f"Cron parse error for job {row['id']}: {e}") + conn.commit() + finally: + conn.close() + + +def run_job(job: sqlite3.Row): + job_id = job["id"] + log.info(f"Starting job {job_id}: {job['name']}") + + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + log_filename = f"job_{job_id}_{timestamp}.log" + log_path = os.path.join(LOG_DIR, log_filename) + + conn = get_db() + conn.execute("UPDATE sync_jobs SET status='running', last_run=? WHERE id=?", + (datetime.utcnow().isoformat(), job_id)) + cur = conn.execute( + "INSERT INTO job_runs (job_id, log_file, status) VALUES (?, ?, 'running')", + (job_id, log_filename) + ) + run_id = cur.lastrowid + conn.commit() + conn.close() + + # Build imapsync command + ssl1 = "--ssl1" if job["src_ssl"] else "--nossl1" + ssl2 = "--ssl2" if job["dst_ssl"] else "--nossl2" + cmd = [ + "imapsync", + "--host1", job["src_host"], + "--port1", str(job["src_port"]), + ssl1, + "--user1", job["src_user"], + "--password1", job["src_password"], + "--host2", job["dst_host"], + "--port2", str(job["dst_port"]), + ssl2, + "--user2", job["dst_user"], + "--password2", job["dst_password"], + "--nolog", + ] + if job["extra_args"]: + cmd += job["extra_args"].split() + + started = time.time() + exit_code = 0 + output = "" + + try: + with open(log_path, "w") as lf: + lf.write(f"# ImapSync Job: {job['name']}\n") + lf.write(f"# Started: {datetime.utcnow().isoformat()}\n") + lf.write(f"# Command: {' '.join(cmd[:20])}...\n\n") + result = subprocess.run( + cmd, + stdout=lf, + stderr=subprocess.STDOUT, + timeout=7200, # 2h max + ) + exit_code = result.returncode + with open(log_path, "r", errors="replace") as lf: + output = lf.read() + except subprocess.TimeoutExpired: + log.error(f"Job {job_id} timed out after 2h") + exit_code = -1 + output = "TIMEOUT after 2 hours" + with open(log_path, "a") as lf: + lf.write("\n\nTIMEOUT: Job exceeded 2 hour limit\n") + except Exception as e: + log.error(f"Job {job_id} exception: {e}") + exit_code = -2 + output = str(e) + with open(log_path, "a") as lf: + lf.write(f"\n\nEXCEPTION: {e}\n") + + duration = int(time.time() - started) + stats = parse_imapsync_output(output) + job_status = "done" if exit_code == 0 else "failed" + + conn = get_db() + conn.execute(""" + UPDATE job_runs SET + status=?, finished_at=?, messages_synced=?, + messages_skipped=?, errors=?, duration_sec=? + WHERE id=? + """, (job_status, datetime.utcnow().isoformat(), + stats["messages_synced"], stats["messages_skipped"], + stats["errors"], duration, run_id)) + conn.execute( + "UPDATE sync_jobs SET status=? WHERE id=?", + ("idle", job_id) + ) + conn.commit() + conn.close() + + log.info( + f"Job {job_id} finished [{job_status}] in {duration}s — " + f"synced={stats['messages_synced']} skipped={stats['messages_skipped']} " + f"errors={stats['errors']}" + ) + + +def main(): + log.info(f"Worker started. DB={DB_PATH} LOG_DIR={LOG_DIR} POLL={POLL_INTERVAL}s") + # Wait for DB to be initialized by the web container + for i in range(30): + if os.path.exists(DB_PATH): + break + log.info(f"Waiting for DB... ({i+1}/30)") + time.sleep(2) + + while True: + try: + check_due_schedules() + + conn = get_db() + job = conn.execute( + "SELECT * FROM sync_jobs WHERE status='queued' AND enabled=1 " + "ORDER BY created_at ASC LIMIT 1" + ).fetchone() + conn.close() + + if job: + run_job(job) + else: + time.sleep(POLL_INTERVAL) + + except Exception as e: + log.error(f"Worker loop error: {e}", exc_info=True) + time.sleep(POLL_INTERVAL) + + +if __name__ == "__main__": + main()