summaryrefslogtreecommitdiff
path: root/euler_msgbus.patch
diff options
context:
space:
mode:
authorCoprDistGit <infra@openeuler.org>2025-03-21 17:37:41 +0000
committerCoprDistGit <infra@openeuler.org>2025-03-21 17:37:41 +0000
commitbed3ab1ed2c9a92b20bae1d0f11f17d9051bfff9 (patch)
tree76779ef5ca00cab259eb516c4c44eef0b3048cc9 /euler_msgbus.patch
parent2132b310609fdedcd5427676a98f1174869b3a82 (diff)
automatic import of copr-backend
Diffstat (limited to 'euler_msgbus.patch')
-rw-r--r--euler_msgbus.patch83
1 files changed, 83 insertions, 0 deletions
diff --git a/euler_msgbus.patch b/euler_msgbus.patch
new file mode 100644
index 0000000..5eca90a
--- /dev/null
+++ b/euler_msgbus.patch
@@ -0,0 +1,83 @@
+diff --git a/copr_backend/euler_msgbus.py b/copr_backend/euler_msgbus.py
+new file mode 100644
+index 000000000..1395249be
+--- /dev/null
++++ b/copr_backend/euler_msgbus.py
+@@ -0,0 +1,77 @@
++import socket
++
++from kafka import KafkaProducer
++import os
++import datetime
++import uuid
++import json
++import ssl
++
++
++def message_from_worker_job(topic, job, who, ip, pid):
++ message = {}
++ content = {
++ 'user': job.submitter,
++ 'copr': job.project_name,
++ 'owner': job.project_owner,
++ 'pkg': job.package_name,
++ 'build': job.build_id,
++ 'chroot': job.chroot,
++ 'version': job.package_version,
++ 'status': job.status,
++ }
++ content.update({'ip': ip, 'who': who, 'pid': pid})
++ message_types = {
++ 'build.start': {
++ 'what': "build start: user:{user} copr:{copr}" \
++ " pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
++ },
++ 'chroot.start': {
++ 'what': "chroot start: chroot:{chroot} user:{user}" \
++ " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
++ },
++ 'build.end': {
++ 'what': "build end: user:{user} copr:{copr} build:{build}" \
++ " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}",
++ },
++ }
++ content['what'] = message_types[topic]['what'].format(**content)
++ message['body'] = content
++ now = datetime.datetime.now().isoformat()
++ headers = {
++ "openEuler_messaging_schema": "eur." + topic,
++ "sent-at": now,
++ }
++ message['headers'] = headers
++ message['id'] = str(uuid.uuid4())
++ message['topic'] = "org.openEuler.prod.eur." + topic
++ return message
++
++
++class MessageSender:
++ def __init__(self, backend_opts, name, log):
++ self.log = log
++ self.name = name
++ self.pid = os.getpid()
++ self.opts = backend_opts
++
++ def announce(self, topic, job, host):
++ msg = message_from_worker_job(topic, job, self.name, host, self.pid)
++ self.send_message(msg)
++
++ def send_message(self, msg):
++ """ Send message to kafka """
++
++ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
++ context.verify_mode = ssl.CERT_REQUIRED
++ context.load_verify_locations("etc/copr/kafka.crt")
++ producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers,
++ api_version=(3, 5, 0),
++ sasl_mechanism="PLAIN",
++ ssl_context=context,
++ security_protocol='SASL_SSL',
++ sasl_plain_username=self.opts.message.user_name,
++ sasl_plain_password=self.opts.message.password,
++ value_serializer=lambda v: json.dumps(v).encode('utf-8'))
++ producer.send(str(self.opts.message.topic), msg)
++ producer.flush()