# Consolidated IO Simulation & Analysis Code (v2.4 - Continuous State, Dynamic CA)
## 1. Objective
This node consolidates the necessary Python code for running and analyzing the Information Dynamics (IO) v2.4 simulation model. This version incorporates:
* Continuous node states (`φ`).
* Refined Potentiality (`P_target`) dynamics with contextual adaptation (v3 from [[releases/archive/Information Ontology 1/0115_P_target_Dynamics_v3]]).
* Dynamic Causality (CA) weights (`w`) with stability-weighted reinforcement [[0118_IO_Formalism_Refinement]].
* Metric calculation functions [[releases/archive/Information Ontology 1/0129_IO_Metrics_Implementation]].
* Plotting functions.
This consolidated code serves as the canonical source referenced and executed (via `tool_code`) in subsequent simulation run nodes (e.g., [[releases/archive/Information Ontology 1/0137_IO_Simulation_Batch1]]), adhering to the workflow defined in [[releases/archive/Information Ontology 1/0132_IO_Simulation_Workflow]]. **This node itself contains no execution output.**
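For reference, the per-node change probability implemented in Phase 2 of the code below combines the three primitive functions (`f_H`, `f_Theta`, `f_K`) multiplicatively (all factors are non-negative, so the clipping reduces to a `min` with 1):

$$
P_{\text{change},i} \;=\; \underbrace{\min\!\big(h\,[1 - P_{\text{target},i}(\varepsilon_i)],\,1\big)}_{f_H}\;\cdot\;\underbrace{\frac{1}{1+\alpha\,\Theta_i}}_{f_\Theta}\;\cdot\;\underbrace{\min\!\big(\gamma\,K_i,\,1\big)}_{f_K}
$$

where $\varepsilon_i$ is node $i$'s current state, $\Theta_i$ its accumulated stability, and $K_i$ its local neighbour-mismatch contrast.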
## 2. Consolidated Python Code
```python
# Consolidated IO Simulation Code v2.4
# Includes simulation engine, metric calculations, and plotting
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import io
import base64
from scipy.fft import fft # For PSD calculation
# --- Helper Functions ---
def normalize_p_target(p_target_array, p_min=1e-9):
"""Normalizes P_target rows and applies p_min floor."""
if p_target_array.ndim == 1: p_target_array = p_target_array.reshape(1, -1)
p_target_array = np.maximum(p_target_array, p_min)
row_sums = p_target_array.sum(axis=1)
    # Guard against non-positive row sums; after the p_min floor this can only
    # occur if the input contains NaNs or Infs.
row_sums[row_sums <= 0] = 1.0
p_target_array = p_target_array / row_sums[:, np.newaxis]
return p_target_array
def calculate_k_local(epsilon_state):
    """Local contrast proxy: central-difference gradient magnitude.

    Note: not called by run_io_simulation_v2_4, which computes a discrete
    neighbour-mismatch contrast inline; kept as a placeholder for continuous-phi variants.
    """
    grad = np.abs(np.roll(epsilon_state, -1) - np.roll(epsilon_state, 1)) / 2.0
    return grad  # local gradient magnitude as contrast proxy
def f_H(h_param, p_leave_array):
"""Η drive function."""
return np.clip(h_param * p_leave_array, 0.0, 1.0)
def f_Theta(theta_val_array, alpha_param):
"""Θ resistance function."""
return 1.0 / (1.0 + alpha_param * theta_val_array)
def f_K(k_local_array, gamma_param):
    """K gating function: linear in local contrast, clipped to [0, 1]."""
    # In the discrete simulation, k_local is the neighbour-mismatch contrast
    # (0, 0.5 or 1); a normalized gradient proxy would be needed for continuous phi.
    return np.clip(k_local_array * gamma_param, 0.0, 1.0)
# --- Simulation Function ---
def run_io_simulation_v2_4(params):
"""Runs the IO v2.4 simulation with dynamic CA weights (stability weighted)."""
# Extract Parameters
N = params['N']
S_max = params['S_max'] # Using S_max for discrete steps
dt = params.get('dt', 1.0) # Time step (can be 1 for CA-like updates)
h = params['h']
alpha = params['alpha']
gamma = params['gamma']
p_M = params['p_M']
delta_theta_inc = params['delta_theta_inc']
theta_max = params['theta_max']
theta_base = params['theta_base']
lambda_base_adapt = params['lambda_base_adapt']
beta_adapt = params['beta_adapt']
p_min = params['p_min']
w_init = params['w_init']
delta_w_base = params['delta_w_base']
decay_rate = params['decay_rate']
w_max = params.get('w_max', None) # Optional max weight
seed = params.get('seed', None)
epsilon_states = params.get('epsilon_states', 2) # Number of discrete states
if seed is not None:
np.random.seed(seed)
else:
np.random.seed()
# Initialization
epsilon_state = np.random.randint(0, epsilon_states, size=N)
# P_target: size (N, epsilon_states)
p_target_state = np.full((N, epsilon_states), 1.0 / epsilon_states)
theta_state = np.zeros(N)
w_left = np.full(N, w_init)
w_right = np.full(N, w_init)
# History tracking
epsilon_history = np.zeros((S_max, N), dtype=int)
avg_theta_history = np.zeros(S_max)
avg_ptarget_entropy_history = np.zeros(S_max)
avg_w_left_history = np.zeros(S_max)
avg_w_right_history = np.zeros(S_max)
# --- Simulation Loop ---
for S in range(S_max):
epsilon_history[S, :] = epsilon_state
prev_epsilon = epsilon_state.copy()
prev_p_target = p_target_state.copy()
prev_theta = theta_state.copy()
prev_w_left = w_left.copy()
prev_w_right = w_right.copy()
# --- Phase 1: Calculate Influences ---
# K_local: Use a simple difference metric for discrete states
neighbors_left_eps = np.roll(prev_epsilon, 1)
neighbors_right_eps = np.roll(prev_epsilon, -1)
k_local = 0.5 * ((prev_epsilon != neighbors_left_eps).astype(float) +
(prev_epsilon != neighbors_right_eps).astype(float))
# Causal Input (CA)
influence = np.zeros((N, epsilon_states))
for k in range(epsilon_states):
influence[:, k] = (neighbors_left_eps == k).astype(int) * prev_w_left + \
(neighbors_right_eps == k).astype(int) * prev_w_right
total_causal_weight = influence.sum(axis=1)
# --- Phase 2: Determine Probability of State Change ---
# Use probabilities from P_target for the *current* state
current_state_indices = prev_epsilon
p_target_current = prev_p_target[np.arange(N), current_state_indices]
p_leave = 1.0 - p_target_current
prob_H_driven = f_H(h, p_leave)
prob_Theta_resisted = f_Theta(prev_theta, alpha)
        prob_K_gated = f_K(k_local, gamma)  # k_local takes values 0, 0.5 or 1 here
P_change = prob_H_driven * prob_Theta_resisted * prob_K_gated
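        # Combined: P_change_i = clip(h*(1 - P_target_i[eps_i]), 0, 1) * 1/(1 + alpha*theta_i) * clip(gamma*K_i, 0, 1)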
# --- Phase 3: Execute State Transition ---
r_change = np.random.rand(N)
change_mask = r_change < P_change
no_change_mask = ~change_mask
# Initialize next step states
next_epsilon = prev_epsilon.copy()
next_theta = prev_theta.copy()
next_p_target = prev_p_target.copy()
# Determine Target State for Changing Nodes
changing_indices = np.where(change_mask)[0]
if len(changing_indices) > 0:
# Calculate Mimicry bias on P_target
p_target_intrinsic = prev_p_target[changing_indices, :]
total_causal_changing = total_causal_weight[changing_indices]
safe_total_causal = np.where(total_causal_changing == 0, 1.0, total_causal_changing)
# Calculate modification factors for each target state k
mod_factors = np.zeros_like(p_target_intrinsic)
for k in range(epsilon_states):
mod_factors[:, k] = (1.0 + p_M * influence[changing_indices, k] / safe_total_causal)
p_prime = p_target_intrinsic * mod_factors
# Normalize P'_target for changing nodes
p_prime_norm = normalize_p_target(p_prime, p_min)
# Sample new epsilon state from modified distribution
cum_probs = p_prime_norm.cumsum(axis=1)
r_target = np.random.rand(len(changing_indices), 1)
# Find first index where cumulative probability exceeds random number
epsilon_target = (cum_probs < r_target).sum(axis=1)
next_epsilon[changing_indices] = epsilon_target
# Update Theta for ALL nodes
next_theta[change_mask] = theta_base
next_theta[no_change_mask] = np.minimum(prev_theta[no_change_mask] + delta_theta_inc, theta_max)
# Update P_target using Mechanism v3 for ALL nodes
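        # Mechanism v3: P_context is the causal-weight-normalized distribution of neighbour states.
        # Changing nodes reset P_target to P_context (lambda_reset = 1.0); stable nodes relax toward
        # it at the theta-damped rate lambda_eff = lambda_base_adapt / (1 + beta_adapt * theta).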
p_context = np.zeros_like(prev_p_target)
valid_ca = total_causal_weight > 0
valid_indices = np.where(valid_ca)[0]
if len(valid_indices) > 0:
safe_total_causal_valid = total_causal_weight[valid_indices]
for k in range(epsilon_states):
p_context[valid_indices, k] = influence[valid_indices, k] / safe_total_causal_valid
invalid_indices = np.where(~valid_ca)[0]
if len(invalid_indices) > 0:
p_context[invalid_indices, :] = 1.0 / epsilon_states # Default to uniform
f_theta_adapt = 1.0 / (1.0 + beta_adapt * prev_theta)
lambda_eff = lambda_base_adapt * f_theta_adapt
# Apply updates based on change_mask
# Reset towards P_context for changing nodes (lambda_reset = 1.0)
next_p_target[change_mask, :] = p_context[change_mask, :]
# Adapt gradually for stable nodes
stable_indices = np.where(no_change_mask)[0]
if len(stable_indices) > 0:
lambda_eff_stable = lambda_eff[stable_indices]
next_p_target[stable_indices, :] = (1.0 - lambda_eff_stable[:, np.newaxis]) * prev_p_target[stable_indices, :] \
+ lambda_eff_stable[:, np.newaxis] * p_context[stable_indices, :]
# Apply H floor and Final Normalization for ALL nodes
next_p_target = normalize_p_target(next_p_target, p_min)
# --- Phase 4: Update Causal Network Weights (Stability Weighted) ---
# Calculate correlations: +1 if neighbor state S matches target state S+1, -1 otherwise
corr_left = np.where(neighbors_left_eps == next_epsilon, 1, -1)
corr_right = np.where(neighbors_right_eps == next_epsilon, 1, -1)
# Stability-weighted correlation factor (using next_theta)
# Ensure theta_max is not zero
safe_theta_max = theta_max if theta_max > 0 else 1.0
f_theta_corr = 1.0 + next_theta / safe_theta_max
# Calculate weight changes
dw_left = delta_w_base * corr_left * f_theta_corr
        dw_right = delta_w_base * corr_right * f_theta_corr  # node i's stability factor applies to both directions
# Apply decay and update weights
w_left = prev_w_left * (1.0 - decay_rate) + dw_left
w_right = prev_w_right * (1.0 - decay_rate) + dw_right
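        # Net rule per link: w_new = (1 - decay_rate) * w_old + delta_w_base * corr * (1 + theta_next / theta_max)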
# Apply bounds (non-negative and optional max)
w_left = np.maximum(0, w_left)
w_right = np.maximum(0, w_right)
if w_max is not None:
w_left = np.minimum(w_left, w_max)
w_right = np.minimum(w_right, w_max)
# --- Assign final states for next step ---
epsilon_state = next_epsilon
theta_state = next_theta
p_target_state = next_p_target
# --- Calculate Metrics for History ---
avg_theta_history[S] = np.mean(theta_state)
# Calculate Shannon entropy of P_target vectors
# Add small epsilon to prevent log(0)
log_p = np.log2(np.maximum(p_target_state, p_min))
ptarget_entropy = -np.sum(p_target_state * log_p, axis=1)
avg_ptarget_entropy_history[S] = np.mean(ptarget_entropy)
avg_w_left_history[S] = np.mean(w_left)
avg_w_right_history[S] = np.mean(w_right)
# --- Prepare Results ---
results = {
"parameters": params,
"epsilon_history": epsilon_history,
"avg_theta_history": avg_theta_history,
"avg_ptarget_entropy_history": avg_ptarget_entropy_history,
"avg_w_left_history": avg_w_left_history,
"avg_w_right_history": avg_w_right_history
}
return results
# --- Metric Calculation Functions (from 0129, adapted slightly) ---
def calculate_average_gradient_magnitude(phi_history):
"""Calculates average gradient magnitude for discrete states (proxy for boundary density)."""
N_steps, N_nodes = phi_history.shape
avg_grad_hist = np.zeros(N_steps)
for S in range(N_steps):
state = phi_history[S, :]
diff_left = (state != np.roll(state, 1)).astype(float)
diff_right = (state != np.roll(state, -1)).astype(float)
# Count boundaries (where state differs from at least one neighbor)
boundaries = np.logical_or(diff_left, diff_right)
avg_grad_hist[S] = np.mean(boundaries) # Boundary density
return avg_grad_hist
def calculate_power_spectral_density(history_data, dt=1.0):
"""Calculates average power spectral density from history data (e.g., epsilon or theta)."""
S_max, N_nodes = history_data.shape
if S_max < 2: return None, None # Need at least 2 time steps for FFT
psd_list = []
for i in range(N_nodes):
time_series = history_data[:, i]
yf = fft(time_series)
T = dt # Time step
N = len(time_series)
if N == 0: continue
xf = np.fft.fftfreq(N, T)[:N//2] # Positive frequencies
psd = (np.abs(yf[0:N//2])**2) / N # Normalized PSD
psd_list.append(psd)
if not psd_list: return None, None
avg_psd = np.mean(np.array(psd_list), axis=0)
return xf, avg_psd
# --- Plotting Function (Consolidated) ---
def plot_results_v2_4(results, title_suffix=""):
"""Generates plots for v2.4 results and returns base64 string."""
params = results["parameters"]
epsilon_history = results["epsilon_history"]
avg_theta_history = results["avg_theta_history"]
avg_ptarget_entropy_history = results["avg_ptarget_entropy_history"]
avg_w_left_history = results["avg_w_left_history"]
avg_w_right_history = results["avg_w_right_history"]
S_max = params['S_max']
N = params['N']
dt = params.get('dt', 1.0)
epsilon_states = params.get('epsilon_states', 2)
fig, axs = plt.subplots(4, 1, figsize=(10, 10), sharex=True)
# Spacetime plot
    cmap = plt.get_cmap('viridis', epsilon_states)  # discrete viridis colormap for multi-state display
im = axs[0].imshow(epsilon_history.T, cmap=cmap, aspect='auto', interpolation='none', vmin=0, vmax=epsilon_states-1)
axs[0].set_title(f'IO v2.4 Simulation {title_suffix}: ε State Evolution')
axs[0].set_ylabel('Node Index (i)')
# Add colorbar if more than 2 states
if epsilon_states > 2:
plt.colorbar(im, ax=axs[0], label='State (ε)', ticks=range(epsilon_states))
# Average Theta
axs[1].plot(avg_theta_history)
axs[1].set_title('Average Stability (Θ_val)')
axs[1].set_ylabel('Avg Θ_val')
axs[1].grid(True)
# Average P_target Entropy
axs[2].plot(avg_ptarget_entropy_history)
axs[2].set_title('Average Potentiality Entropy (H(P_target))')
axs[2].set_ylabel('Avg Entropy (bits)')
axs[2].set_ylim(bottom=-0.05) # Ensure 0 is visible
axs[2].grid(True)
# Average Causal Weights
axs[3].plot(avg_w_left_history, label='Avg W(left -> self)')
axs[3].plot(avg_w_right_history, label='Avg W(right -> self)')
axs[3].set_title('Average Causal Weights (w)')
axs[3].set_xlabel('Sequence Step (S)')
axs[3].set_ylabel('Avg Weight')
axs[3].legend()
axs[3].grid(True)
plt.tight_layout()
buf = io.BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
plot_base64 = base64.b64encode(buf.read()).decode('utf-8')
buf.close()
plt.close(fig)
return plot_base64
```
## 3. Code Structure and Logic
* **Consolidation:** Combines simulation logic (v2.4), metric calculations (adapted for discrete states/history), and plotting into one code block for execution via `tool_code`.
* **Discrete State Adaptation:** The code now explicitly handles `epsilon_states` > 2, including normalization of `P_target` and sampling from the modified target distribution. The K_local calculation uses a simple difference metric suitable for discrete states.
* **Metric Integration:** Basic metrics (average Θ, average `P_target` entropy, average causal weights) are calculated within the main loop. More complex metrics (power spectral density, boundary density) are defined as separate functions to be called *after* the simulation on the recorded history; see the usage sketch below.
* **Plotting:** The plotting function is updated to handle potentially multi-state `epsilon_history` and includes the causal weight history.
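The sketch below illustrates the intended calling pattern once the consolidated code above has been executed (providing the functions and imports). The parameter values are placeholders for illustration only, not canonical settings from any run node.

```python
# Illustrative usage sketch (placeholder parameter values, not canonical run settings).
example_params = {
    'N': 100, 'S_max': 500, 'dt': 1.0,
    'h': 0.5, 'alpha': 1.0, 'gamma': 1.0, 'p_M': 0.5,
    'delta_theta_inc': 0.1, 'theta_max': 5.0, 'theta_base': 0.0,
    'lambda_base_adapt': 0.1, 'beta_adapt': 1.0, 'p_min': 1e-9,
    'w_init': 1.0, 'delta_w_base': 0.01, 'decay_rate': 0.001,
    'w_max': 5.0, 'seed': 42, 'epsilon_states': 2,
}

results = run_io_simulation_v2_4(example_params)

# Post-hoc metrics computed on the recorded history
boundary_density = calculate_average_gradient_magnitude(results["epsilon_history"])
freqs, avg_psd = calculate_power_spectral_density(results["epsilon_history"], dt=example_params['dt'])

# Generate plots and save the base64-encoded PNG returned by the plotting function
plot_b64 = plot_results_v2_4(results, title_suffix="(example)")
with open("io_v2_4_example.png", "wb") as f:
    f.write(base64.b64decode(plot_b64))
```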
## 4. Next Steps
This consolidated code block is now ready to be used in the next node ([[releases/archive/Information Ontology 1/0137_IO_Simulation_Batch1]]) for executing a batch of simulation runs to explore the parameter space and test the effectiveness of the v2.4 formalism, particularly the dynamic CA mechanism.