import logging
from typing import Any, Dict, Optional

from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer
from channels_presence.models import Presence, Room
from django.conf import settings
from django.shortcuts import resolve_url
from django.utils.functional import cached_property

from .models import PokerSession, Story
#: Permission required for the moderator commands, in ``<app_label>.<codename>`` form.
MODERATE_PERMISSION = '{}.moderate'.format(Story._meta.app_label)
#: Permission required for submitting votes, in ``<app_label>.<codename>`` form.
VOTE_PERMISSION = '{}.vote'.format(Story._meta.app_label)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class PokerConsumer(JsonWebsocketConsumer):
    """Consumer responsible for the communication between the moderator's and voter's websockets."""

    def __init__(self, *args, **kwargs):
        """Initialize a PokerConsumer and set the dict of callable commands."""
        super().__init__(*args, **kwargs)
        #: Name of the room / channel group of the poker session. Stays ``None`` until ``connect`` has succeeded.
        self.room_group_name = None
        #: Maps each accepted event name to its handler and the permission a user needs to trigger it.
        self.commands = {
            'reset_requested': {'method': self.reset_requested, 'required_permission': MODERATE_PERMISSION},
            'next_story_requested': {'method': self.next_story_requested, 'required_permission': MODERATE_PERMISSION},
            'points_submitted': {'method': self.set_story_points, 'required_permission': MODERATE_PERMISSION},
            'vote_submitted': {'method': self.vote_submitted, 'required_permission': VOTE_PERMISSION},
            'heartbeat': {'method': self.heartbeat_received, 'required_permission': None},
        }

    @cached_property
    def poker_session(self) -> PokerSession:
        """Return the planning_poker session corresponding to the websocket's url.

        The session is looked up once per consumer and cached afterwards; call ``refresh_from_db()`` on it to pick
        up changes made elsewhere.

        :return: The planning_poker session corresponding to the websocket's url.
        :raises PokerSession.DoesNotExist: If no session with the primary key from the url exists.
        """
        return PokerSession.objects.select_related('active_story').get(
            pk=self.scope['url_route']['kwargs']['poker_session']
        )

    def connect(self):
        """Accept the connection from a websocket.

        The channel is added to the poker session's room and, if the session has an active story, the story's
        information is sent to the newly connected channel only. The connection is rejected when the poker session
        from the url does not exist.
        """
        try:
            poker_session = self.poker_session
        except PokerSession.DoesNotExist:
            logger.warning(
                '%(user)s tried to connect to a poker session which does not exist.',
                {'user': self.scope['user']}
            )
            # Closing before accepting rejects the websocket handshake.
            self.close()
            return
        self.room_group_name = 'poker_session_{}'.format(poker_session.id)
        Room.objects.add(self.room_group_name, self.channel_name, self.scope['user'])
        self.accept()
        if poker_session.active_story:
            self.send_active_story_information(send_to_group=False)

    def disconnect(self, status_code):
        """Remove self from the current group.

        :param status_code: The status code passed in by the framework. Not used by this consumer.
        """
        # The room name is only set once ``connect`` succeeded; a rejected connection never joined a room.
        if self.room_group_name is not None:
            Room.objects.remove(self.room_group_name, self.channel_name)

    def receive_json(self, content: Dict, **kwargs):
        """Call the given method with its arguments.

        A log entry is made and no method is executed, if the message is malformed, names an unknown event or the
        user doesn't have the required permission.

        :param content: A dict containing the command and additional arguments.
                        For example: '{'event': 'foo', 'data': data}'
        :param kwargs: Additional keyword arguments.
        """
        user = self.scope['user']
        # The content comes straight from the client, so neither its shape nor its values can be trusted.
        event = content.get('event') if isinstance(content, dict) else None
        command = self.commands.get(event) if isinstance(event, str) else None
        if command is None:
            logger.warning(
                '%(user)s sent an unknown or malformed event: %(event)r',
                {'user': user, 'event': event}
            )
            return

        required_permission = command['required_permission']
        if required_permission is not None and not user.has_perm(required_permission):
            logger.warning(
                '%(user)s tried to execute %(method)s without the required permission: %(required_permission)s',
                {'user': user, **command}
            )
            return

        # Commands without arguments (e.g. the heartbeat) may omit the data entirely.
        data = content.get('data')
        if data is None:
            data = {}
        elif not isinstance(data, dict):
            logger.warning(
                '%(user)s sent the event %(event)s with malformed data: %(data)r',
                {'user': user, 'event': event, 'data': data}
            )
            return
        command['method'](**data)

    def send_event(self, event: str, send_to_group: bool = True, **data: Any):
        """Send an event with the given data either to the channel or to the whole group.

        The message is passed through the channel layer with the type ``send_json`` in both cases.

        :param event: The name of the event which should be sent.
        :param send_to_group: Flag whether the event should be sent to the whole group or not. Default True.
        :param data: The data which should be sent along the event.
        """
        if send_to_group:
            send_method = self.channel_layer.group_send
            channel_name = self.room_group_name
        else:
            send_method = self.channel_layer.send
            channel_name = self.channel_name
        async_to_sync(send_method)(
            channel_name,
            {
                'type': 'send_json',
                'event': event,
                'data': data
            }
        )

    def next_story_requested(self, story_id: Optional[int] = None):
        """Set the planning_poker session's active story and send all necessary data to the websockets.

        Without a ``story_id`` the next story without story points which is ordered after the currently active story
        becomes active; the poker session is ended if there is no such story. A ``story_id`` which doesn't belong to
        a story of this poker session is logged and ignored.

        :param story_id: The id of the story which should become the active story.
        """
        # The cached session may be stale if the active story was changed through another consumer.
        self.poker_session.refresh_from_db()
        stories = self.poker_session.stories
        if story_id is None:
            active_story = self.poker_session.active_story
            active_story_order = active_story._order if active_story else -1
            new_active_story = stories.filter(
                story_points__isnull=True,
                _order__gt=active_story_order
            ).first()
            if new_active_story is None:
                self.end_poker_session()
                return
        else:
            # ``filter().first()`` instead of ``get()`` so that an unknown id doesn't raise.
            new_active_story = stories.filter(pk=story_id).first()
            if new_active_story is None:
                logger.warning(
                    '%(user)s requested the story %(story_id)s which is not part of the poker session.',
                    {'user': self.scope['user'], 'story_id': story_id}
                )
                return
        self.poker_session.active_story = new_active_story
        self.poker_session.save()
        self.send_active_story_information()

    def reset_requested(self):
        """Remove all votes from the currently active story and broadcast the story's information."""
        active_story = self.poker_session.active_story
        if active_story is None:
            logger.warning('%(user)s tried to reset the votes with no active story.', {'user': self.scope['user']})
            return
        active_story.votes.all().delete()
        self.send_active_story_information()

    def set_story_points(self, story_points: int):
        """Set the story's story points in the database.

        :param story_points: The points which the story should have.
        """
        active_story = self.poker_session.active_story
        if active_story is None:
            logger.warning('%(user)s tried to set story points with no active story.', {'user': self.scope['user']})
            return
        active_story.story_points = story_points
        active_story.save()
        self.send_event('story_points_submitted', story_points=story_points)

    def vote_submitted(self, choice: str):
        """Update or create the vote for the user with their choice for the current story and broadcast it to all users.

        :param choice: The choice for the story's points the user made.
        """
        # The active story may have been changed through another consumer since the session was cached.
        self.poker_session.refresh_from_db()
        active_story = self.poker_session.active_story
        user = self.scope['user']
        if active_story is None:
            logger.warning('%(user)s tried to vote with no active story.', {'user': user})
            return
        active_story.votes.update_or_create(user=user, defaults={'choice': choice})
        self.send_active_story_information()

    def send_active_story_information(self, send_to_group: bool = True):
        """Dispatch an Event containing the story's information.

        Nothing is sent if the poker session has no active story.

        :param send_to_group: Flag whether the event should be sent to the whole group or not. Default True.
        """
        active_story = self.poker_session.active_story
        if active_story is None:
            logger.warning('Could not send the story information because there is no active story.')
            return
        self.send_event(
            'story_changed',
            send_to_group=send_to_group,
            id=active_story.id,
            story_label=str(active_story),
            description=active_story.description,
            votes=active_story.get_votes_with_voter_information()
        )

    def end_poker_session(self):
        """Unset the active story and dispatch an Event which ends the planning_poker session.

        The event carries the url the clients should be redirected to. It is resolved from the
        ``POKER_SESSION_END_REDIRECT_URL`` setting and falls back to ``planning_poker:index``.
        """
        self.poker_session.active_story = None
        self.poker_session.save()
        self.send_event(
            'poker_session_ended',
            send_to_group=True,
            poker_session_end_redirect_url=resolve_url(
                getattr(settings, 'POKER_SESSION_END_REDIRECT_URL', 'planning_poker:index')
            )
        )

    def heartbeat_received(self):
        """Update the 'last seen' timestamp."""
        Presence.objects.touch(self.channel_name)

    def participants_changed(self, message: Dict):
        """Dispatch an event containing a list of all participants.

        :param message: The message contains information about the planning_poker session's active participants.
        """
        # NOTE(review): ``send_event`` sends to the whole group by default. If this handler is itself triggered by a
        # group message, every consumer re-broadcasts to all others - confirm against the sender whether this is
        # intended or whether ``send_to_group=False`` is wanted here.
        self.send_event('participants_changed', **message['data'])