Source code for eurito_daps.routines.centrality.centrality
'''
Centrality Pipeline
===================
Takes network from Neo4j database, calculates network centrality measures and updates each node in the database with new centrality attributes
'''
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.luigihacks.misctools import get_config
from eurito_daps.packages.utils import centrality_utils
from nesta.core.orms.orm_utils import graph_session
import igraph as ig
from py2neo import Graph as pgraph
import luigi
import datetime
import os
import logging
[docs]class RootTask(luigi.WrapperTask):
'''The root task, which collects the supplied parameters and calls the main task.
Args:
date (datetime): Date used to label the outputs
output_type (str): type of record to be extracted from OpenAIRE API. Accepts "software", "datasets", "publications", "ECProjects"
production (bool): test mode or production mode
'''
date = luigi.DateParameter(default=datetime.datetime.today())
output_type = luigi.Parameter()
production = luigi.BoolParameter(default=False)
[docs] def requires(self):
'''Call the task to run before this in the pipeline.'''
logging.getLogger().setLevel(logging.INFO)
return CalcCentralityTask(date=self.date,
output_type=self.output_type,
test=not self.production)
[docs]class CalcCentralityTask(luigi.Task):
'''Takes network from Neo4j database, calculates network centrality measures and updates each node in the database with new centrality attributes
Args:
date (datetime): Date used to label the outputs
output_type (str): type of record to be extracted from OpenAIRE API. Accepts "software", "datasets", "publications", "ECProjects"
test (bool): run a shorter version of the task if in test mode
'''
date = luigi.DateParameter(default=datetime.datetime.today())
output_type = luigi.Parameter()
test = luigi.BoolParameter()
[docs] def output(self):
'''Points to the output database engine where the task is marked as done.
The luigi_table_updates table exists in test and production databases.
'''
db_config = get_config(os.environ["MYSQLDB"], 'mysqldb')
db_config["database"] = 'dev' if self.test else 'production'
db_config["table"] = "Example <dummy>" # Note, not a real table
update_id = "OpenAireToNeo4jTask_{}".format(self.date)
return MySqlTarget(update_id=update_id, **db_config)
[docs] def run(self):
conf = get_config('neo4j.config', 'neo4j')
gkwargs = dict(host=conf['host'], secure=True,
auth=(conf['user'], conf['password']))
igr = ig.Graph()
with graph_session(**gkwargs) as tx:
graph = tx.graph
all_rels = list(graph.relationships.match() ) #finds all relationships in a graph
tuplelist = list()
for index,rel in enumerate(all_rels):
rel_tuple = (rel.start_node.identity, rel.end_node.identity)
tuplelist.append(rel_tuple)
newgraph = igr.TupleList(tuplelist)
betw_list = newgraph.betweenness(vertices=None, directed=False, cutoff=3, weights=None, nobigint=True)
centrality_utils.add_betw_property(graph, newgraph, betw_list)
logging.debug('Writing to DB complete')
# mark as done
logging.info("Task complete")
self.output().touch()