diff --git a/copr_backend/constants.py b/backend/copr_backend/constants.py index a529be28a..83bcb8fbe 100644 --- a/copr_backend/constants.py +++ b/copr_backend/constants.py @@ -13,6 +13,7 @@ DEF_BUILD_USER = "mockbuilder" DEF_DESTDIR = os.getcwd() DEF_MACROS = {} DEF_BUILDROOT_PKGS = "" +DEF_SIGN_BACKEND = "obs-signd" DEF_CONSECUTIVE_FAILURE_THRESHOLD = 10 diff --git a/copr_backend/exceptions.py b/backend/copr_backend/exceptions.py index 21afb14c6..0865fcc8c 100644 --- a/copr_backend/exceptions.py +++ b/copr_backend/exceptions.py @@ -48,8 +48,8 @@ class CoprKeygenRequestError(Exception): def __str__(self): out = super(CoprKeygenRequestError, self).__str__() - out += "\nrequest to copr-keygen: {}\n".format(self.request) - if self.response: + out += "\nrequest to key_backend: {}\n".format(self.request) + if self.response is not None: out += "status code: {}\n" "response content: {}\n" \ .format(self.response.status_code, self.response.content) return out diff --git a/copr_backend/helpers.py b/backend/copr_backend/helpers.py index 75fa5e62d..291228912 100644 --- a/copr_backend/helpers.py +++ b/copr_backend/helpers.py @@ -31,7 +31,7 @@ from munch import Munch from copr_common.redis_helpers import get_redis_connection from copr.v3 import Client from copr_backend.constants import DEF_BUILD_USER, DEF_BUILD_TIMEOUT, DEF_CONSECUTIVE_FAILURE_THRESHOLD, \ - CONSECUTIVE_FAILURE_REDIS_KEY, default_log_format + CONSECUTIVE_FAILURE_REDIS_KEY, default_log_format, DEF_SIGN_BACKEND from copr_backend.exceptions import CoprBackendError from . import constants @@ -309,6 +309,18 @@ class BackendConfigReader(object): opts.sign_domain = _get_conf( cp, "backend", "sign_domain", DOMAIN) + opts.sign_backend = _get_conf( + cp, "backend", "sign_backend", DEF_SIGN_BACKEND) + + opts.signatrust_host = _get_conf( + cp, "backend", "signatrust_host", "") + + opts.signatrust_token = _get_conf( + cp, "backend", "signatrust_token", "") + + opts.signatrust_key_expire = _get_conf( + cp, "backend", "signatrust_key_expire", 1825, mode="int") + opts.build_groups = [] for group_id in range(opts.build_groups_count): archs = _get_conf(cp, "backend", diff --git a/copr_backend/sign.py b/backend/copr_backend/sign.py index e21653e78..599d209ee 100644 --- a/copr_backend/sign.py +++ b/copr_backend/sign.py @@ -7,17 +7,16 @@ Wrapper for /bin/sign from obs-sign package from subprocess import Popen, PIPE, SubprocessError import os import time +import functools from packaging import version from copr_common.request import SafeRequest -from copr_backend.helpers import get_redis_logger +from copr_backend.helpers import get_redis_logger, get_backend_opts from .exceptions import CoprSignError, CoprSignNoKeyError, \ CoprKeygenRequestError -SIGN_BINARY = "/bin/sign" - def create_gpg_email(username, projectname, domain): """ Creates canonical name_email to identify gpg key @@ -25,77 +24,17 @@ def create_gpg_email(username, projectname, domain): return "{}#{}@copr.{}".format(username, projectname, domain) - def call_sign_bin(cmd, log): - """ - Call /bin/sign and return (rc, stdout, stderr). Re-try the call - automatically upon certain failures (if that makes sense). - """ - cmd_pretty = ' '.join(cmd) - for attempt in [1, 2, 3]: - log.info("Calling '%s' (attempt #%s)", cmd_pretty, attempt) - try: - handle = Popen(cmd, stdout=PIPE, stderr=PIPE, encoding="utf-8") - stdout, stderr = handle.communicate() - except (SubprocessError, OSError) as err: - new_err = CoprSignError("Failed to invoke '{}'".format(cmd_pretty)) - raise new_err from err - - if handle.returncode != 0: - log.warning("Command '%s' failed with: %s", - cmd_pretty, stderr.rstrip()) - sleeptime = 20 - log.warning("Going to sleep %ss and re-try.", sleeptime) - time.sleep(sleeptime) - continue - break - return handle.returncode, stdout, stderr - + signer = new_signer() + return signer.call_sign_bin(cmd, log) def get_pubkey(username, projectname, log, sign_domain, outfile=None): - """ - Retrieves public key for user/project from signer host. - - :param sign_domain: the domain name of the sign key - :param outfile: [optional] file to write obtained key - :return: public keys - - :raises CoprSignError: failed to retrieve key, see error message - :raises CoprSignNoKeyError: if there are no such user in keyring - """ - usermail = create_gpg_email(username, projectname, sign_domain) - cmd = [SIGN_BINARY, "-u", usermail, "-p"] - - returncode, stdout, stderr = call_sign_bin(cmd, log) - if returncode != 0: - if "unknown key:" in stderr: - raise CoprSignNoKeyError( - "There are no gpg keys for user {} in keyring".format(username), - return_code=returncode, - cmd=cmd, stdout=stdout, stderr=stderr) - raise CoprSignError( - msg="Failed to get user pubkey\n" - "sign stdout: {}\n sign stderr: {}\n".format(stdout, stderr), - return_code=returncode, - cmd=cmd, stdout=stdout, stderr=stderr) - - if outfile: - with open(outfile, "w") as handle: - handle.write(stdout) - - return stdout - + signer = new_signer() + return signer.get_pubkey(username, projectname, log, sign_domain, outfile) def _sign_one(path, email, hashtype, log): - cmd = [SIGN_BINARY, "-4", "-h", hashtype, "-u", email, "-r", path] - returncode, stdout, stderr = call_sign_bin(cmd, log) - if returncode != 0: - raise CoprSignError( - msg="Failed to sign {} by user {}".format(path, email), - return_code=returncode, - cmd=cmd, stdout=stdout, stderr=stderr) - return stdout, stderr - + signer = new_signer() + return signer._sign_one(path, email, hashtype, log) def gpg_hashtype_for_chroot(chroot, opts): """ @@ -135,134 +74,455 @@ def gpg_hashtype_for_chroot(chroot, opts): # fallback to sha256 return "sha256" - def sign_rpms_in_dir(username, projectname, path, chroot, opts, log): - """ - Signs rpms using obs-signd. + signer = new_signer() + return signer.sign_rpms_in_dir(username, projectname, path, chroot, opts, log) - If some some pkgs failed to sign, entire build marked as failed, - but we continue to try sign other pkgs. - - :param username: copr username - :param projectname: copr projectname - :param path: directory with rpms to be signed - :param chroot: chroot name where we sign packages, affects the hash type - :param Munch opts: backend config +def create_user_keys(username, projectname, opts, try_indefinitely=False): + signer = new_signer() + return signer.create_user_keys(username, projectname, opts, try_indefinitely) - :type log: logging.Logger +def _unsign_one(path): + signer = new_signer() + return signer._unsign_one(path) - :raises: :py:class:`backend.exceptions.CoprSignError` failed to sign at least one package - """ +def unsign_rpms_in_dir(path, opts, log): + signer = new_signer() + return signer.unsign_rpms_in_dir(path, opts, log) + + +# a sign interface +class Signer(object): + @classmethod + def get_pubkey(cls, username, projectname, log, sign_domain, outfile=None): + """get public key""" + raise NotImplementedError + + @classmethod + def sign_rpms_in_dir(cls, username, projectname, path, chroot, opts, log): + """batch sign rpms""" + raise NotImplementedError + + @classmethod + def create_user_keys(username, projectname, opts, try_indefinitely=False): + """create user key pair""" + raise NotImplementedError + + @classmethod + def _sign_one(cls, path, email, hashtype, log): + """sign one rpm""" + raise NotImplementedError + + @classmethod + def _unsign_one(cls, path): + # Requires rpm-sign package + cmd = ["/usr/bin/rpm", "--delsign", path] + handle = Popen(cmd, stdout=PIPE, stderr=PIPE, encoding="utf-8") + stdout, stderr = handle.communicate() - rpm_list = [ - os.path.join(path, filename) - for filename in os.listdir(path) - if filename.endswith(".rpm") - ] + if handle.returncode != 0: + err = CoprSignError( + msg="Failed to unsign {}".format(path), + return_code=handle.returncode, + cmd=cmd, stdout=stdout, stderr=stderr) - if not rpm_list: - return + raise err + + return stdout, stderr + + @classmethod + def call_sign_bin(cls, cmd, log): + """ + Call sign_cmd and return (rc, stdout, stderr). Re-try the call + automatically upon certain failures (if that makes sense). + """ + cmd_pretty = ' '.join(cmd) + for attempt in [1, 2, 3]: + log.info("Calling '%s' (attempt #%s)", cmd_pretty, attempt) + try: + handle = Popen(cmd, stdout=PIPE, stderr=PIPE, encoding="utf-8") + stdout, stderr = handle.communicate() + except (SubprocessError, OSError) as err: + new_err = CoprSignError("Failed to invoke '{}'".format(cmd_pretty)) + raise new_err from err + + if handle.returncode != 0: + log.warning("Command '%s' failed with: %s", + cmd_pretty, stderr.rstrip()) + sleeptime = 20 + log.warning("Going to sleep %ss and re-try.", sleeptime) + time.sleep(sleeptime) + continue + break + return handle.returncode, stdout, stderr + + @classmethod + def sign_rpms_in_dir(cls, username, projectname, path, chroot, opts, log): + """ + Signs rpms using obs-signd. + + If some some pkgs failed to sign, entire build marked as failed, + but we continue to try sign other pkgs. + + :param username: copr username + :param projectname: copr projectname + :param path: directory with rpms to be signed + :param chroot: chroot name where we sign packages, affects the hash type + :param Munch opts: backend config + + :type log: logging.Logger + + :raises: :py:class:`backend.exceptions.CoprSignError` failed to sign at least one package + """ + rpm_list = [ + os.path.join(path, filename) + for filename in os.listdir(path) + if filename.endswith(".rpm") + ] - hashtype = gpg_hashtype_for_chroot(chroot, opts) + if not rpm_list: + return - try: - get_pubkey(username, projectname, log, opts.sign_domain) - except CoprSignNoKeyError: - create_user_keys(username, projectname, opts, try_indefinitely=True) + hashtype = gpg_hashtype_for_chroot(chroot, opts) - errors = [] # tuples (rpm_filepath, exception) - for rpm in rpm_list: try: - _sign_one(rpm, create_gpg_email(username, projectname, opts.sign_domain), - hashtype, log) - log.info("signed rpm: %s", rpm) - - except CoprSignError as e: - log.exception("failed to sign rpm: %s", rpm) - errors.append((rpm, e)) - - if errors: - raise CoprSignError("Rpm sign failed, affected rpms: {}" - .format([err[0] for err in errors])) - - -def create_user_keys(username, projectname, opts, try_indefinitely=False): - """ - Generate a new key-pair at sign host + cls.get_pubkey(username, projectname, log, opts.sign_domain) + except CoprSignNoKeyError: + cls.create_user_keys(username, projectname, opts, try_indefinitely=True) + + errors = [] # tuples (rpm_filepath, exception) + for rpm in rpm_list: + try: + cls._sign_one(rpm, create_gpg_email(username, projectname, opts.sign_domain), + hashtype, log) + log.info("signed rpm: %s", rpm) + + except CoprSignError as e: + log.exception("failed to sign rpm: %s", rpm) + errors.append((rpm, e)) + + if errors: + raise CoprSignError("Rpm sign failed, affected rpms: {}" + .format([err[0] for err in errors])) + + @classmethod + def unsign_rpms_in_dir(cls, path, opts, log): + """ + :param path: directory with rpms to be signed + :param Munch opts: backend config + :type log: logging.Logger + :raises: :py:class:`backend.exceptions.CoprSignError` failed to sign at least one package + """ + rpm_list = [ + os.path.join(path, filename) + for filename in os.listdir(path) + if filename.endswith(".rpm") + ] + + if not rpm_list: + return + + errors = [] # tuples (rpm_filepath, exception) + for rpm in rpm_list: + try: + cls._unsign_one(rpm) + log.info("unsigned rpm: %s", rpm) + + except CoprSignError as e: + log.exception("failed to unsign rpm: %s", rpm) + errors.append((rpm, e)) + + if errors: + raise CoprSignError("Rpm unsign failed, affected rpms: {}" + .format([err[0] for err in errors])) + +@functools.lru_cache(maxsize=1) +def new_signer(): + opts = get_backend_opts() + if hasattr(opts, "sign_backend") and opts.sign_backend == "signatrust": + Signatrust.signatrust_token = opts.signatrust_token + Signatrust.signatrust_host = opts.signatrust_host + return Signatrust + else: # keep obs-signd as default backend + return ObsSign + +class ObsSign(Signer): + sign_cmd = "/bin/sign" + + @classmethod + def get_pubkey(cls, username, projectname, log, sign_domain, outfile=None): + """ + Retrieves public key for user/project from signer host. + + :param sign_domain: the domain name of the sign key + :param outfile: [optional] file to write obtained key + :return: public keys + + :raises CoprSignError: failed to retrieve key, see error message + :raises CoprSignNoKeyError: if there are no such user in keyring + """ + usermail = create_gpg_email(username, projectname, sign_domain) + cmd = [cls.sign_cmd, "-u", usermail, "-p"] + + returncode, stdout, stderr = cls.call_sign_bin(cmd, log) + if returncode != 0: + if "unknown key:" in stderr: + raise CoprSignNoKeyError( + "There are no gpg keys for user {} in keyring".format(username), + return_code=returncode, + cmd=cmd, stdout=stdout, stderr=stderr) + raise CoprSignError( + msg="Failed to get user pubkey\n" + "sign stdout: {}\n sign stderr: {}\n".format(stdout, stderr), + return_code=returncode, + cmd=cmd, stdout=stdout, stderr=stderr) - :param username: - :param projectname: - :param opts: backend config + if outfile: + with open(outfile, "w") as handle: + handle.write(stdout) - :return: None - """ - data = { - "name_real": "{}_{}".format(username, projectname), - "name_email": create_gpg_email(username, projectname, opts.sign_domain) - } - - log = get_redis_logger(opts, "sign", "actions") - keygen_url = "http://{}/gen_key".format(opts.keygen_host) - query = dict(url=keygen_url, data=data, method="post") - try: - request = SafeRequest(log=log, try_indefinitely=try_indefinitely) - response = request.send(**query) - except Exception as e: - raise CoprKeygenRequestError( - msg="Failed to create key-pair for user: {}," - " project:{} with error: {}" - .format(username, projectname, e), request=query) - - if response.status_code >= 400: - raise CoprKeygenRequestError( - msg="Failed to create key-pair for user: {}, project:{}, status_code: {}, response: {}" - .format(username, projectname, response.status_code, response.text), - request=query, response=response) + return stdout + @classmethod + def _sign_one(cls, path, email, hashtype, log): + cmd = [cls.sign_cmd, "-4", "-h", hashtype, "-u", email, "-r", path] + returncode, stdout, stderr = cls.call_sign_bin(cmd, log) + if returncode != 0: + raise CoprSignError( + msg="Failed to sign {} by user {}".format(path, email), + return_code=returncode, + cmd=cmd, stdout=stdout, stderr=stderr) + return stdout, stderr + + @classmethod + def create_user_keys(cls, username, projectname, opts, try_indefinitely=False): + """ + Generate a new key-pair at sign host + + :param username: + :param projectname: + :param opts: backend config + + :return: None + """ + data = { + "name_real": "{}_{}".format(username, projectname), + "name_email": create_gpg_email(username, projectname, opts.sign_domain) + } + + log = get_redis_logger(opts, "sign", "actions") + keygen_url = "http://{}/gen_key".format(opts.keygen_host) + query = dict(url=keygen_url, data=data, method="post") + try: + request = SafeRequest(log=log, try_indefinitely=try_indefinitely) + response = request.send(**query) + except Exception as e: + raise CoprKeygenRequestError( + msg="Failed to create key-pair for user: {}," + " project:{} with error: {}" + .format(username, projectname, e), request=query) + + if response.status_code >= 400: + raise CoprKeygenRequestError( + msg="Failed to create key-pair for user: {}, project:{}, status_code: {}, response: {}" + .format(username, projectname, response.status_code, response.text), + request=query, response=response) + +class Signatrust(Signer): + sign_cmd = "/usr/local/bin/client" + prefix = "" + signatrust_host = "" + signatrust_token = "" + + @classmethod + def get_prefix(cls): + """ + Get prefix of the user + + As in copr, we set key attr with visibility=private + These keys' name were prefixed by user's email like: + tommylikehu@gmail.com:mywaaagh_admin_test + """ + headers = { + "accept": "application/json", + "Authorization": cls.signatrust_token + } + try: + r = requests.get("{}/api/v1/users/info".format(cls.signatrust_host), headers=headers).json() + cls.prefix = r.get("email") + except Exception as e: + raise CoprKeygenRequestError( + msg="Failed to get userinfo", request="/api/v1/users/info") + + @classmethod + def get_key_name(cls, username, projectname, prefix=True): + """ + copr key_name rule in signatrust: + :_ + """ + if not cls.prefix: + cls.get_prefix() + if prefix: + return "{}:{}_{}".format(cls.prefix, username, projectname) + return "{}_{}".format(username, projectname) + + @classmethod + def get_pubkey(cls, username, projectname, log, sign_domain, outfile=None): + """ + get public key + + https://domain:port/api/v1/keys//public_key + """ + if not cls.prefix: + cls.get_prefix() + + headers = { + "accept": "application/json", + "Authorization": cls.signatrust_token + } + + key_name = cls.get_key_name(username, projectname) + url = "{}/api/v1/keys/{}/public_key".format(cls.signatrust_host, key_name) + try: + r = requests.get(url, headers=headers) + except Exception as e: + raise CoprKeygenRequestError( + msg="Failed to get public_key", request="/api/v1/keys/{}/public_key".format(key_name), response=r) -def _unsign_one(path): - # Requires rpm-sign package - cmd = ["/usr/bin/rpm", "--delsign", path] - handle = Popen(cmd, stdout=PIPE, stderr=PIPE, encoding="utf-8") - stdout, stderr = handle.communicate() + if r.status_code == 404: + raise CoprSignNoKeyError( + "There are no gpg keys for user {} in keyring".format(username), + return_code=r.status_code, + cmd="GET {}".format(url), stdout="", stderr="") + elif r.status_code >= 400: + raise CoprKeygenRequestError( + msg="Failed to get user pubkey\n", + request="/api/v1/keys/{}/public_key".format(key_name), response=r) + + if outfile: + with open(outfile, "wb") as handle: + handle.write(r.content) + + return r.content + + @classmethod + def sign_rpms_in_dir(cls, username, projectname, path, chroot, opts, log): + """batch sign rpms""" + if not cls.prefix: + cls.get_prefix() + + if not cls._key_existed(username, projectname, opts): + cls.create_user_keys(username, projectname, opts) + + # when we migrate copr keys into signatrust + # we fellow the rules: + # key_name = :_ + cmd = [cls.sign_cmd, "-c", "/etc/signatrust.toml", "add", "--file-type", "rpm", "--key-type", "pgp", "--key-name", cls.get_key_name(username, projectname), path] + + returncode, stdout, stderr = cls.call_sign_bin(cmd, log) + if returncode != 0: + raise CoprSignError( + msg="Failed to sign rpms\n" + "sign stdout: {}\n sign stderr: {}\n".format(stdout, stderr), + return_code=returncode, + cmd=cmd, stdout=stdout, stderr=stderr) - if handle.returncode != 0: - err = CoprSignError( - msg="Failed to unsign {}".format(path), - return_code=handle.returncode, - cmd=cmd, stdout=stdout, stderr=stderr) + if stderr: + # signatrust client will print error message for one rpm per line + failed_list = re.findall(r"failed to sign file (.*.rpm) due to error", stderr) + + if failed_list: + failed_rpms = " ".join(map(os.path.basename, failed_list)) + log.exception("failed to sign rpm: %s".format(failed_rpms)) + raise CoprSignError("Rpm sign failed, affected rpms: {}" + .format(failed_rpms)) + + @classmethod + def unsign_rpms_in_dir(cls, path, opts, log): + """ + signatrust will replace the signature infomation defaultly, + so there is no need to unsign, just return + """ + return - raise err + @classmethod + def _key_existed(cls, username, projectname, opts): + """ + check keyname existence - return stdout, stderr + HEAD /api/v1/keys/ + """ + if not cls.prefix: + cls.get_prefix() + key_name = cls.get_key_name(username, projectname, prefix=False) -def unsign_rpms_in_dir(path, opts, log): - """ - :param path: directory with rpms to be signed - :param Munch opts: backend config - :type log: logging.Logger - :raises: :py:class:`backend.exceptions.CoprSignError` failed to sign at least one package - """ - rpm_list = [ - os.path.join(path, filename) - for filename in os.listdir(path) - if filename.endswith(".rpm") - ] + query = { + "name": key_name, + "visibility": "private" + } - if not rpm_list: - return + headers = { + "accept": "application/json", + "Authorization": cls.signatrust_token + } - errors = [] # tuples (rpm_filepath, exception) - for rpm in rpm_list: try: - _unsign_one(rpm) - log.info("unsigned rpm: %s", rpm) - - except CoprSignError as e: - log.exception("failed to unsign rpm: %s", rpm) - errors.append((rpm, e)) + res = requests.head("{}/api/v1/keys/name_identical".format(opts.signatrust_host), headers=headers, params=query) + except Exception as e: + raise CoprKeygenRequestError( + msg="Failed to check key existence", request="/api/v1/keys/name_identical", response=res) + + # signatrust return 200 means key name available + if res.status_code == 200: + return False + # signatrust return 409 means key name redundant + elif res.status_code == 409: + return True + else: + raise CoprKeygenRequestError( + msg="Failed to check key existence", request="/api/v1/keys/name_identical", response=res) + + @classmethod + def create_user_keys(cls, username, projectname, opts, try_indefinitely=False): + """ + create user key pair + + POST /api/v1/keys/ + """ + if not cls.prefix: + cls.get_prefix() + + if cls._key_existed(username, projectname, opts): + return + time_format = "%Y-%m-%d %H:%M:%S%z" + expire = datetime.now(datetime.now(timezone.utc).astimezone().tzinfo) + timedelta(days=opts.signatrust_key_expire) + data = { + "name": cls.get_key_name(username, projectname, prefix=False), + "description": "gpg key to sign rpms in {}/{}".format(username, projectname), + "key_type": "pgp", + "visibility": "private", # we use private key type for those key will not be seen by other users + "attributes": { + "digest_algorithm": "sha2_256", + "key_type": "rsa", + "key_length": "2048", + "email": "{}".format(create_gpg_email(username, projectname, opts.sign_domain)), + }, + "expire_at": datetime.strftime(expire, time_format) + } + + headers = { + "content-type": "application/json", + "accept": "application/json", + "Authorization": opts.signatrust_token + } - if errors: - raise CoprSignError("Rpm unsign failed, affected rpms: {}" - .format([err[0] for err in errors])) + try: + res = requests.post("{}/api/v1/keys/".format(opts.signatrust_host), headers=headers, json=data) + except Exception as e: + raise CoprKeygenRequestError( + msg="Failed to get userinfo", request="/api/v1/keys/", response=res) + + if res.status_code >= 400: + raise CoprKeygenRequestError( + msg="Failed to create user payload: {}".format(data), request="/api/v1/keys/", response=res) diff --git a/tests/test_background_worker_build.py b/backend/tests/test_background_worker_build.py index 1dfe19563..ba1e5aeab 100644 --- a/tests/test_background_worker_build.py +++ b/tests/test_background_worker_build.py @@ -88,6 +88,12 @@ def _fake_host(): host.release = mock.MagicMock() return host +@pytest.fixture(autouse=True) +def get_opts(): + with pytest.MonkeyPatch.context() as mp: + mp.setattr("copr_backend.sign.get_backend_opts", lambda: None) + yield mp + @pytest.fixture def f_build_something(): """ @@ -324,8 +330,8 @@ def test_full_srpm_build(f_build_srpm): "00855954/hello-2.8-1.src.rpm") -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") -@mock.patch("copr_backend.sign._sign_one") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign._sign_one") def test_build_and_sign(mc_sign_one, f_build_rpm_sign_on, caplog): config = f_build_rpm_sign_on worker = config.bw @@ -351,7 +357,7 @@ def test_build_and_sign(mc_sign_one, f_build_rpm_sign_on, caplog): _, level, _ = record assert level <= logging.INFO -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") @mock.patch("copr_backend.sign._sign_one") @_patch_bwbuild_object("sign_rpms_in_dir") def test_sign_built_packages_exception(mc_sign_rpms, mc_sign_one, @@ -452,7 +458,7 @@ def test_invalid_job_info(f_build_rpm_case, caplog): @mock.patch("copr_backend.vm_alloc.time.sleep", mock.MagicMock()) @_patch_bwbuild_object("CANCEL_CHECK_PERIOD", 0.5) -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") def test_cancel_build_on_vm_allocation(f_build_rpm_sign_on, caplog): config = f_build_rpm_sign_on worker = config.bw @@ -513,7 +519,7 @@ class _CancelFunction(): time.sleep(0.25) @_patch_bwbuild_object("CANCEL_CHECK_PERIOD", 0.5) -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") def test_cancel_build_on_tail_log_no_ssh(f_build_rpm_sign_on, caplog): config = f_build_rpm_sign_on worker = config.bw @@ -542,7 +548,7 @@ def test_cancel_build_on_tail_log_no_ssh(f_build_rpm_sign_on, caplog): assert "canceled stdout" in log @_patch_bwbuild_object("CANCEL_CHECK_PERIOD", 0.5) -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") def test_cancel_before_vm(f_build_rpm_sign_on, caplog): config = f_build_rpm_sign_on worker = config.bw @@ -558,7 +564,7 @@ def test_cancel_before_vm(f_build_rpm_sign_on, caplog): assert_logs_dont_exist(["Releasing VM back to pool"], caplog) @_patch_bwbuild_object("CANCEL_CHECK_PERIOD", 0.5) -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") def test_cancel_before_start(f_build_rpm_sign_on, caplog): config = f_build_rpm_sign_on worker = config.bw @@ -578,7 +584,7 @@ def test_cancel_before_start(f_build_rpm_sign_on, caplog): ], caplog) @_patch_bwbuild_object("CANCEL_CHECK_PERIOD", 0.5) -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") def test_build_retry(f_build_rpm_sign_on): config = f_build_rpm_sign_on worker = config.bw @@ -650,7 +656,7 @@ def test_fe_failed_start(f_build_rpm_sign_on, caplog): assert worker.redis_get_worker_flag("status") == "done" @_patch_bwbuild_object("CANCEL_CHECK_PERIOD", 0.5) -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") def test_cancel_script_failure(f_build_rpm_sign_on, caplog): config = f_build_rpm_sign_on worker = config.bw @@ -672,7 +678,7 @@ def test_cancel_script_failure(f_build_rpm_sign_on, caplog): ], caplog) @_patch_bwbuild_object("CANCEL_CHECK_PERIOD", 0.5) -@mock.patch("copr_backend.sign.SIGN_BINARY", "tests/fake-bin-sign") +@mock.patch("copr_backend.sign.ObsSign.sign_cmd", "tests/fake-bin-sign") def test_cancel_build_during_log_download(f_build_rpm_sign_on, caplog): config = f_build_rpm_sign_on worker = config.bw diff --git a/tests/test_sign.py b/backend/tests/test_sign.py index bf2dd1b8c..13a7a2ebc 100644 --- a/tests/test_sign.py +++ b/tests/test_sign.py @@ -12,13 +12,16 @@ from copr_backend.exceptions import CoprSignError, CoprSignNoKeyError, CoprKeyge from copr_backend.sign import ( get_pubkey, _sign_one, sign_rpms_in_dir, create_user_keys, gpg_hashtype_for_chroot, - call_sign_bin, + call_sign_bin ) +from copr_backend import helpers STDOUT = "stdout" STDERR = "stderr" + + class TestSign(object): # pylint: disable=too-many-public-methods @@ -38,6 +41,12 @@ class TestSign(object): if self.tmp_dir_path: shutil.rmtree(self.tmp_dir_path) + @pytest.fixture(autouse=True) + def get_opts(self): + with pytest.MonkeyPatch.context() as mp: + mp.setattr("copr_backend.sign.get_backend_opts", lambda: None) + yield mp + @pytest.fixture def tmp_dir(self): subdir = "test_createrepo_{}".format(time.time()) @@ -54,7 +63,7 @@ class TestSign(object): handle.write("1") @mock.patch("copr_backend.sign.Popen") - def test_get_pubkey(self, mc_popen): + def test_get_pubkey(self, mc_popen, get_opts): mc_handle = MagicMock() mc_handle.communicate.return_value = (STDOUT, STDERR) mc_handle.returncode = 0 @@ -198,9 +207,9 @@ class TestSign(object): create_user_keys(self.username, self.projectname, self.opts) assert "Failed to create key-pair for user: foo, project:bar" in str(err) - @mock.patch("copr_backend.sign._sign_one") - @mock.patch("copr_backend.sign.create_user_keys") - @mock.patch("copr_backend.sign.get_pubkey") + @mock.patch("copr_backend.sign.ObsSign._sign_one") + @mock.patch("copr_backend.sign.ObsSign.create_user_keys") + @mock.patch("copr_backend.sign.ObsSign.get_pubkey") def test_sign_rpms_id_dir_nothing(self, mc_gp, mc_cuk, mc_so, tmp_dir): # empty target dir doesn't produce error @@ -212,9 +221,9 @@ class TestSign(object): assert not mc_cuk.called assert not mc_so.called - @mock.patch("copr_backend.sign._sign_one") - @mock.patch("copr_backend.sign.create_user_keys") - @mock.patch("copr_backend.sign.get_pubkey") + @mock.patch("copr_backend.sign.ObsSign._sign_one") + @mock.patch("copr_backend.sign.ObsSign.create_user_keys") + @mock.patch("copr_backend.sign.ObsSign.get_pubkey") def test_sign_rpms_id_dir_ok(self, mc_gp, mc_cuk, mc_so, tmp_dir, tmp_files): @@ -234,9 +243,9 @@ class TestSign(object): assert os.path.join(self.tmp_dir_path, name) in pathes assert len(pathes) == count - @mock.patch("copr_backend.sign._sign_one") - @mock.patch("copr_backend.sign.create_user_keys") - @mock.patch("copr_backend.sign.get_pubkey") + @mock.patch("copr_backend.sign.ObsSign._sign_one") + @mock.patch("copr_backend.sign.ObsSign.create_user_keys") + @mock.patch("copr_backend.sign.ObsSign.get_pubkey") def test_sign_rpms_id_dir_error_on_pubkey( self, mc_gp, mc_cuk, mc_so, tmp_dir, tmp_files): @@ -250,9 +259,9 @@ class TestSign(object): assert not mc_cuk.called assert not mc_so.called - @mock.patch("copr_backend.sign._sign_one") - @mock.patch("copr_backend.sign.create_user_keys") - @mock.patch("copr_backend.sign.get_pubkey") + @mock.patch("copr_backend.sign.ObsSign._sign_one") + @mock.patch("copr_backend.sign.ObsSign.create_user_keys") + @mock.patch("copr_backend.sign.ObsSign.get_pubkey") def test_sign_rpms_id_dir_no_pub_key( self, mc_gp, mc_cuk, mc_so, tmp_dir, tmp_files): @@ -266,9 +275,9 @@ class TestSign(object): assert mc_cuk.called assert mc_so.called - @mock.patch("copr_backend.sign._sign_one") - @mock.patch("copr_backend.sign.create_user_keys") - @mock.patch("copr_backend.sign.get_pubkey") + @mock.patch("copr_backend.sign.ObsSign._sign_one") + @mock.patch("copr_backend.sign.ObsSign.create_user_keys") + @mock.patch("copr_backend.sign.ObsSign.get_pubkey") def test_sign_rpms_id_dir_sign_error_one( self, mc_gp, mc_cuk, mc_so, tmp_dir, tmp_files): @@ -285,9 +294,9 @@ class TestSign(object): assert mc_so.called - @mock.patch("copr_backend.sign._sign_one") - @mock.patch("copr_backend.sign.create_user_keys") - @mock.patch("copr_backend.sign.get_pubkey") + @mock.patch("copr_backend.sign.ObsSign._sign_one") + @mock.patch("copr_backend.sign.ObsSign.create_user_keys") + @mock.patch("copr_backend.sign.ObsSign.get_pubkey") def test_sign_rpms_id_dir_sign_error_all( self, mc_gp, mc_cuk, mc_so, tmp_dir, tmp_files):