Commit 09055b8b authored by Daniel Friesel's avatar Daniel Friesel
Browse files

Support xz-compressed files

parent 6d05ba3f
Loading
Loading
Loading
Loading
+69 −61
Original line number Diff line number Diff line
@@ -16,65 +16,6 @@ import time
opt = dict()


def running_mean(x: np.ndarray, N: int) -> np.ndarray:
    """
    Compute `N` elements wide running average over `x`.

    :param x: 1-Dimensional NumPy array
    :param N: how many items to average. Should be even for optimal results.
    """

    # to ensure that output.shape == input.shape, we need to insert data
    # at the boundaries
    boundary_array = np.insert(x, 0, np.full((N // 2), x[0]))
    boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2), x[-1]))
    print(boundary_array)
    cumsum = np.cumsum(boundary_array)
    return (cumsum[N:] - cumsum[:-N]) / N


def measure_data(filename, duration):
    # libmsp430.so must be available
    if not "LD_LIBRARY_PATH" in os.environ:
        os.environ[
            "LD_LIBRARY_PATH"
        ] = "{}/var/projects/msp430/MSP430Flasher_1.3.15".format(os.environ["HOME"])

    # https://github.com/carrotIndustries/energytrace-util must be available
    energytrace_cmd = "energytrace"
    if which(energytrace_cmd) is None:
        energytrace_cmd = "{}/var/source/energytrace-util/energytrace64".format(
            os.environ["HOME"]
        )

    if filename is not None:
        output_handle = open(filename, "w+")
    else:
        output_handle = tempfile.TemporaryFile("w+")

    energytrace = subprocess.Popen(
        [energytrace_cmd, str(duration)], stdout=output_handle, universal_newlines=True
    )

    try:
        if duration:
            time.sleep(duration)
        else:
            print("Press Ctrl+C to stop measurement")
            while True:
                time.sleep(3600)
    except KeyboardInterrupt:
        energytrace.send_signal(subprocess.signal.SIGTERM)

    energytrace.communicate(timeout=5)

    output_handle.seek(0)
    output = output_handle.read()
    output_handle.close()

    return output


def show_help():
    print(
        """msp430-etv - MSP430 EnergyTrace Visualizer
@@ -91,6 +32,8 @@ using MSP430 EnergyTrace technology. Measurements can be taken directly (by
specifying <measurement duration> in seconds) or loaded from a logfile using
--load <file>. Data can be plotted or aggregated on stdout.

This program is not affiliated with Texas Instruments. Use at your own risk.

OPTIONS

  --load <file>
@@ -134,6 +77,65 @@ LD library search path (e.g. LD_LIBRARY_PATH=../MSP430Flasher).
    )


def running_mean(x: np.ndarray, N: int) -> np.ndarray:
    """
    Compute `N` elements wide running average over `x`.

    :param x: 1-Dimensional NumPy array
    :param N: how many items to average. Should be even for optimal results.
    """

    # to ensure that output.shape == input.shape, we need to insert data
    # at the boundaries
    boundary_array = np.insert(x, 0, np.full((N // 2), x[0]))
    boundary_array = np.append(boundary_array, np.full((N // 2 + N % 2), x[-1]))
    print(boundary_array)
    cumsum = np.cumsum(boundary_array)
    return (cumsum[N:] - cumsum[:-N]) / N


def measure_data(filename, duration):
    # libmsp430.so must be available
    if not "LD_LIBRARY_PATH" in os.environ:
        os.environ[
            "LD_LIBRARY_PATH"
        ] = "{}/var/projects/msp430/MSP430Flasher_1.3.15".format(os.environ["HOME"])

    # https://github.com/carrotIndustries/energytrace-util must be available
    energytrace_cmd = "energytrace"
    if which(energytrace_cmd) is None:
        energytrace_cmd = "{}/var/source/energytrace-util/energytrace64".format(
            os.environ["HOME"]
        )

    if filename is not None:
        output_handle = open(filename, "w+")
    else:
        output_handle = tempfile.TemporaryFile("w+")

    energytrace = subprocess.Popen(
        [energytrace_cmd, str(duration)], stdout=output_handle, universal_newlines=True
    )

    try:
        if duration:
            time.sleep(duration)
        else:
            print("Press Ctrl+C to stop measurement")
            while True:
                time.sleep(3600)
    except KeyboardInterrupt:
        energytrace.send_signal(subprocess.signal.SIGTERM)

    energytrace.communicate(timeout=5)

    output_handle.seek(0)
    output = output_handle.read()
    output_handle.close()

    return output


def peak_search(data, lower, upper, direction_function):
    while upper - lower > 1e-6:
        bs_test = np.mean([lower, upper])
@@ -201,6 +203,12 @@ if __name__ == "__main__":
        sys.exit(2)

    if "load" in opt:
        if ".xz" in opt["load"]:
            import lzma

            with lzma.open(opt["load"], "rt") as f:
                log_data = f.read()
        else:
            with open(opt["load"], "r") as f:
                log_data = f.read()
    else: