# Source code for eurito_daps.routines.openaire.openaire_to_neo4j_search
'''
OpenAIRE to Neo4j
=================
Pipe data directly from the OpenAIRE API to Neo4j by matching to Cordis projects
already in Neo4j.
'''
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.luigihacks.misctools import get_config
from eurito_daps.packages.utils import openaire_utils
from nesta.core.orms.orm_utils import get_mysql_engine
from nesta.core.orms.orm_utils import graph_session
from py2neo.data import Relationship
import luigi
import datetime
import os
import logging
import requests
class RootTask(luigi.WrapperTask):
    '''Entry point of the pipeline: forwards its parameters to OpenAireToNeo4jTask.

    Args:
        date (datetime): Date used to label the outputs
        output_type (str): type of record to be extracted from OpenAIRE API.
            Accepts "software", "datasets", "publications", "ECProjects"
        production (bool): test mode or production mode
    '''
    date = luigi.DateParameter(default=datetime.datetime.today())
    output_type = luigi.Parameter()
    production = luigi.BoolParameter(default=False)

    def requires(self):
        '''Return the task that must run before this one in the pipeline.'''
        # Raise the root logger to INFO so progress messages are emitted.
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)

        # The downstream task takes a "test" flag, the inverse of "production".
        run_in_test_mode = not self.production
        return OpenAireToNeo4jTask(
            date=self.date,
            output_type=self.output_type,
            test=run_in_test_mode,
        )
class OpenAireToNeo4jTask(luigi.Task):
    '''Link OpenAIRE records to the "Project" nodes already stored in Neo4j.

    For every "Project" node found in Neo4j, records of the requested type are
    looked up through ``openaire_utils`` (using the OpenAIRE search API base
    URL), written to Neo4j and connected to the project in both directions.

    Args:
        date (datetime): Date used to label the outputs
        output_type (str): type of record to be extracted from OpenAIRE API.
            Accepts "software", "datasets", "publications", "ECProjects"
        test (bool): run a shorter version of the task if in test mode
    '''
    date = luigi.DateParameter(default=datetime.datetime.today())
    output_type = luigi.Parameter()
    test = luigi.BoolParameter()

    def output(self):
        '''Points to the output database engine where the task is marked as done.
        The luigi_table_updates table exists in test and production databases.
        '''
        db_config = get_config(os.environ["MYSQLDB"], 'mysqldb')
        db_config["database"] = 'dev' if self.test else 'production'
        db_config["table"] = "Example <dummy>"  # Note, not a real table
        # NOTE(review): the id is built from the date only, so runs for
        # different output_type values on the same date share one marker.
        # Left unchanged to stay compatible with markers already written.
        update_id = "OpenAireToNeo4jTask_{}".format(self.date)
        return MySqlTarget(update_id=update_id, **db_config)

    def run(self):
        '''Fetch the related records of every project and store them in Neo4j.

        Each record becomes a node linked to its project with a
        ``HAS_<OUTPUT_TYPE>`` relationship and a reverse ``HAS_PROJECT``
        relationship. Records lacking a top-level ``title`` or ``pid`` element
        are skipped with a warning instead of aborting the whole run.
        In test mode the loop stops once the project index exceeds 60000.
        '''
        # The returned engine is not used below.
        # NOTE(review): the call is kept because it presumably fails fast on a
        # missing/invalid MYSQLDB configuration - confirm, otherwise remove.
        get_mysql_engine('MYSQLDB', 'mysqldb',
                         'dev' if self.test else 'production')

        # Get connection settings
        conf = get_config('neo4j.config', 'neo4j')
        gkwargs = dict(host=conf['host'], secure=True,
                       auth=(conf['user'], conf['password']))

        base_url = 'http://api.openaire.eu/search/'
        # Loop-invariant: the relationship label depends only on output_type.
        relationship_type = "HAS_" + self.output_type.upper()
        total_records = 0

        # The session is used as a context manager so that its connections
        # are released even if the loop below raises.
        with requests.session() as reqsession:
            reqsession.keep_alive = False

            # for each project in Neo4j
            with graph_session(**gkwargs) as tx:
                graph = tx.graph
                neo_projects = graph.nodes.match("Project")
                total_projects = len(neo_projects)

                for index, neo_project in enumerate(neo_projects):
                    if index % 1000 == 0:
                        logging.info("Checking project %d out of %d",
                                     index, total_projects)

                    souplist = openaire_utils.get_project_soups(
                        base_url, reqsession,
                        self.output_type, neo_project.identity)

                    # get all results, not just from one page
                    results = openaire_utils.get_results_from_soups(souplist)
                    num_results = len(results)
                    total_records += num_results
                    if num_results > 0:
                        logging.info("Found %d related records", num_results)

                    for result in results:
                        title = result.find("title", recursive=False)
                        pid = result.find("pid", recursive=False)
                        # find() yields None when the element is absent;
                        # skip such records rather than crash on `.text`.
                        if title is None or pid is None:
                            logging.warning(
                                "Skipping record without title or pid "
                                "for project %s", neo_project.identity)
                            continue

                        record_obj = {'title': title.text, 'pid': pid.text}

                        # create record object in Neo4j and return it
                        created_node = openaire_utils.write_record_to_neo(
                            record_obj, self.output_type, graph)

                        # create relationships in both directions between
                        # neo_project and the new record node
                        graph.create(Relationship(
                            neo_project, relationship_type, created_node))
                        graph.create(Relationship(
                            created_node, "HAS_PROJECT", neo_project))

                    if index > 60000 and self.test:
                        logging.info("Breaking after %d results in test mode",
                                     index)
                        break

        logging.info("Found %d related records in total", total_records)

        # mark as done
        logging.info("Task complete")
        self.output().touch()