speed up day 16

This commit is contained in:
Ruediger Ludwig 2023-01-02 07:32:35 +01:00
parent cf05072def
commit 923e967056

View file

@ -4,7 +4,7 @@ from dataclasses import dataclass, field
from itertools import product
from queue import PriorityQueue
from typing import Iterator, Literal, Self
from typing import Iterator, Literal, NamedTuple, Self
from advent.parser.parser import P
day_num = 16
@ -29,16 +29,14 @@ valve_parser = P.map3(
)
@dataclass(slots=True)
class RawValve:
class RawValve(NamedTuple):
name: str
flow_rate: int
following: list[str]
@classmethod
def parse(cls, line: str) -> Self:
result = valve_parser.parse(line).get()
return result
return valve_parser.parse(line).get()
@dataclass(slots=True, unsafe_hash=True)
@ -46,38 +44,48 @@ class Valve:
name: str
flow_rate: int
following: list[Valve] = field(hash=False, compare=False)
paths: dict[str, int] | None = field(default=None, hash=False, init=False, compare=False)
paths: dict[str, int] = field(default_factory=dict, hash=False, init=False, compare=False)
def __repr__(self) -> str:
return f"{self.name}:{self.flow_rate}->{','.join(v.name for v in self.following)}"
def travel_time(self, to: str) -> int:
if self.paths is None:
if not self.paths:
self.create_paths()
return self.paths[to] # type: ignore
return self.paths[to]
def create_paths(self):
paths: dict[str, int] = {}
paths: dict[str, tuple[int, bool]] = {}
to_check: list[tuple[Valve, int]] = [(self, 0)]
while to_check:
current, steps = to_check[0]
to_check = to_check[1:]
paths[current.name] = steps
paths[current.name] = steps, (current.flow_rate > 0)
for next in current.following:
if paths.get(next.name, steps + 2) > steps + 1:
known_path, _ = paths.get(next.name, (steps + 2, False))
if known_path > steps + 1:
to_check.append((next, steps + 1))
self.paths = paths
self.paths = {name: steps
for name, (steps, has_valve) in paths.items()
if has_valve is True}
@dataclass(slots=True, frozen=True)
class Actor:
class Actor(NamedTuple):
position: Valve
next_time: int
finished: bool
@dataclass(slots=True, frozen=True)
class SystemInfo(NamedTuple):
max_pressure: int
min_pressure: int
closed_vales: frozenset[Valve]
opening: frozenset[Valve]
@dataclass(slots=True, frozen=True, kw_only=True)
class SystemProgress(ABC):
max_time: int
prev_time: int
@ -85,7 +93,6 @@ class SystemProgress(ABC):
pressure: int
flow_rate: int
closed_valves: frozenset[Valve]
path: frozenset[tuple[str, int, bool]]
def one_actor(self, actor: Actor) -> Iterator[Actor]:
if actor.finished or actor.next_time != self.time:
@ -103,16 +110,27 @@ class SystemProgress(ABC):
yield Actor(actor.position, self.max_time, True)
@classmethod
def create(cls, max_time: int, pressure: int, flow_rate: int,
def create(cls, max_time: int,
closed_valves: frozenset[Valve], start: Valve,
num_actors: Literal[1] | Literal[2]) -> SystemProgress:
match num_actors:
case 1:
return OneActorProgress(max_time, 0, 0, pressure, flow_rate, closed_valves,
frozenset(), Actor(start, 0, False))
return OneActorProgress(max_time=max_time,
prev_time=0,
time=0,
pressure=0,
flow_rate=0,
closed_valves=closed_valves,
actor=Actor(start, 0, False))
case 2:
return TwoActorProgress(max_time, 0, 0, pressure, flow_rate, closed_valves,
frozenset(), Actor(start, 0, False), Actor(start, 0, False))
return TwoActorProgress(max_time=max_time,
prev_time=0,
time=0,
pressure=0,
flow_rate=0,
closed_valves=closed_valves,
actor1=Actor(start, 0, False),
actor2=Actor(start, 0, False))
case _:
assert False, "Unreachable"
@ -122,11 +140,11 @@ class SystemProgress(ABC):
return self.pressure > other.pressure
@abstractmethod
def max_possible_pressure(self) -> int:
def open_valves(self) -> Iterator[SystemProgress]:
...
@abstractmethod
def open_valves(self) -> Iterator[SystemProgress]:
def get_info(self) -> SystemInfo:
...
@ -134,6 +152,17 @@ class SystemProgress(ABC):
class OneActorProgress(SystemProgress):
actor: Actor
def get_info(self) -> SystemInfo:
return SystemInfo(
min_pressure=self.min_possible_pressure(),
max_pressure=self.max_possible_pressure(),
closed_vales=self.closed_valves,
opening=frozenset()
)
def min_possible_pressure(self) -> int:
return self.pressure + self.flow_rate * (self.max_time - self.time)
def max_possible_pressure(self) -> int:
closed = sum(valve.flow_rate for valve in self.closed_valves)
return self.pressure + (self.flow_rate + closed) * (self.max_time - self.time)
@ -154,7 +183,6 @@ class OneActorProgress(SystemProgress):
pressure=self.pressure + self.flow_rate * (actor.next_time - self.time),
closed_valves=closed_valves,
actor=actor,
path=self.path | {(actor.position.name, actor.next_time, True)}
)
yield next
@ -164,27 +192,41 @@ class TwoActorProgress(SystemProgress):
actor1: Actor
actor2: Actor
def get_info(self) -> SystemInfo:
opening: set[Valve] = set()
if self.actor1.next_time != self.time and not self.actor1.finished:
opening.add(self.actor1.position)
if self.actor2.next_time != self.time and not self.actor2.finished:
opening.add(self.actor2.position)
return SystemInfo(
min_pressure=self.min_possible_pressure(),
max_pressure=self.max_possible_pressure(),
closed_vales=self.closed_valves,
opening=frozenset(opening)
)
def min_possible_pressure(self) -> int:
pressure = self.pressure + self.flow_rate * (self.max_time - self.time)
if self.actor1.next_time != self.time and not self.actor1.finished:
pressure += self.actor1.position.flow_rate * (self.max_time - self.actor1.next_time)
if self.actor2.next_time != self.time and not self.actor2.finished:
pressure += self.actor2.position.flow_rate * (self.max_time - self.actor2.next_time)
return pressure
def max_possible_pressure(self) -> int:
closed = sum(valve.flow_rate for valve in self.closed_valves)
pressure = self.pressure + (self.flow_rate + closed) * (self.max_time - self.time)
other = 0
if self.actor1.next_time != self.time and not self.actor1.finished:
other += self.actor1.position.flow_rate * (self.max_time - self.actor1.next_time)
pressure += self.actor1.position.flow_rate * (self.max_time - self.actor1.next_time)
if self.actor2.next_time != self.time and not self.actor2.finished:
other += self.actor2.position.flow_rate * (self.max_time - self.actor2.next_time)
return self.pressure + (self.flow_rate + closed) * (self.max_time - self.time) + other
pressure += self.actor2.position.flow_rate * (self.max_time - self.actor2.next_time)
return pressure
def open_valves(self) -> Iterator[SystemProgress]:
if self.actor1.next_time == self.time:
actor1_actions = self.one_actor(self.actor1)
else:
actor1_actions = [self.actor1]
if self.actor2.next_time == self.time:
actor2_actions = self.one_actor(self.actor2)
else:
actor2_actions = [self.actor2]
actor1_actions = self.one_actor(self.actor1)
actor2_actions = self.one_actor(self.actor2)
for actor1, actor2 in product(actor1_actions, actor2_actions):
if not actor1.finished and not actor2.finished and actor1.position == actor2.position:
@ -194,25 +236,15 @@ class TwoActorProgress(SystemProgress):
flow_rate = self.flow_rate
next_time = min(actor1.next_time, actor2.next_time)
path: set[tuple[str, int, bool]] = set(self.path)
if not actor1.finished:
closed_valves = closed_valves.difference({actor1.position})
if actor1.next_time == next_time:
flow_rate += actor1.position.flow_rate
path -= {(actor2.position.name, actor1.next_time, False)}
path.add((actor1.position.name, actor1.next_time, True))
else:
path.add((actor1.position.name, actor1.next_time, False))
if not actor2.finished:
closed_valves = closed_valves.difference({actor2.position})
if actor2.next_time == next_time:
flow_rate += actor2.position.flow_rate
path -= {(actor2.position.name, actor1.next_time, False)}
path.add((actor2.position.name, actor2.next_time, True))
else:
path.add((actor2.position.name, actor2.next_time, False))
next = TwoActorProgress(
max_time=self.max_time,
@ -223,7 +255,6 @@ class TwoActorProgress(SystemProgress):
closed_valves=closed_valves,
actor1=actor1,
actor2=actor2,
path=frozenset(path)
)
yield next
@ -250,24 +281,21 @@ class Network:
queue: PriorityQueue[SystemProgress] = PriorityQueue()
queue.put(SystemProgress.create(
max_time=minutes,
pressure=0,
flow_rate=0,
closed_valves=frozenset(closed_valves),
start=start,
num_actors=number_actors
))
max_pressure = 0
seen: set[frozenset[tuple[str, int, bool]]] = set()
min_pressure = 0
known_systems: set[SystemInfo] = set()
while not queue.empty():
current = queue.get()
if current.time == minutes:
return current.pressure
if max_pressure > current.max_possible_pressure():
info = current.get_info()
if min_pressure > info.max_pressure or info in known_systems:
continue
if current.path in seen:
continue
seen.add(current.path)
max_pressure = max(max_pressure, current.pressure)
known_systems.add(info)
min_pressure = max(min_pressure, info.min_pressure)
for next in current.open_valves():
queue.put(next)