Commit d274e459 authored by Daniel Friesel's avatar Daniel Friesel
Browse files

add changepoint detection and optional load limit

parent beac1d86
Loading
Loading
Loading
Loading
+185 −5
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ OPTIONS

import argparse
import csv
import json
import matplotlib.pyplot as plt
import numpy as np
import os
@@ -24,6 +25,8 @@ import struct
import sys
import xml.etree.ElementTree as ET

from multiprocessing import Pool


def running_mean(x: np.ndarray, N: int) -> np.ndarray:
    """
@@ -41,6 +44,109 @@ def running_mean(x: np.ndarray, N: int) -> np.ndarray:
    return np.convolve(boundary_array, np.ones((N,)) / N, mode="valid")


def downsample(x: np.ndarray, N: int) -> np.ndarray:
    """Downsample `x` by factor `N`, taking the mean of each chunk of N samples.

    If len(x) is not a multiple of N, the signal is padded with copies of its
    last value so that the final (partial) chunk still contributes a mean.
    """
    fill = x.shape[0] % N
    if fill:
        # Pad with the edge value; avoids the original ndarray -> list ->
        # ndarray round-trip, which was O(n) in Python-object conversions.
        x = np.pad(x, (0, N - fill), mode="edge")
    return x.reshape(-1, N).mean(axis=1)


class NpEncoder(json.JSONEncoder):
    """JSON encoder that maps numpy scalars and arrays to native Python types."""

    def default(self, obj):
        # numpy integers/floats/arrays are not JSON-serializable out of the
        # box; convert them to their plain-Python equivalents.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


def PELT_get_changepoints(algo, penalty):
    """Predict changepoints for one penalty value.

    Returns a ``(penalty, changepoints)`` tuple so that results computed by
    a worker pool can be matched back to the penalty they belong to.
    """
    return penalty, algo.predict(pen=penalty)


class PELT:
    """Changepoint detection using the PELT algorithm (via the ruptures library).

    The signal is normalized, then PELT is evaluated for penalties 0..99 in
    parallel worker processes. The longest run of penalties that yields the
    same number of changepoints ("plateau") is assumed to contain the correct
    segmentation; the changepoints from the middle of that plateau are
    returned.
    """

    def __init__(self, signal, num_samples=None):
        """
        :param signal: 1-D sample data to segment
        :param num_samples: if set, PELT's jump width is chosen so that
            roughly `num_samples` positions are considered (cheaper than
            actually downsampling the signal)
        """
        self.signal = signal
        self.model = "l1"
        self.min_dist = 500

        if num_samples is not None:
            self.ds_factor = len(signal) // num_samples
        else:
            self.ds_factor = 1

        # Use the downsampling factor as jump width instead of downsampling
        # the signal itself.
        self.jump = self.ds_factor

    def norm_signal(self, signal, scaler=25):
        """Scale `signal` so its maximum absolute value becomes `scaler`.

        NOTE(review): an all-zero signal causes a division by zero here,
        as in the original implementation.
        """
        signal = np.asarray(signal)
        return signal / np.abs(signal).max() * scaler

    def get_changepoints(self):
        """Return the detected changepoint indices as a numpy array."""
        # imported here as ruptures is only used for changepoint detection
        import ruptures

        algo = ruptures.Pelt(
            model=self.model, jump=self.jump, min_size=self.min_dist
        ).fit(self.norm_signal(self.signal))

        # Evaluate penalties 0..99 in parallel worker processes.
        queue = [(algo, penalty) for penalty in range(0, 100)]
        with Pool() as pool:
            changepoints = pool.starmap(PELT_get_changepoints, queue)
        changepoints_by_penalty = dict(changepoints)
        num_changepoints = [len(changepoints_by_penalty[i]) for i in range(0, 100)]

        # Find the longest plateau, i.e. the longest run of consecutive
        # penalties that all produce the same number of changepoints.
        start_index = -1
        end_index = -1
        longest_start = -1
        longest_end = -1
        prev_val = -1
        for i, num_bkpts in enumerate(num_changepoints):
            if num_bkpts != prev_val:
                end_index = i - 1
                if end_index - start_index > longest_end - longest_start:
                    # currently found sequence is the longest found yet
                    longest_start = start_index
                    longest_end = end_index
                start_index = i
            if i == len(num_changepoints) - 1:
                # End the sequence with the last value. It is not guaranteed
                # that this is the true end of the plateau; widening the
                # search interval (range_min / range_max) would be cleaner.
                end_index = i
                if end_index - start_index > longest_end - longest_start:
                    # last found sequence is the longest found yet
                    longest_start = start_index
                    longest_end = end_index
                start_index = i
            prev_val = num_bkpts
        # Bugfix: the original computed
        # `longest_start + (longest_start - longest_start) // 2`, whose second
        # term is identically zero, so it always picked the plateau *start*.
        # Use the plateau midpoint as intended.
        middle_of_plateau = longest_start + (longest_end - longest_start) // 2
        return np.array(changepoints_by_penalty[middle_of_plateau])


class DLogChannel:
    def __init__(self, desc_tuple):
        self.slot = desc_tuple[0]
@@ -53,7 +159,8 @@ class DLogChannel:


class DLog:
    def __init__(self, filename):
    def __init__(self, filename, limit=None):
        self.limit_duration = limit
        self.load_dlog(filename)

    def load_dlog(self, filename):
@@ -99,6 +206,19 @@ class DLog:
            0, self.observed_duration, num=int(len(raw_data) / (4 * num_channels))
        )

        if (
            self.limit_duration is not None
            and self.observed_duration > self.limit_duration
        ):
            stop_offset = len(self.timestamps) - 1
            for i, ts in enumerate(self.timestamps):
                if ts > self.limit_duration:
                    stop_offset = i
                    break
            self.timestamps = self.timestamps[:stop_offset]
            self.observed_duration = self.timestamps[-1]
            raw_data = raw_data[: stop_offset * 4 * num_channels]

        self.data = np.ndarray(
            shape=(num_channels, int(len(raw_data) / (4 * num_channels))),
            dtype=np.float32,
@@ -149,17 +269,40 @@ class DLog:
        return True


def detect_changepoints(dlog, num_samples):
    """Run PELT changepoint detection on every slot/unit of `dlog`.

    :param dlog: DLog instance providing `slots` and `timestamps`
    :param num_samples: passed through to PELT as downsampling target
    :return: one dict per slot, mapping unit name to a list of
        ``{"interval": [t_start, t_end], "mean": ...}`` segments
    """
    ret_slots = list()
    for slot in dlog.slots:
        ret_slot = dict()
        for unit in slot:
            print(f"changepoint detection for {slot} {unit}")
            # Smooth the raw data before segmentation to suppress noise.
            detector = PELT(
                running_mean(slot[unit].data, 10), num_samples=num_samples
            )
            segments = list()
            seg_start = 0
            for changepoint in detector.get_changepoints():
                seg_end = changepoint - 1
                segments.append(
                    {
                        "interval": [
                            dlog.timestamps[seg_start],
                            dlog.timestamps[seg_end],
                        ],
                        "mean": np.mean(slot[unit].data[seg_start:seg_end]),
                    }
                )
                seg_start = seg_end
            ret_slot[unit] = segments
        ret_slots.append(ret_slot)
    return ret_slots


def print_stats(dlog):
    """Print measurement duration and per-sample interval of `dlog`.

    If the observed duration deviates from the planned one (e.g. because the
    measurement was cut short or limited), both values are shown.
    """
    if dlog.observed_duration_equals_expectation():
        print(
            "Measurement duration: {:d} seconds at {:f} µs per sample".format(
                dlog.planned_duration, dlog.interval * 1_000_000
            )
        )
    else:
        print(
            "Measurement duration: {:f} of {:d} seconds at {:f} µs per sample".format(
                dlog.observed_duration, dlog.planned_duration, dlog.interval * 1_000_000
            )
        )

@@ -303,12 +446,40 @@ def export_csv(dlog, filename):
            writer.writerow([dlog.timestamps[row]] + list(dlog.data[:, row]))


def export_json(dlog, filename, extra_data=None):
    """Export channel metadata and analysis results to a JSON file.

    :param dlog: DLog instance whose channel list (smu, unit) is exported
    :param filename: output file name
    :param extra_data: optional dict of additional key/value pairs merged
        into the JSON document (e.g. detected changepoints)
    """
    json_channels = [
        {"smu": channel.smu, "unit": channel.unit} for channel in dlog.channels
    ]
    json_data = {"channels": json_channels}
    # `extra_data` was previously a mutable default argument (dict()); use a
    # None sentinel instead so call sites cannot accidentally share state.
    if extra_data is not None:
        json_data.update(extra_data)
    with open(filename, "w") as f:
        # NpEncoder handles numpy scalars/arrays in the analysis results.
        json.dump(json_data, f, cls=NpEncoder)


def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__
    )
    parser.add_argument(
        "--csv-export", type=str, help="Export measurements to CSV file"
        "--csv-export",
        metavar="FILENAME",
        type=str,
        help="Export measurements to CSV file",
    )
    parser.add_argument(
        "--json-export",
        metavar="FILENAME",
        type=str,
        help="Export analysis results (e.g. changepoints) to JSON file",
    )
    parser.add_argument(
        "--limit",
        type=float,
        metavar="N",
        help="Limit analysis to the first N seconds of data",
    )
    parser.add_argument(
        "--pelt", metavar="JUMP", type=int, help="Perform changepoint detection"
    )
    parser.add_argument(
        "--plot",
@@ -324,7 +495,7 @@ def main():

    args = parser.parse_args()

    dlog = DLog(args.dlog_file)
    dlog = DLog(args.dlog_file, args.limit)

    if args.stat:
        print_stats(dlog)
@@ -332,6 +503,15 @@ def main():
    if args.csv_export:
        export_csv(dlog, args.csv_export)

    if args.pelt:
        changepoints = detect_changepoints(dlog, num_samples=args.pelt)

    if args.json_export:
        extra_data = dict()
        if args.pelt:
            extra_data["changepoints"] = changepoints
        export_json(dlog, args.json_export, extra_data)

    if args.plot:
        if args.plot == "P" and dlog.all_data_slots_have_power():
            show_power_plot(dlog)