# E_v3_3_2_M1.py - Minified AIOS Engine
import json
import uuid
import datetime
import time  # For simulating work


class E_v3_3_2_M1:
    """Minified AIOS engine: a JSON-serialisable state machine driven by LLM replies.

    Each public step either returns a request envelope built by ``clr``
    (``{"s": "AWAIT_LLM_M", "rd": <request>, "ces": <snapshot>}``) or a
    terminal dict (``{"s": "TERM_REQ_M" | "TERM_M", "fes": <snapshot>}``).
    The ``ch`` field of a request names the method expected to receive the
    reply.

    Attribute names are abbreviated; the original long names are:
        lh   -- log_history (list of formatted log lines)
        evf  -- engine_version_full
        evs  -- engine_version_short
        ccm  -- current_context_mh (context label used in logs/requests)
        ssv  -- state_schema_version
        KAS  -- Kernel_ActiveCCO_JsonString
        KCM  -- Kernel_CurrentMH_ID
        KMI  -- Kernel_MH_Inputs_JsonString
        CCOd -- CCO_data (dict carrying "cco_id" and "op_log_json")
        s_*  -- per-MH scratch dictionaries
    """

    # Names of the per-MH scratch dictionaries; shared by reset, export,
    # import and snapshot so the four always agree.
    _SUBSTATE_NAMES = (
        "s_ife", "s_pdf", "s_pln", "s_cag", "s_tde",
        "s_sel", "s_kau", "s_fel", "s_mro",
    )

    def __init__(self, iss_json_str=None):
        """Create an engine, optionally restoring state from ``iss_json_str``.

        Any failure while importing the supplied state is logged and the
        engine falls back to the default state instead of raising.
        """
        self.lh = []  # log_history
        self.evf = "AIOS_Engine_v3.3.2.M1 (Minified)"  # engine_version_full
        self.evs = "3.3.2.M1"  # engine_version_short
        self.ccm = "System_M"  # current_context_mh
        self.ssv = "1.0.M1"  # state_schema_version
        self.i_ds()  # _initialize_default_state
        self.l("System_M", "E_M1 __init__: Started.")
        if iss_json_str:
            try:
                self.imp_s(iss_json_str)  # import_state
                self.l("System_M", f"E_M1 instance created, state imported. Schema: {self.ssv}")
            except Exception as e:
                # Deliberate best-effort: a bad snapshot must not prevent start-up.
                self.l("System_M", f"ERROR during __init__ state import: {str(e)}. Init with default.")
                self.i_ds()
                self.l("System_M", f"{self.evf} instance created with default state after import error.")
        else:
            self.l("System_M", f"{self.evf} instance created with default state.")
        self.l("System_M", "E_M1 __init__: Completed.")

    @staticmethod
    def _as_dict(obj):
        """Return ``obj`` when it is a dict, otherwise a new empty dict."""
        return obj if isinstance(obj, dict) else {}

    def i_ds(self):  # _initialize_default_state
        """Reset kernel fields and every per-MH scratch dict to defaults.

        ``lh`` and ``ccm`` are only created when missing, so an existing log
        history survives a reset.
        """
        self.KAS = None  # Kernel_ActiveCCO_JsonString
        self.KCM = None  # Kernel_CurrentMH_ID
        self.KMI = None  # Kernel_MH_Inputs_JsonString
        self.CCOd = None  # CCO_data
        for name in self._SUBSTATE_NAMES:
            setattr(self, name, {})
        if not hasattr(self, 'lh'):
            self.lh = []
        if not hasattr(self, 'ccm'):
            self.ccm = "System_M"

    def exp_s(self) -> str:  # export_state
        """Serialise the engine state to a JSON string.

        If a value cannot be serialised, each offending top-level key is
        replaced by an ``"UNSERIALIZABLE (<type>)"`` marker, the problem is
        logged, and the remaining state is still exported.
        """
        sd = {
            "ssv": self.ssv, "evf": self.evf, "evs": self.evs, "ccm": self.ccm,
            "KAS": self.KAS, "KCM": self.KCM, "KMI": self.KMI, "CCOd": self.CCOd,
        }
        for name in self._SUBSTATE_NAMES:
            sd[name] = getattr(self, name)
        sd["lh"] = self.lh
        try:
            return json.dumps(sd)
        except (TypeError, ValueError) as e:
            # json.dumps raises TypeError for unsupported types and
            # ValueError for circular references; handle both the same way.
            self.l("System_M", f"ERROR in exp_s: JSON serialization failed: {str(e)}")
            pk = []
            for k, v in list(sd.items()):
                try:
                    json.dumps({k: v})
                except (TypeError, ValueError):
                    pk.append(k)
                    sd[k] = f"UNSERIALIZABLE ({type(v).__name__})"
            self.l("System_M", f"exp_s: Problematic keys: {pk}")
            return json.dumps(sd)

    def imp_s(self, sjs: str):  # import_state
        """Restore engine state from a JSON string produced by ``exp_s``.

        A schema-version mismatch is logged as a warning but does not abort
        the import. Scratch entries that are missing or not dicts are reset
        to ``{}`` so later steps can index them safely.

        Raises:
            ValueError: if ``sjs`` is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not decode to a JSON object.
        """
        sd = json.loads(sjs)
        if not isinstance(sd, dict):
            raise ValueError(f"imp_s: state must be a JSON object, got {type(sd).__name__}")
        issv = sd.get("ssv")
        if issv != self.ssv:
            self.l("System_M", f"WARNING imp_s: Schema mismatch. Engine: {self.ssv}, Imported: {issv}.")
        self.evf = sd.get("evf", self.evf)
        self.evs = sd.get("evs", self.evs)
        self.ccm = sd.get("ccm", "System_M")
        self.KAS = sd.get("KAS")
        self.KCM = sd.get("KCM")
        self.KMI = sd.get("KMI")
        self.CCOd = sd.get("CCOd")
        for name in self._SUBSTATE_NAMES:
            setattr(self, name, self._as_dict(sd.get(name)))
        ilh = sd.get("lh", [])
        if isinstance(ilh, list):
            self.lh = ilh
        else:
            self.lh = [f"Log history unserializable: {type(ilh).__name__}"]

    def gts(self):  # _get_timestamp
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.datetime.now(datetime.timezone.utc).isoformat()

    def l(self, ctx, msg):  # aios_log
        """Format a log line, print it, and append it to ``self.lh``."""
        ts = self.gts()
        vs = getattr(self, 'evs', 'unk_M')
        fl = f"{ts}-LOG_M({ctx} v{vs}):{msg}"
        print(fl)
        self.lh.append(fl)

    def clr(self, tt, pu=None, ctd=None, eid=None, ch=None, ccd=None):  # _create_llm_request
        """Build, print and return an LLM request envelope.

        Args:
            tt: request type tag, stored under ``"tt"``.
            pu, ctd, eid, ch: optional fields copied into the request under
                the same keys when truthy.
            ccd: data stored under ``"ccd"``; defaults to ``self.CCOd`` when
                ``None``. Omitted from the request when falsy.

        Returns:
            ``{"s": "AWAIT_LLM_M", "rd": <request dict>, "ces": <snapshot>}``.
        """
        rq = {"rts": self.gts(), "evc": self.evf, "c_c": self.ccm, "tt": tt}
        for key, value in (("pu", pu), ("ctd", ctd), ("eid", eid), ("ch", ch)):
            if value:
                rq[key] = value
        cp = ccd if ccd is not None else self.CCOd
        if cp:
            rq["ccd"] = cp
        # NOTE(review): the doubled backslashes print a literal "\n" rather
        # than a newline; kept as-is because consumers may match on it.
        print(f"\\n---B_LLM_REQ_M---\\n{json.dumps(rq, indent=2)}\\n---E_LLM_REQ_M---")
        return {"s": "AWAIT_LLM_M", "rd": rq, "ces": self.gess()}

    def gess(self):  # _get_engine_state_snapshot
        """Return a compact snapshot of the kernel state.

        Includes the first 100 characters of ``KAS`` and, for every
        non-empty scratch dict, the list of its keys under ``"<name>_k"``.
        """
        ss = {
            "KCM": self.KCM,
            "KMI": self.KMI,
            "KAS_100": (self.KAS[:100] + "...") if self.KAS else None,
            "CCOd_id": self.CCOd.get('cco_id', 'N/A') if isinstance(self.CCOd, dict) else 'N/A',
            "ccm_log": self.ccm,
            "ssv": self.ssv,
        }
        for dn in self._SUBSTATE_NAMES:
            dval = getattr(self, dn, {})
            if dval:
                ss[f"{dn}_k"] = list(dval.keys())
        return ss

    def PUM(self, mt, mc):  # PresentUserMessage
        """Log a user-facing message and return a ``PUM_USER_M`` request for it."""
        self.l(self.ccm, f"PUM_M(T:{mt}):{mc}")
        return self.clr(tt="PUM_USER_M", ctd={"mt": mt, "c": mc})

    def PJsToC(self, jsi):  # ParseJsonToCNLObject
        """Parse a JSON string; return ``None`` for ``None``, non-str or blank input.

        Raises:
            ValueError: if ``jsi`` is a non-blank string that is not valid JSON.
        """
        if jsi is None or not isinstance(jsi, str) or jsi.strip() == "":
            self.l(self.ccm, "PJsToC_M: Inp null/empty.")
            return None
        try:
            return json.loads(jsi)
        except json.JSONDecodeError as e:
            self.l(self.ccm, f"ERR PJsToC_M:{e}. Inp:'{jsi}'")
            raise ValueError(f"JSONParseErr_M:{e}") from e

    def CCnlToJs(self, coi):  # ConvertCNLObjectToJson
        """Serialise ``coi`` to JSON; ``None`` becomes the string ``"null"``.

        Raises:
            ValueError: if ``coi`` contains a value json cannot serialise.
        """
        if coi is None:
            return "null"
        try:
            return json.dumps(coi)
        except TypeError as e:
            self.l(self.ccm, f"ERR CCnlToJs_M:{e}")
            raise ValueError(f"JSONFormatErr_M:{e}") from e

    def LToCH(self, cdd, let, msg, ado=None):  # LogToCCOHistory
        """Append an entry to the JSON-encoded operation log inside ``cdd``.

        Args:
            cdd: CCO dict; replaced by a minimal dict when not a dict.
            let: entry type, stored under ``"let"``.
            msg: message, stored under ``"log_msg"``.
            ado: optional extra data, stored JSON-encoded under ``"adjs"``.

        Returns:
            The updated dict, which is also stored in ``self.CCOd`` (and its
            JSON form in ``self.KAS``).
        """
        self.l(self.ccm, f"LToCH_M:T='{let}',M='{msg}'")
        if not isinstance(cdd, dict):
            self.l(self.ccm, "LToCH_M:CCOd not dict.")
            cdd = {"op_log_json": "[]"}
        oll = self.PJsToC(cdd.get("op_log_json", "[]"))
        if not isinstance(oll, list):
            oll = []
        nle = {"ts": self.gts(), "let": let, "log_msg": msg}
        if ado is not None:
            nle["adjs"] = self.CCnlToJs(ado)
        oll.append(nle)
        cdd["op_log_json"] = self.CCnlToJs(oll)
        self.CCOd = cdd
        self.KAS = self.CCnlToJs(cdd)
        return cdd

    def f_ipo(self, pu, olc):  # fn_interaction_present_options (Note: class name in ch)
        """Return a ``UIR_PO_M`` request presenting the options ``olc`` with prompt ``pu``."""
        return self.clr(
            tt="UIR_PO_M",
            pu=pu,
            ctd={"opts": olc, "it": "opt_sel"},
            eid="JSON LLM:{'s':'USR_CMD_M','c':<val>}",
            ch="E_v3_3_2_M1.k_pirc(llr)",
        )

    def f_iei(self, pu):  # fn_interaction_elicit_user_input
        """Return a ``UIR_ET_M`` request asking for free text with prompt ``pu``."""
        return self.clr(
            tt="UIR_ET_M",
            pu=pu,
            ctd={"it": "free_txt"},
            eid="JSON LLM:{'s':'USR_CMD_M','c':<txt>}",
            ch="Depends on MH_M",
        )

    def f_iudfnm(self, uit):  # fn_interpret_user_directive_for_next_mh
        """Map a user directive to the next MH id.

        Matching is exact after lower-casing and stripping. ``None`` is
        treated as an empty directive and other non-strings are stringified,
        so a missing reply field yields a clarification instead of a crash.

        Returns:
            ``{"s", "nmh", "nis_js"}`` plus ``"upm"`` (a clarification
            prompt) when the directive is not recognised.
        """
        self.l(self.ccm, f"f_iudfnm_M:'{uit}'")
        luit = "" if uit is None else str(uit).lower().strip()
        routes = (
            ("IFE_M", ("new process", "1", "1.")),
            ("FEL_M", ("evolve engine", "2", "2.")),
            ("TA_M", ("terminate aios", "terminate", "exit", "quit", "3", "3.")),  # TERMINATE_AIOS_M
        )
        nmh = "AUI_M"
        upm = "Cmd_M?"
        for target, words in routes:
            if luit in words:
                nmh = target
                upm = None
                break
        nis = {}
        ro = {
            "s": "Ok_M" if nmh != "AUI_M" else "Clarify_M",
            "nmh": nmh,
            "nis_js": self.CCnlToJs(nis),
        }
        if upm:
            ro["upm"] = upm
        return ro

    def k_se(self, iss=None):  # kernel_start_engine
        """Start a new session, or resume from the exported state ``iss``.

        Returns:
            The request dict of the next step. On resume this is the result
            of ``k_rcmh`` when an MH other than ``"AUI_M"`` is active,
            otherwise the initial options request from ``k_pio``.
        """
        if iss:
            self.imp_s(iss)
            self.l("Kernel_M", f"k_se_M:Resumed. KCM:{self.KCM}")
            if self.KCM and self.KCM != "AUI_M":
                return self.k_rcmh()
            # Return the request itself; wrapping the log call and k_pio()
            # in a tuple would hand callers (None, request) instead.
            self.l("Kernel_M", "k_se_M:Resumed,no active MH.PIO.")
            return self.k_pio()
        self.l("Kernel_M", "k_se_M:New session.")
        self.ccm = "Kernel_M"
        self.l(self.ccm, f"{self.evf}-Kernel_M start.")
        self.PUM("Status", f"{self.evf} Init_M.PIO_ready.")
        res = self.k_pio()
        self.l("Kernel_M", "k_se_M:Done,LLM req for PIO gen.")
        return res

    def k_pio(self):  # kernel_present_initial_options
        """Return the request presenting the three initial options."""
        self.ccm = "Kernel_M"
        self.l(self.ccm, "k_pio_M:Pres init opts.")
        olc = [
            {"v": "New Process", "l": "1.NewProc_M"},
            {"v": "Evolve Engine", "l": "2.EvolveAIOS_M"},
            {"v": "Terminate AIOS", "l": "3.TerminateAIOS_M"},
        ]
        return self.f_ipo(pu=f"AIOS_M v{self.evs} Rdy.Begin?", olc=olc)

    def k_pirc(self, llr):  # kernel_process_initial_choice_result
        """Handle the reply to the initial options request.

        An invalid reply or an unrecognised choice re-presents the options;
        a terminate choice returns ``{"s": "TERM_REQ_M", ...}``; any other
        recognised choice runs the selected MH.
        """
        self.ccm = "Kernel_M"
        if not llr or not isinstance(llr, dict) or llr.get("s") != "USR_CMD_M":
            self.l(self.ccm, "Inv res_M.Re-prompt.")
            self.PUM("Warn_M", "Cannot process sel_M.")
            return self.k_pio()
        cmdv = llr.get("c")
        self.l(self.ccm, f"Proc init choice_M:'{cmdv}'")
        ir = self.f_iudfnm(cmdv)
        self.KCM = ir.get("nmh")
        self.KMI = ir.get("nis_js")
        if self.KCM == "TA_M":
            self.PUM("Status_M", "AIOS_M term init.")
            return {"s": "TERM_REQ_M", "fes": self.gess()}
        if self.KCM and self.KCM != "AUI_M":
            self.l(self.ccm, f"Kernel_M:Next MH_M:{self.KCM}")
            return self.k_rcmh()
        self.PUM("Warn_M", ir.get("upm", f"Unrec choice_M:'{cmdv}'."))
        return self.k_pio()

    def k_rcmh(self):  # kernel_run_current_mh
        """Dispatch on ``self.KCM``: terminate, pause for a directive, or run an MH.

        An MH id with no implementation is reported, ``KCM`` is reset to
        ``"AUI_M"`` and the paused-state request is returned.
        """
        self.ccm = "Kernel_M"
        if self.KCM == "TA_M":
            self.PUM("Status_M", "AIOS_M terminated.")
            return {"s": "TERM_M", "fes": self.gess()}
        if self.KCM == "AUI_M" or not self.KCM:
            self.l(self.ccm, "Kernel_M paused.")
            # Note: class name in ch
            return self.clr(tt="UIR_GEN_DIR_M", pu="AIOS_M paused.Next?", ch="E_v3_3_2_M1.k_pgud(llr)")
        self.l(self.ccm, f"Kernel_M:Exec MH_M:{self.KCM}")
        self.PUM("Status_M", f"Exec MH_M:{self.KCM}")
        mio = self.PJsToC(self.KMI)
        if self.KCM == "IFE_M":
            return self.ife_s1(mio)  # STUB for now
        if self.KCM == "FEL_M":
            return self.fel_s1(mio)
        self.l(self.ccm, f"Err_M:MH_M'{self.KCM}'NI_M.")
        self.PUM("Err_M", f"MH_M'{self.KCM}'NA_M.")
        self.KCM = "AUI_M"
        return self.k_rcmh()

    def k_pmhr(self, mjid, mhr):  # kernel_process_mh_result
        """Process the result dict ``mhr`` returned by MH ``mjid``.

        A missing, empty or non-dict result is reported and the kernel
        pauses. A result whose status is ``"AWAIT_LLM_M"`` is passed through
        unchanged. Otherwise completion is written to the CCO operation log,
        the next MH is chosen and ``k_rcmh`` is run.
        """
        self.ccm = "Kernel_M"
        # Read the status only from a real dict so a None/invalid result
        # reaches the validation branch below instead of raising.
        s = mhr.get("s") if isinstance(mhr, dict) else None
        self.l(self.ccm, f"Proc res_M from'{mjid}'.S:'{s}'")
        if not mhr or not isinstance(mhr, dict):
            self.PUM("Err_M", f"Inv res_M from'{mjid}'.")
            self.KCM = "AUI_M"
            return self.k_rcmh()
        # MRO_PIPELINE_COMPLETE_RETURN_TO_CALLER handling would be here if MRO was not a stub
        if "uccoj" in mhr:
            self.KAS = mhr["uccoj"]
            self.CCOd = self.PJsToC(self.KAS)
            self.l(self.ccm, f"CCOd_M upd by'{mjid}'.")
        elif self.CCOd is not None:
            self.KAS = self.CCnlToJs(self.CCOd)
        if s == "AWAIT_LLM_M":
            self.l(self.ccm, f"'{mjid}'awaits LLM_M for:{self._as_dict(mhr.get('rd')).get('tt')}")
            return mhr
        dfl = mhr.get("dfl")
        if self.CCOd is None:
            self.CCOd = {"op_log_json": "[]", "cco_id": f"k_cco_m_{uuid.uuid4().hex[:8]}"}
            self.KAS = self.CCnlToJs(self.CCOd)
            self.l(self.ccm, "Init min CCOd_M for log MH_C_M.")
        self.CCOd = self.LToCH(self.CCOd, "MH_C_M", f"{mjid} s:{s}.", dfl)
        # Substring checks need a string; a missing status matches nothing.
        st = s if isinstance(s, str) else ""
        # Case-insensitive so the upper-case failure statuses this class
        # emits (e.g. "FEL_ERR_NO_TID_M") are reported as errors too.
        su = st.upper()
        if "ERR" in su or "FAIL" in su:
            self.PUM("Err_M", f"MH_M {mjid} issue:{s}.")
        else:
            self.PUM("Status_M", f"{mjid} complete s:{s}.")
        nmh = "AUI_M"
        nis = {}
        if mjid == "FEL_M" and "FEL_EVO_PROP_C_M" in st:
            nmh = "TA_M"  # FEL_EvolutionProposed_Complete_M
        # Add other transitions here
        self.KCM = nmh
        self.KMI = self.CCnlToJs(nis)
        # NOTE(review): no status produced in this file contains "Complete"
        # (they end in "_C_M"), so this suggestion never fires - confirm intent.
        if "Complete" in st and nmh != "TA_M" and self.CCOd:
            self.PUM("Suggest_M", "Work_M done.Save CCOd_M state.")
        return self.k_rcmh()

    def k_pgud(self, llr):  # kernel_process_general_user_directive
        """Handle a general directive received while the kernel is paused.

        A reply that is not a dict is treated as an empty directive, which
        re-prompts the user.
        """
        self.ccm = "Kernel_M"
        udt = llr.get("c", "") if isinstance(llr, dict) else ""
        self.l(self.ccm, f"Proc gen directive_M:'{udt}'")
        iro = self.f_iudfnm(udt)
        self.KCM = iro.get("nmh", "AUI_M")
        self.KMI = iro.get("nis_js", "{}")
        if self.KCM == "AUI_M" and iro.get("upm"):
            # Note: class name in ch
            return self.clr(tt="UIR_GEN_DIR_M", pu=iro.get("upm"), ch="E_v3_3_2_M1.k_pgud(llr)")
        return self.k_rcmh()

    # FEL-MH Minified (Full functionality)
    def fel_s1(self, mi):
        """FEL step 1: reset FEL scratch state and ask for the TID source.

        ``mi`` may carry ``"cesm"``; it is stored as ``s_fel["cer"]`` and
        defaults to ``"CREP_DEF_M"`` when ``mi`` is empty or not a dict.
        """
        self.ccm = "FEL_M"
        self.s_fel = {}
        self.l(self.ccm, "fel_s1_M:Init.")
        self.PUM("Status_M", "FEL_M:Start Evo Loop_M.")
        self.s_fel["cer"] = self._as_dict(mi).get("cesm", "CREP_DEF_M")
        return self.clr(tt="UIR_ET_M", pu="FEL_M:Provide TIDs(JSON) or src.", ch="E_v3_3_2_M1.fel_s2(llr)")

    def fel_s2(self, llr):
        """FEL step 2: forward the user-supplied TID source for loading."""
        self.ccm = "FEL_M"
        tsi = self._as_dict(llr).get("c")
        self.l(self.ccm, f"fel_s2_M:Rcvd TID_M src:{tsi}")
        return self.clr(
            tt="CT_FEL_LT_M",
            ctd={
                "tsd_obj": {"src_desc": tsi, "cur_eng_ver": self.evs},
                "out_fmt_guide": "JSON obj:{'tids_loaded_list':[],'status':'Success_M'}",
            },
            ch="E_v3_3_2_M1.fel_s3(llr)",
        )

    def fel_s3(self, llmcr):  # process_tids
        """FEL step 3: record the loaded TIDs and request the next version.

        An empty or missing TID list ends the MH with ``FEL_ERR_NO_TID_M``.
        """
        self.ccm = "FEL_M"
        ltids = self._as_dict(llmcr).get("tids_loaded_list", [])
        self.s_fel["ltids"] = ltids
        if not ltids:
            self.PUM("Err_M", "FEL_M:No TIDs loaded.")
            return self.k_pmhr("FEL_M", {"s": "FEL_ERR_NO_TID_M"})
        self.PUM("Info_M", f"FEL_M:Loaded {len(ltids)} TIDs.")
        if self.CCOd is None:
            self.CCOd = {"cco_id": f"fel_cco_m_{uuid.uuid4().hex[:4]}", "op_log_json": "[]"}
            self.KAS = self.CCnlToJs(self.CCOd)
            self.l(self.ccm, "Init min CCOd_M for FEL_M.")
        self.CCOd = self.LToCH(self.CCOd, "FEL_TID_LOAD_M", f"Loaded {len(ltids)} TIDs.", {"tids": ltids})
        return self.clr(
            tt="CT_FEL_CALC_VER_M",
            ctd={
                # .get with the fel_s1 default: the scratch dict may have
                # been reset or restored without "cer".
                "cur_eng_rep": self.s_fel.get("cer", "CREP_DEF_M"),
                "out_fmt_guide": "JSON obj:{'next_ver_str':'x.y.z','status':'Success_M'}",
            },
            ch="E_v3_3_2_M1.fel_s4(llr)",
        )

    def fel_s4(self, llmcr):  # process_version
        """FEL step 4: record the next version string and request TID application.

        When the reply carries no ``"next_ver_str"`` a fallback of the form
        ``v<evs>-evoM-<4 hex chars>`` is generated.
        """
        self.ccm = "FEL_M"
        nvs = self._as_dict(llmcr).get("next_ver_str", f"v{self.evs}-evoM-{uuid.uuid4().hex[:4]}")
        self.s_fel["nvs"] = nvs
        self.PUM("Info_M", f"FEL_M:Next ver_M:{nvs}")
        self.CCOd = self.LToCH(self.CCOd, "FEL_VER_CALC_M", f"Next ver_M:{nvs}.", {"next_ver": nvs})
        emfl = {
            "desc_M": "Cur AIOS_M Eng Py Logic",
            "cur_ver_M": self.evs,
            "key_comp_hint_M": ["Kernel_M", "IFE_M", "MRO_M"],
            "fsc_avail_M": True,
        }
        return self.clr(
            tt="CT_FEL_APPLY_TIDS_M",
            ctd={
                "eng_model_obj": emfl,
                "tids_list_obj": self.s_fel.get("ltids", []),
                "out_fmt_guide": "JSON obj:{'evo_eng_model_obj':{},'app_log_str':'','status':'Success_M'}",
            },
            ch="E_v3_3_2_M1.fel_s5(llr)",
        )

    def fel_s5(self, llmcr):  # process_applied_tids
        """FEL step 5: record the evolved model and request artefact regeneration.

        An empty or missing model ends the MH with ``FEL_ERR_TID_APP_FAIL_M``.
        """
        self.ccm = "FEL_M"
        reply = self._as_dict(llmcr)
        eemc = reply.get("evo_eng_model_obj", {})
        self.s_fel["eemc"] = eemc
        apl = reply.get("app_log_str", "NoLog_M")
        self.PUM("Info_M", f"FEL_M:TIDs concept applied_M.Log:{apl}")
        # str() keeps the 50-character preview safe when the log is not a string.
        self.CCOd = self.LToCH(
            self.CCOd,
            "FEL_TID_APPLIED_C_M",
            f"LLM_M concept TIDs applied.Log:{str(apl)[:50]}...",
            {"app_log": apl, "concept_model": eemc},
        )
        if not eemc:
            self.PUM("Err_M", "FEL_M:LLM_M fail evo model_M.")
            return self.k_pmhr("FEL_M", {"s": "FEL_ERR_TID_APP_FAIL_M"})
        return self.clr(
            tt="CT_FEL_REGEN_ARTEFACT_M",
            ctd={
                "evo_eng_model_obj": eemc,
                "target_fmt_str": "python_script_text_M",
                "out_fmt_guide": "JSON obj:{'eng_artefact_txt':'<full_script>','chg_log_txt':'...','status':'Success_M'}",
            },
            ch="E_v3_3_2_M1.fel_s6(llr)",
        )

    def fel_s6(self, llmcr):  # finalize_evolution
        """FEL step 6: publish the generated artefact and finish the MH.

        A missing artefact ends the MH with ``FEL_ERR_REGEN_FAIL_M``;
        otherwise the result is logged to the CCO and handed to ``k_pmhr``
        with status ``FEL_EVO_PROP_C_M``.
        """
        self.ccm = "FEL_M"
        reply = self._as_dict(llmcr)
        eat = reply.get("eng_artefact_txt")
        clt = reply.get("chg_log_txt", "NoChgLog_M")
        if not eat:
            self.PUM("Err_M", "FEL_M:LLM_M fail gen artefact_M.")
            return self.k_pmhr("FEL_M", {"s": "FEL_ERR_REGEN_FAIL_M"})
        nvs = self.s_fel.get('nvs')
        self.PUM("Info_M", f"FEL_M:Evo eng artefact_M(V:{nvs})gen LLM_M.")
        self.PUM("GenEngArtefact_M", {"new_eng_ver": nvs, "eng_script_txt": eat, "chg_log": clt})
        self.CCOd = self.LToCH(
            self.CCOd or {},
            "FEL_MH_RESULT_M",
            f"Eng evo_M to {nvs} prop.",
            {"new_ver": nvs, "chg_log": clt},
        )
        # FEL_EvolutionProposed_Complete_M
        return self.k_pmhr("FEL_M", {"s": "FEL_EVO_PROP_C_M", "uccoj": self.CCnlToJs(self.CCOd)})

    # STUBS for other MHs
    def ife_s1(self, i):
        """IFE stub: log that it ran and report ``IFE_STUB_C_M`` to the kernel."""
        self.ccm = "IFE_M_STUB"
        self.l(self.ccm, "ife_s1_M STUB RUN")
        return self.k_pmhr("IFE_M_STUB", {"s": "IFE_STUB_C_M"})