Skip to content
This repository was archived by the owner on Sep 11, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 20 additions & 31 deletions usblib.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ def shell_usbdevice(fd, device):
}
banner = (
"Variables:\n"
+ "\n".join(["{:>12}: {}".format(k, repr(v)) for k, v in namespace.items()])
+ "\n"
)
+ "\n".join(
"{:>12}: {}".format(k, repr(v)) for k, v in namespace.items()
)
) + "\n"

Comment on lines -174 to +178
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function shell_usbdevice refactored with the following changes:

shell = InteractiveShellEmbed.instance(banner1=banner, user_ns=namespace)
shell()

Expand Down Expand Up @@ -245,7 +247,7 @@ def read_until(self, expected, size=-1):

def contains(self, expected):
with self.lock:
return -1 != self.buf.find(expected)
return self.buf.find(expected) != -1
Comment on lines -248 to +250
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function Buffer.contains refactored with the following changes:


def peek(self, size):
return self.buf[:size]
Expand All @@ -269,10 +271,7 @@ def __init__(self, duration):
self.is_infinite = duration is None
self.is_non_blocking = duration == 0
self.duration = duration
if duration is not None:
self.target_time = self.TIME() + duration
else:
self.target_time = None
self.target_time = self.TIME() + duration if duration is not None else None
Comment on lines -272 to +274
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function Timeout.__init__ refactored with the following changes:


def expired(self):
"""Return a boolean, telling if the timeout has expired."""
Expand All @@ -286,13 +285,13 @@ def time_left(self):
return None
else:
delta = self.target_time - self.TIME()
if delta > self.duration:
# clock jumped, recalculate
self.target_time = self.TIME() + self.duration
return self.duration
else:
if delta <= self.duration:
return max(0, delta)

# clock jumped, recalculate
self.target_time = self.TIME() + self.duration
return self.duration
Comment on lines -289 to +293
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function Timeout.time_left refactored with the following changes:


def restart(self, duration):
"""\
Restart a timeout, only supported if a timeout was already set up
Expand Down Expand Up @@ -463,15 +462,14 @@ def get_endpoints(device):
# --------------------------------

def send_ctrl_cmd(self, request, value=0, data=None, intf=0):
ret = self._device.ctrl_transfer(
return self._device.ctrl_transfer(
Comment on lines -466 to +465
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.send_ctrl_cmd refactored with the following changes:

CP210x_REQTYPE_HOST2DEVICE,
request,
wValue=value,
wIndex=intf,
data_or_wLength=data,
timeout=None,
)
return ret

def recv_ctrl_cmd(self, request, blen, value=0, intf=0):
buf = usb.util.create_buffer(blen)
Expand Down Expand Up @@ -668,17 +666,14 @@ def set_DTR(self, on):
self.send_ctrl_cmd(CP210x_SET_MHS, CP210x_MHS_DTR_OFF, None)

def get_modem_state(self):
buf = self.recv_ctrl_cmd(CP210x_GET_MDMSTS, 1)
return buf
return self.recv_ctrl_cmd(CP210x_GET_MDMSTS, 1)
Comment on lines -671 to +669
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.get_modem_state refactored with the following changes:


def get_comm_status(self):
buf = self.recv_ctrl_cmd(CP210x_GET_COMM_STATUS, 19)
return buf
return self.recv_ctrl_cmd(CP210x_GET_COMM_STATUS, 19)
Comment on lines -675 to +672
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.get_comm_status refactored with the following changes:


def get_CTL(self):
buf = self.recv_ctrl_cmd(CP210x_GET_LINE_CTL, 2)
val = struct.unpack("<H", buf.tobytes())[0]
return val
return struct.unpack("<H", buf.tobytes())[0]
Comment on lines -680 to +676
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.get_CTL refactored with the following changes:


def purgeHWBuffer(self, rx, tx):
# https://github.com/mik3y/usb-serial-for-android/blob/master/usbSerialForAndroid/src/main/java/com/hoho/android/usbserial/driver/Cp21xxSerialDriver.java#L304
Expand Down Expand Up @@ -806,10 +801,7 @@ def read_until(self, expected=b"\n", size=None, timeout=None):
delay = DEFAUL_TIMEOUT
with buf.changed:
buf.changed.wait(delay / 1000.0)
if size > 0:
rlen = size - len(data)
else:
rlen = size
rlen = size - len(data) if size > 0 else size
Comment on lines -809 to +804
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.read_until refactored with the following changes:

chunk = buf.read_until(expected_last, rlen)
data += chunk

Expand Down Expand Up @@ -862,7 +854,7 @@ def read_until_or_none(self, expected=b"\n", size=None, timeout=None):
# check if needle in size limit
if size > 0 and size < len(buf):
data_peek = buf.peek(size)
if -1 == data_peek.find(expected):
if data_peek.find(expected) == -1:
Comment on lines -865 to +857
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.read_until_or_none refactored with the following changes:

return None

# needle should be in limit
Expand Down Expand Up @@ -1101,10 +1093,7 @@ def prepare_usb_cp210x(self, intf=0, baudRate=DEFAULT_BAUDRATE):
return False

ret = self.send_ctrl_cmd(CP210x_SET_MHS, CP210x_MHS_DEFAULT, None)
if ret < 0:
return False

return True
return ret >= 0
Comment on lines -1104 to +1096
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.prepare_usb_cp210x refactored with the following changes:


def open(self, _async=True):
if self._is_open:
Expand All @@ -1126,7 +1115,7 @@ def read_dump_forever(self):
while True:
try:
data = device.read(endp_in.bEndpointAddress, endp_in.wMaxPacketSize)
text = "".join([chr(v) for v in data])
text = "".join(chr(v) for v in data)
Comment on lines -1129 to +1118
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CP210xSerial.read_dump_forever refactored with the following changes:

print(text, end="", flush=True)
except libusb1.USBError:
pass
Expand Down
2 changes: 1 addition & 1 deletion usbtest_rw1.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def dump_test(ser):
def dumper():
while not stop:
data = ser._buf_in.read(100)
text = "".join([chr(v) for v in data])
text = "".join(chr(v) for v in data)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function dump_test.dumper refactored with the following changes:

print(text, end="", flush=True)

t = threading.Thread(target=dumper)
Expand Down
2 changes: 1 addition & 1 deletion usbtest_rw_buf.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def dumper():
if not data:
print(" no data")
continue
text = "".join([chr(v) for v in data])
text = "".join(chr(v) for v in data)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function buf_test.dumper refactored with the following changes:

print(" data:", text, end="\n", flush=True)
print("read stopped")

Expand Down