diff --git a/copr_backend/euler_msgbus.py b/copr_backend/euler_msgbus.py new file mode 100644 index 000000000..1395249be --- /dev/null +++ b/copr_backend/euler_msgbus.py @@ -0,0 +1,77 @@ +import socket + +from kafka import KafkaProducer +import os +import datetime +import uuid +import json +import ssl + + +def message_from_worker_job(topic, job, who, ip, pid): + message = {} + content = { + 'user': job.submitter, + 'copr': job.project_name, + 'owner': job.project_owner, + 'pkg': job.package_name, + 'build': job.build_id, + 'chroot': job.chroot, + 'version': job.package_version, + 'status': job.status, + } + content.update({'ip': ip, 'who': who, 'pid': pid}) + message_types = { + 'build.start': { + 'what': "build start: user:{user} copr:{copr}" \ + " pkg:{pkg} build:{build} ip:{ip} pid:{pid}", + }, + 'chroot.start': { + 'what': "chroot start: chroot:{chroot} user:{user}" \ + " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}", + }, + 'build.end': { + 'what': "build end: user:{user} copr:{copr} build:{build}" \ + " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}", + }, + } + content['what'] = message_types[topic]['what'].format(**content) + message['body'] = content + now = datetime.datetime.now().isoformat() + headers = { + "openEuler_messaging_schema": "eur." + topic, + "sent-at": now, + } + message['headers'] = headers + message['id'] = str(uuid.uuid4()) + message['topic'] = "org.openEuler.prod.eur." + topic + return message + + +class MessageSender: + def __init__(self, backend_opts, name, log): + self.log = log + self.name = name + self.pid = os.getpid() + self.opts = backend_opts + + def announce(self, topic, job, host): + msg = message_from_worker_job(topic, job, self.name, host, self.pid) + self.send_message(msg) + + def send_message(self, msg): + """ Send message to kafka """ + + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations("etc/copr/kafka.crt") + producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers, + api_version=(3, 5, 0), + sasl_mechanism="PLAIN", + ssl_context=context, + security_protocol='SASL_SSL', + sasl_plain_username=self.opts.message.user_name, + sasl_plain_password=self.opts.message.password, + value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send(str(self.opts.message.topic), msg) + producer.flush()