Source code for eurito_daps.routines.cordis.cordis_neo4j_task

"""
Cordis to Neo4j
===============

Task for piping Cordis data from SQL to Neo4j.
"""

from nesta.core.orms.cordis_orm import Base
from nesta.core.orms.orm_utils import db_session_query, get_mysql_engine
from nesta.core.orms.orm_utils import graph_session
from nesta.core.luigihacks.luigi_logging import set_log_level
from nesta.core.luigihacks.misctools import get_config
from eurito_daps.packages.cordis.cordis_neo4j import _extract_name, orm_to_neo4j
from eurito_daps.packages.cordis.cordis_neo4j import prepare_base_entities
from nesta.core.luigihacks.mysqldb import MySqlTarget

from datetime import datetime as dt
import luigi
import logging
import os
from py2neo import Graph


class CordisNeo4jTask(luigi.Task):
    """Task for piping Cordis data from MySQL to Neo4j.

    Every table registered on the Cordis ORM ``Base.metadata`` is read from
    MySQL, and each row is converted and inserted into Neo4j via
    ``orm_to_neo4j``. All existing Neo4j data and constraints are dropped
    before loading starts.

    Parameters:
        test (bool): If True, read from the 'dev' database, insert at most
            100 rows per table and flush to Neo4j every 33 rows. Otherwise
            read from 'production', insert every row and flush every 5000.
        date: Formatted into the update id that marks the task as done.
            NB: the default ``dt.now()`` is evaluated once, when this module
            is imported.
    """
    test = luigi.BoolParameter(default=True)
    date = luigi.DateParameter(default=dt.now())

    def output(self):
        '''Points to the output database engine where the task is marked as done.
        The luigi_table_updates table exists in test and production databases.
        '''
        db_config = get_config(os.environ["MYSQLDB"], "mysqldb")
        db_config["database"] = 'dev' if self.test else 'production'
        db_config["table"] = "Cordis Neo4j <dummy>"  # Note, not a real table
        update_id = "Cordis2Neo4j_{}".format(self.date)
        return MySqlTarget(update_id=update_id, **db_config)

    def run(self):
        """Drop all existing Neo4j data, then pipe every Cordis table into Neo4j.

        Tables are processed one at a time (see ``_pipe_table``), and the task
        is marked as done once every table has been processed.
        """
        limit = 100 if self.test else None
        flush_freq = 33 if self.test else 5000
        # Get connection settings
        engine = get_mysql_engine('MYSQLDB', 'nesta',
                                  'dev' if self.test else 'production')
        conf = get_config('neo4j.config', 'neo4j')
        gkwargs = dict(host=conf['host'], secure=True,
                       auth=(conf['user'], conf['password']))
        self._drop_all_graph_data(gkwargs)
        # Iterate over all tables in the ORM
        for tablename, table in Base.metadata.tables.items():
            entity_name = _extract_name(tablename)
            logging.info(f'\tProcessing {entity_name}')
            self._pipe_table(table, engine, gkwargs, limit, flush_freq)
        # Confirm the task is finished
        self.output().touch()

    @staticmethod
    def _drop_all_graph_data(gkwargs):
        """Delete all graph data and drop every constraint listed by Neo4j.

        Args:
            gkwargs (dict): Keyword arguments passed to ``graph_session``.
        """
        # WARNING: this is a hack in lieu of proper db staging/versioning
        with graph_session(**gkwargs) as tx:
            logging.info('Dropping all previous data')
            tx.graph.delete_all()
            # Read the complete constraint listing before dropping anything,
            # so the schema is not modified while its result is being streamed
            constraints = [record[0] for record in tx.run('CALL db.constraints')]
            for constraint in constraints:
                logging.info(f'Dropping constraint {constraint}')
                tx.run(f'DROP {constraint}')

    @staticmethod
    def _pipe_table(table, engine, gkwargs, limit, flush_freq):
        """Insert up to ``limit`` rows of ``table`` (all rows if None) into Neo4j.

        Rows are inserted inside a ``graph_session``. Every ``flush_freq``
        rows the session is closed (presumably committing it; see
        ``graph_session``) and a new one is opened. The MySQL query then
        resumes from the number of rows already inserted.

        Args:
            table: A table from the Cordis ORM ``Base.metadata``.
            engine: Engine returned by ``get_mysql_engine`` for the source db.
            gkwargs (dict): Keyword arguments passed to ``graph_session``.
            limit (int or None): Maximum number of rows to insert.
            flush_freq (int): Number of rows inserted per Neo4j session.
        """
        orm, parent_orm, rel_name = prepare_base_entities(table)
        # Insert data to neo4j in one session per table,
        # to enable constraint and relationship lookups
        # after insertion
        # NOTE(review): resuming via ``offset`` assumes the query returns rows
        # in a stable order between sessions. Without an ORDER BY, MySQL does
        # not guarantee this, so rows could be skipped or duplicated -- confirm.
        irow = 0
        finished = False
        while not finished:
            # This pass finishes the table unless it is interrupted to flush
            finished = True
            with graph_session(**gkwargs) as tx:
                # Iterate over rows in the database
                for db, orm_instance in db_session_query(query=orm,
                                                         engine=engine,
                                                         limit=limit,
                                                         offset=irow):
                    # Convert the ORM row to a neo4j object, and insert
                    orm_to_neo4j(session=db, transaction=tx,
                                 orm_instance=orm_instance,
                                 parent_orm=parent_orm,
                                 rel_name=rel_name)
                    irow += 1
                    # Row cap (test mode): checked after the insert so that
                    # exactly `limit` rows are inserted, not `limit - 1`
                    if limit is not None and irow >= limit:
                        break
                    if irow % flush_freq == 0:
                        logging.info(f'\t\tFlushing at row {irow}')
                        finished = False
                        break
class RootTask(luigi.WrapperTask):
    """Pipeline entry point that requires ``CordisNeo4jTask``.

    Parameters:
        production (bool): If True, run ``CordisNeo4jTask`` in production
            mode. Otherwise (the default), run it in test mode.
    """
    production = luigi.BoolParameter(default=False)

    def requires(self):
        """Set the log level for the chosen mode and return the Cordis task."""
        is_test_run = not self.production
        # The same test/production flag drives both logging and the task
        set_log_level(is_test_run)
        return CordisNeo4jTask(test=is_test_run)