Source code for cellmaps_ppi_embedding.runner

#! /usr/bin/env python

import os
import numpy as np
import time
from datetime import date
import logging
import csv
import networkx as nx
from node2vec import Node2Vec
from cellmaps_utils import constants
from cellmaps_utils import logutils
from cellmaps_utils.provenance import ProvenanceUtil
import warnings

import cellmaps_ppi_embedding
from cellmaps_ppi_embedding.exceptions import CellMapsPPIEmbeddingError

logger = logging.getLogger(__name__)


[docs] class EmbeddingGenerator(object): """ Base class for implementations that generate network embeddings """ DIMENSIONS = 1024 def __init__(self, dimensions=DIMENSIONS): """ Constructor """ self._dimensions = dimensions
[docs] def get_dimensions(self): """ Gets number of dimensions this embedding will generate :return: number of dimensions aka vector length :rtype: int """ return self._dimensions
[docs] def get_next_embedding(self): """ Generator method for getting next embedding. Caller should implement with ``yield`` operator :raises: NotImplementedError: Subclasses should implement this :return: Embedding :rtype: list """ raise NotImplementedError('Subclasses should implement')
[docs] class Node2VecEmbeddingGenerator(EmbeddingGenerator): """ """ P_DEFAULT = 2 Q_DEFAULT = 1 WALK_LENGTH = 80 NUM_WALKS = 10 WORKERS = 8 def __init__(self, nx_network, p=P_DEFAULT, q=Q_DEFAULT, dimensions=EmbeddingGenerator.DIMENSIONS, walk_length=WALK_LENGTH, num_walks=NUM_WALKS, workers=WORKERS): """ Constructor """ super().__init__(dimensions=dimensions) self._nx_network = nx_network self._p = p self._q = q self._walk_length = walk_length self._num_walks = num_walks self._workers = workers def _remove_header_edge_from_network(self): """ Removes edge named geneA geneB which is actually the header of the edge list file """ # remove geneA geneB edge cause it is the header of file try: self._nx_network.remove_edge('geneA', 'geneB') except nx.NetworkXError as ne: logger.debug('No edge named geneA -> geneB to remove' + str(ne)) self._nx_network.remove_nodes_from(['geneA', 'geneB'])
[docs] def get_next_embedding(self): """ :return: """ if self._nx_network is None: raise CellMapsPPIEmbeddingError('network is None') self._remove_header_edge_from_network() n2v_obj = Node2Vec(self._nx_network, dimensions=self._dimensions, walk_length=self._walk_length, num_walks=self._num_walks, workers=self._workers, q=self._q, p=self._p) # Embed nodes model = n2v_obj.fit(window=10, min_count=0, sg=1, epochs=1) for key in model.wv.index_to_key: row = [key] row.extend(model.wv[key].tolist()) yield row
[docs] class FakeEmbeddingGenerator(EmbeddingGenerator): """ Fakes PPI embedding """ def __init__(self, ppi_downloaddir, dimensions=1024): """ Constructor :param dimensions: Desired size of output embedding :type dimensions: int """ super().__init__(dimensions=dimensions) self._ppi_downloaddir = ppi_downloaddir self._gene_list = self._get_gene_list() warnings.warn(constants.PPI_EMBEDDING_FILE + ' contains FAKE DATA!!!!\n' 'You have been warned\nHave a nice day\n') logger.error(constants.PPI_EMBEDDING_FILE + ' contains FAKE DATA!!!! ' 'You have been warned. Have a nice day') def _get_gene_list(self): ppi_gene_node_attrs_file = os.path.join(self._ppi_downloaddir, constants.PPI_GENE_NODE_ATTR_FILE) gene_list = [] with open(ppi_gene_node_attrs_file, 'r') as f: reader = csv.DictReader(f, delimiter='\t') for row in reader: gene_list.append(row['name']) return gene_list
[docs] def get_next_embedding(self): """ Generator method for getting next embedding. Caller should implement with ``yield`` operator :raises: NotImplementedError: Subclasses should implement this :return: Embedding :rtype: list """ for g in self._gene_list: row = [g] row.extend(np.random.normal(size=self.get_dimensions())) # sample normal distribution yield row
[docs] class CellMapsPPIEmbedder(object): """ Class to run algorithm """ PPI_EDGELIST_FILEKEY = 'edgelist' def __init__(self, outdir=None, embedding_generator=None, inputdir=None, skip_logging=True, name=None, organization_name=None, project_name=None, provenance_utils=ProvenanceUtil(), input_data_dict=None, provenance=None): """ Constructor :param outdir: directory where ppi embeddings will be saved :type outdir: str :param embedding_generator: :type embedding_generator: :param inputdir: :type inputdir: str :param skip_logging: If ``True`` skip logging, if ``None`` or ``False`` do NOT skip logging :type skip_logging: bool :param provenance: :type provenance: dict :param input_data_dict: :type input_data_dict: dict """ if outdir is None: raise CellMapsPPIEmbeddingError('outdir is None') self._outdir = os.path.abspath(outdir) self._inputdir = os.path.abspath(inputdir) if inputdir is not None else inputdir self._start_time = int(time.time()) self._end_time = -1 self._embedding_generator = embedding_generator self._name = name self._project_name = project_name self._organization_name = organization_name self._keywords = None self._description = None self._input_data_dict = input_data_dict self._provenance_utils = provenance_utils self._provenance = provenance self._inputdataset_ids = [] if skip_logging is None: self._skip_logging = False else: self._skip_logging = skip_logging if self._input_data_dict is None or not self._input_data_dict: self._input_data_dict = {'outdir': self._outdir, 'inputdir': self._inputdir, 'embedding_generator': str(self._embedding_generator), 'name': self._name, 'project_name': self._project_name, 'organization_name': self._organization_name, 'skip_logging': self._skip_logging, 'provenance': str(self._provenance) } logger.debug('In constructor')
[docs] @staticmethod def get_apms_edgelist_file(input_dir=None, edgelist_filename=constants.PPI_EDGELIST_FILE): """ :param edgelist_filename: :return: :param input_dir: :return: """ return os.path.join(input_dir, edgelist_filename)
def _write_task_start_json(self): """ Writes task_start.json file with information about what is to be run """ logutils.write_task_start_json(outdir=self._outdir, start_time=self._start_time, version=cellmaps_ppi_embedding.__version__, data={'commandlineargs': self._input_data_dict}) def _update_provenance_fields(self): """ :return: """ if os.path.exists(os.path.join(self._inputdir, constants.RO_CRATE_METADATA_FILE)): prov_attrs = self._provenance_utils.get_merged_rocrate_provenance_attrs(self._inputdir, override_name=self._name, override_project_name=self._project_name, override_organization_name=self._organization_name, extra_keywords=['AP-MS Embedding', 'AP-MS', 'embedding']) self._name = prov_attrs.get_name() self._organization_name = prov_attrs.get_organization_name() self._project_name = prov_attrs.get_project_name() self._keywords = prov_attrs.get_keywords() self._description = prov_attrs.get_description() elif self._provenance is not None: self._name = self._provenance['name'] if 'name' in self._provenance else 'PPI Embedding' self._organization_name = self._provenance['organization-name'] \ if 'organization-name' in self._provenance else 'NA' self._project_name = self._provenance['project-name']\ if 'project-name' in self._provenance else 'NA' self._keywords = self._provenance['keywords'] if 'keywords' in self._provenance else ['ppi'] self._description = self._provenance['description'] if 'description' in self._provenance else \ 'Embedding of PPIs' else: raise CellMapsPPIEmbeddingError('Input directory should be an RO-Crate or provenance should be specified.') def _create_run_crate(self): """ Creates rocrate for output directory :raises CellMapsProvenanceError: If there is an error """ logger.debug('Registering rocrate with FAIRSCAPE') try: self._provenance_utils.register_rocrate(self._outdir, name=self._name, organization_name=self._organization_name, project_name=self._project_name, description=self._description, keywords=self._keywords) except TypeError as te: raise CellMapsPPIEmbeddingError('Invalid provenance: ' + str(te)) except KeyError as ke: raise CellMapsPPIEmbeddingError('Key missing in provenance: ' + str(ke)) def _register_software(self): """ Registers this tool :raises CellMapsImageEmbeddingError: If fairscape call fails """ logger.debug('Registering software with FAIRSCAPE') software_keywords = self._keywords software_keywords.extend(['tools', cellmaps_ppi_embedding.__name__]) software_description = self._description + ' ' + \ cellmaps_ppi_embedding.__description__ self._softwareid = self._provenance_utils.register_software(self._outdir, name=cellmaps_ppi_embedding.__name__, description=software_description, author=cellmaps_ppi_embedding.__author__, version=cellmaps_ppi_embedding.__version__, file_format='py', keywords=software_keywords, url=cellmaps_ppi_embedding.__repo_url__) def _register_computation(self): """ # Todo: added in used dataset, software and what is being generated :return: """ logger.debug('Getting id of input rocrate') if os.path.exists(os.path.join(self._inputdir, constants.RO_CRATE_METADATA_FILE)): self._inputdataset_ids.append(self._provenance_utils.get_id_of_rocrate(self._inputdir)) else: self._register_input_datasets() logger.debug('Registering computation with FAIRSCAPE') keywords = self._keywords keywords.extend(['computation']) description = self._description + ' run of ' + cellmaps_ppi_embedding.__name__ self._provenance_utils.register_computation(self._outdir, name=cellmaps_ppi_embedding.__computation_name__, run_by=str(self._provenance_utils.get_login()), command=str(self._input_data_dict), description=description, keywords=keywords, used_software=[self._softwareid], used_dataset=self._inputdataset_ids, generated=[self._embedding_file_id]) def _register_input_datasets(self): """ Registers cm4ai/apms dataset or samples and unique input datasets with FAIRSCAPE adding values to **self._inputdataset_ids** """ if CellMapsPPIEmbedder.PPI_EDGELIST_FILEKEY not in self._provenance: logger.debug('Provenance information for input file (PPI edgelist) was not specified.') return ppi_edgelist_datasetid = None if 'guid' in self._provenance[CellMapsPPIEmbedder.PPI_EDGELIST_FILEKEY]: ppi_edgelist_datasetid = self._provenance[CellMapsPPIEmbedder.PPI_EDGELIST_FILEKEY]['guid'] if ppi_edgelist_datasetid is not None: self._inputdataset_ids.append(ppi_edgelist_datasetid) logger.debug('PPI edgelist have dataset id.') else: ppi_edgelist_datasetid = self._provenance_utils.register_dataset( self._outdir, source_file=self.get_apms_edgelist_file(self._inputdir), data_dict=self._provenance[CellMapsPPIEmbedder.PPI_EDGELIST_FILEKEY], skip_copy=False) self._inputdataset_ids.append(ppi_edgelist_datasetid) logger.debug('PPI edgelist dataset id: ' + str(ppi_edgelist_datasetid)) def _register_embedding_file(self): """ Registers embedding file as a dataset :return: id of datafile dataset :rtype: str """ logger.debug('Registering embedding file with FAIRSCAPE') description = self._description description += ' file' keywords = self._keywords keywords.extend(['file']) data_dict = {'name': cellmaps_ppi_embedding.__name__ + ' output file', 'description': description, 'keywords': keywords, 'data-format': 'tsv', 'author': cellmaps_ppi_embedding.__name__, 'version': cellmaps_ppi_embedding.__version__, 'schema': 'https://raw.githubusercontent.com/fairscape/cm4ai-schemas/main/v0.1.0/cm4ai_schema_apms_embedding.json', 'date-published': date.today().strftime(self._provenance_utils.get_default_date_format_str())} self._embedding_file_id = self._provenance_utils.register_dataset(self._outdir, source_file=self.get_ppi_embedding_file(), data_dict=data_dict)
[docs] def get_ppi_embedding_file(self): """ Gets PPI embedding file in output directory :return: :rtype: str """ return os.path.join(self._outdir, constants.PPI_EMBEDDING_FILE)
[docs] def generate_readme(self): description = getattr(cellmaps_ppi_embedding, '__description__', 'No description provided.') version = getattr(cellmaps_ppi_embedding, '__version__', '0.0.0') with open(os.path.join(os.path.dirname(__file__), 'readme_outputs.txt'), 'r') as f: readme_outputs = f.read() readme = readme_outputs.format(DESCRIPTION=description, VERSION=version) with open(os.path.join(self._outdir, 'README.txt'), 'w') as f: f.write(readme)
[docs] def run(self): """ Run node2vec to create embeddings :return: """ logger.debug('In run method') exitcode = 99 try: if not os.path.isdir(self._outdir): os.makedirs(self._outdir, mode=0o755) if self._skip_logging is False: logutils.setup_filelogger(outdir=self._outdir, handlerprefix='cellmaps_ppi_embedding') self._write_task_start_json() self.generate_readme() self._update_provenance_fields() self._create_run_crate() self._register_software() with open(self.get_ppi_embedding_file(), 'w', newline='') as f: writer = csv.writer(f, delimiter='\t') header_line = [''] header_line.extend([x for x in range(1, self._embedding_generator.get_dimensions())]) writer.writerow(header_line) for row in self._embedding_generator.get_next_embedding(): writer.writerow(row) self._register_embedding_file() self._register_computation() exitcode = 0 finally: self._end_time = int(time.time()) # write a task finish file logutils.write_task_finish_json(outdir=self._outdir, start_time=self._start_time, end_time=self._end_time, status=exitcode) return exitcode