318 lines
10 KiB
Python
318 lines
10 KiB
Python
from __future__ import annotations
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from itertools import product
|
|
from queue import PriorityQueue
|
|
|
|
from typing import Iterator, Literal, NamedTuple, Self
|
|
from advent.parser.parser import P
|
|
|
|
day_num = 16
|
|
|
|
|
|
def part1(lines: Iterator[str]) -> int:
|
|
system = Network.parse(lines)
|
|
return system.under_pressure(30, 1)
|
|
|
|
|
|
def part2(lines: Iterator[str]) -> int:
|
|
system = Network.parse(lines)
|
|
return system.under_pressure(26, 2)
|
|
|
|
|
|
valve_parser = P.map3(
|
|
P.second(P.string("Valve "), P.upper().word()),
|
|
P.second(P.string(" has flow rate="), P.unsigned()),
|
|
P.second(P.either(P.string("; tunnels lead to valves "), P.string("; tunnel leads to valve ")),
|
|
P.upper().word().sep_by(P.string(", "))),
|
|
lambda name, flow_rate, following: RawValve(name, flow_rate, following)
|
|
)
|
|
|
|
|
|
class RawValve(NamedTuple):
|
|
name: str
|
|
flow_rate: int
|
|
following: list[str]
|
|
|
|
@classmethod
|
|
def parse(cls, line: str) -> Self:
|
|
return valve_parser.parse(line).get()
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class Valve:
|
|
name: str
|
|
flow_rate: int
|
|
following: list[Valve]
|
|
paths: dict[str, int] = field(default_factory=dict, init=False)
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
if not isinstance(other, Valve):
|
|
return False
|
|
return self.name == other.name
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.name)
|
|
|
|
def __lt__(self, other: Valve) -> bool:
|
|
return self.name < other.name
|
|
|
|
def __repr__(self) -> str:
|
|
return f"{self.name}:{self.flow_rate}->{','.join(v.name for v in self.following)}"
|
|
|
|
def travel_time(self, to: str) -> int:
|
|
return self.paths[to]
|
|
|
|
def create_paths(self):
|
|
paths: dict[str, tuple[int, bool]] = {}
|
|
to_check: list[tuple[Valve, int]] = [(self, 0)]
|
|
while to_check:
|
|
current, steps = to_check[0]
|
|
to_check = to_check[1:]
|
|
|
|
paths[current.name] = steps, (current.flow_rate > 0)
|
|
for next in current.following:
|
|
known_path, _ = paths.get(next.name, (steps + 2, False))
|
|
if known_path > steps + 1:
|
|
to_check.append((next, steps + 1))
|
|
|
|
self.paths = {name: steps
|
|
for name, (steps, has_valve) in paths.items()
|
|
if has_valve is True}
|
|
|
|
|
|
class Actor(NamedTuple):
|
|
position: Valve
|
|
next_time: int
|
|
|
|
@property
|
|
def finished(self) -> bool:
|
|
return self.next_time <= 0
|
|
|
|
|
|
@dataclass(slots=True, frozen=True, kw_only=True)
|
|
class SystemProgress(ABC):
|
|
time: int
|
|
pressure: int
|
|
flow_rate: int
|
|
closed_valves: list[Valve]
|
|
|
|
def next_steps_for(self, actor: Actor) -> Iterator[Actor]:
|
|
if actor.next_time != self.time:
|
|
yield actor
|
|
|
|
else:
|
|
reached_any_target = False
|
|
for target in self.closed_valves:
|
|
next_time = self.time - (actor.position.travel_time(target.name) + 1)
|
|
if next_time > 0:
|
|
reached_any_target = True
|
|
yield Actor(target, next_time)
|
|
|
|
if not reached_any_target:
|
|
yield Actor(actor.position, 0)
|
|
|
|
@classmethod
|
|
def create(cls, run_time: int,
|
|
valves: dict[str, Valve], start: str,
|
|
num_actors: Literal[1] | Literal[2]) -> SystemProgress:
|
|
closed_valves = list(sorted(valve for valve in valves.values() if valve.flow_rate > 0))
|
|
start_valve = valves[start]
|
|
for valve in valves.values():
|
|
if valve.name == start or valve.flow_rate > 0:
|
|
valve.create_paths()
|
|
|
|
match num_actors:
|
|
case 1:
|
|
return OneActorProgress(time=run_time,
|
|
pressure=0,
|
|
flow_rate=0,
|
|
closed_valves=closed_valves,
|
|
actor=Actor(start_valve, run_time))
|
|
case 2:
|
|
return TwoActorProgress(time=run_time,
|
|
pressure=0,
|
|
flow_rate=0,
|
|
closed_valves=closed_valves,
|
|
actor1=Actor(start_valve, run_time),
|
|
actor2=Actor(start_valve, run_time))
|
|
case _:
|
|
assert False, "Unreachable"
|
|
|
|
def __lt__(self, other: OneActorProgress) -> bool:
|
|
if self.time != other.time:
|
|
return self.time > other.time
|
|
return self.min_potential_pressure() > other.min_potential_pressure()
|
|
|
|
@abstractmethod
|
|
def open_valves(self) -> Iterator[SystemProgress]:
|
|
...
|
|
|
|
@abstractmethod
|
|
def info(self) -> str:
|
|
...
|
|
|
|
@abstractmethod
|
|
def min_potential_pressure(self) -> int:
|
|
...
|
|
|
|
@abstractmethod
|
|
def still_possible(self) -> int:
|
|
...
|
|
|
|
def max_potential_pressure(self) -> int:
|
|
return self.min_potential_pressure() + self.still_possible()
|
|
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class OneActorProgress(SystemProgress):
|
|
actor: Actor
|
|
|
|
def min_potential_pressure(self) -> int:
|
|
return self.pressure + self.flow_rate * self.time
|
|
|
|
def still_possible(self) -> int:
|
|
result = 0
|
|
for valve in self.closed_valves:
|
|
time = self.actor.position.travel_time(valve.name) + 1
|
|
if self.time > time:
|
|
result += (self.time - time) * valve.flow_rate
|
|
return result
|
|
|
|
def open_valves(self) -> Iterator[SystemProgress]:
|
|
for actor in self.next_steps_for(self.actor):
|
|
closed_valves = self.closed_valves.copy()
|
|
if not actor.finished:
|
|
closed_valves.remove(actor.position)
|
|
flow_rate = self.flow_rate + actor.position.flow_rate
|
|
else:
|
|
flow_rate = self.flow_rate
|
|
|
|
yield OneActorProgress(
|
|
time=actor.next_time,
|
|
flow_rate=flow_rate,
|
|
pressure=self.pressure + self.flow_rate * (self.time - actor.next_time),
|
|
closed_valves=closed_valves,
|
|
actor=actor,
|
|
)
|
|
|
|
def info(self) -> str:
|
|
return ",".join(valve.name for valve in self.closed_valves)
|
|
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class TwoActorProgress(SystemProgress):
|
|
actor1: Actor
|
|
actor2: Actor
|
|
|
|
def min_potential_pressure(self) -> int:
|
|
pressure = self.pressure + self.flow_rate * self.time
|
|
if self.actor1.next_time != self.time:
|
|
pressure += self.actor1.position.flow_rate * self.actor1.next_time
|
|
if self.actor2.next_time != self.time:
|
|
pressure += self.actor2.position.flow_rate * self.actor2.next_time
|
|
return pressure
|
|
|
|
def still_possible(self) -> int:
|
|
result = 0
|
|
for valve in self.closed_valves:
|
|
t1 = self.actor1.position.travel_time(valve.name)
|
|
t2 = self.actor2.position.travel_time(valve.name)
|
|
time = min(t1, t2) + 1
|
|
if self.time > time:
|
|
result += (self.time - time) * valve.flow_rate
|
|
return result
|
|
|
|
def open_valves(self) -> Iterator[SystemProgress]:
|
|
actor1_actions = self.next_steps_for(self.actor1)
|
|
actor2_actions = self.next_steps_for(self.actor2)
|
|
|
|
for actor1, actor2 in product(actor1_actions, actor2_actions):
|
|
if actor1.position == actor2.position:
|
|
continue
|
|
|
|
closed_valves = self.closed_valves.copy()
|
|
next_time = max(actor1.next_time, actor2.next_time, 0)
|
|
|
|
flow_rate = self.flow_rate
|
|
if next_time > 0:
|
|
if actor1.position in closed_valves:
|
|
closed_valves.remove(actor1.position)
|
|
if actor1.next_time == next_time:
|
|
flow_rate += actor1.position.flow_rate
|
|
|
|
if actor2.position in closed_valves:
|
|
closed_valves.remove(actor2.position)
|
|
if actor2.next_time == next_time:
|
|
flow_rate += actor2.position.flow_rate
|
|
|
|
next = TwoActorProgress(
|
|
time=next_time,
|
|
flow_rate=flow_rate,
|
|
pressure=self.pressure + self.flow_rate * (self.time - next_time),
|
|
closed_valves=closed_valves,
|
|
actor1=actor1,
|
|
actor2=actor2,
|
|
)
|
|
yield next
|
|
|
|
def info(self) -> str:
|
|
opening = ""
|
|
if self.actor1.next_time != self.time:
|
|
opening = self.actor1.position.name
|
|
if self.actor2.next_time != self.time:
|
|
opening = self.actor2.position.name
|
|
|
|
closed = ",".join(valve.name for valve in self.closed_valves)
|
|
if opening:
|
|
return f"{closed}+{opening}"
|
|
else:
|
|
return closed
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class Network:
|
|
valves: dict[str, Valve]
|
|
paths: dict[tuple[str, str], list[str]] = field(default_factory=dict)
|
|
|
|
@classmethod
|
|
def parse(cls, lines: Iterator[str]) -> Self:
|
|
raw_system = [RawValve.parse(line) for line in lines]
|
|
valves = {valve.name: Valve(valve.name, valve.flow_rate, []) for valve in raw_system}
|
|
for raw in raw_system:
|
|
current = valves[raw.name]
|
|
for follow in raw.following:
|
|
current.following.append(valves[follow])
|
|
|
|
return Network(valves)
|
|
|
|
def under_pressure(self, minutes: int, number_actors: Literal[1] | Literal[2]) -> int:
|
|
queue: PriorityQueue[SystemProgress] = PriorityQueue()
|
|
queue.put(SystemProgress.create(
|
|
run_time=minutes,
|
|
valves=self.valves,
|
|
start="AA",
|
|
num_actors=number_actors
|
|
))
|
|
min_pressure = 0
|
|
known: dict[str, int] = {}
|
|
while not queue.empty():
|
|
current = queue.get()
|
|
if current.time == 0:
|
|
return current.pressure
|
|
|
|
info = current.info()
|
|
prev_pressure = known.get(info)
|
|
if prev_pressure is not None and prev_pressure >= current.pressure:
|
|
continue
|
|
known[info] = current.pressure
|
|
|
|
if min_pressure > current.max_potential_pressure():
|
|
continue
|
|
min_pressure = max(min_pressure, current.min_potential_pressure())
|
|
min_pressure = min_pressure
|
|
|
|
for next in current.open_valves():
|
|
queue.put(next)
|
|
|
|
raise Exception("No best System found")
|