diff --git a/advent/__main__.py b/advent/__main__.py index 3ed1302..7a722d4 100644 --- a/advent/__main__.py +++ b/advent/__main__.py @@ -1,18 +1,18 @@ -import sys from importlib import import_module - +import time +import sys from advent.common import input from advent.days.template import Day, ResultType, is_day -def output(day: int, part: int, result: ResultType | None) -> None: +def output(day: int, part: int, result: ResultType | None, delta: float) -> None: match result: case int(value): - print('Day {0:02} Part {1}: {2}'.format(day, part, value)) + print(f'Day {day:02} Part {part}: {value} ({delta:0.3}s)') case str(value): - print('Day {0:02} Part {1}: {2}'.format(day, part, value)) + print(f'Day {day:02} Part {part}: {value} ({delta:0.3}s)') case list(value): - print('Day {0:02} Part {1}: {2}'.format(day, part, value[0])) + print(f'Day {day:02} Part {part}: {value[0]} ({delta:0.3}s)') for line in value[1:]: print(f' {line}') case None: @@ -29,23 +29,31 @@ def get_day(day_num: int) -> Day: return day_module -def run(day: Day, part: int) -> None: +def run(day: Day, part: int) -> float: data = input.read_lines(day.day_num, 'input.txt') + t0 = time.time() match part: - case 1: output(day.day_num, 1, day.part1(data)) - case 2: output(day.day_num, 2, day.part2(data)) + case 1: result = day.part1(data) + case 2: result = day.part2(data) case _: raise Exception(f'Unknown part {part}') + t1 = time.time() + delta = t1 - t0 + output(day.day_num, part, result, delta) + return delta -def run_from_string(day_str: str) -> None: +def run_from_string(day_str: str) -> float: match day_str.split('/'): case [d]: day_num = int(d) day = get_day(day_num) if day_num == day.day_num: - run(day, 1) - run(day, 2) + p1 = run(day, 1) + p2 = run(day, 2) + return p1 + p2 + + assert False, "We should never get here" case [d, p]: day_num = int(d) @@ -53,30 +61,36 @@ def run_from_string(day_str: str) -> None: if day_num == day.day_num: part = int(p) - run(day, part) + return run(day, part) + + assert False, "We should never get here" case _: raise Exception(f'{day_str} is not a valid day description') def main() -> None: + print() + time = 0.0 match sys.argv: case [_]: try: for day_num in range(1, 25): day = get_day(day_num) if day_num == day.day_num: - run(day, 1) - run(day, 2) + time += run(day, 1) + time += run(day, 2) except ModuleNotFoundError: pass case [_, argument]: - run_from_string(argument) + time += run_from_string(argument) case _: raise Exception(f'Usage: python {sys.argv[0]} [day[/part]]') + print(f"\nTotal time: {time:0.3}s") + if __name__ == '__main__': main() diff --git a/advent/days/day16/solution.py b/advent/days/day16/solution.py index d61ef62..ac7ee09 100644 --- a/advent/days/day16/solution.py +++ b/advent/days/day16/solution.py @@ -39,19 +39,28 @@ class RawValve(NamedTuple): return valve_parser.parse(line).get() -@dataclass(slots=True, unsafe_hash=True) +@dataclass(slots=True) class Valve: name: str flow_rate: int - following: list[Valve] = field(hash=False, compare=False) - paths: dict[str, int] = field(default_factory=dict, hash=False, init=False, compare=False) + following: list[Valve] + paths: dict[str, int] = field(default_factory=dict, init=False) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Valve): + return False + return self.name == other.name + + def __hash__(self) -> int: + return hash(self.name) + + def __lt__(self, other: Valve) -> bool: + return self.name < other.name def __repr__(self) -> str: return f"{self.name}:{self.flow_rate}->{','.join(v.name for v in self.following)}" def travel_time(self, to: str) -> int: - if not self.paths: - self.create_paths() return self.paths[to] def create_paths(self): @@ -75,116 +84,120 @@ class Valve: class Actor(NamedTuple): position: Valve next_time: int - finished: bool - -class SystemInfo(NamedTuple): - max_pressure: int - min_pressure: int - closed_vales: frozenset[Valve] - opening: frozenset[Valve] + @property + def finished(self) -> bool: + return self.next_time <= 0 @dataclass(slots=True, frozen=True, kw_only=True) class SystemProgress(ABC): - max_time: int - prev_time: int time: int pressure: int flow_rate: int - closed_valves: frozenset[Valve] + closed_valves: list[Valve] - def one_actor(self, actor: Actor) -> Iterator[Actor]: - if actor.finished or actor.next_time != self.time: + def next_steps_for(self, actor: Actor) -> Iterator[Actor]: + if actor.next_time != self.time: yield actor - elif not self.closed_valves: - yield Actor(actor.position, self.max_time, True) + else: reached_any_target = False for target in self.closed_valves: - finished = self.time + actor.position.travel_time(target.name) + 1 - if finished < self.max_time: + next_time = self.time - (actor.position.travel_time(target.name) + 1) + if next_time > 0: reached_any_target = True - yield Actor(target, finished, False) + yield Actor(target, next_time) + if not reached_any_target: - yield Actor(actor.position, self.max_time, True) + yield Actor(actor.position, 0) @classmethod - def create(cls, max_time: int, - closed_valves: frozenset[Valve], start: Valve, + def create(cls, run_time: int, + valves: dict[str, Valve], start: str, num_actors: Literal[1] | Literal[2]) -> SystemProgress: + closed_valves = list(sorted(valve for valve in valves.values() if valve.flow_rate > 0)) + start_valve = valves[start] + for valve in valves.values(): + if valve.name == start or valve.flow_rate > 0: + valve.create_paths() + match num_actors: case 1: - return OneActorProgress(max_time=max_time, - prev_time=0, - time=0, + return OneActorProgress(time=run_time, pressure=0, flow_rate=0, closed_valves=closed_valves, - actor=Actor(start, 0, False)) + actor=Actor(start_valve, run_time)) case 2: - return TwoActorProgress(max_time=max_time, - prev_time=0, - time=0, + return TwoActorProgress(time=run_time, pressure=0, flow_rate=0, closed_valves=closed_valves, - actor1=Actor(start, 0, False), - actor2=Actor(start, 0, False)) + actor1=Actor(start_valve, run_time), + actor2=Actor(start_valve, run_time)) case _: assert False, "Unreachable" def __lt__(self, other: OneActorProgress) -> bool: if self.time != other.time: - return self.time < other.time - return self.pressure > other.pressure + return self.time > other.time + return self.min_potential_pressure() > other.min_potential_pressure() @abstractmethod def open_valves(self) -> Iterator[SystemProgress]: ... @abstractmethod - def get_info(self) -> SystemInfo: + def info(self) -> str: ... + @abstractmethod + def min_potential_pressure(self) -> int: + ... + + @abstractmethod + def still_possible(self) -> int: + ... + + def max_potential_pressure(self) -> int: + return self.min_potential_pressure() + self.still_possible() + @dataclass(slots=True, frozen=True) class OneActorProgress(SystemProgress): actor: Actor - def get_info(self) -> SystemInfo: - return SystemInfo( - min_pressure=self.min_possible_pressure(), - max_pressure=self.max_possible_pressure(), - closed_vales=self.closed_valves, - opening=frozenset() - ) + def min_potential_pressure(self) -> int: + return self.pressure + self.flow_rate * self.time - def min_possible_pressure(self) -> int: - return self.pressure + self.flow_rate * (self.max_time - self.time) - - def max_possible_pressure(self) -> int: - closed = sum(valve.flow_rate for valve in self.closed_valves) - return self.pressure + (self.flow_rate + closed) * (self.max_time - self.time) + def still_possible(self) -> int: + result = 0 + for valve in self.closed_valves: + time = self.actor.position.travel_time(valve.name) + 1 + if self.time > time: + result += (self.time - time) * valve.flow_rate + return result def open_valves(self) -> Iterator[SystemProgress]: - for actor in self.one_actor(self.actor): - closed_valves = self.closed_valves + for actor in self.next_steps_for(self.actor): + closed_valves = self.closed_valves.copy() if not actor.finished: - closed_valves = closed_valves.difference({actor.position}) + closed_valves.remove(actor.position) flow_rate = self.flow_rate + actor.position.flow_rate else: flow_rate = self.flow_rate - next = OneActorProgress( - max_time=self.max_time, - prev_time=self.time, + + yield OneActorProgress( time=actor.next_time, flow_rate=flow_rate, - pressure=self.pressure + self.flow_rate * (actor.next_time - self.time), + pressure=self.pressure + self.flow_rate * (self.time - actor.next_time), closed_valves=closed_valves, actor=actor, ) - yield next + + def info(self) -> str: + return ",".join(valve.name for valve in self.closed_valves) @dataclass(slots=True, frozen=True) @@ -192,72 +205,70 @@ class TwoActorProgress(SystemProgress): actor1: Actor actor2: Actor - def get_info(self) -> SystemInfo: - opening: set[Valve] = set() - if self.actor1.next_time != self.time and not self.actor1.finished: - opening.add(self.actor1.position) - if self.actor2.next_time != self.time and not self.actor2.finished: - opening.add(self.actor2.position) - - return SystemInfo( - min_pressure=self.min_possible_pressure(), - max_pressure=self.max_possible_pressure(), - closed_vales=self.closed_valves, - opening=frozenset(opening) - ) - - def min_possible_pressure(self) -> int: - pressure = self.pressure + self.flow_rate * (self.max_time - self.time) - if self.actor1.next_time != self.time and not self.actor1.finished: - pressure += self.actor1.position.flow_rate * (self.max_time - self.actor1.next_time) - if self.actor2.next_time != self.time and not self.actor2.finished: - pressure += self.actor2.position.flow_rate * (self.max_time - self.actor2.next_time) + def min_potential_pressure(self) -> int: + pressure = self.pressure + self.flow_rate * self.time + if self.actor1.next_time != self.time: + pressure += self.actor1.position.flow_rate * self.actor1.next_time + if self.actor2.next_time != self.time: + pressure += self.actor2.position.flow_rate * self.actor2.next_time return pressure - def max_possible_pressure(self) -> int: - closed = sum(valve.flow_rate for valve in self.closed_valves) - pressure = self.pressure + (self.flow_rate + closed) * (self.max_time - self.time) - - if self.actor1.next_time != self.time and not self.actor1.finished: - pressure += self.actor1.position.flow_rate * (self.max_time - self.actor1.next_time) - if self.actor2.next_time != self.time and not self.actor2.finished: - pressure += self.actor2.position.flow_rate * (self.max_time - self.actor2.next_time) - return pressure + def still_possible(self) -> int: + result = 0 + for valve in self.closed_valves: + t1 = self.actor1.position.travel_time(valve.name) + t2 = self.actor2.position.travel_time(valve.name) + time = min(t1, t2) + 1 + if self.time > time: + result += (self.time - time) * valve.flow_rate + return result def open_valves(self) -> Iterator[SystemProgress]: - actor1_actions = self.one_actor(self.actor1) - actor2_actions = self.one_actor(self.actor2) + actor1_actions = self.next_steps_for(self.actor1) + actor2_actions = self.next_steps_for(self.actor2) for actor1, actor2 in product(actor1_actions, actor2_actions): - if not actor1.finished and not actor2.finished and actor1.position == actor2.position: + if actor1.position == actor2.position: continue - closed_valves = self.closed_valves - flow_rate = self.flow_rate - next_time = min(actor1.next_time, actor2.next_time) + closed_valves = self.closed_valves.copy() + next_time = max(actor1.next_time, actor2.next_time, 0) - if not actor1.finished: - closed_valves = closed_valves.difference({actor1.position}) + flow_rate = self.flow_rate + if next_time > 0: + if actor1.position in closed_valves: + closed_valves.remove(actor1.position) if actor1.next_time == next_time: flow_rate += actor1.position.flow_rate - if not actor2.finished: - closed_valves = closed_valves.difference({actor2.position}) + if actor2.position in closed_valves: + closed_valves.remove(actor2.position) if actor2.next_time == next_time: flow_rate += actor2.position.flow_rate next = TwoActorProgress( - max_time=self.max_time, - prev_time=self.time, time=next_time, flow_rate=flow_rate, - pressure=self.pressure + self.flow_rate * (next_time - self.time), + pressure=self.pressure + self.flow_rate * (self.time - next_time), closed_valves=closed_valves, actor1=actor1, actor2=actor2, ) yield next + def info(self) -> str: + opening = "" + if self.actor1.next_time != self.time: + opening = self.actor1.position.name + if self.actor2.next_time != self.time: + opening = self.actor2.position.name + + closed = ",".join(valve.name for valve in self.closed_valves) + if opening: + return f"{closed}+{opening}" + else: + return closed + @dataclass(slots=True) class Network: @@ -276,26 +287,37 @@ class Network: return Network(valves) def under_pressure(self, minutes: int, number_actors: Literal[1] | Literal[2]) -> int: - closed_valves = [valve for valve in self.valves.values() if valve.flow_rate > 0] - start = self.valves["AA"] queue: PriorityQueue[SystemProgress] = PriorityQueue() queue.put(SystemProgress.create( - max_time=minutes, - closed_valves=frozenset(closed_valves), - start=start, + run_time=minutes, + valves=self.valves, + start="AA", num_actors=number_actors )) min_pressure = 0 - known_systems: set[SystemInfo] = set() + known: dict[str, int] = {} + ticks = 0 + drop_known = 0 + drop_pressure = 0 while not queue.empty(): + ticks += 1 current = queue.get() - if current.time == minutes: + if current.time == 0: + print(f"{ticks=} {drop_known=} {drop_pressure=} {len(known)=}") return current.pressure - info = current.get_info() - if min_pressure > info.max_pressure or info in known_systems: + + info = current.info() + prev_pressure = known.get(info) + if prev_pressure is not None and prev_pressure >= current.pressure: + drop_known += 1 continue - known_systems.add(info) - min_pressure = max(min_pressure, info.min_pressure) + known[info] = current.pressure + + if min_pressure > current.max_potential_pressure(): + drop_pressure += 1 + continue + min_pressure = max(min_pressure, current.min_potential_pressure()) + min_pressure = min_pressure for next in current.open_valves(): queue.put(next) diff --git a/advent/days/day16/test_solution.py b/advent/days/day16/test_solution.py index 0390eb0..1ac4983 100644 --- a/advent/days/day16/test_solution.py +++ b/advent/days/day16/test_solution.py @@ -11,7 +11,7 @@ def test_part1(): def test_part2(): - lines = input.read_lines(day_num, 'example01.txt') + lines = input.read_lines(day_num, 'input.txt') expected = 1707 result = part2(lines) assert result == expected