# Source code for eurito_daps.batchables.novelty.lolvelty.run

"""
run.py (lolvelty)
-----------------

Calculates the "lolvelty" novelty score for documents
in Elasticsearch, on a document-by-document basis.
Note that this is a slow procedure, and the bounds of
document "lolvelty" can't be known a priori.
"""

from nesta.core.luigihacks.elasticsearchplus import ElasticsearchPlus
from nesta.packages.novelty.lolvelty import lolvelty
from ast import literal_eval
import os
import boto3
import json
import logging

def run():
    """Score one batch of Elasticsearch documents with "lolvelty".

    All configuration is read from ``BATCHPAR_*`` environment variables:

    - ``BATCHPAR_bucket`` / ``BATCHPAR_batch_file``: S3 bucket and key of
      a JSON file holding the document ids of this batch.
    - ``BATCHPAR_count``: integer passed to ``lolvelty`` as ``total``.
    - ``BATCHPAR_outinfo`` / ``BATCHPAR_out_port``: Elasticsearch host
      and port.
    - ``BATCHPAR_index`` / ``BATCHPAR_out_type``: Elasticsearch index
      and document type that are read from and written to.
    - ``BATCHPAR_entity_type`` / ``BATCHPAR_aws_auth_region``: forwarded
      to ``ElasticsearchPlus``.
    - ``BATCHPAR_fields``: Python literal giving the fields passed to
      ``lolvelty``.
    - ``BATCHPAR_score_field``: name of the field the score is stored in.
    - ``BATCHPAR_test``: Python literal; when truthy a lower
      ``minimum_should_match`` threshold is used.

    Each document is fetched, scored if it contains at least one of the
    requested fields (otherwise the score is ``None``), and re-indexed
    under the same id with the score added.

    Raises:
        KeyError: if any of the required environment variables is unset.
    """
    # Batch parameters
    s3_bucket = os.environ["BATCHPAR_bucket"]
    batch_file = os.environ["BATCHPAR_batch_file"]
    count = int(os.environ['BATCHPAR_count'])
    es_index = os.environ['BATCHPAR_index']
    es_host = os.environ['BATCHPAR_outinfo']
    es_port = int(os.environ['BATCHPAR_out_port'])
    es_type = os.environ['BATCHPAR_out_type']
    entity_type = os.environ["BATCHPAR_entity_type"]
    aws_auth_region = os.environ["BATCHPAR_aws_auth_region"]
    fields = literal_eval(os.environ["BATCHPAR_fields"])
    score_field = os.environ["BATCHPAR_score_field"]
    test = literal_eval(os.environ["BATCHPAR_test"])

    # Extract all document ids in this chunk
    s3 = boto3.resource('s3')
    ids_obj = s3.Object(s3_bucket, batch_file)
    logging.info('Getting document ids...')
    # Read via the public StreamingBody API rather than the private
    # `_raw_stream` attribute.
    all_doc_ids = json.loads(ids_obj.get()['Body'].read())
    logging.info(f'Got {len(all_doc_ids)} document ids')

    # Set up Elasticsearch; `no_commit` is switched on whenever
    # AWSBATCHTEST is present in the environment.
    es = ElasticsearchPlus(hosts=es_host,
                           port=es_port,
                           aws_auth_region=aws_auth_region,
                           no_commit=("AWSBATCHTEST" in os.environ),
                           entity_type=entity_type,
                           do_sort=False)

    # A looser matching threshold is used in test mode
    min_match = 0.3 if not test else 0.05
    for doc_id in all_doc_ids:
        # Fetch the current document body
        existing = es.get(es_index, doc_type=es_type, id=doc_id)['_source']

        # Only score documents holding at least one of the requested
        # fields; all others are written back with a null score.
        score = None
        if any(f in existing for f in fields):
            score = lolvelty(es, es_index, doc_id, fields,
                             total=count,
                             minimum_should_match=min_match)

        # Merge existing info into new doc
        doc = {**existing}
        doc[score_field] = score
        es.index(index=es_index, doc_type=es_type, id=doc_id, body=doc)

if __name__ == "__main__":
    # Script entry point: log INFO and above through a stream handler,
    # prefixing each record with its timestamp and level, then run the batch.
    stream_handler = logging.StreamHandler()
    logging.basicConfig(
        format="%(asctime)s:%(levelname)s:%(message)s",
        level=logging.INFO,
        handlers=[stream_handler],
    )
    run()