- Notifications
You must be signed in to change notification settings - Fork 46.7k
/
Copy pathlempel_ziv.py
125 lines (100 loc) · 3.5 KB
/
lempel_ziv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
One of the several implementations of Lempel-Ziv-Welch compression algorithm
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""
importmath
importos
importsys
defread_file_binary(file_path: str) ->str:
"""
Reads given file as bytes and returns them as a long string
"""
result=""
try:
withopen(file_path, "rb") asbinary_file:
data=binary_file.read()
fordatindata:
curr_byte=f"{dat:08b}"
result+=curr_byte
returnresult
exceptOSError:
print("File not accessible")
sys.exit()
defadd_key_to_lexicon(
lexicon: dict[str, str], curr_string: str, index: int, last_match_id: str
) ->None:
"""
Adds new strings (curr_string + "0", curr_string + "1") to the lexicon
"""
lexicon.pop(curr_string)
lexicon[curr_string+"0"] =last_match_id
ifmath.log2(index).is_integer():
forcurr_key, valueinlexicon.items():
lexicon[curr_key] =f"0{value}"
lexicon[curr_string+"1"] =bin(index)[2:]
defcompress_data(data_bits: str) ->str:
"""
Compresses given data_bits using Lempel-Ziv-Welch compression algorithm
and returns the result as a string
"""
lexicon= {"0": "0", "1": "1"}
result, curr_string="", ""
index=len(lexicon)
foriinrange(len(data_bits)):
curr_string+=data_bits[i]
ifcurr_stringnotinlexicon:
continue
last_match_id=lexicon[curr_string]
result+=last_match_id
add_key_to_lexicon(lexicon, curr_string, index, last_match_id)
index+=1
curr_string=""
whilecurr_string!=""andcurr_stringnotinlexicon:
curr_string+="0"
ifcurr_string!="":
last_match_id=lexicon[curr_string]
result+=last_match_id
returnresult
defadd_file_length(source_path: str, compressed: str) ->str:
"""
Adds given file's length in front (using Elias gamma coding) of the compressed
string
"""
file_length=os.path.getsize(source_path)
file_length_binary=bin(file_length)[2:]
length_length=len(file_length_binary)
return"0"* (length_length-1) +file_length_binary+compressed
defwrite_file_binary(file_path: str, to_write: str) ->None:
"""
Writes given to_write string (should only consist of 0's and 1's) as bytes in the
file
"""
byte_length=8
try:
withopen(file_path, "wb") asopened_file:
result_byte_array= [
to_write[i : i+byte_length]
foriinrange(0, len(to_write), byte_length)
]
iflen(result_byte_array[-1]) %byte_length==0:
result_byte_array.append("10000000")
else:
result_byte_array[-1] +="1"+"0"* (
byte_length-len(result_byte_array[-1]) -1
)
foreleminresult_byte_array:
opened_file.write(int(elem, 2).to_bytes(1, byteorder="big"))
exceptOSError:
print("File not accessible")
sys.exit()
defcompress(source_path: str, destination_path: str) ->None:
"""
Reads source file, compresses it and writes the compressed result in destination
file
"""
data_bits=read_file_binary(source_path)
compressed=compress_data(data_bits)
compressed=add_file_length(source_path, compressed)
write_file_binary(destination_path, compressed)
if__name__=="__main__":
compress(sys.argv[1], sys.argv[2])