997 lines
36 KiB
Python
997 lines
36 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, PlainTextResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
|
# Repository root — assumes this file sits two directories below the repo
# root (e.g. webapp/backend/<this file>); TODO confirm against the layout.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Built frontend bundle (served as static files at the bottom of this module).
FRONTEND_DIST = PROJECT_ROOT / "webapp" / "frontend" / "dist"
# Writable runtime state: one subdirectory per job under var/jobs.
VAR_DIR = PROJECT_ROOT / "var"
JOBS_DIR = VAR_DIR / "jobs"
JOBS_DIR.mkdir(parents=True, exist_ok=True)
# In-memory registry of live jobs: job_id -> {"cancel_event": Event,
# "process": Popen | None}. All access must hold ACTIVE_JOBS_LOCK; entries
# are removed by the worker thread when the job finishes.
ACTIVE_JOBS: dict[str, dict[str, object]] = {}
ACTIVE_JOBS_LOCK = threading.Lock()
|
|
|
|
app = FastAPI()
# NOTE(review): CORS is wide open (any origin, method, header). Acceptable
# for local development; should be narrowed before public deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
def _get_solver_availability() -> dict[str, bool]:
|
|
try:
|
|
import pyomo.environ as pyo
|
|
except Exception:
|
|
return {"highs": False, "gurobi": False}
|
|
|
|
availability: dict[str, bool] = {}
|
|
for solver_name in ("highs", "gurobi", "scip"):
|
|
try:
|
|
solver = pyo.SolverFactory(solver_name)
|
|
availability[solver_name] = bool(solver) and bool(
|
|
solver.available(exception_flag=False)
|
|
)
|
|
except Exception:
|
|
availability[solver_name] = False
|
|
return availability
|
|
|
|
|
|
class JobCancelledError(RuntimeError):
    """Raised inside a job worker when the user has requested cancellation."""
|
|
|
|
|
|
def _set_active_process(job_id: str, process) -> None:
    """Record *process* as the currently running subprocess for *job_id*.

    No-op when the job has no registry entry (already finished/removed).
    """
    with ACTIVE_JOBS_LOCK:
        entry = ACTIVE_JOBS.get(job_id)
        if entry is None:
            return
        entry["process"] = process
|
|
|
|
|
def _clear_active_process(job_id: str) -> None:
    """Drop the subprocess handle for *job_id*; the job entry itself stays."""
    with ACTIVE_JOBS_LOCK:
        try:
            ACTIVE_JOBS[job_id]["process"] = None
        except KeyError:
            # Job already removed from the registry — nothing to clear.
            pass
|
|
|
|
|
def _run_step(
    cmd: list[str],
    log_path: Path,
    env: dict[str, str] | None = None,
    *,
    job_id: str,
    cancel_event: threading.Event,
) -> None:
    """Run one pipeline step as a subprocess, writing its output to a log.

    The process is polled every 0.2s; when *cancel_event* is set it is
    terminated (escalating to kill after 5s) and JobCancelledError is
    raised. Raises RuntimeError when the command exits non-zero.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("w", encoding="utf-8") as log_file:
        process = subprocess.Popen(
            cmd,
            cwd=PROJECT_ROOT,
            env=env,
            stdout=log_file,
            stderr=subprocess.STDOUT,  # interleave stderr into the same log
            text=True,
        )
        # Publish the handle so the /cancel endpoint can terminate it directly.
        _set_active_process(job_id, process)
        try:
            while True:
                if cancel_event.is_set():
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        # Escalate when the child ignores the termination request.
                        process.kill()
                        process.wait(timeout=5)
                    raise JobCancelledError("Job cancelled by user")
                result_code = process.poll()
                if result_code is not None:
                    break
                time.sleep(0.2)  # short poll keeps cancellation responsive
        finally:
            # Always unpublish the handle, even on cancellation/exceptions.
            _clear_active_process(job_id)

    if result_code != 0:
        raise RuntimeError(f"Command failed: {' '.join(cmd)}")
|
|
|
|
|
|
def _run_job(
    job_dir: Path,
    input_path: Path,
    processed_dir: Path,
    output_dir: Path,
    solver_name: str,
    step_size_tonnes: int,
    mip_gap: float,
    time_limit_seconds: int,
    cancel_event: threading.Event,
) -> None:
    """Worker-thread entry point: preprocess the upload, then optimize.

    Writes progress to <job_dir>/job.json ("processing" -> "completed" /
    "failed" / "cancelled") and per-step logs to <job_dir>/logs. Always
    removes the job from ACTIVE_JOBS on exit.
    """
    logs_dir = job_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)

    env = os.environ.copy()
    # Prepend common solver install locations (Homebrew, /usr/local) so the
    # child processes find solvers even under a minimal server environment.
    path_entries = env.get("PATH", "").split(os.pathsep)
    for extra_path in ["/usr/local/bin", "/opt/homebrew/bin"]:
        if extra_path not in path_entries:
            path_entries.insert(0, extra_path)
    env["PATH"] = os.pathsep.join(path_entries)
    # The preprocessing script reads its input/output locations from these.
    env["POC1_INPUT_XLSX"] = str(input_path)
    env["POC1_OUTPUT_DIR"] = str(processed_dir)

    try:
        _write_status(job_dir, "processing")
        # Step 1: convert the uploaded workbook into processed parquet inputs.
        _run_step(
            [sys.executable, "src/preprocessing/exploration_preprocess.py"],
            logs_dir / "preprocess.log",
            env=env,
            job_id=job_dir.name,
            cancel_event=cancel_event,
        )
        output_path = output_dir / "output.xlsx"
        # Step 2: run the optimization with the user-chosen solver settings.
        _run_step(
            [
                sys.executable,
                "src/optimization/run_optimization.py",
                "--data-dir",
                str(processed_dir),
                "--solver",
                solver_name,
                "--step-size-tonnes",
                str(step_size_tonnes),
                "--mip-gap",
                str(mip_gap),
                "--time-limit",
                str(time_limit_seconds),
                "--output-xlsx",
                str(output_path),
            ],
            logs_dir / "optimization.log",
            env=env,
            job_id=job_dir.name,
            cancel_event=cancel_event,
        )
        _write_status(job_dir, "completed", {"output": str(output_path)})
    except JobCancelledError:
        _write_status(job_dir, "cancelled")
    except Exception as exc:
        _write_status(job_dir, "failed", {"error": str(exc)})
    finally:
        # Drop the runtime entry so /cancel treats the job as finished.
        with ACTIVE_JOBS_LOCK:
            ACTIVE_JOBS.pop(job_dir.name, None)
|
|
|
|
|
|
def _write_status(job_dir: Path, status: str, extra: dict[str, str] | None = None) -> None:
|
|
payload = {"status": status}
|
|
if extra:
|
|
payload.update(extra)
|
|
(job_dir / "job.json").write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
|
|
|
|
|
def _read_monthly_flow_rows(output_path: Path) -> dict[str, object]:
    """Aggregate the solver output workbook into monthly (target, source) flows.

    Reads "Sheet1" (3-level column header target/source/shift, 3-level row
    index starting with the date), keeps only columns for the known sources
    and shifts, sums shift-level values up to calendar months, and returns
    JSON-ready rows plus the sorted target/source code lists present.

    Raises HTTPException(500) when pandas is missing or the workbook
    cannot be parsed.
    """
    try:
        import pandas as pd
    except Exception as exc:  # pragma: no cover
        raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc

    try:
        df = pd.read_excel(
            output_path,
            sheet_name="Sheet1",
            header=[0, 1, 2],
            index_col=[0, 1, 2],
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not read output workbook: {exc}") from exc

    source_names = {"Reichwalde", "Nochten", "Welzow"}
    # Shift codes — presumably Früh/Spät/Nacht (the capacity view maps them
    # to 06/14/22h); confirm against the input workbook.
    shift_names = {"F", "S", "N"}

    # Keep only well-formed (target, source, shift) flow columns.
    flow_cols = []
    for col in df.columns:
        if not isinstance(col, tuple) or len(col) != 3:
            continue
        target, source, shift = col
        if str(source) in source_names and str(shift) in shift_names:
            flow_cols.append(col)

    if not flow_cols:
        return {"rows": [], "targets": [], "sources": sorted(source_names)}

    flows = df[flow_cols].copy()
    # Drop index rows whose first level is not a parseable date (e.g. totals
    # or footer rows), then rebuild the index with real timestamps.
    raw_dates = pd.to_datetime(flows.index.get_level_values(0), errors="coerce")
    flows = flows.loc[~raw_dates.isna()].copy()
    raw_dates = raw_dates[~raw_dates.isna()]
    flows.index = pd.MultiIndex.from_arrays(
        [
            raw_dates,
            flows.index.get_level_values(1),
            flows.index.get_level_values(2),
        ],
        names=flows.index.names,
    )

    # Sum rows to month starts ("MS"), then collapse the shift level of the
    # columns so one column remains per (target, source) pair.
    monthly_by_shift = flows.groupby(pd.Grouper(level=0, freq="MS")).sum()
    monthly = monthly_by_shift.T.groupby(level=[0, 1]).sum().T

    # Human-readable names for the target codes used in the workbook.
    target_labels = {
        "J": "Jänschwalde",
        "SP": "Schwarze Pumpe",
        "B3": "Boxberg Werk 3",
        "B4": "Boxberg Werk 4",
        "V": "Veredlung",
    }

    rows: list[dict[str, object]] = []
    for month, values in monthly.iterrows():
        for (target, source), amount in values.items():
            amount_value = float(amount) if amount is not None else 0.0
            rows.append(
                {
                    "month": month.strftime("%Y-%m"),
                    "target": str(target),
                    "target_label": target_labels.get(str(target), str(target)),
                    "source": str(source),
                    "amount_tonnes": amount_value,
                }
            )

    targets = sorted({str(col[0]) for col in monthly.columns})
    sources = sorted({str(col[1]) for col in monthly.columns})
    return {"rows": rows, "targets": targets, "sources": sources}
|
|
|
|
|
|
def _read_flow_long_rows(output_path: Path):
    """Read the output workbook into a long-format flows DataFrame.

    Returns columns: date, month ("YYYY-MM"), target, source, shift,
    amount_tonnes — one row per (day, target, source, shift) cell. Index
    rows with unparseable dates are skipped. Raises HTTPException(500)
    when pandas is missing or the workbook cannot be parsed.
    """
    try:
        import pandas as pd
    except Exception as exc:  # pragma: no cover
        raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc

    try:
        df = pd.read_excel(
            output_path,
            sheet_name="Sheet1",
            header=[0, 1, 2],
            index_col=[0, 1, 2],
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not read output workbook: {exc}") from exc

    source_names = {"Reichwalde", "Nochten", "Welzow"}
    shift_names = {"F", "S", "N"}
    # Same column filter as _read_monthly_flow_rows: only well-formed
    # (target, source, shift) flow columns.
    flow_cols = [
        col
        for col in df.columns
        if isinstance(col, tuple)
        and len(col) == 3
        and str(col[1]) in source_names
        and str(col[2]) in shift_names
    ]
    if not flow_cols:
        # Keep the schema stable even when there are no flow columns.
        return pd.DataFrame(columns=["date", "month", "target", "source", "shift", "amount_tonnes"])

    rows = []
    # Melt the wide sheet into long rows; second/third index levels
    # (apparently week and day labels) are not needed here.
    for (date_raw, _week, _day), values in df[flow_cols].iterrows():
        date = pd.to_datetime(date_raw, errors="coerce")
        if pd.isna(date):
            continue
        date = date.normalize()  # strip any time-of-day component
        for (target, source, shift), amount in values.items():
            rows.append(
                {
                    "date": date,
                    "month": date.strftime("%Y-%m"),
                    "target": str(target),
                    "source": str(source),
                    "shift": str(shift),
                    "amount_tonnes": float(amount) if amount is not None else 0.0,
                }
            )
    return pd.DataFrame(rows)
|
|
|
|
|
|
def _read_capacity_timeseries(job_dir: Path) -> dict[str, object]:
    """Build capacity-vs-usage time series for a finished job.

    Combines the long-format flow data from the output workbook with the
    capacity/availability parquet files in <job_dir>/processed and returns
    chart-ready groups: Förderkapazitäten (extraction, cumulative per
    month), Verladungskapazitäten (loading, static caps + per-shift
    availability) and Zugdurchlasskapazitäten (train throughput per shift).
    Sections whose parquet inputs are missing are silently skipped.

    Raises HTTPException(404) when no output exists, (500) when pandas is
    unavailable.
    """
    try:
        import math
        import pandas as pd
    except Exception as exc:  # pragma: no cover
        raise HTTPException(status_code=500, detail=f"Pandas not available: {exc}") from exc

    output_path = job_dir / "output" / "output.xlsx"
    processed_dir = job_dir / "processed"
    if not output_path.exists():
        raise HTTPException(status_code=404, detail="Output not found")

    flows = _read_flow_long_rows(output_path)
    if flows.empty:
        return {"groups": []}

    def _point_x(date_val, shift=None, granularity="day"):
        # X-axis value for a chart point: ISO-ish strings per granularity.
        if granularity == "month":
            return f"{date_val}-01"
        if granularity == "shift":
            # Shift start hours; F/S/N presumably Früh/Spät/Nacht — confirm.
            hour_map = {"F": 6, "S": 14, "N": 22}
            hour = hour_map.get(shift, 0)
            return f"{date_val.strftime('%Y-%m-%d')}T{hour:02d}:00:00"
        return date_val.strftime("%Y-%m-%d")

    def _point_label(date_val, shift=None, granularity="day"):
        # Human-readable label for a chart point.
        if granularity == "month":
            return str(date_val)
        if granularity == "shift":
            return f"{date_val.strftime('%Y-%m-%d')} {shift}"
        return date_val.strftime("%Y-%m-%d")

    def _series_from_dataframe(
        *,
        series_id: str,
        label: str,
        df_usage,
        granularity: str,
        limit_value: float | None = None,
        limit_map: dict[tuple, float] | None = None,
    ) -> dict[str, object]:
        # Convert an aggregated usage frame (columns: key, usage_tonnes)
        # into one chart series. The limit is either a single constant
        # (limit_value) or looked up per point (limit_map). Utilization is
        # only computed for positive limits.
        points: list[dict[str, object]] = []
        for row in df_usage.itertuples(index=False):
            key = row.key
            usage = float(row.usage_tonnes)
            if granularity == "month":
                x = _point_x(key, granularity="month")
                point_label = _point_label(key, granularity="month")
                limit = limit_value
                util = (100 * usage / limit) if limit and limit > 0 else None
            elif granularity == "day":
                x = _point_x(key, granularity="day")
                point_label = _point_label(key, granularity="day")
                if limit_map is not None:
                    limit = limit_map.get(key)
                else:
                    limit = limit_value
                util = (100 * usage / limit) if limit and limit > 0 else None
            else:
                # "shift": key is a (date, shift) tuple.
                date_val, shift = key
                x = _point_x(date_val, shift=shift, granularity="shift")
                point_label = _point_label(date_val, shift=shift, granularity="shift")
                if limit_map is not None:
                    limit = limit_map.get((date_val, shift))
                else:
                    limit = limit_value
                util = (100 * usage / limit) if limit and limit > 0 else None

            points.append(
                {
                    "x": x,
                    "label": point_label,
                    "usage_tonnes": usage,
                    "limit_tonnes": float(limit) if limit is not None else None,
                    "utilization_pct": float(util) if util is not None else None,
                }
            )
        return {"id": series_id, "label": label, "granularity": granularity, "points": points}

    def _aggregate_usage(
        *,
        sources: set[str] | None = None,
        targets: set[str] | None = None,
        granularity: str = "day",
    ):
        # Sum flows (optionally filtered by sources/targets) per month, day
        # or (date, shift), returning columns: key, usage_tonnes.
        df = flows
        if sources is not None:
            df = df[df["source"].isin(sources)]
        if targets is not None:
            df = df[df["target"].isin(targets)]
        if granularity == "month":
            agg = df.groupby("month", as_index=False)["amount_tonnes"].sum()
            agg = agg.rename(columns={"month": "key", "amount_tonnes": "usage_tonnes"})
            agg["key"] = agg["key"].astype(str)
            return agg.sort_values("key")
        if granularity == "day":
            agg = df.groupby("date", as_index=False)["amount_tonnes"].sum()
            agg = agg.rename(columns={"date": "key", "amount_tonnes": "usage_tonnes"})
            return agg.sort_values("key")
        # "shift": order F < S < N within each day via an ordered Categorical.
        agg = df.groupby(["date", "shift"], as_index=False)["amount_tonnes"].sum()
        agg["_shift_sort"] = pd.Categorical(agg["shift"], categories=["F", "S", "N"], ordered=True)
        agg = agg.sort_values(["date", "_shift_sort"])
        agg["key"] = list(zip(agg["date"], agg["shift"]))
        agg = agg.rename(columns={"amount_tonnes": "usage_tonnes"})
        return agg[["key", "usage_tonnes"]]

    groups: list[dict[str, object]] = []

    # Foerderkapazitaeten (kumuliert je Monat auf Tagesbasis)
    foerder_file = processed_dir / "foerderkapaz.parquet"
    if foerder_file.exists():
        cap_df = pd.read_parquet(foerder_file)
        # Map parquet mine names to the source codes used in the flows.
        source_map = {
            "Reichwalde (RW)": "Reichwalde",
            "Nochten (NO)": "Nochten",
            "Welzow-Süd": "Welzow",
        }
        label_map = {
            "Reichwalde": "Förderung TB Reichwalde (kumuliert im Monat)",
            "Nochten": "Förderung TB Nochten (kumuliert im Monat)",
            "Welzow": "Förderung TB Welzow-Süd (kumuliert im Monat)",
        }
        # Monthly extraction cap per mine, from rows with zeitraum "pro Monat".
        limit_by_source: dict[str, float] = {}
        for _, row in cap_df[cap_df["zeitraum"] == "pro Monat"].iterrows():
            source = source_map.get(str(row["tagebau"]))
            if source:
                limit_by_source[source] = float(row["maximal"])

        foerder_series: list[dict[str, object]] = []

        def _aggregate_cumulative_daily_in_month(*, sources: set[str]):
            # Daily usage cumulated within each calendar month, plus a
            # per-day limit map carrying the (constant) monthly cap.
            usage_df = _aggregate_usage(sources=sources, granularity="day")
            if usage_df.empty:
                return usage_df, {}
            usage_df = usage_df.copy()
            usage_df["month"] = usage_df["key"].dt.strftime("%Y-%m")
            usage_df["usage_tonnes"] = usage_df.groupby("month")["usage_tonnes"].cumsum()

            limit_map: dict[object, float] = {}
            for day in usage_df["key"]:
                month_start = pd.Timestamp(day).replace(day=1)
                # NOTE(review): month_days and month_key are computed but
                # never used below — leftover from an earlier pro-rating
                # approach, presumably.
                month_days = int(month_start.days_in_month)
                month_key = month_start.strftime("%Y-%m")
                if sources == {"Reichwalde", "Nochten"}:
                    # Hard-coded joint monthly cap for RW+NO — appears to
                    # encode a shared limit; confirm against input data.
                    monthly_limit = 3_000_000.0
                else:
                    source = next(iter(sources))
                    monthly_limit = limit_by_source.get(source)
                if monthly_limit is None:
                    continue
                # monthly cap line (constant during month) for cumulative comparison
                limit_map[day] = float(monthly_limit)
            return usage_df[["key", "usage_tonnes"]], limit_map

        for source in ["Reichwalde", "Nochten", "Welzow"]:
            usage_df, limit_map = _aggregate_cumulative_daily_in_month(sources={source})
            if usage_df.empty:
                continue
            foerder_series.append(
                _series_from_dataframe(
                    series_id=f"foerder_{source.lower()}_cum_month",
                    label=label_map[source],
                    df_usage=usage_df,
                    granularity="day",
                    limit_map=limit_map,
                )
            )

        # Combined RW+NO series against the shared monthly limit.
        rwno_usage_df, rwno_limit_map = _aggregate_cumulative_daily_in_month(
            sources={"Reichwalde", "Nochten"}
        )
        if not rwno_usage_df.empty:
            foerder_series.append(
                _series_from_dataframe(
                    series_id="foerder_rw_nochten_cum_month",
                    label="Förderung TB Reichwalde + Nochten (kumuliert im Monat, gemeinsame Grenze)",
                    df_usage=rwno_usage_df,
                    granularity="day",
                    limit_map=rwno_limit_map,
                )
            )

        if foerder_series:
            groups.append({"key": "foerder", "label": "Förderkapazitäten", "series": foerder_series})

    # Verladungskapazitaeten (static + availability)
    verladung_series: list[dict[str, object]] = []
    verladung_file = processed_dir / "verladungskap.parquet"
    if verladung_file.exists():
        verladung_cap = pd.read_parquet(verladung_file)

        def _cap_lookup(verladung_label: str, zeitraum_label: str):
            # Static cap for a loading point + period; None when absent.
            row = verladung_cap[
                (verladung_cap["verladung"] == verladung_label) & (verladung_cap["zeitraum"] == zeitraum_label)
            ]
            if row.empty:
                return None
            return float(row.iloc[0]["maximal"])

        boxberg_targets = {"J", "SP", "B3", "V"}
        welzow_targets = {"J", "SP", "B3", "V"}
        boxberg_sources = {"Reichwalde", "Nochten"}
        welzow_sources = {"Welzow"}

        # Static per-shift and per-day loading caps for both loading points.
        for series_id, label, sources, targets, granularity, cap_label, cap_period in [
            (
                "verladung_boxberg_shift",
                "Verladung Boxberg (RW+NO) pro Schicht",
                boxberg_sources,
                boxberg_targets,
                "shift",
                "Boxberg (RW+NO)",
                "pro Schicht",
            ),
            (
                "verladung_boxberg_day",
                "Verladung Boxberg (RW+NO) pro Tag",
                boxberg_sources,
                boxberg_targets,
                "day",
                "Boxberg (RW+NO)",
                "pro Tag",
            ),
            (
                "verladung_welzow_shift",
                "Verladung Welzow-Süd pro Schicht",
                welzow_sources,
                welzow_targets,
                "shift",
                "Welzow-Süd",
                "pro Schicht",
            ),
            (
                "verladung_welzow_day",
                "Verladung Welzow-Süd pro Tag",
                welzow_sources,
                welzow_targets,
                "day",
                "Welzow-Süd",
                "pro Tag",
            ),
        ]:
            usage_df = _aggregate_usage(sources=sources, targets=targets, granularity=granularity)
            if usage_df.empty:
                continue
            cap = _cap_lookup(cap_label, cap_period)
            verladung_series.append(
                _series_from_dataframe(
                    series_id=series_id,
                    label=label,
                    df_usage=usage_df,
                    granularity=granularity,
                    limit_value=cap,
                )
            )

    # Dynamic per-shift availability limits from Verfuegbarkeiten.parquet.
    avail_file = processed_dir / "Verfuegbarkeiten.parquet"
    if avail_file.exists():
        avail = pd.read_parquet(avail_file)
        # Column suffixes S1/S2/S3 map to shift codes F/S/N.
        shift_map = {"S1": "F", "S2": "S", "S3": "N"}

        for scope, label, cols, sources in [
            (
                "welzow",
                "Verfügbarkeit Welzow-Süd (pro Schicht)",
                ["Welzow_Sued_S1_t", "Welzow_Sued_S2_t", "Welzow_Sued_S3_t"],
                {"Welzow"},
            ),
            (
                "rw_no",
                "Verfügbarkeit Boxberg RW+NO (pro Schicht)",
                ["Boxberg_NO_RW_S1_t", "Boxberg_NO_RW_S2_t", "Boxberg_NO_RW_S3_t"],
                {"Reichwalde", "Nochten"},
            ),
        ]:
            limit_map: dict[tuple, float] = {}
            for _, row in avail.iterrows():
                date = pd.to_datetime(row["datum"], errors="coerce")
                if pd.isna(date):
                    continue
                date = date.normalize()
                for col in cols:
                    val = row.get(col)
                    if val is None or (isinstance(val, float) and math.isnan(val)):
                        continue
                    # Second-to-last "_" segment of the column name is S1/S2/S3.
                    shift = shift_map[col.split("_")[-2]]
                    limit_map[(date, shift)] = float(val)

            if not limit_map:
                continue

            usage_df = _aggregate_usage(sources=sources, granularity="shift")
            if usage_df.empty:
                continue
            verladung_series.append(
                _series_from_dataframe(
                    series_id=f"availability_{scope}_shift",
                    label=label,
                    df_usage=usage_df,
                    granularity="shift",
                    limit_map=limit_map,
                )
            )

    if verladung_series:
        groups.append({"key": "verladung", "label": "Verladungskapazitäten", "series": verladung_series})

    # Zugdurchlasskapazitaeten (per shift)
    zug_file = processed_dir / "zugdurchlass.parquet"
    zug_series: list[dict[str, object]] = []
    if zug_file.exists():
        zug = pd.read_parquet(zug_file).copy()

        def _zug_cap(start_exact: str, ziel_exact: str):
            # Per-shift train throughput cap for a start/ziel pair; values
            # are matched after whitespace-stripping both columns.
            s = zug["start"].astype(str).str.strip()
            z = zug["ziel"].astype(str).str.strip()
            row = zug[(s == start_exact) & (z == ziel_exact)]
            if row.empty:
                return None
            return float(row.iloc[0]["maximal"])

        def _add_zug_series(series_id: str, label: str, limit: float | None, terms: list[tuple[str, str]]):
            # Append one per-shift series summing the given (source, target)
            # flow pairs against a constant limit; silently skipped when the
            # cap is unknown or no matching flows exist.
            if limit is None:
                return
            mask = None
            for source, target in terms:
                part = (flows["source"] == source) & (flows["target"] == target)
                mask = part if mask is None else (mask | part)
            if mask is None:
                return
            df = flows[mask].copy()
            if df.empty:
                return
            usage_df = df.groupby(["date", "shift"], as_index=False)["amount_tonnes"].sum()
            usage_df["_shift_sort"] = pd.Categorical(
                usage_df["shift"], categories=["F", "S", "N"], ordered=True
            )
            usage_df = usage_df.sort_values(["date", "_shift_sort"])
            usage_df["key"] = list(zip(usage_df["date"], usage_df["shift"]))
            usage_df = usage_df.rename(columns={"amount_tonnes": "usage_tonnes"})[["key", "usage_tonnes"]]
            zug_series.append(
                _series_from_dataframe(
                    series_id=series_id,
                    label=label,
                    df_usage=usage_df,
                    granularity="shift",
                    limit_value=limit,
                )
            )

        # Single-destination series from the Boxberg loading point (KLP).
        _add_zug_series(
            "zug_rwno_to_j",
            "Zugdurchlass KLP: RW+NO -> KW Jänschwalde",
            _zug_cap("Verladung Boxberg (KLP)", "KW Jänschwalde"),
            [("Reichwalde", "J"), ("Nochten", "J")],
        )
        _add_zug_series(
            "zug_rwno_to_sp",
            "Zugdurchlass KLP: RW+NO -> KW Schwarze Pumpe",
            _zug_cap("Verladung Boxberg (KLP)", "KW Schwarze Pumpe"),
            [("Reichwalde", "SP"), ("Nochten", "SP")],
        )
        _add_zug_series(
            "zug_rwno_to_v",
            "Zugdurchlass KLP: RW+NO -> Veredlung",
            _zug_cap("Verladung Boxberg (KLP)", "Veredlung ISP"),
            [("Reichwalde", "V"), ("Nochten", "V")],
        )
        _add_zug_series(
            "zug_rwno_to_b3",
            "Zugdurchlass KLP: RW+NO -> KW Boxberg Werk 3",
            _zug_cap("Verladung Boxberg (KLP)", "KW Boxberg Werk 3"),
            [("Reichwalde", "B3"), ("Nochten", "B3")],
        )
        # Single-destination series from the Welzow loading point (KUP).
        _add_zug_series(
            "zug_w_to_j",
            "Zugdurchlass KUP: Welzow -> KW Jänschwalde",
            _zug_cap("Verladung Welzow-Süd (KUP)", "KW Jänschwalde"),
            [("Welzow", "J")],
        )
        _add_zug_series(
            "zug_w_to_sp",
            "Zugdurchlass KUP: Welzow -> KW Schwarze Pumpe",
            _zug_cap("Verladung Welzow-Süd (KUP)", "KW Schwarze Pumpe"),
            [("Welzow", "SP")],
        )
        _add_zug_series(
            "zug_w_to_v",
            "Zugdurchlass KUP: Welzow -> Veredlung",
            _zug_cap("Verladung Welzow-Süd (KUP)", "Veredlung ISP"),
            [("Welzow", "V")],
        )
        _add_zug_series(
            "zug_w_to_b3",
            "Zugdurchlass KUP: Welzow -> KW Boxberg Werk 3",
            _zug_cap("Verladung Welzow-Süd (KUP)", "KW Boxberg Werk 3"),
            [("Welzow", "B3")],
        )
        # Combined-destination caps (multiple targets share one limit).
        _add_zug_series(
            "zug_rwno_to_sp_v",
            "Zugdurchlass KLP kombiniert: RW+NO -> SP + Veredlung",
            _zug_cap("Verladung Boxberg (KLP)", "KW SP + V ISP"),
            [("Reichwalde", "SP"), ("Nochten", "SP"), ("Reichwalde", "V"), ("Nochten", "V")],
        )
        _add_zug_series(
            "zug_rwno_to_all",
            "Zugdurchlass KLP kombiniert: RW+NO -> JW + SP + V + B3",
            _zug_cap("Verladung Boxberg (KLP)", "KW JW + KW SP + V ISP + KW B3"),
            [
                ("Reichwalde", "J"),
                ("Reichwalde", "SP"),
                ("Reichwalde", "V"),
                ("Reichwalde", "B3"),
                ("Nochten", "J"),
                ("Nochten", "SP"),
                ("Nochten", "V"),
                ("Nochten", "B3"),
            ],
        )
        _add_zug_series(
            "zug_w_to_sp_v",
            "Zugdurchlass KUP kombiniert: Welzow -> SP + Veredlung",
            _zug_cap("Verladung Welzow-Süd (KUP)", "KW SP + V ISP"),
            [("Welzow", "SP"), ("Welzow", "V")],
        )
        _add_zug_series(
            "zug_w_to_sp_v_b3",
            "Zugdurchlass KUP kombiniert: Welzow -> SP + V + B3",
            _zug_cap("Verladung Welzow-Süd (KUP)", "KW SP + V ISP + KW B3"),
            [("Welzow", "SP"), ("Welzow", "V"), ("Welzow", "B3")],
        )
        # Caps spanning both loading points.
        _add_zug_series(
            "zug_all_to_j",
            "Zugdurchlass KUP+KLP: gesamt -> KW Jänschwalde",
            _zug_cap("KUP + KLP", "KW Jänschwalde"),
            [("Reichwalde", "J"), ("Nochten", "J"), ("Welzow", "J")],
        )
        _add_zug_series(
            "zug_all_to_b3",
            "Zugdurchlass KUP+KLP: gesamt -> KW Boxberg Werk 3",
            _zug_cap("KUP + KLP", "KW Boxberg Werk 3"),
            [("Reichwalde", "B3"), ("Nochten", "B3"), ("Welzow", "B3")],
        )
        _add_zug_series(
            "zug_rwno_j_and_w_b3",
            "Zugdurchlass kombiniert: KLP->JW + KUP->B3",
            _zug_cap("KLP zum KW JW", "KUP zum KW B3"),
            [("Reichwalde", "J"), ("Nochten", "J"), ("Welzow", "B3")],
        )
        _add_zug_series(
            "zug_rwno_jspv_and_w_b3",
            "Zugdurchlass kombiniert: KLP->JW+SP+V + KUP->B3",
            _zug_cap("KLP zum KW JW + KW SP + V ISP", "KUP zum KW B3"),
            [
                ("Reichwalde", "J"),
                ("Nochten", "J"),
                ("Reichwalde", "SP"),
                ("Nochten", "SP"),
                ("Reichwalde", "V"),
                ("Nochten", "V"),
                ("Welzow", "B3"),
            ],
        )

    # Day-varying KVB Nord throughput limits (per shift) toward Jänschwalde.
    kvb_file = processed_dir / "zugdurchlass_kvb_nord.parquet"
    if kvb_file.exists():
        kvb = pd.read_parquet(kvb_file)
        limit_map: dict[tuple, float] = {}
        shift_cols = [("KVB_Nord_S1_t", "F"), ("KVB_Nord_S2_t", "S"), ("KVB_Nord_S3_t", "N")]
        for _, row in kvb.iterrows():
            date = pd.to_datetime(row["datum"], errors="coerce")
            if pd.isna(date):
                continue
            date = date.normalize()
            for col, shift in shift_cols:
                val = row.get(col)
                if val is None or pd.isna(val):
                    continue
                limit_map[(date, shift)] = float(val)

        usage_df = _aggregate_usage(targets={"J"}, granularity="shift")
        if limit_map and not usage_df.empty:
            zug_series.append(
                _series_from_dataframe(
                    series_id="zug_kvb_nord_dynamic_j",
                    label="Zugdurchlass KVB Nord (dynamisch) -> KW Jänschwalde",
                    df_usage=usage_df,
                    granularity="shift",
                    limit_map=limit_map,
                )
            )

    if zug_series:
        groups.append({"key": "zugdurchlass", "label": "Zugdurchlasskapazitäten", "series": zug_series})

    return {"groups": groups}
|
|
|
|
|
|
@app.get("/api/health")
|
|
def health() -> dict[str, object]:
|
|
return {"status": "ok", "solvers": _get_solver_availability()}
|
|
|
|
|
|
@app.post("/api/run")
|
|
async def run(
|
|
file: UploadFile = File(...),
|
|
solver: str = Form("highs"),
|
|
step_size_tonnes: int = Form(1000),
|
|
mip_gap_pct: float = Form(5.0),
|
|
max_runtime_minutes: float = Form(10.0),
|
|
) -> dict[str, str]:
|
|
solver = solver.lower().strip()
|
|
if solver not in {"highs", "gurobi", "scip"}:
|
|
raise HTTPException(status_code=400, detail="Unsupported solver")
|
|
availability = _get_solver_availability()
|
|
if not availability.get(solver, False):
|
|
raise HTTPException(status_code=400, detail=f"Solver not available: {solver}")
|
|
if step_size_tonnes not in {960, 1000}:
|
|
raise HTTPException(status_code=400, detail="Unsupported step size")
|
|
if not (0 <= float(mip_gap_pct) <= 100):
|
|
raise HTTPException(status_code=400, detail="mip_gap_pct must be between 0 and 100")
|
|
if not (0 < float(max_runtime_minutes) <= 24 * 60):
|
|
raise HTTPException(status_code=400, detail="max_runtime_minutes must be between 0 and 1440")
|
|
mip_gap = float(mip_gap_pct) / 100.0
|
|
time_limit_seconds = max(1, int(round(float(max_runtime_minutes) * 60)))
|
|
job_id = uuid.uuid4().hex
|
|
job_dir = JOBS_DIR / job_id
|
|
input_dir = job_dir / "input"
|
|
processed_dir = job_dir / "processed"
|
|
output_dir = job_dir / "output"
|
|
|
|
input_dir.mkdir(parents=True, exist_ok=True)
|
|
processed_dir.mkdir(parents=True, exist_ok=True)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
input_path = input_dir / "PoC1_Rohkohleverteilung_Input_Parameter.xlsx"
|
|
with input_path.open("wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
cancel_event = threading.Event()
|
|
with ACTIVE_JOBS_LOCK:
|
|
ACTIVE_JOBS[job_id] = {"cancel_event": cancel_event, "process": None}
|
|
|
|
_write_status(
|
|
job_dir,
|
|
"queued",
|
|
{
|
|
"solver": solver,
|
|
"step_size_tonnes": str(step_size_tonnes),
|
|
"mip_gap_pct": str(float(mip_gap_pct)),
|
|
"max_runtime_minutes": str(float(max_runtime_minutes)),
|
|
},
|
|
)
|
|
thread = threading.Thread(
|
|
target=_run_job,
|
|
args=(
|
|
job_dir,
|
|
input_path,
|
|
processed_dir,
|
|
output_dir,
|
|
solver,
|
|
step_size_tonnes,
|
|
mip_gap,
|
|
time_limit_seconds,
|
|
cancel_event,
|
|
),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
return {
|
|
"job_id": job_id,
|
|
"download_url": f"/api/jobs/{job_id}/output",
|
|
"solver": solver,
|
|
"step_size_tonnes": str(step_size_tonnes),
|
|
"mip_gap_pct": str(float(mip_gap_pct)),
|
|
"max_runtime_minutes": str(float(max_runtime_minutes)),
|
|
}
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}")
|
|
def job_status(job_id: str) -> dict[str, str]:
|
|
job_dir = JOBS_DIR / job_id
|
|
status_path = job_dir / "job.json"
|
|
if not status_path.exists():
|
|
raise HTTPException(status_code=404, detail="Job not found")
|
|
return json.loads(status_path.read_text(encoding="utf-8"))
|
|
|
|
|
|
@app.post("/api/jobs/{job_id}/cancel")
|
|
def cancel_job(job_id: str) -> dict[str, str]:
|
|
job_dir = JOBS_DIR / job_id
|
|
status_path = job_dir / "job.json"
|
|
if not status_path.exists():
|
|
raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
payload = json.loads(status_path.read_text(encoding="utf-8"))
|
|
if payload.get("status") in {"completed", "failed", "cancelled"}:
|
|
return {"status": str(payload.get("status"))}
|
|
|
|
with ACTIVE_JOBS_LOCK:
|
|
runtime = ACTIVE_JOBS.get(job_id)
|
|
if runtime is None:
|
|
_write_status(job_dir, "cancelled")
|
|
return {"status": "cancelled"}
|
|
cancel_event = runtime.get("cancel_event")
|
|
process = runtime.get("process")
|
|
if isinstance(cancel_event, threading.Event):
|
|
cancel_event.set()
|
|
if process is not None and getattr(process, "poll", lambda: None)() is None:
|
|
try:
|
|
process.terminate()
|
|
except Exception:
|
|
pass
|
|
|
|
_write_status(job_dir, "cancelling")
|
|
return {"status": "cancelling"}
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}/output")
|
|
def job_output(job_id: str) -> FileResponse:
|
|
output_path = JOBS_DIR / job_id / "output" / "output.xlsx"
|
|
if not output_path.exists():
|
|
raise HTTPException(status_code=404, detail="Output not found")
|
|
return FileResponse(output_path, filename="output.xlsx")
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}/monthly-flows")
|
|
def job_monthly_flows(job_id: str) -> dict[str, object]:
|
|
output_path = JOBS_DIR / job_id / "output" / "output.xlsx"
|
|
if not output_path.exists():
|
|
raise HTTPException(status_code=404, detail="Output not found")
|
|
return _read_monthly_flow_rows(output_path)
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}/capacity-timeseries")
|
|
def job_capacity_timeseries(job_id: str) -> dict[str, object]:
|
|
job_dir = JOBS_DIR / job_id
|
|
if not job_dir.exists():
|
|
raise HTTPException(status_code=404, detail="Job not found")
|
|
return _read_capacity_timeseries(job_dir)
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}/logs/{log_name}")
|
|
def job_log(job_id: str, log_name: str) -> PlainTextResponse:
|
|
if log_name not in {"preprocess", "optimization"}:
|
|
raise HTTPException(status_code=400, detail="Unknown log name")
|
|
log_path = JOBS_DIR / job_id / "logs" / f"{log_name}.log"
|
|
if not log_path.exists():
|
|
raise HTTPException(status_code=404, detail="Log not found")
|
|
return PlainTextResponse(log_path.read_text(encoding="utf-8"))
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}/logs/{log_name}/stream")
|
|
def job_log_stream(job_id: str, log_name: str) -> StreamingResponse:
|
|
if log_name not in {"preprocess", "optimization"}:
|
|
raise HTTPException(status_code=400, detail="Unknown log name")
|
|
log_path = JOBS_DIR / job_id / "logs" / f"{log_name}.log"
|
|
if not log_path.exists():
|
|
raise HTTPException(status_code=404, detail="Log not found")
|
|
|
|
def event_stream():
|
|
with log_path.open("r", encoding="utf-8") as log_file:
|
|
for line in log_file:
|
|
yield f"data: {line.rstrip()}\n\n"
|
|
while True:
|
|
line = log_file.readline()
|
|
if line:
|
|
yield f"data: {line.rstrip()}\n\n"
|
|
else:
|
|
time.sleep(0.5)
|
|
|
|
return StreamingResponse(event_stream(), media_type="text/event-stream")
|
|
|
|
|
|
# Serve the built single-page frontend from this server when a build exists;
# without a build the API still runs (routes below are simply not registered).
if FRONTEND_DIST.exists():
    assets_dir = FRONTEND_DIST / "assets"
    if assets_dir.exists():
        # Hashed JS/CSS bundle files.
        app.mount("/assets", StaticFiles(directory=assets_dir), name="frontend-assets")

    @app.get("/", include_in_schema=False)
    def frontend_index() -> FileResponse:
        # Root always serves the SPA entry point.
        return FileResponse(FRONTEND_DIST / "index.html")

    @app.get("/{full_path:path}", include_in_schema=False)
    def frontend_spa(full_path: str) -> FileResponse:
        # SPA fallback: real files are served directly; unknown paths get
        # index.html so client-side routing can resolve them. Unmatched
        # /api/* paths 404 instead of falling through to the SPA.
        if full_path.startswith("api/"):
            raise HTTPException(status_code=404, detail="Not found")
        target = FRONTEND_DIST / full_path
        if target.exists() and target.is_file():
            return FileResponse(target)
        return FileResponse(FRONTEND_DIST / "index.html")
|