{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Amplicon PiCrust\n", "\n", "This document contains many functions for all kinds of amplicon analysis used in unlock" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Connection settings" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from irods.session import iRODSSession\n", "import ssl\n", "from irods.meta import iRODSMeta\n", "import os, sys\n", "\n", "# iRODS authentication information\n", "host = \"unlock-icat.irods.surfsara.nl\"\n", "port = 1247\n", "zone = \"unlock\"\n", "user = os.getenv('irodsUserName')\n", "password = os.getenv('irodsPassword')\n", "\n", "# SSL settings\n", "context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=None, capath=None, cadata=None)\n", "\n", "ssl_settings = {'irods_client_server_negotiation': 'request_server_negotiation',\n", " 'irods_client_server_policy': 'CS_NEG_REQUIRE',\n", " 'irods_encryption_algorithm': 'AES-256-CBC',\n", " 'irods_encryption_key_size': 32,\n", " 'irods_encryption_num_hash_rounds': 16,\n", " 'irods_encryption_salt_size': 8,\n", " \"irods_authentication_scheme\": \"pam\",\n", " 'ssl_context': context}\n", " " ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Data retrieval\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Downloading: /unlock/home/wur.unlock.hivghana_drp0070/STU_DRP007092/OBS_XDRS176960/SAM_DRS176960/amplicon/ASY_DRR243924/NGTAX_Silva138.1-picrust_100_DRR243924/3_PICRUSt2/TIGRFAM_predicted.tsv.gz\r" ] } ], "source": [ "from irods.models import Collection, DataObject\n", "from irods.column import Criterion\n", "import os\n", "\n", "GROUP = \"wur.unlock.hivghana_drp0070\"\n", "\n", "# Standard authentication\n", "with iRODSSession(host = host, port = port, user = user, password = password, zone = zone, **ssl_settings) as session:\n", " # Set the filters to select the files needed for downloading\n", " results = session.query(Collection, DataObject).filter( \\\n", " Criterion('like', Collection.name, '%'+GROUP+'%')).filter( \\\n", " Criterion('like', DataObject.name, '%_predicted.tsv.gz'))\n", " \n", " for r in results:\n", " # Download the result files\n", " data_file = r[Collection.name] + \"/\" + r[DataObject.name]\n", " # Original path in irods is local path from current location\n", " if not os.path.exists(\".\" + data_file):\n", " os.makedirs(\".\" + r[Collection.name], exist_ok=True)\n", " print(\"Downloading: \" + data_file, end=\"\\r\")\n", " session.data_objects.get(data_file, \".\" + data_file)\n", " else:\n", " print(\"Already received: \" + data_file + \" \"*10, end=\"\\r\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### MIMOSA data preparation\n", "\n", "All NG-Tax TURTLE data needs to be downloaded from the iRODS instance" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Already received: /unlock/trash/home/jkoehorst/wur.unlock.hivghana_drp0070/demo-13.ttl 76960/amplicon/ASY_DRR243924/NGTAX_Silva138.1-picrust_100_DRR243924/2_Classification/DRR243924_NG-Tax_100.ttl \r" ] } ], "source": [ "# ...\n", "from irods.models import Collection, DataObject\n", "from irods.column import Criterion\n", "import os\n", "\n", "# Standard authentication\n", "with iRODSSession(host = host, port = port, user = user, password = password, zone = zone, **ssl_settings) as session:\n", " # Set the filters to select the files needed for downloading\n", " results = session.query(Collection, DataObject).filter( \\\n", " Criterion('like', Collection.name, '%'+GROUP+'%')).filter( \\\n", " Criterion('like', DataObject.name, '%ttl'))\n", " # Download the result files\n", " for r in results:\n", " # Collection and data object\n", " data_file = r[Collection.name] + \"/\" + r[DataObject.name]\n", " # Original path in irods is local path from current location\n", " if not os.path.exists(\".\" + data_file):\n", " # Create the directory if it does not exist\n", " os.makedirs(\".\" + r[Collection.name], exist_ok=True)\n", " print(\"Downloading: \" + data_file + \" \"*10, end=\"\\r\")\n", " # Download the file\n", " session.data_objects.get(data_file, \".\" + data_file)\n", " else:\n", " print(\"Already received: \" + data_file + \" \"*10, end=\"\\r\")\n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def create_repository(label):\n", " import requests\n", " # RDF4J Repository config file\n", " content = \"\"\"\n", "#\n", "# RDF4J configuration template for a GraphDB repository\n", "#\n", "@prefix rdfs: .\n", "@prefix rep: .\n", "@prefix sr: .\n", "@prefix sail: .\n", "@prefix graphdb: .\n", "\n", "[] a rep:Repository ;\n", " rep:repositoryID \"XXXX\" ;\n", " rdfs:label \"\" ;\n", " rep:repositoryImpl [\n", " rep:repositoryType \"graphdb:SailRepository\" ;\n", " sr:sailImpl [\n", " sail:sailType \"graphdb:Sail\" ;\n", "\n", " graphdb:read-only \"false\" ;\n", "\n", " # Inference and Validation\n", " graphdb:ruleset \"rdfsplus-optimized\" ;\n", " graphdb:disable-sameAs \"true\" ;\n", " graphdb:check-for-inconsistencies \"false\" ;\n", "\n", " # Indexing\n", " graphdb:entity-id-size \"32\" ;\n", " graphdb:enable-context-index \"false\" ;\n", " graphdb:enablePredicateList \"true\" ;\n", " graphdb:enable-fts-index \"false\" ;\n", " graphdb:fts-indexes (\"default\" \"iri\") ;\n", " graphdb:fts-string-literals-index \"default\" ;\n", " graphdb:fts-iris-index \"none\" ;\n", "\n", " # Queries and Updates\n", " graphdb:query-timeout \"0\" ;\n", " graphdb:throw-QueryEvaluationException-on-timeout \"false\" ;\n", " graphdb:query-limit-results \"0\" ;\n", "\n", " # Settable in the file but otherwise hidden in the UI and in the RDF4J console\n", " graphdb:base-URL \"http://example.org/owlim#\" ;\n", " graphdb:defaultNS \"\" ;\n", " graphdb:imports \"\" ;\n", " graphdb:repository-type \"file-repository\" ;\n", " graphdb:storage-folder \"storage\" ;\n", " graphdb:entity-index-size \"10000000\" ;\n", " graphdb:in-memory-literal-properties \"true\" ;\n", " graphdb:enable-literal-index \"true\" ;\n", " ]\n", " ].\n", " \"\"\"\n", " content = content.replace(\"XXXX\", label)\n", " writer = open(\"config.ttl\",\"w\")\n", " print(content, file=writer)\n", " writer.close()\n", "\n", " headers = {\n", " # requests won't add a boundary if this header is set when you pass files=\n", " # 'Content-Type': 'multipart/form-data',\n", " }\n", "\n", " files = {\n", " 'config': open('./config.ttl', 'rb'),\n", " }\n", "\n", " response = requests.post('http://localhost:7200/rest/repositories', headers=headers, files=files)\n", "# print(response.__dict__)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Available repositories:\n", "test\n", "gbol\n", "Workflow\n", "refine\n", "ngtax\n", "envo\n", "example\n", "geo\n", "bacdive\n", "Gyan2\n", "wur_unlock_hivghana_drp0070\n", "isa\n", "bla\n", "observations\n", "mgnify\n", "meesike\n", "ena\n", "playground\n", "step\n", "nl-restaurants\n", "Gyan\n", "Loaded file 28 of 29 in 2 seconds with DRR243850_NG-Tax_100.ttll\r" ] } ], "source": [ "# Process the turtle files\n", "from rdflib import Graph\n", "import requests, json\n", "import time \n", "\n", "# Check if GraphDB runs:...\n", "# docker volume create graphdb\n", "# docker run -v graphdb:/opt/graphdb/dist/data --name graphdb -p 7200:7200 ontotext/graphdb:10.1.3 -Xmx6g\n", "graphdb = 'http://localhost:7200'\n", "\n", "# GraphDB restrictions\n", "REPO = GROUP.replace(\"wur.unlock.\", \"wur_unlock_\")\n", "\n", "def check_graphdb():\n", " headers = {\n", " 'Accept': 'application/json',\n", " }\n", "\n", " response = requests.get(graphdb + '/rest/repositories', headers=headers)\n", " content = json.loads(response.content.decode())\n", " repo_available = False\n", " print(\"Available repositories:\")\n", " for repository in content:\n", " if repository['id'] == REPO:\n", " print(repository['id'])\n", " repo_available = True\n", " if not repo_available:\n", " # Create repository\n", " create_repository(REPO)\n", "\n", "def load_data(data_file):\n", " headers = {\n", " 'Content-Type': 'application/x-turtle',\n", " 'Accept': 'application/json'\n", " }\n", "\n", " with open(data_file, 'rb') as f:\n", " response = requests.post(graphdb + '/repositories/'+REPO+'/statements', data=f, headers=headers)\n", " if \"Unknown repository:\" in response.content.decode():\n", " raise Exception(response.content.decode())\n", "\n", "###\n", "to_be_loaded = set()\n", "check_graphdb()\n", "\n", "for current_dir, subdirs, files in os.walk( './unlock/' ):\n", " for file in files:\n", " if (file.endswith(\".ttl\")) and GROUP in current_dir:\n", " to_be_loaded.add(current_dir + \"/\" + file)\n", "\n", "for index, file_path in enumerate(to_be_loaded):\n", " current = time.time()\n", " load_data(file_path)\n", " end = time.time()\n", " diff = int(end - current)\n", " print(\"Loaded file \",index, \"of\", len(to_be_loaded),\"in\", diff, \"seconds with\",file_path.split(\"/\")[-1], end=\"\\r\")\n", " " ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### MIMOSA table generator" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "from SPARQLWrapper import SPARQLWrapper, JSON\n", "# Make connection with sparql endpoint containing all NG-Tax 2.0 data\n", "sparql = SPARQLWrapper(\"http://localhost:7200/repositories/\" + REPO)\n", "sparql.setReturnFormat(JSON)\n", "\n", "# Get all sample name mappings\n", "# Query for column headers to sample name\n", "query = \"\"\"\n", "PREFIX gbol: \n", "PREFIX schema: \n", "PREFIX sschema: \n", "PREFIX jerm: \n", "select distinct ?sample_name ?sample_id where { \n", "\t?ngtax_sample gbol:name ?sample_name .\n", " ?ngtax_sample gbol:metadata ?mappingFile .\n", " ?mappingFile a gbol:MappingFile .\n", " ?mappingFile gbol:fastQforward ?fwd .\n", " ?mappingFile gbol:fastQreverse ?rev .\n", " ?sample jerm:hasPart/jerm:hasPart/schema:identifier ?fwd .\n", " ?sample jerm:hasPart/jerm:hasPart/schema:identifier ?rev .\n", " ?sample a jerm:Sample .\n", " ?sample schema:identifier ?sample_id .\n", "}\"\"\"\n", "\n", "sparql.setQuery(query)\n", "ret = sparql.queryAndConvert()\n", "# Iterate over the sample names\n", "sample_name_lookup = {}\n", "for index, r in enumerate(ret[\"results\"][\"bindings\"]):\n", " sample_name_lookup[r['sample_name']['value']] = r['sample_id']['value']\n", " " ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Processed sample number 8\r" ] } ], "source": [ "from SPARQLWrapper import SPARQLWrapper, JSON\n", "# Make connection with sparql endpoint containing all NG-Tax 2.0 data\n", "sparql = SPARQLWrapper(\"http://localhost:7200/repositories/\" + REPO)\n", "sparql.setReturnFormat(JSON)\n", "\n", "# Obtain all samples to make it more scalable\n", "query = \"\"\"\n", "PREFIX gbol:\n", "SELECT DISTINCT ?sample\n", "WHERE {\n", " ?sample a gbol:Sample .\n", "}\n", "\"\"\"\n", "\n", "sparql.setQuery(query)\n", "ret = sparql.queryAndConvert()\n", "# Create containers for the forward, reverse ASV and the sample list\n", "container_fseq = {}\n", "container_rseq = {}\n", "all_samples = set()\n", "# Iterate over the samples\n", "for index, r in enumerate(ret[\"results\"][\"bindings\"]):\n", " # For each sample get all ASVs and abundance information\n", " sample = r['sample']['value']\n", " print(\"Processed sample number\", index, end='\\r')\n", " query = \"\"\"\n", " PREFIX gbol:\n", " SELECT DISTINCT ?name ?fseq ?rseq ?count\n", " WHERE {\n", " VALUES ?sample { <\"\"\"+sample+\"\"\"> }\n", " ?sample a gbol:Sample .\n", " ?sample gbol:name ?name .\n", " ?sample gbol:asv ?asv .\n", " ?asv a gbol:ASVSet . \n", " ?asv gbol:forwardASV ?fasv .\n", " ?fasv gbol:sequence ?fseq .\n", " ?asv gbol:reverseASV ?rasv . \n", " ?rasv gbol:sequence ?rseq .\n", " ?asv gbol:clusteredReadCount ?count .\n", " }\"\"\"\n", " # Set query and iterate over results\n", " sparql.setQuery(query)\n", " ret = sparql.queryAndConvert()\n", " for r in ret[\"results\"][\"bindings\"]:\n", " name = r['name']['value']\n", " fseq = r['fseq']['value']\n", " rseq = r['rseq']['value']\n", " count = int(r['count']['value'])\n", " # Sample renaming using the previous lookup\n", " name = sample_name_lookup[name]\n", " # List of sample names\n", " all_samples.add(name)\n", " # Fseq object fill method\n", " if fseq not in container_fseq:\n", " container_fseq[fseq] = {}\n", " if name not in container_fseq[fseq]:\n", " container_fseq[fseq][name] = 0\n", " container_fseq[fseq][name] += count\n", "\n", " # Rseq object fill method\n", " if rseq not in container_rseq:\n", " container_rseq[rseq] = {}\n", " if name not in container_rseq[rseq]:\n", " container_rseq[rseq][name] = 0\n", " container_rseq[rseq][name] += count" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import pandas as pd\n", "\n", "# Create an abundance matrix using numpy\n", "def create_abundance_matrix(name, container, all_samples):\n", " matrix = np.zeros((len(container), len(all_samples)), dtype=int)\n", " all_samples = sorted(list(all_samples))\n", " asvs = sorted(container.keys())\n", " for row_index, asv in enumerate(asvs):\n", " for sample in container[asv]:\n", " coll_index = all_samples.index(sample)\n", " value = container[asv][sample]\n", " matrix[row_index,coll_index] = int(value)\n", "\n", " # Write to file as panda frame with column and row names\n", " df = pd.DataFrame(matrix, columns=all_samples, index=asvs)\n", " df.index.name=\"OTU\"\n", " df.to_csv(name, sep=\"\\t\")" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# Creating the RSEQ tsv file\n", "create_abundance_matrix(\"fseq.tsv\", container_fseq, all_samples)\n", "# Creating the FSEQ tsv file\n", "create_abundance_matrix(\"rseq.tsv\", container_rseq, all_samples)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.16" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }