Source code for eventify.persist

"""
Persist Helper Module
"""
from __future__ import print_function
from datetime import datetime

import json
import os

import asyncio
import asyncpg

from eventify.exceptions import EventifySanityError


[docs]async def persist_event(topic, event, pool): """ Track event to prevent duplication of work and potential loss of event :param topic: The event topic :param event: The event object """ # Event to json json_event = json.dumps(event.__dict__) # Connect to database or create and connect if non existent conn = await pool.acquire() # Insert event if not processed try: query = """ CREATE TABLE IF NOT EXISTS public."topic_placeholder" ( id SERIAL PRIMARY KEY, event json NOT NULL, issued_at timestamp without time zone NOT NULL ) WITH ( OIDS=FALSE ); ALTER TABLE public."topic_placeholder" OWNER TO root; """ query = query.replace('topic_placeholder', topic) await conn.execute(query) issued_at = datetime.utcnow() query = 'INSERT INTO "%s" (event, issued_at) VALUES ($1, $2)' % topic await conn.execute(query, json_event, issued_at) finally: await pool.release(conn)