Production pipelines¶
We use luigi routines to orchestrate our pipelines. The batching procedure relies on batchables as described in batchables
. Other than luigihacks.autobatch
, which is respectively documented in Nesta’s codebase, the routine procedure follows the Luigi documentation well.
Transfer of Elasticsearch data¶
This pipeline is responsible for the transfer of Elasticsearch data from a remote origin (in our case, Nesta’s Elasticsearch endpoint) to EURITO’s endpoint.
-
class
Es2EsTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
-
date
= <luigi.parameter.DateParameter object>¶
-
origin_endpoint
= <luigi.parameter.Parameter object>¶
-
origin_index
= <luigi.parameter.Parameter object>¶
-
dest_endpoint
= <luigi.parameter.Parameter object>¶
-
dest_index
= <luigi.parameter.Parameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
-
chunksize
= <luigi.parameter.IntParameter object>¶
-
do_transfer_index
= <luigi.parameter.BoolParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
-
class
EsLolveltyTask
(*args, **kwargs)[source]¶ Bases:
nesta.core.luigihacks.estask.LazyElasticsearchTask
-
date
= <luigi.parameter.DateParameter object>¶
-
origin_endpoint
= <luigi.parameter.Parameter object>¶
-
origin_index
= <luigi.parameter.Parameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
-
process_batch_size
= <luigi.parameter.IntParameter object>¶
-
do_transfer_index
= <luigi.parameter.BoolParameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
-
-
class
RootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
-
production
= <luigi.parameter.BoolParameter object>¶
-
date
= <luigi.parameter.DateParameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
-
Centrality Pipeline¶
Takes network from Neo4j database, calculates network centrality measures and updates each node in the database with new centrality attributes
-
class
RootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
The root task, which collects the supplied parameters and calls the main task.
Parameters: - date (datetime) – Date used to label the outputs
- output_type (str) – type of record to be extracted from OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
- production (bool) – test mode or production mode
-
date
= <luigi.parameter.DateParameter object>¶
-
output_type
= <luigi.parameter.Parameter object>¶
-
production
= <luigi.parameter.BoolParameter object>¶
-
class
CalcCentralityTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Takes network from Neo4j database, calculates network centrality measures and updates each node in the database with new centrality attributes
Parameters: - date (datetime) – Date used to label the outputs
- output_type (str) – type of record to be extracted from OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
- test (bool) – run a shorter version of the task if in test mode
-
date
= <luigi.parameter.DateParameter object>¶
-
output_type
= <luigi.parameter.Parameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
Cordis to Neo4j¶
Task for piping Cordis data from SQL to Neo4j.
-
class
CordisNeo4jTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Task for piping Cordis data to neo4j
-
test
= <luigi.parameter.BoolParameter object>¶
-
date
= <luigi.parameter.DateParameter object>¶
-
-
class
RootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
-
production
= <luigi.parameter.BoolParameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
-
OpenAIRE to Neo4j¶
Pipe data directly from the OpenAIRE API to Neo4j by matching to Cordis projects already in Neo4j.
-
class
RootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
The root task, which collects the supplied parameters and calls the SimpleTask.
Parameters: - date (datetime) – Date used to label the outputs
- output_type (str) – type of record to be extracted from OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
- production (bool) – test mode or production mode
-
date
= <luigi.parameter.DateParameter object>¶
-
output_type
= <luigi.parameter.Parameter object>¶
-
production
= <luigi.parameter.BoolParameter object>¶
-
class
OpenAireToNeo4jTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Takes OpenAIRE entities from MySQL database and writes them into Neo4j database
Parameters: - date (datetime) – Date used to label the outputs
- output_type (str) – type of record to be extracted from OpenAIRE API. Accepts “software”, “datasets”, “publications”, “ECProjects”
- test (bool) – run a shorter version of the task if in test mode
-
date
= <luigi.parameter.DateParameter object>¶
-
output_type
= <luigi.parameter.Parameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶