Skip to content

network_models

Base model for network simulations.

Source code in navis/models/network_models.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class BaseNetworkModel:
    """Base model for network simulations.

    Stores an edge list and provides the scaffolding to run a model in
    several worker processes. ``run`` and ``initializer`` are no-ops here
    and are meant to be overridden by subclasses.

    Parameters
    ----------
    edges :     pd.DataFrame
                Edge list. Must contain the ``source`` and ``target`` columns.
    source :    str
                Name of the column in ``edges`` holding the source nodes.
    target :    str
                Name of the column in ``edges`` holding the target nodes.

    """

    def __init__(self, edges: pd.DataFrame, source: str, target: str):
        """Initialize model.

        Raises
        ------
        AssertionError
                If ``edges`` is not a DataFrame or lacks the ``source`` or
                ``target`` column.

        """
        assert isinstance(edges, pd.DataFrame), f'edges must be pandas DataFrame, got "{type(edges)}"'
        assert source in edges.columns, f'edges DataFrame must contain "{source}" column'
        assert target in edges.columns, f'edges DataFrame must contain "{target}" column'
        self.edges = edges
        self.source = source
        self.target = target

        # Object dtype is what pandas uses for (among other things) strings
        if (self.edges.dtypes[source] == object) or (self.edges.dtypes[target] == object):
            logger.warning('Looks like sources and/or targets in your edge list '
                           'might be strings? This can massively slow down '
                           'computations. If at all possible try to use numeric IDs.')

    @property
    def n_nodes(self) -> int:
        """Return number of unique nodes in network.

        Counts the distinct values found across the source and target
        columns of the edge list.
        """
        return np.unique(self.edges[[self.source, self.target]].values.flatten()).shape[0]

    @property
    def has_results(self) -> bool:
        """Check if model has results.

        True only if a ``results`` attribute exists and is a DataFrame.
        """
        return isinstance(getattr(self, 'results', None), pd.DataFrame)

    @staticmethod
    def _split_iterations(iterations: int, n_batches: int) -> list:
        """Split ``iterations`` into ``n_batches`` near-equal batch sizes.

        The batch sizes always sum to ``int(iterations)``; any remainder is
        handed out one iteration at a time to the first batches.
        """
        base, extra = divmod(int(iterations), n_batches)
        return [base + 1 if i < extra else base for i in range(n_batches)]

    def run_parallel(self,
                     n_cores: int = 5,
                     iterations: int = 100,
                     **kwargs: dict) -> None:
        """Run model using parallel processes.

        Parameters
        ----------
        n_cores :       int
                        Number of worker processes. Each worker receives a
                        single batch of iterations.
        iterations :    int
                        Total number of iterations across all workers. If
                        this is not divisible by ``n_cores`` the remainder
                        is spread over the first workers so that no
                        iteration is dropped.
        **kwargs
                        Passed through to ``self.run`` in each worker, along
                        with ``iterations`` (the worker's batch size) and
                        ``position`` (the worker's index).

        Returns
        -------
        None
                        Results are concatenated into ``self.results`` and
                        ``iterations`` is stored as ``self.iterations``.

        """
        # Each process is set up via `self.initializer` (a no-op in this
        # base class)
        with mp.Pool(processes=n_cores,
                     initializer=self.initializer) as pool:

            # Each process will take about the same amount of time so we
            # hand every worker exactly one batch of (almost) equal size
            batches = self._split_iterations(iterations, int(n_cores))
            calls = [{**kwargs, 'iterations': n_iter, 'position': i}
                     for i, n_iter in enumerate(batches)]

            # Generate processes - note the use of chunksize 1 because we have
            # already chunked the iterations such that each worker process
            # runs exactly once
            p = pool.imap_unordered(self._worker_wrapper, calls, chunksize=1)

            # Wait for processes to complete
            res = list(p)

        # Combine results
        self.results = pd.concat(res, axis=0)
        self.iterations = iterations

    def _worker_wrapper(self, kwargs: dict):
        """Call ``self.run`` with a dict of keyword arguments.

        Needed because the pool passes each call as a single argument.
        """
        return self.run(**kwargs)

    def initializer(self):
        """Initialize a worker process. Does nothing in the base class."""
        pass

    def run(self):
        """Run the model. Does nothing in the base class."""
        pass

Check if model has results.

Return unique nodes in network.

Initialize model.

Source code in navis/models/network_models.py
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(self, edges: pd.DataFrame, source: str, target: str):
    """Set up the model from an edge list.

    Parameters
    ----------
    edges :     pd.DataFrame
                Edge list. Must contain the ``source`` and ``target`` columns.
    source :    str
                Name of the column holding the source nodes.
    target :    str
                Name of the column holding the target nodes.

    Raises
    ------
    AssertionError
                If ``edges`` is not a DataFrame or one of the two columns
                is missing.

    """
    assert isinstance(edges, pd.DataFrame), f'edges must be pandas DataFrame, got "{type(edges)}"'
    # Source is checked before target
    for column in (source, target):
        assert column in edges.columns, f'edges DataFrame must contain "{column}" column'

    self.edges, self.source, self.target = edges, source, target

    # Warn if either ID column is stored with object dtype
    if any(self.edges.dtypes[column] == object for column in (source, target)):
        logger.warning('Looks like sources and/or targets in your edge list '
                       'might be strings? This can massively slow down '
                       'computations. If at all possible try to use numeric IDs.')

Run model using parallel processes.

Source code in navis/models/network_models.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def run_parallel(self,
                 n_cores: int = 5,
                 iterations: int = 100,
                 **kwargs: dict) -> None:
    """Run model using parallel processes.

    Parameters
    ----------
    n_cores :       int
                    Number of worker processes. Each worker receives a
                    single batch of iterations.
    iterations :    int
                    Total number of iterations across all workers. If this
                    is not divisible by ``n_cores`` the remainder is spread
                    over the first workers so that no iteration is dropped.
    **kwargs
                    Passed through to ``self.run`` in each worker, along
                    with ``iterations`` (the worker's batch size) and
                    ``position`` (the worker's index).

    Returns
    -------
    None
                    Results are concatenated into ``self.results`` and
                    ``iterations`` is stored as ``self.iterations``.

    """
    # Each process is set up by calling `self.initializer`
    with mp.Pool(processes=n_cores,
                 initializer=self.initializer) as pool:

        # Each process will take about the same amount of time so we hand
        # every worker exactly one batch. `divmod` keeps track of the
        # remainder, which plain integer division would silently discard.
        n_workers = int(n_cores)
        base, extra = divmod(int(iterations), n_workers)
        calls = [{**kwargs,
                  'iterations': base + 1 if i < extra else base,
                  'position': i}
                 for i in range(n_workers)]

        # Generate processes - note the use of chunksize 1 because we have
        # already chunked the iterations such that each worker process
        # runs exactly once
        p = pool.imap_unordered(self._worker_wrapper, calls, chunksize=1)

        # Wait for processes to complete
        res = list(p)

    # Combine results
    self.results = pd.concat(res, axis=0)
    self.iterations = iterations

Sample random activation for edges given a weight to probability function.

PARAMETER DESCRIPTION
w
(N, 1) array containing the edge weights.

TYPE: np.ndarray

func
A function returning activation probabilities given a set of weights.

TYPE: callable

RETURNS DESCRIPTION
np.ndarray

True or False values for each edge.

Source code in navis/models/network_models.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
def random_activation_function(
    w: np.ndarray,
    func: Callable,
) -> np.ndarray:
    """Sample random activation for edges given a weight to probability function.

    Parameters
    ----------
    w :     np.ndarray
            (N, ) or (N, 1) array containing the edge weights.
    func :  callable
            A function returning activation probabilities given a set of weights.

    Returns
    -------
    np.ndarray
            True or False values for each edge. An edge is active if its
            probability is greater than or equal to the random number drawn
            for it.

    """
    # Generate a random number between 0 and 1 for each connection. The draw
    # uses the full shape of `w` so that a (N, 1) input is compared
    # element-wise instead of being broadcast against a (N, ) vector into
    # a (N, N) matrix.
    r = np.random.random_sample(w.shape)

    # Normalize weights to probabilities
    w_norm = func(w)

    # Test active
    act = w_norm >= r

    return act