Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
.PHONY: clean-pyc clean-build docs clean

PYTHON := python3
# It's named nosetests3 in python3-nose=1.3.7-8 Ubuntu package.
NOSETEST != type nosetests3 >/dev/null 2>&1 && echo nosetests3 || echo nosetests

# ---------------------------------------------------------
#
Expand Down Expand Up @@ -44,13 +46,13 @@ clean-test:
# test
#
test:
nosetests -x -v tests/test_structs.py tests/test_llapi.py tests/test_hlapi.py
$(NOSETEST) -x -v tests/test_*.py

test_llapi:
nosetests -x -v tests/$@.py
$(NOSETEST) -x -v tests/$@.py

test_hlapi:
nosetests -x -v tests/$@.py
$(NOSETEST) -x -v tests/$@.py


.PHONY: sdist
Expand Down
16 changes: 6 additions & 10 deletions read_tsip.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import tsip
import time
import binascii

import logging

logging.basicConfig(level=logging.INFO)


Expand All @@ -30,7 +30,6 @@ def main():
except IndexError:
help()


if os.path.isfile(source):
conn = open(source)
else:
Expand All @@ -41,20 +40,17 @@ def main():
except TypeError:
help()

conn = serial.Serial(source ,baud)
conn = serial.Serial(source, baud)

gps = tsip.GPS(conn)

while True:
packet = gps.read()

if packet:
print "0x%0x %s" % (packet.code, packet.values)
print(packet)
else:
print 'None'
print("None")


if __name__ == '__main__':
if __name__ == "__main__":
main()


3 changes: 2 additions & 1 deletion tests/copernicus2.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def sigalrm_handler(s, f):
signal.signal(signal.SIGALRM, sigalrm_handler)


# FIXME: broken as .code and .subcode attributes were removed since abb9c82a
@attr(gps='copernicus2')
class TestCopernicus(object):
def setup(self):
Expand Down Expand Up @@ -163,4 +164,4 @@ def test_0x2d_0x4d(self):
command = Packet(0x2d)
report = self.send_expect(command, 0x4d)

# TODO: 0x2e
# TODO: 0x2e
15 changes: 0 additions & 15 deletions tests/read_thunderbolt.tsip.py

This file was deleted.

30 changes: 16 additions & 14 deletions tests/test_hlapi.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@

from struct import Struct

try:
import StringIO as stringio
except ImportError:
import io as stringio
import io

from nose.tools import raises

Expand Down Expand Up @@ -33,6 +30,9 @@ def test_pack(self):
self.pkt4 == \
self.pkt5 == \
self.pkt6
for p in (self.pkt1, self.pkt2, self.pkt3, self.pkt4, self.pkt5, self.pkt6):
assert repr(p).startswith('Packet')
assert str(p).startswith('Packet')


@raises(PackError)
Expand All @@ -45,19 +45,21 @@ def test_unpack_unknown_packet():
packet = Packet.unpack('\x1e\x01\x02')
assert packet[0] == 255
assert packet[1] == '\x1e\x01\x02'
assert str(packet).startswith('Packet')
assert repr(packet).startswith('Packet')


#def test_gps():
# conn = stringio.StringIO()
# conn.write('\x10\x1c\x81\x00\x03\x02\x01\x0b\x11\x07\xdf\x0bproductname\x10\03')
# conn.seek(0)
# gps = GPS(conn)
# packet = gps.read()
# assert packet[0] == 0x1c
#
# packet = Packet.unpack('\x1e\x01')
# gps.write(packet)
def test_gps():
conn = io.BytesIO()
conn.write(b'\x10\x1c\x81\x00\x03\x02\x01\x0b\x11\x07\xdf\x0bproductname\x10\x03')
conn.seek(0)
gps = GPS(conn)
packet = gps.read()
assert packet[0] == 0x1c
assert packet[9] == 'productname'

packet = Packet.unpack('\x1e\x01')
gps.write(packet)

class Test0x1c01(PacketTest):
(fields, rawpacket) = ([0x1c, 0x01], '\x1c\x01')
Expand Down
110 changes: 89 additions & 21 deletions tests/test_llapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,24 @@
from tsip.config import *
from tsip.llapi import *

bID = b'\x42' # just a valid single-byte ID

class TestIsFramed(object):

def test_isframed(self):
assert is_framed(bDLE + bDLE + bETX) is True
# Following TSIP Packet Structure from ICM SMT 360™ & RES SMT 360™ USER GUIDE.
# <DLE DLE ETX> is neither a valid framed packet (ID at [1] can't be DLE and can't be empty)
# nor a valid unframed & unstuffed packet (ID at [0] can't be DLE, once again).
assert is_framed(bDLE + bDLE + bETX) is False
assert is_framed(bDLE + bDLE + bDLE + bETX) is False
assert is_framed(bDLE + bETX + bDLE + bETX) is False

# Boring good packets:
assert is_framed(bDLE + bID + bDLE + bETX) is True
assert is_framed(bDLE + b'payload' + bDLE + bETX) is True

# define API for None
assert is_framed(None) is False

class TestFrame(object):

Expand All @@ -29,38 +40,77 @@ def test_frame(self):

def test_unframe(self):
assert unframe(bDLE + b'payload' + bDLE + bETX) == b'payload'
assert unframe(bDLE + b'pay' + bDLE_DLE + bDLE_ETX) == b'pay' + bDLE_DLE

def test_encode(self):
assert frame(stuff(b'payload')) == bDLE + b'payload' + bDLE + bETX
assert frame(stuff(b'pay' + bDLE)) == bDLE + b'pay' + bDLE_DLE + bDLE_ETX
assert frame(stuff(b'pay' + bDLE + bDLE)) == bDLE + b'pay' + bDLE_DLE + bDLE_DLE + bDLE_ETX

def test_decode(self):
assert unstuff(unframe(b'\x10pay\x10\x10\x10\x03')) == b'pay\x10'
assert unstuff(unframe(b'\x10pay\x10\x10\x10\x10\x10\x03')) == b'pay\x10\x10'
assert unstuff(unframe(b'\x10pa\x10\x10y\x10\x10\x10\x03')) == b'pa\x10y\x10'
assert unstuff(unframe(b'\x10p\x10\x10ay\x10\x10\x10\x03')) == b'p\x10ay\x10'

@raises(ValueError)
def test_frame_nonstuffed_odd(self):
assert frame(b'pay\x10')

@raises(ValueError)
def test_frame_nonstuffed_even(self):
assert frame(b'pa\x10y\x10')

@raises(ValueError)
def test_frame_DLE(self):
assert frame(bDLE + b'payload')

@raises(ValueError)
def test_frame_valueerror(self):
def test_frame_ETX(self):
assert frame(bETX + b'payload')

@raises(ValueError)
def test_frame_framed(self):
frame(bDLE + b'payload' + bDLE + bETX)

@raises(ValueError)
def test_unframe_valueerror(self):
unframe(b'payload')

@raises(ValueError)
def test_unframe_empty_packet(self):
unframe(bDLE + bDLE_ETX)

class TestStuff(object):

def test_stuff(self):
assert stuff(b'payload') == b'payload'
assert stuff(bDLE + b'payload') == bDLE + bDLE + b'payload'
assert stuff(bDLE + b'payload' + bDLE) == bDLE + bDLE + b'payload' + bDLE + bDLE
assert stuff(bDLE + bDLE + b'payload') == bDLE + bDLE + bDLE + bDLE + b'payload'
assert stuff(bID + bDLE + b'payload') == bID + bDLE_DLE + b'payload'
assert stuff(bID + bDLE + b'payload' + bDLE) == bID + bDLE_DLE + b'payload' + bDLE_DLE
assert stuff(bID + bDLE + bDLE + b'payload') == bID + bDLE_DLE + bDLE_DLE + b'payload'

def test_unstuff(self):
assert unstuff(b'payload') == b'payload'
assert unstuff(bDLE + bDLE + b'payload') == bDLE + b'payload'
assert unstuff(bDLE + bDLE + b'payload' + bDLE + bDLE) == bDLE + b'payload' + bDLE
assert unstuff(bDLE + bDLE + bDLE + bDLE + b'payload') == bDLE + bDLE + b'payload'
assert unstuff(bID + bDLE_DLE + b'payload') == bID + bDLE + b'payload'
assert unstuff(bID + bDLE_DLE + b'payload' + bDLE + bDLE) == bID + bDLE + b'payload' + bDLE
assert unstuff(bID + bDLE_DLE + bDLE_DLE + b'payload') == bID + bDLE + bDLE + b'payload'

@raises(ValueError)
def test_stuff_valueerror(self):
stuff(bDLE + b'payload' + bDLE + bETX)

@raises(ValueError)
def test_stuff_id_eq_dle(self):
stuff(bDLE + b'payload')

@raises(ValueError)
def test_unstuff_valueerror(self):
unstuff(bDLE + b'payload' + bDLE + bETX)

@raises(ValueError)
def test_unstuff_not_stuffed(self):
unstuff(bID + bDLE + b'payload' )


class TestGPS(object):

Expand All @@ -79,21 +129,39 @@ def test_init(self):
assert isinstance(self.gps_, gps)
assert self.gps_.conn == self.conn

# def test_next(self):
# packet = self.gps_.next()
# assert packet.startswith(bDLE)
# assert packet.endswith(bDLE + bETX)
# packet = self.gps_.next()
# assert packet.startswith(bDLE)
# assert packet.endswith(bDLE + bETX)
# packet = self.gps_.next()
# assert packet.startswith(bDLE)
# assert packet.endswith(bDLE + bETX)

def test_iter(self):
for packet in self.gps_:
def setup_tsipfile(fname):
if os.path.isfile(fname):
return fname
fname = os.path.join('tests', fname)
assert os.path.isfile(fname)
return fname

KNOWN_DUMPS = (
('thunderbolt.tsip', 211),
('copernicus2.tsip', 2478),
)

def test_gps_next():
for fname, expected_count in KNOWN_DUMPS:
conn = open(setup_tsipfile(fname), 'rb')
gps_ = gps(conn)
count = 0
for packet in iter(gps_.next, None):
assert packet.startswith(bDLE)
assert packet.endswith(bDLE_ETX)
count += 1
assert count == expected_count

def test_gps_iter():
for fname, expected_count in KNOWN_DUMPS:
conn = open(setup_tsipfile(fname), 'rb')
gps_ = gps(conn)
count = 0
for packet in gps_:
assert packet.startswith(bDLE)
assert packet.endswith(bDLE + bETX)
count += 1
assert count == expected_count

# def test_unframe(self):
# for packet in self.gps_:
Expand Down
61 changes: 61 additions & 0 deletions tests/test_tsip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import io
import itertools
import os.path

from nose.tools import *

from tsip import *


@raises(ValueError)
def test_hlgps_read_eof():
# FIXME: that's not really a test, but a statement on weird API behavior on EOF.
p = Packet(0x1C, 0x81, 0, 3, 2, 1, 11, 17, 2015, "productname")
raw = frame(stuff(p.pack()))
conn = io.BytesIO(raw + raw)
drv = GPS(conn)
p1 = drv.read()
assert p1 == p
p2 = drv.read()
assert p2 == p
try:
p3 = drv.read()
except ValueError as e:
assert e.args == ("packet does not contain leading DLE+ID and trailing DLE/ETX",), e
raise e


def test_llgps_read_eof():
p = Packet(0x1C, 0x81, 0, 3, 2, 1, 11, 17, 2015, "productname")
raw = frame(stuff(p.pack()))
conn = io.BytesIO(raw + raw)
drv = gps(conn)
p1 = drv.read()
assert p1 == raw
p2 = drv.read()
assert p2 == raw
p3 = drv.read()
assert p3 == None


def test_llgps_read_midpacket():
for nlen in range(7): # simple fuzzer
for opt in itertools.product((bDLE, bETX, b"A"), repeat=nlen):
name = b"".join(opt)
pkt = Packet(0x1C, 0x81, 0, 3, 2, 1, 11, 17, 2015, name)
raw = frame(stuff(pkt.pack()))

conn = io.BytesIO(raw + raw)
drv = gps(conn)
assert drv.read() == drv.read() == raw

for off in range(1, len(raw)):
tail = raw[off:]
conn = io.BytesIO(tail + raw)
drv = gps(conn)
p1 = drv.read()
p2 = drv.read()
p3 = drv.read()
# Either half-packet is skipped silently && got RAW and EOF
# OR first damaged packet looks okayish, second is RAW and EOF
assert (p1 == raw and p2 == p3 == None) or (p1 == tail and p2 == raw and p3 is None)
2 changes: 2 additions & 0 deletions tsip/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#
bDLE = DLE.to_bytes(1, 'little')
bETX = ETX.to_bytes(1, 'little')
bDLE_DLE = bDLE + bDLE
bDLE_ETX = bDLE + bETX


# Contants for setting bits
Expand Down
Loading