summaryrefslogtreecommitdiff
path: root/euler_msgbus.patch
diff options
context:
space:
mode:
Diffstat (limited to 'euler_msgbus.patch')
-rw-r--r--euler_msgbus.patch83
1 files changed, 0 insertions, 83 deletions
diff --git a/euler_msgbus.patch b/euler_msgbus.patch
deleted file mode 100644
index 5eca90a..0000000
--- a/euler_msgbus.patch
+++ /dev/null
@@ -1,83 +0,0 @@
-diff --git a/copr_backend/euler_msgbus.py b/copr_backend/euler_msgbus.py
-new file mode 100644
-index 000000000..1395249be
---- /dev/null
-+++ b/copr_backend/euler_msgbus.py
-@@ -0,0 +1,77 @@
-+import socket
-+
-+from kafka import KafkaProducer
-+import os
-+import datetime
-+import uuid
-+import json
-+import ssl
-+
-+
-+def message_from_worker_job(topic, job, who, ip, pid):
-+ message = {}
-+ content = {
-+ 'user': job.submitter,
-+ 'copr': job.project_name,
-+ 'owner': job.project_owner,
-+ 'pkg': job.package_name,
-+ 'build': job.build_id,
-+ 'chroot': job.chroot,
-+ 'version': job.package_version,
-+ 'status': job.status,
-+ }
-+ content.update({'ip': ip, 'who': who, 'pid': pid})
-+ message_types = {
-+ 'build.start': {
-+ 'what': "build start: user:{user} copr:{copr}" \
-+ " pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
-+ },
-+ 'chroot.start': {
-+ 'what': "chroot start: chroot:{chroot} user:{user}" \
-+ " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
-+ },
-+ 'build.end': {
-+ 'what': "build end: user:{user} copr:{copr} build:{build}" \
-+ " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}",
-+ },
-+ }
-+ content['what'] = message_types[topic]['what'].format(**content)
-+ message['body'] = content
-+ now = datetime.datetime.now().isoformat()
-+ headers = {
-+ "openEuler_messaging_schema": "eur." + topic,
-+ "sent-at": now,
-+ }
-+ message['headers'] = headers
-+ message['id'] = str(uuid.uuid4())
-+ message['topic'] = "org.openEuler.prod.eur." + topic
-+ return message
-+
-+
-+class MessageSender:
-+ def __init__(self, backend_opts, name, log):
-+ self.log = log
-+ self.name = name
-+ self.pid = os.getpid()
-+ self.opts = backend_opts
-+
-+ def announce(self, topic, job, host):
-+ msg = message_from_worker_job(topic, job, self.name, host, self.pid)
-+ self.send_message(msg)
-+
-+ def send_message(self, msg):
-+ """ Send message to kafka """
-+
-+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
-+ context.verify_mode = ssl.CERT_REQUIRED
-+ context.load_verify_locations("etc/copr/kafka.crt")
-+ producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers,
-+ api_version=(3, 5, 0),
-+ sasl_mechanism="PLAIN",
-+ ssl_context=context,
-+ security_protocol='SASL_SSL',
-+ sasl_plain_username=self.opts.message.user_name,
-+ sasl_plain_password=self.opts.message.password,
-+ value_serializer=lambda v: json.dumps(v).encode('utf-8'))
-+ producer.send(str(self.opts.message.topic), msg)
-+ producer.flush()