Module PdmContext.utils.MovingPC

Expand source code
import math

import numpy as np


class RunningCovarianceTimestamps:
    def __init__(self):
        """
        Initializes running sum, sum of squares, mean, and covariance matrix
        for incremental updates.
        """
        import scipy.stats as stats
        self._stats=stats
        self.datacopy=None
        self.data = None
    def init(self,data,timestamps):
        self.data = data
        self.n, self.d = data.shape
        # Sufficient statistics
        self.sum = np.sum(data, axis=0)
        self.sum_sq = np.dot(data.T, data)  # Sum of outer products
        self.mean = self.sum / self.n

        self.cov = self.coovarianve(self.sum, self.sum_sq, self.mean, self.n)
        self.corr = self.covariance_to_correlation(self.cov)
        self.timestamps = timestamps
        self.datacopy=data
        return
    def update(self, data, timestamps):

        """
        Updates the sufficient statistics when a sample is removed and a new one is added.
        Recomputes the mean and covariance exactly as np.cov would.

        """
        # Update the sums


        if self.data is None or self.data.shape[1]!=data.shape[1]:
            self.init(data, timestamps)
            return
        if timestamps[0]==self.timestamps[0] and timestamps[-1]==self.timestamps[-1]:
            return


        assert self.data.shape[1] == data.shape[1], "Data have different shapes"

        positionOld = self.timestamps.index(timestamps[0])
        oldsamples = self.data[:positionOld]

        positionNew = timestamps.index(self.timestamps[-1])+1
        newsamples = data[positionNew:]

        for old_sample in oldsamples:
            self.sum = self.sum - old_sample
            self.sum_sq = self.sum_sq - np.outer(old_sample, old_sample)
        self.datacopy=self.datacopy[positionOld:]
        for new_sample in newsamples:
            self.sum = self.sum + new_sample
            self.sum_sq = self.sum_sq + np.outer(new_sample, new_sample)
        self.datacopy = np.vstack([self.datacopy, newsamples])

        # if abs(np.sum(self.datacopy-data))>0.00000001:
        #     ok='ok'
        self.n = data.shape[0]
        # Update the mean
        self.mean = self.sum / self.n

        # Recompute the covariance (unbiased estimate with ddof=1)
        self.cov = self.coovarianve(self.sum, self.sum_sq, self.mean, self.n)
        self.corr=self.covariance_to_correlation(self.cov)


        self.timestamps = timestamps
        self.data = data

    def covariance_to_correlation(self,C):
        """Convert a covariance matrix C to a correlation matrix R."""
        diag_sqrt = np.sqrt(np.diag(C))  # Square root of diagonal elements
        R = C / np.outer(diag_sqrt, diag_sqrt)  # Element-wise division
        return R

    def coovarianve(self,sum, sum_sq, mean, n):
        return (sum_sq - (np.outer(mean, sum) + np.outer(sum, mean)) + np.outer(mean, mean) * n) / (n - 1)

    # def computer_originalfisherz(self,data, x, y, z):
    #     import scipy.stats as stats
    #
    #     n = data.shape[0]
    #     k = len(z)
    #     sub_corr=None
    #     corr=np.corrcoef(data.T)
    #     if k == 0:
    #         r = np.corrcoef(data[:, [x, y]].T)[0][1]
    #     else:
    #         sub_index = [x, y]
    #         sub_index.extend(z)
    #         sub_corr = np.corrcoef(data[:, sub_index].T)
    #         # inverse matrix
    #         try:
    #             PM = np.linalg.inv(sub_corr)
    #         except np.linalg.LinAlgError:
    #             PM = np.linalg.pinv(sub_corr)
    #         r = -1 * PM[0, 1] / math.sqrt(abs(PM[0, 0] * PM[1, 1]))
    #     cut_at = 0.99999
    #     rold=r
    #     r = min(cut_at, max(-1 * cut_at, r))  # make r between -1 and 1
    #
    #     # Fisher’s z-transform
    #     res = math.sqrt(n - k - 3) * .5 * math.log1p((2 * r) / (1 - r))
    #     p_value = 2 * (1 - stats.norm.cdf(abs(res)))
    #
    #     return corr,sub_corr,r,rold, res, p_value

    def compute_fisherz(self, x, y, z=[]):
        assert self.data is not None, "Moving Covariance Not initialized"

        k = len(z)
        if k == 0:
            r = self.corr[x, y]
        else:
            sub_index = [x, y]
            sub_index.extend(z)
            sub_corr = self.corr[sub_index][:,sub_index]
            try:
                PM = np.linalg.inv(sub_corr)
            except np.linalg.LinAlgError:
                PM = np.linalg.pinv(sub_corr)
            r = -1 * PM[0, 1] / math.sqrt(abs(PM[0, 0] * PM[1, 1]))
        cut_at = 0.99999
        r = min(cut_at, max(-1 * cut_at, r))  # make r between -1 and 1

        # Fisher’s z-transform
        res = math.sqrt(self.n - k - 3) * .5 * math.log1p((2 * r) / (1 - r))
        p_value = 2 * (1 - self._stats.norm.cdf(abs(res)))

        return None, None, p_value



class PC():

    def __init__(self, variant='stable', alpha=0.05,
                 priori_knowledge=None):
        from castle.common import Tensor
        self._Tensor=Tensor

        from castle.common.priori_knowledge import orient_by_priori_knowledge
        self._orient_by_priori_knowledge=orient_by_priori_knowledge

        from itertools import permutations
        self._permutations = permutations
        from itertools import combinations
        self._combinations = combinations

        from copy import deepcopy
        self._deepcopy = deepcopy
        self.variant = variant
        self.alpha = alpha
        self.ci_test = RunningCovarianceTimestamps()
        self.causal_matrix = None
        self.priori_knowledge = priori_knowledge

    def learn(self, data,timestamps, columns=None, **kwargs):
        """
        Based on the implementation of gCastle Library
        """


        self.ci_test.update(data,timestamps)
        if self.ci_test.data is None:
            self.causal_matrix=None
            return
        data = self._Tensor(data, columns=columns)

        skeleton, sep_set = self.find_skeleton(data,
                                          alpha=self.alpha,
                                          ci_test_class=self.ci_test,
                                          variant=self.variant,
                                          priori_knowledge=self.priori_knowledge,
                                          **kwargs)

        self._causal_matrix = self._Tensor(
            self.orient(skeleton, sep_set, self.priori_knowledge).astype(int),
            index=data.columns,
            columns=data.columns
        )
        self.causal_matrix=self._causal_matrix

    def orient(self, skeleton, sep_set, priori_knowledge=None):


        if priori_knowledge is not None:
            skeleton = self._orient_by_priori_knowledge(skeleton, priori_knowledge)

        columns = list(range(skeleton.shape[1]))
        cpdag = self._deepcopy(abs(skeleton))
        # pre-processing
        for ij in sep_set.keys():
            i, j = ij
            all_k = [x for x in columns if x not in ij]
            for k in all_k:
                if cpdag[i, k] + cpdag[k, i] != 0 \
                        and cpdag[k, j] + cpdag[j, k] != 0:
                    if k not in sep_set[ij]:
                        if cpdag[i, k] + cpdag[k, i] == 2:
                            cpdag[k, i] = 0
                        if cpdag[j, k] + cpdag[k, j] == 2:
                            cpdag[k, j] = 0
        while True:
            old_cpdag = self._deepcopy(cpdag)
            pairs = list(self._combinations(columns, 2))
            for ij in pairs:
                i, j = ij
                if cpdag[i, j] * cpdag[j, i] == 1:
                    # rule1
                    for i, j in self._permutations(ij, 2):
                        all_k = [x for x in columns if x not in ij]
                        for k in all_k:
                            if cpdag[k, i] == 1 and cpdag[i, k] == 0 \
                                    and cpdag[k, j] + cpdag[j, k] == 0:
                                cpdag[j, i] = 0
                    # rule2
                    for i, j in self._permutations(ij, 2):
                        all_k = [x for x in columns if x not in ij]
                        for k in all_k:
                            if (cpdag[i, k] == 1 and cpdag[k, i] == 0) \
                                    and (cpdag[k, j] == 1 and cpdag[j, k] == 0):
                                cpdag[j, i] = 0
                    # rule3
                    for i, j in self._permutations(ij, 2):
                        for kl in sep_set.keys():  # k and l are nonadjacent.
                            k, l = kl
                            # if i——k——>j and  i——l——>j
                            if cpdag[i, k] == 1 \
                                    and cpdag[k, i] == 1 \
                                    and cpdag[k, j] == 1 \
                                    and cpdag[j, k] == 0 \
                                    and cpdag[i, l] == 1 \
                                    and cpdag[l, i] == 1 \
                                    and cpdag[l, j] == 1 \
                                    and cpdag[j, l] == 0:
                                cpdag[j, i] = 0
                    # rule4
                    for i, j in self._permutations(ij, 2):
                        for kj in sep_set.keys():  # k and j are nonadjacent.
                            if j not in kj:
                                continue
                            else:
                                kj = list(kj)
                                kj.remove(j)
                                k = kj[0]
                                ls = [x for x in columns if x not in [i, j, k]]
                                for l in ls:
                                    if cpdag[k, l] == 1 \
                                            and cpdag[l, k] == 0 \
                                            and cpdag[i, k] == 1 \
                                            and cpdag[k, i] == 1 \
                                            and cpdag[l, j] == 1 \
                                            and cpdag[j, l] == 0:
                                        cpdag[j, i] = 0
            if np.all(cpdag == old_cpdag):
                break

        return cpdag

    def _loop(self,G, d):

        assert G.shape[0] == G.shape[1]

        pairs = [(x, y) for x, y in self._combinations(set(range(G.shape[0])), 2)]
        less_d = 0
        for i, j in pairs:
            adj_i = set(np.argwhere(G[i] != 0).reshape(-1, ))
            z = adj_i - {j}  # adj(C, i)\{j}
            if len(z) < d:
                less_d += 1
            else:
                break
        if less_d == len(pairs):
            return False
        else:
            return True





    def find_skeleton(self,data, alpha, ci_test_class, variant='original',
                      priori_knowledge=None, base_skeleton=None,
                      p_cores=1, s=None, batch=None):
        ci_test = ci_test_class.compute_fisherz
        n_feature = data.shape[1]
        if base_skeleton is None:
            skeleton = np.ones((n_feature, n_feature)) - np.eye(n_feature)
        else:
            row, col = np.diag_indices_from(base_skeleton)
            base_skeleton[row, col] = 0
            skeleton = base_skeleton
        nodes = set(range(n_feature))

        # update skeleton based on priori knowledge
        for i, j in self._combinations(nodes, 2):
            if priori_knowledge is not None and (
                    priori_knowledge.is_forbidden(i, j)
                    and priori_knowledge.is_forbidden(j, i)):
                skeleton[i, j] = skeleton[j, i] = 0

        sep_set = {}
        d = -1
        while self._loop(skeleton, d):  # until for each adj(C,i)\{j} < l
            d += 1
            if variant == 'stable':
                C = self._deepcopy(skeleton)
            else:
                C = skeleton
            if variant != 'parallel':
                for i, j in self._combinations(nodes, 2):
                    if skeleton[i, j] == 0:
                        continue
                    adj_i = set(np.argwhere(C[i] == 1).reshape(-1, ))
                    z = adj_i - {j}  # adj(C, i)\{j}
                    if len(z) >= d:
                        # |adj(C, i)\{j}| >= l
                        for sub_z in self._combinations(z, d):
                            sub_z = list(sub_z)
                            _,_,p_value, = ci_test(i, j, sub_z)
                            if p_value >= alpha:
                                skeleton[i, j] = skeleton[j, i] = 0
                                sep_set[(i, j)] = sub_z
                                break

        return skeleton, sep_set


class MovingPC():
    def __init__(self):
        self.pc = PC()
        import networkx as nx
        self._nx=nx

    def calculate_with_pc_moving(self,names, data,timestamps):

        sorted_indices = np.argsort(names)
        Xdata = data[:, sorted_indices]
        names = [names[i] for i in sorted_indices]


        self.pc.learn(Xdata,timestamps)
        learned_graph = self._nx.DiGraph(self.pc.causal_matrix)
        # Relabel the nodes
        MAPPING = {k: n for k, n in zip(range(len(names)), names)}
        learned_graph = self._nx.relabel_nodes(learned_graph, MAPPING, copy=True)
        edges =learned_graph.edges
        fedges =[]
        for tup in edges:
            if tup not in fedges:
                fedges.append(tup)
            if (tup[1] ,tup[0]) not in fedges:
                fedges.append((tup[1] ,tup[0]))

Classes

class MovingPC
Expand source code
class MovingPC():
    def __init__(self):
        self.pc = PC()
        import networkx as nx
        self._nx=nx

    def calculate_with_pc_moving(self,names, data,timestamps):

        sorted_indices = np.argsort(names)
        Xdata = data[:, sorted_indices]
        names = [names[i] for i in sorted_indices]


        self.pc.learn(Xdata,timestamps)
        learned_graph = self._nx.DiGraph(self.pc.causal_matrix)
        # Relabel the nodes
        MAPPING = {k: n for k, n in zip(range(len(names)), names)}
        learned_graph = self._nx.relabel_nodes(learned_graph, MAPPING, copy=True)
        edges =learned_graph.edges
        fedges =[]
        for tup in edges:
            if tup not in fedges:
                fedges.append(tup)
            if (tup[1] ,tup[0]) not in fedges:
                fedges.append((tup[1] ,tup[0]))

Methods

def calculate_with_pc_moving(self, names, data, timestamps)
Expand source code
def calculate_with_pc_moving(self,names, data,timestamps):

    sorted_indices = np.argsort(names)
    Xdata = data[:, sorted_indices]
    names = [names[i] for i in sorted_indices]


    self.pc.learn(Xdata,timestamps)
    learned_graph = self._nx.DiGraph(self.pc.causal_matrix)
    # Relabel the nodes
    MAPPING = {k: n for k, n in zip(range(len(names)), names)}
    learned_graph = self._nx.relabel_nodes(learned_graph, MAPPING, copy=True)
    edges =learned_graph.edges
    fedges =[]
    for tup in edges:
        if tup not in fedges:
            fedges.append(tup)
        if (tup[1] ,tup[0]) not in fedges:
            fedges.append((tup[1] ,tup[0]))
class PC (variant='stable', alpha=0.05, priori_knowledge=None)
Expand source code
class PC():

    def __init__(self, variant='stable', alpha=0.05,
                 priori_knowledge=None):
        from castle.common import Tensor
        self._Tensor=Tensor

        from castle.common.priori_knowledge import orient_by_priori_knowledge
        self._orient_by_priori_knowledge=orient_by_priori_knowledge

        from itertools import permutations
        self._permutations = permutations
        from itertools import combinations
        self._combinations = combinations

        from copy import deepcopy
        self._deepcopy = deepcopy
        self.variant = variant
        self.alpha = alpha
        self.ci_test = RunningCovarianceTimestamps()
        self.causal_matrix = None
        self.priori_knowledge = priori_knowledge

    def learn(self, data,timestamps, columns=None, **kwargs):
        """
        Based on the implementation of gCastle Library
        """


        self.ci_test.update(data,timestamps)
        if self.ci_test.data is None:
            self.causal_matrix=None
            return
        data = self._Tensor(data, columns=columns)

        skeleton, sep_set = self.find_skeleton(data,
                                          alpha=self.alpha,
                                          ci_test_class=self.ci_test,
                                          variant=self.variant,
                                          priori_knowledge=self.priori_knowledge,
                                          **kwargs)

        self._causal_matrix = self._Tensor(
            self.orient(skeleton, sep_set, self.priori_knowledge).astype(int),
            index=data.columns,
            columns=data.columns
        )
        self.causal_matrix=self._causal_matrix

    def orient(self, skeleton, sep_set, priori_knowledge=None):


        if priori_knowledge is not None:
            skeleton = self._orient_by_priori_knowledge(skeleton, priori_knowledge)

        columns = list(range(skeleton.shape[1]))
        cpdag = self._deepcopy(abs(skeleton))
        # pre-processing
        for ij in sep_set.keys():
            i, j = ij
            all_k = [x for x in columns if x not in ij]
            for k in all_k:
                if cpdag[i, k] + cpdag[k, i] != 0 \
                        and cpdag[k, j] + cpdag[j, k] != 0:
                    if k not in sep_set[ij]:
                        if cpdag[i, k] + cpdag[k, i] == 2:
                            cpdag[k, i] = 0
                        if cpdag[j, k] + cpdag[k, j] == 2:
                            cpdag[k, j] = 0
        while True:
            old_cpdag = self._deepcopy(cpdag)
            pairs = list(self._combinations(columns, 2))
            for ij in pairs:
                i, j = ij
                if cpdag[i, j] * cpdag[j, i] == 1:
                    # rule1
                    for i, j in self._permutations(ij, 2):
                        all_k = [x for x in columns if x not in ij]
                        for k in all_k:
                            if cpdag[k, i] == 1 and cpdag[i, k] == 0 \
                                    and cpdag[k, j] + cpdag[j, k] == 0:
                                cpdag[j, i] = 0
                    # rule2
                    for i, j in self._permutations(ij, 2):
                        all_k = [x for x in columns if x not in ij]
                        for k in all_k:
                            if (cpdag[i, k] == 1 and cpdag[k, i] == 0) \
                                    and (cpdag[k, j] == 1 and cpdag[j, k] == 0):
                                cpdag[j, i] = 0
                    # rule3
                    for i, j in self._permutations(ij, 2):
                        for kl in sep_set.keys():  # k and l are nonadjacent.
                            k, l = kl
                            # if i——k——>j and  i——l——>j
                            if cpdag[i, k] == 1 \
                                    and cpdag[k, i] == 1 \
                                    and cpdag[k, j] == 1 \
                                    and cpdag[j, k] == 0 \
                                    and cpdag[i, l] == 1 \
                                    and cpdag[l, i] == 1 \
                                    and cpdag[l, j] == 1 \
                                    and cpdag[j, l] == 0:
                                cpdag[j, i] = 0
                    # rule4
                    for i, j in self._permutations(ij, 2):
                        for kj in sep_set.keys():  # k and j are nonadjacent.
                            if j not in kj:
                                continue
                            else:
                                kj = list(kj)
                                kj.remove(j)
                                k = kj[0]
                                ls = [x for x in columns if x not in [i, j, k]]
                                for l in ls:
                                    if cpdag[k, l] == 1 \
                                            and cpdag[l, k] == 0 \
                                            and cpdag[i, k] == 1 \
                                            and cpdag[k, i] == 1 \
                                            and cpdag[l, j] == 1 \
                                            and cpdag[j, l] == 0:
                                        cpdag[j, i] = 0
            if np.all(cpdag == old_cpdag):
                break

        return cpdag

    def _loop(self,G, d):

        assert G.shape[0] == G.shape[1]

        pairs = [(x, y) for x, y in self._combinations(set(range(G.shape[0])), 2)]
        less_d = 0
        for i, j in pairs:
            adj_i = set(np.argwhere(G[i] != 0).reshape(-1, ))
            z = adj_i - {j}  # adj(C, i)\{j}
            if len(z) < d:
                less_d += 1
            else:
                break
        if less_d == len(pairs):
            return False
        else:
            return True





    def find_skeleton(self,data, alpha, ci_test_class, variant='original',
                      priori_knowledge=None, base_skeleton=None,
                      p_cores=1, s=None, batch=None):
        ci_test = ci_test_class.compute_fisherz
        n_feature = data.shape[1]
        if base_skeleton is None:
            skeleton = np.ones((n_feature, n_feature)) - np.eye(n_feature)
        else:
            row, col = np.diag_indices_from(base_skeleton)
            base_skeleton[row, col] = 0
            skeleton = base_skeleton
        nodes = set(range(n_feature))

        # update skeleton based on priori knowledge
        for i, j in self._combinations(nodes, 2):
            if priori_knowledge is not None and (
                    priori_knowledge.is_forbidden(i, j)
                    and priori_knowledge.is_forbidden(j, i)):
                skeleton[i, j] = skeleton[j, i] = 0

        sep_set = {}
        d = -1
        while self._loop(skeleton, d):  # until for each adj(C,i)\{j} < l
            d += 1
            if variant == 'stable':
                C = self._deepcopy(skeleton)
            else:
                C = skeleton
            if variant != 'parallel':
                for i, j in self._combinations(nodes, 2):
                    if skeleton[i, j] == 0:
                        continue
                    adj_i = set(np.argwhere(C[i] == 1).reshape(-1, ))
                    z = adj_i - {j}  # adj(C, i)\{j}
                    if len(z) >= d:
                        # |adj(C, i)\{j}| >= l
                        for sub_z in self._combinations(z, d):
                            sub_z = list(sub_z)
                            _,_,p_value, = ci_test(i, j, sub_z)
                            if p_value >= alpha:
                                skeleton[i, j] = skeleton[j, i] = 0
                                sep_set[(i, j)] = sub_z
                                break

        return skeleton, sep_set

Methods

def find_skeleton(self, data, alpha, ci_test_class, variant='original', priori_knowledge=None, base_skeleton=None, p_cores=1, s=None, batch=None)
Expand source code
def find_skeleton(self,data, alpha, ci_test_class, variant='original',
                  priori_knowledge=None, base_skeleton=None,
                  p_cores=1, s=None, batch=None):
    ci_test = ci_test_class.compute_fisherz
    n_feature = data.shape[1]
    if base_skeleton is None:
        skeleton = np.ones((n_feature, n_feature)) - np.eye(n_feature)
    else:
        row, col = np.diag_indices_from(base_skeleton)
        base_skeleton[row, col] = 0
        skeleton = base_skeleton
    nodes = set(range(n_feature))

    # update skeleton based on priori knowledge
    for i, j in self._combinations(nodes, 2):
        if priori_knowledge is not None and (
                priori_knowledge.is_forbidden(i, j)
                and priori_knowledge.is_forbidden(j, i)):
            skeleton[i, j] = skeleton[j, i] = 0

    sep_set = {}
    d = -1
    while self._loop(skeleton, d):  # until for each adj(C,i)\{j} < l
        d += 1
        if variant == 'stable':
            C = self._deepcopy(skeleton)
        else:
            C = skeleton
        if variant != 'parallel':
            for i, j in self._combinations(nodes, 2):
                if skeleton[i, j] == 0:
                    continue
                adj_i = set(np.argwhere(C[i] == 1).reshape(-1, ))
                z = adj_i - {j}  # adj(C, i)\{j}
                if len(z) >= d:
                    # |adj(C, i)\{j}| >= l
                    for sub_z in self._combinations(z, d):
                        sub_z = list(sub_z)
                        _,_,p_value, = ci_test(i, j, sub_z)
                        if p_value >= alpha:
                            skeleton[i, j] = skeleton[j, i] = 0
                            sep_set[(i, j)] = sub_z
                            break

    return skeleton, sep_set
def learn(self, data, timestamps, columns=None, **kwargs)

Based on the implementation of gCastle Library

Expand source code
def learn(self, data,timestamps, columns=None, **kwargs):
    """
    Based on the implementation of gCastle Library
    """


    self.ci_test.update(data,timestamps)
    if self.ci_test.data is None:
        self.causal_matrix=None
        return
    data = self._Tensor(data, columns=columns)

    skeleton, sep_set = self.find_skeleton(data,
                                      alpha=self.alpha,
                                      ci_test_class=self.ci_test,
                                      variant=self.variant,
                                      priori_knowledge=self.priori_knowledge,
                                      **kwargs)

    self._causal_matrix = self._Tensor(
        self.orient(skeleton, sep_set, self.priori_knowledge).astype(int),
        index=data.columns,
        columns=data.columns
    )
    self.causal_matrix=self._causal_matrix
def orient(self, skeleton, sep_set, priori_knowledge=None)
Expand source code
def orient(self, skeleton, sep_set, priori_knowledge=None):


    if priori_knowledge is not None:
        skeleton = self._orient_by_priori_knowledge(skeleton, priori_knowledge)

    columns = list(range(skeleton.shape[1]))
    cpdag = self._deepcopy(abs(skeleton))
    # pre-processing
    for ij in sep_set.keys():
        i, j = ij
        all_k = [x for x in columns if x not in ij]
        for k in all_k:
            if cpdag[i, k] + cpdag[k, i] != 0 \
                    and cpdag[k, j] + cpdag[j, k] != 0:
                if k not in sep_set[ij]:
                    if cpdag[i, k] + cpdag[k, i] == 2:
                        cpdag[k, i] = 0
                    if cpdag[j, k] + cpdag[k, j] == 2:
                        cpdag[k, j] = 0
    while True:
        old_cpdag = self._deepcopy(cpdag)
        pairs = list(self._combinations(columns, 2))
        for ij in pairs:
            i, j = ij
            if cpdag[i, j] * cpdag[j, i] == 1:
                # rule1
                for i, j in self._permutations(ij, 2):
                    all_k = [x for x in columns if x not in ij]
                    for k in all_k:
                        if cpdag[k, i] == 1 and cpdag[i, k] == 0 \
                                and cpdag[k, j] + cpdag[j, k] == 0:
                            cpdag[j, i] = 0
                # rule2
                for i, j in self._permutations(ij, 2):
                    all_k = [x for x in columns if x not in ij]
                    for k in all_k:
                        if (cpdag[i, k] == 1 and cpdag[k, i] == 0) \
                                and (cpdag[k, j] == 1 and cpdag[j, k] == 0):
                            cpdag[j, i] = 0
                # rule3
                for i, j in self._permutations(ij, 2):
                    for kl in sep_set.keys():  # k and l are nonadjacent.
                        k, l = kl
                        # if i——k——>j and  i——l——>j
                        if cpdag[i, k] == 1 \
                                and cpdag[k, i] == 1 \
                                and cpdag[k, j] == 1 \
                                and cpdag[j, k] == 0 \
                                and cpdag[i, l] == 1 \
                                and cpdag[l, i] == 1 \
                                and cpdag[l, j] == 1 \
                                and cpdag[j, l] == 0:
                            cpdag[j, i] = 0
                # rule4
                for i, j in self._permutations(ij, 2):
                    for kj in sep_set.keys():  # k and j are nonadjacent.
                        if j not in kj:
                            continue
                        else:
                            kj = list(kj)
                            kj.remove(j)
                            k = kj[0]
                            ls = [x for x in columns if x not in [i, j, k]]
                            for l in ls:
                                if cpdag[k, l] == 1 \
                                        and cpdag[l, k] == 0 \
                                        and cpdag[i, k] == 1 \
                                        and cpdag[k, i] == 1 \
                                        and cpdag[l, j] == 1 \
                                        and cpdag[j, l] == 0:
                                    cpdag[j, i] = 0
        if np.all(cpdag == old_cpdag):
            break

    return cpdag
class RunningCovarianceTimestamps

Initializes running sum, sum of squares, mean, and covariance matrix for incremental updates.

Expand source code
class RunningCovarianceTimestamps:
    def __init__(self):
        """
        Initializes running sum, sum of squares, mean, and covariance matrix
        for incremental updates.
        """
        import scipy.stats as stats
        self._stats=stats
        self.datacopy=None
        self.data = None
    def init(self,data,timestamps):
        self.data = data
        self.n, self.d = data.shape
        # Sufficient statistics
        self.sum = np.sum(data, axis=0)
        self.sum_sq = np.dot(data.T, data)  # Sum of outer products
        self.mean = self.sum / self.n

        self.cov = self.coovarianve(self.sum, self.sum_sq, self.mean, self.n)
        self.corr = self.covariance_to_correlation(self.cov)
        self.timestamps = timestamps
        self.datacopy=data
        return
    def update(self, data, timestamps):

        """
        Updates the sufficient statistics when a sample is removed and a new one is added.
        Recomputes the mean and covariance exactly as np.cov would.

        """
        # Update the sums


        if self.data is None or self.data.shape[1]!=data.shape[1]:
            self.init(data, timestamps)
            return
        if timestamps[0]==self.timestamps[0] and timestamps[-1]==self.timestamps[-1]:
            return


        assert self.data.shape[1] == data.shape[1], "Data have different shapes"

        positionOld = self.timestamps.index(timestamps[0])
        oldsamples = self.data[:positionOld]

        positionNew = timestamps.index(self.timestamps[-1])+1
        newsamples = data[positionNew:]

        for old_sample in oldsamples:
            self.sum = self.sum - old_sample
            self.sum_sq = self.sum_sq - np.outer(old_sample, old_sample)
        self.datacopy=self.datacopy[positionOld:]
        for new_sample in newsamples:
            self.sum = self.sum + new_sample
            self.sum_sq = self.sum_sq + np.outer(new_sample, new_sample)
        self.datacopy = np.vstack([self.datacopy, newsamples])

        # if abs(np.sum(self.datacopy-data))>0.00000001:
        #     ok='ok'
        self.n = data.shape[0]
        # Update the mean
        self.mean = self.sum / self.n

        # Recompute the covariance (unbiased estimate with ddof=1)
        self.cov = self.coovarianve(self.sum, self.sum_sq, self.mean, self.n)
        self.corr=self.covariance_to_correlation(self.cov)


        self.timestamps = timestamps
        self.data = data

    def covariance_to_correlation(self,C):
        """Convert a covariance matrix C to a correlation matrix R."""
        diag_sqrt = np.sqrt(np.diag(C))  # Square root of diagonal elements
        R = C / np.outer(diag_sqrt, diag_sqrt)  # Element-wise division
        return R

    def coovarianve(self,sum, sum_sq, mean, n):
        return (sum_sq - (np.outer(mean, sum) + np.outer(sum, mean)) + np.outer(mean, mean) * n) / (n - 1)

    # def computer_originalfisherz(self,data, x, y, z):
    #     import scipy.stats as stats
    #
    #     n = data.shape[0]
    #     k = len(z)
    #     sub_corr=None
    #     corr=np.corrcoef(data.T)
    #     if k == 0:
    #         r = np.corrcoef(data[:, [x, y]].T)[0][1]
    #     else:
    #         sub_index = [x, y]
    #         sub_index.extend(z)
    #         sub_corr = np.corrcoef(data[:, sub_index].T)
    #         # inverse matrix
    #         try:
    #             PM = np.linalg.inv(sub_corr)
    #         except np.linalg.LinAlgError:
    #             PM = np.linalg.pinv(sub_corr)
    #         r = -1 * PM[0, 1] / math.sqrt(abs(PM[0, 0] * PM[1, 1]))
    #     cut_at = 0.99999
    #     rold=r
    #     r = min(cut_at, max(-1 * cut_at, r))  # make r between -1 and 1
    #
    #     # Fisher’s z-transform
    #     res = math.sqrt(n - k - 3) * .5 * math.log1p((2 * r) / (1 - r))
    #     p_value = 2 * (1 - stats.norm.cdf(abs(res)))
    #
    #     return corr,sub_corr,r,rold, res, p_value

    def compute_fisherz(self, x, y, z=[]):
        assert self.data is not None, "Moving Covariance Not initialized"

        k = len(z)
        if k == 0:
            r = self.corr[x, y]
        else:
            sub_index = [x, y]
            sub_index.extend(z)
            sub_corr = self.corr[sub_index][:,sub_index]
            try:
                PM = np.linalg.inv(sub_corr)
            except np.linalg.LinAlgError:
                PM = np.linalg.pinv(sub_corr)
            r = -1 * PM[0, 1] / math.sqrt(abs(PM[0, 0] * PM[1, 1]))
        cut_at = 0.99999
        r = min(cut_at, max(-1 * cut_at, r))  # make r between -1 and 1

        # Fisher’s z-transform
        res = math.sqrt(self.n - k - 3) * .5 * math.log1p((2 * r) / (1 - r))
        p_value = 2 * (1 - self._stats.norm.cdf(abs(res)))

        return None, None, p_value

Methods

def compute_fisherz(self, x, y, z=[])
Expand source code
def compute_fisherz(self, x, y, z=[]):
    assert self.data is not None, "Moving Covariance Not initialized"

    k = len(z)
    if k == 0:
        r = self.corr[x, y]
    else:
        sub_index = [x, y]
        sub_index.extend(z)
        sub_corr = self.corr[sub_index][:,sub_index]
        try:
            PM = np.linalg.inv(sub_corr)
        except np.linalg.LinAlgError:
            PM = np.linalg.pinv(sub_corr)
        r = -1 * PM[0, 1] / math.sqrt(abs(PM[0, 0] * PM[1, 1]))
    cut_at = 0.99999
    r = min(cut_at, max(-1 * cut_at, r))  # make r between -1 and 1

    # Fisher’s z-transform
    res = math.sqrt(self.n - k - 3) * .5 * math.log1p((2 * r) / (1 - r))
    p_value = 2 * (1 - self._stats.norm.cdf(abs(res)))

    return None, None, p_value
def coovarianve(self, sum, sum_sq, mean, n)
Expand source code
def coovarianve(self,sum, sum_sq, mean, n):
    return (sum_sq - (np.outer(mean, sum) + np.outer(sum, mean)) + np.outer(mean, mean) * n) / (n - 1)
def covariance_to_correlation(self, C)

Convert a covariance matrix C to a correlation matrix R.

Expand source code
def covariance_to_correlation(self,C):
    """Convert a covariance matrix C to a correlation matrix R."""
    diag_sqrt = np.sqrt(np.diag(C))  # Square root of diagonal elements
    R = C / np.outer(diag_sqrt, diag_sqrt)  # Element-wise division
    return R
def init(self, data, timestamps)
Expand source code
def init(self,data,timestamps):
    self.data = data
    self.n, self.d = data.shape
    # Sufficient statistics
    self.sum = np.sum(data, axis=0)
    self.sum_sq = np.dot(data.T, data)  # Sum of outer products
    self.mean = self.sum / self.n

    self.cov = self.coovarianve(self.sum, self.sum_sq, self.mean, self.n)
    self.corr = self.covariance_to_correlation(self.cov)
    self.timestamps = timestamps
    self.datacopy=data
    return
def update(self, data, timestamps)

Updates the sufficient statistics when a sample is removed and a new one is added. Recomputes the mean and covariance exactly as np.cov would.

Expand source code
def update(self, data, timestamps):

    """
    Updates the sufficient statistics when a sample is removed and a new one is added.
    Recomputes the mean and covariance exactly as np.cov would.

    """
    # Update the sums


    if self.data is None or self.data.shape[1]!=data.shape[1]:
        self.init(data, timestamps)
        return
    if timestamps[0]==self.timestamps[0] and timestamps[-1]==self.timestamps[-1]:
        return


    assert self.data.shape[1] == data.shape[1], "Data have different shapes"

    positionOld = self.timestamps.index(timestamps[0])
    oldsamples = self.data[:positionOld]

    positionNew = timestamps.index(self.timestamps[-1])+1
    newsamples = data[positionNew:]

    for old_sample in oldsamples:
        self.sum = self.sum - old_sample
        self.sum_sq = self.sum_sq - np.outer(old_sample, old_sample)
    self.datacopy=self.datacopy[positionOld:]
    for new_sample in newsamples:
        self.sum = self.sum + new_sample
        self.sum_sq = self.sum_sq + np.outer(new_sample, new_sample)
    self.datacopy = np.vstack([self.datacopy, newsamples])

    # if abs(np.sum(self.datacopy-data))>0.00000001:
    #     ok='ok'
    self.n = data.shape[0]
    # Update the mean
    self.mean = self.sum / self.n

    # Recompute the covariance (unbiased estimate with ddof=1)
    self.cov = self.coovarianve(self.sum, self.sum_sq, self.mean, self.n)
    self.corr=self.covariance_to_correlation(self.cov)


    self.timestamps = timestamps
    self.data = data