From 38b5a44f993ffaa4da3d038e50d00385ebcfe516 Mon Sep 17 00:00:00 2001 From: Arnab Bose Date: Wed, 8 May 2024 00:59:50 +0530 Subject: [PATCH] Moved rollbacker into the abstraction Also organized into directories. Issue #32 --- src/code/configs.py | 2 +- src/code/global_flags.py | 5 + src/code/main.py | 4 +- src/code/mechanisms/__init__.py | 13 ++ src/code/mechanisms/abstract_mechanism.py | 48 ++++++ .../btrfs_mechanism.py} | 76 +++------ src/code/mechanisms/rollback_btrfs.py | 141 ++++++++++++++++ .../rollback_btrfs_test.py} | 35 ++-- src/code/mechanisms/snap_mechanisms.py | 35 ++++ src/code/rollbacker.py | 152 ++---------------- src/code/snap_holder.py | 9 +- src/code/snap_holder_test.py | 11 +- src/code/snap_operator.py | 8 +- src/code/snap_operator_test.py | 6 +- 14 files changed, 323 insertions(+), 222 deletions(-) create mode 100644 src/code/mechanisms/__init__.py create mode 100644 src/code/mechanisms/abstract_mechanism.py rename src/code/{snap_mechanisms.py => mechanisms/btrfs_mechanism.py} (53%) create mode 100644 src/code/mechanisms/rollback_btrfs.py rename src/code/{rollbacker_test.py => mechanisms/rollback_btrfs_test.py} (77%) create mode 100644 src/code/mechanisms/snap_mechanisms.py diff --git a/src/code/configs.py b/src/code/configs.py index a1a825f..b1e7bb6 100644 --- a/src/code/configs.py +++ b/src/code/configs.py @@ -22,7 +22,7 @@ from . import human_interval from . import os_utils -from . import snap_mechanisms +from .mechanisms import snap_mechanisms from typing import Iterable, Iterator, Optional diff --git a/src/code/global_flags.py b/src/code/global_flags.py index edeea36..9c1c81a 100644 --- a/src/code/global_flags.py +++ b/src/code/global_flags.py @@ -14,6 +14,11 @@ import dataclasses +# Format of time used for files. +TIME_FORMAT = r"%Y%m%d%H%M%S" +# Length of the string produced by this format. +TIME_FORMAT_LEN = 14 + @dataclasses.dataclass class _Flags: diff --git a/src/code/main.py b/src/code/main.py index 53ed554..51d5b85 100644 --- a/src/code/main.py +++ b/src/code/main.py @@ -20,10 +20,10 @@ from . import colored_logs from . import configs from . import global_flags -from . import rollbacker from . import os_utils -from . import snap_mechanisms +from . import rollbacker from . import snap_operator +from .mechanisms import snap_mechanisms def _parse_args() -> argparse.Namespace: diff --git a/src/code/mechanisms/__init__.py b/src/code/mechanisms/__init__.py new file mode 100644 index 0000000..6d6d126 --- /dev/null +++ b/src/code/mechanisms/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/code/mechanisms/abstract_mechanism.py b/src/code/mechanisms/abstract_mechanism.py new file mode 100644 index 0000000..ff7ca36 --- /dev/null +++ b/src/code/mechanisms/abstract_mechanism.py @@ -0,0 +1,48 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import enum + + +# The type of snapshot is maintained in two places - +# 1. Config +# 2. Snapshot +class SnapType(enum.Enum): + UNKNOWN = "UNKNOWN" + BTRFS = "BTRFS" + + +class SnapMechanism(abc.ABC): + """Interface with necessary methods to implement snapshotting system. + + Implementations may be based on btrfs, rsync, bcachefs, etc. + """ + + @abc.abstractmethod + def verify_volume(self, mount_point: str) -> bool: + """Confirms that the source path can be snapshotted.""" + + @abc.abstractmethod + def create(self, source: str, destination: str): + """Creates a snapshot of source in a destination path.""" + + @abc.abstractmethod + def delete(self, destination: str): + """Deletes an existing snapshot.""" + + @abc.abstractmethod + def rollback_gen(self, source_dests: list[tuple[str, str]]) -> list[str]: + """Returns shell lines which when executed will result in a rollback of snapshots.""" + # It is okay to leave unimplemented, and raise NotImplementedError(). diff --git a/src/code/snap_mechanisms.py b/src/code/mechanisms/btrfs_mechanism.py similarity index 53% rename from src/code/snap_mechanisms.py rename to src/code/mechanisms/btrfs_mechanism.py index 8e9bde7..5faae4e 100644 --- a/src/code/snap_mechanisms.py +++ b/src/code/mechanisms/btrfs_mechanism.py @@ -1,38 +1,23 @@ -import abc -import enum -import functools -import logging - -from . import global_flags -from . import os_utils - -# TODO: Move btrfs based rollback to this abstraction. -# TODO: Move btrfs sync to this abstraction. - -# The type of snapshot is maintained in two places - -# 1. Config -# 2. Snapshot -class SnapType(enum.Enum): - UNKNOWN = "UNKNOWN" - BTRFS = "BTRFS" +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging -class SnapMechanism(abc.ABC): - """Interface with necessary methods to implement snapshotting system. - - Example implementations may be based on btrfs, rsync, bcachefs, etc. - """ - @abc.abstractmethod - def verify_volume(self, mount_point: str) -> bool: - pass - - @abc.abstractmethod - def create(self, source: str, destination: str): - pass - - @abc.abstractmethod - def delete(self, destination: str): - pass +from . import abstract_mechanism +from . import rollback_btrfs +from .. import global_flags +from .. import os_utils def _execute_sh(cmd: str): @@ -42,16 +27,7 @@ def _execute_sh(cmd: str): os_utils.execute_sh(cmd) -class _BtrfsSnapMechanism(SnapMechanism): - # @staticmethod - # def sync(mount_paths: set[str]) -> None: - # for mount_path in sorted(mount_paths): - # if global_flags.FLAGS.dryrun: - # os_utils.eprint(f"Would sync {mount_path}") - # continue - # os_utils.eprint("Syncing ...", flush=True) - # os_utils.execute_sh(f"btrfs subvolume sync {mount_path}") - +class BtrfsSnapMechanism(abstract_mechanism.SnapMechanism): def verify_volume(self, mount_point: str) -> bool: # Based on https://stackoverflow.com/a/32865333/196462 fstype = os_utils.execute_sh( @@ -88,10 +64,10 @@ def delete(self, destination: str): logging.error("Unable to delete; are you running as root?") raise - -@functools.cache -def get(snap_type: SnapType) -> SnapMechanism: - """Singleton implementation.""" - if snap_type == SnapType.BTRFS: - return _BtrfsSnapMechanism() - raise RuntimeError(f"Unknown snap_type {snap_type}") + def rollback_gen(self, source_dests: list[tuple[str, str]]) -> list[str]: + for source, _ in source_dests: + if not self.verify_volume(source): + raise RuntimeError( + f"Mount point may no longer be a btrfs volume: {source}" + ) + return rollback_btrfs.rollback_gen(source_dests) diff --git a/src/code/mechanisms/rollback_btrfs.py b/src/code/mechanisms/rollback_btrfs.py new file mode 100644 index 0000000..09a606f --- /dev/null +++ b/src/code/mechanisms/rollback_btrfs.py @@ -0,0 +1,141 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses +import datetime +import os + +from .. import global_flags + +from typing import Iterable, Optional + + +# This will be cleaned up if it exists by rollback script. +_PACMAN_LOCK_FILE = "/var/lib/pacman/db.lck" + + +@dataclasses.dataclass +class _MountAttributes: + # The device e.g. /mnt/volume/subv under which we have the subv or its parent is mounted. + device: str + subvol_name: str + + +def _get_mount_attributes( + mount_point: str, mtab_lines: Iterable[str] +) -> _MountAttributes: + # For a mount point, this denotes the longest path that was seen in /etc/mtab. + # This is therefore the point where that directory is mounted. + longest_match_to_mount_point = "" + # Which line matches the mount point. + matched_line: str = "" + for this_line in mtab_lines: + this_tokens = this_line.split() + if mount_point.startswith(this_tokens[1]): + if len(this_tokens[1]) > len(longest_match_to_mount_point): + longest_match_to_mount_point = this_tokens[1] + matched_line = this_line + + if not matched_line: + raise ValueError(f"Mount point not found: {mount_point}") + tokens = matched_line.split() + + if tokens[2] != "btrfs": + raise ValueError(f"Mount point is not btrfs: {mount_point} ({tokens[2]})") + mount_param: str = "" + for mount_param in tokens[3].split(","): + subvol_prefix = "subvol=" + if mount_param.startswith(subvol_prefix): + break + else: + raise RuntimeError(f"Could not find subvol= in {matched_line!r}") + subvol_name = mount_param[len(subvol_prefix) :] + if tokens[1] != mount_point: + nested_subvol = mount_point.removeprefix(tokens[1]) + assert nested_subvol.startswith("/") + subvol_name = nested_subvol + return _MountAttributes(device=tokens[0], subvol_name=subvol_name) + + +def _get_mount_attributes_from_mtab(mount_point: str) -> _MountAttributes: + return _get_mount_attributes(mount_point, open("/etc/mtab")) + + +def _get_now_str(): + return datetime.datetime.now().strftime(global_flags.TIME_FORMAT) + + +def rollback_gen(source_dests: list[tuple[str, str]]) -> list[str]: + if not source_dests: + return ["# No snapshot matched to rollback."] + + sh_lines = [ + "#!/bin/bash", + "# Save this to a script, review and run as root to perform the rollback.", + "", + "set -uexo pipefail", + "", + ] + + # Mount all required volumes at root. + mount_points: dict[str, str] = {} + for source, _ in source_dests: + live_subvolume = _get_mount_attributes_from_mtab(source) + if live_subvolume.device not in mount_points: + mount_pt = f"/run/mount/_yabsnap_internal_{len(mount_points)}" + mount_points[live_subvolume.device] = mount_pt + sh_lines += [ + f"mkdir -p {mount_pt}", + f"mount {live_subvolume.device} {mount_pt} -o subvolid=5", + ] + + now_str = _get_now_str() + + def drop_root_slash(s: str) -> str: + if s[0] != "/": + raise RuntimeError(f"Could not drop initial / from {s!r}") + if "/" in s[1:]: + raise RuntimeError(f"Unexpected / after the first one in subvolume {s!r}") + return s[1:] + + sh_lines.append("") + backup_paths: list[str] = [] + current_dir: Optional[str] = None + for source, dest in source_dests: + live_subvolume = _get_mount_attributes_from_mtab(source) + backup_subvolume = _get_mount_attributes_from_mtab(os.path.dirname(dest)) + # The snapshot must be on the same block device as the original (target) volume. + assert backup_subvolume.device == live_subvolume.device + mount_pt = mount_points[live_subvolume.device] + if current_dir != mount_pt: + sh_lines += [f"cd {mount_pt}", ""] + current_dir = mount_pt + live_path = drop_root_slash(live_subvolume.subvol_name) + backup_path = f"{drop_root_slash(backup_subvolume.subvol_name)}/rollback_{now_str}_{live_path}" + backup_path_after_reboot = ( + f"{os.path.dirname(dest)}/rollback_{now_str}_{live_path}" + ) + # sh_lines.append(f'[[ -e {backup_path} ]] && btrfs subvolume delete {backup_path}') + sh_lines.append(f"mv {live_path} {backup_path}") + backup_paths.append(backup_path_after_reboot) + sh_lines.append(f"btrfs subvolume snapshot {dest} {live_path}") + if os.path.isfile(dest + _PACMAN_LOCK_FILE): + sh_lines.append(f"rm {live_path}{_PACMAN_LOCK_FILE}") + sh_lines.append("") + sh_lines += ["echo Please reboot to complete the rollback.", "echo"] + sh_lines.append("echo After reboot you may delete -") + for backup_path in backup_paths: + sh_lines.append(f'echo "# sudo btrfs subvolume delete {backup_path}"') + + return sh_lines diff --git a/src/code/rollbacker_test.py b/src/code/mechanisms/rollback_btrfs_test.py similarity index 77% rename from src/code/rollbacker_test.py rename to src/code/mechanisms/rollback_btrfs_test.py index f8fa5e2..4b07647 100644 --- a/src/code/rollbacker_test.py +++ b/src/code/mechanisms/rollback_btrfs_test.py @@ -15,10 +15,9 @@ import unittest from unittest import mock -from . import rollbacker -from . import os_utils -from . import snap_holder -from . import snap_mechanisms +from . import rollback_btrfs +from . import btrfs_mechanism +from .. import snap_holder # For testing, we can access private methods. # pyright: reportPrivateUsage=false @@ -31,7 +30,7 @@ def _mock_get_mount_attributes_from_mtab(mount_pt: str): - return rollbacker._MountAttributes( + return rollback_btrfs._MountAttributes( device=_MOUNT_LOOKUP[mount_pt][0], subvol_name=_MOUNT_LOOKUP[mount_pt][1] ) @@ -48,19 +47,21 @@ def test_get_mount_attributes(self): ] # Assertiions. self.assertEqual( - rollbacker._MountAttributes("/dev/mapper/luksdev", "/@home"), - rollbacker._get_mount_attributes("/home", lines), + rollback_btrfs._MountAttributes("/dev/mapper/luksdev", "/@home"), + rollback_btrfs._get_mount_attributes("/home", lines), ) self.assertEqual( - rollbacker._MountAttributes("/dev/mapper/myhome", "/@special_home"), - rollbacker._get_mount_attributes("/home/myhome", lines), + rollback_btrfs._MountAttributes("/dev/mapper/myhome", "/@special_home"), + rollback_btrfs._get_mount_attributes("/home/myhome", lines), ) self.assertEqual( - rollbacker._MountAttributes("/dev/mapper/opened_rootbtrfs", "/@nestedvol"), - rollbacker._get_mount_attributes("/mnt/rootbtrfs/@nestedvol", lines), + rollback_btrfs._MountAttributes( + "/dev/mapper/opened_rootbtrfs", "/@nestedvol" + ), + rollback_btrfs._get_mount_attributes("/mnt/rootbtrfs/@nestedvol", lines), ) - def test_rollback_for_two_snaps(self): + def test_rollback_btrfs_for_two_snaps(self): # config_list = [configs.Config('test.conf', source='/home', dest_prefix='/snaps/@home-')] snaps_list = [ snap_holder.Snapshot("/snaps/@home-20220101130000"), @@ -70,15 +71,17 @@ def test_rollback_for_two_snaps(self): snaps_list[1].metadata.source = "/root" with mock.patch.object( - snap_mechanisms._BtrfsSnapMechanism, "verify_volume", return_value=True + btrfs_mechanism.BtrfsSnapMechanism, "verify_volume", return_value=True ) as mock_is_btrfs_volume, mock.patch.object( - rollbacker, "_get_now_str", return_value="20220202220000" + rollback_btrfs, "_get_now_str", return_value="20220202220000" ), mock.patch.object( - rollbacker, + rollback_btrfs, "_get_mount_attributes_from_mtab", side_effect=_mock_get_mount_attributes_from_mtab, ): - generated = rollbacker._rollback_snapshots(snapshots=snaps_list) + generated = rollback_btrfs.rollback_gen( + source_dests=[(s.metadata.source, s.target) for s in snaps_list] + ) self.assertEqual( mock_is_btrfs_volume.call_args_list, diff --git a/src/code/mechanisms/snap_mechanisms.py b/src/code/mechanisms/snap_mechanisms.py new file mode 100644 index 0000000..a97c568 --- /dev/null +++ b/src/code/mechanisms/snap_mechanisms.py @@ -0,0 +1,35 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum +import functools + +from . import abstract_mechanism +from . import btrfs_mechanism + + +# The type of snapshot is maintained in two places - +# 1. Config +# 2. Snapshot +class SnapType(enum.Enum): + UNKNOWN = "UNKNOWN" + BTRFS = "BTRFS" + + +@functools.cache +def get(snap_type: SnapType) -> abstract_mechanism.SnapMechanism: + """Singleton factory implementation.""" + if snap_type == SnapType.BTRFS: + return btrfs_mechanism.BtrfsSnapMechanism() + raise RuntimeError(f"Unknown snap_type {snap_type}") diff --git a/src/code/rollbacker.py b/src/code/rollbacker.py index d86303d..9b6fc78 100644 --- a/src/code/rollbacker.py +++ b/src/code/rollbacker.py @@ -12,148 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -import dataclasses -import datetime -import os - from . import configs -from . import os_utils -from . import snap_holder -from . import snap_mechanisms from . import snap_operator +from .mechanisms import snap_mechanisms -from typing import Iterable, Optional - -# This will be cleaned up if it exists by rollback script. -_PACMAN_LOCK_FILE = "/var/lib/pacman/db.lck" - - -@dataclasses.dataclass -class _MountAttributes: - # The device e.g. /mnt/volume/subv under which we have the subv or its parent is mounted. - device: str - subvol_name: str - - -def _get_mount_attributes( - mount_point: str, mtab_lines: Iterable[str] -) -> _MountAttributes: - # For a mount point, this denotes the longest path that was seen in /etc/mtab. - # This is therefore the point where that directory is mounted. - longest_match_to_mount_point = "" - # Which line matches the mount point. - matched_line: str = "" - for this_line in mtab_lines: - this_tokens = this_line.split() - if mount_point.startswith(this_tokens[1]): - if len(this_tokens[1]) > len(longest_match_to_mount_point): - longest_match_to_mount_point = this_tokens[1] - matched_line = this_line - - if not matched_line: - raise ValueError(f"Mount point not found: {mount_point}") - tokens = matched_line.split() - - if tokens[2] != "btrfs": - raise ValueError(f"Mount point is not btrfs: {mount_point} ({tokens[2]})") - mount_param: str = "" - for mount_param in tokens[3].split(","): - subvol_prefix = "subvol=" - if mount_param.startswith(subvol_prefix): - break - else: - raise RuntimeError(f"Could not find subvol= in {matched_line!r}") - subvol_name = mount_param[len(subvol_prefix) :] - if tokens[1] != mount_point: - nested_subvol = mount_point.removeprefix(tokens[1]) - assert nested_subvol.startswith("/") - subvol_name = nested_subvol - return _MountAttributes(device=tokens[0], subvol_name=subvol_name) - - -def _get_mount_attributes_from_mtab(mount_point: str) -> _MountAttributes: - return _get_mount_attributes(mount_point, open("/etc/mtab")) +from typing import Iterable def rollback(configs_iter: Iterable[configs.Config], path_suffix: str): - to_rollback: list[snap_holder.Snapshot] = [] + source_dests: list[tuple[str, str]] = [] for config in configs_iter: snap = snap_operator.find_target(config, path_suffix) if snap: - to_rollback.append(snap) - print("\n".join(_rollback_snapshots(to_rollback))) - - -def _get_now_str(): - return datetime.datetime.now().strftime(snap_holder.TIME_FORMAT) - - -def _rollback_snapshots(snapshots: list[snap_holder.Snapshot]) -> list[str]: - if not snapshots: - return ["# No snapshot matched to rollback."] - - sh_lines = [ - "#!/bin/bash", - "# Save this to a script, review and run as root to perform the rollback.", - "", - "set -uexo pipefail", - "", - ] - - # Mount all required volumes at root. - mount_points: dict[str, str] = {} - for snap in snapshots: - live_subvolume = _get_mount_attributes_from_mtab(snap.metadata.source) - if live_subvolume.device not in mount_points: - mount_pt = f"/run/mount/_yabsnap_internal_{len(mount_points)}" - mount_points[live_subvolume.device] = mount_pt - sh_lines += [ - f"mkdir -p {mount_pt}", - f"mount {live_subvolume.device} {mount_pt} -o subvolid=5", - ] - - now_str = _get_now_str() - - def drop_root_slash(s: str) -> str: - if s[0] != "/": - raise RuntimeError(f"Could not drop initial / from {s!r}") - if "/" in s[1:]: - raise RuntimeError(f"Unexpected / after the first one in subvolume {s!r}") - return s[1:] - - sh_lines.append("") - backup_paths: list[str] = [] - current_dir: Optional[str] = None - for snap in snapshots: - if not snap_mechanisms.get(snap_mechanisms.SnapType.BTRFS).verify_volume( - snap.metadata.source - ): - raise ValueError( - f"Mount point may no longer be a btrfs volume: {snap.metadata.source}" - ) - live_subvolume = _get_mount_attributes_from_mtab(snap.metadata.source) - backup_subvolume = _get_mount_attributes_from_mtab(os.path.dirname(snap.target)) - # The snapshot must be on the same block device as the original (target) volume. - assert backup_subvolume.device == live_subvolume.device - mount_pt = mount_points[live_subvolume.device] - if current_dir != mount_pt: - sh_lines += [f"cd {mount_pt}", ""] - current_dir = mount_pt - live_path = drop_root_slash(live_subvolume.subvol_name) - backup_path = f"{drop_root_slash(backup_subvolume.subvol_name)}/rollback_{now_str}_{live_path}" - backup_path_after_reboot = ( - f"{os.path.dirname(snap.target)}/rollback_{now_str}_{live_path}" - ) - # sh_lines.append(f'[[ -e {backup_path} ]] && btrfs subvolume delete {backup_path}') - sh_lines.append(f"mv {live_path} {backup_path}") - backup_paths.append(backup_path_after_reboot) - sh_lines.append(f"btrfs subvolume snapshot {snap.target} {live_path}") - if os.path.isfile(snap.target + _PACMAN_LOCK_FILE): - sh_lines.append(f"rm {live_path}{_PACMAN_LOCK_FILE}") - sh_lines.append("") - sh_lines += ["echo Please reboot to complete the rollback.", "echo"] - sh_lines.append("echo After reboot you may delete -") - for backup_path in backup_paths: - sh_lines.append(f'echo "# sudo btrfs subvolume delete {backup_path}"') - - return sh_lines + if snap.metadata.snap_type == snap_mechanisms.SnapType.BTRFS: + source_dests.append((snap.metadata.source, snap.target)) + else: + raise RuntimeError( + f"Cannot rollback snap of type {snap.metadata.snap_type} yet" + ) + print("\n".join(_rollback_btrfs_snapshots(source_dests))) + + +def _rollback_btrfs_snapshots(source_dests: list[tuple[str, str]]) -> list[str]: + return snap_mechanisms.get(snap_mechanisms.SnapType.BTRFS).rollback_gen( + source_dests + ) diff --git a/src/code/snap_holder.py b/src/code/snap_holder.py index c893afc..ab1f949 100644 --- a/src/code/snap_holder.py +++ b/src/code/snap_holder.py @@ -24,11 +24,8 @@ import os from . import global_flags -from . import snap_mechanisms from . import os_utils - -TIME_FORMAT = r"%Y%m%d%H%M%S" -TIME_FORMAT_LEN = 14 +from .mechanisms import snap_mechanisms @dataclasses.dataclass @@ -74,8 +71,8 @@ def __init__(self, target: str) -> None: # The full pathname of the snapshot directory. # Also exposed as a public property .target. self._target = target - timestr = self._target[-TIME_FORMAT_LEN:] - self._snaptime = datetime.datetime.strptime(timestr, TIME_FORMAT) + timestr = self._target[-global_flags.TIME_FORMAT_LEN :] + self._snaptime = datetime.datetime.strptime(timestr, global_flags.TIME_FORMAT) self._metadata_fname = target + "-meta.json" self.metadata = _Metadata.load_file(self._metadata_fname) self._dryrun = False diff --git a/src/code/snap_holder_test.py b/src/code/snap_holder_test.py index 8f0dfaa..0eedfa9 100644 --- a/src/code/snap_holder_test.py +++ b/src/code/snap_holder_test.py @@ -19,7 +19,8 @@ from unittest import mock from . import snap_holder -from . import snap_mechanisms +from .mechanisms import btrfs_mechanism +from .mechanisms import snap_mechanisms # For testing, we can access private methods. # pyright: reportPrivateUsage=false @@ -33,9 +34,11 @@ def test_create_and_delete(self): self.assertEqual(snap._snap_type, snap_mechanisms.SnapType.UNKNOWN) with mock.patch.object( - snap_mechanisms._BtrfsSnapMechanism, "verify_volume", return_value=True + btrfs_mechanism.BtrfsSnapMechanism, + "verify_volume", + return_value=True, ) as mock_verify_volume, mock.patch.object( - snap_mechanisms._BtrfsSnapMechanism, "create", return_value=None + btrfs_mechanism.BtrfsSnapMechanism, "create", return_value=None ) as mock_create: snap.create_from(snap_mechanisms.SnapType.BTRFS, "parent") mock_verify_volume.assert_called_once_with("parent") @@ -50,7 +53,7 @@ def test_create_and_delete(self): ) with mock.patch.object( - snap_mechanisms._BtrfsSnapMechanism, "delete", return_value=None + btrfs_mechanism.BtrfsSnapMechanism, "delete", return_value=None ) as mock_delete: snap2.delete() mock_delete.assert_called_once_with(snap_destination) diff --git a/src/code/snap_operator.py b/src/code/snap_operator.py index 92eb9eb..4f52b6f 100644 --- a/src/code/snap_operator.py +++ b/src/code/snap_operator.py @@ -19,10 +19,10 @@ from . import configs from . import deletion_logic +from . import global_flags from . import human_interval from . import os_utils from . import snap_holder -from . import snap_mechanisms from typing import Any, Iterable, Iterator, Optional, TypeVar @@ -43,10 +43,10 @@ def _get_old_backups(config: configs.Config) -> Iterator[snap_holder.Snapshot]: def find_target(config: configs.Config, suffix: str) -> Optional[snap_holder.Snapshot]: - if len(suffix) < snap_holder.TIME_FORMAT_LEN: + if len(suffix) < global_flags.TIME_FORMAT_LEN: raise ValueError( "Length of snapshot identifier suffix " - f"must be at least {snap_holder.TIME_FORMAT_LEN}." + f"must be at least {global_flags.TIME_FORMAT_LEN}." ) for snap in _get_old_backups(config): if snap.target.endswith(suffix): @@ -71,7 +71,7 @@ class SnapOperator: def __init__(self, config: configs.Config, now: datetime.datetime) -> None: self._config = config self._now = now - self._now_str = self._now.strftime(snap_holder.TIME_FORMAT) + self._now_str = self._now.strftime(global_flags.TIME_FORMAT) # Set to true on any create operation. self.snaps_created = False # Set to true on any delete operation. If True, may run a btrfs subv sync. diff --git a/src/code/snap_operator_test.py b/src/code/snap_operator_test.py index b99ce56..03dd9a2 100644 --- a/src/code/snap_operator_test.py +++ b/src/code/snap_operator_test.py @@ -21,10 +21,10 @@ from . import configs from . import deletion_logic -from . import os_utils from . import snap_holder -from . import snap_mechanisms from . import snap_operator +from .mechanisms import btrfs_mechanism +from .mechanisms import snap_mechanisms # For testing, we can access private methods. # pyright: reportPrivateUsage=false @@ -216,7 +216,7 @@ def setUp(self) -> None: self._exit_stack = contextlib.ExitStack() self._exit_stack.enter_context( mock.patch.object( - snap_mechanisms._BtrfsSnapMechanism, + btrfs_mechanism.BtrfsSnapMechanism, "verify_volume", lambda self, _: True, )