# Consolidated IO Simulation & Analysis Code (v2.4 - Continuous State, Dynamic CA) ## 1. Objective This node consolidates the necessary Python code for running and analyzing the Information Dynamics (IO) v2.4 simulation model. This version incorporates: * Continuous node states (`φ`). * Refined Potentiality (`P_target`) dynamics with contextual adaptation (v3 from [[releases/archive/Information Ontology 1/0115_P_target_Dynamics_v3]]). * Dynamic Causality (CA) weights (`w`) with stability-weighted reinforcement [[0118_IO_Formalism_Refinement]]. * Metric calculation functions [[releases/archive/Information Ontology 1/0129_IO_Metrics_Implementation]]. * Plotting functions. This consolidated code serves as the canonical source referenced and executed (via `tool_code`) in subsequent simulation run nodes (e.g., [[releases/archive/Information Ontology 1/0137_IO_Simulation_Batch1]]), adhering to the workflow defined in [[releases/archive/Information Ontology 1/0132_IO_Simulation_Workflow]]. **This node itself contains no execution output.** ## 2. Consolidated Python Code ```python # Consolidated IO Simulation Code v2.4 # Includes simulation engine, metric calculations, and plotting import numpy as np import matplotlib.pyplot as plt import matplotlib.colors as mcolors import io import base64 from scipy.fft import fft # For PSD calculation # --- Helper Functions --- def normalize_p_target(p_target_array, p_min=1e-9): """Normalizes P_target rows and applies p_min floor.""" if p_target_array.ndim == 1: p_target_array = p_target_array.reshape(1, -1) p_target_array = np.maximum(p_target_array, p_min) row_sums = p_target_array.sum(axis=1) # Prevent division by zero if a row sums to 0 # This might happen if N_states * p_min > 1, which shouldn't occur # Or if input array contains NaNs or Infs row_sums[row_sums <= 0] = 1.0 p_target_array = p_target_array / row_sums[:, np.newaxis] return p_target_array def calculate_k_local(epsilon_state): """Calculates local contrast K for binary states.""" # Assumes epsilon_state is binary {0, 1} for this simple K # Needs generalization for continuous phi # Placeholder: Using gradient for continuous phi as a proxy for contrast grad = np.abs(np.roll(epsilon_state, -1) - np.roll(epsilon_state, 1)) / 2.0 return grad # Return local gradient magnitude as proxy def f_H(h_param, p_leave_array): """Η drive function.""" return np.clip(h_param * p_leave_array, 0.0, 1.0) def f_Theta(theta_val_array, alpha_param): """Θ resistance function.""" return 1.0 / (1.0 + alpha_param * theta_val_array) def f_K(k_local_array, gamma_param): """K gating function (using gradient magnitude proxy).""" # Normalize gradient proxy? Assume max gradient is ~1 for now # Needs refinement based on phi range and dynamics # Using simple linear gating for now: return np.clip(k_local_array * gamma_param, 0.0, 1.0) # Simple linear gating # --- Simulation Function --- def run_io_simulation_v2_4(params): """Runs the IO v2.4 simulation with dynamic CA weights (stability weighted).""" # Extract Parameters N = params['N'] S_max = params['S_max'] # Using S_max for discrete steps dt = params.get('dt', 1.0) # Time step (can be 1 for CA-like updates) h = params['h'] alpha = params['alpha'] gamma = params['gamma'] p_M = params['p_M'] delta_theta_inc = params['delta_theta_inc'] theta_max = params['theta_max'] theta_base = params['theta_base'] lambda_base_adapt = params['lambda_base_adapt'] beta_adapt = params['beta_adapt'] p_min = params['p_min'] w_init = params['w_init'] delta_w_base = params['delta_w_base'] decay_rate = params['decay_rate'] w_max = params.get('w_max', None) # Optional max weight seed = params.get('seed', None) epsilon_states = params.get('epsilon_states', 2) # Number of discrete states if seed is not None: np.random.seed(seed) else: np.random.seed() # Initialization epsilon_state = np.random.randint(0, epsilon_states, size=N) # P_target: size (N, epsilon_states) p_target_state = np.full((N, epsilon_states), 1.0 / epsilon_states) theta_state = np.zeros(N) w_left = np.full(N, w_init) w_right = np.full(N, w_init) # History tracking epsilon_history = np.zeros((S_max, N), dtype=int) avg_theta_history = np.zeros(S_max) avg_ptarget_entropy_history = np.zeros(S_max) avg_w_left_history = np.zeros(S_max) avg_w_right_history = np.zeros(S_max) # --- Simulation Loop --- for S in range(S_max): epsilon_history[S, :] = epsilon_state prev_epsilon = epsilon_state.copy() prev_p_target = p_target_state.copy() prev_theta = theta_state.copy() prev_w_left = w_left.copy() prev_w_right = w_right.copy() # --- Phase 1: Calculate Influences --- # K_local: Use a simple difference metric for discrete states neighbors_left_eps = np.roll(prev_epsilon, 1) neighbors_right_eps = np.roll(prev_epsilon, -1) k_local = 0.5 * ((prev_epsilon != neighbors_left_eps).astype(float) + (prev_epsilon != neighbors_right_eps).astype(float)) # Causal Input (CA) influence = np.zeros((N, epsilon_states)) for k in range(epsilon_states): influence[:, k] = (neighbors_left_eps == k).astype(int) * prev_w_left + \ (neighbors_right_eps == k).astype(int) * prev_w_right total_causal_weight = influence.sum(axis=1) # --- Phase 2: Determine Probability of State Change --- # Use probabilities from P_target for the *current* state current_state_indices = prev_epsilon p_target_current = prev_p_target[np.arange(N), current_state_indices] p_leave = 1.0 - p_target_current prob_H_driven = f_H(h, p_leave) prob_Theta_resisted = f_Theta(prev_theta, alpha) prob_K_gated = f_K(k_local, gamma) # Using 0/1 contrast here P_change = prob_H_driven * prob_Theta_resisted * prob_K_gated # --- Phase 3: Execute State Transition --- r_change = np.random.rand(N) change_mask = r_change < P_change no_change_mask = ~change_mask # Initialize next step states next_epsilon = prev_epsilon.copy() next_theta = prev_theta.copy() next_p_target = prev_p_target.copy() # Determine Target State for Changing Nodes changing_indices = np.where(change_mask)[0] if len(changing_indices) > 0: # Calculate Mimicry bias on P_target p_target_intrinsic = prev_p_target[changing_indices, :] total_causal_changing = total_causal_weight[changing_indices] safe_total_causal = np.where(total_causal_changing == 0, 1.0, total_causal_changing) # Calculate modification factors for each target state k mod_factors = np.zeros_like(p_target_intrinsic) for k in range(epsilon_states): mod_factors[:, k] = (1.0 + p_M * influence[changing_indices, k] / safe_total_causal) p_prime = p_target_intrinsic * mod_factors # Normalize P'_target for changing nodes p_prime_norm = normalize_p_target(p_prime, p_min) # Sample new epsilon state from modified distribution cum_probs = p_prime_norm.cumsum(axis=1) r_target = np.random.rand(len(changing_indices), 1) # Find first index where cumulative probability exceeds random number epsilon_target = (cum_probs < r_target).sum(axis=1) next_epsilon[changing_indices] = epsilon_target # Update Theta for ALL nodes next_theta[change_mask] = theta_base next_theta[no_change_mask] = np.minimum(prev_theta[no_change_mask] + delta_theta_inc, theta_max) # Update P_target using Mechanism v3 for ALL nodes p_context = np.zeros_like(prev_p_target) valid_ca = total_causal_weight > 0 valid_indices = np.where(valid_ca)[0] if len(valid_indices) > 0: safe_total_causal_valid = total_causal_weight[valid_indices] for k in range(epsilon_states): p_context[valid_indices, k] = influence[valid_indices, k] / safe_total_causal_valid invalid_indices = np.where(~valid_ca)[0] if len(invalid_indices) > 0: p_context[invalid_indices, :] = 1.0 / epsilon_states # Default to uniform f_theta_adapt = 1.0 / (1.0 + beta_adapt * prev_theta) lambda_eff = lambda_base_adapt * f_theta_adapt # Apply updates based on change_mask # Reset towards P_context for changing nodes (lambda_reset = 1.0) next_p_target[change_mask, :] = p_context[change_mask, :] # Adapt gradually for stable nodes stable_indices = np.where(no_change_mask)[0] if len(stable_indices) > 0: lambda_eff_stable = lambda_eff[stable_indices] next_p_target[stable_indices, :] = (1.0 - lambda_eff_stable[:, np.newaxis]) * prev_p_target[stable_indices, :] \ + lambda_eff_stable[:, np.newaxis] * p_context[stable_indices, :] # Apply H floor and Final Normalization for ALL nodes next_p_target = normalize_p_target(next_p_target, p_min) # --- Phase 4: Update Causal Network Weights (Stability Weighted) --- # Calculate correlations: +1 if neighbor state S matches target state S+1, -1 otherwise corr_left = np.where(neighbors_left_eps == next_epsilon, 1, -1) corr_right = np.where(neighbors_right_eps == next_epsilon, 1, -1) # Stability-weighted correlation factor (using next_theta) # Ensure theta_max is not zero safe_theta_max = theta_max if theta_max > 0 else 1.0 f_theta_corr = 1.0 + next_theta / safe_theta_max # Calculate weight changes dw_left = delta_w_base * corr_left * f_theta_corr dw_right = delta_w_base * corr_right * f_theta_corr # Assumes f_theta_corr is same for i # Apply decay and update weights w_left = prev_w_left * (1.0 - decay_rate) + dw_left w_right = prev_w_right * (1.0 - decay_rate) + dw_right # Apply bounds (non-negative and optional max) w_left = np.maximum(0, w_left) w_right = np.maximum(0, w_right) if w_max is not None: w_left = np.minimum(w_left, w_max) w_right = np.minimum(w_right, w_max) # --- Assign final states for next step --- epsilon_state = next_epsilon theta_state = next_theta p_target_state = next_p_target # --- Calculate Metrics for History --- avg_theta_history[S] = np.mean(theta_state) # Calculate Shannon entropy of P_target vectors # Add small epsilon to prevent log(0) log_p = np.log2(np.maximum(p_target_state, p_min)) ptarget_entropy = -np.sum(p_target_state * log_p, axis=1) avg_ptarget_entropy_history[S] = np.mean(ptarget_entropy) avg_w_left_history[S] = np.mean(w_left) avg_w_right_history[S] = np.mean(w_right) # --- Prepare Results --- results = { "parameters": params, "epsilon_history": epsilon_history, "avg_theta_history": avg_theta_history, "avg_ptarget_entropy_history": avg_ptarget_entropy_history, "avg_w_left_history": avg_w_left_history, "avg_w_right_history": avg_w_right_history } return results # --- Metric Calculation Functions (from 0129, adapted slightly) --- def calculate_average_gradient_magnitude(phi_history): """Calculates average gradient magnitude for discrete states (proxy for boundary density).""" N_steps, N_nodes = phi_history.shape avg_grad_hist = np.zeros(N_steps) for S in range(N_steps): state = phi_history[S, :] diff_left = (state != np.roll(state, 1)).astype(float) diff_right = (state != np.roll(state, -1)).astype(float) # Count boundaries (where state differs from at least one neighbor) boundaries = np.logical_or(diff_left, diff_right) avg_grad_hist[S] = np.mean(boundaries) # Boundary density return avg_grad_hist def calculate_power_spectral_density(history_data, dt=1.0): """Calculates average power spectral density from history data (e.g., epsilon or theta).""" S_max, N_nodes = history_data.shape if S_max < 2: return None, None # Need at least 2 time steps for FFT psd_list = [] for i in range(N_nodes): time_series = history_data[:, i] yf = fft(time_series) T = dt # Time step N = len(time_series) if N == 0: continue xf = np.fft.fftfreq(N, T)[:N//2] # Positive frequencies psd = (np.abs(yf[0:N//2])**2) / N # Normalized PSD psd_list.append(psd) if not psd_list: return None, None avg_psd = np.mean(np.array(psd_list), axis=0) return xf, avg_psd # --- Plotting Function (Consolidated) --- def plot_results_v2_4(results, title_suffix=""): """Generates plots for v2.4 results and returns base64 string.""" params = results["parameters"] epsilon_history = results["epsilon_history"] avg_theta_history = results["avg_theta_history"] avg_ptarget_entropy_history = results["avg_ptarget_entropy_history"] avg_w_left_history = results["avg_w_left_history"] avg_w_right_history = results["avg_w_right_history"] S_max = params['S_max'] N = params['N'] dt = params.get('dt', 1.0) epsilon_states = params.get('epsilon_states', 2) fig, axs = plt.subplots(4, 1, figsize=(10, 10), sharex=True) # Spacetime plot cmap = plt.cm.get_cmap('viridis', epsilon_states) # Use viridis for multi-state im = axs[0].imshow(epsilon_history.T, cmap=cmap, aspect='auto', interpolation='none', vmin=0, vmax=epsilon_states-1) axs[0].set_title(f'IO v2.4 Simulation {title_suffix}: ε State Evolution') axs[0].set_ylabel('Node Index (i)') # Add colorbar if more than 2 states if epsilon_states > 2: plt.colorbar(im, ax=axs[0], label='State (ε)', ticks=range(epsilon_states)) # Average Theta axs[1].plot(avg_theta_history) axs[1].set_title('Average Stability (Θ_val)') axs[1].set_ylabel('Avg Θ_val') axs[1].grid(True) # Average P_target Entropy axs[2].plot(avg_ptarget_entropy_history) axs[2].set_title('Average Potentiality Entropy (H(P_target))') axs[2].set_ylabel('Avg Entropy (bits)') axs[2].set_ylim(bottom=-0.05) # Ensure 0 is visible axs[2].grid(True) # Average Causal Weights axs[3].plot(avg_w_left_history, label='Avg W(left -> self)') axs[3].plot(avg_w_right_history, label='Avg W(right -> self)') axs[3].set_title('Average Causal Weights (w)') axs[3].set_xlabel('Sequence Step (S)') axs[3].set_ylabel('Avg Weight') axs[3].legend() axs[3].grid(True) plt.tight_layout() buf = io.BytesIO() plt.savefig(buf, format='png') buf.seek(0) plot_base64 = base64.b64encode(buf.read()).decode('utf-8') buf.close() plt.close(fig) return plot_base64 ``` ## 3. Code Structure and Logic * **Consolidation:** Combines simulation logic (v2.4), metric calculations (adapted for discrete states/history), and plotting into one code block for execution via `tool_code`. * **Discrete State Adaptation:** The code now explicitly handles `epsilon_states` > 2, including normalization of `P_target` and sampling from the modified target distribution. The K_local calculation uses a simple difference metric suitable for discrete states. * **Metric Integration:** Basic metrics (average theta, average P_target entropy, average weights) are calculated within the main loop. More complex metrics (PSD, domain length - which is less meaningful for continuous states anyway) are defined as separate functions to be called *after* the simulation on the history data. * **Plotting:** The plotting function is updated to handle potentially multi-state `epsilon_history` and includes the causal weight history. ## 4. Next Steps This consolidated code block is now ready to be used in the next node ([[releases/archive/Information Ontology 1/0137_IO_Simulation_Batch1]]) for executing a batch of simulation runs to explore the parameter space and test the effectiveness of the v2.4 formalism, particularly the dynamic CA mechanism.