- Notifications
You must be signed in to change notification settings - Fork 406
/
Copy pathsanitize_sequences.py
144 lines (116 loc) · 5.44 KB
/
sanitize_sequences.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Standard library imports.
import argparse
import hashlib
import re
import sys
from pathlib import Path

# Third-party imports (Augur I/O helpers for FASTA reading/writing).
from augur.io import open_file, read_sequences, write_sequences

# Local workflow helpers.
from utils import stream_tar_file_contents
class DuplicateSequenceError(ValueError):
    """Raised when the same strain name appears multiple times with different
    sequences; the message is a comma-delimited list of the duplicated names.
    """
    pass
def rename_sequences(sequences, pattern):
    """Rename the given sequences' ids by replacing the given patterns with the
    empty string.

    Parameters
    ----------
    sequences : Iterator
        Sequence records with ``id``, ``name``, and ``description`` attributes
        (e.g., Bio.SeqIO records).
    pattern : str
        Regular expression; every match is removed from each record's
        description to form the sanitized identifier.

    Yields
    ------
    The same records, with sanitized ``id``/``name`` and an emptied
    ``description``.
    """
    for sequence in sequences:
        # Replace the given patterns in the sequence description with the empty
        # string. For a simple FASTA record with only an identifier in the
        # defline, the description is identical to the `id` and `name` fields.
        # For a complex FASTA record that has spaces in the identifier or other
        # additional information, we need to parse the description to get any
        # trailing components of the strain name.
        sequence.id = re.sub(pattern, "", sequence.description)

        # Replace standard characters that are not accepted by all downstream
        # tools as valid FASTA names.
        sequence.id = sequence.id.replace("'", "-")

        # The name field stores the same information for a simple FASTA input,
        # so we need to override its value, too.
        sequence.name = sequence.id

        # Do not keep additional information that follows the sequence
        # identifier.
        sequence.description = ""

        yield sequence
def drop_duplicate_sequences(sequences, error_on_duplicates=False):
    """Identify and drop duplicate sequences from the given iterator.

    Only the first record seen for each name is yielded; later records with
    the same name are skipped. Names whose later records carry a *different*
    sequence are collected and optionally reported as an error.

    Parameters
    ----------
    sequences : Iterator
        Sequence records with ``name`` and ``seq`` attributes.
    error_on_duplicates : bool
        When True, raise after iteration if any name appeared with more than
        one distinct sequence.

    Yields
    ------
    Bio.SeqIO.Seq :
        Unique sequence records

    Raises
    ------
    DuplicateSequenceError :
        If `error_on_duplicates` is True and any duplicate records are found,
        raises an exception with a comma-delimited list of duplicates as the
        message.
    """
    sequence_hash_by_name = {}
    duplicate_strains = set()

    for sequence in sequences:
        # Hash each sequence and check whether another sequence with the same
        # name already exists and if the hash is different.
        sequence_hash = hashlib.sha256(str(sequence.seq).encode("utf-8")).hexdigest()
        if sequence.name in sequence_hash_by_name:
            # If the hashes differ (multiple entries with the same strain name
            # but different sequences), we keep the first sequence and add the
            # strain to a list of duplicates to report at the end.
            if sequence_hash_by_name.get(sequence.name) != sequence_hash:
                duplicate_strains.add(sequence.name)

            # If the current strain has been seen before, don't write
            # out its sequence again.
            continue

        sequence_hash_by_name[sequence.name] = sequence_hash
        yield sequence

    # Report names of duplicate strains with different sequences when
    # requested.
    if len(duplicate_strains) > 0 and error_on_duplicates:
        raise DuplicateSequenceError(", ".join(duplicate_strains))
if __name__ == '__main__':
    # Command-line entry point: sanitize strain names in one or more FASTA
    # inputs (plain files or tarballs), drop duplicates, and write the result.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--sequences", nargs="+", required=True, help="sequences to be sanitized")
    parser.add_argument("--strip-prefixes", nargs="+", help="prefixes to strip from strain names in the sequences")
    parser.add_argument('--error-on-duplicate-strains', action="store_true", help="exit with an error when the same strain is detected multiple times with different sequences. By default, use the first occurrence of each duplicated sequence.")
    parser.add_argument("--output", required=True, help="sanitized sequences")

    args = parser.parse_args()

    sequence_files = []
    tar_files = []
    for sequence_filename in args.sequences:
        # If the input is a tarball, try to find a sequence file inside the
        # archive.
        if ".tar" in Path(sequence_filename).suffixes:
            try:
                sequence_file, tar_file = stream_tar_file_contents(
                    sequence_filename,
                    "sequences"
                )
                sequence_files.append(sequence_file)
                tar_files.append(tar_file)
            except FileNotFoundError as error:
                print(f"ERROR: {error}", file=sys.stderr)
                sys.exit(1)
        else:
            sequence_files.append(sequence_filename)

    # Replace whitespace and everything following pipes with nothing.
    # (Raw string; byte-identical to the non-raw form since "\|" is not a
    # recognized string escape.)
    pattern = r"( )|(\|.*)"
    if args.strip_prefixes:
        prefixes = "|".join(args.strip_prefixes)
        pattern = f"^({prefixes})|{pattern}"

    with open_file(args.output, "w", threads=1) as output_handle:
        # In order to prefer the latter files, we have to reverse the order of
        # the files.
        sequences = read_sequences(*reversed(sequence_files))
        renamed_sequences = rename_sequences(sequences, pattern)
        deduplicated_sequences = drop_duplicate_sequences(
            renamed_sequences,
            args.error_on_duplicate_strains
        )

        try:
            for sequence in deduplicated_sequences:
                write_sequences(sequence, output_handle)
        except DuplicateSequenceError as error:
            print(
                f"ERROR: The following strains have duplicate sequences: {error}",
                file=sys.stderr
            )
            sys.exit(1)

    # Clean up temporary directory and files that came from a tarball.
    for tar_file in tar_files:
        tar_file.close()