#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
GFAstats: GFA statistics.
Compute general statistics of a GFA
@author: alexis.mergez@inrae.fr
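
Example invocation (illustrative; flags as defined by the argument parser below):
    GFAstats.py -g graph.gfa.gz -o mygraph -t 4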
"""
import re
import argparse
import os
import gzip
import time
import concurrent.futures
from functools import reduce
import numpy as np
import pandas as pd
## Argument parser
arg_parser = argparse.ArgumentParser(description='GFAstats: GFA statistics')
arg_parser.add_argument(
"--gfa",
"-g",
dest = "gfa",
required = True,
help = "GFA file"
)
arg_parser.add_argument(
"--output",
"-o",
dest = "output",
required = True,
help = "Output name"
)
arg_parser.add_argument(
"--threads",
"-t",
dest = "threads",
required = False,
default = 1,
type = int,
help = "Number of threads"
)
arg_parser.add_argument(
'--version',
'-v',
action="store_true",
dest = "version",
help = "Show version"
)
arg_parser.add_argument(
'--progress',
'-P',
action="store_true",
dest = "progress",
help = "Show progress to stdout"
)
args = arg_parser.parse_args()

version = "0.0.0"  # placeholder version string (the actual release version is not shown here)

if args.version:
    print(version)
    os._exit(0)
panname = os.path.basename(args.gfa).split('.gfa')[0]
#% Reading the gfa into a list
if args.progress: print(f"[GFAstats::{panname}] Reading GFA file...")

if not args.gfa.endswith(".gz"):
    with open(args.gfa, 'r') as file:
        gfaLines = [line.rstrip() for line in file.readlines()]
else:  # Else the GFA is gzipped
    with gzip.open(args.gfa, 'r') as file:
        gfaLines = [line.decode().rstrip() for line in file.readlines()]
#% Checking Header for version number (not perfect)
if gfaLines[0].split('\t')[1] != "VN:Z:1.0":
    raise ImportError("Only GFA v1 is supported")
#% Initializing the statistics dictionaries
genStats = {
    "Nodes.count": 0,
    "Edges.count": 0,
    "Path.count": 0,
    "Steps.count": 0,
    "Nodes.length.mean": 0,
    "Nodes.length.median": 0,
    "Degree.mean": 0,
}
# Per-path statistics; the "Shared.content" field built later is encoded as:
# <Path_name>:<shared nodes count>,<shared nodes length (No R)>,<shared nodes length (R)>;...
pathStats = {}
#% Parsing the GFA lines
nodesLengthDict = {}    # Dict storing nodes length
Links = {}              # Dict storing the number of in/out links by nodes

if args.progress: print(f"[GFAstats::{panname}] Parsing GFA file...")
start_time = time.time()

for line in gfaLines:
    line_type = line[0]

    if line_type == "S":    # Segments = Nodes
        lineType, uid, value = line.split('\t')[:3]
        nodesLengthDict[int(uid)] = len(value)
        genStats["Nodes.count"] += 1

    elif line_type == "L":  # Links = Edges
        lineType, node_from, node_from_orient, node_to = line.split('\t')[:4]
        node_from, node_to = int(node_from), int(node_to)
        # Adding outgoing link
        try:
            Links[node_from] += 1
        except KeyError:
            Links[node_from] = 1
        # Adding incoming link
        try:
            Links[node_to] += 1
        except KeyError:
            Links[node_to] = 1
        # Counting edge
        genStats["Edges.count"] += 1

    elif line_type == "P":  # Paths
        lineType, uid, value = line.split('\t')[:3]
        genStats["Path.count"] += 1
        pathStats[uid] = {}
        pathStats[uid]["Path.nodes"] = [    # Stores every node id visited by the path, in order
            int(nodeid[:-1])                # Removing the trailing +/- orientation from the id
            for nodeid in value.split(",")
        ]
if args.progress:
print(f"[GFAstats::{panname}] Parsed in {round(time.time() - start_time, 2)}s")
#% Path stats function --------------------------------------------------------------------------------
def getPathStats(pathName, pathDict=pathStats, nodesLengthDict=nodesLengthDict, showProgress=args.progress, panname=panname):
    """
    Compute statistics related to a single path of the graph.
    Returns {pathName: {<metric>: <value>, ...}}.
    """
    if showProgress: print(f"[GFAstats::{panname}] Computing stats for {pathName}")
    _stats = {pathName: {}}
    # Storing the length of each node of the given path (with repeats)
    _lengths_R = [
        nodesLengthDict[nodeid]
        for nodeid in pathDict[pathName]["Path.nodes"]
    ]
    # Same without repeats
    _lengths = [
        nodesLengthDict[nodeid]
        for nodeid in set(pathDict[pathName]["Path.nodes"])
    ]
_stats[pathName]["Path.length"] = np.sum(_lengths_R)
_stats[pathName]["Path.nodes.length"] = np.sum(_lengths)
_stats[pathName]["Path.compression.factor"] = round(_stats[pathName]["Path.length"]/_stats[pathName]["Path.nodes.length"], 2)
# Number of unique nodes in the path
_stats[pathName]["Path.nodes.count"] = len(set(pathDict[pathName]["Path.nodes"]))
# List of nodes only traversed by this path
_stats[pathName]["Path.private.nodes.list"] = np.setdiff1d(
pathDict[pathName]["Path.nodes"], # All node ids of the current path
reduce(np.union1d, tuple( # Getting the union of all other node ids from other path
pathDict[path]["Path.nodes"]
for path in pathDict.keys()
if path != pathName
_stats[pathName]["Path.private.nodes.list"] = set(_stats[pathName]["Path.private.nodes.list"])
    # List of core nodes, i.e. nodes traversed by every path
    _stats[pathName]["Path.core.nodes.list"] = reduce(
        np.intersect1d,
        tuple(  # Intersection of the node ids of all paths
            pathDict[path]["Path.nodes"]
            for path in pathDict.keys()
        )
    )
_stats[pathName]["Path.core.nodes.list"] = set(_stats[pathName]["Path.core.nodes.list"])
    # Number of private and core nodes in the current path (without repeats)
_stats[pathName]["Path.private.nodes.count"] = len(_stats[pathName]["Path.private.nodes.list"])
_stats[pathName]["Path.core.nodes.count"] = len(_stats[pathName]["Path.core.nodes.list"])
    # Length of private and core nodes in the current path (without repeats)
_stats[pathName]["Path.private.length"] = np.sum([
nodesLengthDict[nodeid]
for nodeid in _stats[pathName]["Path.private.nodes.list"]
])
_stats[pathName]["Path.core.length"] = np.sum([
nodesLengthDict[nodeid]
for nodeid in _stats[pathName]["Path.core.nodes.list"]
])
    # Length of private and core nodes in the current path (with repeats)
_stats[pathName]["Path.private.R.length"] = np.sum([
nodesLengthDict[nodeid]
for nodeid in pathDict[pathName]["Path.nodes"]
if nodeid in _stats[pathName]["Path.private.nodes.list"]
])
_stats[pathName]["Path.core.R.length"] = np.sum([
for nodeid in pathDict[pathName]["Path.nodes"]
if nodeid in _stats[pathName]["Path.core.nodes.list"]
_stats[pathName]["Path.steps.count"] = len(pathDict[pathName]["Path.nodes"])
# Mean size of the nodes (with and without repeats)
_stats[pathName]["Path.nodes.R.size.mean"] = round(np.mean(_lengths_R), 2)
_stats[pathName]["Path.nodes.size.mean"] = round(np.mean(_lengths), 2)
# Median size of the nodes (with and without repeats)
_stats[pathName]["Path.nodes.R.size.median"] = round(np.median(_lengths_R), 2)
_stats[pathName]["Path.nodes.size.median"] = round(np.median(_lengths), 2)
    # Mean degree over all nodes of the path
_path_links = [
Links[nodeid]
for nodeid in set(pathDict[pathName]["Path.nodes"])
]
_stats[pathName]["Path.degree.mean"] = round(np.mean(_path_links), 2)
# Private
_path_links = [
Links[nodeid]
for nodeid in _stats[pathName]["Path.private.nodes.list"]
]
_stats[pathName]["Path.private.degree.mean"] = round(np.mean(_path_links), 2)
# Core
_path_links = [
Links[nodeid]
for nodeid in _stats[pathName]["Path.core.nodes.list"]
]
_stats[pathName]["Path.core.degree.mean"] = round(np.mean(_path_links), 2)
_stats[pathName].pop("Path.private.nodes.list")
_stats[pathName].pop("Path.core.nodes.list")
# Multithreading the path statistics retrieval
executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.threads)
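# NB: worker threads share the Python GIL, but much of the work in getPathStats
# happens inside NumPy calls, which can release the GIL, so a thread pool can
# still overlap some of the computation.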
res = {}
for pathname in pathStats.keys():
res[pathname] = executor.submit(getPathStats, pathname)
executor.shutdown(wait=True)
# Aggregating results
for pathname in pathStats.keys():
_pathstats = res[pathname].result()[pathname]
for key in _pathstats.keys():
pathStats[pathname][key] = _pathstats[key]
#% Getting shared content between paths --------------------------------------------------------------------------------
#% Shared nodes function
def get_shared_nodes(query, target, pathDict=pathStats, showProgress=args.progress, panname=panname):
    """Return the node ids traversed by both the query and the target paths."""
    if showProgress: print(f"[GFAstats::{panname}] Searching shared nodes between {query} and {target}")
    return np.intersect1d(pathDict[query]["Path.nodes"], pathDict[target]["Path.nodes"])
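# NB: np.intersect1d returns the sorted unique common values, so node repeats
# within a path do not inflate the shared node counts.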
all_paths = list(pathStats.keys())
shared_nodes = {}
for query_id in range(len(all_paths) - 1):
for target_id in range(query_id+1, len(all_paths)):
query, target = all_paths[query_id], all_paths[target_id]
value = get_shared_nodes(query, target)
#if args.progress: print(f"[GFAstats::{panname}]\tFound {len(value)}")
shared_nodes[(query, target)] = value
def get_shared_content(query, pathDict=pathStats, nodesLengthDict=nodesLengthDict, shared_nodes=shared_nodes, showProgress=args.progress, panname=panname):
    """Compute the shared node count and shared lengths between the query path and every other path."""
    if showProgress: print(f"[GFAstats::{panname}] Computing shared content stats for {query}")
_stats = {}
for target in pathDict.keys():
if target != query:
if showProgress: print(f"[GFAstats::{panname}] Computing shared content stats for {query} against {target}")
# Creating entry
_stats[target] = {}
            # Retrieving the precomputed shared node set (each pair is stored once, in either order)
            try:
                _shared = set(shared_nodes[(query, target)])
            except KeyError:
                _shared = set(shared_nodes[(target, query)])
            # Number of shared nodes in the current path (without repeats)
            _stats[target]["shared.nodes.count"] = len(_shared)
            # Length of shared nodes in the current path (without repeats)
            _stats[target]["shared.length"] = np.sum([
                nodesLengthDict[nodeid]
                for nodeid in _shared
            ])
            #print("without repeats : ", _stats[target]["shared.length"])
            # Length of shared nodes in the current path (with repeats)
            _stats[target]["shared.R.length"] = np.sum([
                nodesLengthDict[nodeid]
                for nodeid in pathDict[query]["Path.nodes"]
                if nodeid in _shared
            ])
            #print("with repeats : ", _stats[target]["shared.R.length"])
return _stats
# Multithreading the shared content computation
executor = concurrent.futures.ThreadPoolExecutor(max_workers=args.threads)
res = {}
for pathname in pathStats.keys():
res[pathname] = executor.submit(get_shared_content, pathname)
executor.shutdown(wait=True)
# Aggregating results
for pathname in pathStats.keys():
_pathstats = res[pathname].result()
shared_content = []
for target in _pathstats.keys():
shared_content.append(
f"{target}:{_pathstats[target]['shared.nodes.count']},{_pathstats[target]['shared.length']},{_pathstats[target]['shared.R.length']}"
)
pathStats[pathname]["Shared.content"] = ";".join(shared_content)
    # Removing temporary key from the dictionary
pathStats[pathname].pop("Path.nodes")
if args.progress:
print(f"[GFAstats::{panname}] Computing general stats")
genStats["Path.length.mean"] = round(np.mean(
[pathStats[pathname]["Path.length"] for pathname in pathStats.keys()]
genStats["Path.length.median"] = round(np.median(
[pathStats[pathname]["Path.length"] for pathname in pathStats.keys()]
genStats["Nodes.private.count"] = np.sum(
[pathStats[pathname]["Path.private.nodes.count"] for pathname in pathStats.keys()]
genStats["Nodes.core.count"] = pathStats[list(pathStats.keys())[0]]["Path.core.nodes.count"]
genStats["Steps.count"] = np.sum(
[pathStats[pathname]["Path.steps.count"] for pathname in pathStats.keys()]
# Total node length
genStats["Total.nodes.length"] = np.sum(
# Total sequence length
genStats["Total.sequence.length"] = np.sum(
[pathStats[pathname]["Path.length"] for pathname in pathStats.keys()]
)
# Compression factor
genStats["Path.compression.factor.mean"] = round(np.mean(
[pathStats[pathname]["Path.compression.factor"] for pathname in pathStats.keys()]
), 2)
genStats["Compression.factor"] = np.round(genStats["Total.sequence.length"]/genStats["Total.nodes.length"], 2)
genStats["Nodes.length.mean"] = round(np.mean(
list(nodesLengthDict.values())
), 2)
genStats["Nodes.length.median"] = round(np.median(
list(nodesLengthDict.values())
), 2)
# Mean and median degree
genStats["Degree.mean"] = round(np.mean(
list(Links.values())
), 2)
genStats["Degree.median"] = round(np.median(
list(Links.values())
), 2)
print(f"[GFAstats::{panname}] Statistics computed in {round(time.time() - start_time, 2)} s")
#% Exporting general data to tsv
gen_metrics = [
'Pangenome.name', 'Nodes.count', 'Steps.count', 'Nodes.private.count', 'Nodes.core.count', 'Nodes.length.mean', 'Nodes.length.median',
'Path.count', 'Path.length.mean', 'Path.length.median', 'Edges.count', 'Degree.mean', 'Degree.median',
'Total.nodes.length', 'Total.sequence.length', 'Path.compression.factor.mean', 'Compression.factor'
]
path_metrics = [
'Pangenome.name', 'Path.name', 'Path.nodes.count', 'Path.steps.count', 'Path.private.nodes.count', 'Path.core.nodes.count',
'Path.length', 'Path.nodes.length', 'Path.private.R.length', 'Path.private.length', 'Path.core.R.length', 'Path.core.length',
'Path.nodes.R.size.mean', 'Path.nodes.size.mean', 'Path.nodes.R.size.median', 'Path.nodes.size.median',
    'Path.degree.mean', 'Path.private.degree.mean', 'Path.core.degree.mean', 'Path.compression.factor', 'Shared.content'
]

genStatsArr = {key: {panname: value} for key, value in genStats.items()}
gendf = pd.DataFrame.from_dict(genStatsArr, orient='columns')
gendf.reset_index(inplace=True)
gendf.rename(columns={'index': 'Pangenome.name'}, inplace=True)
gendf = gendf[gen_metrics]  # Reordering columns to match gen_metrics
gendf.to_csv(args.output+".general.stats.tsv", sep='\t', index=False)
## Exporting paths data to tsv
# Transforming the dictionary to work with pandas
pathStatsFinal = {
(panname, path): pathStats[path]
for path in pathStats.keys()
}
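# The (pangenome, path) tuple keys become a two-level index; reset_index splits
# it into the 'level_0' and 'level_1' columns renamed below.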
pathdf = pd.DataFrame.from_dict(pathStatsFinal, orient='index')
pathdf.reset_index(inplace=True)
pathdf.rename(
columns={'level_0': 'Pangenome.name', 'level_1': "Path.name"}, inplace=True
)
pathdf = pathdf[path_metrics]  # Reordering columns to match path_metrics
pathdf.to_csv(args.output+".path.stats.tsv", sep='\t', index=False)