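"""FastAPI backend for the PoC1 raw-coal distribution planner.

Each uploaded workbook becomes a job under var/jobs/<job_id>/; preprocessing and
optimization run as subprocesses per job, and status, logs, results, and capacity
time series are exposed under /api/*.  A built frontend in webapp/frontend/dist
is served from the same app when present.
"""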
from __future__ import annotations
import json
import os
import shutil
import subprocess
import sys
import threading
import time
import uuid
from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, PlainTextResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
PROJECT_ROOT = Path(__file__).resolve().parents[2]
FRONTEND_DIST = PROJECT_ROOT / "webapp" / "frontend" / "dist"
VAR_DIR = PROJECT_ROOT / "var"
JOBS_DIR = VAR_DIR / "jobs"
JOBS_DIR.mkdir(parents=True, exist_ok=True)
ACTIVE_JOBS: dict[str, dict[str, object]] = {}
ACTIVE_JOBS_LOCK = threading.Lock()
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def _get_solver_availability() -> dict[str, bool]:
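    """Probe which solvers (HiGHS, Gurobi, SCIP) Pyomo can actually reach on this host."""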
try:
import pyomo.environ as pyo
except Exception:
return {"highs": False, "gurobi": False}
availability: dict[str, bool] = {}
for solver_name in ("highs", "gurobi", "scip"):
try:
solver = pyo.SolverFactory(solver_name)
availability[solver_name] = bool(solver) and bool(
solver.available(exception_flag=False)
)
except Exception:
availability[solver_name] = False
return availability
class JobCancelledError(RuntimeError):
    """Raised inside a job thread when the job is cancelled via the API."""
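# Track the subprocess currently owned by each job so cancel requests can reach it.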
def _set_active_process(job_id: str, process) -> None:
with ACTIVE_JOBS_LOCK:
runtime = ACTIVE_JOBS.get(job_id)
if runtime is not None:
runtime["process"] = process
def _clear_active_process(job_id: str) -> None:
with ACTIVE_JOBS_LOCK:
runtime = ACTIVE_JOBS.get(job_id)
if runtime is not None:
runtime["process"] = None
def _run_step(
cmd: list[str],
log_path: Path,
env: dict[str, str] | None = None,
*,
job_id: str,
cancel_event: threading.Event,
) -> None:
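    """Run one pipeline step as a subprocess, writing its output to log_path and polling for cancellation.

    Raises JobCancelledError when cancel_event is set and RuntimeError on a non-zero exit code.
    """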
log_path.parent.mkdir(parents=True, exist_ok=True)
with log_path.open("w", encoding="utf-8") as log_file:
process = subprocess.Popen(
cmd,
cwd=PROJECT_ROOT,
env=env,
stdout=log_file,
stderr=subprocess.STDOUT,
text=True,
)
_set_active_process(job_id, process)
try:
while True:
if cancel_event.is_set():
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait(timeout=5)
raise JobCancelledError("Job cancelled by user")
result_code = process.poll()
if result_code is not None:
break
time.sleep(0.2)
finally:
_clear_active_process(job_id)
        if result_code != 0:
            raise RuntimeError(f"Command failed with exit code {result_code}: {' '.join(cmd)}")
def _run_job(
job_dir: Path,
input_path: Path,
processed_dir: Path,
output_dir: Path,
solver_name: str,
step_size_tonnes: int,
mip_gap: float,
time_limit_seconds: int,
cancel_event: threading.Event,
) -> None:
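    """Execute the preprocessing and optimization steps for one job and record the final status."""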
logs_dir = job_dir / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
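    # Build the subprocess environment: prepend common solver install locations
    # (e.g. Homebrew on macOS) to PATH and pass the job's input workbook and
    # processed-data directory to the pipeline scripts via POC1_* variables.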
env = os.environ.copy()
path_entries = env.get("PATH", "").split(os.pathsep)
for extra_path in ["/usr/local/bin", "/opt/homebrew/bin"]:
if extra_path not in path_entries:
path_entries.insert(0, extra_path)
env["PATH"] = os.pathsep.join(path_entries)
env["POC1_INPUT_XLSX"] = str(input_path)
env["POC1_OUTPUT_DIR"] = str(processed_dir)
try:
_write_status(job_dir, "processing")
_run_step(
[sys.executable, "src/preprocessing/exploration_preprocess.py"],
logs_dir / "preprocess.log",
env=env,
job_id=job_dir.name,
cancel_event=cancel_event,
)
output_path = output_dir / "output.xlsx"
_run_step(
[
sys.executable,
"src/optimization/run_optimization.py",
"--data-dir",
str(processed_dir),
"--solver",
solver_name,
"--step-size-tonnes",
str(step_size_tonnes),
"--mip-gap",
str(mip_gap),
"--time-limit",
str(time_limit_seconds),
"--output-xlsx",
str(output_path),
],
logs_dir / "optimization.log",
env=env,
job_id=job_dir.name,
cancel_event=cancel_event,
)
_write_status(job_dir, "completed", {"output": str(output_path)})
except JobCancelledError:
_write_status(job_dir, "cancelled")
except Exception as exc:
_write_status(job_dir, "failed", {"error": str(exc)})
finally:
with ACTIVE_JOBS_LOCK:
ACTIVE_JOBS.pop(job_dir.name, None)
def _write_status(job_dir: Path, status: str, extra: dict[str, str] | None = None) -> None:
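    """Persist the job status (plus optional extra fields) to job.json in the job directory."""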
payload = {"status": status}
if extra:
payload.update(extra)
(job_dir / "job.json").write_text(json.dumps(payload, indent=2), encoding="utf-8")
def _read_monthly_flow_rows(output_path: Path) -> dict[str, object]:
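    """Aggregate the optimizer's output workbook into monthly tonnage per (target, source) pair."""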
try:
import pandas as pd
except Exception as exc: # pragma: no cover
raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc
try:
df = pd.read_excel(
output_path,
sheet_name="Sheet1",
header=[0, 1, 2],
index_col=[0, 1, 2],
)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Could not read output workbook: {exc}") from exc
source_names = {"Reichwalde", "Nochten", "Welzow"}
shift_names = {"F", "S", "N"}
flow_cols = []
for col in df.columns:
if not isinstance(col, tuple) or len(col) != 3:
continue
target, source, shift = col
if str(source) in source_names and str(shift) in shift_names:
flow_cols.append(col)
if not flow_cols:
return {"rows": [], "targets": [], "sources": sorted(source_names)}
flows = df[flow_cols].copy()
raw_dates = pd.to_datetime(flows.index.get_level_values(0), errors="coerce")
flows = flows.loc[~raw_dates.isna()].copy()
raw_dates = raw_dates[~raw_dates.isna()]
flows.index = pd.MultiIndex.from_arrays(
[
raw_dates,
flows.index.get_level_values(1),
flows.index.get_level_values(2),
],
names=flows.index.names,
)
monthly_by_shift = flows.groupby(pd.Grouper(level=0, freq="MS")).sum()
monthly = monthly_by_shift.T.groupby(level=[0, 1]).sum().T
target_labels = {
"J": "Jänschwalde",
"SP": "Schwarze Pumpe",
"B3": "Boxberg Werk 3",
"B4": "Boxberg Werk 4",
"V": "Veredlung",
}
rows: list[dict[str, object]] = []
for month, values in monthly.iterrows():
for (target, source), amount in values.items():
amount_value = float(amount) if amount is not None else 0.0
rows.append(
{
"month": month.strftime("%Y-%m"),
"target": str(target),
"target_label": target_labels.get(str(target), str(target)),
"source": str(source),
"amount_tonnes": amount_value,
}
)
targets = sorted({str(col[0]) for col in monthly.columns})
sources = sorted({str(col[1]) for col in monthly.columns})
return {"rows": rows, "targets": targets, "sources": sources}
def _read_flow_long_rows(output_path: Path):
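    """Flatten the output workbook into a long DataFrame with one row per date, target, source, and shift."""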
try:
import pandas as pd
except Exception as exc: # pragma: no cover
raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc
try:
df = pd.read_excel(
output_path,
sheet_name="Sheet1",
header=[0, 1, 2],
index_col=[0, 1, 2],
)
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Could not read output workbook: {exc}") from exc
source_names = {"Reichwalde", "Nochten", "Welzow"}
shift_names = {"F", "S", "N"}
flow_cols = [
col
for col in df.columns
if isinstance(col, tuple)
and len(col) == 3
and str(col[1]) in source_names
and str(col[2]) in shift_names
]
if not flow_cols:
return pd.DataFrame(columns=["date", "month", "target", "source", "shift", "amount_tonnes"])
rows = []
for (date_raw, _week, _day), values in df[flow_cols].iterrows():
date = pd.to_datetime(date_raw, errors="coerce")
if pd.isna(date):
continue
date = date.normalize()
for (target, source, shift), amount in values.items():
rows.append(
{
"date": date,
"month": date.strftime("%Y-%m"),
"target": str(target),
"source": str(source),
"shift": str(shift),
"amount_tonnes": float(amount) if amount is not None else 0.0,
}
)
return pd.DataFrame(rows)
def _read_capacity_timeseries(job_dir: Path) -> dict[str, object]:
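    """Build capacity-utilisation time series (extraction, loading, train throughput) for one job."""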
try:
import math
import pandas as pd
except Exception as exc: # pragma: no cover
raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc
output_path = job_dir / "output" / "output.xlsx"
processed_dir = job_dir / "processed"
if not output_path.exists():
raise HTTPException(status_code=404, detail="Output not found")
flows = _read_flow_long_rows(output_path)
if flows.empty:
return {"groups": []}
def _point_x(date_val, shift=None, granularity="day"):
if granularity == "month":
return f"{date_val}-01"
if granularity == "shift":
hour_map = {"F": 6, "S": 14, "N": 22}
hour = hour_map.get(shift, 0)
return f"{date_val.strftime('%Y-%m-%d')}T{hour:02d}:00:00"
return date_val.strftime("%Y-%m-%d")
def _point_label(date_val, shift=None, granularity="day"):
if granularity == "month":
return str(date_val)
if granularity == "shift":
return f"{date_val.strftime('%Y-%m-%d')} {shift}"
return date_val.strftime("%Y-%m-%d")
def _series_from_dataframe(
*,
series_id: str,
label: str,
df_usage,
granularity: str,
limit_value: float | None = None,
limit_map: dict[tuple, float] | None = None,
) -> dict[str, object]:
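        """Turn an aggregated usage frame into chart points with limits and utilisation percentages."""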
points: list[dict[str, object]] = []
for row in df_usage.itertuples(index=False):
key = row.key
usage = float(row.usage_tonnes)
if granularity == "month":
x = _point_x(key, granularity="month")
point_label = _point_label(key, granularity="month")
limit = limit_value
util = (100 * usage / limit) if limit and limit > 0 else None
elif granularity == "day":
x = _point_x(key, granularity="day")
point_label = _point_label(key, granularity="day")
if limit_map is not None:
limit = limit_map.get(key)
else:
limit = limit_value
util = (100 * usage / limit) if limit and limit > 0 else None
else:
date_val, shift = key
x = _point_x(date_val, shift=shift, granularity="shift")
point_label = _point_label(date_val, shift=shift, granularity="shift")
if limit_map is not None:
limit = limit_map.get((date_val, shift))
else:
limit = limit_value
util = (100 * usage / limit) if limit and limit > 0 else None
points.append(
{
"x": x,
"label": point_label,
"usage_tonnes": usage,
"limit_tonnes": float(limit) if limit is not None else None,
"utilization_pct": float(util) if util is not None else None,
}
)
return {"id": series_id, "label": label, "granularity": granularity, "points": points}
def _aggregate_usage(
*,
sources: set[str] | None = None,
targets: set[str] | None = None,
granularity: str = "day",
):
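        """Sum flow tonnage per month, day, or (day, shift), optionally filtered by sources and targets."""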
df = flows
if sources is not None:
df = df[df["source"].isin(sources)]
if targets is not None:
df = df[df["target"].isin(targets)]
if granularity == "month":
agg = df.groupby("month", as_index=False)["amount_tonnes"].sum()
agg = agg.rename(columns={"month": "key", "amount_tonnes": "usage_tonnes"})
agg["key"] = agg["key"].astype(str)
return agg.sort_values("key")
if granularity == "day":
agg = df.groupby("date", as_index=False)["amount_tonnes"].sum()
agg = agg.rename(columns={"date": "key", "amount_tonnes": "usage_tonnes"})
return agg.sort_values("key")
agg = df.groupby(["date", "shift"], as_index=False)["amount_tonnes"].sum()
agg["_shift_sort"] = pd.Categorical(agg["shift"], categories=["F", "S", "N"], ordered=True)
agg = agg.sort_values(["date", "_shift_sort"])
agg["key"] = list(zip(agg["date"], agg["shift"]))
agg = agg.rename(columns={"amount_tonnes": "usage_tonnes"})
return agg[["key", "usage_tonnes"]]
groups: list[dict[str, object]] = []
    # Extraction capacities (Förderkapazitäten), accumulated per month on a daily basis
foerder_file = processed_dir / "foerderkapaz.parquet"
if foerder_file.exists():
cap_df = pd.read_parquet(foerder_file)
source_map = {
"Reichwalde (RW)": "Reichwalde",
"Nochten (NO)": "Nochten",
"Welzow-Süd": "Welzow",
}
label_map = {
"Reichwalde": "Förderung TB Reichwalde (kumuliert im Monat)",
"Nochten": "Förderung TB Nochten (kumuliert im Monat)",
"Welzow": "Förderung TB Welzow-Süd (kumuliert im Monat)",
}
limit_by_source: dict[str, float] = {}
for _, row in cap_df[cap_df["zeitraum"] == "pro Monat"].iterrows():
source = source_map.get(str(row["tagebau"]))
if source:
limit_by_source[source] = float(row["maximal"])
foerder_series: list[dict[str, object]] = []
def _aggregate_cumulative_daily_in_month(*, sources: set[str]):
usage_df = _aggregate_usage(sources=sources, granularity="day")
if usage_df.empty:
return usage_df, {}
usage_df = usage_df.copy()
usage_df["month"] = usage_df["key"].dt.strftime("%Y-%m")
usage_df["usage_tonnes"] = usage_df.groupby("month")["usage_tonnes"].cumsum()
limit_map: dict[object, float] = {}
for day in usage_df["key"]:
month_start = pd.Timestamp(day).replace(day=1)
month_days = int(month_start.days_in_month)
month_key = month_start.strftime("%Y-%m")
if sources == {"Reichwalde", "Nochten"}:
monthly_limit = 3_000_000.0
else:
source = next(iter(sources))
monthly_limit = limit_by_source.get(source)
if monthly_limit is None:
continue
# monthly cap line (constant during month) for cumulative comparison
limit_map[day] = float(monthly_limit)
return usage_df[["key", "usage_tonnes"]], limit_map
for source in ["Reichwalde", "Nochten", "Welzow"]:
usage_df, limit_map = _aggregate_cumulative_daily_in_month(sources={source})
if usage_df.empty:
continue
foerder_series.append(
_series_from_dataframe(
series_id=f"foerder_{source.lower()}_cum_month",
label=label_map[source],
df_usage=usage_df,
granularity="day",
limit_map=limit_map,
)
)
rwno_usage_df, rwno_limit_map = _aggregate_cumulative_daily_in_month(
sources={"Reichwalde", "Nochten"}
)
if not rwno_usage_df.empty:
foerder_series.append(
_series_from_dataframe(
series_id="foerder_rw_nochten_cum_month",
label="Förderung TB Reichwalde + Nochten (kumuliert im Monat, gemeinsame Grenze)",
df_usage=rwno_usage_df,
granularity="day",
limit_map=rwno_limit_map,
)
)
if foerder_series:
groups.append({"key": "foerder", "label": "Förderkapazitäten", "series": foerder_series})
    # Loading capacities (Verladungskapazitäten): static caps plus per-shift availabilities
verladung_series: list[dict[str, object]] = []
verladung_file = processed_dir / "verladungskap.parquet"
if verladung_file.exists():
verladung_cap = pd.read_parquet(verladung_file)
def _cap_lookup(verladung_label: str, zeitraum_label: str):
row = verladung_cap[
(verladung_cap["verladung"] == verladung_label) & (verladung_cap["zeitraum"] == zeitraum_label)
]
if row.empty:
return None
return float(row.iloc[0]["maximal"])
boxberg_targets = {"J", "SP", "B3", "V"}
welzow_targets = {"J", "SP", "B3", "V"}
boxberg_sources = {"Reichwalde", "Nochten"}
welzow_sources = {"Welzow"}
for series_id, label, sources, targets, granularity, cap_label, cap_period in [
(
"verladung_boxberg_shift",
"Verladung Boxberg (RW+NO) pro Schicht",
boxberg_sources,
boxberg_targets,
"shift",
"Boxberg (RW+NO)",
"pro Schicht",
),
(
"verladung_boxberg_day",
"Verladung Boxberg (RW+NO) pro Tag",
boxberg_sources,
boxberg_targets,
"day",
"Boxberg (RW+NO)",
"pro Tag",
),
(
"verladung_welzow_shift",
"Verladung Welzow-Süd pro Schicht",
welzow_sources,
welzow_targets,
"shift",
"Welzow-Süd",
"pro Schicht",
),
(
"verladung_welzow_day",
"Verladung Welzow-Süd pro Tag",
welzow_sources,
welzow_targets,
"day",
"Welzow-Süd",
"pro Tag",
),
]:
usage_df = _aggregate_usage(sources=sources, targets=targets, granularity=granularity)
if usage_df.empty:
continue
cap = _cap_lookup(cap_label, cap_period)
verladung_series.append(
_series_from_dataframe(
series_id=series_id,
label=label,
df_usage=usage_df,
granularity=granularity,
limit_value=cap,
)
)
avail_file = processed_dir / "Verfuegbarkeiten.parquet"
if avail_file.exists():
avail = pd.read_parquet(avail_file)
shift_map = {"S1": "F", "S2": "S", "S3": "N"}
for scope, label, cols, sources in [
(
"welzow",
"Verfügbarkeit Welzow-Süd (pro Schicht)",
["Welzow_Sued_S1_t", "Welzow_Sued_S2_t", "Welzow_Sued_S3_t"],
{"Welzow"},
),
(
"rw_no",
"Verfügbarkeit Boxberg RW+NO (pro Schicht)",
["Boxberg_NO_RW_S1_t", "Boxberg_NO_RW_S2_t", "Boxberg_NO_RW_S3_t"],
{"Reichwalde", "Nochten"},
),
]:
limit_map: dict[tuple, float] = {}
for _, row in avail.iterrows():
date = pd.to_datetime(row["datum"], errors="coerce")
if pd.isna(date):
continue
date = date.normalize()
for col in cols:
val = row.get(col)
if val is None or (isinstance(val, float) and math.isnan(val)):
continue
shift = shift_map[col.split("_")[-2]]
limit_map[(date, shift)] = float(val)
if not limit_map:
continue
usage_df = _aggregate_usage(sources=sources, granularity="shift")
if usage_df.empty:
continue
verladung_series.append(
_series_from_dataframe(
series_id=f"availability_{scope}_shift",
label=label,
df_usage=usage_df,
granularity="shift",
limit_map=limit_map,
)
)
if verladung_series:
groups.append({"key": "verladung", "label": "Verladungskapazitäten", "series": verladung_series})
    # Train throughput capacities (Zugdurchlasskapazitäten), per shift
zug_file = processed_dir / "zugdurchlass.parquet"
zug_series: list[dict[str, object]] = []
if zug_file.exists():
zug = pd.read_parquet(zug_file).copy()
def _zug_cap(start_exact: str, ziel_exact: str):
s = zug["start"].astype(str).str.strip()
z = zug["ziel"].astype(str).str.strip()
row = zug[(s == start_exact) & (z == ziel_exact)]
if row.empty:
return None
return float(row.iloc[0]["maximal"])
def _add_zug_series(series_id: str, label: str, limit: float | None, terms: list[tuple[str, str]]):
if limit is None:
return
mask = None
for source, target in terms:
part = (flows["source"] == source) & (flows["target"] == target)
mask = part if mask is None else (mask | part)
if mask is None:
return
df = flows[mask].copy()
if df.empty:
return
usage_df = df.groupby(["date", "shift"], as_index=False)["amount_tonnes"].sum()
usage_df["_shift_sort"] = pd.Categorical(
usage_df["shift"], categories=["F", "S", "N"], ordered=True
)
usage_df = usage_df.sort_values(["date", "_shift_sort"])
usage_df["key"] = list(zip(usage_df["date"], usage_df["shift"]))
usage_df = usage_df.rename(columns={"amount_tonnes": "usage_tonnes"})[["key", "usage_tonnes"]]
zug_series.append(
_series_from_dataframe(
series_id=series_id,
label=label,
df_usage=usage_df,
granularity="shift",
limit_value=limit,
)
)
_add_zug_series(
"zug_rwno_to_j",
"Zugdurchlass KLP: RW+NO -> KW Jänschwalde",
_zug_cap("Verladung Boxberg (KLP)", "KW Jänschwalde"),
[("Reichwalde", "J"), ("Nochten", "J")],
)
_add_zug_series(
"zug_rwno_to_sp",
"Zugdurchlass KLP: RW+NO -> KW Schwarze Pumpe",
_zug_cap("Verladung Boxberg (KLP)", "KW Schwarze Pumpe"),
[("Reichwalde", "SP"), ("Nochten", "SP")],
)
_add_zug_series(
"zug_rwno_to_v",
"Zugdurchlass KLP: RW+NO -> Veredlung",
_zug_cap("Verladung Boxberg (KLP)", "Veredlung ISP"),
[("Reichwalde", "V"), ("Nochten", "V")],
)
_add_zug_series(
"zug_rwno_to_b3",
"Zugdurchlass KLP: RW+NO -> KW Boxberg Werk 3",
_zug_cap("Verladung Boxberg (KLP)", "KW Boxberg Werk 3"),
[("Reichwalde", "B3"), ("Nochten", "B3")],
)
_add_zug_series(
"zug_w_to_j",
"Zugdurchlass KUP: Welzow -> KW Jänschwalde",
_zug_cap("Verladung Welzow-Süd (KUP)", "KW Jänschwalde"),
[("Welzow", "J")],
)
_add_zug_series(
"zug_w_to_sp",
"Zugdurchlass KUP: Welzow -> KW Schwarze Pumpe",
_zug_cap("Verladung Welzow-Süd (KUP)", "KW Schwarze Pumpe"),
[("Welzow", "SP")],
)
_add_zug_series(
"zug_w_to_v",
"Zugdurchlass KUP: Welzow -> Veredlung",
_zug_cap("Verladung Welzow-Süd (KUP)", "Veredlung ISP"),
[("Welzow", "V")],
)
_add_zug_series(
"zug_w_to_b3",
"Zugdurchlass KUP: Welzow -> KW Boxberg Werk 3",
_zug_cap("Verladung Welzow-Süd (KUP)", "KW Boxberg Werk 3"),
[("Welzow", "B3")],
)
_add_zug_series(
"zug_rwno_to_sp_v",
"Zugdurchlass KLP kombiniert: RW+NO -> SP + Veredlung",
_zug_cap("Verladung Boxberg (KLP)", "KW SP + V ISP"),
[("Reichwalde", "SP"), ("Nochten", "SP"), ("Reichwalde", "V"), ("Nochten", "V")],
)
_add_zug_series(
"zug_rwno_to_all",
"Zugdurchlass KLP kombiniert: RW+NO -> JW + SP + V + B3",
_zug_cap("Verladung Boxberg (KLP)", "KW JW + KW SP + V ISP + KW B3"),
[
("Reichwalde", "J"),
("Reichwalde", "SP"),
("Reichwalde", "V"),
("Reichwalde", "B3"),
("Nochten", "J"),
("Nochten", "SP"),
("Nochten", "V"),
("Nochten", "B3"),
],
)
_add_zug_series(
"zug_w_to_sp_v",
"Zugdurchlass KUP kombiniert: Welzow -> SP + Veredlung",
_zug_cap("Verladung Welzow-Süd (KUP)", "KW SP + V ISP"),
[("Welzow", "SP"), ("Welzow", "V")],
)
_add_zug_series(
"zug_w_to_sp_v_b3",
"Zugdurchlass KUP kombiniert: Welzow -> SP + V + B3",
_zug_cap("Verladung Welzow-Süd (KUP)", "KW SP + V ISP + KW B3"),
[("Welzow", "SP"), ("Welzow", "V"), ("Welzow", "B3")],
)
_add_zug_series(
"zug_all_to_j",
"Zugdurchlass KUP+KLP: gesamt -> KW Jänschwalde",
_zug_cap("KUP + KLP", "KW Jänschwalde"),
[("Reichwalde", "J"), ("Nochten", "J"), ("Welzow", "J")],
)
_add_zug_series(
"zug_all_to_b3",
"Zugdurchlass KUP+KLP: gesamt -> KW Boxberg Werk 3",
_zug_cap("KUP + KLP", "KW Boxberg Werk 3"),
[("Reichwalde", "B3"), ("Nochten", "B3"), ("Welzow", "B3")],
)
_add_zug_series(
"zug_rwno_j_and_w_b3",
"Zugdurchlass kombiniert: KLP->JW + KUP->B3",
_zug_cap("KLP zum KW JW", "KUP zum KW B3"),
[("Reichwalde", "J"), ("Nochten", "J"), ("Welzow", "B3")],
)
_add_zug_series(
"zug_rwno_jspv_and_w_b3",
"Zugdurchlass kombiniert: KLP->JW+SP+V + KUP->B3",
_zug_cap("KLP zum KW JW + KW SP + V ISP", "KUP zum KW B3"),
[
("Reichwalde", "J"),
("Nochten", "J"),
("Reichwalde", "SP"),
("Nochten", "SP"),
("Reichwalde", "V"),
("Nochten", "V"),
("Welzow", "B3"),
],
)
kvb_file = processed_dir / "zugdurchlass_kvb_nord.parquet"
if kvb_file.exists():
kvb = pd.read_parquet(kvb_file)
limit_map: dict[tuple, float] = {}
shift_cols = [("KVB_Nord_S1_t", "F"), ("KVB_Nord_S2_t", "S"), ("KVB_Nord_S3_t", "N")]
for _, row in kvb.iterrows():
date = pd.to_datetime(row["datum"], errors="coerce")
if pd.isna(date):
continue
date = date.normalize()
for col, shift in shift_cols:
val = row.get(col)
if val is None or pd.isna(val):
continue
limit_map[(date, shift)] = float(val)
usage_df = _aggregate_usage(targets={"J"}, granularity="shift")
if limit_map and not usage_df.empty:
zug_series.append(
_series_from_dataframe(
series_id="zug_kvb_nord_dynamic_j",
label="Zugdurchlass KVB Nord (dynamisch) -> KW Jänschwalde",
df_usage=usage_df,
granularity="shift",
limit_map=limit_map,
)
)
if zug_series:
groups.append({"key": "zugdurchlass", "label": "Zugdurchlasskapazitäten", "series": zug_series})
return {"groups": groups}
@app.get("/api/health")
def health() -> dict[str, object]:
return {"status": "ok", "solvers": _get_solver_availability()}
@app.post("/api/run")
async def run(
file: UploadFile = File(...),
solver: str = Form("highs"),
step_size_tonnes: int = Form(1000),
mip_gap_pct: float = Form(5.0),
max_runtime_minutes: float = Form(10.0),
) -> dict[str, str]:
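    """Validate run parameters, store the uploaded workbook, and launch the job in a background thread."""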
solver = solver.lower().strip()
if solver not in {"highs", "gurobi", "scip"}:
raise HTTPException(status_code=400, detail="Unsupported solver")
availability = _get_solver_availability()
if not availability.get(solver, False):
raise HTTPException(status_code=400, detail=f"Solver not available: {solver}")
if step_size_tonnes not in {960, 1000}:
raise HTTPException(status_code=400, detail="Unsupported step size")
if not (0 <= float(mip_gap_pct) <= 100):
raise HTTPException(status_code=400, detail="mip_gap_pct must be between 0 and 100")
if not (0 < float(max_runtime_minutes) <= 24 * 60):
raise HTTPException(status_code=400, detail="max_runtime_minutes must be between 0 and 1440")
mip_gap = float(mip_gap_pct) / 100.0
time_limit_seconds = max(1, int(round(float(max_runtime_minutes) * 60)))
job_id = uuid.uuid4().hex
job_dir = JOBS_DIR / job_id
input_dir = job_dir / "input"
processed_dir = job_dir / "processed"
output_dir = job_dir / "output"
input_dir.mkdir(parents=True, exist_ok=True)
processed_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)
input_path = input_dir / "PoC1_Rohkohleverteilung_Input_Parameter.xlsx"
with input_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
cancel_event = threading.Event()
with ACTIVE_JOBS_LOCK:
ACTIVE_JOBS[job_id] = {"cancel_event": cancel_event, "process": None}
_write_status(
job_dir,
"queued",
{
"solver": solver,
"step_size_tonnes": str(step_size_tonnes),
"mip_gap_pct": str(float(mip_gap_pct)),
"max_runtime_minutes": str(float(max_runtime_minutes)),
},
)
thread = threading.Thread(
target=_run_job,
args=(
job_dir,
input_path,
processed_dir,
output_dir,
solver,
step_size_tonnes,
mip_gap,
time_limit_seconds,
cancel_event,
),
daemon=True,
)
thread.start()
return {
"job_id": job_id,
"download_url": f"/api/jobs/{job_id}/output",
"solver": solver,
"step_size_tonnes": str(step_size_tonnes),
"mip_gap_pct": str(float(mip_gap_pct)),
"max_runtime_minutes": str(float(max_runtime_minutes)),
}
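# Minimal client call (sketch, assuming the server listens on localhost:8000):
#   curl -F "file=@PoC1_Rohkohleverteilung_Input_Parameter.xlsx" -F "solver=highs" \
#        http://localhost:8000/api/run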
@app.get("/api/jobs/{job_id}")
def job_status(job_id: str) -> dict[str, str]:
job_dir = JOBS_DIR / job_id
status_path = job_dir / "job.json"
if not status_path.exists():
raise HTTPException(status_code=404, detail="Job not found")
return json.loads(status_path.read_text(encoding="utf-8"))
@app.post("/api/jobs/{job_id}/cancel")
def cancel_job(job_id: str) -> dict[str, str]:
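    """Request cancellation of a running job; jobs already in a terminal state are returned unchanged."""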
job_dir = JOBS_DIR / job_id
status_path = job_dir / "job.json"
if not status_path.exists():
raise HTTPException(status_code=404, detail="Job not found")
payload = json.loads(status_path.read_text(encoding="utf-8"))
if payload.get("status") in {"completed", "failed", "cancelled"}:
return {"status": str(payload.get("status"))}
with ACTIVE_JOBS_LOCK:
runtime = ACTIVE_JOBS.get(job_id)
if runtime is None:
_write_status(job_dir, "cancelled")
return {"status": "cancelled"}
cancel_event = runtime.get("cancel_event")
process = runtime.get("process")
if isinstance(cancel_event, threading.Event):
cancel_event.set()
if process is not None and getattr(process, "poll", lambda: None)() is None:
try:
process.terminate()
except Exception:
pass
_write_status(job_dir, "cancelling")
return {"status": "cancelling"}
@app.get("/api/jobs/{job_id}/output")
def job_output(job_id: str) -> FileResponse:
output_path = JOBS_DIR / job_id / "output" / "output.xlsx"
if not output_path.exists():
raise HTTPException(status_code=404, detail="Output not found")
return FileResponse(output_path, filename="output.xlsx")
@app.get("/api/jobs/{job_id}/monthly-flows")
def job_monthly_flows(job_id: str) -> dict[str, object]:
output_path = JOBS_DIR / job_id / "output" / "output.xlsx"
if not output_path.exists():
raise HTTPException(status_code=404, detail="Output not found")
return _read_monthly_flow_rows(output_path)
@app.get("/api/jobs/{job_id}/capacity-timeseries")
def job_capacity_timeseries(job_id: str) -> dict[str, object]:
job_dir = JOBS_DIR / job_id
if not job_dir.exists():
raise HTTPException(status_code=404, detail="Job not found")
return _read_capacity_timeseries(job_dir)
@app.get("/api/jobs/{job_id}/logs/{log_name}")
def job_log(job_id: str, log_name: str) -> PlainTextResponse:
if log_name not in {"preprocess", "optimization"}:
raise HTTPException(status_code=400, detail="Unknown log name")
log_path = JOBS_DIR / job_id / "logs" / f"{log_name}.log"
if not log_path.exists():
raise HTTPException(status_code=404, detail="Log not found")
return PlainTextResponse(log_path.read_text(encoding="utf-8"))
@app.get("/api/jobs/{job_id}/logs/{log_name}/stream")
def job_log_stream(job_id: str, log_name: str) -> StreamingResponse:
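    """Stream a job log as server-sent events, replaying existing lines and then tailing the file until the client disconnects."""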
if log_name not in {"preprocess", "optimization"}:
raise HTTPException(status_code=400, detail="Unknown log name")
log_path = JOBS_DIR / job_id / "logs" / f"{log_name}.log"
if not log_path.exists():
raise HTTPException(status_code=404, detail="Log not found")
def event_stream():
with log_path.open("r", encoding="utf-8") as log_file:
for line in log_file:
yield f"data: {line.rstrip()}\n\n"
while True:
line = log_file.readline()
if line:
yield f"data: {line.rstrip()}\n\n"
else:
time.sleep(0.5)
return StreamingResponse(event_stream(), media_type="text/event-stream")
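# Serve the built single-page frontend (and its static assets) from this app when a dist build exists.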
if FRONTEND_DIST.exists():
assets_dir = FRONTEND_DIST / "assets"
if assets_dir.exists():
app.mount("/assets", StaticFiles(directory=assets_dir), name="frontend-assets")
@app.get("/", include_in_schema=False)
def frontend_index() -> FileResponse:
return FileResponse(FRONTEND_DIST / "index.html")
@app.get("/{full_path:path}", include_in_schema=False)
def frontend_spa(full_path: str) -> FileResponse:
if full_path.startswith("api/"):
raise HTTPException(status_code=404, detail="Not found")
target = FRONTEND_DIST / full_path
if target.exists() and target.is_file():
return FileResponse(target)
return FileResponse(FRONTEND_DIST / "index.html")