from __future__ import annotations import json import os import shutil import subprocess import sys import threading import time import uuid from pathlib import Path from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, PlainTextResponse, StreamingResponse from fastapi.staticfiles import StaticFiles PROJECT_ROOT = Path(__file__).resolve().parents[2] FRONTEND_DIST = PROJECT_ROOT / "webapp" / "frontend" / "dist" VAR_DIR = PROJECT_ROOT / "var" JOBS_DIR = VAR_DIR / "jobs" JOBS_DIR.mkdir(parents=True, exist_ok=True) ACTIVE_JOBS: dict[str, dict[str, object]] = {} ACTIVE_JOBS_LOCK = threading.Lock() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) def _get_solver_availability() -> dict[str, bool]: try: import pyomo.environ as pyo except Exception: return {"highs": False, "gurobi": False} availability: dict[str, bool] = {} for solver_name in ("highs", "gurobi", "scip"): try: solver = pyo.SolverFactory(solver_name) availability[solver_name] = bool(solver) and bool( solver.available(exception_flag=False) ) except Exception: availability[solver_name] = False return availability class JobCancelledError(RuntimeError): pass def _set_active_process(job_id: str, process) -> None: with ACTIVE_JOBS_LOCK: runtime = ACTIVE_JOBS.get(job_id) if runtime is not None: runtime["process"] = process def _clear_active_process(job_id: str) -> None: with ACTIVE_JOBS_LOCK: runtime = ACTIVE_JOBS.get(job_id) if runtime is not None: runtime["process"] = None def _run_step( cmd: list[str], log_path: Path, env: dict[str, str] | None = None, *, job_id: str, cancel_event: threading.Event, ) -> None: log_path.parent.mkdir(parents=True, exist_ok=True) with log_path.open("w", encoding="utf-8") as log_file: process = subprocess.Popen( cmd, cwd=PROJECT_ROOT, env=env, stdout=log_file, stderr=subprocess.STDOUT, text=True, ) _set_active_process(job_id, process) try: while True: if cancel_event.is_set(): process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() process.wait(timeout=5) raise JobCancelledError("Job cancelled by user") result_code = process.poll() if result_code is not None: break time.sleep(0.2) finally: _clear_active_process(job_id) if result_code != 0: tail = "" try: lines = log_path.read_text(encoding="utf-8", errors="replace").splitlines() tail_lines = lines[-40:] if tail_lines: tail = "\n" + "\n".join(tail_lines) except Exception: pass raise RuntimeError(f"Command failed: {' '.join(cmd)}{tail}") def _run_job( job_dir: Path, input_path: Path, warmstart_input_path: Path | None, processed_dir: Path, output_dir: Path, solver_name: str, step_size_tonnes: int, mip_gap: float, time_limit_seconds: int, cancel_event: threading.Event, ) -> None: logs_dir = job_dir / "logs" logs_dir.mkdir(parents=True, exist_ok=True) env = os.environ.copy() path_entries = env.get("PATH", "").split(os.pathsep) for extra_path in ["/usr/local/bin", "/opt/homebrew/bin"]: if extra_path not in path_entries: path_entries.insert(0, extra_path) env["PATH"] = os.pathsep.join(path_entries) env["POC1_INPUT_XLSX"] = str(input_path) env["POC1_OUTPUT_DIR"] = str(processed_dir) try: _write_status(job_dir, "processing") _run_step( [sys.executable, "src/preprocessing/exploration_preprocess.py"], logs_dir / "preprocess.log", env=env, job_id=job_dir.name, cancel_event=cancel_event, ) output_path = output_dir / "output.xlsx" warmstart_output_path = output_dir / "warmstart.json" optimization_cmd = [ sys.executable, "src/optimization/run_optimization.py", "--data-dir", str(processed_dir), "--solver", solver_name, "--step-size-tonnes", str(step_size_tonnes), "--mip-gap", str(mip_gap), "--time-limit", str(time_limit_seconds), "--output-xlsx", str(output_path), "--warmstart-out", str(warmstart_output_path), ] if warmstart_input_path is not None and warmstart_input_path.exists(): optimization_cmd.extend(["--warmstart-in", str(warmstart_input_path)]) _run_step( optimization_cmd, logs_dir / "optimization.log", env=env, job_id=job_dir.name, cancel_event=cancel_event, ) extra = {"output": str(output_path)} if warmstart_output_path.exists(): extra["warmstart"] = str(warmstart_output_path) _write_status(job_dir, "completed", extra) except JobCancelledError: _write_status(job_dir, "cancelled") except Exception as exc: _write_status(job_dir, "failed", {"error": str(exc)}) finally: with ACTIVE_JOBS_LOCK: ACTIVE_JOBS.pop(job_dir.name, None) def _write_status(job_dir: Path, status: str, extra: dict[str, str] | None = None) -> None: payload = {"status": status} if extra: payload.update(extra) (job_dir / "job.json").write_text(json.dumps(payload, indent=2), encoding="utf-8") def _read_monthly_flow_rows(output_path: Path) -> dict[str, object]: try: import pandas as pd except Exception as exc: # pragma: no cover raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc try: df = pd.read_excel( output_path, sheet_name="Sheet1", header=[0, 1, 2], index_col=[0, 1, 2], ) except Exception as exc: raise HTTPException(status_code=500, detail=f"Could not read output workbook: {exc}") from exc source_names = {"Reichwalde", "Nochten", "Welzow"} shift_names = {"F", "S", "N"} flow_cols = [] for col in df.columns: if not isinstance(col, tuple) or len(col) != 3: continue target, source, shift = col if str(source) in source_names and str(shift) in shift_names: flow_cols.append(col) if not flow_cols: return {"rows": [], "targets": [], "sources": sorted(source_names)} flows = df[flow_cols].copy() raw_dates = pd.to_datetime(flows.index.get_level_values(0), errors="coerce") flows = flows.loc[~raw_dates.isna()].copy() raw_dates = raw_dates[~raw_dates.isna()] flows.index = pd.MultiIndex.from_arrays( [ raw_dates, flows.index.get_level_values(1), flows.index.get_level_values(2), ], names=flows.index.names, ) monthly_by_shift = flows.groupby(pd.Grouper(level=0, freq="MS")).sum() monthly = monthly_by_shift.T.groupby(level=[0, 1]).sum().T target_labels = { "J": "Jänschwalde", "SP": "Schwarze Pumpe", "B3": "Boxberg Werk 3", "B4": "Boxberg Werk 4", "V": "Veredlung", } rows: list[dict[str, object]] = [] for month, values in monthly.iterrows(): for (target, source), amount in values.items(): amount_value = float(amount) if amount is not None else 0.0 rows.append( { "month": month.strftime("%Y-%m"), "target": str(target), "target_label": target_labels.get(str(target), str(target)), "source": str(source), "amount_tonnes": amount_value, } ) targets = sorted({str(col[0]) for col in monthly.columns}) sources = sorted({str(col[1]) for col in monthly.columns}) return {"rows": rows, "targets": targets, "sources": sources} def _read_flow_long_rows(output_path: Path): try: import pandas as pd except Exception as exc: # pragma: no cover raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc try: df = pd.read_excel( output_path, sheet_name="Sheet1", header=[0, 1, 2], index_col=[0, 1, 2], ) except Exception as exc: raise HTTPException(status_code=500, detail=f"Could not read output workbook: {exc}") from exc source_names = {"Reichwalde", "Nochten", "Welzow"} shift_names = {"F", "S", "N"} flow_cols = [ col for col in df.columns if isinstance(col, tuple) and len(col) == 3 and str(col[1]) in source_names and str(col[2]) in shift_names ] if not flow_cols: return pd.DataFrame(columns=["date", "month", "target", "source", "shift", "amount_tonnes"]) rows = [] for (date_raw, _week, _day), values in df[flow_cols].iterrows(): date = pd.to_datetime(date_raw, errors="coerce") if pd.isna(date): continue date = date.normalize() for (target, source, shift), amount in values.items(): rows.append( { "date": date, "month": date.strftime("%Y-%m"), "target": str(target), "source": str(source), "shift": str(shift), "amount_tonnes": float(amount) if amount is not None else 0.0, } ) return pd.DataFrame(rows) def _read_capacity_timeseries(job_dir: Path) -> dict[str, object]: try: import math import pandas as pd except Exception as exc: # pragma: no cover raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc output_path = job_dir / "output" / "output.xlsx" processed_dir = job_dir / "processed" if not output_path.exists(): raise HTTPException(status_code=404, detail="Output not found") flows = _read_flow_long_rows(output_path) if flows.empty: return {"groups": []} def _point_x(date_val, shift=None, granularity="day"): if granularity == "month": return f"{date_val}-01" if granularity == "shift": hour_map = {"F": 6, "S": 14, "N": 22} hour = hour_map.get(shift, 0) return f"{date_val.strftime('%Y-%m-%d')}T{hour:02d}:00:00" return date_val.strftime("%Y-%m-%d") def _point_label(date_val, shift=None, granularity="day"): if granularity == "month": return str(date_val) if granularity == "shift": return f"{date_val.strftime('%Y-%m-%d')} {shift}" return date_val.strftime("%Y-%m-%d") def _series_from_dataframe( *, series_id: str, label: str, df_usage, granularity: str, limit_value: float | None = None, limit_map: dict[tuple, float] | None = None, marker_points: list[dict[str, object]] | None = None, ) -> dict[str, object]: points: list[dict[str, object]] = [] for row in df_usage.itertuples(index=False): key = row.key usage = float(row.usage_tonnes) if granularity == "month": x = _point_x(key, granularity="month") point_label = _point_label(key, granularity="month") limit = limit_value util = (100 * usage / limit) if limit and limit > 0 else None elif granularity == "day": x = _point_x(key, granularity="day") point_label = _point_label(key, granularity="day") if limit_map is not None: limit = limit_map.get(key) else: limit = limit_value util = (100 * usage / limit) if limit and limit > 0 else None else: date_val, shift = key x = _point_x(date_val, shift=shift, granularity="shift") point_label = _point_label(date_val, shift=shift, granularity="shift") if limit_map is not None: limit = limit_map.get((date_val, shift)) else: limit = limit_value util = (100 * usage / limit) if limit and limit > 0 else None points.append( { "x": x, "label": point_label, "usage_tonnes": usage, "limit_tonnes": float(limit) if limit is not None else None, "utilization_pct": float(util) if util is not None else None, } ) return { "id": series_id, "label": label, "granularity": granularity, "points": points, "marker_points": marker_points or [], } def _aggregate_usage( *, sources: set[str] | None = None, targets: set[str] | None = None, granularity: str = "day", ): df = flows if sources is not None: df = df[df["source"].isin(sources)] if targets is not None: df = df[df["target"].isin(targets)] if granularity == "month": agg = df.groupby("month", as_index=False)["amount_tonnes"].sum() agg = agg.rename(columns={"month": "key", "amount_tonnes": "usage_tonnes"}) agg["key"] = agg["key"].astype(str) return agg.sort_values("key") if granularity == "day": agg = df.groupby("date", as_index=False)["amount_tonnes"].sum() agg = agg.rename(columns={"date": "key", "amount_tonnes": "usage_tonnes"}) return agg.sort_values("key") agg = df.groupby(["date", "shift"], as_index=False)["amount_tonnes"].sum() agg["_shift_sort"] = pd.Categorical(agg["shift"], categories=["F", "S", "N"], ordered=True) agg = agg.sort_values(["date", "_shift_sort"]) agg["key"] = list(zip(agg["date"], agg["shift"])) agg = agg.rename(columns={"amount_tonnes": "usage_tonnes"}) return agg[["key", "usage_tonnes"]] groups: list[dict[str, object]] = [] # Foerderkapazitaeten (kumuliert je Monat auf Tagesbasis) foerder_file = processed_dir / "foerderkapaz.parquet" if foerder_file.exists(): cap_df = pd.read_parquet(foerder_file) source_map = { "Reichwalde (RW)": "Reichwalde", "Nochten (NO)": "Nochten", "Welzow-Süd": "Welzow", } label_map = { "Reichwalde": "Förderung TB Reichwalde (kumuliert im Monat)", "Nochten": "Förderung TB Nochten (kumuliert im Monat)", "Welzow": "Förderung TB Welzow-Süd (kumuliert im Monat)", } limit_by_source: dict[str, float] = {} for _, row in cap_df[cap_df["zeitraum"] == "pro Monat"].iterrows(): source = source_map.get(str(row["tagebau"])) if source: limit_by_source[source] = float(row["maximal"]) foerder_series: list[dict[str, object]] = [] def _aggregate_cumulative_daily_in_month(*, sources: set[str]): usage_df = _aggregate_usage(sources=sources, granularity="day") if usage_df.empty: return usage_df, {} usage_df = usage_df.copy() usage_df["month"] = usage_df["key"].dt.strftime("%Y-%m") usage_df["usage_tonnes"] = usage_df.groupby("month")["usage_tonnes"].cumsum() limit_map: dict[object, float] = {} for day in usage_df["key"]: month_start = pd.Timestamp(day).replace(day=1) month_days = int(month_start.days_in_month) month_key = month_start.strftime("%Y-%m") if sources == {"Reichwalde", "Nochten"}: monthly_limit = 3_000_000.0 else: source = next(iter(sources)) monthly_limit = limit_by_source.get(source) if monthly_limit is None: continue # monthly cap line (constant during month) for cumulative comparison limit_map[day] = float(monthly_limit) return usage_df[["key", "usage_tonnes"]], limit_map for source in ["Reichwalde", "Nochten", "Welzow"]: usage_df, limit_map = _aggregate_cumulative_daily_in_month(sources={source}) if usage_df.empty: continue foerder_series.append( _series_from_dataframe( series_id=f"foerder_{source.lower()}_cum_month", label=label_map[source], df_usage=usage_df, granularity="day", limit_map=limit_map, ) ) rwno_usage_df, rwno_limit_map = _aggregate_cumulative_daily_in_month( sources={"Reichwalde", "Nochten"} ) if not rwno_usage_df.empty: foerder_series.append( _series_from_dataframe( series_id="foerder_rw_nochten_cum_month", label="Förderung TB Reichwalde + Nochten (kumuliert im Monat, gemeinsame Grenze)", df_usage=rwno_usage_df, granularity="day", limit_map=rwno_limit_map, ) ) if foerder_series: groups.append({"key": "foerder", "label": "Förderkapazitäten", "series": foerder_series}) # Verladungskapazitaeten (static + availability) verladung_series: list[dict[str, object]] = [] verladung_file = processed_dir / "verladungskap.parquet" if verladung_file.exists(): verladung_cap = pd.read_parquet(verladung_file) def _cap_lookup(verladung_label: str, zeitraum_label: str): row = verladung_cap[ (verladung_cap["verladung"] == verladung_label) & (verladung_cap["zeitraum"] == zeitraum_label) ] if row.empty: return None return float(row.iloc[0]["maximal"]) boxberg_targets = {"J", "SP", "B3", "V"} welzow_targets = {"J", "SP", "B3", "V"} boxberg_sources = {"Reichwalde", "Nochten"} welzow_sources = {"Welzow"} for series_id, label, sources, targets, granularity, cap_label, cap_period in [ ( "verladung_boxberg_shift", "Verladung Boxberg (RW+NO) pro Schicht", boxberg_sources, boxberg_targets, "shift", "Boxberg (RW+NO)", "pro Schicht", ), ( "verladung_boxberg_day", "Verladung Boxberg (RW+NO) pro Tag", boxberg_sources, boxberg_targets, "day", "Boxberg (RW+NO)", "pro Tag", ), ( "verladung_welzow_shift", "Verladung Welzow-Süd pro Schicht", welzow_sources, welzow_targets, "shift", "Welzow-Süd", "pro Schicht", ), ( "verladung_welzow_day", "Verladung Welzow-Süd pro Tag", welzow_sources, welzow_targets, "day", "Welzow-Süd", "pro Tag", ), ]: usage_df = _aggregate_usage(sources=sources, targets=targets, granularity=granularity) if usage_df.empty: continue cap = _cap_lookup(cap_label, cap_period) verladung_series.append( _series_from_dataframe( series_id=series_id, label=label, df_usage=usage_df, granularity=granularity, limit_value=cap, ) ) avail_file = processed_dir / "Verfuegbarkeiten.parquet" if avail_file.exists(): avail = pd.read_parquet(avail_file) shift_map = {"S1": "F", "S2": "S", "S3": "N"} static_shift_caps = { "welzow": _cap_lookup("Welzow-Süd", "pro Schicht"), "rw_no": _cap_lookup("Boxberg (RW+NO)", "pro Schicht"), } marker_target_series = { "welzow": "verladung_welzow_shift", "rw_no": "verladung_boxberg_shift", } for scope, label, cols, sources in [ ( "welzow", "Verfügbarkeit Welzow-Süd (pro Schicht)", ["Welzow_Sued_S1_t", "Welzow_Sued_S2_t", "Welzow_Sued_S3_t"], {"Welzow"}, ), ( "rw_no", "Verfügbarkeit Boxberg RW+NO (pro Schicht)", ["Boxberg_NO_RW_S1_t", "Boxberg_NO_RW_S2_t", "Boxberg_NO_RW_S3_t"], {"Reichwalde", "Nochten"}, ), ]: limit_map: dict[tuple, float] = {} for _, row in avail.iterrows(): date = pd.to_datetime(row["datum"], errors="coerce") if pd.isna(date): continue date = date.normalize() for col in cols: val = row.get(col) if val is None or (isinstance(val, float) and math.isnan(val)): continue shift = shift_map[col.split("_")[-2]] limit_map[(date, shift)] = float(val) if not limit_map: continue static_cap = static_shift_caps.get(scope) marker_points: list[dict[str, object]] = [] if static_cap is not None: for (date, shift), dynamic_cap in sorted(limit_map.items()): if dynamic_cap >= static_cap: continue marker_points.append( { "x": _point_x(date, shift=shift, granularity="shift"), "label": _point_label(date, shift=shift, granularity="shift"), "limit_tonnes": float(dynamic_cap), "delta_tonnes": float(static_cap - dynamic_cap), } ) target_series_id = marker_target_series.get(scope) if target_series_id and marker_points: for series in verladung_series: if series.get("id") == target_series_id: series["marker_points"] = marker_points break if verladung_series: groups.append({"key": "verladung", "label": "Verladungskapazitäten", "series": verladung_series}) # Zugdurchlasskapazitaeten (per shift) zug_file = processed_dir / "zugdurchlass.parquet" zug_series: list[dict[str, object]] = [] if zug_file.exists(): zug = pd.read_parquet(zug_file).copy() def _zug_cap(start_exact: str, ziel_exact: str): s = zug["start"].astype(str).str.strip() z = zug["ziel"].astype(str).str.strip() row = zug[(s == start_exact) & (z == ziel_exact)] if row.empty: return None return float(row.iloc[0]["maximal"]) def _add_zug_series(series_id: str, label: str, limit: float | None, terms: list[tuple[str, str]]): if limit is None: return mask = None for source, target in terms: part = (flows["source"] == source) & (flows["target"] == target) mask = part if mask is None else (mask | part) if mask is None: return df = flows[mask].copy() if df.empty: return usage_df = df.groupby(["date", "shift"], as_index=False)["amount_tonnes"].sum() usage_df["_shift_sort"] = pd.Categorical( usage_df["shift"], categories=["F", "S", "N"], ordered=True ) usage_df = usage_df.sort_values(["date", "_shift_sort"]) usage_df["key"] = list(zip(usage_df["date"], usage_df["shift"])) usage_df = usage_df.rename(columns={"amount_tonnes": "usage_tonnes"})[["key", "usage_tonnes"]] zug_series.append( _series_from_dataframe( series_id=series_id, label=label, df_usage=usage_df, granularity="shift", limit_value=limit, ) ) _add_zug_series( "zug_rwno_to_j", "Zugdurchlass KLP: RW+NO -> KW Jänschwalde", _zug_cap("Verladung Boxberg (KLP)", "KW Jänschwalde"), [("Reichwalde", "J"), ("Nochten", "J")], ) _add_zug_series( "zug_rwno_to_sp", "Zugdurchlass KLP: RW+NO -> KW Schwarze Pumpe", _zug_cap("Verladung Boxberg (KLP)", "KW Schwarze Pumpe"), [("Reichwalde", "SP"), ("Nochten", "SP")], ) _add_zug_series( "zug_rwno_to_v", "Zugdurchlass KLP: RW+NO -> Veredlung", _zug_cap("Verladung Boxberg (KLP)", "Veredlung ISP"), [("Reichwalde", "V"), ("Nochten", "V")], ) _add_zug_series( "zug_rwno_to_b3", "Zugdurchlass KLP: RW+NO -> KW Boxberg Werk 3", _zug_cap("Verladung Boxberg (KLP)", "KW Boxberg Werk 3"), [("Reichwalde", "B3"), ("Nochten", "B3")], ) _add_zug_series( "zug_w_to_j", "Zugdurchlass KUP: Welzow -> KW Jänschwalde", _zug_cap("Verladung Welzow-Süd (KUP)", "KW Jänschwalde"), [("Welzow", "J")], ) _add_zug_series( "zug_w_to_sp", "Zugdurchlass KUP: Welzow -> KW Schwarze Pumpe", _zug_cap("Verladung Welzow-Süd (KUP)", "KW Schwarze Pumpe"), [("Welzow", "SP")], ) _add_zug_series( "zug_w_to_v", "Zugdurchlass KUP: Welzow -> Veredlung", _zug_cap("Verladung Welzow-Süd (KUP)", "Veredlung ISP"), [("Welzow", "V")], ) _add_zug_series( "zug_w_to_b3", "Zugdurchlass KUP: Welzow -> KW Boxberg Werk 3", _zug_cap("Verladung Welzow-Süd (KUP)", "KW Boxberg Werk 3"), [("Welzow", "B3")], ) _add_zug_series( "zug_rwno_to_sp_v", "Zugdurchlass KLP kombiniert: RW+NO -> SP + Veredlung", _zug_cap("Verladung Boxberg (KLP)", "KW SP + V ISP"), [("Reichwalde", "SP"), ("Nochten", "SP"), ("Reichwalde", "V"), ("Nochten", "V")], ) _add_zug_series( "zug_rwno_to_all", "Zugdurchlass KLP kombiniert: RW+NO -> JW + SP + V + B3", _zug_cap("Verladung Boxberg (KLP)", "KW JW + KW SP + V ISP + KW B3"), [ ("Reichwalde", "J"), ("Reichwalde", "SP"), ("Reichwalde", "V"), ("Reichwalde", "B3"), ("Nochten", "J"), ("Nochten", "SP"), ("Nochten", "V"), ("Nochten", "B3"), ], ) _add_zug_series( "zug_w_to_sp_v", "Zugdurchlass KUP kombiniert: Welzow -> SP + Veredlung", _zug_cap("Verladung Welzow-Süd (KUP)", "KW SP + V ISP"), [("Welzow", "SP"), ("Welzow", "V")], ) _add_zug_series( "zug_w_to_sp_v_b3", "Zugdurchlass KUP kombiniert: Welzow -> SP + V + B3", _zug_cap("Verladung Welzow-Süd (KUP)", "KW SP + V ISP + KW B3"), [("Welzow", "SP"), ("Welzow", "V"), ("Welzow", "B3")], ) _add_zug_series( "zug_all_to_j", "Zugdurchlass KUP+KLP: gesamt -> KW Jänschwalde", _zug_cap("KUP + KLP", "KW Jänschwalde"), [("Reichwalde", "J"), ("Nochten", "J"), ("Welzow", "J")], ) _add_zug_series( "zug_all_to_b3", "Zugdurchlass KUP+KLP: gesamt -> KW Boxberg Werk 3", _zug_cap("KUP + KLP", "KW Boxberg Werk 3"), [("Reichwalde", "B3"), ("Nochten", "B3"), ("Welzow", "B3")], ) _add_zug_series( "zug_rwno_j_and_w_b3", "Zugdurchlass kombiniert: KLP->JW + KUP->B3", _zug_cap("KLP zum KW JW", "KUP zum KW B3"), [("Reichwalde", "J"), ("Nochten", "J"), ("Welzow", "B3")], ) _add_zug_series( "zug_rwno_jspv_and_w_b3", "Zugdurchlass kombiniert: KLP->JW+SP+V + KUP->B3", _zug_cap("KLP zum KW JW + KW SP + V ISP", "KUP zum KW B3"), [ ("Reichwalde", "J"), ("Nochten", "J"), ("Reichwalde", "SP"), ("Nochten", "SP"), ("Reichwalde", "V"), ("Nochten", "V"), ("Welzow", "B3"), ], ) kvb_file = processed_dir / "zugdurchlass_kvb_nord.parquet" if kvb_file.exists(): kvb = pd.read_parquet(kvb_file) limit_map: dict[tuple, float] = {} shift_cols = [("KVB_Nord_S1_t", "F"), ("KVB_Nord_S2_t", "S"), ("KVB_Nord_S3_t", "N")] for _, row in kvb.iterrows(): date = pd.to_datetime(row["datum"], errors="coerce") if pd.isna(date): continue date = date.normalize() for col, shift in shift_cols: val = row.get(col) if val is None or pd.isna(val): continue limit_map[(date, shift)] = float(val) static_cap_j = _zug_cap("KUP + KLP", "KW Jänschwalde") marker_points: list[dict[str, object]] = [] if limit_map and static_cap_j is not None: for (date, shift), dynamic_cap in sorted(limit_map.items()): if dynamic_cap >= static_cap_j: continue marker_points.append( { "x": _point_x(date, shift=shift, granularity="shift"), "label": _point_label(date, shift=shift, granularity="shift"), "limit_tonnes": float(dynamic_cap), "delta_tonnes": float(static_cap_j - dynamic_cap), } ) if marker_points: for series in zug_series: if series.get("id") == "zug_all_to_j": existing = list(series.get("marker_points", [])) series["marker_points"] = existing + marker_points break if zug_series: groups.append({"key": "zugdurchlass", "label": "Zugdurchlasskapazitäten", "series": zug_series}) return {"groups": groups} @app.get("/api/health") def health() -> dict[str, object]: return {"status": "ok", "solvers": _get_solver_availability()} @app.post("/api/run") async def run( file: UploadFile = File(...), warmstart_file: UploadFile | None = File(None), solver: str = Form("highs"), step_size_tonnes: int = Form(1000), mip_gap_pct: float = Form(30.0), max_runtime_minutes: float = Form(10.0), ) -> dict[str, str]: solver = solver.lower().strip() if solver != "highs": raise HTTPException(status_code=400, detail="Only HiGHS is enabled at the moment") availability = _get_solver_availability() if not availability.get(solver, False): raise HTTPException(status_code=400, detail=f"Solver not available: {solver}") if step_size_tonnes not in {960, 1000}: raise HTTPException(status_code=400, detail="Unsupported step size") if not (0 <= float(mip_gap_pct) <= 100): raise HTTPException(status_code=400, detail="mip_gap_pct must be between 0 and 100") if not (0 < float(max_runtime_minutes) <= 24 * 60): raise HTTPException(status_code=400, detail="max_runtime_minutes must be between 0 and 1440") mip_gap = float(mip_gap_pct) / 100.0 time_limit_seconds = max(1, int(round(float(max_runtime_minutes) * 60))) job_id = uuid.uuid4().hex job_dir = JOBS_DIR / job_id input_dir = job_dir / "input" processed_dir = job_dir / "processed" output_dir = job_dir / "output" warmstart_dir = job_dir / "warmstart" input_dir.mkdir(parents=True, exist_ok=True) processed_dir.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True) warmstart_dir.mkdir(parents=True, exist_ok=True) input_path = input_dir / "PoC1_Rohkohleverteilung_Input_Parameter.xlsx" with input_path.open("wb") as buffer: shutil.copyfileobj(file.file, buffer) warmstart_input_path = None if warmstart_file is not None and warmstart_file.filename: warmstart_input_path = warmstart_dir / "warmstart.json" with warmstart_input_path.open("wb") as buffer: shutil.copyfileobj(warmstart_file.file, buffer) cancel_event = threading.Event() with ACTIVE_JOBS_LOCK: ACTIVE_JOBS[job_id] = {"cancel_event": cancel_event, "process": None} _write_status( job_dir, "queued", { "solver": solver, "step_size_tonnes": str(step_size_tonnes), "mip_gap_pct": str(float(mip_gap_pct)), "max_runtime_minutes": str(float(max_runtime_minutes)), "warmstart_used": "true" if warmstart_input_path is not None else "false", }, ) thread = threading.Thread( target=_run_job, args=( job_dir, input_path, warmstart_input_path, processed_dir, output_dir, solver, step_size_tonnes, mip_gap, time_limit_seconds, cancel_event, ), daemon=True, ) thread.start() return { "job_id": job_id, "download_url": f"/api/jobs/{job_id}/output", "warmstart_download_url": f"/api/jobs/{job_id}/warmstart", "solver": solver, "step_size_tonnes": str(step_size_tonnes), "mip_gap_pct": str(float(mip_gap_pct)), "max_runtime_minutes": str(float(max_runtime_minutes)), "warmstart_used": "true" if warmstart_input_path is not None else "false", } @app.get("/api/jobs/{job_id}") def job_status(job_id: str) -> dict[str, str]: job_dir = JOBS_DIR / job_id status_path = job_dir / "job.json" if not status_path.exists(): raise HTTPException(status_code=404, detail="Job not found") return json.loads(status_path.read_text(encoding="utf-8")) @app.post("/api/jobs/{job_id}/cancel") def cancel_job(job_id: str) -> dict[str, str]: job_dir = JOBS_DIR / job_id status_path = job_dir / "job.json" if not status_path.exists(): raise HTTPException(status_code=404, detail="Job not found") payload = json.loads(status_path.read_text(encoding="utf-8")) if payload.get("status") in {"completed", "failed", "cancelled"}: return {"status": str(payload.get("status"))} with ACTIVE_JOBS_LOCK: runtime = ACTIVE_JOBS.get(job_id) if runtime is None: _write_status(job_dir, "cancelled") return {"status": "cancelled"} cancel_event = runtime.get("cancel_event") process = runtime.get("process") if isinstance(cancel_event, threading.Event): cancel_event.set() if process is not None and getattr(process, "poll", lambda: None)() is None: try: process.terminate() except Exception: pass _write_status(job_dir, "cancelling") return {"status": "cancelling"} @app.get("/api/jobs/{job_id}/output") def job_output(job_id: str) -> FileResponse: output_path = JOBS_DIR / job_id / "output" / "output.xlsx" if not output_path.exists(): raise HTTPException(status_code=404, detail="Output not found") return FileResponse(output_path, filename="output.xlsx") @app.get("/api/jobs/{job_id}/warmstart") def job_warmstart(job_id: str) -> FileResponse: warmstart_path = JOBS_DIR / job_id / "output" / "warmstart.json" if not warmstart_path.exists(): raise HTTPException(status_code=404, detail="Warmstart output not found") return FileResponse(warmstart_path, filename="warmstart.json") @app.get("/api/jobs/{job_id}/monthly-flows") def job_monthly_flows(job_id: str) -> dict[str, object]: output_path = JOBS_DIR / job_id / "output" / "output.xlsx" if not output_path.exists(): raise HTTPException(status_code=404, detail="Output not found") return _read_monthly_flow_rows(output_path) @app.get("/api/jobs/{job_id}/capacity-timeseries") def job_capacity_timeseries(job_id: str) -> dict[str, object]: job_dir = JOBS_DIR / job_id if not job_dir.exists(): raise HTTPException(status_code=404, detail="Job not found") return _read_capacity_timeseries(job_dir) @app.get("/api/jobs/{job_id}/logs/{log_name}") def job_log(job_id: str, log_name: str) -> PlainTextResponse: if log_name not in {"preprocess", "optimization"}: raise HTTPException(status_code=400, detail="Unknown log name") log_path = JOBS_DIR / job_id / "logs" / f"{log_name}.log" if not log_path.exists(): raise HTTPException(status_code=404, detail="Log not found") return PlainTextResponse(log_path.read_text(encoding="utf-8")) @app.get("/api/jobs/{job_id}/logs/{log_name}/stream") def job_log_stream(job_id: str, log_name: str) -> StreamingResponse: if log_name not in {"preprocess", "optimization"}: raise HTTPException(status_code=400, detail="Unknown log name") log_path = JOBS_DIR / job_id / "logs" / f"{log_name}.log" if not log_path.exists(): raise HTTPException(status_code=404, detail="Log not found") def event_stream(): with log_path.open("r", encoding="utf-8") as log_file: for line in log_file: yield f"data: {line.rstrip()}\n\n" while True: line = log_file.readline() if line: yield f"data: {line.rstrip()}\n\n" else: time.sleep(0.5) return StreamingResponse(event_stream(), media_type="text/event-stream") if FRONTEND_DIST.exists(): assets_dir = FRONTEND_DIST / "assets" if assets_dir.exists(): app.mount("/assets", StaticFiles(directory=assets_dir), name="frontend-assets") @app.get("/", include_in_schema=False) def frontend_index() -> FileResponse: return FileResponse(FRONTEND_DIST / "index.html") @app.get("/{full_path:path}", include_in_schema=False) def frontend_spa(full_path: str) -> FileResponse: if full_path.startswith("api/"): raise HTTPException(status_code=404, detail="Not found") target = FRONTEND_DIST / full_path if target.exists() and target.is_file(): return FileResponse(target) return FileResponse(FRONTEND_DIST / "index.html")