%global _empty_manifest_terminate_build 0
Name: python-graphene-subscriptions
Version: 1.0.2
Release: 1
Summary: A plug-and-play GraphQL subscription implementation for Graphene + Django built using Django Channels.
License: MIT
URL: https://github.com/jaydenwindle/graphene-subscriptions
Source0: https://mirrors.aliyun.com/pypi/web/packages/a8/2d/6bf52367abb29f93b075b2050f3a8635d455544566c0bb51a7536b4be10d/graphene_subscriptions-1.0.2.tar.gz
BuildArch: noarch
Requires: python3-django
Requires: python3-channels
Requires: python3-graphene-django
%description
# Graphene Subscriptions
A plug-and-play GraphQL subscription implementation for Graphene + Django built using Django Channels. Provides support for model creation, mutation and deletion subscriptions out of the box.
## Installation
1. Install `graphene-subscriptions`
```bash
$ pip install graphene-subscriptions
```
2. Add `graphene_subscriptions` to `INSTALLED_APPS`:
```python
# your_project/settings.py
INSTALLED_APPS = [
# ...
'graphene_subscriptions'
]
```
3. Add Django Channels to your project (see: [Django Channels installation docs](https://channels.readthedocs.io/en/latest/installation.html)) and set up [Channel Layers](https://channels.readthedocs.io/en/latest/topics/channel_layers.html). If you don't want to set up a Redis instance in your dev environment yet, you can use the in-memory Channel Layer:
```python
# your_project/settings.py
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer"
}
}
```
4. Add `GraphqlSubscriptionConsumer` to your `routing.py` file.
```python
# your_project/routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from graphene_subscriptions.consumers import GraphqlSubscriptionConsumer
application = ProtocolTypeRouter({
"websocket": URLRouter([
path('graphql/', GraphqlSubscriptionConsumer)
]),
})
```
5. Connect signals for any models you want to create subscriptions for
```python
# your_app/signals.py
from django.db.models.signals import post_save, post_delete
from graphene_subscriptions.signals import post_save_subscription, post_delete_subscription
from your_app.models import YourModel
post_save.connect(post_save_subscription, sender=YourModel, dispatch_uid="your_model_post_save")
post_delete.connect(post_delete_subscription, sender=YourModel, dispatch_uid="your_model_post_delete")
# your_app/apps.py
from django.apps import AppConfig
class YourAppConfig(AppConfig):
    name = 'your_app'

    def ready(self):
        # Imported for its side effect: connecting the signal handlers.
        import your_app.signals
```
6. Define your subscriptions and connect them to your project schema
```python
#your_project/schema.py
import graphene
from your_app.graphql.subscriptions import YourSubscription
class Query(graphene.ObjectType):
    base = graphene.String()

class Subscription(YourSubscription):
    pass
schema = graphene.Schema(
query=Query,
subscription=Subscription
)
```
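Step 3 above uses the in-memory layer for development; the Channels documentation advises against using it in production. A minimal sketch of a Redis-backed layer, assuming the separate `channels_redis` package is installed and a Redis server is reachable at `127.0.0.1:6379` (both are assumptions for illustration, not requirements stated by this project):

```python
# your_project/settings.py
# Assumes `pip install channels_redis` and a local Redis server;
# adjust the host and port for your environment.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    }
}
```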
## Defining Subscriptions
Subscriptions in Graphene are defined as normal `ObjectType`s. Each subscription field resolver must return an observable that emits values matching the field's type.
A simple hello world subscription (which returns the value `"hello world!"` every 3 seconds) could be defined as follows:
```python
import graphene
from rx import Observable
class Subscription(graphene.ObjectType):
    hello = graphene.String()

    def resolve_hello(root, info):
        return Observable.interval(3000) \
            .map(lambda i: "hello world!")
```
## Responding to Model Events
Each subscription that you define receives an `Observable` of `SubscriptionEvent`s as the `root` parameter, which emits a new `SubscriptionEvent` each time one of the connected signals is fired.
A `SubscriptionEvent` has two attributes: the `operation` that triggered the event (usually `CREATED`, `UPDATED` or `DELETED`) and the `instance` that triggered the signal.
Since `root` is an `Observable`, you can apply any `rxpy` operations before returning it.
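To make the filter-then-map pattern concrete without depending on RxPY, here is a plain-Python sketch over a list of events. The `Event` tuple, the `created_instances` function and the string values of `CREATED` and `UPDATED` are hypothetical stand-ins for illustration, not the library's own definitions:

```python
from collections import namedtuple

# Hypothetical stand-in for the library's SubscriptionEvent.
Event = namedtuple("Event", ["operation", "instance"])

# Placeholder operation names for this sketch only.
CREATED = "created"
UPDATED = "updated"


def created_instances(events):
    """Keep only CREATED events, then unwrap each event's instance."""
    matching = filter(lambda event: event.operation == CREATED, events)
    return map(lambda event: event.instance, matching)


events = [
    Event(CREATED, "first"),
    Event(UPDATED, "first"),
    Event(CREATED, "second"),
]
result = list(created_instances(events))
print(result)
```

An observable applies the same two steps, but to events as they arrive rather than to a finished list.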
### Model Created Subscriptions
For example, let's create a subscription called `yourModelCreated` that will be fired whenever an instance of `YourModel` is created. Since `root` receives a new event *every time a connected signal is fired*, we'll need to filter for only the events we want. In this case, we want all events where `operation` is `CREATED` and the event `instance` is an instance of our model.
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import CREATED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_created = graphene.Field(YourModelType)

    def resolve_your_model_created(root, info):
        return root.filter(
            lambda event:
                event.operation == CREATED and
                isinstance(event.instance, YourModel)
        ).map(lambda event: event.instance)
```
### Model Updated Subscriptions
You can also filter events based on a subscription's arguments. For example, here's a subscription that fires whenever the `YourModel` instance with the given `id` is updated:
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import UPDATED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_updated = graphene.Field(YourModelType, id=graphene.ID())

    def resolve_your_model_updated(root, info, id):
        return root.filter(
            lambda event:
                event.operation == UPDATED and
                isinstance(event.instance, YourModel) and
                event.instance.pk == int(id)
        ).map(lambda event: event.instance)
```
### Model Deleted Subscriptions
A subscription that fires whenever a given model instance is deleted can be defined like so:
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import DELETED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_deleted = graphene.Field(YourModelType, id=graphene.ID())

    def resolve_your_model_deleted(root, info, id):
        return root.filter(
            lambda event:
                event.operation == DELETED and
                isinstance(event.instance, YourModel) and
                event.instance.pk == int(id)
        ).map(lambda event: event.instance)
```
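The three resolvers above repeat the same checks on `operation`, instance type and primary key. One way to factor that out is a small predicate factory whose result can be passed to `root.filter(...)`. This is only a sketch: `event_matches`, `Event` and `FakeModel` are hypothetical names introduced here, not part of `graphene_subscriptions`, and `int(pk)` assumes integer primary keys as in the examples above.

```python
from collections import namedtuple

# Hypothetical stand-in for the library's SubscriptionEvent.
Event = namedtuple("Event", ["operation", "instance"])


def event_matches(operation, model, pk=None):
    """Build a predicate that checks operation, model type and optional pk."""
    def predicate(event):
        if event.operation != operation:
            return False
        if not isinstance(event.instance, model):
            return False
        # With no pk given, any instance of the model matches.
        return pk is None or event.instance.pk == int(pk)
    return predicate


class FakeModel:
    """Minimal object with a `pk` attribute, standing in for a Django model."""
    def __init__(self, pk):
        self.pk = pk


is_deleted_42 = event_matches("deleted", FakeModel, pk="42")
matched = is_deleted_42(Event("deleted", FakeModel(42)))
wrong_pk = is_deleted_42(Event("deleted", FakeModel(7)))
wrong_operation = is_deleted_42(Event("updated", FakeModel(42)))
```

Inside a resolver, this would read as `root.filter(event_matches(DELETED, YourModel, id)).map(lambda event: event.instance)`.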
## Custom Events
Sometimes you need to create subscriptions that respond to events other than Django signals. In this case, you can use the `SubscriptionEvent` class directly. (Note: to maintain compatibility with Django Channels, all `instance` values must be JSON-serializable.)
For example, a custom event subscription might look like this:
```python
import graphene
CUSTOM_EVENT = 'custom_event'
class CustomEventSubscription(graphene.ObjectType):
    custom_subscription = graphene.Field(CustomType)

    def resolve_custom_subscription(root, info):
        return root.filter(
            lambda event:
                event.operation == CUSTOM_EVENT
        ).map(lambda event: event.instance)
# elsewhere in your app:
from graphene_subscriptions.events import SubscriptionEvent
event = SubscriptionEvent(
operation=CUSTOM_EVENT,
instance=
)
event.send()
```
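Because custom `instance` values must be JSON-serializable, it can help to check a payload before building the event. A minimal sketch using only the standard library; `is_json_serializable` is a hypothetical helper, not something the package provides:

```python
import json


def is_json_serializable(value):
    """Return True if `json.dumps` accepts `value` without raising."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


ok = is_json_serializable({"id": 1, "tags": ["a", "b"]})
not_ok = is_json_serializable({"created": object()})
```

Plain dicts, lists, strings, numbers, booleans and `None` pass this check; arbitrary objects do not unless they are converted first.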
## Production Readiness
This implementation was spun out of an internal implementation I developed which we've been using in production for the past 6 months at [Jetpack](https://www.tryjetpack.com/). We've had relatively few issues with it, and I am confident that it can be reliably used in production environments.
However, being a startup, our definition of production-readiness may be slightly different from your own. Also keep in mind that the scale at which we operate hasn't been taxing enough to illuminate where the scaling bottlenecks in this implementation may hide.
If you end up running this in production, please [reach out](https://twitter.com/jayden_windle) and let me know!
## Contributing
PRs and other contributions are very welcome! To set up `graphene_subscriptions` in a development environment, do the following:
1. Clone the repo
```bash
$ git clone git@github.com:jaydenwindle/graphene-subscriptions.git
```
2. Install [poetry](https://poetry.eustace.io/)
```bash
$ curl -sSL https://raw.githubusercontent.com/sdispater/poetry/master/get-poetry.py | python
```
3. Install dependencies
```bash
$ poetry install
```
4. Run the test suite
```bash
$ poetry run pytest
```
%package -n python3-graphene-subscriptions
Summary: A plug-and-play GraphQL subscription implementation for Graphene + Django built using Django Channels.
Provides: python-graphene-subscriptions
BuildRequires: python3-devel
BuildRequires: python3-setuptools
BuildRequires: python3-pip
%description -n python3-graphene-subscriptions
# Graphene Subscriptions
A plug-and-play GraphQL subscription implementation for Graphene + Django built using Django Channels. Provides support for model creation, mutation and deletion subscriptions out of the box.
## Installation
1. Install `graphene-subscriptions`
```bash
$ pip install graphene-subscriptions
```
2. Add `graphene_subscriptions` to `INSTALLED_APPS`:
```python
# your_project/settings.py
INSTALLED_APPS = [
# ...
'graphene_subscriptions'
]
```
3. Add Django Channels to your project (see: [Django Channels installation docs](https://channels.readthedocs.io/en/latest/installation.html)) and set up [Channel Layers](https://channels.readthedocs.io/en/latest/topics/channel_layers.html). If you don't want to set up a Redis instance in your dev environment yet, you can use the in-memory Channel Layer:
```python
# your_project/settings.py
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer"
}
}
```
4. Add `GraphqlSubscriptionConsumer` to your `routing.py` file.
```python
# your_project/routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from graphene_subscriptions.consumers import GraphqlSubscriptionConsumer
application = ProtocolTypeRouter({
"websocket": URLRouter([
path('graphql/', GraphqlSubscriptionConsumer)
]),
})
```
5. Connect signals for any models you want to create subscriptions for
```python
# your_app/signals.py
from django.db.models.signals import post_save, post_delete
from graphene_subscriptions.signals import post_save_subscription, post_delete_subscription
from your_app.models import YourModel
post_save.connect(post_save_subscription, sender=YourModel, dispatch_uid="your_model_post_save")
post_delete.connect(post_delete_subscription, sender=YourModel, dispatch_uid="your_model_post_delete")
# your_app/apps.py
from django.apps import AppConfig
class YourAppConfig(AppConfig):
    name = 'your_app'

    def ready(self):
        # Imported for its side effect: connecting the signal handlers.
        import your_app.signals
```
6. Define your subscriptions and connect them to your project schema
```python
#your_project/schema.py
import graphene
from your_app.graphql.subscriptions import YourSubscription
class Query(graphene.ObjectType):
    base = graphene.String()

class Subscription(YourSubscription):
    pass
schema = graphene.Schema(
query=Query,
subscription=Subscription
)
```
## Defining Subscriptions
Subscriptions in Graphene are defined as normal `ObjectType`s. Each subscription field resolver must return an observable that emits values matching the field's type.
A simple hello world subscription (which returns the value `"hello world!"` every 3 seconds) could be defined as follows:
```python
import graphene
from rx import Observable
class Subscription(graphene.ObjectType):
    hello = graphene.String()

    def resolve_hello(root, info):
        return Observable.interval(3000) \
            .map(lambda i: "hello world!")
```
## Responding to Model Events
Each subscription that you define receives an `Observable` of `SubscriptionEvent`s as the `root` parameter, which emits a new `SubscriptionEvent` each time one of the connected signals is fired.
A `SubscriptionEvent` has two attributes: the `operation` that triggered the event (usually `CREATED`, `UPDATED` or `DELETED`) and the `instance` that triggered the signal.
Since `root` is an `Observable`, you can apply any `rxpy` operations before returning it.
### Model Created Subscriptions
For example, let's create a subscription called `yourModelCreated` that will be fired whenever an instance of `YourModel` is created. Since `root` receives a new event *every time a connected signal is fired*, we'll need to filter for only the events we want. In this case, we want all events where `operation` is `CREATED` and the event `instance` is an instance of our model.
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import CREATED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_created = graphene.Field(YourModelType)

    def resolve_your_model_created(root, info):
        return root.filter(
            lambda event:
                event.operation == CREATED and
                isinstance(event.instance, YourModel)
        ).map(lambda event: event.instance)
```
### Model Updated Subscriptions
You can also filter events based on a subscription's arguments. For example, here's a subscription that fires whenever the `YourModel` instance with the given `id` is updated:
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import UPDATED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_updated = graphene.Field(YourModelType, id=graphene.ID())

    def resolve_your_model_updated(root, info, id):
        return root.filter(
            lambda event:
                event.operation == UPDATED and
                isinstance(event.instance, YourModel) and
                event.instance.pk == int(id)
        ).map(lambda event: event.instance)
```
### Model Deleted Subscriptions
A subscription that fires whenever a given model instance is deleted can be defined like so:
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import DELETED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_deleted = graphene.Field(YourModelType, id=graphene.ID())

    def resolve_your_model_deleted(root, info, id):
        return root.filter(
            lambda event:
                event.operation == DELETED and
                isinstance(event.instance, YourModel) and
                event.instance.pk == int(id)
        ).map(lambda event: event.instance)
```
## Custom Events
Sometimes you need to create subscriptions that respond to events other than Django signals. In this case, you can use the `SubscriptionEvent` class directly. (Note: to maintain compatibility with Django Channels, all `instance` values must be JSON-serializable.)
For example, a custom event subscription might look like this:
```python
import graphene
CUSTOM_EVENT = 'custom_event'
class CustomEventSubscription(graphene.ObjectType):
    custom_subscription = graphene.Field(CustomType)

    def resolve_custom_subscription(root, info):
        return root.filter(
            lambda event:
                event.operation == CUSTOM_EVENT
        ).map(lambda event: event.instance)
# elsewhere in your app:
from graphene_subscriptions.events import SubscriptionEvent
event = SubscriptionEvent(
operation=CUSTOM_EVENT,
instance=
)
event.send()
```
## Production Readiness
This implementation was spun out of an internal implementation I developed which we've been using in production for the past 6 months at [Jetpack](https://www.tryjetpack.com/). We've had relatively few issues with it, and I am confident that it can be reliably used in production environments.
However, being a startup, our definition of production-readiness may be slightly different from your own. Also keep in mind that the scale at which we operate hasn't been taxing enough to illuminate where the scaling bottlenecks in this implementation may hide.
If you end up running this in production, please [reach out](https://twitter.com/jayden_windle) and let me know!
## Contributing
PRs and other contributions are very welcome! To set up `graphene_subscriptions` in a development environment, do the following:
1. Clone the repo
```bash
$ git clone git@github.com:jaydenwindle/graphene-subscriptions.git
```
2. Install [poetry](https://poetry.eustace.io/)
```bash
$ curl -sSL https://raw.githubusercontent.com/sdispater/poetry/master/get-poetry.py | python
```
3. Install dependencies
```bash
$ poetry install
```
4. Run the test suite
```bash
$ poetry run pytest
```
%package help
Summary: Development documents and examples for graphene-subscriptions
Provides: python3-graphene-subscriptions-doc
%description help
# Graphene Subscriptions
A plug-and-play GraphQL subscription implementation for Graphene + Django built using Django Channels. Provides support for model creation, mutation and deletion subscriptions out of the box.
## Installation
1. Install `graphene-subscriptions`
```bash
$ pip install graphene-subscriptions
```
2. Add `graphene_subscriptions` to `INSTALLED_APPS`:
```python
# your_project/settings.py
INSTALLED_APPS = [
# ...
'graphene_subscriptions'
]
```
3. Add Django Channels to your project (see: [Django Channels installation docs](https://channels.readthedocs.io/en/latest/installation.html)) and set up [Channel Layers](https://channels.readthedocs.io/en/latest/topics/channel_layers.html). If you don't want to set up a Redis instance in your dev environment yet, you can use the in-memory Channel Layer:
```python
# your_project/settings.py
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer"
}
}
```
4. Add `GraphqlSubscriptionConsumer` to your `routing.py` file.
```python
# your_project/routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from graphene_subscriptions.consumers import GraphqlSubscriptionConsumer
application = ProtocolTypeRouter({
"websocket": URLRouter([
path('graphql/', GraphqlSubscriptionConsumer)
]),
})
```
5. Connect signals for any models you want to create subscriptions for
```python
# your_app/signals.py
from django.db.models.signals import post_save, post_delete
from graphene_subscriptions.signals import post_save_subscription, post_delete_subscription
from your_app.models import YourModel
post_save.connect(post_save_subscription, sender=YourModel, dispatch_uid="your_model_post_save")
post_delete.connect(post_delete_subscription, sender=YourModel, dispatch_uid="your_model_post_delete")
# your_app/apps.py
from django.apps import AppConfig
class YourAppConfig(AppConfig):
    name = 'your_app'

    def ready(self):
        # Imported for its side effect: connecting the signal handlers.
        import your_app.signals
```
6. Define your subscriptions and connect them to your project schema
```python
#your_project/schema.py
import graphene
from your_app.graphql.subscriptions import YourSubscription
class Query(graphene.ObjectType):
    base = graphene.String()

class Subscription(YourSubscription):
    pass
schema = graphene.Schema(
query=Query,
subscription=Subscription
)
```
## Defining Subscriptions
Subscriptions in Graphene are defined as normal `ObjectType`s. Each subscription field resolver must return an observable that emits values matching the field's type.
A simple hello world subscription (which returns the value `"hello world!"` every 3 seconds) could be defined as follows:
```python
import graphene
from rx import Observable
class Subscription(graphene.ObjectType):
    hello = graphene.String()

    def resolve_hello(root, info):
        return Observable.interval(3000) \
            .map(lambda i: "hello world!")
```
## Responding to Model Events
Each subscription that you define receives an `Observable` of `SubscriptionEvent`s as the `root` parameter, which emits a new `SubscriptionEvent` each time one of the connected signals is fired.
A `SubscriptionEvent` has two attributes: the `operation` that triggered the event (usually `CREATED`, `UPDATED` or `DELETED`) and the `instance` that triggered the signal.
Since `root` is an `Observable`, you can apply any `rxpy` operations before returning it.
### Model Created Subscriptions
For example, let's create a subscription called `yourModelCreated` that will be fired whenever an instance of `YourModel` is created. Since `root` receives a new event *every time a connected signal is fired*, we'll need to filter for only the events we want. In this case, we want all events where `operation` is `CREATED` and the event `instance` is an instance of our model.
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import CREATED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_created = graphene.Field(YourModelType)

    def resolve_your_model_created(root, info):
        return root.filter(
            lambda event:
                event.operation == CREATED and
                isinstance(event.instance, YourModel)
        ).map(lambda event: event.instance)
```
### Model Updated Subscriptions
You can also filter events based on a subscription's arguments. For example, here's a subscription that fires whenever the `YourModel` instance with the given `id` is updated:
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import UPDATED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_updated = graphene.Field(YourModelType, id=graphene.ID())

    def resolve_your_model_updated(root, info, id):
        return root.filter(
            lambda event:
                event.operation == UPDATED and
                isinstance(event.instance, YourModel) and
                event.instance.pk == int(id)
        ).map(lambda event: event.instance)
```
### Model Deleted Subscriptions
A subscription that fires whenever a given model instance is deleted can be defined like so:
```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import DELETED
from your_app.models import YourModel
class YourModelType(DjangoObjectType):
    class Meta:
        model = YourModel

class Subscription(graphene.ObjectType):
    your_model_deleted = graphene.Field(YourModelType, id=graphene.ID())

    def resolve_your_model_deleted(root, info, id):
        return root.filter(
            lambda event:
                event.operation == DELETED and
                isinstance(event.instance, YourModel) and
                event.instance.pk == int(id)
        ).map(lambda event: event.instance)
```
## Custom Events
Sometimes you need to create subscriptions that respond to events other than Django signals. In this case, you can use the `SubscriptionEvent` class directly. (Note: to maintain compatibility with Django Channels, all `instance` values must be JSON-serializable.)
For example, a custom event subscription might look like this:
```python
import graphene
CUSTOM_EVENT = 'custom_event'
class CustomEventSubscription(graphene.ObjectType):
    custom_subscription = graphene.Field(CustomType)

    def resolve_custom_subscription(root, info):
        return root.filter(
            lambda event:
                event.operation == CUSTOM_EVENT
        ).map(lambda event: event.instance)
# elsewhere in your app:
from graphene_subscriptions.events import SubscriptionEvent
event = SubscriptionEvent(
operation=CUSTOM_EVENT,
instance=
)
event.send()
```
## Production Readiness
This implementation was spun out of an internal implementation I developed which we've been using in production for the past 6 months at [Jetpack](https://www.tryjetpack.com/). We've had relatively few issues with it, and I am confident that it can be reliably used in production environments.
However, being a startup, our definition of production-readiness may be slightly different from your own. Also keep in mind that the scale at which we operate hasn't been taxing enough to illuminate where the scaling bottlenecks in this implementation may hide.
If you end up running this in production, please [reach out](https://twitter.com/jayden_windle) and let me know!
## Contributing
PRs and other contributions are very welcome! To set up `graphene_subscriptions` in a development environment, do the following:
1. Clone the repo
```bash
$ git clone git@github.com:jaydenwindle/graphene-subscriptions.git
```
2. Install [poetry](https://poetry.eustace.io/)
```bash
$ curl -sSL https://raw.githubusercontent.com/sdispater/poetry/master/get-poetry.py | python
```
3. Install dependencies
```bash
$ poetry install
```
4. Run the test suite
```bash
$ poetry run pytest
```
%prep
%autosetup -n graphene_subscriptions-1.0.2
%build
%py3_build
%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
find usr/lib -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
find usr/lib64 -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
find usr/bin -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
find usr/sbin -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
find usr/share/man -type f -printf "\"/%h/%f.gz\"\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .
%files -n python3-graphene-subscriptions -f filelist.lst
%dir %{python3_sitelib}/*
%files help -f doclist.lst
%{_docdir}/*
%changelog
* Thu Jun 08 2023 Python_Bot - 1.0.2-1
- Package Spec generated