diff options
Diffstat (limited to 'euler_msgbus.patch')
-rw-r--r-- | euler_msgbus.patch | 83 |
1 files changed, 0 insertions, 83 deletions
diff --git a/euler_msgbus.patch b/euler_msgbus.patch deleted file mode 100644 index 5eca90a..0000000 --- a/euler_msgbus.patch +++ /dev/null @@ -1,83 +0,0 @@ -diff --git a/copr_backend/euler_msgbus.py b/copr_backend/euler_msgbus.py -new file mode 100644 -index 000000000..1395249be ---- /dev/null -+++ b/copr_backend/euler_msgbus.py -@@ -0,0 +1,77 @@ -+import socket -+ -+from kafka import KafkaProducer -+import os -+import datetime -+import uuid -+import json -+import ssl -+ -+ -+def message_from_worker_job(topic, job, who, ip, pid): -+ message = {} -+ content = { -+ 'user': job.submitter, -+ 'copr': job.project_name, -+ 'owner': job.project_owner, -+ 'pkg': job.package_name, -+ 'build': job.build_id, -+ 'chroot': job.chroot, -+ 'version': job.package_version, -+ 'status': job.status, -+ } -+ content.update({'ip': ip, 'who': who, 'pid': pid}) -+ message_types = { -+ 'build.start': { -+ 'what': "build start: user:{user} copr:{copr}" \ -+ " pkg:{pkg} build:{build} ip:{ip} pid:{pid}", -+ }, -+ 'chroot.start': { -+ 'what': "chroot start: chroot:{chroot} user:{user}" \ -+ " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}", -+ }, -+ 'build.end': { -+ 'what': "build end: user:{user} copr:{copr} build:{build}" \ -+ " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}", -+ }, -+ } -+ content['what'] = message_types[topic]['what'].format(**content) -+ message['body'] = content -+ now = datetime.datetime.now().isoformat() -+ headers = { -+ "openEuler_messaging_schema": "eur." + topic, -+ "sent-at": now, -+ } -+ message['headers'] = headers -+ message['id'] = str(uuid.uuid4()) -+ message['topic'] = "org.openEuler.prod.eur." + topic -+ return message -+ -+ -+class MessageSender: -+ def __init__(self, backend_opts, name, log): -+ self.log = log -+ self.name = name -+ self.pid = os.getpid() -+ self.opts = backend_opts -+ -+ def announce(self, topic, job, host): -+ msg = message_from_worker_job(topic, job, self.name, host, self.pid) -+ self.send_message(msg) -+ -+ def send_message(self, msg): -+ """ Send message to kafka """ -+ -+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) -+ context.verify_mode = ssl.CERT_REQUIRED -+ context.load_verify_locations("etc/copr/kafka.crt") -+ producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers, -+ api_version=(3, 5, 0), -+ sasl_mechanism="PLAIN", -+ ssl_context=context, -+ security_protocol='SASL_SSL', -+ sasl_plain_username=self.opts.message.user_name, -+ sasl_plain_password=self.opts.message.password, -+ value_serializer=lambda v: json.dumps(v).encode('utf-8')) -+ producer.send(str(self.opts.message.topic), msg) -+ producer.flush() |