- Notifications
You must be signed in to change notification settings - Fork 406
/
Copy pathutils.py
136 lines (109 loc) · 4.59 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
fromioimportTextIOWrapper
importlzma
frompathlibimportPath
importsys
importtarfile
importtempfile
# File extension used to recognize each supported file type among the members
# of a tar archive. The keys are the valid values for the `filetype` argument
# of the functions below; a member matches when this extension appears
# anywhere in its suffixes (so "metadata.tsv.xz" still matches ".tsv").
EXTENSION_BY_FILETYPE= {
"metadata": ".tsv",
"sequences": ".fasta",
}
def extract_tar_file_contents(filename, filetype):
    """Extract the first file of a given type (e.g., metadata or sequences)
    from the given tar archive into a new temporary directory.

    Parameters
    ----------
    filename : str or Path-like
        Path to the tar archive to search for the given file type.
    filetype : str
        Type of file to search for in the given tar archive based on the
        associated file extension. Must be a key of `EXTENSION_BY_FILETYPE`.

    Returns
    -------
    tempfile.TemporaryDirectory :
        Temporary directory containing the file extracted from the tar archive.
    pathlib.Path :
        Path to the file extracted from the archive with the same name as the
        file in the original archive.

    Raises
    ------
    KeyError :
        When the requested file type has no registered extension.
    FileNotFoundError :
        When a regular file with the requested file type's extension could not
        be found in the given tar archive.

    """
    extension = EXTENSION_BY_FILETYPE[filetype]

    with tarfile.open(filename) as tar:
        internal_member = None
        for member in tar.getmembers():
            # Only regular files are useful to downstream readers. Skipping
            # directories, symlinks, and devices also keeps an untrusted
            # archive from handing back a link that points outside of the
            # temporary directory.
            if not member.isfile():
                continue

            member_path = Path(member.name)
            if extension in member_path.suffixes:
                # Only use the last part of the member file name, excluding any
                # leading directories that might include the root file system.
                member.name = member_path.name
                internal_member = member
                break

        if internal_member is None:
            raise FileNotFoundError(f"Could not find a {filetype} file in '{filename}'")

        # Extract the internal file with its original name in the tar archive to
        # a temporary directory. This approach allows downstream processes to
        # re-read the file in multiple passes instead of making a single pass
        # through a stream.
        temporary_dir = tempfile.TemporaryDirectory()

        # Use the "data" extraction filter where the running Python provides
        # it. Older interpreters do not accept the `filter` argument at all.
        extract_options = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

        try:
            tar.extractall(
                temporary_dir.name,
                members=(internal_member,),
                **extract_options,
            )
        except BaseException:
            # Do not leave a partially populated temporary directory behind
            # when the extraction fails.
            temporary_dir.cleanup()
            raise

    extracted_file_path = Path(temporary_dir.name) / Path(internal_member.name)
    print(f"Extracted {filetype} file from {filename} to {extracted_file_path}", file=sys.stderr)

    # Return temporary directory with the path to the extract file to allow the
    # caller to clean up this directory and to maintain a reference to this
    # directory until it is no longer needed. Python will automatically clean up
    # the temporary directory when its object is destroyed. For more details, see
    # https://docs.python.org/3/library/tempfile.html#tempfile.TemporaryDirectory
    return temporary_dir, extracted_file_path
def stream_tar_file_contents(filename, filetype):
    """Open a stream to the first file of a given type (e.g., metadata or
    sequences) inside the given tar archive without extracting it to disk.

    Parameters
    ----------
    filename : str or Path-like
        Path to the tar archive to search for the given file type.
    filetype : str
        Type of file to search for in the given tar archive based on the
        associated file extension. Must be a key of `EXTENSION_BY_FILETYPE`.

    Returns
    -------
    file object :
        A stream of the requested file from the tar archive. This is a text
        stream when the member is LZMA-compressed (".xz") or is a sequences
        (".fasta") file, and the binary stream returned by
        `TarFile.extractfile` otherwise.
    TarFile :
        A handle to the original tar archive to be closed when the stream has
        been read.

    Raises
    ------
    KeyError :
        When the requested file type has no registered extension.
    FileNotFoundError :
        When a readable file with the requested file type's extension could
        not be found in the given tar archive.

    """
    extension = EXTENSION_BY_FILETYPE[filetype]
    tar = tarfile.open(filename)

    try:
        internal_file = None
        for member in tar.getmembers():
            suffixes = Path(member.name).suffixes
            if extension not in suffixes:
                continue

            member_stream = tar.extractfile(member.name)
            if member_stream is None:
                # `extractfile` returns None for members that are neither
                # regular files nor links (e.g., a directory whose name
                # happens to carry the extension). Keep looking instead of
                # wrapping None in a reader.
                continue

            if ".xz" in suffixes:
                # Check for LZMA-compressed data and open these with the
                # corresponding library.
                internal_file = lzma.open(member_stream, "rt")
            elif extension == ".fasta":
                # For sequence data, handle decoding of the binary stream prior
                # to passing the data back to the caller.
                internal_file = TextIOWrapper(member_stream)
            else:
                # By default, return the binary stream for the member file.
                internal_file = member_stream

            break

        if internal_file is None:
            raise FileNotFoundError(f"Could not find a {filetype} file in '{filename}'")
    except BaseException:
        # The caller only receives the archive handle on success, so close it
        # here on every failure path to avoid leaking the open file.
        tar.close()
        raise

    return internal_file, tar