%global _empty_manifest_terminate_build 0
Name: python-amqp-mock
Version: 0.4.0
Release: 1
Summary: Remote AMQP mock
License: Apache-2.0
URL: https://github.com/nikitanovosibirsk/amqp-mock
Source0: https://mirrors.nju.edu.cn/pypi/web/packages/f0/f9/170796086b34db1ebb87ab01591385cd9c31804fa9b994d4898471d56435/amqp-mock-0.4.0.tar.gz
BuildArch: noarch
Requires: python3-aiohttp
Requires: python3-pamqp
%description
# AMQP Mock
[![Codecov](https://img.shields.io/codecov/c/github/nikitanovosibirsk/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/nikitanovosibirsk/amqp-mock)
[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
* [Installation](#installation)
* [Overview](#overview)
* [Test Publishing](#test-publishing)
* [Test Consuming](#test-consuming)
* [Mock Server](#mock-server)
* [Start server](#start-server)
* [Publish message](#publish-message)
* [Get queue message history](#get-queue-message-history)
* [Get exchange messages](#get-exchange-messages)
* [Delete exchange messages](#delete-exchange-messages)
* [Reset](#reset)
## Installation
```sh
pip3 install amqp-mock
```
## Overview
### Test Publishing
```python
from amqp_mock import create_amqp_mock
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Publish message via "system under test"
publish_message([1, 2, 3], "exchange")
# 3. Test message has been published
messages = await mock.client.get_exchange_messages("exchange")
assert messages[0].value == [1, 2, 3]
```
Full code available here: [`./examples/publish_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/publish_example.py)
### Test Consuming
```python
from amqp_mock import create_amqp_mock, Message, MessageStatus
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Mock next message
await mock.client.publish_message("queue", Message([1, 2, 3]))
# 3. Consume message via "system under test"
consume_message("queue")
# 4. Test message has been consumed
history = await mock.client.get_queue_message_history("queue")
assert history[0].status == MessageStatus.ACKED
```
Full code available here: [`./examples/consume_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/consume_example.py)
## Mock Server
### Start server
```python
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock
async def run() -> None:
storage = Storage()
http_server = HttpServer(storage, port=8080)
amqp_server = AmqpServer(storage, port=5672)
async with create_amqp_mock(http_server, amqp_server):
await asyncio.Future()
asyncio.run(run())
```
or via docker
```shell
docker run -p 8080:80 -p 5672:5672 nikitanovosibirsk/amqp-mock
```
### Publish message
`POST /queues/{queue}/messages`
```js
{
"id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
"value": [1, 2, 3],
"exchange": "",
"routing_key": "",
"properties": null
}
```
HTTP
```sh
$ http POST localhost/queues/test_queue/messages \
value:='[1, 2, 3]' \
exchange=test_exchange
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient, Message
mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
```
### Get queue message history
`GET /queues/{queue}/messages/history`
HTTP
```sh
$ http GET localhost/queues/test_queue/messages/history
HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8
[
{
"message": {
"exchange": "test_exchange",
"id": "94459a41-9119-479a-98c9-80bc9dabb719",
"properties": null,
"routing_key": "",
"value": [1, 2, 3]
},
"queue": "test_queue",
"status": "ACKED"
}
]
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
# ,
# queue='test_queue',
# status=MessageStatus.ACKED>
# ]
```
### Get exchange messages
`GET /exchanges/{exchange}/messages`
HTTP
```sh
$ http GET localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8
[
{
"exchange": "test_exchange",
"id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
"properties": {
"app_id": "",
"cluster_id": "",
"content_encoding": "",
"content_type": "",
"correlation_id": "",
"delivery_mode": 1,
"expiration": "",
"headers": null,
"message_id": "5ec9024c74eca2e419fd7e29f7be846c",
"message_type": "",
"priority": null,
"reply_to": "",
"timestamp": null,
"user_id": ""
},
"routing_key": "",
"value": [1, 2, 3]
}
]
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#
# ]
```
### Delete exchange messages
`DELETE /exchanges/{exchange}/messages`
HTTP
```sh
$ http DELETE localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
```
### Reset
`DELETE /`
HTTP
```sh
$ http DELETE localhost/
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.reset()
```
%package -n python3-amqp-mock
Summary: Remote AMQP mock
Provides: python-amqp-mock
BuildRequires: python3-devel
BuildRequires: python3-setuptools
BuildRequires: python3-pip
%description -n python3-amqp-mock
# AMQP Mock
[![Codecov](https://img.shields.io/codecov/c/github/nikitanovosibirsk/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/nikitanovosibirsk/amqp-mock)
[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
* [Installation](#installation)
* [Overview](#overview)
* [Test Publishing](#test-publishing)
* [Test Consuming](#test-consuming)
* [Mock Server](#mock-server)
* [Start server](#start-server)
* [Publish message](#publish-message)
* [Get queue message history](#get-queue-message-history)
* [Get exchange messages](#get-exchange-messages)
* [Delete exchange messages](#delete-exchange-messages)
* [Reset](#reset)
## Installation
```sh
pip3 install amqp-mock
```
## Overview
### Test Publishing
```python
from amqp_mock import create_amqp_mock
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Publish message via "system under test"
publish_message([1, 2, 3], "exchange")
# 3. Test message has been published
messages = await mock.client.get_exchange_messages("exchange")
assert messages[0].value == [1, 2, 3]
```
Full code available here: [`./examples/publish_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/publish_example.py)
### Test Consuming
```python
from amqp_mock import create_amqp_mock, Message, MessageStatus
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Mock next message
await mock.client.publish_message("queue", Message([1, 2, 3]))
# 3. Consume message via "system under test"
consume_message("queue")
# 4. Test message has been consumed
history = await mock.client.get_queue_message_history("queue")
assert history[0].status == MessageStatus.ACKED
```
Full code available here: [`./examples/consume_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/consume_example.py)
## Mock Server
### Start server
```python
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock
async def run() -> None:
storage = Storage()
http_server = HttpServer(storage, port=8080)
amqp_server = AmqpServer(storage, port=5672)
async with create_amqp_mock(http_server, amqp_server):
await asyncio.Future()
asyncio.run(run())
```
or via docker
```shell
docker run -p 8080:80 -p 5672:5672 nikitanovosibirsk/amqp-mock
```
### Publish message
`POST /queues/{queue}/messages`
```js
{
"id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
"value": [1, 2, 3],
"exchange": "",
"routing_key": "",
"properties": null
}
```
HTTP
```sh
$ http POST localhost/queues/test_queue/messages \
value:='[1, 2, 3]' \
exchange=test_exchange
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient, Message
mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
```
### Get queue message history
`GET /queues/{queue}/messages/history`
HTTP
```sh
$ http GET localhost/queues/test_queue/messages/history
HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8
[
{
"message": {
"exchange": "test_exchange",
"id": "94459a41-9119-479a-98c9-80bc9dabb719",
"properties": null,
"routing_key": "",
"value": [1, 2, 3]
},
"queue": "test_queue",
"status": "ACKED"
}
]
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
# ,
# queue='test_queue',
# status=MessageStatus.ACKED>
# ]
```
### Get exchange messages
`GET /exchanges/{exchange}/messages`
HTTP
```sh
$ http GET localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8
[
{
"exchange": "test_exchange",
"id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
"properties": {
"app_id": "",
"cluster_id": "",
"content_encoding": "",
"content_type": "",
"correlation_id": "",
"delivery_mode": 1,
"expiration": "",
"headers": null,
"message_id": "5ec9024c74eca2e419fd7e29f7be846c",
"message_type": "",
"priority": null,
"reply_to": "",
"timestamp": null,
"user_id": ""
},
"routing_key": "",
"value": [1, 2, 3]
}
]
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#
# ]
```
### Delete exchange messages
`DELETE /exchanges/{exchange}/messages`
HTTP
```sh
$ http DELETE localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
```
### Reset
`DELETE /`
HTTP
```sh
$ http DELETE localhost/
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.reset()
```
%package help
Summary: Development documents and examples for amqp-mock
Provides: python3-amqp-mock-doc
%description help
# AMQP Mock
[![Codecov](https://img.shields.io/codecov/c/github/nikitanovosibirsk/amqp-mock/master.svg?style=flat-square)](https://codecov.io/gh/nikitanovosibirsk/amqp-mock)
[![PyPI](https://img.shields.io/pypi/v/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/amqp-mock?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
[![Python Version](https://img.shields.io/pypi/pyversions/amqp-mock.svg?style=flat-square)](https://pypi.python.org/pypi/amqp-mock)
* [Installation](#installation)
* [Overview](#overview)
* [Test Publishing](#test-publishing)
* [Test Consuming](#test-consuming)
* [Mock Server](#mock-server)
* [Start server](#start-server)
* [Publish message](#publish-message)
* [Get queue message history](#get-queue-message-history)
* [Get exchange messages](#get-exchange-messages)
* [Delete exchange messages](#delete-exchange-messages)
* [Reset](#reset)
## Installation
```sh
pip3 install amqp-mock
```
## Overview
### Test Publishing
```python
from amqp_mock import create_amqp_mock
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Publish message via "system under test"
publish_message([1, 2, 3], "exchange")
# 3. Test message has been published
messages = await mock.client.get_exchange_messages("exchange")
assert messages[0].value == [1, 2, 3]
```
Full code available here: [`./examples/publish_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/publish_example.py)
### Test Consuming
```python
from amqp_mock import create_amqp_mock, Message, MessageStatus
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Mock next message
await mock.client.publish_message("queue", Message([1, 2, 3]))
# 3. Consume message via "system under test"
consume_message("queue")
# 4. Test message has been consumed
history = await mock.client.get_queue_message_history("queue")
assert history[0].status == MessageStatus.ACKED
```
Full code available here: [`./examples/consume_example.py`](https://github.com/nikitanovosibirsk/amqp-mock/blob/master/examples/consume_example.py)
## Mock Server
### Start server
```python
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock
async def run() -> None:
storage = Storage()
http_server = HttpServer(storage, port=8080)
amqp_server = AmqpServer(storage, port=5672)
async with create_amqp_mock(http_server, amqp_server):
await asyncio.Future()
asyncio.run(run())
```
or via docker
```shell
docker run -p 8080:80 -p 5672:5672 nikitanovosibirsk/amqp-mock
```
### Publish message
`POST /queues/{queue}/messages`
```js
{
"id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
"value": [1, 2, 3],
"exchange": "",
"routing_key": "",
"properties": null
}
```
HTTP
```sh
$ http POST localhost/queues/test_queue/messages \
value:='[1, 2, 3]' \
exchange=test_exchange
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient, Message
mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
```
### Get queue message history
`GET /queues/{queue}/messages/history`
HTTP
```sh
$ http GET localhost/queues/test_queue/messages/history
HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8
[
{
"message": {
"exchange": "test_exchange",
"id": "94459a41-9119-479a-98c9-80bc9dabb719",
"properties": null,
"routing_key": "",
"value": [1, 2, 3]
},
"queue": "test_queue",
"status": "ACKED"
}
]
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
# ,
# queue='test_queue',
# status=MessageStatus.ACKED>
# ]
```
### Get exchange messages
`GET /exchanges/{exchange}/messages`
HTTP
```sh
$ http GET localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8
[
{
"exchange": "test_exchange",
"id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
"properties": {
"app_id": "",
"cluster_id": "",
"content_encoding": "",
"content_type": "",
"correlation_id": "",
"delivery_mode": 1,
"expiration": "",
"headers": null,
"message_id": "5ec9024c74eca2e419fd7e29f7be846c",
"message_type": "",
"priority": null,
"reply_to": "",
"timestamp": null,
"user_id": ""
},
"routing_key": "",
"value": [1, 2, 3]
}
]
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#
# ]
```
### Delete exchange messages
`DELETE /exchanges/{exchange}/messages`
HTTP
```sh
$ http DELETE localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
```
### Reset
`DELETE /`
HTTP
```sh
$ http DELETE localhost/
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
```
Python
```python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.reset()
```
%prep
%autosetup -n amqp-mock-0.4.0
%build
%py3_build
%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .
%files -n python3-amqp-mock -f filelist.lst
%dir %{python3_sitelib}/*
%files help -f doclist.lst
%{_docdir}/*
%changelog
* Tue May 30 2023 Python_Bot - 0.4.0-1
- Package Spec generated