Source code for eurito_daps.routines.centrality.centrality

Centrality Pipeline

Takes network from Neo4j database, calculates network centrality measures and updates each node in the database with new centrality attributes

from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.luigihacks.misctools import get_config

from eurito_daps.packages.utils import centrality_utils
from nesta.core.orms.orm_utils import graph_session

import igraph as ig
from py2neo import Graph as pgraph
import luigi
import datetime
import os
import logging

[docs]class RootTask(luigi.WrapperTask): '''The root task, which collects the supplied parameters and calls the main task. Args: date (datetime): Date used to label the outputs output_type (str): type of record to be extracted from OpenAIRE API. Accepts "software", "datasets", "publications", "ECProjects" production (bool): test mode or production mode ''' date = luigi.DateParameter( output_type = luigi.Parameter() production = luigi.BoolParameter(default=False)
[docs] def requires(self): '''Call the task to run before this in the pipeline.''' logging.getLogger().setLevel(logging.INFO) return CalcCentralityTask(, output_type=self.output_type, test=not self.production)
[docs]class CalcCentralityTask(luigi.Task): '''Takes network from Neo4j database, calculates network centrality measures and updates each node in the database with new centrality attributes Args: date (datetime): Date used to label the outputs output_type (str): type of record to be extracted from OpenAIRE API. Accepts "software", "datasets", "publications", "ECProjects" test (bool): run a shorter version of the task if in test mode ''' date = luigi.DateParameter( output_type = luigi.Parameter() test = luigi.BoolParameter()
[docs] def output(self): '''Points to the output database engine where the task is marked as done. The luigi_table_updates table exists in test and production databases. ''' db_config = get_config(os.environ["MYSQLDB"], 'mysqldb') db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "Example <dummy>" # Note, not a real table update_id = "OpenAireToNeo4jTask_{}".format( return MySqlTarget(update_id=update_id, **db_config)
[docs] def run(self): conf = get_config('neo4j.config', 'neo4j') gkwargs = dict(host=conf['host'], secure=True, auth=(conf['user'], conf['password'])) igr = ig.Graph() with graph_session(**gkwargs) as tx: graph = tx.graph all_rels = list(graph.relationships.match() ) #finds all relationships in a graph tuplelist = list() for index,rel in enumerate(all_rels): rel_tuple = (rel.start_node.identity, rel.end_node.identity) tuplelist.append(rel_tuple) newgraph = igr.TupleList(tuplelist) betw_list = newgraph.betweenness(vertices=None, directed=False, cutoff=3, weights=None, nobigint=True) centrality_utils.add_betw_property(graph, newgraph, betw_list) logging.debug('Writing to DB complete') # mark as done"Task complete") self.output().touch()