From 6012d5edce0affe7303d42de5c1c2dcde78b5341 Mon Sep 17 00:00:00 2001 From: muyuying1 Date: Fri, 2 Jun 2023 12:39:59 +0800 Subject: [PATCH] update cve fix and cve scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ceres/function/schema.py | 3 +- ceres/function/util.py | 12 ++++ ceres/manages/vulnerability_manage.py | 88 ++++++++++++++++++++++++--- 3 files changed, 95 insertions(+), 8 deletions(-) diff --git a/ceres/function/schema.py b/ceres/function/schema.py index 603a588..5200665 100644 --- a/ceres/function/schema.py +++ b/ceres/function/schema.py @@ -122,7 +122,8 @@ CVE_FIX_SCHEMA = { "required": ["cve_id", "hotpatch"], "properties": { "cve_id": {"type": "string", "minLength": 1}, - "hotpatch": {"enum": [True, False]} + "hotpatch": {"enum": [True, False]}, + "accepted": {"enum": [True, False]} } } diff --git a/ceres/function/util.py b/ceres/function/util.py index 42cebe2..9aa0909 100644 --- a/ceres/function/util.py +++ b/ceres/function/util.py @@ -13,6 +13,7 @@ import configparser import json import os +import subprocess from typing import Union, List, Any, Dict, NoReturn from subprocess import Popen, PIPE, STDOUT @@ -25,6 +26,8 @@ from ceres.models.custom_exception import InputError from ceres.function.schema import STRING_ARRAY from ceres.function.status import PARAM_ERROR +FAIL = 255 + def load_conf(file_path: str) -> configparser.RawConfigParser: """ @@ -94,6 +97,15 @@ def get_shell_data(command_list: List[str], key: bool = True, env=None, return res +def cmd_output(cmd): + try: + result = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result.wait() + return result.stdout.read().decode('utf-8'), result.returncode + except Exception as e: + return str(e), FAIL + + def load_gopher_config(gopher_config_path: str) -> AttrDict: """ get AttrDict from config file diff --git a/ceres/manages/vulnerability_manage.py b/ceres/manages/vulnerability_manage.py index 5a475e4..e605a17 100644 --- a/ceres/manages/vulnerability_manage.py +++ b/ceres/manages/vulnerability_manage.py @@ -28,9 +28,12 @@ from ceres.function.status import ( StatusCode, COMMAND_EXEC_ERROR ) -from ceres.function.util import get_shell_data +from ceres.function.util import get_shell_data, cmd_output from ceres.models.custom_exception import InputError +SUCCEED = 0 +FAIL = 255 + class VulnerabilityManage: def repo_set(self, data: dict) -> int: @@ -209,10 +212,11 @@ class VulnerabilityManage: # CVE-2022-3080 A-1.1-1/HP3 ACTIVED for hotpatch_fixed in hotpatch_fixed_result.strip().split("\n")[1:]: hotpatch_fixed_split = hotpatch_fixed.split(" ") - if hotpatch_fixed_split[-1] in ["ACTIVED", "ACCEPT"]: + if hotpatch_fixed_split[-1] in ["ACTIVED", "ACCEPTED"]: result_dict["fixed_cves"].append({ "cve_id": hotpatch_fixed_split[0], - "fixed_by_hp": True + "fixed_by_hp": True, + "hp_status": hotpatch_fixed_split[-1] }) return SUCCESS, result_dict @@ -262,8 +266,7 @@ class VulnerabilityManage: return SUCCESS, result_list - @staticmethod - def _fix_cve_by_dnf(cve: dict) -> Tuple[bool, str]: + def _fix_cve_by_dnf(self, cve: dict) -> Tuple[bool, str]: """ Fix CVE by dnf based on repo source named update @@ -289,7 +292,13 @@ class VulnerabilityManage: res = 'Host has no command dnf' if hotpatch: - return "Apply hot patch succeed" in res or "No hot patches marked for install" in res, res + hot_pkg = self._hotpatch_list_cve_with_cveid(cve.get('cve_id')) + if not hot_pkg: + return False, res + syscare_res = self._syscare_change_status(hot_pkg, cve.get('accepted')) + if not syscare_res: + return False, res + return "Active/Accept hot patch succeed" in res or "No hot patches marked for install" in res, res else: return "Complete" in res, res @@ -316,7 +325,8 @@ class VulnerabilityManage: """ if not self._validate_repo_source(REPO_ID_FOR_CVE_MANAGE): - return REPO_NOT_SET, [dict(cve_id=cve["cve_id"], log=StatusCode.mapping[REPO_NOT_SET]['msg'], result="fail") for cve in cves] + return REPO_NOT_SET, [dict(cve_id=cve["cve_id"], log=StatusCode.mapping[REPO_NOT_SET]['msg'], result="fail") + for cve in cves] return self._cve_rollback(cves) @@ -402,6 +412,70 @@ class VulnerabilityManage: return hotpatch_list + @staticmethod + def _hotpatch_list_cve_with_cveid(cve_id) -> str: + """ + Run the dnf hotpatch list cve command to query the hotpatch list corresponding to the cve + + Returns: + str + e.g. + """ + # Run the dnf command to query the hotpatch list,e.g + # Last metadata expiration check: + # CVE-id base-pkg/hotpatch status + # CVE-2023-1111 redis-6.2.5-1/HP001 ACTIVED + hotpatch_list_output, status_code = cmd_output(["dnf", "hotpatch", "--list", "cves", "--cve", cve_id]) + if status_code == FAIL or not re.search("base-pkg/hotpatch", hotpatch_list_output): + return None + + for hotpatch_info in [line for line in hotpatch_list_output.split(os.linesep) if line]: + if not hotpatch_info.startswith("CVE"): + continue + _, hot_pkg, _, = [info.strip() for info in hotpatch_info.split()] + if hot_pkg == "base-pkg/hotpatch": + continue + return hot_pkg + return "" + + def _syscare_operate(self, operate, patch_name=None): + """ + + """ + _, operate_code = cmd_output(["syscare", "save"]) + if operate_code == FAIL: + LOGGER.error(f"syscare save failed") + _, operate_code = cmd_output(["syscare", operate, patch_name]) + if operate_code == FAIL: + LOGGER.error(f"syscare {operate} {patch_name} failed,start roll back") + cmd_output(["syscare", "restore"]) + if operate_code == FAIL: + LOGGER.error(f"syscare restore failed,status roll back failed") + else: + LOGGER.info(f"syscare restore success") + return False + LOGGER.info(f"syscare {operate} {patch_name} success ") + return True + + def _syscare_change_status(self, hot_pkg: str, accepted=False): + """ + Apply hot patch use syscare accept + + Args: + hot_pkg: cve is rolled back + """ + res = self._syscare_operate("apply", hot_pkg) + if not res: + return False + res = self._syscare_operate("active", hot_pkg) + if not res: + return False + if accepted: + res = self._syscare_operate("accept", hot_pkg) + if not res: + return False + return True + def _hotpatch_rollback(self, base_pkg_hotpatch: str) -> Tuple[bool, str]: """ Hot patch is rolled back -- Gitee