%global _empty_manifest_terminate_build 0
Name: python-amqp-mock
Version: 0.4.0
Release: 1
Summary: Remote AMQP mock
License: Apache-2.0
URL: https://github.com/nikitanovosibirsk/amqp-mock
Source0: https://mirrors.nju.edu.cn/pypi/web/packages/f0/f9/170796086b34db1ebb87ab01591385cd9c31804fa9b994d4898471d56435/amqp-mock-0.4.0.tar.gz
BuildArch: noarch

Requires: python3-aiohttp
Requires: python3-pamqp

%description
# AMQP Mock

[![Codecov](https://img.shields.io/codecov/c/github/nikitanovosibirsk/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/nikitanovosibirsk/amqp-mock)
[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)

* [Installation](#installation)
* [Overview](#overview)
  * [Test Publishing](#test-publishing)
  * [Test Consuming](#test-consuming)
* [Mock Server](#mock-server)
  * [Start server](#start-server)
  * [Publish message](#publish-message)
  * [Get queue message history](#get-queue-message-history)
  * [Get exchange messages](#get-exchange-messages)
  * [Delete exchange messages](#delete-exchange-messages)
  * [Reset](#reset)

## Installation

```sh
pip3 install amqp-mock
```

## Overview

### Test Publishing

```python
from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]
```

Full code available here: [`./examples/publish_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/publish_example.py)

### Test Consuming

```python
from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED
```

Full code available here: [`./examples/consume_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/consume_example.py)

## Mock Server

### Start server

```python
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock

async def run() -> None:
    storage = Storage()
    http_server = HttpServer(storage, port=8080)
    amqp_server = AmqpServer(storage, port=5672)
    async with create_amqp_mock(http_server, amqp_server):
        await asyncio.Future()

asyncio.run(run())
```

or via docker

```shell
docker run -p 8080:80 -p 5672:5672 nikitanovosibirsk/amqp-mock
```

### Publish message

`POST /queues/{queue}/messages`

```js
{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}
```

HTTP

```sh
$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
```
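
Where `AmqpMockClient` is not available, the same request can in principle be assembled with the standard library alone. The following is a minimal sketch, not part of amqp-mock: `build_publish_request` is a hypothetical helper, and the base URL `http://localhost:8080` is an assumption taken from the port used in the Start server example. It only builds the request, so it runs without a live server.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def build_publish_request(base_url, queue, value, exchange="", routing_key=""):
    """Build (but do not send) a POST /queues/{queue}/messages request.

    Hypothetical helper for illustration; field names follow the JSON
    body documented above.
    """
    body = {"value": value, "exchange": exchange, "routing_key": routing_key}
    return Request(
        url=f"{base_url}/queues/{quote(queue, safe='')}/messages",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed base URL; adjust to wherever the mock's HTTP server listens.
request = build_publish_request(
    "http://localhost:8080", "test_queue", [1, 2, 3], exchange="test_exchange"
)
# To send it against a running mock: urllib.request.urlopen(request)
```

Keeping request construction separate from sending makes the payload easy to inspect in a unit test before any network call is made.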

### Get queue message history

`GET /queues/{queue}/messages/history`

HTTP

```sh
$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# Returns a list with one entry per queued message; here a single entry
# with queue='test_queue' and status=MessageStatus.ACKED.
```
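
When working with the raw HTTP response rather than the Python client, the history can be reduced to a message-id-to-status mapping for assertions. This is a sketch under stated assumptions: `statuses_by_message_id` is a hypothetical helper, and the JSON is the response body from the HTTP example in this section.

```python
import json

# Response body copied from the HTTP example in this section.
HISTORY_JSON = """
[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]
"""


def statuses_by_message_id(history_json):
    """Map each message id in a history response to its status string."""
    return {
        entry["message"]["id"]: entry["status"]
        for entry in json.loads(history_json)
    }


statuses = statuses_by_message_id(HISTORY_JSON)
assert statuses["94459a41-9119-479a-98c9-80bc9dabb719"] == "ACKED"
```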

### Get exchange messages

`GET /exchanges/{exchange}/messages`

HTTP

```sh
$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# Returns a list of the messages published to the exchange
# (a single message in the HTTP example above).
```

### Delete exchange messages

`DELETE /exchanges/{exchange}/messages`

HTTP

```sh
$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
```

### Reset

`DELETE /`

HTTP

```sh
$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()
```

%package -n python3-amqp-mock
Summary: Remote AMQP mock
Provides: python-amqp-mock
BuildRequires: python3-devel
BuildRequires: python3-setuptools
BuildRequires: python3-pip

%description -n python3-amqp-mock
# AMQP Mock

[![Codecov](https://img.shields.io/codecov/c/github/nikitanovosibirsk/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/nikitanovosibirsk/amqp-mock)
[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)

* [Installation](#installation)
* [Overview](#overview)
  * [Test Publishing](#test-publishing)
  * [Test Consuming](#test-consuming)
* [Mock Server](#mock-server)
  * [Start server](#start-server)
  * [Publish message](#publish-message)
  * [Get queue message history](#get-queue-message-history)
  * [Get exchange messages](#get-exchange-messages)
  * [Delete exchange messages](#delete-exchange-messages)
  * [Reset](#reset)

## Installation

```sh
pip3 install amqp-mock
```

## Overview

### Test Publishing

```python
from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]
```

Full code available here: [`./examples/publish_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/publish_example.py)

### Test Consuming

```python
from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED
```

Full code available here: [`./examples/consume_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/consume_example.py)

## Mock Server

### Start server

```python
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock

async def run() -> None:
    storage = Storage()
    http_server = HttpServer(storage, port=8080)
    amqp_server = AmqpServer(storage, port=5672)
    async with create_amqp_mock(http_server, amqp_server):
        await asyncio.Future()

asyncio.run(run())
```

or via docker

```shell
docker run -p 8080:80 -p 5672:5672 nikitanovosibirsk/amqp-mock
```

### Publish message

`POST /queues/{queue}/messages`

```js
{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}
```

HTTP

```sh
$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
```

### Get queue message history

`GET /queues/{queue}/messages/history`

HTTP

```sh
$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# Returns a list with one entry per queued message; here a single entry
# with queue='test_queue' and status=MessageStatus.ACKED.
```

### Get exchange messages

`GET /exchanges/{exchange}/messages`

HTTP

```sh
$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# Returns a list of the messages published to the exchange
# (a single message in the HTTP example above).
```

### Delete exchange messages

`DELETE /exchanges/{exchange}/messages`

HTTP

```sh
$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
```

### Reset

`DELETE /`

HTTP

```sh
$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()
```

%package help
Summary: Development documents and examples for amqp-mock
Provides: python3-amqp-mock-doc

%description help
# AMQP Mock

[![Codecov](https://img.shields.io/codecov/c/github/nikitanovosibirsk/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/nikitanovosibirsk/amqp-mock)
[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)

* [Installation](#installation)
* [Overview](#overview)
  * [Test Publishing](#test-publishing)
  * [Test Consuming](#test-consuming)
* [Mock Server](#mock-server)
  * [Start server](#start-server)
  * [Publish message](#publish-message)
  * [Get queue message history](#get-queue-message-history)
  * [Get exchange messages](#get-exchange-messages)
  * [Delete exchange messages](#delete-exchange-messages)
  * [Reset](#reset)

## Installation

```sh
pip3 install amqp-mock
```

## Overview

### Test Publishing

```python
from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]
```

Full code available here: [`./examples/publish_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/publish_example.py)

### Test Consuming

```python
from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED
```

Full code available here: [`./examples/consume_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/consume_example.py)

## Mock Server

### Start server

```python
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock

async def run() -> None:
    storage = Storage()
    http_server = HttpServer(storage, port=8080)
    amqp_server = AmqpServer(storage, port=5672)
    async with create_amqp_mock(http_server, amqp_server):
        await asyncio.Future()

asyncio.run(run())
```

or via docker

```shell
docker run -p 8080:80 -p 5672:5672 nikitanovosibirsk/amqp-mock
```

### Publish message

`POST /queues/{queue}/messages`

```js
{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}
```

HTTP

```sh
$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
```

### Get queue message history

`GET /queues/{queue}/messages/history`

HTTP

```sh
$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# Returns a list with one entry per queued message; here a single entry
# with queue='test_queue' and status=MessageStatus.ACKED.
```

### Get exchange messages

`GET /exchanges/{exchange}/messages`

HTTP

```sh
$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# Returns a list of the messages published to the exchange
# (a single message in the HTTP example above).
```

### Delete exchange messages

`DELETE /exchanges/{exchange}/messages`

HTTP

```sh
$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
```

### Reset

`DELETE /`

HTTP

```sh
$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```

Python

```python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()
```

%prep
%autosetup -n amqp-mock-0.4.0

%build
%py3_build

%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
	find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
	find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
	find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
	find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
	find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .

%files -n python3-amqp-mock -f filelist.lst
%dir %{python3_sitelib}/*

%files help -f doclist.lst
%{_docdir}/*

%changelog
* Tue May 30 2023 Python_Bot - 0.4.0-1
- Package Spec generated