Production pipelines

We use Luigi tasks to orchestrate our pipelines. The batching procedure relies on batchables, as described in the batchables documentation. Apart from luigihacks.autobatch, which is documented separately in Nesta’s codebase, the pipelines follow the standard conventions described in the Luigi documentation.

Transfer of Elasticsearch data

This pipeline is responsible for the transfer of Elasticsearch data from a remote origin (in our case, Nesta’s Elasticsearch endpoint) to EURITO’s endpoint.

class Es2EsTask(*args, **kwargs)

Bases: luigi.task.Task

date (luigi.parameter.DateParameter)
origin_endpoint (luigi.parameter.Parameter)
origin_index (luigi.parameter.Parameter)
dest_endpoint (luigi.parameter.Parameter)
dest_index (luigi.parameter.Parameter)
test (luigi.parameter.BoolParameter)
chunksize (luigi.parameter.IntParameter)
do_transfer_index (luigi.parameter.BoolParameter)
db_config_path (luigi.parameter.Parameter)
output()

Points to the output database engine

run()

Transfers the data from origin_index on origin_endpoint to dest_index on dest_endpoint.

See Task.run
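The parameters of Es2EsTask suggest that documents are copied in batches of chunksize, with test mode running a smaller transfer. The following is a minimal stdlib-only sketch of that idea, not the task's actual implementation: plain Python lists stand in for the origin and destination indices, and the helper names (chunked, transfer) and the test-mode cap (test_limit) are hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunked(docs: Iterable[dict], chunksize: int) -> Iterator[List[dict]]:
    """Yield successive lists of at most `chunksize` documents."""
    if chunksize < 1:
        raise ValueError("chunksize must be a positive integer")
    it = iter(docs)
    while True:
        chunk = list(islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def transfer(origin: Iterable[dict], dest: List[dict], chunksize: int,
             test: bool = False, test_limit: int = 1000) -> int:
    """Copy documents from `origin` into `dest`, one chunk at a time.

    In test mode only the first `test_limit` documents are copied
    (a hypothetical cap chosen for this sketch). Returns the number
    of documents written.
    """
    if test:
        origin = islice(origin, test_limit)
    written = 0
    for chunk in chunked(origin, chunksize):
        # Stand-in for one bulk write request to the destination index.
        dest.extend(chunk)
        written += len(chunk)
    return written
```

For example, transferring 2500 documents with chunksize=1000 performs three writes (1000, 1000 and 500 documents). Bounding each write by a chunk size keeps memory use and request size predictable.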

class EsLolveltyTask(*args, **kwargs)

Bases: nesta.core.luigihacks.estask.LazyElasticsearchTask

date (luigi.parameter.DateParameter)
origin_endpoint (luigi.parameter.Parameter)
origin_index (luigi.parameter.Parameter)
test (luigi.parameter.BoolParameter)
process_batch_size (luigi.parameter.IntParameter)
do_transfer_index (luigi.parameter.BoolParameter)
requires()

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires
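The rule above (a task runs only once everything it requires is complete, and completed tasks are skipped) can be illustrated with a toy dependency resolver. This is a hypothetical stand-in written for illustration: MiniTask and build are not part of Luigi's API, and the sketch ignores everything the real Luigi scheduler adds (workers, retries, parameter-based task identity).

```python
from typing import List, Optional


class MiniTask:
    """Hypothetical stand-in for luigi.Task; not part of Luigi's API."""

    def __init__(self, name: str, requires: Optional[List["MiniTask"]] = None):
        self.name = name
        self._requires = requires or []
        self.done = False

    def requires(self) -> List["MiniTask"]:
        return self._requires

    def complete(self) -> bool:
        return self.done

    def run(self, log: List[str]) -> None:
        log.append(self.name)  # record the execution order
        self.done = True


def build(task: MiniTask, log: List[str]) -> None:
    """Run `task` only after all of its requirements are complete.

    Already-complete tasks are skipped, so a second build is a no-op.
    """
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    if all(dep.complete() for dep in task.requires()):
        task.run(log)
```

With a chain extract → transform → root, building root runs the three tasks in dependency order; building it again does nothing, because every task now reports itself complete.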

class RootTask(*args, **kwargs)

Bases: luigi.task.WrapperTask

production (luigi.parameter.BoolParameter)
date (luigi.parameter.DateParameter)
requires()

The Tasks that this Task depends on.

See Task.requires

Centrality pipeline

Takes the network from the Neo4j database, calculates network centrality measures and updates each node in the database with the new centrality attributes.
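The documentation does not say which centrality measures are computed. As an illustration only, the sketch below assumes two common ones, degree centrality and closeness centrality (in the simple form (reachable nodes - 1) / total shortest-path distance), on an unweighted, undirected graph. A dict of neighbour sets stands in for the network read from Neo4j, and writing the attributes back to Neo4j is not shown; the function names are hypothetical.

```python
from collections import deque
from typing import Dict, Set

Graph = Dict[str, Set[str]]  # node -> set of neighbouring nodes


def degree_centrality(graph: Graph) -> Dict[str, float]:
    """Degree divided by the maximum possible degree (n - 1)."""
    n = len(graph)
    if n <= 1:
        return {node: 0.0 for node in graph}
    return {node: len(nbrs) / (n - 1) for node, nbrs in graph.items()}


def closeness_centrality(graph: Graph) -> Dict[str, float]:
    """(reachable - 1) / sum of BFS distances; isolated nodes score 0."""
    scores = {}
    for source in graph:
        dist = {source: 0}
        queue = deque([source])
        while queue:  # breadth-first search gives unweighted shortest paths
            node = queue.popleft()
            for nbr in graph[node]:
                if nbr not in dist:
                    dist[nbr] = dist[node] + 1
                    queue.append(nbr)
        total = sum(dist.values())
        scores[source] = (len(dist) - 1) / total if total else 0.0
    return scores


def add_centrality_attributes(graph: Graph, attrs: Dict[str, dict]) -> None:
    """Write each measure onto the node's attribute dict, in place."""
    for name, scores in (("degree", degree_centrality(graph)),
                         ("closeness", closeness_centrality(graph))):
        for node, score in scores.items():
            attrs.setdefault(node, {})[f"{name}_centrality"] = score
```

On the path graph a - b - c, the middle node b gets a degree centrality and a closeness centrality of 1.0, while each end node gets 0.5 and 2/3 respectively.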

class RootTask(*args, **kwargs)

Bases: luigi.task.WrapperTask

The root task, which collects the supplied parameters and calls the main task.

Parameters:
  • date (datetime) – Date used to label the outputs
  • output_type (str) – Type of record to extract from the OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
  • production (bool) – Run in production mode if True, otherwise in test mode
date (luigi.parameter.DateParameter)
output_type (luigi.parameter.Parameter)
production (luigi.parameter.BoolParameter)
requires()

Returns the task that must run before this one in the pipeline.
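A root task of this kind collects the supplied parameters and passes them on to the main task. The sketch below shows one way to validate output_type against the accepted values listed above and to translate the production flag for the downstream task. It assumes, without confirmation from the source, that the downstream test flag is simply the negation of production; SubtaskParams and root_to_subtask_params are hypothetical names.

```python
import datetime
from dataclasses import dataclass

# Accepted values, as listed in the RootTask parameter documentation.
OUTPUT_TYPES = ("software", "datasets", "publications", "ECProjects")


@dataclass(frozen=True)
class SubtaskParams:
    """Hypothetical bundle of parameters handed to the main task."""
    date: datetime.date
    output_type: str
    test: bool


def root_to_subtask_params(run_date: datetime.date, output_type: str,
                           production: bool) -> SubtaskParams:
    """Validate RootTask parameters and translate them for the subtask.

    Assumes the subtask's `test` flag is the negation of `production`.
    """
    if output_type not in OUTPUT_TYPES:
        raise ValueError(
            f"output_type must be one of {OUTPUT_TYPES}, got {output_type!r}"
        )
    return SubtaskParams(date=run_date, output_type=output_type,
                         test=not production)
```

Failing fast on an unsupported output_type in the root task avoids launching a downstream task that would only fail later.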

class CalcCentralityTask(*args, **kwargs)

Bases: luigi.task.Task

Takes the network from the Neo4j database, calculates network centrality measures and updates each node in the database with the new centrality attributes.

Parameters:
  • date (datetime) – Date used to label the outputs
  • output_type (str) – Type of record to extract from the OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
  • test (bool) – run a shorter version of the task if in test mode
date (luigi.parameter.DateParameter)
output_type (luigi.parameter.Parameter)
test (luigi.parameter.BoolParameter)
output()

Points to the output database engine where the task is marked as done. The luigi_table_updates table exists in test and production databases.
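Marking a task as done in luigi_table_updates works as a completion marker: the task counts as complete if a row with its update ID exists. The stdlib sqlite3 module stands in for MySQL in the sketch below, and the column layout and update-ID format are assumptions made for illustration, not the actual schema.

```python
import sqlite3
from datetime import datetime


def ensure_marker_table(conn: sqlite3.Connection) -> None:
    """Create the (assumed) marker table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS luigi_table_updates ("
        " update_id TEXT PRIMARY KEY,"
        " target_table TEXT,"
        " inserted TEXT)"
    )


def mark_done(conn: sqlite3.Connection, update_id: str, target_table: str) -> None:
    """Record that the task identified by `update_id` has finished."""
    ensure_marker_table(conn)
    conn.execute(
        "INSERT OR REPLACE INTO luigi_table_updates VALUES (?, ?, ?)",
        (update_id, target_table, datetime.now().isoformat()),
    )
    conn.commit()


def is_done(conn: sqlite3.Connection, update_id: str) -> bool:
    """A task is complete if its marker row exists."""
    ensure_marker_table(conn)
    row = conn.execute(
        "SELECT 1 FROM luigi_table_updates WHERE update_id = ?", (update_id,)
    ).fetchone()
    return row is not None
```

Because completion is stored as a database row rather than a file, the same check works unchanged against the test and production databases.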

run()

Calculates the network centrality measures and writes them to each node in the Neo4j database.

See Task.run

Cordis to Neo4j

Task for piping Cordis data from SQL to Neo4j.

class CordisNeo4jTask(*args, **kwargs)

Bases: luigi.task.Task

Task for piping Cordis data to Neo4j.

test (luigi.parameter.BoolParameter)
date (luigi.parameter.DateParameter)
output()

Points to the output database engine where the task is marked as done. The luigi_table_updates table exists in test and production databases.

run()

Pipes the Cordis data from the SQL database into Neo4j.

See Task.run
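Piping rows from SQL into Neo4j boils down to turning each row into a parameterised Cypher statement. In the sketch below, the cordis_projects table, its columns, the Project label and the property names are all hypothetical, and sqlite3 stands in for the SQL database. Each (query, parameters) pair could then be passed to a Neo4j client, for example the official Neo4j Python driver's Session.run(query, parameters).

```python
import sqlite3
from typing import Iterator, Optional, Tuple

# Hypothetical Cypher: the label and property names are illustrative only.
MERGE_PROJECT = (
    "MERGE (p:Project {rcn: $rcn}) "
    "SET p.acronym = $acronym, p.title = $title"
)


def project_statements(conn: sqlite3.Connection,
                       limit: Optional[int] = None) -> Iterator[Tuple[str, dict]]:
    """Yield (query, parameters) pairs, one per row of `cordis_projects`.

    `limit` mimics a shorter test-mode run; the table and columns are assumed.
    """
    sql = "SELECT rcn, acronym, title FROM cordis_projects ORDER BY rcn"
    args: tuple = ()
    if limit is not None:
        sql += " LIMIT ?"
        args = (limit,)
    for rcn, acronym, title in conn.execute(sql, args):
        yield MERGE_PROJECT, {"rcn": rcn, "acronym": acronym, "title": title}
```

Using MERGE rather than CREATE makes each write idempotent, so re-running the task does not create duplicate project nodes. Passing values as parameters rather than formatting them into the query string also avoids Cypher injection.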

class RootTask(*args, **kwargs)

Bases: luigi.task.WrapperTask

production (luigi.parameter.BoolParameter)
requires()

The Tasks that this Task depends on.

See Task.requires

OpenAIRE to Neo4j

Pipe data directly from the OpenAIRE API to Neo4j by matching to Cordis projects already in Neo4j.
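Matching OpenAIRE records to Cordis projects already in Neo4j can be thought of as a lookup of each record's project identifiers against the set of known project IDs. The documentation does not state the matching key, so the project_codes field and the function name below are hypothetical.

```python
from typing import Iterable, List, Tuple


def match_to_projects(records: Iterable[dict], project_ids: Iterable[str],
                      key: str = "project_codes"
                      ) -> Tuple[List[Tuple[str, dict]], List[dict]]:
    """Pair each record with every known Cordis project it references.

    Returns (matches, unmatched): matches are (project_id, record) pairs;
    records referencing no known project go to `unmatched`.
    """
    known = set(project_ids)  # set lookup keeps matching O(1) per code
    matches: List[Tuple[str, dict]] = []
    unmatched: List[dict] = []
    for record in records:
        codes = [c for c in record.get(key, []) if c in known]
        if codes:
            matches.extend((code, record) for code in codes)
        else:
            unmatched.append(record)
    return matches, unmatched
```

A record linked to several known projects yields one match per project, so each match can become one relationship in the graph. Records with no known project are returned rather than silently dropped.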

class RootTask(*args, **kwargs)

Bases: luigi.task.WrapperTask

The root task, which collects the supplied parameters and calls the SimpleTask.

Parameters:
  • date (datetime) – Date used to label the outputs
  • output_type (str) – Type of record to extract from the OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
  • production (bool) – Run in production mode if True, otherwise in test mode
date (luigi.parameter.DateParameter)
output_type (luigi.parameter.Parameter)
production (luigi.parameter.BoolParameter)
requires()

Returns the task that must run before this one in the pipeline.

class OpenAireToNeo4jTask(*args, **kwargs)

Bases: luigi.task.Task

Takes OpenAIRE entities from the MySQL database and writes them into the Neo4j database.

Parameters:
  • date (datetime) – Date used to label the outputs
  • output_type (str) – Type of record to extract from the OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
  • test (bool) – run a shorter version of the task if in test mode
date (luigi.parameter.DateParameter)
output_type (luigi.parameter.Parameter)
test (luigi.parameter.BoolParameter)
output()

Points to the output database engine where the task is marked as done. The luigi_table_updates table exists in test and production databases.

run()

Reads the OpenAIRE entities from the MySQL database and writes them into the Neo4j database.

See Task.run