diff options
author | CoprDistGit <infra@openeuler.org> | 2025-03-21 17:37:41 +0000 |
---|---|---|
committer | CoprDistGit <infra@openeuler.org> | 2025-03-21 17:37:41 +0000 |
commit | bed3ab1ed2c9a92b20bae1d0f11f17d9051bfff9 (patch) | |
tree | 76779ef5ca00cab259eb516c4c44eef0b3048cc9 /euler_msgbus.patch | |
parent | 2132b310609fdedcd5427676a98f1174869b3a82 (diff) |
automatic import of copr-backend
Diffstat (limited to 'euler_msgbus.patch')
-rw-r--r-- | euler_msgbus.patch | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/euler_msgbus.patch b/euler_msgbus.patch new file mode 100644 index 0000000..5eca90a --- /dev/null +++ b/euler_msgbus.patch @@ -0,0 +1,83 @@ +diff --git a/copr_backend/euler_msgbus.py b/copr_backend/euler_msgbus.py +new file mode 100644 +index 000000000..1395249be +--- /dev/null ++++ b/copr_backend/euler_msgbus.py +@@ -0,0 +1,77 @@ ++import socket ++ ++from kafka import KafkaProducer ++import os ++import datetime ++import uuid ++import json ++import ssl ++ ++ ++def message_from_worker_job(topic, job, who, ip, pid): ++ message = {} ++ content = { ++ 'user': job.submitter, ++ 'copr': job.project_name, ++ 'owner': job.project_owner, ++ 'pkg': job.package_name, ++ 'build': job.build_id, ++ 'chroot': job.chroot, ++ 'version': job.package_version, ++ 'status': job.status, ++ } ++ content.update({'ip': ip, 'who': who, 'pid': pid}) ++ message_types = { ++ 'build.start': { ++ 'what': "build start: user:{user} copr:{copr}" \ ++ " pkg:{pkg} build:{build} ip:{ip} pid:{pid}", ++ }, ++ 'chroot.start': { ++ 'what': "chroot start: chroot:{chroot} user:{user}" \ ++ " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}", ++ }, ++ 'build.end': { ++ 'what': "build end: user:{user} copr:{copr} build:{build}" \ ++ " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}", ++ }, ++ } ++ content['what'] = message_types[topic]['what'].format(**content) ++ message['body'] = content ++ now = datetime.datetime.now().isoformat() ++ headers = { ++ "openEuler_messaging_schema": "eur." + topic, ++ "sent-at": now, ++ } ++ message['headers'] = headers ++ message['id'] = str(uuid.uuid4()) ++ message['topic'] = "org.openEuler.prod.eur." + topic ++ return message ++ ++ ++class MessageSender: ++ def __init__(self, backend_opts, name, log): ++ self.log = log ++ self.name = name ++ self.pid = os.getpid() ++ self.opts = backend_opts ++ ++ def announce(self, topic, job, host): ++ msg = message_from_worker_job(topic, job, self.name, host, self.pid) ++ self.send_message(msg) ++ ++ def send_message(self, msg): ++ """ Send message to kafka """ ++ ++ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ++ context.verify_mode = ssl.CERT_REQUIRED ++ context.load_verify_locations("etc/copr/kafka.crt") ++ producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers, ++ api_version=(3, 5, 0), ++ sasl_mechanism="PLAIN", ++ ssl_context=context, ++ security_protocol='SASL_SSL', ++ sasl_plain_username=self.opts.message.user_name, ++ sasl_plain_password=self.opts.message.password, ++ value_serializer=lambda v: json.dumps(v).encode('utf-8')) ++ producer.send(str(self.opts.message.topic), msg) ++ producer.flush() |