I am currently working on an AIS message decoder written in pure Python. It's just a little project to teach me some things about binary data, decoding, etc. Because I have only been programming for about a year, I am not sure whether my approach is reasonable. The full source code can be found here: https://github.com/M0r13n/pyais
Without going too deep into the details, my question is: What is a reasonably fast way to decode binary content in pure Python?
Let's say I have a bit sequence with a length of 168 bits that contains encoded data. The individual fields are not necessarily a multiple of 8 bits wide, so they don't fit neatly into typical data structures such as bytes.
I tried three approaches:
1: Store the bit sequence as a normal string and convert each substring into an int individually:

    bit_vector = '000001000101011110010111000110100110000000100000000000000001010111001111101011010110110000010101101000100010000010011001100101001111111111110110000010001000100010001110'
    d = {
        'type': int(bit_vector[0:6], 2),
        'repeat': int(bit_vector[6:8], 2),
        'mmsi': int(bit_vector[8:38], 2),
        'ais_version': int(bit_vector[38:40], 2),
        'imo': int(bit_vector[40:70], 2),
        'callsign': ascii6(bit_vector[70:112]),  # custom method of mine, ignore for now
        # ...
    }
2: Using bitstring's BitArray and slicing:
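
    from bitstring import BitArray

    b = BitArray(bin=bit_vector)
    # access each piece of data like so
    # (.uint reads the slice as unsigned; .int would interpret it as two's complement)
    type_ = b[0:6].uint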
3: Using the bitarray module:
-> The bitarray module does not seem to have a nice way to convert individual slices into ints, so I dropped it.
Approach 1 (my current one) decodes 8000 messages in 1.132967184 seconds, while the second one takes nearly 3 seconds.
Overall I am fairly happy with my first idea, but I feel like I could be missing something. My main concern is readability, but the code should not be overly slow. In C I would have used structs. Is the ctypes module worth considering?
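For comparison, one further pure-Python variant that could be benchmarked against the approaches above is to convert the whole bit string into a single int once and then extract each field by shifting and masking. The following is only a sketch and has not been timed here; `get_bits` is a hypothetical helper name, and the 38-bit string is just a short sample:

```python
def get_bits(value, total_bits, start, end):
    """Return bits [start, end) of a total_bits-wide integer.

    Positions are counted from the most significant bit, so the indices
    match the string slices used in approach 1.
    """
    width = end - start
    shift = total_bits - end
    return (value >> shift) & ((1 << width) - 1)


# short sample bit string (38 bits), for illustration only
bit_vector = '00000100010101111001011100011010011000'
total_bits = len(bit_vector)
value = int(bit_vector, 2)  # parse the string exactly once

fields = {
    'type': get_bits(value, total_bits, 0, 6),
    'repeat': get_bits(value, total_bits, 6, 8),
    'mmsi': get_bits(value, total_bits, 8, 38),
}
```

Whether this is actually faster than slicing the string would have to be measured; it mainly trades many small `int()` calls for integer arithmetic.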
EDIT: It was suggested that I include all relevant parts of my code here, so here we go:
The overall flow can be summarized like this. The first part is the name of the method, and the part in brackets is the data type after the method was called:
Socket data(bytes) -> decode("utf-8")(string) -> decode_ascii6()(string) -> decode_msg_1()(dict)
Reading the data.
Data is coming from a TCP socket:

    import socket


    def ais_stream(url="ais.exploratorium.edu", port=80):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((url, port))
        while True:
            # there may be multiple messages per chunk
            # (a message split across two recv() calls is not handled here)
            for msg in s.recv(4096).decode("utf-8").splitlines():
                yield msg
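Because TCP is a byte stream, a single `recv()` call may end in the middle of a message. A minimal sketch of a variant that buffers incomplete lines is shown below. `iter_lines` is a hypothetical name and not part of the project, and the sketch assumes messages are terminated by `\n` or `\r\n`:

```python
import socket


def iter_lines(sock, bufsize=4096):
    """Yield complete text lines from a connected socket."""
    buffer = b''
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:  # the peer closed the connection
            break
        buffer += chunk
        # everything before the last newline is complete;
        # the remainder stays in the buffer until more data arrives
        *lines, buffer = buffer.split(b'\n')
        for line in lines:
            yield line.decode('utf-8').rstrip('\r')
    if buffer:
        # flush a final line that had no trailing newline
        yield buffer.decode('utf-8').rstrip('\r')
```

The generator could be fed with the connected socket from `ais_stream` in place of the direct `recv(...).splitlines()` loop.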
Decoding
Each message is then decoded and its checksum is computed. Because AIS uses 6-bit ASCII and not the usual 8-bit, we need to convert the payload. The checksum is just a bitwise XOR of all characters between the leading '!' and the '*'. Also, there are different message types, and each type requires its own decoding. I simply created a list of the first 25 types and stored the corresponding function in it. Because functions are objects in Python, that should not be a problem, and it is a very clean way to express my intention. The to_int and signed methods are just wrappers around Python's normal int function that check for empty strings, etc.

    def decode_ascii6(data):
        """
        Decode AIS_ASCII_6 encoded data and convert it into binary.
        :param data: AIS_ASCII_6 encoded data
        :return: a binary string of 0's and 1's, e.g. 011100 011111 100001
        """
        binary_string = ''
        for c in data:
            c = ord(c) - 48
            if c > 40:
                c -= 8
            binary_string += f'{c:06b}'
        return binary_string


    def checksum(msg):
        """
        Compute the checksum of a given message
        :param msg: message
        :return: checksum as an int
        """
        c_sum = 0
        # skip the leading '!' and stop at the '*' that precedes the checksum
        for c in msg[1:]:
            if c == '*':
                break
            c_sum ^= ord(c)
        return c_sum


    def decode(msg):
        m_typ, n_sentences, sentence_num, seq_id, channel, data, chcksum = msg.split(',')
        # convert the AIS_ASCII_6 payload into a string of bits
        decoded_data = decode_ascii6(data)
        msg_type = int(decoded_data[0:6], 2)

        # chcksum looks like '0*5C'; the two hex digits follow the '*'
        if checksum(msg) != int(chcksum[2:], 16):
            print("\x1b[31mInvalid Checksum dropping packet!\x1b[0m")
            return None

        if n_sentences != '1' or sentence_num != '1':
            print("\x1b[31mSentencing is not supported yet!\x1b[0m")
            return None

        if 0 < msg_type < 25:
            # DECODE_MSG is a list of functions
            return DECODE_MSG[msg_type](decoded_data)

        return None
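The `to_int` and `signed` wrappers are not shown above. A minimal sketch of what such helpers could look like is given below, assuming that an empty string maps to 0 and that signed fields use two's complement. Both are assumptions for illustration, and the actual helpers in the repository may differ:

```python
def to_int(bits, base=2):
    """Convert a string of digits to an int; an empty string yields 0 (assumed default)."""
    if not bits:
        return 0
    return int(bits, base)


def signed(bits):
    """Interpret a bit string as a two's complement signed integer (assumed encoding)."""
    if not bits:
        return 0
    value = int(bits, 2)
    # a leading 1 marks a negative number: subtract 2**n for an n-bit field
    if bits[0] == '1':
        value -= 1 << len(bits)
    return value
```

With these definitions, `signed('1111')` gives -1 and `to_int('101', 2)` gives 5.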
Actual AIS Message Decoding
As I said, each message type is different and so needs to be treated individually. I decided to decode each message and return a dictionary containing all relevant information. Because 90% of the data is numeric, I also convert every chunk of data into an integer. Some information needs additional context, for example the NAVIGATION_STATUS, where each number has a special meaning. Therefore I initialize a dictionary when the script is loaded, which serves as a lookup table. The lookup time is O(1) and the code gets very readable. See the decoding of message type 1 as an example:

    def decode_msg_1(bit_vector):
        """
        AIS Vessel position report using SOTDMA (Self-Organizing Time Division Multiple Access)
        Src: https://gpsd.gitlab.io/gpsd/AIVDM.html#_types_1_2_and_3_position_report_class_a
        """
        status = to_int(bit_vector[38:42], 2)
        maneuver = to_int(bit_vector[143:145], 2)
        return {
            'type': to_int(bit_vector[0:6], 2),
            'repeat': to_int(bit_vector[6:8], 2),
            'mmsi': to_int(bit_vector[8:38], 2),
            'status': (status, NAVIGATION_STATUS[status]),
            # ...
            'lon': signed(bit_vector[61:89]) / 600000.0,
            'lat': signed(bit_vector[89:116]) / 600000.0,
            'course': to_int(bit_vector[116:128], 2) * 0.1,
            # ...
            'maneuver': (maneuver, MANEUVER_INDICATOR[maneuver]),
        }
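The lookup tables themselves are not shown above. A sketch of how such a module-level dictionary could be set up follows. The entries are only an illustrative subset with paraphrased labels, `describe_status` is a hypothetical helper, and the `.get` fallback is an addition for codes missing from the subset (the code above indexes the dictionary directly):

```python
# Illustrative subset of navigation status codes; the real table covers 0..15
NAVIGATION_STATUS = {
    0: 'Under way using engine',
    1: 'At anchor',
    5: 'Moored',
}


def describe_status(status):
    """Return the (code, label) pair used in the decoded dict (hypothetical helper)."""
    # .get avoids a KeyError for codes that are not part of this subset
    return status, NAVIGATION_STATUS.get(status, 'Unknown')
```

Since the dictionary is built once at import time, each decoded message only pays for a single hash lookup.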
I would love to get some feedback and tips from you! :-)