Loading bin/kaxxxxp-viewer +83 −10 Original line number Diff line number Diff line Loading @@ -68,23 +68,33 @@ class SerialReader(serial.threaded.Protocol): def __init__(self): self.remaining_chars = 0 self.read_complete = False self.expect_binary = False self.recv_buf = "" self.lines = [] def expect(self, num_chars): def expect(self, num_chars, binary=False): self.recv_buf = "" self.remaining_chars = num_chars self.read_complete = False self.expect_binary = binary def __call__(self): return self def data_received(self, data): if self.expect_binary: self.lines.extend(list(data)) self.remaining_chars -= len(data) if self.remaining_chars <= 0: self.read_complete = True return try: str_data = data.decode("UTF-8") self.recv_buf += str_data except UnicodeDecodeError: sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data)) return self.remaining_chars -= len(str_data) Loading @@ -94,8 +104,11 @@ class SerialReader(serial.threaded.Protocol): def get_expected_line(self): if len(self.lines): if self.expect_binary: ret = self.lines else: ret = self.lines[0] self.lines = [] self.lines = list() return ret return None Loading @@ -107,6 +120,22 @@ class SerialReader(serial.threaded.Protocol): return None class KoradStatus: # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits. # Or they're the wrong bits altogether. # <https://sigrok.org/wiki/Korad_KAxxxxP_series> and # <https://www.eevblog.com/forum/testgear/korad-ka3005p-io-commands/> # don't agree on how to parse the status byte. def __init__(self, status_bytes): status_byte = status_bytes[0] self.over_current_protection_enabled = bool(status_byte & 0x20) self.output_enabled = bool(status_byte & 0x40) self.over_voltage_protection_enabled = bool(status_byte & 0x80) def __repr__(self): return f"KoradStatus<ovp={self.over_voltage_protection_enabled}, ocp={self.over_current_protection_enabled}, out={self.output_enabled}>" class KA320: def __init__(self, port, channel=1): self.ser = serial.serial_for_url(port, do_not_open=True) Loading @@ -129,21 +158,40 @@ class KA320: self.worker = serial.threaded.ReaderThread(self.ser, self.reader) self.worker.start() def rw(self, cmd, num_chars, exact=False): self.reader.expect(num_chars) def rw(self, cmd, num_chars, exact=False, binary=False): self.reader.expect(num_chars, binary=binary) self.ser.write(cmd) timeout = 10 timeout = 20 while not self.reader.read_complete and not timeout == 0: time.sleep(0.02) timeout -= 1 if exact: return self.reader.get_expected_line() elif self.reader.read_complete: return self.reader.get_line() else: return self.reader.recv_buf # See <https://sigrok.org/wiki/Korad_KAxxxxP_series> for supported commands def connect(self): return self.rw(b"*IDN?", 16) # Device ID length is unknown return self.rw(b"*IDN?", 32, exact=False) def get_status(self): return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True)) def ovp(self, enable=True): enable_bit = int(enable) self.ser.write(f"OVP{enable_bit}".encode()) time.sleep(0.1) # assert self.get_status().over_voltage_protection_enabled == enable def ocp(self, enable=True): enable_bit = int(enable) self.ser.write(f"OCP{enable_bit}".encode()) time.sleep(0.1) # assert self.get_status().over_current_protection_enabled == enable def get_max_voltage(self): return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True)) Loading Loading @@ -180,7 +228,7 @@ def graceful_exit(sig, frame): terminate_measurement = True def measure_data(port, filename, duration, channel=1): def measure_data(port, filename, duration, channel=1, ocp=False, ovp=False): global terminate_measurement signal.signal(signal.SIGINT, graceful_exit) Loading @@ -195,6 +243,14 @@ def measure_data(port, filename, duration, channel=1): else: output_handle = tempfile.TemporaryFile("w+") if ovp: print("Enabling over-voltage protection") korad.ovp(True) if ocp: print("Enabling over-current protection") korad.ocp(True) if duration: print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.") else: Loading Loading @@ -321,6 +377,18 @@ def main(): help="Set PSU serial port", ) parser.add_argument("--channel", type=int, default=1, help="Measurement Channel") parser.add_argument( "--over-current-protection", "--ocp", action="store_true", help="Enable over-current protection", ) parser.add_argument( "--over-voltage-protection", "--ovp", action="store_true", help="Enable over-voltage protection", ) parser.add_argument( "--save", metavar="FILE", type=str, help="Save measurement data in FILE" ) Loading Loading @@ -364,7 +432,12 @@ def main(): log_data = f.read() else: log_data = measure_data( args.port, args.save, args.duration, channel=args.channel args.port, args.save, args.duration, channel=args.channel, ocp=args.over_current_protection, ovp=args.over_voltage_protection, ) data = parse_data(log_data, skip=args.skip, limit=args.limit) Loading Loading
bin/kaxxxxp-viewer +83 −10 Original line number Diff line number Diff line Loading @@ -68,23 +68,33 @@ class SerialReader(serial.threaded.Protocol): def __init__(self): self.remaining_chars = 0 self.read_complete = False self.expect_binary = False self.recv_buf = "" self.lines = [] def expect(self, num_chars): def expect(self, num_chars, binary=False): self.recv_buf = "" self.remaining_chars = num_chars self.read_complete = False self.expect_binary = binary def __call__(self): return self def data_received(self, data): if self.expect_binary: self.lines.extend(list(data)) self.remaining_chars -= len(data) if self.remaining_chars <= 0: self.read_complete = True return try: str_data = data.decode("UTF-8") self.recv_buf += str_data except UnicodeDecodeError: sys.stderr.write("UART output contains gargabe: {data}\n".format(data=data)) return self.remaining_chars -= len(str_data) Loading @@ -94,8 +104,11 @@ class SerialReader(serial.threaded.Protocol): def get_expected_line(self): if len(self.lines): if self.expect_binary: ret = self.lines else: ret = self.lines[0] self.lines = [] self.lines = list() return ret return None Loading @@ -107,6 +120,22 @@ class SerialReader(serial.threaded.Protocol): return None class KoradStatus: # The status command is unreliable. Disable OCP/OVP does not reflect in the OCP/OVP bits. # Or they're the wrong bits altogether. # <https://sigrok.org/wiki/Korad_KAxxxxP_series> and # <https://www.eevblog.com/forum/testgear/korad-ka3005p-io-commands/> # don't agree on how to parse the status byte. def __init__(self, status_bytes): status_byte = status_bytes[0] self.over_current_protection_enabled = bool(status_byte & 0x20) self.output_enabled = bool(status_byte & 0x40) self.over_voltage_protection_enabled = bool(status_byte & 0x80) def __repr__(self): return f"KoradStatus<ovp={self.over_voltage_protection_enabled}, ocp={self.over_current_protection_enabled}, out={self.output_enabled}>" class KA320: def __init__(self, port, channel=1): self.ser = serial.serial_for_url(port, do_not_open=True) Loading @@ -129,21 +158,40 @@ class KA320: self.worker = serial.threaded.ReaderThread(self.ser, self.reader) self.worker.start() def rw(self, cmd, num_chars, exact=False): self.reader.expect(num_chars) def rw(self, cmd, num_chars, exact=False, binary=False): self.reader.expect(num_chars, binary=binary) self.ser.write(cmd) timeout = 10 timeout = 20 while not self.reader.read_complete and not timeout == 0: time.sleep(0.02) timeout -= 1 if exact: return self.reader.get_expected_line() elif self.reader.read_complete: return self.reader.get_line() else: return self.reader.recv_buf # See <https://sigrok.org/wiki/Korad_KAxxxxP_series> for supported commands def connect(self): return self.rw(b"*IDN?", 16) # Device ID length is unknown return self.rw(b"*IDN?", 32, exact=False) def get_status(self): return KoradStatus(self.rw(b"STATUS?", 1, exact=True, binary=True)) def ovp(self, enable=True): enable_bit = int(enable) self.ser.write(f"OVP{enable_bit}".encode()) time.sleep(0.1) # assert self.get_status().over_voltage_protection_enabled == enable def ocp(self, enable=True): enable_bit = int(enable) self.ser.write(f"OCP{enable_bit}".encode()) time.sleep(0.1) # assert self.get_status().over_current_protection_enabled == enable def get_max_voltage(self): return float(self.rw(f"VSET{self.channel:d}?".encode(), 5, True)) Loading Loading @@ -180,7 +228,7 @@ def graceful_exit(sig, frame): terminate_measurement = True def measure_data(port, filename, duration, channel=1): def measure_data(port, filename, duration, channel=1, ocp=False, ovp=False): global terminate_measurement signal.signal(signal.SIGINT, graceful_exit) Loading @@ -195,6 +243,14 @@ def measure_data(port, filename, duration, channel=1): else: output_handle = tempfile.TemporaryFile("w+") if ovp: print("Enabling over-voltage protection") korad.ovp(True) if ocp: print("Enabling over-current protection") korad.ocp(True) if duration: print(f"Logging data for {duration} seconds. Press Ctrl+C to stop early.") else: Loading Loading @@ -321,6 +377,18 @@ def main(): help="Set PSU serial port", ) parser.add_argument("--channel", type=int, default=1, help="Measurement Channel") parser.add_argument( "--over-current-protection", "--ocp", action="store_true", help="Enable over-current protection", ) parser.add_argument( "--over-voltage-protection", "--ovp", action="store_true", help="Enable over-voltage protection", ) parser.add_argument( "--save", metavar="FILE", type=str, help="Save measurement data in FILE" ) Loading Loading @@ -364,7 +432,12 @@ def main(): log_data = f.read() else: log_data = measure_data( args.port, args.save, args.duration, channel=args.channel args.port, args.save, args.duration, channel=args.channel, ocp=args.over_current_protection, ovp=args.over_voltage_protection, ) data = parse_data(log_data, skip=args.skip, limit=args.limit) Loading