diff --git a/plt.php b/plt.php index 421d39b..9fa8b79 100755 --- a/plt.php +++ b/plt.php @@ -57,131 +57,289 @@ * SYS_PRD_BND.PyPi (LibName, AliasName) * DynamicTables (LastUpdated, [PrimaryKeyColumns], ...) */ +/** + * Processes all active tables, updating rows based on trigger conditions. + * This function scans each active table, executes dynamic trigger code, + * and handles errors accordingly. + */ function processAllTheActiveTables() { - echo "Scanning all the tables that need updating...\n"; - foreach(sql("SELECT Name, onUpdate_phpCode, onUpdate_pyCode, LastUpdated FROM SYS_PRD_BND.Tables") as $activeTable) { - extract($activeTable); - echo "Found Table : ".greenText($Name)."\n"; - echo "Scanning all the rows in table $Name that need to be ran through trigger code:\n"; - // @todo: check if the table has a column called : LastUpdated. If try to auto-create it. (ALTER TABLE ... LastUpdated DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP) - foreach(sql("SELECT * FROM $Name WHERE LastUpdated > '$LastUpdated'") as $unprocessedRow) { - echo "Found row\n".json_encode($unprocessedRow)."\n"; - $functionName = "handleNew".str_replace(".","__",$activeTable["Name"])."Row"; + echo "Scanning all the tables that need updating...\n"; + $activeTables = sql("SELECT Name, onUpdate_phpCode, onUpdate_pyCode, LastUpdated FROM SYS_PRD_BND.Tables"); - echo "Checking if function ".greenText($functionName)." exists at code-level (sys).\n"; - if (function_exists($functionName)) { - echo "Calling function ".greenText("$functionName(...)")."\n"; - if ($functionName($unprocessedRow, $error) === false) { - sql("UPDATE SYS_PRD_BND.Tables SET LastError = '".str_replace("'",'"',$error)."', LastUpdated = NOW() WHERE Name = '{$activeTable["Name"]}'"); - sendTelegramMessage("*ERROR* on trigger function for table {$activeTable["Name"]} : ".$error,"DatabaseGroup"); - } else { - sql("UPDATE SYS_PRD_BND.Tables SET LastError = '', LastUpdated = NOW() WHERE Name = '{$activeTable["Name"]}'"); - } - - } - /* - * PHP CODE - */ - echo "Checking if function ".greenText($functionName)." exists in PHP at database-level (app).\n"; - if (!is_null($onUpdate_phpCode) && !empty($onUpdate_phpCode)) { - echo "Creating function ".greenText("$functionName(...)")." context\n"; - // 1. Add the PHP START TAG - $code = "<"."?"."php \n"; - // 2. Add the CONSTANTS - foreach(sql("SELECT Name, Type, Value FROM SYS_PRD_BND.Constants") as $const) - $code .= "define(\"{$const["Name"]}\",".($const["Type"]!="String"?$const["Value"]:'"'.$const["Value"].'"').");\n"; - // 3. Add the SUPPORT FUNCTIONS - foreach(sql("SELECT Name, InputArgs_json, PhpCode FROM SYS_PRD_BND.SupportFunctions WHERE PhpCode IS NOT NULL") as $f) - $code .= "function {$f["Name"]} (".implode(", ",array_map(fn($s)=>"\$$s",array_keys(json_decode($f["InputArgs_json"],1)))).") {\n".$f["PhpCode"]."\n}\n"; - - $code .= "require_once '".__DIR__."/sys.php'; \n"; - $code .= "function $functionName (&\$data, &\$error) {\n"; - $code .= $onUpdate_phpCode; - $code .= "\n}\n"; - $code .= "\$data = ".var_export($unprocessedRow,1).";\n"; - $code .= "\$initial_data = json_encode(".var_export($unprocessedRow,1).");\n"; - $code .= "$functionName(\$data,\$error);"; - $code .= "\necho json_encode(\$data);\n"; - file_put_contents(__DIR__."/test_code.php", $code); - echo "Running function ".greenText("$functionName(...)")." in sandbox environment\n"; - if (runProcess("/usr/bin/php",$code,$stdout,$error) != 0) { - sql("UPDATE SYS_PRD_BND.Tables SET LastError = '".str_replace("'",'"',$error)."', LastUpdated = NOW() WHERE Name = '{$activeTable["Name"]}'"); - sendTelegramMessage("*ERROR* on trigger function for table {$activeTable["Name"]} : ".$error,"DatabaseGroup"); - } else { - // Update database row if needed - $newRowValue = json_decode($stdout,1); - echo "\n".redText("DEBUG new-row-value-json: ").json_encode($newRowValue)."\n"; - echo "\n".redText("DEBUG unprocessedRow-json: ").json_encode($unprocessedRow)."\n"; - if (json_encode($newRowValue) != json_encode($unprocessedRow)) { - $pkColsName = getTblPrimaryKeyColName($activeTable["Name"]); - $pkColsValues = array_map(fn($cName)=>$unprocessedRow[$cName],$pkColsName); - $sql_instruction = ""; - $sql_instruction .= "UPDATE "; - $sql_instruction .= $activeTable["Name"]; - $sql_instruction .= " SET ".implode(",",array_map(fn($k,$v)=>"$k=".(is_numeric($v)?$v:"'".str_replace("'","''",$v)."'"),array_keys($newRowValue),array_values($newRowValue))); - $sql_instruction .= " WHERE ".implode(" AND ",array_map(fn($k,$v)=>"$k = ".(is_numeric($newRowValue[$k])?$newRowValue[$k]:'"'.$newRowValue[$k].'"'),$pkColsName,$pkColsValues)); - echo "\n".redText("DEBUG sql-instruction: ").$sql_instruction."\n"; - sql($sql_instruction); - } - // Update the status of the operation - sql("UPDATE SYS_PRD_BND.Tables SET LastError = '', LastUpdated = NOW() WHERE Name = '{$activeTable["Name"]}'"); - } - - } - /* - * Python CODE - */ - echo "Checking if function ".greenText($functionName)." exists in Python at database-level (app).\n"; - if (!is_null($onUpdate_pyCode) && !empty($onUpdate_pyCode)) { - echo "Creating function ".greenText("$functionName(...)")." context\n"; - $code = ""; - // 1. Insert the import to all the external libraries - foreach(sql("SELECT LibName, AliasName FROM SYS_PRD_BND.PyPi") as $module) - $code .= "import ".$module["LibName"] .(is_null($module["AliasName"])&&!empty($module["AliasName"])?" as ".$module["AliasName"]:"")."\n\n"; - - // 2. Define the handler function - $code .= "def $functionName (data, error) :\n"; - $code .= " ".implode("\n ",explode("\n",$onUpdate_pyCode))."\n"; - - // 3. Pass the data - $code .= "data = ".json_encode($unprocessedRow)."\n"; - $code .= "error = { \"status\": \"ok\", \"message\": \"\"}\n"; - - // 4. Call the handler function - $code .= "$functionName(data, error)\n"; - - // 5. Data changing code - $code .= "\nprint(json.dumps(data))\n"; - - file_put_contents(__DIR__."/test_code.py",$code); - echo "Running function ".greenText("$functionName(...)")." in sandbox python environment\n"; - if (runProcess("/usr/bin/python3",$code,$stdout,$error) != 0) { - sql("UPDATE SYS_PRD_BND.Tables SET LastError = '".str_replace("'",'"',$error)."', LastUpdated = NOW() WHERE Name = '{$activeTable["Name"]}'"); - sendTelegramMessage("*ERROR* on trigger function for table {$activeTable["Name"]} : ".$error,"DatabaseGroup"); - break; - } else { - // Update database row if needed - $newRowValue = json_decode($stdout,1); - echo "\n".redText("DEBUG new-row-value-json: ").json_encode($newRowValue)."\n"; - echo "\n".redText("DEBUG unprocessedRow-json: ").json_encode($unprocessedRow)."\n"; - if (json_encode($newRowValue) != json_encode($unprocessedRow)) { - $pkColsName = getTblPrimaryKeyColName($activeTable["Name"]); - $pkColsValues = array_map(fn($cName)=>$unprocessedRow[$cName],$pkColsName); - $sql_instruction = ""; - $sql_instruction .= "UPDATE "; - $sql_instruction .= $activeTable["Name"]; - $sql_instruction .= " SET ".implode(",",array_map(fn($k,$v)=>"$k=".(is_numeric($v)?$v:"\"$v\""),array_keys($newRowValue),array_values($newRowValue))); - $sql_instruction .= " WHERE ".implode(" AND ",array_map(fn($k,$v)=>"$k = ".(is_numeric($newRowValue[$k])?$newRowValue[$k]:'"'.$newRowValue[$k].'"'),$pkColsName,$pkColsValues)); - echo "\n".redText("DEBUG sql-instruction: ").$sql_instruction."\n"; - sql($sql_instruction); - } - // Update the status of the operation - sql("UPDATE SYS_PRD_BND.Tables SET LastError = '', LastUpdated = NOW() WHERE Name = '{$activeTable["Name"]}'"); - } - - } + foreach ($activeTables as $activeTable) { + processActiveTable($activeTable); + } +} + +/** + * Processes a single active table. + */ +function processActiveTable($activeTable) { + extract($activeTable); + echo "Found Table: " . greenText($Name) . "\n"; + echo "Scanning rows in table $Name that require trigger execution:\n"; + + ensureLastUpdatedColumnExists($Name); + + $rowsToProcess = sql("SELECT * FROM $Name WHERE LastUpdated > '$LastUpdated'"); + + foreach ($rowsToProcess as $unprocessedRow) { + processTableRow($activeTable, $unprocessedRow); + } +} + +/** + * Ensures the 'LastUpdated' column exists, adding it if missing. + */ +function ensureLastUpdatedColumnExists($_tableName) { + echo "Ensuring LastUpdated column is created if not exists..\n"; + list($dbName,$tableName) = (strpos($_tableName,".") ? explode(".",$_tableName) : ["",$_tableName]); + sql((empty($dbName)?"":"USE $dbName; ")."ALTER TABLE `$tableName` ADD COLUMN IF NOT EXISTS LastUpdated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"); +} + +/** + * Processes an individual row for triggers. + */ +function processTableRow($activeTable, $unprocessedRow) { + $functionName = "handleNew" . str_replace(".", "__", $activeTable["Name"]) . "Row"; + + if (function_exists($functionName)) { + runSystemLevelTrigger($functionName, $activeTable, $unprocessedRow); + } + + if (!empty($activeTable['onUpdate_phpCode'])) { + runPHPCodeTrigger($functionName, $activeTable, $unprocessedRow); + } + + if (!empty($activeTable['onUpdate_pyCode'])) { + runPythonCodeTrigger($functionName, $activeTable, $unprocessedRow); + } +} + +/** + * Executes system-level PHP trigger function. + */ +function runSystemLevelTrigger($functionName, $activeTable, &$row) { + echo "Calling system-level function " . greenText($functionName) . "\n"; + $error = ''; + + if ($functionName($row, $error) === false) { + updateTriggerError($activeTable['Name'], $error); + } else { + clearTriggerError($activeTable['Name']); + } +} + +/** + * Executes database-level dynamic PHP trigger code. + */ +function runPHPCodeTrigger($functionName, $activeTable, $row) { + echo "Running PHP Code trigger...\n"; + $phpCode = generatePHPTriggerCode($functionName, $activeTable['onUpdate_phpCode'], $row); + $result = runSandboxedPHP($phpCode,$stdout,$stderr); + + handleTriggerExecutionResult($result, $stdout,$stderr,$row, $activeTable); +} + +/** + * Executes database-level dynamic Python trigger code. + */ +function runPythonCodeTrigger($functionName, $activeTable, $row) { + $pyCode = generatePythonTriggerCode($functionName, $activeTable['onUpdate_pyCode'], $row); + $result = runSandboxedPython($pyCode,$stdout,$stderr); + + handleTriggerExecutionResult($result, $stdout, $stderr, $row, $activeTable); +} + +/** + * Helper functions implementations. + */ + +function updateDatabaseRow($tableName, $originalRow, $newRowValue) { + $pkColsName = getTblPrimaryKeyColName($tableName); + $pkColsValues = array_map(fn($cName) => $originalRow[$cName], $pkColsName); + + $setStatements = []; + foreach ($newRowValue as $k => $v) { + $value = is_numeric($v) ? $v : "'" . str_replace("'", "''", $v) . "'"; + $setStatements[] = "$k = $value"; + } + + $whereStatements = []; + foreach ($pkColsName as $k) { + $val = is_numeric($originalRow[$k]) ? $originalRow[$k] : "'" . $originalRow[$k] . "'"; + $whereStatements[] = "$k = $val"; + } + + $sql_instruction = "UPDATE $tableName SET " . implode(", ", $setStatements) . " WHERE " . implode(" AND ", $whereStatements); + sql($sql_instruction); +} + +function updateTriggerError($tableName, $error) { + sql("UPDATE SYS_PRD_BND.Tables SET LastError = '" . str_replace("'", '"', $error) . "', LastUpdated = NOW() WHERE Name = '$tableName'"); + sendTelegramMessage("*ERROR* on trigger function for table $tableName : " . $error, "DatabaseGroup"); +} + +function clearTriggerError($tableName) { + sql("UPDATE SYS_PRD_BND.Tables SET LastError = '', LastUpdated = NOW() WHERE Name = '$tableName'"); +} + +function getConstantsDefinition() { + $constants = ""; + foreach (sql("SELECT Name, Type, Value FROM SYS_PRD_BND.Constants") as $const) { + $value = $const["Type"] != "String" ? $const["Value"] : '"' . $const["Value"] . '"'; + $constants .= "define(\"{$const['Name']}\", $value);\n"; + } + return $constants; +} + +function getSupportFunctionsDefinition() { + $supportFunctions = ""; + foreach (sql("SELECT Name, InputArgs_json, PhpCode FROM SYS_PRD_BND.SupportFunctions WHERE PhpCode IS NOT NULL") as $f) { + $args = implode(", ", array_map(fn($s) => "\$$s", array_keys(json_decode($f["InputArgs_json"], true)))); + $supportFunctions .= "function {$f['Name']}($args) {\n{$f['PhpCode']}\n}\n"; + } + return $supportFunctions; +} + +function getPythonImports() { + $imports = ""; + foreach (sql("SELECT LibName, AliasName FROM SYS_PRD_BND.PyPi") as $module) { + $alias = !empty($module["AliasName"]) ? " as {$module['AliasName']}" : ""; + $imports .= "import {$module['LibName']}{$alias}\n"; + } + return $imports; +} +/** + * Generates PHP code for sandbox execution based on provided dynamic PHP code and row data. + * + * @param string $functionName The name of the PHP function to generate. + * @param string $onUpdate_phpCode The PHP code to embed within the generated function. + * @param array $row The row data to pass to the generated function. + * + * @return string The complete PHP code ready for sandbox execution. + */ +function generatePHPTriggerCode($functionName, $onUpdate_phpCode, $row) { + // Prepare the constants definition from the database + $constants = getConstantsDefinition(); + + // Prepare the support functions definitions from the database + $supportFunctions = getSupportFunctionsDefinition(); + + // Export the row data into PHP code format + $rowExport = var_export($row, true); + + // Assemble the complete PHP script + $phpCode = << ['pipe', 'w'], // stdout + 2 => ['pipe', 'w'], // stderr + ]; + + // Execute the PHP code using proc_open + $process = proc_open($command, $descriptorspec, $pipes); + + if (!is_resource($process)) { + unlink($tempFile); + throw new Exception('Failed to execute sandboxed PHP code.'); + } + + // Capture stdout and stderr + $stdout = stream_get_contents($pipes[1]); + fclose($pipes[1]); + + $stderr = stream_get_contents($pipes[2]); + fclose($pipes[2]); + + // Get the exit code + $exitCode = proc_close($process); + + // Cleanup temporary file + unlink($tempFile); + + return $exitCode; +} +/** + * Handles the result of trigger execution, including error handling and updating database rows if necessary. + * + * @param int $result Exit code from the sandbox execution (0 for success, non-zero for failure). + * @param string $stdout Captured standard output from the execution (JSON encoded new row data). + * @param string $stderr Captured standard error from the execution (error message if any). + * @param array $originalRow Original database row before trigger execution. + * @param array $activeTable The active table details (including Name). + */ +function handleTriggerExecutionResult($result, $stdout, $stderr, $originalRow, $activeTable) { + $tableName = $activeTable['Name']; + + if ($result !== 0) { + // Update table error status and notify via Telegram if execution failed + updateTriggerError($tableName, $stderr); + } else { + // Decode the output from JSON to array + $newRowValue = json_decode($stdout, true); + + if ($newRowValue === null) { + updateTriggerError($tableName, 'Invalid JSON output from sandboxed execution.'); + return; + } + + // Check if the new data differs from the original data + if ($newRowValue !== $originalRow) { + // Update the database row accordingly + updateDatabaseRow($tableName, $originalRow, $newRowValue); + } + + // Clear any previous error status + clearTriggerError($tableName); } - } } function greenText($string) { $green = "\033[0;32m"; $reset = "\033[0m"; return $green . $string . $reset ; } function blueText($string) { $green = "\033[0;34m"; $reset = "\033[0m"; return $green . $string . $reset ; }