advent-2022-python/advent/days/day16/solution.py
2023-01-20 07:10:24 +01:00

317 lines
10 KiB
Python

from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from itertools import product
from queue import PriorityQueue
import re
from typing import Iterator, Literal, NamedTuple, Self
day_num = 16
def part1(lines: Iterator[str]) -> int:
system = Network.parse(lines)
return system.under_pressure(30, 1)
def part2(lines: Iterator[str]) -> int:
system = Network.parse(lines)
return system.under_pressure(26, 2)
pattern = re.compile(r"Valve (?P<name>[a-zA-Z]+)[^=]+=(?P<flow_rate>\d+).+valves? (?P<exits>.*)")
class RawValve(NamedTuple):
name: str
flow_rate: int
exits: list[str]
@classmethod
def parse(cls, line: str) -> Self:
result = pattern.match(line)
if not result:
raise Exception("Not a valid valve")
return RawValve(result.group('name'),
int(result.group('flow_rate')),
result.group('exits').split(', '))
@dataclass(slots=True)
class Valve:
name: str
flow_rate: int
exits: list[Valve]
paths: dict[str, int] = field(default_factory=dict, init=False)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Valve):
return False
return self.name == other.name
def __hash__(self) -> int:
return hash(self.name)
def __lt__(self, other: Valve) -> bool:
return self.name < other.name
def __repr__(self) -> str:
return f"{self.name}:{self.flow_rate}->{','.join(v.name for v in self.exits)}"
def travel_time(self, to: str) -> int:
return self.paths[to]
def create_paths(self):
paths: dict[str, tuple[int, bool]] = {}
to_check: list[tuple[Valve, int]] = [(self, 0)]
while to_check:
current, steps = to_check[0]
to_check = to_check[1:]
paths[current.name] = steps, (current.flow_rate > 0)
for next in current.exits:
known_path, _ = paths.get(next.name, (steps + 2, False))
if known_path > steps + 1:
to_check.append((next, steps + 1))
self.paths = {name: steps
for name, (steps, has_valve) in paths.items()
if has_valve is True}
class Actor(NamedTuple):
position: Valve
next_time: int
@property
def finished(self) -> bool:
return self.next_time <= 0
@dataclass(slots=True, frozen=True, kw_only=True)
class SystemProgress(ABC):
time: int
pressure: int
flow_rate: int
closed_valves: list[Valve]
def next_steps_for(self, actor: Actor) -> Iterator[Actor]:
if actor.next_time != self.time:
yield actor
else:
reached_any_target = False
for target in self.closed_valves:
next_time = self.time - (actor.position.travel_time(target.name) + 1)
if next_time > 0:
reached_any_target = True
yield Actor(target, next_time)
if not reached_any_target:
yield Actor(actor.position, 0)
@classmethod
def create(cls, run_time: int,
valves: dict[str, Valve], start: str,
num_actors: Literal[1] | Literal[2]) -> SystemProgress:
closed_valves = list(sorted(valve for valve in valves.values() if valve.flow_rate > 0))
start_valve = valves[start]
for valve in valves.values():
if valve.name == start or valve.flow_rate > 0:
valve.create_paths()
match num_actors:
case 1:
return OneActorProgress(time=run_time,
pressure=0,
flow_rate=0,
closed_valves=closed_valves,
actor=Actor(start_valve, run_time))
case 2:
return TwoActorProgress(time=run_time,
pressure=0,
flow_rate=0,
closed_valves=closed_valves,
actor1=Actor(start_valve, run_time),
actor2=Actor(start_valve, run_time))
case _:
assert False, "Unreachable"
def __lt__(self, other: OneActorProgress) -> bool:
if self.time != other.time:
return self.time > other.time
return self.min_potential_pressure() > other.min_potential_pressure()
@abstractmethod
def open_valves(self) -> Iterator[SystemProgress]:
...
@abstractmethod
def info(self) -> str:
...
@abstractmethod
def min_potential_pressure(self) -> int:
...
@abstractmethod
def still_possible(self) -> int:
...
def max_potential_pressure(self) -> int:
return self.min_potential_pressure() + self.still_possible()
@dataclass(slots=True, frozen=True)
class OneActorProgress(SystemProgress):
actor: Actor
def min_potential_pressure(self) -> int:
return self.pressure + self.flow_rate * self.time
def still_possible(self) -> int:
result = 0
for valve in self.closed_valves:
time = self.actor.position.travel_time(valve.name) + 1
if self.time > time:
result += (self.time - time) * valve.flow_rate
return result
def open_valves(self) -> Iterator[SystemProgress]:
for actor in self.next_steps_for(self.actor):
closed_valves = self.closed_valves.copy()
if not actor.finished:
closed_valves.remove(actor.position)
flow_rate = self.flow_rate + actor.position.flow_rate
else:
flow_rate = self.flow_rate
yield OneActorProgress(
time=actor.next_time,
flow_rate=flow_rate,
pressure=self.pressure + self.flow_rate * (self.time - actor.next_time),
closed_valves=closed_valves,
actor=actor,
)
def info(self) -> str:
return ",".join(valve.name for valve in self.closed_valves)
@dataclass(slots=True, frozen=True)
class TwoActorProgress(SystemProgress):
actor1: Actor
actor2: Actor
def min_potential_pressure(self) -> int:
pressure = self.pressure + self.flow_rate * self.time
if self.actor1.next_time != self.time:
pressure += self.actor1.position.flow_rate * self.actor1.next_time
if self.actor2.next_time != self.time:
pressure += self.actor2.position.flow_rate * self.actor2.next_time
return pressure
def still_possible(self) -> int:
result = 0
for valve in self.closed_valves:
t1 = self.actor1.position.travel_time(valve.name)
t2 = self.actor2.position.travel_time(valve.name)
time = min(t1, t2) + 1
if self.time > time:
result += (self.time - time) * valve.flow_rate
return result
def open_valves(self) -> Iterator[SystemProgress]:
actor1_actions = self.next_steps_for(self.actor1)
actor2_actions = self.next_steps_for(self.actor2)
for actor1, actor2 in product(actor1_actions, actor2_actions):
if actor1.position == actor2.position:
continue
closed_valves = self.closed_valves.copy()
next_time = max(actor1.next_time, actor2.next_time, 0)
flow_rate = self.flow_rate
if next_time > 0:
if actor1.position in closed_valves:
closed_valves.remove(actor1.position)
if actor1.next_time == next_time:
flow_rate += actor1.position.flow_rate
if actor2.position in closed_valves:
closed_valves.remove(actor2.position)
if actor2.next_time == next_time:
flow_rate += actor2.position.flow_rate
next = TwoActorProgress(
time=next_time,
flow_rate=flow_rate,
pressure=self.pressure + self.flow_rate * (self.time - next_time),
closed_valves=closed_valves,
actor1=actor1,
actor2=actor2,
)
yield next
def info(self) -> str:
opening = ""
if self.actor1.next_time != self.time:
opening = self.actor1.position.name
if self.actor2.next_time != self.time:
opening = self.actor2.position.name
closed = ",".join(valve.name for valve in self.closed_valves)
if opening:
return f"{closed}+{opening}"
else:
return closed
@dataclass(slots=True)
class Network:
valves: dict[str, Valve]
paths: dict[tuple[str, str], list[str]] = field(default_factory=dict)
@classmethod
def parse(cls, lines: Iterator[str]) -> Self:
raw_system = [RawValve.parse(line) for line in lines]
valves = {valve.name: Valve(valve.name, valve.flow_rate, []) for valve in raw_system}
for raw in raw_system:
current = valves[raw.name]
for follow in raw.exits:
current.exits.append(valves[follow])
return Network(valves)
def under_pressure(self, minutes: int, number_actors: Literal[1] | Literal[2]) -> int:
queue: PriorityQueue[SystemProgress] = PriorityQueue()
queue.put(SystemProgress.create(
run_time=minutes,
valves=self.valves,
start="AA",
num_actors=number_actors
))
min_pressure = 0
known: dict[str, int] = {}
while not queue.empty():
current = queue.get()
if current.time == 0:
return current.pressure
info = current.info()
prev_pressure = known.get(info)
if prev_pressure is not None and prev_pressure >= current.pressure:
continue
known[info] = current.pressure
if min_pressure > current.max_potential_pressure():
continue
min_pressure = max(min_pressure, current.min_potential_pressure())
min_pressure = min_pressure
for next in current.open_valves():
queue.put(next)
raise Exception("No best System found")