DjangoChannelsGraphqlWs
Django Channels based WebSocket GraphQL server with Graphene-like subscriptions
Install / Use
/learn @psevensas/DjangoChannelsGraphqlWsREADME
Django Channels based WebSocket GraphQL server with Graphene-like subscriptions
- Django Channels based WebSocket GraphQL server with Graphene-like subscriptions
Features
- WebSocket-based GraphQL server implemented on the Django Channels v3.
- WebSocket protocol is compatible with Apollo GraphQL client.
- Graphene-like subscriptions.
- All GraphQL requests are processed concurrently (in parallel).
- Subscription notifications delivered in the order they were issued.
- Optional subscription activation message can be sent to a client. This is useful to avoid race conditions on the client side. Consider the case when client subscribes to some subscription and immediately invokes a mutations which triggers this subscription. In such case the subscription notification can be lost, cause these subscription and mutation requests are processed concurrently. To avoid this client shall wait for the subscription activation message before sending such mutation request.
- Customizable notification strategies:
- A subscription can be put to one or many subscription groups. This allows to granularly notify only selected clients, or, looking from the client's perspective - to subscribe to some selected source of events. For example, imaginary subscription "OnNewMessage" may accept argument "user" so subscription will only trigger on new messages from the selected user.
- Notification can be suppressed in the subscription resolver method
publish. For example, this is useful to avoid sending self-notifications.
- All GraphQL "resolvers" run in the main eventloop. Asynchronous
"resolvers" able to execute blocking calls with
asyncio.to_threadorchannels.db.database_sync_to_asyncwrappers. - Resolvers (including subscription's
subscribe&publish) can be represented both as synchronous or asynchronous (async def) methods. - Subscription notifications can be sent from both synchronous and
asynchronous contexts. Just call
MySubscription.broadcast()orawait MySubscription.broadcast()depending on the context. - Clients for the GraphQL WebSocket server:
- AIOHTTP-based client.
- Client for unit test based on the Channels testing communicator.
- Requires Python 3.8 and newer. Tests run on 3.8, 3.9, 3.10.
- Works on Linux, macOS, and Windows.
Installation
pip install django-channels-graphql-ws
Getting started
Create a GraphQL schema using Graphene. Note the MySubscription class.
import channels_graphql_ws
import graphene
class MySubscription(channels_graphql_ws.Subscription):
"""Simple GraphQL subscription."""
# Leave only latest 64 messages in the server queue.
notification_queue_limit = 64
# Subscription payload.
event = graphene.String()
class Arguments:
"""That is how subscription arguments are defined."""
arg1 = graphene.String()
arg2 = graphene.String()
@staticmethod
def subscribe(root, info, arg1, arg2):
"""Called when user subscribes."""
# Return the list of subscription group names.
return ["group42"]
@staticmethod
def publish(payload, info, arg1, arg2):
"""Called to notify the client."""
# Here `payload` contains the `payload` from the `broadcast()`
# invocation (see below). You can return `None` if you wish to
# suppress the notification to a particular client. For example,
# this allows to avoid notifications for the actions made by
# this particular client.
return MySubscription(event="Something has happened!")
class Query(graphene.ObjectType):
"""Root GraphQL query."""
# Graphene requires at least one field to be present. Check
# Graphene docs to see how to define queries.
value = graphene.String()
async def resolve_value(self):
return "test"
class Mutation(graphene.ObjectType):
"""Root GraphQL mutation."""
# Check Graphene docs to see how to define mutations.
pass
class Subscription(graphene.ObjectType):
"""Root GraphQL subscription."""
my_subscription = MySubscription.Field()
graphql_schema = graphene.Schema(
query=Query,
mutation=Mutation,
subscription=Subscription,
)
Make your own WebSocket consumer subclass and set the schema it serves:
class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
"""Channels WebSocket consumer which provides GraphQL API."""
schema = graphql_schema
# Uncomment to send ping message every 42 seconds.
# send_ping_every = 42
# Uncomment to process requests sequentially (useful for tests).
# strict_ordering = True
async def on_connect(self, payload):
"""New client connection handler."""
# You can `raise` from here to reject the connection.
print("New client connected!")
Setup Django Channels routing:
application = channels.routing.ProtocolTypeRouter({
"websocket": channels.routing.URLRouter([
django.urls.path("graphql/", MyGraphqlWsConsumer.as_asgi()),
])
})
Notify<sup>﹡</sup> clients when some event happens using
the broadcast() or broadcast_sync() method from the OS thread where
there is no running event loop:
MySubscription.broadcast(
# Subscription group to notify clients in.
group="group42",
# Dict delivered to the `publish` method.
payload={},
)
Notify<sup>﹡</sup> clients in a coroutine function
with async broadcast() or broadcast_async() method:
await MySubscription.broadcast(
# Subscription group to notify clients in.
group="group42",
# Dict delivered to the `publish` method.
payload={},
)
<a name="redis-layer">﹡)</a> In case you are testing your client code by notifying it from the Django Shell, you have to setup a channel layer in order for the two instance of your application. The same applies in production with workers.
You should prefer async resolvers and async middleware over sync ones. Async versions will result in faster code execution. To do DB operations you can use Django 4 asynchronous queries.
Example
You can find simple usage example in the example directory.
Run:
cd example/
# Initialize database.
./manage.py migrate
# Create "user" with password "user".
./manage.py createsuperuser
# Run
