From 2f294d522e107b69b66af973e7a2c143eb74012c Mon Sep 17 00:00:00 2001 From: stephanie battini Date: Mon, 23 Sep 2024 14:45:03 +0200 Subject: [PATCH 1/7] load graph using nexus --- examples/load_graph_archngv_nexus.py | 189 +++++++++++++++++++++++ examples/load_graph_archngv_nexus.sbatch | 38 +++++ setup.py | 2 + 3 files changed, 229 insertions(+) create mode 100644 examples/load_graph_archngv_nexus.py create mode 100755 examples/load_graph_archngv_nexus.sbatch diff --git a/examples/load_graph_archngv_nexus.py b/examples/load_graph_archngv_nexus.py new file mode 100644 index 0000000..37e224d --- /dev/null +++ b/examples/load_graph_archngv_nexus.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python +# coding: utf-8 + +import argparse +import multiprocessing +import pickle +from functools import partial +from pathlib import Path + +import numpy as np +import pandas as pd +import psutil +from archngv import NGVCircuit +from joblib import Parallel, delayed, parallel_config +from tqdm import tqdm + +from astrovascpy import bloodflow +from astrovascpy.exceptions import BloodFlowError +from astrovascpy.utils import Graph + +import getpass +from kgforge.core import KnowledgeGraphForge +from kgforge.core import Resource +from kgforge.specializations.resources import Dataset +import glob + + +def get_circuit_conf(circuit_name): + """ + retreive nexus NGV config entry + param: + circuit_name(str): the nexus name of the NGV circuit to load" + """ + + TOKEN = getpass.getpass() + + nexus_endpoint = "https://bbp.epfl.ch/nexus/v1" # production environment + + ORG = "bbp" + PROJECT = "mmb-neocortical-regions-ngv" + + forge = KnowledgeGraphForge("https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml", + endpoint=nexus_endpoint, + bucket=f"{ORG}/{PROJECT}", + token= TOKEN, + debug=True + ) + + p = forge.paths("Dataset") + resources = forge.search(p.type=="DetailedCircuit", p.name == 'NGV O1.v5 (Rat)', limit=30) + + forge.as_dataframe(resources) + if len(resources) != 1: + print('There are several NGV circuit with ths name') + return None + else: + circuit = resources[0] + circuitConfigPath = circuit.circuitConfigPath.url + circuitConfigPath = circuitConfigPath[len('file://'):] + print(f'circuitConfigPath: {circuitConfigPath}') + + return circuitConfigPath + + + + + +def load_graph_archngv_parallel( + filename, n_workers, n_astro=None, parallelization_backend="multiprocessing" +): + """Load a vasculature from an NGV circuit. + + Args: + filename (str): vasculature dataset. + n_workers (int): number of processes to set endfeet on edges. + n_astro (int): for testing, if not None, it will reduce the number of astrocytes used + parallelization_backend (str): Either multiprocessing or joblib + + Returns: + vasculatureAPI.PointVasculature: graph containing point vasculature skeleton. + + Raises: + BloodFlowError: if the file object identified by filename is not in h5 format. + """ + if not Path(filename).exists(): + raise BloodFlowError("File provided does not exist") + circuit = NGVCircuit(filename) + pv = circuit.vasculature.point_graph + graph = Graph.from_point_vasculature(pv) + graph.edge_properties.index = pd.MultiIndex.from_frame( + graph.edge_properties.loc[:, ["section_id", "segment_id"]] + ) + gv_conn = circuit.gliovascular_connectome + worker = partial(bloodflow.get_closest_edges, graph=graph) + + args = ( + ( + gv_conn.vasculature_sections_segments(endfoot_id).vasculature_section_id.values[0], + gv_conn.vasculature_sections_segments(endfoot_id).vasculature_segment_id.values[0], + gv_conn.get(endfoot_id, ["endfoot_compartment_length"]).values[0], + ) + for astro_id in np.arange(n_astro or circuit.astrocytes.size) + for endfoot_id in gv_conn.astrocyte_endfeet(astro_id) + ) + endfoot_ids = [ + endfoot_id + for astro_id in np.arange(n_astro or circuit.astrocytes.size) + for endfoot_id in gv_conn.astrocyte_endfeet(astro_id) + ] + + if parallelization_backend == "multiprocessing": + with multiprocessing.Pool(n_workers) as pool: + for result_ids, result_endfeet in zip( + tqdm( + pool.imap(worker, args, chunksize=max(1, int(len(endfoot_ids) / n_workers))), + total=len(endfoot_ids), + ), + endfoot_ids, + ): + # Only the main process executes this part, i.e. as soon as it receives the parallelly generated data + graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = ( + result_endfeet + ) + + elif parallelization_backend == "joblib": + with parallel_config( + backend="loky", prefer="processes", n_jobs=n_workers, inner_max_num_threads=1 + ): + parallel = Parallel(return_as="generator", batch_size="auto") + parallelized_region = parallel( + delayed(worker)(arg) for arg in tqdm(args, total=len(endfoot_ids)) + ) + + for result_ids, result_endfeet in zip(parallelized_region, endfoot_ids): + # Only the main process executes this part, i.e. as soon as it receives the parallelly generated data + graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = ( + result_endfeet + ) + + else: + raise BloodFlowError( + f"parallelization_backend={parallelization_backend} invalid option. Use 'joblib' or 'multiprocessing'." + ) + + return graph + + +def main(): + global print + print = partial(print, flush=True) + + parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.") + parser.add_argument( + "--circuit_name", type=str, required=True, help="NGV circuits nexus name" + ) + parser.add_argument( + "--output_graph", type=str, required=True, help="Path to the output graph file" + ) + args = parser.parse_args() + + circuit_name = args.circuit_name + #filename_ngv = args.filename_ngv + filename_ngv = get_circuit_conf(circuit_name) + + output_graph = args.output_graph + + n_cores = psutil.cpu_count(logical=False) + print(f"number of physical CPU cores = {n_cores}") + + print(f"NGV Circuits file: {filename_ngv}") + print("loading circuit : start") + graph = load_graph_archngv_parallel( + filename_ngv, n_workers=n_cores + ) # n_astro=50 for debugging (smaller processing needs) + print("loading circuit : finish") + + print("pickle graph : start") + filehandler = open(output_graph, "wb") + pickle.dump(graph, filehandler) + print("pickle graph : finish") + print(f"Graph file: {output_graph}") + + +if __name__ == "__main__": + main() + + + + diff --git a/examples/load_graph_archngv_nexus.sbatch b/examples/load_graph_archngv_nexus.sbatch new file mode 100755 index 0000000..938e8cd --- /dev/null +++ b/examples/load_graph_archngv_nexus.sbatch @@ -0,0 +1,38 @@ +#!/bin/bash + +#SBATCH --job-name="archngv" +#SBATCH --nodes=1 + +#SBATCH --account=proj137 +#SBATCH --partition=prod +#SBATCH --constraint=cpu +#SBATCH --time=00:40:00 + + +#JOB_SCRIPT=$(scontrol show job ${SLURM_JOB_ID} | awk -F= '/Command=/{print $2}') +#JOB_SCRIPT_DIR=$(dirname ${JOB_SCRIPT}) +#JOB_SCRIPT_DIR=/gpfs/bbp.cscs.ch/project/proj137/scratch/jacquemi/Nexus + +SETUP_SCRIPT="${JOB_SCRIPT_DIR}/../setup.sh" +SETUP_SCRIPT="./setup.sh" +if [[ ! -f ${SETUP_SCRIPT} ]]; then + >&2 echo "[ERROR] The 'setup.sh' script could not be found!" + exit 2 +fi + +#source ${SETUP_SCRIPT} + +#FILENAME_NGV="/gpfs/bbp.cscs.ch/project/proj137/NGVCircuits/rat_O1" +#FILENAME_NGV="/gpfs/bbp.cscs.ch/project/proj137/NGVCircuits/rat_O1/build/ngv_config.json" +CIRCUIT_NAME="NGV O1.v5 (Rat)" + + +GRAPH_PATH="./data/graphs_folder/dumped_graph_nexus.bin" + +echo +echo "### Loading graph" +echo +# It is imperative to use srun and dplace, otherwise the Python processes +# do not work properly (possible deadlocks and/or performance degradation) +#time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv.py --circuit_name ${CIRCUIT_NAME} --output_graph ${GRAPH_PATH} +time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv_nexus.py --circuit_name "${CIRCUIT_NAME}" --output_graph ${GRAPH_PATH} diff --git a/setup.py b/setup.py index 0cf069d..f851e65 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ VERSION = module.VERSION reqs = [ + "archngv", "cached-property", "click", "coverage", @@ -24,6 +25,7 @@ "morphio", "mpi4py", "networkx", + "nexusforge", "numpy", "pandas", "psutil", From 1f5feef92e79a4c7d1ee81dca804777a4af271d2 Mon Sep 17 00:00:00 2001 From: Nicola Date: Tue, 24 Sep 2024 16:13:02 +0200 Subject: [PATCH 2/7] install packages only in venv --- examples/load_graph_archngv_nexus.py | 50 +++++++++++----------------- setup.py | 2 -- setup.sh | 4 +-- 3 files changed, 22 insertions(+), 34 deletions(-) diff --git a/examples/load_graph_archngv_nexus.py b/examples/load_graph_archngv_nexus.py index 37e224d..6192318 100644 --- a/examples/load_graph_archngv_nexus.py +++ b/examples/load_graph_archngv_nexus.py @@ -2,6 +2,8 @@ # coding: utf-8 import argparse +import getpass +import glob import multiprocessing import pickle from functools import partial @@ -12,18 +14,14 @@ import psutil from archngv import NGVCircuit from joblib import Parallel, delayed, parallel_config +from kgforge.core import KnowledgeGraphForge, Resource +from kgforge.specializations.resources import Dataset from tqdm import tqdm from astrovascpy import bloodflow from astrovascpy.exceptions import BloodFlowError from astrovascpy.utils import Graph -import getpass -from kgforge.core import KnowledgeGraphForge -from kgforge.core import Resource -from kgforge.specializations.resources import Dataset -import glob - def get_circuit_conf(circuit_name): """ @@ -34,35 +32,33 @@ def get_circuit_conf(circuit_name): TOKEN = getpass.getpass() - nexus_endpoint = "https://bbp.epfl.ch/nexus/v1" # production environment + nexus_endpoint = "https://bbp.epfl.ch/nexus/v1" # production environment ORG = "bbp" PROJECT = "mmb-neocortical-regions-ngv" - forge = KnowledgeGraphForge("https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml", - endpoint=nexus_endpoint, - bucket=f"{ORG}/{PROJECT}", - token= TOKEN, - debug=True - ) + forge = KnowledgeGraphForge( + "https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml", + endpoint=nexus_endpoint, + bucket=f"{ORG}/{PROJECT}", + token=TOKEN, + debug=True, + ) p = forge.paths("Dataset") - resources = forge.search(p.type=="DetailedCircuit", p.name == 'NGV O1.v5 (Rat)', limit=30) + resources = forge.search(p.type == "DetailedCircuit", p.name == "NGV O1.v5 (Rat)", limit=30) forge.as_dataframe(resources) if len(resources) != 1: - print('There are several NGV circuit with ths name') + print("There are several NGV circuit with ths name") return None else: circuit = resources[0] circuitConfigPath = circuit.circuitConfigPath.url - circuitConfigPath = circuitConfigPath[len('file://'):] - print(f'circuitConfigPath: {circuitConfigPath}') - - return circuitConfigPath - - + circuitConfigPath = circuitConfigPath[len("file://") :] + print(f"circuitConfigPath: {circuitConfigPath}") + return circuitConfigPath def load_graph_archngv_parallel( @@ -150,17 +146,15 @@ def main(): print = partial(print, flush=True) parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.") - parser.add_argument( - "--circuit_name", type=str, required=True, help="NGV circuits nexus name" - ) + parser.add_argument("--circuit_name", type=str, required=True, help="NGV circuits nexus name") parser.add_argument( "--output_graph", type=str, required=True, help="Path to the output graph file" ) args = parser.parse_args() circuit_name = args.circuit_name - #filename_ngv = args.filename_ngv - filename_ngv = get_circuit_conf(circuit_name) + # filename_ngv = args.filename_ngv + filename_ngv = get_circuit_conf(circuit_name) output_graph = args.output_graph @@ -183,7 +177,3 @@ def main(): if __name__ == "__main__": main() - - - - diff --git a/setup.py b/setup.py index f851e65..0cf069d 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,6 @@ VERSION = module.VERSION reqs = [ - "archngv", "cached-property", "click", "coverage", @@ -25,7 +24,6 @@ "morphio", "mpi4py", "networkx", - "nexusforge", "numpy", "pandas", "psutil", diff --git a/setup.sh b/setup.sh index d666bba..5955b93 100755 --- a/setup.sh +++ b/setup.sh @@ -28,7 +28,7 @@ else conda install -y pip conda install -y -c conda-forge mpi mpi4py petsc petsc4py - "$CONDA_PREFIX/bin/pip" install tox joblib archngv + "$CONDA_PREFIX/bin/pip" install tox joblib archngv nexusforge # If complex number support is needed #conda install -y -c conda-forge mpi mpi4py "petsc=*=*complex*" "petsc4py=*=*complex*" fi @@ -65,7 +65,7 @@ then python3 -m pip install --upgrade pip fi pip3 install -e ${SETUP_DIR} - pip3 install tox joblib archngv + pip3 install tox joblib archngv nexusforge else conda_bin=`conda info | grep "active env location" | grep -o "/.*"`/bin $conda_bin/pip install -e ${SETUP_DIR} From 9cd14d8d3b41f0303fa1d1be06dda8b8fad55ccf Mon Sep 17 00:00:00 2001 From: Nicola Date: Tue, 24 Sep 2024 16:17:40 +0200 Subject: [PATCH 3/7] format --- examples/load_graph_archngv_nexus.py | 4 ++-- examples/load_graph_archngv_nexus.sbatch | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/load_graph_archngv_nexus.py b/examples/load_graph_archngv_nexus.py index 6192318..50ccbde 100644 --- a/examples/load_graph_archngv_nexus.py +++ b/examples/load_graph_archngv_nexus.py @@ -25,7 +25,7 @@ def get_circuit_conf(circuit_name): """ - retreive nexus NGV config entry + retrieve nexus NGV config entry param: circuit_name(str): the nexus name of the NGV circuit to load" """ @@ -50,7 +50,7 @@ def get_circuit_conf(circuit_name): forge.as_dataframe(resources) if len(resources) != 1: - print("There are several NGV circuit with ths name") + print("There are several NGV circuit with this name") return None else: circuit = resources[0] diff --git a/examples/load_graph_archngv_nexus.sbatch b/examples/load_graph_archngv_nexus.sbatch index 938e8cd..3a59a28 100755 --- a/examples/load_graph_archngv_nexus.sbatch +++ b/examples/load_graph_archngv_nexus.sbatch @@ -11,7 +11,7 @@ #JOB_SCRIPT=$(scontrol show job ${SLURM_JOB_ID} | awk -F= '/Command=/{print $2}') #JOB_SCRIPT_DIR=$(dirname ${JOB_SCRIPT}) -#JOB_SCRIPT_DIR=/gpfs/bbp.cscs.ch/project/proj137/scratch/jacquemi/Nexus +#JOB_SCRIPT_DIR=/gpfs/bbp.cscs.ch/project/proj137/scratch/jacquemi/Nexus SETUP_SCRIPT="${JOB_SCRIPT_DIR}/../setup.sh" SETUP_SCRIPT="./setup.sh" From 494ef1a84701bf41a587b9fd52ac319e4a49f097 Mon Sep 17 00:00:00 2001 From: Jean Jacquemier Date: Tue, 8 Oct 2024 11:41:23 +0200 Subject: [PATCH 4/7] NGV graph path can be retrievedx thanks to Nexus --- examples/load_graph_archngv.py | 165 ++++++++++++++++++++++++++--- examples/load_graph_archngv.sbatch | 24 +++-- 2 files changed, 166 insertions(+), 23 deletions(-) diff --git a/examples/load_graph_archngv.py b/examples/load_graph_archngv.py index 73742b7..4b89b21 100755 --- a/examples/load_graph_archngv.py +++ b/examples/load_graph_archngv.py @@ -1,14 +1,24 @@ +#!/usr/bin/env python +# coding: utf-8 + import argparse +import getpass +from os import environ +import glob import multiprocessing import pickle from functools import partial from pathlib import Path +import base64 +import requests import numpy as np import pandas as pd import psutil from archngv import NGVCircuit from joblib import Parallel, delayed, parallel_config +from kgforge.core import KnowledgeGraphForge, Resource +from kgforge.specializations.resources import Dataset from tqdm import tqdm from astrovascpy import bloodflow @@ -16,6 +26,93 @@ from astrovascpy.utils import Graph +def get_nexus_token( + client_id="bbp-molsys-sa", + environ_name="KCS", + nexus_url="https://bbpauth.epfl.ch/auth/realms/BBP/protocol/openid-connect/token", +): + """ + retreive a Nexus Token from keycloak + param: + client_id(str): the keycloak client id + environ_name(str): the name of the environement variable that holds the keycloak secret + nexus_url(str) + """ + try: + client_secret = environ[environ_name] + + # bbp keycloack token endpoint + url = nexus_url + + payload = "grant_type=client_credentials&scope=openid" + authorization = base64.b64encode( + f"{client_id}:{client_secret}".encode("utf-8") + ).decode("ascii") + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": f"Basic {authorization}", + } + + # request the token + r = requests.request( + "POST", + url=url, + headers=headers, + data=payload, + ) + + # get access token + mexus_token = r.json()["access_token"] + return mexus_token + except Exception as error: + print(f'Error: {error}') + return None + + +def get_nexus_circuit_conf( + circuit_name, nexus_org="bbp", nexus_project="mmb-neocortical-regions-ngv" +): + """ + retrieve nexus NGV config entry + param: + circuit_name(str): the Nexus name of the NGV circuit to load + nexus_org(str): The Nexus organisation + nexus_projec(str): The Nexus project that holds the circuit + """ + + nexus_token = get_nexus_token() + if nexus_token is None: + print("Error: Cannot get a valid Nexus token") + return None + + nexus_endpoint = "https://bbp.epfl.ch/nexus/v1" # production environment + + forge = KnowledgeGraphForge( + "https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml", + endpoint=nexus_endpoint, + bucket=f"{nexus_org}/{nexus_project}", + token=nexus_token, + debug=True, + ) + + p = forge.paths("Dataset") + resources = forge.search( + p.type == "DetailedCircuit", p.name == circuit_name, limit=30 + ) + + forge.as_dataframe(resources) + if len(resources) != 1: + print("There are several NGV circuit with this name") + return None + else: + circuit = resources[0] + circuitConfigPath = circuit.circuitConfigPath.url + circuitConfigPath = circuitConfigPath[len("file://") :] + print(f"circuitConfigPath: {circuitConfigPath}") + + return circuitConfigPath + + def load_graph_archngv_parallel( filename, n_workers, n_astro=None, parallelization_backend="multiprocessing" ): @@ -46,8 +143,12 @@ def load_graph_archngv_parallel( args = ( ( - gv_conn.vasculature_sections_segments(endfoot_id).vasculature_section_id.values[0], - gv_conn.vasculature_sections_segments(endfoot_id).vasculature_segment_id.values[0], + gv_conn.vasculature_sections_segments( + endfoot_id + ).vasculature_section_id.values[0], + gv_conn.vasculature_sections_segments( + endfoot_id + ).vasculature_segment_id.values[0], gv_conn.get(endfoot_id, ["endfoot_compartment_length"]).values[0], ) for astro_id in np.arange(n_astro or circuit.astrocytes.size) @@ -63,19 +164,26 @@ def load_graph_archngv_parallel( with multiprocessing.Pool(n_workers) as pool: for result_ids, result_endfeet in zip( tqdm( - pool.imap(worker, args, chunksize=max(1, int(len(endfoot_ids) / n_workers))), + pool.imap( + worker, + args, + chunksize=max(1, int(len(endfoot_ids) / n_workers)), + ), total=len(endfoot_ids), ), endfoot_ids, ): # Only the main process executes this part, i.e. as soon as it receives the parallelly generated data - graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = ( - result_endfeet - ) + graph.edge_properties.loc[ + pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id" + ] = result_endfeet elif parallelization_backend == "joblib": with parallel_config( - backend="loky", prefer="processes", n_jobs=n_workers, inner_max_num_threads=1 + backend="loky", + prefer="processes", + n_jobs=n_workers, + inner_max_num_threads=1, ): parallel = Parallel(return_as="generator", batch_size="auto") parallelized_region = parallel( @@ -84,9 +192,9 @@ def load_graph_archngv_parallel( for result_ids, result_endfeet in zip(parallelized_region, endfoot_ids): # Only the main process executes this part, i.e. as soon as it receives the parallelly generated data - graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = ( - result_endfeet - ) + graph.edge_properties.loc[ + pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id" + ] = result_endfeet else: raise BloodFlowError( @@ -100,16 +208,38 @@ def main(): global print print = partial(print, flush=True) - parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.") + parser = argparse.ArgumentParser( + description="File paths for NGVCircuits and output graph." + ) + parser.add_argument( + "--circuit-name", type=str, required=False, help="NGV circuits nexus name" + ) parser.add_argument( - "--filename_ngv", type=str, required=True, help="Path to the NGV circuits file" + "--circuit-path", type=str, required=False, help="Path to the NGV circuits" ) parser.add_argument( - "--output_graph", type=str, required=True, help="Path to the output graph file" + "--output-graph", type=str, required=True, help="Path to the output graph file" ) args = parser.parse_args() - filename_ngv = args.filename_ngv + if args.circuit_name is not None: + circuit_name = args.circuit_name + circtui_path = None + filename_ngv = get_nexus_circuit_conf(circuit_name) + if filename_ngv == None: + print('Error: Could not obtain a valid file path for the NGV circuit') + return -1 + + elif args.circuit_path is not None: + filename_ngv = args.circuit_path + # filename_ngv = args.filename_ngv + + else: + print(f"ERROR: circuit-name or circuit-path must be provided") + return -1 + + print(f"INFO: filename_ngv {filename_ngv} ") + output_graph = args.output_graph n_cores = psutil.cpu_count(logical=False) @@ -123,11 +253,12 @@ def main(): print("loading circuit : finish") print("pickle graph : start") - filehandler = open(output_graph, "wb") - pickle.dump(graph, filehandler) - print("pickle graph : finish") + with open(output_graph, "wb") as filehandler: + pickle.dump(graph, filehandler) + print("pickle graph : finish") print(f"Graph file: {output_graph}") if __name__ == "__main__": + print('INFO: Start load_graph_archngv.py') main() diff --git a/examples/load_graph_archngv.sbatch b/examples/load_graph_archngv.sbatch index d6750ae..91bcd32 100644 --- a/examples/load_graph_archngv.sbatch +++ b/examples/load_graph_archngv.sbatch @@ -3,7 +3,7 @@ #SBATCH --job-name="archngv" #SBATCH --nodes=1 -#SBATCH --account=proj16 +#SBATCH --account=proj137 #SBATCH --partition=prod #SBATCH --constraint=cpu #SBATCH --time=00:30:00 @@ -15,6 +15,8 @@ JOB_SCRIPT=$(scontrol show job ${SLURM_JOB_ID} | awk -F= '/Command=/{print $2}') JOB_SCRIPT_DIR=$(dirname ${JOB_SCRIPT}) +JOB_SCRIPT_DIR='/gpfs/bbp.cscs.ch/data/scratch/proj137/jacquemi/Nexus' + SETUP_SCRIPT="${JOB_SCRIPT_DIR}/../setup.sh" if [[ ! -f ${SETUP_SCRIPT} ]]; then @@ -24,13 +26,23 @@ fi source ${SETUP_SCRIPT} -FILENAME_NGV="/gpfs/bbp.cscs.ch/project/proj137/NGVCircuits/rat_O1" +FILENAME_NGV="" GRAPH_PATH="./data/graphs_folder/dumped_graph.bin" -echo -echo "### Loading graph" -echo +#CIRCUIT_NAME="tiny_real_spine_morph" +CIRCUIT_NAME="NGV O1.v5 (Rat)" + # It is imperative to use srun and dplace, otherwise the Python processes # do not work properly (possible deadlocks and/or performance degradation) -time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv.py --filename_ngv ${FILENAME_NGV} --output_graph ${GRAPH_PATH} +if [[ -z $FILENAME_NGV ]]; then + echo "### Loading graph from name: $CIRCUIT_NAME from Nexus" + echo "Execute ${JOB_SCRIPT_DIR}/load_graph_archngv.py with srun" + time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv.py --circuit-name "${CIRCUIT_NAME}" --output-graph ${GRAPH_PATH} +else + echo "### Loading graph from filename: $FILENAME_NGV" + echo "Execute ${JOB_SCRIPT_DIR}/load_graph_archngv.py with srun" + time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv.py --circuit-path "${FILENAME_NGV}" --output-graph ${GRAPH_PATH} +fi + +echo From 4a9ca43574dfb18b32310944bcbd9aea24503f63 Mon Sep 17 00:00:00 2001 From: Jean Jacquemier Date: Tue, 8 Oct 2024 11:48:10 +0200 Subject: [PATCH 5/7] Pilint correction with black --- examples/load_graph_archngv.py | 48 +++++++++++++--------------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/examples/load_graph_archngv.py b/examples/load_graph_archngv.py index 4b89b21..cbf5095 100755 --- a/examples/load_graph_archngv.py +++ b/examples/load_graph_archngv.py @@ -45,9 +45,9 @@ def get_nexus_token( url = nexus_url payload = "grant_type=client_credentials&scope=openid" - authorization = base64.b64encode( - f"{client_id}:{client_secret}".encode("utf-8") - ).decode("ascii") + authorization = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode( + "ascii" + ) headers = { "Content-Type": "application/x-www-form-urlencoded", "Authorization": f"Basic {authorization}", @@ -65,7 +65,7 @@ def get_nexus_token( mexus_token = r.json()["access_token"] return mexus_token except Exception as error: - print(f'Error: {error}') + print(f"Error: {error}") return None @@ -96,9 +96,7 @@ def get_nexus_circuit_conf( ) p = forge.paths("Dataset") - resources = forge.search( - p.type == "DetailedCircuit", p.name == circuit_name, limit=30 - ) + resources = forge.search(p.type == "DetailedCircuit", p.name == circuit_name, limit=30) forge.as_dataframe(resources) if len(resources) != 1: @@ -143,12 +141,8 @@ def load_graph_archngv_parallel( args = ( ( - gv_conn.vasculature_sections_segments( - endfoot_id - ).vasculature_section_id.values[0], - gv_conn.vasculature_sections_segments( - endfoot_id - ).vasculature_segment_id.values[0], + gv_conn.vasculature_sections_segments(endfoot_id).vasculature_section_id.values[0], + gv_conn.vasculature_sections_segments(endfoot_id).vasculature_segment_id.values[0], gv_conn.get(endfoot_id, ["endfoot_compartment_length"]).values[0], ) for astro_id in np.arange(n_astro or circuit.astrocytes.size) @@ -174,9 +168,9 @@ def load_graph_archngv_parallel( endfoot_ids, ): # Only the main process executes this part, i.e. as soon as it receives the parallelly generated data - graph.edge_properties.loc[ - pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id" - ] = result_endfeet + graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = ( + result_endfeet + ) elif parallelization_backend == "joblib": with parallel_config( @@ -192,9 +186,9 @@ def load_graph_archngv_parallel( for result_ids, result_endfeet in zip(parallelized_region, endfoot_ids): # Only the main process executes this part, i.e. as soon as it receives the parallelly generated data - graph.edge_properties.loc[ - pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id" - ] = result_endfeet + graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = ( + result_endfeet + ) else: raise BloodFlowError( @@ -208,15 +202,9 @@ def main(): global print print = partial(print, flush=True) - parser = argparse.ArgumentParser( - description="File paths for NGVCircuits and output graph." - ) - parser.add_argument( - "--circuit-name", type=str, required=False, help="NGV circuits nexus name" - ) - parser.add_argument( - "--circuit-path", type=str, required=False, help="Path to the NGV circuits" - ) + parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.") + parser.add_argument("--circuit-name", type=str, required=False, help="NGV circuits nexus name") + parser.add_argument("--circuit-path", type=str, required=False, help="Path to the NGV circuits") parser.add_argument( "--output-graph", type=str, required=True, help="Path to the output graph file" ) @@ -227,7 +215,7 @@ def main(): circtui_path = None filename_ngv = get_nexus_circuit_conf(circuit_name) if filename_ngv == None: - print('Error: Could not obtain a valid file path for the NGV circuit') + print("Error: Could not obtain a valid file path for the NGV circuit") return -1 elif args.circuit_path is not None: @@ -260,5 +248,5 @@ def main(): if __name__ == "__main__": - print('INFO: Start load_graph_archngv.py') + print("INFO: Start load_graph_archngv.py") main() From 0918c914f29f87efe37fcc0d17710b6f48ef45a3 Mon Sep 17 00:00:00 2001 From: Jean Jacquemier Date: Tue, 8 Oct 2024 12:33:00 +0200 Subject: [PATCH 6/7] ruff/black for pylint --- examples/load_graph_archngv.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/examples/load_graph_archngv.py b/examples/load_graph_archngv.py index cbf5095..685a9c9 100755 --- a/examples/load_graph_archngv.py +++ b/examples/load_graph_archngv.py @@ -2,19 +2,19 @@ # coding: utf-8 import argparse +import base64 import getpass -from os import environ import glob import multiprocessing import pickle from functools import partial +from os import environ from pathlib import Path -import base64 -import requests import numpy as np import pandas as pd import psutil +import requests from archngv import NGVCircuit from joblib import Parallel, delayed, parallel_config from kgforge.core import KnowledgeGraphForge, Resource @@ -212,9 +212,8 @@ def main(): if args.circuit_name is not None: circuit_name = args.circuit_name - circtui_path = None filename_ngv = get_nexus_circuit_conf(circuit_name) - if filename_ngv == None: + if filename_ngv is None: print("Error: Could not obtain a valid file path for the NGV circuit") return -1 @@ -223,7 +222,7 @@ def main(): # filename_ngv = args.filename_ngv else: - print(f"ERROR: circuit-name or circuit-path must be provided") + print("ERROR: circuit-name or circuit-path must be provided") return -1 print(f"INFO: filename_ngv {filename_ngv} ") From 30f207cafa1ff1e4bb66d133429efb81f9aed280 Mon Sep 17 00:00:00 2001 From: Jean Jacquemier Date: Tue, 8 Oct 2024 12:43:19 +0200 Subject: [PATCH 7/7] Spelling correction --- examples/load_graph_archngv.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/load_graph_archngv.py b/examples/load_graph_archngv.py index 685a9c9..142cb43 100755 --- a/examples/load_graph_archngv.py +++ b/examples/load_graph_archngv.py @@ -32,10 +32,10 @@ def get_nexus_token( nexus_url="https://bbpauth.epfl.ch/auth/realms/BBP/protocol/openid-connect/token", ): """ - retreive a Nexus Token from keycloak + retrieve a Nexus Token from keycloak param: client_id(str): the keycloak client id - environ_name(str): the name of the environement variable that holds the keycloak secret + environ_name(str): the name of the environment variable that holds the keycloak secret nexus_url(str) """ try: