summaryrefslogtreecommitdiff
path: root/euler_msgbus.patch
blob: 5eca90a3a872452eddde4a6d1c0d4f1dec54ce8f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
diff --git a/copr_backend/euler_msgbus.py b/copr_backend/euler_msgbus.py
new file mode 100644
index 000000000..1395249be
--- /dev/null
+++ b/copr_backend/euler_msgbus.py
@@ -0,0 +1,77 @@
+import socket
+
+from kafka import KafkaProducer
+import os
+import datetime
+import uuid
+import json
+import ssl
+
+
+def message_from_worker_job(topic, job, who, ip, pid):
+    message = {}
+    content = {
+        'user': job.submitter,
+        'copr': job.project_name,
+        'owner': job.project_owner,
+        'pkg': job.package_name,
+        'build': job.build_id,
+        'chroot': job.chroot,
+        'version': job.package_version,
+        'status': job.status,
+    }
+    content.update({'ip': ip, 'who': who, 'pid': pid})
+    message_types = {
+        'build.start': {
+            'what': "build start: user:{user} copr:{copr}" \
+                    " pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
+        },
+        'chroot.start': {
+            'what': "chroot start: chroot:{chroot} user:{user}" \
+                    " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
+        },
+        'build.end': {
+            'what': "build end: user:{user} copr:{copr} build:{build}" \
+                    " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}",
+        },
+    }
+    content['what'] = message_types[topic]['what'].format(**content)
+    message['body'] = content
+    now = datetime.datetime.now().isoformat()
+    headers = {
+        "openEuler_messaging_schema": "eur." + topic,
+        "sent-at": now,
+    }
+    message['headers'] = headers
+    message['id'] = str(uuid.uuid4())
+    message['topic'] = "org.openEuler.prod.eur." + topic
+    return message
+
+
+class MessageSender:
+    def __init__(self, backend_opts, name, log):
+        self.log = log
+        self.name = name
+        self.pid = os.getpid()
+        self.opts = backend_opts
+
+    def announce(self, topic, job, host):
+        msg = message_from_worker_job(topic, job, self.name, host, self.pid)
+        self.send_message(msg)
+
+    def send_message(self, msg):
+        """ Send message to kafka """
+
+        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        context.verify_mode = ssl.CERT_REQUIRED
+        context.load_verify_locations("etc/copr/kafka.crt")
+        producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers,
+                                 api_version=(3, 5, 0),
+                                 sasl_mechanism="PLAIN",
+                                 ssl_context=context,
+                                 security_protocol='SASL_SSL',
+                                 sasl_plain_username=self.opts.message.user_name,
+                                 sasl_plain_password=self.opts.message.password,
+                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+        producer.send(str(self.opts.message.topic), msg)
+        producer.flush()