day 16 much, much quicker

This commit is contained in:
Ruediger Ludwig 2023-01-18 20:17:27 +01:00
parent d02d8190f0
commit eb12a799fd
3 changed files with 164 additions and 128 deletions

View file

@ -39,19 +39,28 @@ class RawValve(NamedTuple):
return valve_parser.parse(line).get()
@dataclass(slots=True, unsafe_hash=True)
@dataclass(slots=True)
class Valve:
name: str
flow_rate: int
following: list[Valve] = field(hash=False, compare=False)
paths: dict[str, int] = field(default_factory=dict, hash=False, init=False, compare=False)
following: list[Valve]
paths: dict[str, int] = field(default_factory=dict, init=False)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Valve):
return False
return self.name == other.name
def __hash__(self) -> int:
return hash(self.name)
def __lt__(self, other: Valve) -> bool:
return self.name < other.name
def __repr__(self) -> str:
return f"{self.name}:{self.flow_rate}->{','.join(v.name for v in self.following)}"
def travel_time(self, to: str) -> int:
if not self.paths:
self.create_paths()
return self.paths[to]
def create_paths(self):
@ -75,116 +84,120 @@ class Valve:
class Actor(NamedTuple):
position: Valve
next_time: int
finished: bool
class SystemInfo(NamedTuple):
max_pressure: int
min_pressure: int
closed_vales: frozenset[Valve]
opening: frozenset[Valve]
@property
def finished(self) -> bool:
return self.next_time <= 0
@dataclass(slots=True, frozen=True, kw_only=True)
class SystemProgress(ABC):
max_time: int
prev_time: int
time: int
pressure: int
flow_rate: int
closed_valves: frozenset[Valve]
closed_valves: list[Valve]
def one_actor(self, actor: Actor) -> Iterator[Actor]:
if actor.finished or actor.next_time != self.time:
def next_steps_for(self, actor: Actor) -> Iterator[Actor]:
if actor.next_time != self.time:
yield actor
elif not self.closed_valves:
yield Actor(actor.position, self.max_time, True)
else:
reached_any_target = False
for target in self.closed_valves:
finished = self.time + actor.position.travel_time(target.name) + 1
if finished < self.max_time:
next_time = self.time - (actor.position.travel_time(target.name) + 1)
if next_time > 0:
reached_any_target = True
yield Actor(target, finished, False)
yield Actor(target, next_time)
if not reached_any_target:
yield Actor(actor.position, self.max_time, True)
yield Actor(actor.position, 0)
@classmethod
def create(cls, max_time: int,
closed_valves: frozenset[Valve], start: Valve,
def create(cls, run_time: int,
valves: dict[str, Valve], start: str,
num_actors: Literal[1] | Literal[2]) -> SystemProgress:
closed_valves = list(sorted(valve for valve in valves.values() if valve.flow_rate > 0))
start_valve = valves[start]
for valve in valves.values():
if valve.name == start or valve.flow_rate > 0:
valve.create_paths()
match num_actors:
case 1:
return OneActorProgress(max_time=max_time,
prev_time=0,
time=0,
return OneActorProgress(time=run_time,
pressure=0,
flow_rate=0,
closed_valves=closed_valves,
actor=Actor(start, 0, False))
actor=Actor(start_valve, run_time))
case 2:
return TwoActorProgress(max_time=max_time,
prev_time=0,
time=0,
return TwoActorProgress(time=run_time,
pressure=0,
flow_rate=0,
closed_valves=closed_valves,
actor1=Actor(start, 0, False),
actor2=Actor(start, 0, False))
actor1=Actor(start_valve, run_time),
actor2=Actor(start_valve, run_time))
case _:
assert False, "Unreachable"
def __lt__(self, other: OneActorProgress) -> bool:
if self.time != other.time:
return self.time < other.time
return self.pressure > other.pressure
return self.time > other.time
return self.min_potential_pressure() > other.min_potential_pressure()
@abstractmethod
def open_valves(self) -> Iterator[SystemProgress]:
...
@abstractmethod
def get_info(self) -> SystemInfo:
def info(self) -> str:
...
@abstractmethod
def min_potential_pressure(self) -> int:
...
@abstractmethod
def still_possible(self) -> int:
...
def max_potential_pressure(self) -> int:
return self.min_potential_pressure() + self.still_possible()
@dataclass(slots=True, frozen=True)
class OneActorProgress(SystemProgress):
actor: Actor
def get_info(self) -> SystemInfo:
return SystemInfo(
min_pressure=self.min_possible_pressure(),
max_pressure=self.max_possible_pressure(),
closed_vales=self.closed_valves,
opening=frozenset()
)
def min_potential_pressure(self) -> int:
return self.pressure + self.flow_rate * self.time
def min_possible_pressure(self) -> int:
return self.pressure + self.flow_rate * (self.max_time - self.time)
def max_possible_pressure(self) -> int:
closed = sum(valve.flow_rate for valve in self.closed_valves)
return self.pressure + (self.flow_rate + closed) * (self.max_time - self.time)
def still_possible(self) -> int:
result = 0
for valve in self.closed_valves:
time = self.actor.position.travel_time(valve.name) + 1
if self.time > time:
result += (self.time - time) * valve.flow_rate
return result
def open_valves(self) -> Iterator[SystemProgress]:
for actor in self.one_actor(self.actor):
closed_valves = self.closed_valves
for actor in self.next_steps_for(self.actor):
closed_valves = self.closed_valves.copy()
if not actor.finished:
closed_valves = closed_valves.difference({actor.position})
closed_valves.remove(actor.position)
flow_rate = self.flow_rate + actor.position.flow_rate
else:
flow_rate = self.flow_rate
next = OneActorProgress(
max_time=self.max_time,
prev_time=self.time,
yield OneActorProgress(
time=actor.next_time,
flow_rate=flow_rate,
pressure=self.pressure + self.flow_rate * (actor.next_time - self.time),
pressure=self.pressure + self.flow_rate * (self.time - actor.next_time),
closed_valves=closed_valves,
actor=actor,
)
yield next
def info(self) -> str:
return ",".join(valve.name for valve in self.closed_valves)
@dataclass(slots=True, frozen=True)
@ -192,72 +205,70 @@ class TwoActorProgress(SystemProgress):
actor1: Actor
actor2: Actor
def get_info(self) -> SystemInfo:
opening: set[Valve] = set()
if self.actor1.next_time != self.time and not self.actor1.finished:
opening.add(self.actor1.position)
if self.actor2.next_time != self.time and not self.actor2.finished:
opening.add(self.actor2.position)
return SystemInfo(
min_pressure=self.min_possible_pressure(),
max_pressure=self.max_possible_pressure(),
closed_vales=self.closed_valves,
opening=frozenset(opening)
)
def min_possible_pressure(self) -> int:
pressure = self.pressure + self.flow_rate * (self.max_time - self.time)
if self.actor1.next_time != self.time and not self.actor1.finished:
pressure += self.actor1.position.flow_rate * (self.max_time - self.actor1.next_time)
if self.actor2.next_time != self.time and not self.actor2.finished:
pressure += self.actor2.position.flow_rate * (self.max_time - self.actor2.next_time)
def min_potential_pressure(self) -> int:
pressure = self.pressure + self.flow_rate * self.time
if self.actor1.next_time != self.time:
pressure += self.actor1.position.flow_rate * self.actor1.next_time
if self.actor2.next_time != self.time:
pressure += self.actor2.position.flow_rate * self.actor2.next_time
return pressure
def max_possible_pressure(self) -> int:
closed = sum(valve.flow_rate for valve in self.closed_valves)
pressure = self.pressure + (self.flow_rate + closed) * (self.max_time - self.time)
if self.actor1.next_time != self.time and not self.actor1.finished:
pressure += self.actor1.position.flow_rate * (self.max_time - self.actor1.next_time)
if self.actor2.next_time != self.time and not self.actor2.finished:
pressure += self.actor2.position.flow_rate * (self.max_time - self.actor2.next_time)
return pressure
def still_possible(self) -> int:
result = 0
for valve in self.closed_valves:
t1 = self.actor1.position.travel_time(valve.name)
t2 = self.actor2.position.travel_time(valve.name)
time = min(t1, t2) + 1
if self.time > time:
result += (self.time - time) * valve.flow_rate
return result
def open_valves(self) -> Iterator[SystemProgress]:
actor1_actions = self.one_actor(self.actor1)
actor2_actions = self.one_actor(self.actor2)
actor1_actions = self.next_steps_for(self.actor1)
actor2_actions = self.next_steps_for(self.actor2)
for actor1, actor2 in product(actor1_actions, actor2_actions):
if not actor1.finished and not actor2.finished and actor1.position == actor2.position:
if actor1.position == actor2.position:
continue
closed_valves = self.closed_valves
flow_rate = self.flow_rate
next_time = min(actor1.next_time, actor2.next_time)
closed_valves = self.closed_valves.copy()
next_time = max(actor1.next_time, actor2.next_time, 0)
if not actor1.finished:
closed_valves = closed_valves.difference({actor1.position})
flow_rate = self.flow_rate
if next_time > 0:
if actor1.position in closed_valves:
closed_valves.remove(actor1.position)
if actor1.next_time == next_time:
flow_rate += actor1.position.flow_rate
if not actor2.finished:
closed_valves = closed_valves.difference({actor2.position})
if actor2.position in closed_valves:
closed_valves.remove(actor2.position)
if actor2.next_time == next_time:
flow_rate += actor2.position.flow_rate
next = TwoActorProgress(
max_time=self.max_time,
prev_time=self.time,
time=next_time,
flow_rate=flow_rate,
pressure=self.pressure + self.flow_rate * (next_time - self.time),
pressure=self.pressure + self.flow_rate * (self.time - next_time),
closed_valves=closed_valves,
actor1=actor1,
actor2=actor2,
)
yield next
def info(self) -> str:
opening = ""
if self.actor1.next_time != self.time:
opening = self.actor1.position.name
if self.actor2.next_time != self.time:
opening = self.actor2.position.name
closed = ",".join(valve.name for valve in self.closed_valves)
if opening:
return f"{closed}+{opening}"
else:
return closed
@dataclass(slots=True)
class Network:
@ -276,26 +287,37 @@ class Network:
return Network(valves)
def under_pressure(self, minutes: int, number_actors: Literal[1] | Literal[2]) -> int:
closed_valves = [valve for valve in self.valves.values() if valve.flow_rate > 0]
start = self.valves["AA"]
queue: PriorityQueue[SystemProgress] = PriorityQueue()
queue.put(SystemProgress.create(
max_time=minutes,
closed_valves=frozenset(closed_valves),
start=start,
run_time=minutes,
valves=self.valves,
start="AA",
num_actors=number_actors
))
min_pressure = 0
known_systems: set[SystemInfo] = set()
known: dict[str, int] = {}
ticks = 0
drop_known = 0
drop_pressure = 0
while not queue.empty():
ticks += 1
current = queue.get()
if current.time == minutes:
if current.time == 0:
print(f"{ticks=} {drop_known=} {drop_pressure=} {len(known)=}")
return current.pressure
info = current.get_info()
if min_pressure > info.max_pressure or info in known_systems:
info = current.info()
prev_pressure = known.get(info)
if prev_pressure is not None and prev_pressure >= current.pressure:
drop_known += 1
continue
known_systems.add(info)
min_pressure = max(min_pressure, info.min_pressure)
known[info] = current.pressure
if min_pressure > current.max_potential_pressure():
drop_pressure += 1
continue
min_pressure = max(min_pressure, current.min_potential_pressure())
min_pressure = min_pressure
for next in current.open_valves():
queue.put(next)

View file

@ -11,7 +11,7 @@ def test_part1():
def test_part2():
lines = input.read_lines(day_num, 'example01.txt')
lines = input.read_lines(day_num, 'input.txt')
expected = 1707
result = part2(lines)
assert result == expected