1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
diff --git a/backend/copr_backend/euler_msgbus.py b/backend/copr_backend/euler_msgbus.py
new file mode 100644
index 000000000..1395249be
--- /dev/null
+++ b/backend/copr_backend/euler_msgbus.py
@@ -0,0 +1,77 @@
+import socket
+
+from kafka import KafkaProducer
+import os
+import datetime
+import uuid
+import json
+import ssl
+
+
+def message_from_worker_job(topic, job, who, ip, pid):
+ message = {}
+ content = {
+ 'user': job.submitter,
+ 'copr': job.project_name,
+ 'owner': job.project_owner,
+ 'pkg': job.package_name,
+ 'build': job.build_id,
+ 'chroot': job.chroot,
+ 'version': job.package_version,
+ 'status': job.status,
+ }
+ content.update({'ip': ip, 'who': who, 'pid': pid})
+ message_types = {
+ 'build.start': {
+ 'what': "build start: user:{user} copr:{copr}" \
+ " pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
+ },
+ 'chroot.start': {
+ 'what': "chroot start: chroot:{chroot} user:{user}" \
+ " copr:{copr} pkg:{pkg} build:{build} ip:{ip} pid:{pid}",
+ },
+ 'build.end': {
+ 'what': "build end: user:{user} copr:{copr} build:{build}" \
+ " pkg:{pkg} version:{version} ip:{ip} pid:{pid} status:{status}",
+ },
+ }
+ content['what'] = message_types[topic]['what'].format(**content)
+ message['body'] = content
+ now = datetime.datetime.now().isoformat()
+ headers = {
+ "openEuler_messaging_schema": "eur." + topic,
+ "sent-at": now,
+ }
+ message['headers'] = headers
+ message['id'] = str(uuid.uuid4())
+ message['topic'] = "org.openEuler.prod.eur." + topic
+ return message
+
+
+class MessageSender:
+ def __init__(self, backend_opts, name, log):
+ self.log = log
+ self.name = name
+ self.pid = os.getpid()
+ self.opts = backend_opts
+
+ def announce(self, topic, job, host):
+ msg = message_from_worker_job(topic, job, self.name, host, self.pid)
+ self.send_message(msg)
+
+ def send_message(self, msg):
+ """ Send message to kafka """
+
+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context.verify_mode = ssl.CERT_REQUIRED
+ context.load_verify_locations("etc/copr/kafka.crt")
+ producer = KafkaProducer(bootstrap_servers=self.opts.message.bootstrap_servers,
+ api_version=(3, 5, 0),
+ sasl_mechanism="PLAIN",
+ ssl_context=context,
+ security_protocol='SASL_SSL',
+ sasl_plain_username=self.opts.message.user_name,
+ sasl_plain_password=self.opts.message.password,
+ value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+ producer.send(str(self.opts.message.topic), msg)
+ producer.flush()
|