"""
Crossbar Driver Module
"""
from __future__ import print_function
import asyncio
import logging
import socket
import sys
import time
import traceback
import txaio
import asyncpg
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.exception import TransportLost
from eventify import Eventify
from eventify.drivers.base import BaseComponent
from eventify.persist import persist_event
from eventify.exceptions import EventifyHandlerInitializationFailed
txaio.use_asyncio()
[docs]class Component(BaseComponent, ApplicationSession):
"""
Handle subscribing to topics
"""
log = logging.getLogger("eventify.drivers.crossbar")
[docs] async def emit_event(self, event):
"""
Publish an event back to crossbar
:param event: Event object
"""
self.log.debug("publishing event on %s", self.publish_topic)
if self.config.extra['config']['pub_options']['retain']:
try:
await persist_event(
self.publish_topic,
event,
self.pool
)
except SystemError as error:
self.log.error(error)
return
try:
await self.publish(
self.publish_topic,
event.__dict__,
options=self.publish_options
)
except TransportLost as error:
for task in asyncio.Task.all_tasks():
task.cancel()
asyncio.get_event_loop().stop()
self.log.error(error)
[docs] def onClose(self, wasClean):
"""
Disconnect when connection to message
broker is lost
"""
self.log.error('lost connection to crossbar on session %' + str(self.session_id))
for task in asyncio.Task.all_tasks():
task.cancel()
asyncio.get_event_loop().stop()
[docs] def onDisconnect(self):
"""
Event fired when transport is lost
"""
self.log.error('onDisconnect event fired')
[docs] def onLeave(self, reason=None, message=None):
"""
:param reason:
:param message:
"""
self.log.info('Leaving realm; reason: %s', reason)
[docs] def onUserError(self, fail, message):
"""
Handle user errors
"""
self.log.error(fail)
self.log.error(message)
[docs] async def onJoin(self, details):
self.log.debug("joined websocket realm: %s", details)
# set session_id for reconnect
self.session_id = details.session
self.realm_id = details.realm
for handler in self.handlers:
# initialize handler
handler_instance = handler()
handler_instance.set_session(self)
if hasattr(handler_instance, 'init'):
try:
await handler_instance.init()
except Exception:
self.capture_exception()
if hasattr(handler_instance, 'on_event'):
self.log.debug("subscribing to topic %s", handler_instance.subscribe_topic)
# Used with base handler defined subscribe_topic
if handler_instance.subscribe_topic is not None:
await self.subscribe(
handler_instance.on_event,
handler_instance.subscribe_topic,
)
self.log.debug("subscribed to topic: %s", handler_instance.subscribe_topic)
else:
# Used with config.json defined topics
if self.subscribed_topics is not None:
for topic in self.subscribed_topics:
await self.subscribe(
handler_instance.on_event,
topic
)
self.log.debug("subscribed to topic: %s", topic)
if hasattr(handler_instance, 'worker'):
# or just await handler.worker()
while True:
try:
await handler_instance.worker()
except Exception as error:
self.capture_exception()
self.log.error("Operation failed. %s", error)
traceback.print_exc(file=sys.stdout)
continue
[docs] async def show_sessions(self):
"""
Returns an object with a lists of the session IDs
for all sessions currently attached to the realm
http://crossbar.io/docs/Session-Metaevents-and-Procedures/
"""
res = await self.call("wamp.session.list")
for session_id in res:
session = await self.call("wamp.session.get", session_id)
self.log.info(session)
[docs] async def total_sessions(self):
"""
Returns the number of sessions currently attached to the realm.
http://crossbar.io/docs/Session-Metaevents-and-Procedures/
"""
res = await self.call("wamp.session.count")
self.log.info(res)
[docs] async def lookup_session(self, topic_name):
"""
Attempts to find the session id for a given topic
http://crossbar.io/docs/Subscription-Meta-Events-and-Procedures/
"""
res = await self.call("wamp.subscription.lookup", topic_name)
self.log.info(res)
[docs]class Service(Eventify):
"""
Create crossbar service
"""
[docs] def setup_runner(self):
"""
Setup instance of runner var
"""
runner = ApplicationRunner(
url=self.config['transport_host'],
realm=u'realm1',
extra={
'config': self.config,
'handlers': self.handlers,
}
)
return runner
[docs] def check_transport_host(self):
"""
Check if crossbar port is open
on transport host
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('events-server', 8080)) # TODO: Read from config vs using hard coded hostname
if result == 0:
logging.info('port 8080 on crossbar is open!')
return True
return False
[docs] def reconnect(self):
"""
Handle reconnect logic if connection
to crossbar is lost
"""
connect_attempt = 0
max_retries = self.config['max_reconnect_retries']
logging.info('attempting to reconnect to crossbar')
runner = self.setup_runner()
while True:
if connect_attempt == max_retries:
logging.info('max retries reached; stopping service')
sys.exit(1)
self.check_event_loop()
try:
logging.info('waiting 5 seconds')
time.sleep(5)
if self.check_transport_host():
logging.info('waiting 10 seconds to ensure that crossbar has initialized before reconnecting')
time.sleep(10)
runner.run(Component)
else:
logging.error('crossbar host port 8080 not available...')
except RuntimeError as error:
logging.error(error)
except ConnectionRefusedError as error:
logging.error(error)
except ConnectionError as error:
logging.error(error)
except KeyboardInterrupt:
logging.info('User initiated shutdown')
loop = asyncio.get_event_loop()
loop.stop()
sys.exit(1)
connect_attempt += 1
[docs] def start(self, start_loop=True):
"""
Start a producer/consumer service
"""
txaio.start_logging()
runner = self.setup_runner()
if start_loop:
try:
runner.run(Component)
except EventifyHandlerInitializationFailed as initError:
logging.error('Unable to initialize handler: %s.' % initError.message)
sys.exit(1)
except ConnectionRefusedError:
logging.error('Unable to connect to crossbar instance. Is it running?')
sys.exit(1)
except KeyboardInterrupt:
logging.info('User initiated shutdown')
loop = asyncio.get_event_loop()
loop.stop()
sys.exit(1)
self.check_event_loop()
self.reconnect()
else:
return runner.run(
Component,
start_loop=start_loop
)