diff --git a/Makefile b/Makefile index 2da8142..49a6b07 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,8 @@ .PHONY: clean-pyc clean-build docs clean PYTHON := python3 +# It's named nosetests3 in python3-nose=1.3.7-8 Ubuntu package. +NOSETEST != type nosetests3 >/dev/null 2>&1 && echo nosetests3 || echo nosetests # --------------------------------------------------------- # @@ -44,13 +46,13 @@ clean-test: # test # test: - nosetests -x -v tests/test_structs.py tests/test_llapi.py tests/test_hlapi.py + $(NOSETEST) -x -v tests/test_*.py test_llapi: - nosetests -x -v tests/$@.py + $(NOSETEST) -x -v tests/$@.py test_hlapi: - nosetests -x -v tests/$@.py + $(NOSETEST) -x -v tests/$@.py .PHONY: sdist diff --git a/read_tsip.py b/read_tsip.py index 9cb50af..33d497e 100755 --- a/read_tsip.py +++ b/read_tsip.py @@ -13,8 +13,8 @@ import tsip import time import binascii - import logging + logging.basicConfig(level=logging.INFO) @@ -30,7 +30,6 @@ def main(): except IndexError: help() - if os.path.isfile(source): conn = open(source) else: @@ -41,20 +40,17 @@ def main(): except TypeError: help() - conn = serial.Serial(source ,baud) + conn = serial.Serial(source, baud) gps = tsip.GPS(conn) - + while True: packet = gps.read() - if packet: - print "0x%0x %s" % (packet.code, packet.values) + print(packet) else: - print 'None' + print("None") -if __name__ == '__main__': +if __name__ == "__main__": main() - - diff --git a/tests/copernicus2.py b/tests/copernicus2.py index 65c593c..00c46de 100644 --- a/tests/copernicus2.py +++ b/tests/copernicus2.py @@ -37,6 +37,7 @@ def sigalrm_handler(s, f): signal.signal(signal.SIGALRM, sigalrm_handler) +# FIXME: broken as .code and .subcode attributes were removed since abb9c82a @attr(gps='copernicus2') class TestCopernicus(object): def setup(self): @@ -163,4 +164,4 @@ def test_0x2d_0x4d(self): command = Packet(0x2d) report = self.send_expect(command, 0x4d) -# TODO: 0x2e \ No newline at end of file +# TODO: 0x2e diff --git a/tests/read_thunderbolt.tsip.py b/tests/read_thunderbolt.tsip.py deleted file mode 100644 index 31e270d..0000000 --- a/tests/read_thunderbolt.tsip.py +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/python - -import struct - -def main(): - - with open('thunderbolt.tsip') as reader: - for packet in GPS(reader): - print Packet.unpack(packet) - - - -if __name__ == '__main__': - main() - diff --git a/tests/test_hlapi.py b/tests/test_hlapi.py index b33ca6d..0d61e80 100644 --- a/tests/test_hlapi.py +++ b/tests/test_hlapi.py @@ -1,10 +1,7 @@ from struct import Struct -try: - import StringIO as stringio -except ImportError: - import io as stringio +import io from nose.tools import raises @@ -33,6 +30,9 @@ def test_pack(self): self.pkt4 == \ self.pkt5 == \ self.pkt6 + for p in (self.pkt1, self.pkt2, self.pkt3, self.pkt4, self.pkt5, self.pkt6): + assert repr(p).startswith('Packet') + assert str(p).startswith('Packet') @raises(PackError) @@ -45,19 +45,21 @@ def test_unpack_unknown_packet(): packet = Packet.unpack('\x1e\x01\x02') assert packet[0] == 255 assert packet[1] == '\x1e\x01\x02' + assert str(packet).startswith('Packet') + assert repr(packet).startswith('Packet') -#def test_gps(): -# conn = stringio.StringIO() -# conn.write('\x10\x1c\x81\x00\x03\x02\x01\x0b\x11\x07\xdf\x0bproductname\x10\03') -# conn.seek(0) -# gps = GPS(conn) -# packet = gps.read() -# assert packet[0] == 0x1c -# -# packet = Packet.unpack('\x1e\x01') -# gps.write(packet) +def test_gps(): + conn = io.BytesIO() + conn.write(b'\x10\x1c\x81\x00\x03\x02\x01\x0b\x11\x07\xdf\x0bproductname\x10\x03') + conn.seek(0) + gps = GPS(conn) + packet = gps.read() + assert packet[0] == 0x1c + assert packet[9] == 'productname' + packet = Packet.unpack('\x1e\x01') + gps.write(packet) class Test0x1c01(PacketTest): (fields, rawpacket) = ([0x1c, 0x01], '\x1c\x01') diff --git a/tests/test_llapi.py b/tests/test_llapi.py index 7d2b965..9b62d54 100644 --- a/tests/test_llapi.py +++ b/tests/test_llapi.py @@ -14,13 +14,24 @@ from tsip.config import * from tsip.llapi import * +bID = b'\x42' # just a valid single-byte ID class TestIsFramed(object): def test_isframed(self): - assert is_framed(bDLE + bDLE + bETX) is True + # Following TSIP Packet Structure from ICM SMT 360™ & RES SMT 360™ USER GUIDE. + # is neither a valid framed packet (ID at [1] can't be DLE and can't be empty) + # nor a valid unframed & unstuffed packet (ID at [0] can't be DLE, once again). + assert is_framed(bDLE + bDLE + bETX) is False + assert is_framed(bDLE + bDLE + bDLE + bETX) is False + assert is_framed(bDLE + bETX + bDLE + bETX) is False + + # Boring good packets: + assert is_framed(bDLE + bID + bDLE + bETX) is True assert is_framed(bDLE + b'payload' + bDLE + bETX) is True + # define API for None + assert is_framed(None) is False class TestFrame(object): @@ -29,38 +40,77 @@ def test_frame(self): def test_unframe(self): assert unframe(bDLE + b'payload' + bDLE + bETX) == b'payload' + assert unframe(bDLE + b'pay' + bDLE_DLE + bDLE_ETX) == b'pay' + bDLE_DLE + + def test_encode(self): + assert frame(stuff(b'payload')) == bDLE + b'payload' + bDLE + bETX + assert frame(stuff(b'pay' + bDLE)) == bDLE + b'pay' + bDLE_DLE + bDLE_ETX + assert frame(stuff(b'pay' + bDLE + bDLE)) == bDLE + b'pay' + bDLE_DLE + bDLE_DLE + bDLE_ETX + + def test_decode(self): + assert unstuff(unframe(b'\x10pay\x10\x10\x10\x03')) == b'pay\x10' + assert unstuff(unframe(b'\x10pay\x10\x10\x10\x10\x10\x03')) == b'pay\x10\x10' + assert unstuff(unframe(b'\x10pa\x10\x10y\x10\x10\x10\x03')) == b'pa\x10y\x10' + assert unstuff(unframe(b'\x10p\x10\x10ay\x10\x10\x10\x03')) == b'p\x10ay\x10' + + @raises(ValueError) + def test_frame_nonstuffed_odd(self): + assert frame(b'pay\x10') + + @raises(ValueError) + def test_frame_nonstuffed_even(self): + assert frame(b'pa\x10y\x10') + + @raises(ValueError) + def test_frame_DLE(self): + assert frame(bDLE + b'payload') @raises(ValueError) - def test_frame_valueerror(self): + def test_frame_ETX(self): + assert frame(bETX + b'payload') + + @raises(ValueError) + def test_frame_framed(self): frame(bDLE + b'payload' + bDLE + bETX) @raises(ValueError) def test_unframe_valueerror(self): unframe(b'payload') + @raises(ValueError) + def test_unframe_empty_packet(self): + unframe(bDLE + bDLE_ETX) class TestStuff(object): def test_stuff(self): assert stuff(b'payload') == b'payload' - assert stuff(bDLE + b'payload') == bDLE + bDLE + b'payload' - assert stuff(bDLE + b'payload' + bDLE) == bDLE + bDLE + b'payload' + bDLE + bDLE - assert stuff(bDLE + bDLE + b'payload') == bDLE + bDLE + bDLE + bDLE + b'payload' + assert stuff(bID + bDLE + b'payload') == bID + bDLE_DLE + b'payload' + assert stuff(bID + bDLE + b'payload' + bDLE) == bID + bDLE_DLE + b'payload' + bDLE_DLE + assert stuff(bID + bDLE + bDLE + b'payload') == bID + bDLE_DLE + bDLE_DLE + b'payload' def test_unstuff(self): assert unstuff(b'payload') == b'payload' - assert unstuff(bDLE + bDLE + b'payload') == bDLE + b'payload' - assert unstuff(bDLE + bDLE + b'payload' + bDLE + bDLE) == bDLE + b'payload' + bDLE - assert unstuff(bDLE + bDLE + bDLE + bDLE + b'payload') == bDLE + bDLE + b'payload' + assert unstuff(bID + bDLE_DLE + b'payload') == bID + bDLE + b'payload' + assert unstuff(bID + bDLE_DLE + b'payload' + bDLE + bDLE) == bID + bDLE + b'payload' + bDLE + assert unstuff(bID + bDLE_DLE + bDLE_DLE + b'payload') == bID + bDLE + bDLE + b'payload' @raises(ValueError) def test_stuff_valueerror(self): stuff(bDLE + b'payload' + bDLE + bETX) + @raises(ValueError) + def test_stuff_id_eq_dle(self): + stuff(bDLE + b'payload') + @raises(ValueError) def test_unstuff_valueerror(self): unstuff(bDLE + b'payload' + bDLE + bETX) + @raises(ValueError) + def test_unstuff_not_stuffed(self): + unstuff(bID + bDLE + b'payload' ) + class TestGPS(object): @@ -79,21 +129,39 @@ def test_init(self): assert isinstance(self.gps_, gps) assert self.gps_.conn == self.conn -# def test_next(self): -# packet = self.gps_.next() -# assert packet.startswith(bDLE) -# assert packet.endswith(bDLE + bETX) -# packet = self.gps_.next() -# assert packet.startswith(bDLE) -# assert packet.endswith(bDLE + bETX) -# packet = self.gps_.next() -# assert packet.startswith(bDLE) -# assert packet.endswith(bDLE + bETX) - - def test_iter(self): - for packet in self.gps_: +def setup_tsipfile(fname): + if os.path.isfile(fname): + return fname + fname = os.path.join('tests', fname) + assert os.path.isfile(fname) + return fname + +KNOWN_DUMPS = ( + ('thunderbolt.tsip', 211), + ('copernicus2.tsip', 2478), +) + +def test_gps_next(): + for fname, expected_count in KNOWN_DUMPS: + conn = open(setup_tsipfile(fname), 'rb') + gps_ = gps(conn) + count = 0 + for packet in iter(gps_.next, None): + assert packet.startswith(bDLE) + assert packet.endswith(bDLE_ETX) + count += 1 + assert count == expected_count + +def test_gps_iter(): + for fname, expected_count in KNOWN_DUMPS: + conn = open(setup_tsipfile(fname), 'rb') + gps_ = gps(conn) + count = 0 + for packet in gps_: assert packet.startswith(bDLE) assert packet.endswith(bDLE + bETX) + count += 1 + assert count == expected_count # def test_unframe(self): # for packet in self.gps_: diff --git a/tests/test_tsip.py b/tests/test_tsip.py new file mode 100644 index 0000000..447a5c5 --- /dev/null +++ b/tests/test_tsip.py @@ -0,0 +1,61 @@ +import io +import itertools +import os.path + +from nose.tools import * + +from tsip import * + + +@raises(ValueError) +def test_hlgps_read_eof(): + # FIXME: that's not really a test, but a statement on weird API behavior on EOF. + p = Packet(0x1C, 0x81, 0, 3, 2, 1, 11, 17, 2015, "productname") + raw = frame(stuff(p.pack())) + conn = io.BytesIO(raw + raw) + drv = GPS(conn) + p1 = drv.read() + assert p1 == p + p2 = drv.read() + assert p2 == p + try: + p3 = drv.read() + except ValueError as e: + assert e.args == ("packet does not contain leading DLE+ID and trailing DLE/ETX",), e + raise e + + +def test_llgps_read_eof(): + p = Packet(0x1C, 0x81, 0, 3, 2, 1, 11, 17, 2015, "productname") + raw = frame(stuff(p.pack())) + conn = io.BytesIO(raw + raw) + drv = gps(conn) + p1 = drv.read() + assert p1 == raw + p2 = drv.read() + assert p2 == raw + p3 = drv.read() + assert p3 == None + + +def test_llgps_read_midpacket(): + for nlen in range(7): # simple fuzzer + for opt in itertools.product((bDLE, bETX, b"A"), repeat=nlen): + name = b"".join(opt) + pkt = Packet(0x1C, 0x81, 0, 3, 2, 1, 11, 17, 2015, name) + raw = frame(stuff(pkt.pack())) + + conn = io.BytesIO(raw + raw) + drv = gps(conn) + assert drv.read() == drv.read() == raw + + for off in range(1, len(raw)): + tail = raw[off:] + conn = io.BytesIO(tail + raw) + drv = gps(conn) + p1 = drv.read() + p2 = drv.read() + p3 = drv.read() + # Either half-packet is skipped silently && got RAW and EOF + # OR first damaged packet looks okayish, second is RAW and EOF + assert (p1 == raw and p2 == p3 == None) or (p1 == tail and p2 == raw and p3 is None) diff --git a/tsip/config.py b/tsip/config.py index 0168782..415c59d 100644 --- a/tsip/config.py +++ b/tsip/config.py @@ -19,6 +19,8 @@ # bDLE = DLE.to_bytes(1, 'little') bETX = ETX.to_bytes(1, 'little') +bDLE_DLE = bDLE + bDLE +bDLE_ETX = bDLE + bETX # Contants for setting bits diff --git a/tsip/hlapi.py b/tsip/hlapi.py index 4ba0999..d167bab 100644 --- a/tsip/hlapi.py +++ b/tsip/hlapi.py @@ -158,9 +158,23 @@ def unpack(cls, rawpacket): # return cls(0xff, rawpacket) + def __str__(self): + f = self.fields + # Name known packets in a way documentation names them: + if f[0] == 0xff: + return 'PacketErr' + str(tuple(f[1:])) + elif f[0] in PACKET_STRUCTURES: + head = 'Packet_0x{:02X}'.format(f[0]) + klen = 1 + elif len(f) > 1 and (f[0] * 256 + f[1]) in PACKET_STRUCTURES: + head = 'Packet_0x{:02X}-{:02X}'.format(f[0], f[1]) + klen = 2 + else: + return repr(self) + return head + repr(tuple(self.fields[klen:])) def __repr__(self): - return 'Packet%s' % (str(tuple(self.fields))) + return 'Packet%s' % (repr(tuple(self.fields))) class GPS(gps): diff --git a/tsip/llapi.py b/tsip/llapi.py index d38d33a..ea9d952 100644 --- a/tsip/llapi.py +++ b/tsip/llapi.py @@ -18,13 +18,12 @@ def is_framed(packet): """ - if packet == None or len(packet) < 3: + if packet == None or len(packet) < 4: return False - else: - return packet[0] == DLE and packet[-2] == DLE and packet[-1] == ETX + return packet[0] == DLE and packet[1] not in (DLE, ETX) and packet[-2:] == bDLE_ETX -def frame(data): +def frame(packet): """ Add leading DLE and trailing DLE/ETX to data. @@ -35,10 +34,15 @@ def frame(data): """ - if is_framed(data): - raise ValueError('data contains leading DLE and trailing DLE/ETX') - else: - return bDLE + data + bDLE + bETX + if is_framed(packet): + raise ValueError('packet contains leading DLE and trailing DLE/ETX') + if packet[0] in (DLE, ETX): # logic error + raise ValueError('packet can\t be DLE or ETX', packet[0]) + single = packet.count(bDLE) + double = packet.count(bDLE_DLE) + if single != double * 2: # coding error? + raise ValueError('packet contains unbalanced count of DLE (not stuffed?)', single, double) + return bDLE + packet + bDLE_ETX def unframe(packet): @@ -50,13 +54,11 @@ def unframe(packet): :return: TSIP packet with leading DLE and trailing DLE/ETX removed. :raise: ``ValueError`` if `packet` does not start with DLE and end in DLE/ETX. - """ - if is_framed(packet): - return packet.lstrip(bDLE).rstrip(bETX).rstrip(bDLE) - else: - raise ValueError('packet does not contain leading DLE and trailing DLE/ETX') + if not is_framed(packet): + raise ValueError('packet does not contain leading DLE+ID and trailing DLE/ETX') + return packet[1:-2] def stuff(packet): @@ -71,9 +73,9 @@ def stuff(packet): if is_framed(packet): raise ValueError('packet contains leading DLE and trailing DLE/ETX') - else: - return packet.replace(bDLE, bDLE + bDLE) - + if packet[0] in (DLE, ETX): # logic error + raise ValueError('packet can\t be DLE or ETX', packet[0]) + return packet.replace(bDLE, bDLE_DLE) def unstuff(packet): @@ -89,8 +91,16 @@ def unstuff(packet): if is_framed(packet): raise ValueError('packet contains leading DLE and trailing DLE/ETX') - else: - return packet.replace(bDLE + bDLE, bDLE) + # TSIP is not generous enough to provide checksums, so we squeeze every validation opportunity + single = packet.count(bDLE) + double = packet.count(bDLE_DLE) + if single != double * 2: + raise ValueError('packet contains uneven count of DLE', single, double) + unstuffed = packet.replace(bDLE_DLE, bDLE) + assert unstuffed.count(bDLE) == double + if unstuffed[0] in (DLE, ETX): + raise ValueError('packet can\t be DLE or ETX', unstuffed[0]) + return unstuffed class gps(object): @@ -102,39 +112,57 @@ def __iter__(self): return self def read(self): - - packet = bytes() - pkt_active = 0 - - #hold last 3 bytes (initialize assuming previous byte wasn't DLE) - #could get unlucky if start reading mid-message with 2nd data DLE (stuffed) byte as first byte seen - #would mis-interpret as start of message, but will simply return corrupt first packet, which was invalid anyway - b = [b'\0x00', b'\0x00', b'\0x00'] + # It's wrong to assume that end will always be . + # is perfectly fine if we get one in the end of the packet + # for some reason, e.g. unlucky floating point number, weird product name, + # Packet 0x45 with firmware built on year 2016, Packet 0x41 with leap second count 16, + # Packet 0x57 with (week % 256) being 16, Packet 0x8F-AB (Primary Timing) on year 2064, etc. + # + # is also a valid packet body. so we can't know if is + # the end of the packet or not unless we've counted true parity of count. We can't + # count it properly unless we've seen . It makes harder to skip the half-message in + # a streaming way: series of followed by may be the body or end of the packet. + # + # [] is always a marker of end of one packet and start of another + # one, as non-stuffed is only valid at the preamble. + # + # However, the read() usually does not see the as the usual startup happens when the + # wire is silent. Waiting for and skipping the first packet on every read() to sync + # with the stream is suboptimal. Storing state in the backtracking buffer is an option, but + # re-syncing might be more robust. + # + # Initialize assuming previous byte wasn't DLE. Could get unlucky if start reading + # mid-message with 2nd data DLE (stuffed) byte as first byte seen would mis-interpret as + # start of message, but will simply return corrupt first packet, which was invalid anyway. + # Packet ID is never ETX or DLE, so it'll break the streak. + streak = None + buf = [] while True: - b[0] = self.conn.read(1) - - if len(b[0]) == 0: #timeout + b = self.conn.read(1) + if len(b) == 0: # timeout, EOF return None - - #rather than counting even/odd DLEs, look for known pattern for start/end, to prevent issues when start reading mid-message - #end will always be - #start will always be (1 byte delayed, since DLE is the start) - if b[2][0] != DLE and b[1][0] == DLE and b[0][0] == ETX: #end of message - if pkt_active: #only return packet if active, otherwise found end of partial message, ignore - packet += b[0] - return packet - elif b[2][0] != DLE and b[1][0] == DLE and b[0][0] != DLE: #start of message - pkt_active = 1 - packet += bDLE #start is delayed by 1 byte, need to put first DLE byte into packet - packet += b[0] - else: - if pkt_active: #only accumulate packet data after start of message was found - packet += b[0] - - #shift old bytes - b[2] = b[1] - b[1] = b[0] - + buf.append(b) + if b == bETX and streak is not None and streak & 1: + # That's the end of packet for sure. The head might be just fine, but it as well may + # be polluted with the tail of the previous packet or have chopped off. + # 1. P endswith <2N+1*DLE> + # 2. That's the only <2N+1*DLE> sequence in P. + # Let's _assume_ that no bytes were skipped, e.g. by delayed calls to conn.read(). + # In this case the only possible tail of the previous packet is . + # start does not guarantee that the frame is complete, but that's the + # best we can hope for. Otherwise the frame is just half-frame. + p = b''.join(buf) + if p[0] == DLE and p[1] not in (DLE, ETX): # common case + return p + p = p.lstrip(bDLE) + if len(p) >= 4 and p[0] == ETX and p[1] == DLE and p[2] not in (DLE, ETX): + return p[1:] # N=0, just + else: + buf = [] # skip half-frame, keep `streak`, TODO: decide if return None instead + if b != bDLE: + streak = 0 + elif streak is not None: # got now, have seen before + streak += 1 def next(self): packet = self.read() diff --git a/tsip/structs.py b/tsip/structs.py index f90aebb..12cfc75 100644 --- a/tsip/structs.py +++ b/tsip/structs.py @@ -134,20 +134,27 @@ class Struct0x1c81(object): format = '>BBBBBBBBH' def pack(self, *f): + # TODO: ensure that ASCII encoding is used return struct.pack(self.format, *f[:-1]) + struct.pack('>B', len(f[-1])) + tobytes(f[-1]) def unpack(self, rawpacket): - return struct.unpack(self.format, rawpacket[:10]) + (rawpacket[11:].decode(),) + if rawpacket[10] != len(rawpacket) - 11: + raise ValueError('Wrong L1 length', rawpacket[10], len(rawpacket) - 11) + return struct.unpack(self.format, rawpacket[:10]) + (rawpacket[11:].decode('ascii'),) class Struct0x1c83(object): + """Report packet 0x1C-83: Hardware component version information.""" format = '>BBIBBHBH' def pack(self, *f): + # TODO: ensure that ASCII encoding is used return struct.pack(self.format, *f[:-1]) + struct.pack('>B', len(f[-1])) + tobytes(f[-1]) def unpack(self, s): - return struct.unpack(self.format, s[:13]) + (s[14:].decode(),) + if s[13] != len(s) - 14: + raise ValueError('Wrong L1 length', s[13], len(s) - 14) + return struct.unpack(self.format, s[:13]) + (s[14:].decode('ascii'),) class Struct0x47(object): def pack(self, *f):