From bed3ab1ed2c9a92b20bae1d0f11f17d9051bfff9 Mon Sep 17 00:00:00 2001
From: CoprDistGit <infra@openeuler.org>
Date: Fri, 21 Mar 2025 17:37:41 +0000
Subject: automatic import of copr-backend

---
 euler_msgbus.patch | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 83 insertions(+)
 create mode 100644 euler_msgbus.patch

(limited to 'euler_msgbus.patch')

diff --git a/euler_msgbus.patch b/euler_msgbus.patch
new file mode 100644
index 0000000..5eca90a
--- /dev/null
+++ b/euler_msgbus.patch
@@ -0,0 +1,83 @@
+diff --git a/copr_backend/euler_msgbus.py b/copr_backend/euler_msgbus.py
+new file mode 100644
+index 000000000..1395249be
+--- /dev/null
++++ b/copr_backend/euler_msgbus.py
+@@ -0,0 +1,77 @@
++import socket
++
++from kafka import KafkaProducer
++import os
++import datetime
++import uuid
++import json
++import ssl
++
++
++def message_from_worker_job(topic, job, who, ip, pid):
++    message = {}
++    content = {
++        'user': job.submitter,
++        'copr': job.project_name,
++        'owner': job.project_owner,
++        'pkg': job.package_name,
++        'build': job.build_id,
++        'chroot': job.chroot,
++        'version': job.package_version,
++        'status': job.status,
++    }
++    content.update({'ip': ip, 'who': who, 'pid': pid})
++    message_types = {
++        'build.start': {
++            'what': "build start: user:{user} copr:{copr}" \
++                    " pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
++        },
++        'chroot.start': {
++            'what': "chroot start: chroot:{chroot} user:{user}" \
++                    " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
++        },
++        'build.end': {
++            'what': "build end: user:{user} copr:{copr} build:{build}" \
++                    " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}",
++        },
++    }
++    content['what'] = message_types[topic]['what'].format(**content)
++    message['body'] = content
++    now = datetime.datetime.now().isoformat()
++    headers = {
++        "openEuler_messaging_schema": "eur." + topic,
++        "sent-at": now,
++    }
++    message['headers'] = headers
++    message['id'] = str(uuid.uuid4())
++    message['topic'] = "org.openEuler.prod.eur." + topic
++    return message
++
++
++class MessageSender:
++    def __init__(self, backend_opts, name, log):
++        self.log = log
++        self.name = name
++        self.pid = os.getpid()
++        self.opts = backend_opts
++
++    def announce(self, topic, job, host):
++        msg = message_from_worker_job(topic, job, self.name, host, self.pid)
++        self.send_message(msg)
++
++    def send_message(self, msg):
++        """ Send message to kafka """
++
++        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
++        context.verify_mode = ssl.CERT_REQUIRED
++        context.load_verify_locations("etc/copr/kafka.crt")
++        producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers,
++                                 api_version=(3, 5, 0),
++                                 sasl_mechanism="PLAIN",
++                                 ssl_context=context,
++                                 security_protocol='SASL_SSL',
++                                 sasl_plain_username=self.opts.message.user_name,
++                                 sasl_plain_password=self.opts.message.password,
++                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
++        producer.send(str(self.opts.message.topic), msg)
++        producer.flush()
-- 
cgit v1.2.3