mirror of
				https://github.com/enpaul/vault2vault.git
				synced 2025-11-04 09:26:46 +00:00 
			
		
		
		
	Update internal logic
Reduce duplication in password loading logic Reorder internal functions to improve logical grouping Improve logging clarity Add better handling of decryption errors
This commit is contained in:
		
							
								
								
									
										457
									
								
								vault2vault.py
									
									
									
									
									
								
							
							
						
						
									
										457
									
								
								vault2vault.py
									
									
									
									
									
								
							@@ -9,6 +9,7 @@ from pathlib import Path
 | 
			
		||||
from typing import Any
 | 
			
		||||
from typing import Iterable
 | 
			
		||||
from typing import List
 | 
			
		||||
from typing import Optional
 | 
			
		||||
from typing import Tuple
 | 
			
		||||
from typing import Union
 | 
			
		||||
 | 
			
		||||
@@ -16,10 +17,13 @@ import ruamel.yaml
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    import ansible.constants
 | 
			
		||||
    import ansible.parsing.vault
 | 
			
		||||
    from ansible.parsing.vault import VaultSecret
 | 
			
		||||
    from ansible.parsing.vault import VaultLib
 | 
			
		||||
    from ansible.parsing.vault import AnsibleVaultError
 | 
			
		||||
except ImportError:
 | 
			
		||||
    print(
 | 
			
		||||
        "FATAL: No supported version of Ansible could be imported under the current python interpreter"
 | 
			
		||||
        "FATAL: No supported version of Ansible could be imported under the current python interpreter",
 | 
			
		||||
        file=sys.stderr,
 | 
			
		||||
    )
 | 
			
		||||
    sys.exit(1)
 | 
			
		||||
 | 
			
		||||
@@ -44,8 +48,8 @@ ruamel.yaml.add_constructor(
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def rekey(
 | 
			
		||||
    old: ansible.parsing.vault.VaultLib,
 | 
			
		||||
    new: ansible.parsing.vault.VaultLib,
 | 
			
		||||
    old: VaultLib,
 | 
			
		||||
    new: VaultLib,
 | 
			
		||||
    content: bytes,
 | 
			
		||||
) -> bytes:
 | 
			
		||||
    """Rekey vaulted content to use a new vault password
 | 
			
		||||
@@ -61,6 +65,189 @@ def rekey(
 | 
			
		||||
    return new.encrypt(old.decrypt(content))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# This whole function needs to be rebuilt from the ground up so I don't
 | 
			
		||||
# feel bad about disabling this warning
 | 
			
		||||
def _process_file(  # pylint: disable=too-many-statements
 | 
			
		||||
    path: Path,
 | 
			
		||||
    old: VaultLib,
 | 
			
		||||
    new: VaultLib,
 | 
			
		||||
    interactive: bool,
 | 
			
		||||
    backup: bool,
 | 
			
		||||
    ignore: bool,
 | 
			
		||||
) -> None:
 | 
			
		||||
    logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
    logger.debug(f"Processing file {path}")
 | 
			
		||||
 | 
			
		||||
    def _process_yaml_data(content: bytes, data: Any, ignore: bool, name: str = ""):
 | 
			
		||||
        if isinstance(data, dict):
 | 
			
		||||
            for key, value in data.items():
 | 
			
		||||
                content = _process_yaml_data(
 | 
			
		||||
                    content, value, ignore, name=f"{name}.{key}"
 | 
			
		||||
                )
 | 
			
		||||
        elif isinstance(data, list):
 | 
			
		||||
            for index, item in enumerate(data):
 | 
			
		||||
                content = _process_yaml_data(
 | 
			
		||||
                    content, item, ignore, name=f"{name}.{index}"
 | 
			
		||||
                )
 | 
			
		||||
        elif isinstance(data, ruamel.yaml.comments.TaggedScalar) and old.is_encrypted(
 | 
			
		||||
            data.value
 | 
			
		||||
        ):
 | 
			
		||||
            logger.info(f"Identified vaulted content in {path} at {name}")
 | 
			
		||||
            confirm = (
 | 
			
		||||
                _confirm(f"Rekey vault encrypted variable {name} in file {path}?")
 | 
			
		||||
                if interactive
 | 
			
		||||
                else True
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if not confirm:
 | 
			
		||||
                logger.debug(
 | 
			
		||||
                    f"User skipped vault encrypted content in {path} at {name} via interactive mode"
 | 
			
		||||
                )
 | 
			
		||||
                return content
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                new_data = rekey(old, new, data.value.encode())
 | 
			
		||||
            except AnsibleVaultError as err:
 | 
			
		||||
                msg = f"Failed to decrypt vault encrypted data in {path} at {name} with provided vault secret"
 | 
			
		||||
                if ignore:
 | 
			
		||||
                    logger.warning(msg)
 | 
			
		||||
                    return content
 | 
			
		||||
                raise RuntimeError(msg) from err
 | 
			
		||||
            content_decoded = content.decode("utf-8")
 | 
			
		||||
 | 
			
		||||
            # Ok so this next section is probably the worst possible way to do this, but I did
 | 
			
		||||
            # it this way to solve a very specific problem that would absolutely prevent people
 | 
			
		||||
            # from using this tool: round trip YAML format preservation. Namely, that it's impossible.
 | 
			
		||||
            # Ruamel gets the closest to achieving this: it can do round trip format preservation
 | 
			
		||||
            # when the starting state is in _some_ known state (this is better than competitors which
 | 
			
		||||
            # require the starting state to be in a _specific_ known state). But given how many
 | 
			
		||||
            # ways there are to write YAML- and by extension, how many opinions there are on the
 | 
			
		||||
            # "correct" way to write YAML- it is not possible to configure ruamel to account for all of
 | 
			
		||||
            # them, even if everyones YAML style was compatible with ruamel's roundtrip formatting (note:
 | 
			
		||||
            # they aren't). So there's the problem: to be useful, this tool would need to reformat every
 | 
			
		||||
            # YAML file it touched, which means nobody would use it.
 | 
			
		||||
            #
 | 
			
		||||
            # To avoid the YAML formatting problem, we need a way to replace the target content
 | 
			
		||||
            # in the raw text of the file without dumping the parsed YAML. We want to preserve
 | 
			
		||||
            # indendation, remove any extra newlines that would be left over, add any necessary
 | 
			
		||||
            # newlines without clobbering the following lines, and ideally avoid reimplementing
 | 
			
		||||
            # a YAML formatter. The answer to this problem- as the answer to so many stupid problems
 | 
			
		||||
            # seems to be- is a regex. If this is too janky for you (I know it is for me) go support
 | 
			
		||||
            # the estraven project I'm trying to get off the ground: https://github.com/enpaul/estraven
 | 
			
		||||
            #
 | 
			
		||||
            # Ok, thanks for sticking with me as I was poetic about this. The solution below...
 | 
			
		||||
            # is awful, I can admit that. But it does work, so I'll leave it up to
 | 
			
		||||
            # your judgement as to whether it's worthwhile or not. Here's how it works:
 | 
			
		||||
            #
 | 
			
		||||
            # 1. First we take the first line of the original (unmodified) vaulted content. This line
 | 
			
		||||
            #    of text has several important qualities: 1) it exists in the raw text of the file, 2)
 | 
			
		||||
            #    it is pseudo-guaranteed to be unique, and 3) it is guaranteed to exist (vaulted content
 | 
			
		||||
            #    will be at least one line long, but possibly no more)
 | 
			
		||||
            search_data = data.value.split("\n")[1]
 | 
			
		||||
            try:
 | 
			
		||||
                # 2. Next we use a regex to grab the full line of text from the file that includes the above
 | 
			
		||||
                #    string. This is important because the full line of text will include the leading
 | 
			
		||||
                #    whitespace, which ruamel helpfully strips out from the parsed data.
 | 
			
		||||
                # 3. Next we grab the number of leading spaces on the line using the capture group from the
 | 
			
		||||
                #    regex
 | 
			
		||||
                padding = len(
 | 
			
		||||
                    re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[0]
 | 
			
		||||
                )
 | 
			
		||||
            except (TypeError, AttributeError):
 | 
			
		||||
                # This is to handle an edgecase where the vaulted content is actually a yaml anchor. For
 | 
			
		||||
                # example, if a single vaulted secret needs to be stored under multiple variable names.
 | 
			
		||||
                # In that case, the vaulted content iself will only appear once in the file, but the data
 | 
			
		||||
                # parsed by ruamel will include it twice. If we fail to get a match on the first line, then
 | 
			
		||||
                # we check whether the data is a yaml anchor and, if it is, we skip it.
 | 
			
		||||
                if data.anchor.value:
 | 
			
		||||
                    logger.debug(
 | 
			
		||||
                        f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor"
 | 
			
		||||
                    )
 | 
			
		||||
                    return content
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
            # 4. Now with the leading whitespace padding, we add this same number of spaces to each line
 | 
			
		||||
            #    of *both* the old vaulted data and the new vaulted data. It's important to do both because
 | 
			
		||||
            #    we'll need to do a replacement in a moment so we need to know both what we're replacing
 | 
			
		||||
            #    and what we're replacing it with.
 | 
			
		||||
            padded_old_data = "\n".join(
 | 
			
		||||
                [f"{' ' * padding}{item}" for item in data.value.split("\n") if item]
 | 
			
		||||
            )
 | 
			
		||||
            padded_new_data = "\n".join(
 | 
			
		||||
                [
 | 
			
		||||
                    f"{' ' * padding}{item}"
 | 
			
		||||
                    for item in new_data.decode("utf-8").split("\n")
 | 
			
		||||
                    if item
 | 
			
		||||
                ]
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # 5. Finally, we actually replace the content. We also need to re-encode it back to bytes
 | 
			
		||||
            #    because all file operations with vault are done in bytes mode
 | 
			
		||||
            content = content_decoded.replace(padded_old_data, padded_new_data).encode()
 | 
			
		||||
        return content
 | 
			
		||||
 | 
			
		||||
    with path.open("rb") as infile:
 | 
			
		||||
        raw = infile.read()
 | 
			
		||||
 | 
			
		||||
    # The 'is_encrypted' check doesn't rely on the vault secret in the VaultLib matching the
 | 
			
		||||
    # secret the data was encrypted with, it just checks that the data is encrypted with some
 | 
			
		||||
    # vault secret. We could use either `old` or `new` for this check, it doesn't actually matter.
 | 
			
		||||
    if old.is_encrypted(raw):
 | 
			
		||||
        logger.info(f"Identified vault encrypted file: {path}")
 | 
			
		||||
 | 
			
		||||
        confirm = (
 | 
			
		||||
            _confirm(f"Rekey vault encrypted file {path}?") if interactive else True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if not confirm:
 | 
			
		||||
            logger.debug(
 | 
			
		||||
                f"User skipped vault encrypted file {path} via interactive mode"
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if backup:
 | 
			
		||||
            path.rename(f"{path}.bak")
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            updated = rekey(old, new, raw)
 | 
			
		||||
        except AnsibleVaultError:
 | 
			
		||||
            msg = f"Failed to decrypt vault encrypted file {path} with provided vault secret"
 | 
			
		||||
            if ignore:
 | 
			
		||||
                logger.warning(msg)
 | 
			
		||||
                return
 | 
			
		||||
            raise RuntimeError(msg) from None
 | 
			
		||||
    elif path.suffix.lower() in YAML_FILE_EXTENSIONS:
 | 
			
		||||
        logger.debug(f"Identified YAML file: {path}")
 | 
			
		||||
 | 
			
		||||
        confirm = (
 | 
			
		||||
            _confirm(f"Search YAML file {path} for vault encrypted variables?")
 | 
			
		||||
            if interactive
 | 
			
		||||
            else True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        data = yaml.load(raw)
 | 
			
		||||
 | 
			
		||||
        if not confirm:
 | 
			
		||||
            logger.debug(
 | 
			
		||||
                f"User skipped processing YAML file {path} via interactive mode"
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if backup:
 | 
			
		||||
            shutil.copy(path, f"{path}.bak")
 | 
			
		||||
 | 
			
		||||
        updated = _process_yaml_data(raw, data, ignore=ignore)
 | 
			
		||||
    else:
 | 
			
		||||
        logger.debug(f"Skipping non-vault file {path}")
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    logger.debug(f"Writing updated file contents to {path}")
 | 
			
		||||
 | 
			
		||||
    with path.open("wb") as outfile:
 | 
			
		||||
        outfile.write(updated)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_args() -> argparse.Namespace:
 | 
			
		||||
    parser = argparse.ArgumentParser(
 | 
			
		||||
        prog=__title__,
 | 
			
		||||
@@ -131,174 +318,6 @@ def _confirm(prompt: str, default: bool = True) -> bool:
 | 
			
		||||
        print("Please input one of the specified options", file=sys.stderr)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# This whole function needs to be rebuilt from the ground up so I don't
 | 
			
		||||
# feel bad about disabling this warning
 | 
			
		||||
def _process_file(  # pylint: disable=too-many-statements
 | 
			
		||||
    path: Path,
 | 
			
		||||
    old: ansible.parsing.vault.VaultLib,
 | 
			
		||||
    new: ansible.parsing.vault.VaultLib,
 | 
			
		||||
    interactive: bool,
 | 
			
		||||
    backup: bool,
 | 
			
		||||
    ignore: bool,
 | 
			
		||||
) -> None:
 | 
			
		||||
    logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
    logger.debug(f"Processing file {path}")
 | 
			
		||||
 | 
			
		||||
    def _process_yaml_data(content: bytes, data: Any, name: str = ""):
 | 
			
		||||
        if isinstance(data, dict):
 | 
			
		||||
            for key, value in data.items():
 | 
			
		||||
                content = _process_yaml_data(content, value, f"{name}.{key}")
 | 
			
		||||
        elif isinstance(data, list):
 | 
			
		||||
            for index, item in enumerate(data):
 | 
			
		||||
                content = _process_yaml_data(content, item, f"{name}.{index}")
 | 
			
		||||
        elif isinstance(data, ruamel.yaml.comments.TaggedScalar) and old.is_encrypted(
 | 
			
		||||
            data.value
 | 
			
		||||
        ):
 | 
			
		||||
            logger.debug(f"Identified vaulted content in {path} at '{name}'")
 | 
			
		||||
            confirm = (
 | 
			
		||||
                _confirm(f"Rekey vault encrypted variable {name} in file {path}?")
 | 
			
		||||
                if interactive
 | 
			
		||||
                else True
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if not confirm:
 | 
			
		||||
                logger.debug(
 | 
			
		||||
                    f"User skipped vault encrypted content in {path} at '{name}' via interactive mode"
 | 
			
		||||
                )
 | 
			
		||||
                return content
 | 
			
		||||
 | 
			
		||||
            new_data = rekey(old, new, data.value.encode())
 | 
			
		||||
            content_decoded = content.decode("utf-8")
 | 
			
		||||
 | 
			
		||||
            # Ok so this next section is probably the worst possible way to do this, but I did
 | 
			
		||||
            # it this way to solve a very specific problem that would absolutely prevent people
 | 
			
		||||
            # from using this tool: round trip YAML format preservation. Namely, that it's impossible.
 | 
			
		||||
            # Ruamel gets the closest to achieving this: it can do round trip format preservation
 | 
			
		||||
            # when the starting state is in _some_ known state (this is better than competitors which
 | 
			
		||||
            # require the starting state to be in a _specific_ known state). But given how many
 | 
			
		||||
            # ways there are to write YAML- and by extension, how many opinions there are on the
 | 
			
		||||
            # "correct" way to write YAML- it is not possible to configure ruamel to account for all of
 | 
			
		||||
            # them, even if everyones YAML style was compatible with ruamel's roundtrip formatting (note:
 | 
			
		||||
            # they aren't). So there's the problem: to be useful, this tool would need to reformat every
 | 
			
		||||
            # YAML file it touched, which means nobody would use it.
 | 
			
		||||
            #
 | 
			
		||||
            # To avoid the YAML formatting problem, we need a way to replace the target content
 | 
			
		||||
            # in the raw text of the file without dumping the parsed YAML. We want to preserve
 | 
			
		||||
            # indendation, remove any extra newlines that would be left over, add any necessary
 | 
			
		||||
            # newlines without clobbering the following lines, and ideally avoid reimplementing
 | 
			
		||||
            # a YAML formatter. The answer to this problem- as the answer to so many stupid problems
 | 
			
		||||
            # seems to be- is a regex. If this is too janky for you (I know it is for me) go support
 | 
			
		||||
            # the estraven project I'm trying to get off the ground: https://github.com/enpaul/estraven
 | 
			
		||||
            #
 | 
			
		||||
            # Ok, thanks for sticking with me as I was poetic about this. The solution below...
 | 
			
		||||
            # is awful, I can admit that. But it does work, so I'll leave it up to
 | 
			
		||||
            # your judgement as to whether it's worthwhile or not. Here's how it works:
 | 
			
		||||
            #
 | 
			
		||||
            # 1. First we take the first line of the original (unmodified) vaulted content. This line
 | 
			
		||||
            #    of text has several important qualities: 1) it exists in the raw text of the file, 2)
 | 
			
		||||
            #    it is pseudo-guaranteed to be unique, and 3) it is guaranteed to exist (vaulted content
 | 
			
		||||
            #    will be at least one line long, but possibly no more)
 | 
			
		||||
            search_data = data.value.split("\n")[1]
 | 
			
		||||
            try:
 | 
			
		||||
                # 2. Next we use a regex to grab the full line of text from the file that includes the above
 | 
			
		||||
                #    string. This is important because the full line of text will include the leading
 | 
			
		||||
                #    whitespace, which ruamel helpfully strips out from the parsed data.
 | 
			
		||||
                # 3. Next we grab the number of leading spaces on the line using the capture group from the
 | 
			
		||||
                #    regex
 | 
			
		||||
                padding = len(
 | 
			
		||||
                    re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[0]
 | 
			
		||||
                )
 | 
			
		||||
            except (TypeError, AttributeError):
 | 
			
		||||
                # This is to handle an edgecase where
 | 
			
		||||
                if data.anchor.value:
 | 
			
		||||
                    logger.debug(
 | 
			
		||||
                        f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor"
 | 
			
		||||
                    )
 | 
			
		||||
                    return content
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
            # 4. Now with the leading whitespace padding, we add this same number of spaces to each line
 | 
			
		||||
            #    of *both* the old vaulted data and the new vaulted data. It's important to do both because
 | 
			
		||||
            #    we'll need to do a replacement in a moment so we need to know both what we're replacing
 | 
			
		||||
            #    and what we're replacing it with.
 | 
			
		||||
            padded_old_data = "\n".join(
 | 
			
		||||
                [f"{' ' * padding}{item}" for item in data.value.split("\n") if item]
 | 
			
		||||
            )
 | 
			
		||||
            padded_new_data = "\n".join(
 | 
			
		||||
                [
 | 
			
		||||
                    f"{' ' * padding}{item}"
 | 
			
		||||
                    for item in new_data.decode("utf-8").split("\n")
 | 
			
		||||
                    if item
 | 
			
		||||
                ]
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # 5. Finally, we actually replace the content. We also need to re-encode it back to bytes
 | 
			
		||||
            #    because all file operations with vault are done in bytes mode
 | 
			
		||||
            content = content_decoded.replace(padded_old_data, padded_new_data).encode()
 | 
			
		||||
        return content
 | 
			
		||||
 | 
			
		||||
    with path.open("rb") as infile:
 | 
			
		||||
        raw = infile.read()
 | 
			
		||||
 | 
			
		||||
    # The 'is_encrypted' check doesn't rely on the vault secret in the VaultLib matching the
 | 
			
		||||
    # secret the data was encrypted with, it just checks that the data is encrypted with some
 | 
			
		||||
    # vault secret. We could use either `old` or `new` for this check, it doesn't actually matter.
 | 
			
		||||
    if old.is_encrypted(raw):
 | 
			
		||||
        logger.debug(f"Identified vault encrypted file: {path}")
 | 
			
		||||
 | 
			
		||||
        confirm = (
 | 
			
		||||
            _confirm(f"Rekey vault encrypted file {path}?") if interactive else True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if not confirm:
 | 
			
		||||
            logger.debug(
 | 
			
		||||
                f"User skipped vault encrypted file {path} via interactive mode"
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if backup:
 | 
			
		||||
            path.rename(f"{path}.bak")
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            updated = rekey(old, new, raw)
 | 
			
		||||
        except ansible.parsing.vault.AnsibleVaultError:
 | 
			
		||||
            msg = f"Failed to decrypt vault encrypted file {path} with provided vault secret"
 | 
			
		||||
            if ignore:
 | 
			
		||||
                logger.warning(msg)
 | 
			
		||||
                return
 | 
			
		||||
            raise RuntimeError(msg) from None
 | 
			
		||||
    elif path.suffix.lower() in YAML_FILE_EXTENSIONS:
 | 
			
		||||
        logger.debug(f"Identified YAML file: {path}")
 | 
			
		||||
 | 
			
		||||
        confirm = (
 | 
			
		||||
            _confirm(f"Search YAML file {path} for vault encrypted variables?")
 | 
			
		||||
            if interactive
 | 
			
		||||
            else True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        data = yaml.load(raw)
 | 
			
		||||
 | 
			
		||||
        if not confirm:
 | 
			
		||||
            logger.debug(
 | 
			
		||||
                f"User skipped processing YAML file {path} via interactive mode"
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if backup:
 | 
			
		||||
            shutil.copy(path, f"{path}.bak")
 | 
			
		||||
 | 
			
		||||
        updated = _process_yaml_data(raw, data)
 | 
			
		||||
    else:
 | 
			
		||||
        logger.debug(f"Skipping non-vault file {path}")
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    logger.debug(f"Writing updated file contents to {path}")
 | 
			
		||||
 | 
			
		||||
    with path.open("wb") as outfile:
 | 
			
		||||
        outfile.write(updated)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _expand_paths(paths: Iterable[Path]) -> List[Path]:
 | 
			
		||||
    logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
@@ -309,61 +328,52 @@ def _expand_paths(paths: Iterable[Path]) -> List[Path]:
 | 
			
		||||
            logger.debug(f"Including file {path}")
 | 
			
		||||
            results.append(path)
 | 
			
		||||
        elif path.is_dir():
 | 
			
		||||
            logger.debug(f"Descending into subdirectory {path}")
 | 
			
		||||
            logger.debug(f"Identifying files under {path}")
 | 
			
		||||
            results += _expand_paths(path.iterdir())
 | 
			
		||||
        else:
 | 
			
		||||
            logger.debug(f"Discarding path {path}")
 | 
			
		||||
    return results
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _read_vault_pass_file(path: Union[Path, str]) -> str:
 | 
			
		||||
    logger = logging.getLogger(__name__)
 | 
			
		||||
    try:
 | 
			
		||||
        with Path(path).resolve().open(encoding="utf-8") as infile:
 | 
			
		||||
            return infile.read()
 | 
			
		||||
    except (FileNotFoundError, PermissionError):
 | 
			
		||||
        logger.error(
 | 
			
		||||
            f"Specified vault password file '{path}' does not exist or is unreadable"
 | 
			
		||||
        )
 | 
			
		||||
        sys.exit(1)
 | 
			
		||||
def _load_password(
 | 
			
		||||
    fpath: Optional[str], desc: str = "", confirm: bool = True
 | 
			
		||||
) -> VaultSecret:
 | 
			
		||||
    """Load a password from a file or interactively
 | 
			
		||||
 | 
			
		||||
    :param fpath: Optional path to the file containing the vault password. If not provided then
 | 
			
		||||
                  the password will be prompted for interactively.
 | 
			
		||||
    :param desc: Description text to inject into the interactive password prompt. Useful when using
 | 
			
		||||
                 this function multiple times to identify different passwords to the user.
 | 
			
		||||
    :returns: Populated vault secret object with the loaded password
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
def _load_passwords(
 | 
			
		||||
    old_file: str, new_file: str
 | 
			
		||||
) -> Tuple[ansible.parsing.vault.VaultSecret, ansible.parsing.vault.VaultSecret]:
 | 
			
		||||
    logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
    if old_file:
 | 
			
		||||
        old_vault_pass = _read_vault_pass_file(old_file)
 | 
			
		||||
        logger.info(f"Loaded old vault password from {Path(old_file).resolve()}")
 | 
			
		||||
    else:
 | 
			
		||||
        logger.debug(
 | 
			
		||||
            "No old vault password file provided, prompting for old vault password input"
 | 
			
		||||
        )
 | 
			
		||||
        old_vault_pass = getpass.getpass(
 | 
			
		||||
            prompt="Old Ansible Vault password: ", stream=sys.stderr
 | 
			
		||||
    if fpath:
 | 
			
		||||
        try:
 | 
			
		||||
            with Path(fpath).resolve().open("rb", encoding="utf-8") as infile:
 | 
			
		||||
                return VaultSecret(infile.read())
 | 
			
		||||
        except (FileNotFoundError, PermissionError) as err:
 | 
			
		||||
            raise RuntimeError(
 | 
			
		||||
                f"Specified vault password file '{fpath}' does not exist or is unreadable"
 | 
			
		||||
            ) from err
 | 
			
		||||
 | 
			
		||||
    logger.debug("No vault password file provided, prompting for interactive input")
 | 
			
		||||
 | 
			
		||||
    password_1 = getpass.getpass(
 | 
			
		||||
        prompt=f"Enter {desc} Ansible Vault password: ", stream=sys.stderr
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    if confirm:
 | 
			
		||||
        password_2 = getpass.getpass(
 | 
			
		||||
            prompt=f"Confirm (re-enter) {desc} Ansible Vault password: ",
 | 
			
		||||
            stream=sys.stderr,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    if new_file:
 | 
			
		||||
        new_vault_pass = _read_vault_pass_file(new_file)
 | 
			
		||||
        logger.info(f"Loaded new vault password from {Path(new_file).resolve()}")
 | 
			
		||||
    else:
 | 
			
		||||
        logger.debug(
 | 
			
		||||
            "No new vault password file provided, prompting for new vault password input"
 | 
			
		||||
        )
 | 
			
		||||
        new_vault_pass = getpass.getpass(
 | 
			
		||||
            prompt="New Ansible Vault password: ", stream=sys.stderr
 | 
			
		||||
        )
 | 
			
		||||
        confirm = getpass.getpass(
 | 
			
		||||
            prompt="Confirm new Ansible Vault password: ", stream=sys.stderr
 | 
			
		||||
        )
 | 
			
		||||
        if new_vault_pass != confirm:
 | 
			
		||||
            logger.error("New vault passwords do not match")
 | 
			
		||||
            sys.exit(1)
 | 
			
		||||
        if password_1 != password_2:
 | 
			
		||||
            raise RuntimeError(f"Provided {desc} passwords do not match")
 | 
			
		||||
 | 
			
		||||
    return ansible.parsing.vault.VaultSecret(
 | 
			
		||||
        old_vault_pass.encode("utf-8")
 | 
			
		||||
    ), ansible.parsing.vault.VaultSecret(new_vault_pass.encode("utf-8"))
 | 
			
		||||
    return VaultSecret(password_1.encode("utf-8"))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
@@ -383,17 +393,26 @@ def main():
 | 
			
		||||
        sys.exit(0)
 | 
			
		||||
 | 
			
		||||
    if not args.paths:
 | 
			
		||||
        logger.warning("No path provided, nothing to do!")
 | 
			
		||||
        logger.warning("No paths provided, nothing to do!")
 | 
			
		||||
        sys.exit(0)
 | 
			
		||||
 | 
			
		||||
    old_pass, new_pass = _load_passwords(args.old_pass_file, args.new_pass_file)
 | 
			
		||||
    in_vault = ansible.parsing.vault.VaultLib([(args.vault_id, old_pass)])
 | 
			
		||||
    out_vault = ansible.parsing.vault.VaultLib([(args.vault_id, new_pass)])
 | 
			
		||||
    try:
 | 
			
		||||
        old_pass = _load_password(args.old_pass_file, desc="existing", confirm=False)
 | 
			
		||||
        new_pass = _load_password(args.new_pass_file, desc="new", confirm=True)
 | 
			
		||||
 | 
			
		||||
    logger.debug(
 | 
			
		||||
        in_vault = VaultLib([(args.vault_id, old_pass)])
 | 
			
		||||
        out_vault = VaultLib([(args.vault_id, new_pass)])
 | 
			
		||||
    except RuntimeError as err:
 | 
			
		||||
        logger.error(str(err))
 | 
			
		||||
        sys.exit(1)
 | 
			
		||||
    except KeyboardInterrupt:
 | 
			
		||||
        sys.exit(130)
 | 
			
		||||
 | 
			
		||||
    logger.info(
 | 
			
		||||
        f"Identifying all files under {len(args.paths)} input paths: {', '.join(args.paths)}"
 | 
			
		||||
    )
 | 
			
		||||
    files = _expand_paths(args.paths)
 | 
			
		||||
    logger.info(f"Identified {len(files)} files for processing")
 | 
			
		||||
 | 
			
		||||
    for filepath in files:
 | 
			
		||||
        _process_file(
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user