|
|
@@ -0,0 +1,3255 @@
|
|
|
+// Query routes and execution
|
|
|
+use anyhow::{Context, Result};
|
|
|
+use axum::{
|
|
|
+ extract::{ConnectInfo, State},
|
|
|
+ http::{HeaderMap, StatusCode},
|
|
|
+ Json,
|
|
|
+};
|
|
|
+use chrono::Utc;
|
|
|
+use rand::Rng;
|
|
|
+use serde_json::Value;
|
|
|
+use sqlx::{Column, Row};
|
|
|
+use std::collections::HashMap;
|
|
|
+use std::net::SocketAddr;
|
|
|
+use tracing::{error, info, warn};
|
|
|
+
|
|
|
+use crate::logging::AuditLogger;
|
|
|
+use crate::models::{PermissionsResponse, QueryAction, QueryRequest, QueryResponse, UserInfo};
|
|
|
+use crate::sql::{
|
|
|
+ build_filter_clause, build_legacy_where_clause, build_order_by_clause, validate_column_name,
|
|
|
+ validate_column_names, validate_table_name,
|
|
|
+};
|
|
|
+use crate::AppState;
|
|
|
+
|
|
|
+// Helper function to extract token from Authorization header
|
|
|
+fn extract_token(headers: &HeaderMap) -> Option<String> {
|
|
|
+ headers
|
|
|
+ .get("Authorization")
|
|
|
+ .and_then(|header| header.to_str().ok())
|
|
|
+ .and_then(|auth_str| {
|
|
|
+ if auth_str.starts_with("Bearer ") {
|
|
|
+ Some(auth_str[7..].to_string())
|
|
|
+ } else {
|
|
|
+ None
|
|
|
+ }
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+fn database_unavailable_response(request_id: &str) -> QueryResponse {
|
|
|
+ QueryResponse {
|
|
|
+ success: false,
|
|
|
+ data: None,
|
|
|
+ rows_affected: None,
|
|
|
+ error: Some(format!(
|
|
|
+ "Database temporarily unavailable, try again in a moment [request_id: {}]",
|
|
|
+ request_id
|
|
|
+ )),
|
|
|
+ warning: None,
|
|
|
+ results: None,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+fn database_unavailable_batch_response(request_id: &str) -> QueryResponse {
|
|
|
+ let mut base = database_unavailable_response(request_id);
|
|
|
+ base.results = Some(vec![]);
|
|
|
+ base
|
|
|
+}
|
|
|
+
|
|
|
+async fn log_database_unavailable_event(
|
|
|
+ logger: &AuditLogger,
|
|
|
+ request_id: &str,
|
|
|
+ username: Option<&str>,
|
|
|
+ power: Option<i32>,
|
|
|
+ detail: &str,
|
|
|
+) {
|
|
|
+ if let Err(err) = logger
|
|
|
+ .log_error(
|
|
|
+ request_id,
|
|
|
+ Utc::now(),
|
|
|
+ detail,
|
|
|
+ Some("database_unavailable"),
|
|
|
+ username,
|
|
|
+ power,
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ {
|
|
|
+ error!("[{}] Failed to record database outage: {}", request_id, err);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/// POST /query — authenticate, authorize, and execute a single CRUD query,
/// or dispatch to batch mode when `payload.queries` is present.
///
/// Flow: token extraction -> session lookup -> batch detection -> null-byte
/// input screening -> request audit log -> required-field validation ->
/// table/column name validation -> RBAC check -> availability check ->
/// transaction with per-connection user context -> per-action executor ->
/// result logging.
///
/// Note: most failures are reported as HTTP 200 with `success: false` in the
/// body; only a missing action/table field (BAD_REQUEST) and an unknown power
/// level during batch permission lookup (FORBIDDEN) surface as HTTP errors.
pub async fn execute_query(
    State(state): State<AppState>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    headers: HeaderMap,
    Json(payload): Json<QueryRequest>,
) -> Result<Json<QueryResponse>, StatusCode> {
    let timestamp = Utc::now();
    let client_ip = addr.ip().to_string();
    let request_id = AuditLogger::generate_request_id();

    // Extract and validate session token
    let token = match extract_token(&headers) {
        Some(token) => token,
        None => {
            return Ok(Json(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some(
                    "Please stop trying to access this resource without signing in".to_string(),
                ),
                warning: None,
                results: None,
            }));
        }
    };

    let session = match state.session_manager.get_session(&token) {
        Some(session) => session,
        None => {
            return Ok(Json(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some("Session not found".to_string()),
                warning: None,
                results: None,
            }));
        }
    };

    // Detect batch mode - if queries field is present, handle as batch operation
    if payload.queries.is_some() {
        // SECURITY: Check if user has permission to use batch operations
        let power_perms = state
            .config
            .permissions
            .power_levels
            .get(&session.power.to_string())
            .ok_or(StatusCode::FORBIDDEN)?;

        if !power_perms.allow_batch_operations {
            super::log_warning_async(
                &state.logging,
                &request_id,
                &format!(
                    "User {} (power {}) attempted batch operation without permission",
                    session.username, session.power
                ),
                Some("authorization"),
                Some(&session.username),
                Some(session.power),
            );
            return Ok(Json(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some("Batch operations not permitted for your role".to_string()),
                warning: None,
                results: None,
            }));
        }

        // Batch handling (including its own logging) lives in execute_batch_mode.
        return execute_batch_mode(state, session, request_id, timestamp, client_ip, &payload)
            .await;
    }

    // Validate input for very basic security vulnerabilities (null bytes, etc.)
    if let Err(security_error) = validate_input_security(&payload) {
        super::log_warning_async(
            &state.logging,
            &request_id,
            &format!(
                "Security validation failed for user {}: {}",
                session.username, security_error
            ),
            Some("security_validation"),
            Some(&session.username),
            Some(session.power),
        );

        return Ok(Json(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Invalid input detected, how did you even manage to do that? [request_id: {}]",
                request_id
            )),
            warning: None,
            results: None,
        }));
    }

    // Log the request (best-effort: a logging failure does not block the query)
    if let Err(e) = state
        .logging
        .log_request(
            &request_id,
            timestamp,
            &client_ip,
            Some(&session.username),
            Some(session.power),
            "/query",
            &serde_json::to_value(&payload).unwrap_or_default(),
        )
        .await
    {
        error!("[{}] Failed to log request: {}", request_id, e);
    }

    // Clone payload before extracting fields (to avoid partial move issues)
    let payload_clone = payload.clone();

    // Single query mode - validate required fields
    let action = payload.action.ok_or_else(|| {
        let error_msg = "Missing action field in single query mode";
        super::log_error_async(
            &state.logging,
            &request_id,
            error_msg,
            Some("request_validation"),
            Some(&session.username),
            Some(session.power),
        );
        StatusCode::BAD_REQUEST
    })?;

    let table = payload.table.ok_or_else(|| {
        let error_msg = "Missing table field in single query mode";
        super::log_error_async(
            &state.logging,
            &request_id,
            error_msg,
            Some("request_validation"),
            Some(&session.username),
            Some(session.power),
        );
        StatusCode::BAD_REQUEST
    })?;

    // SECURITY: Validate table name before any operations
    if let Err(e) = validate_table_name(&table, &state.config) {
        let error_msg = format!("Invalid table name '{}': {}", table, e);
        super::log_error_async(
            &state.logging,
            &request_id,
            &error_msg,
            Some("table_validation"),
            Some(&session.username),
            Some(session.power),
        );

        // Client response deliberately omits the validator detail; the full
        // reason is only in the audit log.
        return Ok(Json(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!("Invalid table name [request_id: {}]", request_id)),
            warning: None,
            results: None,
        }));
    }

    // SECURITY: Validate column names if specified
    if let Some(ref columns) = payload.columns {
        if let Err(e) = validate_column_names(columns) {
            let error_msg = format!("Invalid column names on table '{}': {}", table, e);
            super::log_error_async(
                &state.logging,
                &request_id,
                &error_msg,
                Some("column_validation"),
                Some(&session.username),
                Some(session.power),
            );

            return Ok(Json(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some(format!(
                    "Invalid column name: {} [request_id: {}]",
                    e, request_id
                )),
                warning: None,
                results: None,
            }));
        }
    }

    // Check permissions (after validation to avoid leaking table existence)
    if !state
        .rbac
        .check_permission(&state.config, session.power, &table, &action)
    {
        let action_str = match action {
            QueryAction::Select => "SELECT",
            QueryAction::Insert => "INSERT",
            QueryAction::Update => "UPDATE",
            QueryAction::Delete => "DELETE",
            QueryAction::Count => "COUNT",
        };

        super::log_warning_async(
            &state.logging,
            &request_id,
            &format!(
                "User {} attempted unauthorized {} on table {}",
                session.username, action_str, table
            ),
            Some("authorization"),
            Some(&session.username),
            Some(session.power),
        );

        // Log security violation
        if let Err(log_err) = state
            .logging
            .log_error(
                &request_id,
                timestamp,
                &format!("Permission denied: {} on table {}", action_str, table),
                Some("authorization"),
                Some(&session.username),
                Some(session.power),
            )
            .await
        {
            error!(
                "[{}] Failed to log permission denial: {}",
                request_id, log_err
            );
        }

        return Ok(Json(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!("Insufficient permissions for this operation, Might as well give up [request_id: {}]", request_id)),
            warning: None,
            results: None,
        }));
    }

    // Fast-fail when the circuit breaker says the database is down, instead
    // of paying the connection timeout on every request.
    if !state.database.is_available() {
        warn!(
            "[{}] Database marked unavailable, returning graceful error",
            request_id
        );
        log_database_unavailable_event(
            &state.logging,
            &request_id,
            Some(&session.username),
            Some(session.power),
            "Database flagged unavailable before transaction",
        )
        .await;
        return Ok(Json(database_unavailable_response(&request_id)));
    }

    // ALL database operations now use transactions with user context set
    let mut tx = match state.database.pool().begin().await {
        Ok(tx) => {
            // A successful begin resets the availability flag.
            state.database.mark_available();
            tx
        }
        Err(e) => {
            state.database.mark_unavailable();
            error!("[{}] Failed to begin transaction: {}", request_id, e);
            log_database_unavailable_event(
                &state.logging,
                &request_id,
                Some(&session.username),
                Some(session.power),
                &format!("Failed to begin transaction: {}", e),
            )
            .await;
            return Ok(Json(database_unavailable_response(&request_id)));
        }
    };

    // Set user context and request ID in transaction - ALL queries have user context now
    // (MySQL session variables readable by triggers/audit machinery server-side)
    if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?")
        .bind(session.user_id)
        .bind(&request_id)
        .execute(&mut *tx)
        .await
    {
        state.database.mark_unavailable();
        error!(
            "[{}] Failed to set current user context and request ID: {}",
            request_id, e
        );
        log_database_unavailable_event(
            &state.logging,
            &request_id,
            Some(&session.username),
            Some(session.power),
            &format!("Failed to set user context: {}", e),
        )
        .await;
        return Ok(Json(database_unavailable_response(&request_id)));
    }

    // Execute the query within the transaction. Each executor takes ownership
    // of `tx` and is responsible for commit/rollback.
    // NOTE(review): the executors take (state, session) in differing argument
    // orders — confirm against their signatures before reordering.
    let result = match action {
        QueryAction::Select => {
            execute_select_with_tx(
                &request_id,
                tx,
                &payload_clone,
                &session.username,
                &session,
                &state,
            )
            .await
        }
        QueryAction::Insert => {
            execute_insert_with_tx(
                &request_id,
                tx,
                &payload_clone,
                &session.username,
                &state,
                &session,
            )
            .await
        }
        QueryAction::Update => {
            execute_update_with_tx(
                &request_id,
                tx,
                &payload_clone,
                &session.username,
                &state,
                &session,
            )
            .await
        }
        QueryAction::Delete => {
            execute_delete_with_tx(
                &request_id,
                tx,
                &payload_clone,
                &session.username,
                &state,
                &session,
            )
            .await
        }
        QueryAction::Count => {
            execute_count_with_tx(
                &request_id,
                tx,
                &payload_clone,
                &session.username,
                &session,
                &state,
            )
            .await
        }
    };

    match result {
        Ok(response) => {
            let action_str = match action {
                QueryAction::Select => "SELECT",
                QueryAction::Insert => "INSERT",
                QueryAction::Update => "UPDATE",
                QueryAction::Delete => "DELETE",
                QueryAction::Count => "COUNT",
            };

            super::log_info_async(
                &state.logging,
                &request_id,
                &format!("Query executed successfully: {} on {}", action_str, table),
                Some("query_execution"),
                Some(&session.username),
                Some(session.power),
            );
            Ok(Json(response))
        }
        Err(e) => {
            error!("[{}] Query execution failed: {}", request_id, e);
            if let Err(log_err) = state
                .logging
                .log_error(
                    &request_id,
                    timestamp,
                    &format!("Query execution error: {}", e),
                    Some("query_execution"),
                    Some(&session.username),
                    Some(session.power),
                )
                .await
            {
                error!("[{}] Failed to log error: {}", request_id, log_err);
            }

            // Generic client-facing error; the detailed cause stays in the logs.
            Ok(Json(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some(format!(
                    "Database query failed [request_id: {}]",
                    request_id
                )),
                warning: None,
                results: None,
            }))
        }
    }
}
|
|
|
+
|
|
|
+pub async fn get_permissions(
|
|
|
+ State(state): State<AppState>,
|
|
|
+ ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
|
|
+ headers: HeaderMap,
|
|
|
+) -> Result<Json<PermissionsResponse>, StatusCode> {
|
|
|
+ let timestamp = Utc::now();
|
|
|
+ let client_ip = addr.ip().to_string();
|
|
|
+ let request_id = AuditLogger::generate_request_id();
|
|
|
+
|
|
|
+ // Extract and validate session token
|
|
|
+ let token = match extract_token(&headers) {
|
|
|
+ Some(token) => token,
|
|
|
+ None => {
|
|
|
+ return Ok(Json(PermissionsResponse {
|
|
|
+ success: false,
|
|
|
+ permissions: HashMap::new(),
|
|
|
+ user: UserInfo {
|
|
|
+ id: 0,
|
|
|
+ username: "".to_string(),
|
|
|
+ name: "".to_string(),
|
|
|
+ role: "".to_string(),
|
|
|
+ power: 0,
|
|
|
+ },
|
|
|
+ security_clearance: None,
|
|
|
+ user_settings_access: "".to_string(),
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ let session = match state.session_manager.get_session(&token) {
|
|
|
+ Some(session) => session,
|
|
|
+ None => {
|
|
|
+ return Ok(Json(PermissionsResponse {
|
|
|
+ success: false,
|
|
|
+ permissions: HashMap::new(),
|
|
|
+ user: UserInfo {
|
|
|
+ id: 0,
|
|
|
+ username: "".to_string(),
|
|
|
+ name: "".to_string(),
|
|
|
+ role: "".to_string(),
|
|
|
+ power: 0,
|
|
|
+ },
|
|
|
+ security_clearance: None,
|
|
|
+ user_settings_access: "".to_string(),
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // Log the request
|
|
|
+ if let Err(e) = state
|
|
|
+ .logging
|
|
|
+ .log_request(
|
|
|
+ &request_id,
|
|
|
+ timestamp,
|
|
|
+ &client_ip,
|
|
|
+ Some(&session.username),
|
|
|
+ Some(session.power),
|
|
|
+ "/permissions",
|
|
|
+ &serde_json::json!({}),
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ {
|
|
|
+ error!("[{}] Failed to log request: {}", request_id, e);
|
|
|
+ }
|
|
|
+
|
|
|
+ let permissions = state
|
|
|
+ .rbac
|
|
|
+ .get_table_permissions(&state.config, session.power);
|
|
|
+
|
|
|
+ // Get user settings access permission
|
|
|
+ let user_settings_permission = state
|
|
|
+ .config
|
|
|
+ .permissions
|
|
|
+ .power_levels
|
|
|
+ .get(&session.power.to_string())
|
|
|
+ .map(|p| &p.user_settings_access)
|
|
|
+ .unwrap_or(&state.config.security.default_user_settings_access);
|
|
|
+
|
|
|
+ let user_settings_access_str = match user_settings_permission {
|
|
|
+ crate::config::UserSettingsAccess::ReadOwnOnly => "read-own-only",
|
|
|
+ crate::config::UserSettingsAccess::ReadWriteOwn => "read-write-own",
|
|
|
+ crate::config::UserSettingsAccess::ReadWriteAll => "read-write-all",
|
|
|
+ };
|
|
|
+
|
|
|
+ Ok(Json(PermissionsResponse {
|
|
|
+ success: true,
|
|
|
+ permissions,
|
|
|
+ user: UserInfo {
|
|
|
+ id: session.user_id,
|
|
|
+ username: session.username,
|
|
|
+ name: "".to_string(), // We don't store name in session, would need to fetch from DB
|
|
|
+ role: session.role_name,
|
|
|
+ power: session.power,
|
|
|
+ },
|
|
|
+ security_clearance: None,
|
|
|
+ user_settings_access: user_settings_access_str.to_string(),
|
|
|
+ }))
|
|
|
+}
|
|
|
+
|
|
|
+// ===== SAFE SQL QUERY BUILDERS WITH VALIDATION =====
|
|
|
+// All functions validate table/column names to prevent SQL injection
|
|
|
+
|
|
|
/// Build WHERE clause with column name validation (legacy simple format).
///
/// Thin compatibility wrapper kept so older call sites keep compiling;
/// all validation and SQL construction happens in the `sql` module builder.
/// Returns the WHERE fragment plus the ordered list of bind values.
fn build_where_clause(where_clause: &Value) -> anyhow::Result<(String, Vec<String>)> {
    // Use the new validated builder from sql module
    build_legacy_where_clause(where_clause)
}
|
|
|
+
|
|
|
+/// Build INSERT data with column name validation
|
|
|
+fn build_insert_data(
|
|
|
+ data: &Value,
|
|
|
+) -> anyhow::Result<(Vec<String>, Vec<String>, Vec<Option<String>>)> {
|
|
|
+ let mut columns = Vec::new();
|
|
|
+ let mut placeholders = Vec::new();
|
|
|
+ let mut values = Vec::new();
|
|
|
+
|
|
|
+ if let Value::Object(map) = data {
|
|
|
+ for (key, value) in map {
|
|
|
+ // SECURITY: Validate column name before using it
|
|
|
+ validate_column_name(key)
|
|
|
+ .with_context(|| format!("Invalid column name in INSERT: {}", key))?;
|
|
|
+
|
|
|
+ // Handle special JSON fields like additional_fields
|
|
|
+ if key == "additional_fields" && value.is_object() {
|
|
|
+ columns.push(key.clone());
|
|
|
+ placeholders.push("?".to_string());
|
|
|
+ values.push(Some(
|
|
|
+ serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()),
|
|
|
+ ));
|
|
|
+ } else {
|
|
|
+ columns.push(key.clone());
|
|
|
+ placeholders.push("?".to_string());
|
|
|
+ values.push(json_value_to_option_string(value));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ anyhow::bail!("INSERT data must be a JSON object");
|
|
|
+ }
|
|
|
+
|
|
|
+ if columns.is_empty() {
|
|
|
+ anyhow::bail!("INSERT data cannot be empty");
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok((columns, placeholders, values))
|
|
|
+}
|
|
|
+
|
|
|
+/// Build UPDATE SET clause with column name validation
|
|
|
+fn build_update_set_clause(data: &Value) -> anyhow::Result<(String, Vec<Option<String>>)> {
|
|
|
+ let mut set_clauses = Vec::new();
|
|
|
+ let mut values = Vec::new();
|
|
|
+
|
|
|
+ if let Value::Object(map) = data {
|
|
|
+ for (key, value) in map {
|
|
|
+ // SECURITY: Validate column name before using it
|
|
|
+ validate_column_name(key)
|
|
|
+ .with_context(|| format!("Invalid column name in UPDATE: {}", key))?;
|
|
|
+
|
|
|
+ set_clauses.push(format!("{} = ?", key));
|
|
|
+ // Handle special JSON fields like additional_fields
|
|
|
+ if key == "additional_fields" && value.is_object() {
|
|
|
+ values.push(Some(
|
|
|
+ serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()),
|
|
|
+ ));
|
|
|
+ } else {
|
|
|
+ values.push(json_value_to_option_string(value));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ anyhow::bail!("UPDATE data must be a JSON object");
|
|
|
+ }
|
|
|
+
|
|
|
+ if set_clauses.is_empty() {
|
|
|
+ anyhow::bail!("UPDATE data cannot be empty");
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok((set_clauses.join(", "), values))
|
|
|
+}
|
|
|
+
|
|
|
+/// Convert JSON value to Option<String> for SQL binding
|
|
|
+/// Properly handles booleans (true/false -> "1"/"0" for MySQL TINYINT/BOOLEAN)
|
|
|
+/// NULL values return None for proper SQL NULL handling
|
|
|
+fn json_value_to_option_string(value: &Value) -> Option<String> {
|
|
|
+ match value {
|
|
|
+ Value::String(s) => Some(s.clone()),
|
|
|
+ Value::Number(n) => Some(n.to_string()),
|
|
|
+ // MySQL uses TINYINT(1) for booleans: true -> 1, false -> 0
|
|
|
+ Value::Bool(b) => Some(if *b { "1".to_string() } else { "0".to_string() }),
|
|
|
+ Value::Null => None, // Return None for proper SQL NULL
|
|
|
+ // Complex types (objects, arrays) get JSON serialized
|
|
|
+ _ => Some(serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())),
|
|
|
+ }
|
|
|
+}
|
|
|
/// Convert SQL values from MySQL to JSON.
/// Handles ALL MySQL types automatically with proper NULL handling.
/// Returns booleans as true/false (MySQL TINYINT(1) -> JSON bool).
///
/// Dispatches on the column's reported type name, decoding via
/// `sqlx::Row::try_get` with an `Option<T>` so SQL NULL maps to JSON null.
/// On a decode failure it falls back to a string read, and as a last resort
/// returns `Value::Null` rather than failing the whole row.
///
/// `request_id`/`username`/`power`/`state` are optional audit context; when
/// both `request_id` and `state` are present, conversion fallbacks are also
/// recorded through the audit logger.
///
/// NOTE(review): `row.columns()[index]` panics if `index` is out of bounds —
/// callers appear to iterate the row's own columns, but confirm.
fn convert_sql_value_to_json(
    row: &sqlx::mysql::MySqlRow,
    index: usize,
    request_id: Option<&str>,
    username: Option<&str>,
    power: Option<i32>,
    state: Option<&AppState>,
) -> anyhow::Result<Value> {
    let column = &row.columns()[index];

    use sqlx::TypeInfo;
    let type_name = column.type_info().name();

    // Comprehensive MySQL type handling - no need to manually add types anymore!
    let result = match type_name {
        // ===== String types =====
        "VARCHAR" | "TEXT" | "CHAR" | "LONGTEXT" | "MEDIUMTEXT" | "TINYTEXT" | "SET" | "ENUM" => {
            row.try_get::<Option<String>, _>(index)
                .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
        }

        // ===== Integer types =====
        "INT" | "BIGINT" | "MEDIUMINT" | "SMALLINT" | "INTEGER" => row
            .try_get::<Option<i64>, _>(index)
            .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)),

        // Unsigned integers
        "INT UNSIGNED" | "BIGINT UNSIGNED" | "MEDIUMINT UNSIGNED" | "SMALLINT UNSIGNED" => row
            .try_get::<Option<u64>, _>(index)
            .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)),

        // ===== Boolean type (MySQL TINYINT(1)) =====
        // Returns proper JSON true/false instead of 1/0
        "TINYINT" | "BOOLEAN" | "BOOL" => {
            // Try as bool first (for TINYINT(1)); note the early return here
            // bypasses the string-fallback recovery below.
            if let Ok(opt_bool) = row.try_get::<Option<bool>, _>(index) {
                return Ok(opt_bool.map(Value::Bool).unwrap_or(Value::Null));
            }
            // Fallback to i8 for regular TINYINT
            row.try_get::<Option<i8>, _>(index)
                .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null))
        }

        // ===== Decimal/Numeric types =====
        "DECIMAL" | "NUMERIC" => {
            row.try_get::<Option<rust_decimal::Decimal>, _>(index)
                .map(|opt| {
                    opt.map(|decimal| {
                        // Keep precision by converting to string then parsing;
                        // falls back to the exact decimal string when the
                        // value cannot be represented as f64.
                        let decimal_str = decimal.to_string();
                        if let Ok(f) = decimal_str.parse::<f64>() {
                            serde_json::json!(f)
                        } else {
                            Value::String(decimal_str)
                        }
                    })
                    .unwrap_or(Value::Null)
                })
        }

        // ===== Floating point types =====
        "FLOAT" | "DOUBLE" | "REAL" => row
            .try_get::<Option<f64>, _>(index)
            .map(|opt| opt.map(|v| serde_json::json!(v)).unwrap_or(Value::Null)),

        // ===== Date/Time types =====
        "DATE" => {
            use chrono::NaiveDate;
            row.try_get::<Option<NaiveDate>, _>(index).map(|opt| {
                opt.map(|date| Value::String(date.format("%Y-%m-%d").to_string()))
                    .unwrap_or(Value::Null)
            })
        }
        "DATETIME" => {
            use chrono::NaiveDateTime;
            row.try_get::<Option<NaiveDateTime>, _>(index).map(|opt| {
                opt.map(|datetime| Value::String(datetime.format("%Y-%m-%d %H:%M:%S").to_string()))
                    .unwrap_or(Value::Null)
            })
        }
        "TIMESTAMP" => {
            use chrono::{DateTime, Utc};
            row.try_get::<Option<DateTime<Utc>>, _>(index).map(|opt| {
                opt.map(|timestamp| Value::String(timestamp.to_rfc3339()))
                    .unwrap_or(Value::Null)
            })
        }
        "TIME" => {
            // TIME values come as strings in HH:MM:SS format
            row.try_get::<Option<String>, _>(index)
                .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
        }
        "YEAR" => row
            .try_get::<Option<i32>, _>(index)
            .map(|opt| opt.map(|v| Value::Number(v.into())).unwrap_or(Value::Null)),

        // ===== JSON type =====
        // Unparseable JSON text silently becomes null.
        "JSON" => row.try_get::<Option<String>, _>(index).map(|opt| {
            opt.and_then(|s| serde_json::from_str(&s).ok())
                .unwrap_or(Value::Null)
        }),

        // ===== Binary types =====
        // Return as base64-encoded strings for safe JSON transmission
        "BLOB" | "MEDIUMBLOB" | "LONGBLOB" | "TINYBLOB" | "BINARY" | "VARBINARY" => {
            row.try_get::<Option<Vec<u8>>, _>(index).map(|opt| {
                opt.map(|bytes| {
                    use base64::{engine::general_purpose, Engine as _};
                    Value::String(general_purpose::STANDARD.encode(&bytes))
                })
                .unwrap_or(Value::Null)
            })
        }

        // ===== Bit type =====
        "BIT" => {
            row.try_get::<Option<Vec<u8>>, _>(index).map(|opt| {
                opt.map(|bytes| {
                    // Convert bit value to number (big-endian byte fold;
                    // values wider than 64 bits would silently wrap).
                    let mut val: i64 = 0;
                    for &byte in &bytes {
                        val = (val << 8) | byte as i64;
                    }
                    Value::Number(val.into())
                })
                .unwrap_or(Value::Null)
            })
        }

        // ===== Spatial/Geometry types =====
        "GEOMETRY" | "POINT" | "LINESTRING" | "POLYGON" | "MULTIPOINT" | "MULTILINESTRING"
        | "MULTIPOLYGON" | "GEOMETRYCOLLECTION" => {
            // Return as WKT (Well-Known Text) string
            row.try_get::<Option<String>, _>(index)
                .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
        }

        // ===== Catch-all for unknown/new types =====
        // This ensures forward compatibility if MySQL adds new types
        _ => {
            warn!(
                "Unknown MySQL type '{}' for column '{}', attempting string fallback",
                type_name,
                column.name()
            );
            // Mirror the warning into the audit log when context is available.
            if let (Some(rid), Some(st)) = (request_id, state) {
                super::log_warning_async(
                    &st.logging,
                    rid,
                    &format!(
                        "Unknown MySQL type '{}' for column '{}', attempting string fallback",
                        type_name,
                        column.name()
                    ),
                    Some("data_conversion"),
                    username,
                    power,
                );
            }
            row.try_get::<Option<String>, _>(index)
                .map(|opt| opt.map(Value::String).unwrap_or(Value::Null))
        }
    };

    // Robust error handling with fallback
    match result {
        Ok(value) => Ok(value),
        Err(e) => {
            // Final fallback: try as string
            match row.try_get::<Option<String>, _>(index) {
                Ok(opt) => {
                    warn!("Primary conversion failed for column '{}' (type: {}), used string fallback",
                        column.name(), type_name);
                    if let (Some(rid), Some(st)) = (request_id, state) {
                        super::log_warning_async(
                            &st.logging,
                            rid,
                            &format!("Primary conversion failed for column '{}' (type: {}), used string fallback", column.name(), type_name),
                            Some("data_conversion"),
                            username,
                            power,
                        );
                    }
                    Ok(opt.map(Value::String).unwrap_or(Value::Null))
                }
                Err(_) => {
                    error!(
                        "Complete failure to decode column '{}' (index: {}, type: {}): {}",
                        column.name(),
                        index,
                        type_name,
                        e
                    );
                    // Return NULL instead of failing the entire query
                    Ok(Value::Null)
                }
            }
        }
    }
}
|
|
|
+
|
|
|
+// Generate auto values based on configuration
|
|
|
+async fn generate_auto_value(
|
|
|
+ state: &AppState,
|
|
|
+ table: &str,
|
|
|
+ config: &crate::config::AutoGenerationConfig,
|
|
|
+) -> Result<String, anyhow::Error> {
|
|
|
+ match config.gen_type.as_str() {
|
|
|
+ "numeric" => generate_unique_numeric_id(state, table, config).await,
|
|
|
+ _ => Err(anyhow::anyhow!(
|
|
|
+ "Unsupported auto-generation type: {}",
|
|
|
+ config.gen_type
|
|
|
+ )),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Generate a unique numeric ID based on configuration
|
|
|
+async fn generate_unique_numeric_id(
|
|
|
+ state: &AppState,
|
|
|
+ table: &str,
|
|
|
+ config: &crate::config::AutoGenerationConfig,
|
|
|
+) -> Result<String, anyhow::Error> {
|
|
|
+ let range_min = config.range_min.unwrap_or(10000000);
|
|
|
+ let range_max = config.range_max.unwrap_or(99999999);
|
|
|
+ let max_attempts = config.max_attempts.unwrap_or(10) as usize;
|
|
|
+ let field_name = &config.field;
|
|
|
+
|
|
|
+ for _attempt in 0..max_attempts {
|
|
|
+ // Generate random number in specified range
|
|
|
+ let id = {
|
|
|
+ let mut rng = rand::rng();
|
|
|
+ rng.random_range(range_min..=range_max)
|
|
|
+ };
|
|
|
+ let id_str = id.to_string();
|
|
|
+
|
|
|
+ // Check if this ID already exists
|
|
|
+ let query_str = format!(
|
|
|
+ "SELECT COUNT(*) as count FROM {} WHERE {} = ?",
|
|
|
+ table, field_name
|
|
|
+ );
|
|
|
+ let exists = sqlx::query(&query_str)
|
|
|
+ .bind(&id_str)
|
|
|
+ .fetch_one(state.database.pool())
|
|
|
+ .await?;
|
|
|
+
|
|
|
+ let count: i64 = exists.try_get("count")?;
|
|
|
+
|
|
|
+ if count == 0 {
|
|
|
+ return Ok(id_str);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Err(anyhow::anyhow!(
|
|
|
+ "Failed to generate unique {} for table {} after {} attempts",
|
|
|
+ field_name,
|
|
|
+ table,
|
|
|
+ max_attempts
|
|
|
+ ))
|
|
|
+}
|
|
|
+
|
|
|
+// Security validation functions
|
|
|
+fn validate_input_security(payload: &QueryRequest) -> Result<(), String> {
|
|
|
+ // Check for null bytes in table name
|
|
|
+ if let Some(ref table) = payload.table {
|
|
|
+ if table.contains('\0') {
|
|
|
+ return Err("Null byte detected in table name".to_string());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check for null bytes in column names
|
|
|
+ if let Some(columns) = &payload.columns {
|
|
|
+ for column in columns {
|
|
|
+ if column.contains('\0') {
|
|
|
+ return Err("Null byte detected in column name".to_string());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check for null bytes in data values
|
|
|
+ if let Some(data) = &payload.data {
|
|
|
+ if contains_null_bytes_in_value(data) {
|
|
|
+ return Err("Null byte detected in data values".to_string());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check for null bytes in WHERE clause
|
|
|
+ if let Some(where_clause) = &payload.where_clause {
|
|
|
+ if contains_null_bytes_in_value(where_clause) {
|
|
|
+ return Err("Null byte detected in WHERE clause".to_string());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
+fn contains_null_bytes_in_value(value: &Value) -> bool {
|
|
|
+ match value {
|
|
|
+ Value::String(s) => s.contains('\0'),
|
|
|
+ Value::Array(arr) => arr.iter().any(contains_null_bytes_in_value),
|
|
|
+ Value::Object(map) => {
|
|
|
+ map.keys().any(|k| k.contains('\0')) || map.values().any(contains_null_bytes_in_value)
|
|
|
+ }
|
|
|
+ _ => false,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Core execution functions that work with mutable transaction references
|
|
|
+// These are used by batch operations to execute multiple queries in a single atomic transaction
|
|
|
+
|
|
|
/// Executes a SELECT inside the caller-provided transaction (batch path).
///
/// In order: applies column-level read filtering for the session's power
/// level, caps the WHERE/filter condition count, verifies SELECT permission
/// on every JOINed table, enforces a LIMIT (capped to the power-level max),
/// audit-logs the final SQL, then executes and converts rows to JSON.
///
/// Does not commit — the caller owns the transaction. Policy violations are
/// reported as `Ok` with `success: false`; hard failures as `Err`.
async fn execute_select_core(
    request_id: &str,
    tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    session: &crate::models::Session,
    state: &AppState,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    // Helper to count conditions in filter/where
    fn count_conditions(
        filter: &Option<crate::models::FilterCondition>,
        where_clause: &Option<serde_json::Value>,
    ) -> usize {
        let mut count = 0;
        if let Some(f) = filter {
            count += count_filter_conditions(f);
        }
        if let Some(w) = where_clause {
            count += count_where_conditions(w);
        }
        count
    }

    // Recursively counts leaf conditions in the structured filter format.
    // NOTE(review): any non-Logical variant counts as 1; a NOT variant (if the
    // enum has one) is not recursed into here, unlike execute_select_with_tx.
    fn count_filter_conditions(filter: &crate::models::FilterCondition) -> usize {
        use crate::models::FilterCondition;
        match filter {
            FilterCondition::Logical {
                and_conditions,
                or_conditions,
            } => {
                let mut count = 0;
                if let Some(conditions) = and_conditions {
                    count += conditions
                        .iter()
                        .map(|c| count_filter_conditions(c))
                        .sum::<usize>();
                }
                if let Some(conditions) = or_conditions {
                    count += conditions
                        .iter()
                        .map(|c| count_filter_conditions(c))
                        .sum::<usize>();
                }
                count
            }
            _ => 1,
        }
    }

    // Counts leaf conditions in the legacy JSON where-clause format:
    // {"AND": [...]} / {"OR": [...]} recurse; anything else counts as 1.
    fn count_where_conditions(where_clause: &serde_json::Value) -> usize {
        match where_clause {
            serde_json::Value::Object(map) => {
                if map.contains_key("AND") || map.contains_key("OR") {
                    if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) {
                        if let serde_json::Value::Array(conditions) = arr {
                            return conditions.iter().map(|c| count_where_conditions(c)).sum();
                        }
                    }
                }
                1
            }
            _ => 1,
        }
    }

    // Per-power-level query limits from config.
    let max_limit = state.config.get_max_limit(session.power);
    let max_where = state.config.get_max_where_conditions(session.power);

    let requested_columns = if let Some(ref cols) = payload.columns {
        cols.clone()
    } else {
        vec!["*".to_string()]
    };

    // Column-level read ACL: "*" passes through untouched; otherwise keep only
    // columns readable at this power level and fail if none survive.
    let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" {
        "*".to_string()
    } else {
        let allowed_columns =
            state
                .config
                .filter_readable_columns(session.power, &table, &requested_columns);
        if allowed_columns.is_empty() {
            return Ok(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some("No readable columns available for this request".to_string()),
                warning: None,
                results: None,
            });
        }
        allowed_columns.join(", ")
    };

    // Reject overly complex WHERE/filter trees.
    let condition_count = count_conditions(&payload.filter, &payload.where_clause);
    if condition_count > max_where as usize {
        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Too many WHERE/filter conditions ({}) > max {}",
                condition_count, max_where
            )),
            warning: None,
            results: None,
        });
    }

    let mut query = format!("SELECT {} FROM {}", filtered_columns, table);
    let mut values = Vec::new();

    // Every JOINed table must also be SELECT-able by this session.
    if let Some(joins) = &payload.joins {
        for join in joins {
            if !state.rbac.check_permission(
                &state.config,
                session.power,
                &join.table,
                &crate::models::QueryAction::Select,
            ) {
                return Ok(QueryResponse {
                    success: false,
                    data: None,
                    rows_affected: None,
                    error: Some(format!(
                        "Insufficient permissions to JOIN with table '{}'",
                        join.table
                    )),
                    warning: None,
                    results: None,
                });
            }
        }
        let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
        query.push_str(&join_sql);
    }

    // Structured `filter` takes precedence over the legacy `where_clause`.
    if let Some(filter) = &payload.filter {
        let (where_sql, where_values) = build_filter_clause(filter)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    } else if let Some(where_clause) = &payload.where_clause {
        let (where_sql, where_values) = build_where_clause(where_clause)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    }

    if let Some(order_by) = &payload.order_by {
        let order_clause = build_order_by_clause(order_by)?;
        query.push_str(&order_clause);
    }

    // Always emit a LIMIT: default to, and cap at, the power-level maximum.
    let requested_limit = payload.limit;
    let limit = requested_limit.unwrap_or(max_limit);
    let was_capped = limit > max_limit;
    let limit = if limit > max_limit { max_limit } else { limit };
    query.push_str(&format!(" LIMIT {}", limit));

    // was_capped implies requested_limit is Some, so the unwrap below is safe.
    let limit_warning = if was_capped {
        Some(format!(
            "Requested LIMIT {} exceeded maximum {} for your power level, capped to {}",
            requested_limit.unwrap(),
            max_limit,
            max_limit
        ))
    } else if requested_limit.is_none() {
        Some(format!(
            "No LIMIT specified, defaulted to {} (max for power level {})",
            max_limit, session.power
        ))
    } else {
        None
    };

    // Audit-log the SQL before executing; logging failure is non-fatal.
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            chrono::Utc::now(),
            username,
            Some(session.power),
            &query,
            None,
            None,
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    // Bind parameter values in order (None binds SQL NULL).
    let mut sqlx_query = sqlx::query(&query);
    for value in values {
        match value {
            Some(v) => sqlx_query = sqlx_query.bind(v),
            None => sqlx_query = sqlx_query.bind(Option::<String>::None),
        }
    }

    let rows = sqlx_query.fetch_all(&mut **tx).await?;

    // Convert each SQL row into a JSON object keyed by column name.
    let mut results = Vec::new();
    for row in rows {
        let mut result_row = serde_json::Map::new();
        for (i, column) in row.columns().iter().enumerate() {
            let value = convert_sql_value_to_json(
                &row,
                i,
                Some(request_id),
                Some(username),
                Some(session.power),
                Some(state),
            )?;
            result_row.insert(column.name().to_string(), value);
        }
        results.push(Value::Object(result_row));
    }

    Ok(QueryResponse {
        success: true,
        data: Some(Value::Array(results)),
        rows_affected: None,
        error: None,
        warning: limit_warning,
        results: None,
    })
}
|
|
|
+
|
|
|
/// Executes a `SELECT COUNT(*)` inside the caller-provided transaction.
///
/// Enforces the per-power WHERE/filter condition cap, verifies SELECT
/// permission on every JOINed table, audit-logs the SQL, then runs the count
/// and returns it in `data`. Does not commit — the caller owns the
/// transaction. Policy violations come back as `Ok` with `success: false`.
async fn execute_count_core(
    request_id: &str,
    tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    session: &crate::models::Session,
    state: &AppState,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    // Helper to count conditions in filter/where (same as other functions)
    fn count_conditions(
        filter: &Option<crate::models::FilterCondition>,
        where_clause: &Option<serde_json::Value>,
    ) -> usize {
        use crate::models::FilterCondition;
        // Recursively counts leaf conditions in the structured filter format;
        // any non-Logical variant counts as 1.
        fn count_filter_conditions(filter: &FilterCondition) -> usize {
            match filter {
                FilterCondition::Logical {
                    and_conditions,
                    or_conditions,
                } => {
                    let mut count = 0;
                    if let Some(conditions) = and_conditions {
                        count += conditions
                            .iter()
                            .map(|c| count_filter_conditions(c))
                            .sum::<usize>();
                    }
                    if let Some(conditions) = or_conditions {
                        count += conditions
                            .iter()
                            .map(|c| count_filter_conditions(c))
                            .sum::<usize>();
                    }
                    count
                }
                _ => 1,
            }
        }

        // Counts leaf conditions in the legacy JSON where-clause format:
        // {"AND": [...]} / {"OR": [...]} recurse; anything else counts as 1.
        fn count_where_conditions(where_clause: &serde_json::Value) -> usize {
            match where_clause {
                serde_json::Value::Object(map) => {
                    if map.contains_key("AND") || map.contains_key("OR") {
                        if let Some(arr) = map.get("AND").or_else(|| map.get("OR")) {
                            if let serde_json::Value::Array(conditions) = arr {
                                return conditions.iter().map(|c| count_where_conditions(c)).sum();
                            }
                        }
                    }
                    1
                }
                _ => 1,
            }
        }

        let mut count = 0;
        if let Some(f) = filter {
            count += count_filter_conditions(f);
        }
        if let Some(w) = where_clause {
            count += count_where_conditions(w);
        }
        count
    }

    let max_where = state.config.get_max_where_conditions(session.power);

    // Enforce WHERE clause complexity
    let condition_count = count_conditions(&payload.filter, &payload.where_clause);
    if condition_count > max_where as usize {
        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Too many WHERE/filter conditions ({}) > max {}",
                condition_count, max_where
            )),
            warning: None,
            results: None,
        });
    }

    let mut query = format!("SELECT COUNT(*) as count FROM {}", table);
    let mut values = Vec::new();

    // Add JOIN clauses if provided - validates permissions for all joined tables
    if let Some(joins) = &payload.joins {
        for join in joins {
            if !state.rbac.check_permission(
                &state.config,
                session.power,
                &join.table,
                &crate::models::QueryAction::Select,
            ) {
                return Ok(QueryResponse {
                    success: false,
                    data: None,
                    rows_affected: None,
                    error: Some(format!(
                        "Insufficient permissions to JOIN with table '{}'",
                        join.table
                    )),
                    warning: None,
                    results: None,
                });
            }
        }
        let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
        query.push_str(&join_sql);
    }

    // Add WHERE conditions (filter takes precedence over where_clause if both are provided)
    if let Some(filter) = &payload.filter {
        let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    } else if let Some(where_clause) = &payload.where_clause {
        let (where_sql, where_values) = build_where_clause(where_clause)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    }

    // Log the query before execution; logging failure is non-fatal.
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            chrono::Utc::now(),
            username,
            Some(session.power),
            &query,
            None,
            None,
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    // Bind parameter values in order (None binds SQL NULL).
    let mut sqlx_query = sqlx::query(&query);
    for value in values {
        match value {
            Some(v) => sqlx_query = sqlx_query.bind(v),
            None => sqlx_query = sqlx_query.bind(Option::<String>::None),
        }
    }

    // COUNT(*) always yields exactly one row with the aliased "count" column.
    let result = sqlx_query.fetch_one(&mut **tx).await?;
    let count: i64 = result.try_get("count")?;

    Ok(QueryResponse {
        success: true,
        data: Some(serde_json::json!(count)),
        rows_affected: None,
        error: None,
        warning: None,
        results: None,
    })
}
|
|
|
+
|
|
|
+async fn execute_update_core(
|
|
|
+ request_id: &str,
|
|
|
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
|
|
|
+ payload: &QueryRequest,
|
|
|
+ username: &str,
|
|
|
+ state: &AppState,
|
|
|
+ session: &crate::models::Session,
|
|
|
+) -> anyhow::Result<QueryResponse> {
|
|
|
+ let table = payload
|
|
|
+ .table
|
|
|
+ .as_ref()
|
|
|
+ .ok_or_else(|| anyhow::anyhow!("Table is required"))?;
|
|
|
+
|
|
|
+ let mut data = payload
|
|
|
+ .data
|
|
|
+ .as_ref()
|
|
|
+ .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))?
|
|
|
+ .clone();
|
|
|
+
|
|
|
+ let where_clause = payload
|
|
|
+ .where_clause
|
|
|
+ .as_ref()
|
|
|
+ .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?;
|
|
|
+
|
|
|
+ let max_where = state.config.get_max_where_conditions(session.power);
|
|
|
+ let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0);
|
|
|
+ if condition_count > max_where as usize {
|
|
|
+ return Ok(QueryResponse {
|
|
|
+ success: false,
|
|
|
+ data: None,
|
|
|
+ rows_affected: None,
|
|
|
+ error: Some(format!(
|
|
|
+ "Too many WHERE conditions ({}) > max {}",
|
|
|
+ condition_count, max_where
|
|
|
+ )),
|
|
|
+ warning: None,
|
|
|
+ results: None,
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ // SECURITY: Apply column-level write filtering FIRST (before auto-generation)
|
|
|
+ if let Value::Object(ref mut map) = data {
|
|
|
+ let all_columns: Vec<String> = map.keys().cloned().collect();
|
|
|
+ let writable_columns =
|
|
|
+ state
|
|
|
+ .config
|
|
|
+ .filter_writable_columns(session.power, &table, &all_columns);
|
|
|
+
|
|
|
+ // Remove columns that user cannot write to
|
|
|
+ map.retain(|key, _| writable_columns.contains(key));
|
|
|
+
|
|
|
+ // Check for auto-generation (system-generated fields bypass write protection)
|
|
|
+ if let Some(auto_config) = state.config.get_auto_generation_config(&table) {
|
|
|
+ if auto_config.on_action == "update" || auto_config.on_action == "both" {
|
|
|
+ let field_name = &auto_config.field;
|
|
|
+ if !map.contains_key(field_name)
|
|
|
+ || map.get(field_name).map_or(true, |v| {
|
|
|
+ v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
|
|
|
+ })
|
|
|
+ {
|
|
|
+ let generated_value = generate_auto_value(&state, &table, auto_config).await?;
|
|
|
+ map.insert(field_name.clone(), Value::String(generated_value));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let writable_columns = data
|
|
|
+ .as_object()
|
|
|
+ .unwrap()
|
|
|
+ .keys()
|
|
|
+ .cloned()
|
|
|
+ .collect::<Vec<_>>();
|
|
|
+ if writable_columns.is_empty() {
|
|
|
+ return Ok(QueryResponse {
|
|
|
+ success: false,
|
|
|
+ data: None,
|
|
|
+ rows_affected: None,
|
|
|
+ error: Some("No writable columns in UPDATE data".to_string()),
|
|
|
+ warning: None,
|
|
|
+ results: None,
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ let set_clause = writable_columns
|
|
|
+ .iter()
|
|
|
+ .map(|col| format!("{} = ?", col))
|
|
|
+ .collect::<Vec<_>>()
|
|
|
+ .join(", ");
|
|
|
+ let (where_sql, where_values) = build_where_clause(where_clause)?;
|
|
|
+ let query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql);
|
|
|
+
|
|
|
+ let requested_limit = payload.limit;
|
|
|
+ let max_limit = state.config.get_max_limit(session.power);
|
|
|
+ let limit = requested_limit.unwrap_or(max_limit);
|
|
|
+ let was_capped = limit > max_limit;
|
|
|
+ let limit = if limit > max_limit { max_limit } else { limit };
|
|
|
+ let query_with_limit = format!("{} LIMIT {}", query, limit);
|
|
|
+
|
|
|
+ let limit_warning = if was_capped {
|
|
|
+ Some(format!(
|
|
|
+ "Requested LIMIT {} exceeded maximum {}, capped to {}",
|
|
|
+ requested_limit.unwrap(),
|
|
|
+ max_limit,
|
|
|
+ max_limit
|
|
|
+ ))
|
|
|
+ } else {
|
|
|
+ None
|
|
|
+ };
|
|
|
+
|
|
|
+ if let Err(e) = state
|
|
|
+ .logging
|
|
|
+ .log_query(
|
|
|
+ request_id,
|
|
|
+ chrono::Utc::now(),
|
|
|
+ username,
|
|
|
+ Some(session.power),
|
|
|
+ &query_with_limit,
|
|
|
+ None,
|
|
|
+ None,
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ {
|
|
|
+ error!("[{}] Failed to log query: {}", request_id, e);
|
|
|
+ }
|
|
|
+
|
|
|
+ let mut sqlx_query = sqlx::query(&query_with_limit);
|
|
|
+ for col in &writable_columns {
|
|
|
+ if let Some(val) = data.get(col) {
|
|
|
+ sqlx_query = sqlx_query.bind(val.clone());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for val in where_values {
|
|
|
+ sqlx_query = sqlx_query.bind(val);
|
|
|
+ }
|
|
|
+
|
|
|
+ let result = sqlx_query.execute(&mut **tx).await?;
|
|
|
+
|
|
|
+ Ok(QueryResponse {
|
|
|
+ success: true,
|
|
|
+ data: None,
|
|
|
+ rows_affected: Some(result.rows_affected()),
|
|
|
+ error: None,
|
|
|
+ warning: limit_warning,
|
|
|
+ results: None,
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
/// Executes a DELETE inside the caller-provided transaction (batch path).
///
/// Requires a WHERE clause (never deletes unconditionally), enforces the
/// per-power WHERE condition cap, caps the affected-row LIMIT to the
/// power-level maximum, audit-logs the SQL, then executes. Does not commit —
/// the caller owns the transaction. Policy violations come back as `Ok` with
/// `success: false`.
async fn execute_delete_core(
    request_id: &str,
    tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    state: &AppState,
    session: &crate::models::Session,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    // A DELETE without a WHERE clause would wipe the table; refuse outright.
    let where_clause = payload
        .where_clause
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?;

    // NOTE(review): counts only top-level keys (matching execute_update_core);
    // nested AND/OR arrays count as a single condition here.
    let max_where = state.config.get_max_where_conditions(session.power);
    let condition_count = where_clause.as_object().map(|obj| obj.len()).unwrap_or(0);
    if condition_count > max_where as usize {
        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Too many WHERE conditions ({}) > max {}",
                condition_count, max_where
            )),
            warning: None,
            results: None,
        });
    }

    // WHERE values are bound as parameters, never interpolated.
    let (where_sql, where_values) = build_where_clause(where_clause)?;
    let query = format!("DELETE FROM {} WHERE {}", table, where_sql);

    // Cap the affected-row LIMIT to the power-level maximum.
    let requested_limit = payload.limit;
    let max_limit = state.config.get_max_limit(session.power);
    let limit = requested_limit.unwrap_or(max_limit);
    let was_capped = limit > max_limit;
    let limit = if limit > max_limit { max_limit } else { limit };
    let query_with_limit = format!("{} LIMIT {}", query, limit);

    // was_capped implies requested_limit is Some, so the unwrap below is safe.
    let limit_warning = if was_capped {
        Some(format!(
            "Requested LIMIT {} exceeded maximum {}, capped to {}",
            requested_limit.unwrap(),
            max_limit,
            max_limit
        ))
    } else {
        None
    };

    // Audit-log the SQL before executing; logging failure is non-fatal.
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            chrono::Utc::now(),
            username,
            Some(session.power),
            &query_with_limit,
            None,
            None,
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    let mut sqlx_query = sqlx::query(&query_with_limit);
    for val in where_values {
        sqlx_query = sqlx_query.bind(val);
    }

    let result = sqlx_query.execute(&mut **tx).await?;

    Ok(QueryResponse {
        success: true,
        data: None,
        rows_affected: Some(result.rows_affected()),
        error: None,
        warning: limit_warning,
        results: None,
    })
}
|
|
|
+
|
|
|
+// Transaction-based execution functions for user context operations
|
|
|
+// These create their own transactions and commit them - used for single query operations
|
|
|
+
|
|
|
/// Executes a single SELECT, consuming and committing the given transaction.
///
/// Unlike `execute_select_core`, this is the single-query path: it owns the
/// transaction and commits after fetching. It also supports OFFSET, logs
/// bound parameters with the query, and records WHERE-limit violations via
/// the error log. Policy violations come back as `Ok` with `success: false`.
async fn execute_select_with_tx(
    request_id: &str,
    mut tx: sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    session: &crate::models::Session,
    state: &AppState,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    // Helper to count conditions in filter/where
    fn count_conditions(
        filter: &Option<crate::models::FilterCondition>,
        where_clause: &Option<serde_json::Value>,
    ) -> usize {
        use crate::models::FilterCondition;
        // Recursively counts Simple leaves; NOT recurses into its child.
        fn count_filter(cond: &FilterCondition) -> usize {
            match cond {
                FilterCondition::Simple { .. } => 1,
                FilterCondition::Logical {
                    and_conditions,
                    or_conditions,
                } => {
                    and_conditions
                        .as_ref()
                        .map_or(0, |conds| conds.iter().map(count_filter).sum())
                        + or_conditions
                            .as_ref()
                            .map_or(0, |conds| conds.iter().map(count_filter).sum())
                }
                FilterCondition::Not { not } => count_filter(not),
            }
        }
        let mut count = 0;
        if let Some(f) = filter {
            count += count_filter(f);
        }
        if let Some(w) = where_clause {
            // Legacy where_clause: each top-level key counts as one condition.
            if let serde_json::Value::Object(map) = w {
                count += map.len();
            }
        }
        count
    }

    // Enforce query limits from config (power-level specific with fallback to defaults)
    let max_limit = state.config.get_max_limit(session.power);
    let max_where = state.config.get_max_where_conditions(session.power);

    // Apply granular column filtering based on user's power level
    let requested_columns = if let Some(cols) = &payload.columns {
        cols.clone()
    } else {
        vec!["*".to_string()]
    };

    // "*" passes through untouched; otherwise keep only readable columns and
    // fail if none survive the ACL.
    let filtered_columns = if requested_columns.len() == 1 && requested_columns[0] == "*" {
        "*".to_string()
    } else {
        let allowed_columns =
            state
                .config
                .filter_readable_columns(session.power, &table, &requested_columns);
        if allowed_columns.is_empty() {
            return Ok(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some("No readable columns available for this request".to_string()),
                warning: None,
                results: None,
            });
        }
        allowed_columns.join(", ")
    };

    // Enforce WHERE clause complexity
    let condition_count = count_conditions(&payload.filter, &payload.where_clause);
    if condition_count > max_where as usize {
        // Log security violation
        let timestamp = chrono::Utc::now();
        if let Err(log_err) = state
            .logging
            .log_error(
                &request_id,
                timestamp,
                &format!(
                    "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
                    condition_count, max_where, session.power
                ),
                Some("query_limits"),
                Some(&session.username),
                Some(session.power),
            )
            .await
        {
            error!(
                "[{}] Failed to log WHERE limit violation: {}",
                request_id, log_err
            );
        }

        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]",
                condition_count, max_where, request_id
            )),
            warning: None,
            results: None,
        });
    }

    let mut query = format!("SELECT {} FROM {}", filtered_columns, table);
    let mut values = Vec::new();

    // Add JOIN clauses if provided - validates permissions for all joined tables
    if let Some(joins) = &payload.joins {
        // Validate user has read permission for all joined tables
        for join in joins {
            if !state.rbac.check_permission(
                &state.config,
                session.power,
                &join.table,
                &crate::models::QueryAction::Select,
            ) {
                return Ok(QueryResponse {
                    success: false,
                    data: None,
                    rows_affected: None,
                    error: Some(format!(
                        "Insufficient permissions to JOIN with table '{}'",
                        join.table
                    )),
                    warning: None,
                    results: None,
                });
            }
        }

        // Build and append JOIN SQL
        let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
        query.push_str(&join_sql);
    }

    // Add WHERE clause - support both legacy and new filter format
    if let Some(filter) = &payload.filter {
        let (where_sql, where_values) = build_filter_clause(filter)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    } else if let Some(where_clause) = &payload.where_clause {
        let (where_sql, where_values) = build_where_clause(where_clause)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    }

    // Add ORDER BY if provided
    if let Some(order_by) = &payload.order_by {
        let order_clause = build_order_by_clause(order_by)?;
        query.push_str(&order_clause);
    }

    // Enforce LIMIT and track if it was capped
    let requested_limit = payload.limit;
    let limit = requested_limit.unwrap_or(max_limit);
    let was_capped = limit > max_limit;
    let limit = if limit > max_limit { max_limit } else { limit };
    query.push_str(&format!(" LIMIT {}", limit));

    // was_capped implies requested_limit is Some, so the unwrap below is safe.
    let limit_warning = if was_capped {
        Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]",
            requested_limit.unwrap(), max_limit, max_limit, request_id))
    } else if requested_limit.is_none() {
        Some(format!(
            "No LIMIT specified, using default {} based on power level [request_id: {}]",
            max_limit, request_id
        ))
    } else {
        None
    };

    // Add OFFSET if provided
    if let Some(offset) = payload.offset {
        query.push_str(&format!(" OFFSET {}", offset));
    }

    // Log the query (with bound parameters); logging failure is non-fatal.
    let params_json = serde_json::to_value(&values).ok();
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            Utc::now(),
            username,
            Some(session.power),
            &query,
            params_json.as_ref(),
            None, // Row count will be known after execution
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    // Execute the query (None binds SQL NULL).
    let mut sqlx_query = sqlx::query(&query);
    for value in values {
        match value {
            Some(v) => sqlx_query = sqlx_query.bind(v),
            None => sqlx_query = sqlx_query.bind(Option::<String>::None),
        }
    }

    // Fetch, then commit — this function owns the transaction.
    let rows = sqlx_query.fetch_all(&mut *tx).await?;
    tx.commit().await?;

    // Convert each SQL row into a JSON object keyed by column name.
    let mut results = Vec::new();
    for row in rows {
        let mut result_row = serde_json::Map::new();
        for (i, column) in row.columns().iter().enumerate() {
            let value = convert_sql_value_to_json(
                &row,
                i,
                Some(request_id),
                Some(username),
                Some(session.power),
                Some(state),
            )?;
            result_row.insert(column.name().to_string(), value);
        }
        results.push(Value::Object(result_row));
    }

    Ok(QueryResponse {
        success: true,
        data: Some(Value::Array(results)),
        rows_affected: None,
        error: None,
        warning: limit_warning,
        results: None,
    })
}
|
|
|
+
|
|
|
/// Executes a single INSERT, consuming and committing the given transaction.
///
/// Strips columns the session cannot write (column-level write ACLs), injects
/// the auto-generated field when configured for insert/both, audit-logs the
/// parameterized SQL, executes, commits, and returns the last insert id in
/// `data`. Policy violations come back as `Ok` with `success: false`.
async fn execute_insert_with_tx(
    request_id: &str,
    mut tx: sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    state: &AppState,
    session: &crate::models::Session,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    let mut data = payload
        .data
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Data is required for INSERT"))?
        .clone();

    // SECURITY: Apply column-level write filtering FIRST (before auto-generation)
    // This validates what the user is trying to write
    if let Value::Object(ref mut map) = data {
        let all_columns: Vec<String> = map.keys().cloned().collect();
        let writable_columns =
            state
                .config
                .filter_writable_columns(session.power, &table, &all_columns);

        // Remove columns that user cannot write to
        map.retain(|key, _| writable_columns.contains(key));

        // Check for auto-generation requirements based on config
        // Auto-generated fields bypass write protection since they're system-generated
        if let Some(auto_config) = state.config.get_auto_generation_config(&table) {
            // Check if auto-generation is enabled for INSERT action
            if auto_config.on_action == "insert" || auto_config.on_action == "both" {
                let field_name = &auto_config.field;

                // Generate when the field is absent, null, or an empty string.
                if !map.contains_key(field_name)
                    || map.get(field_name).map_or(true, |v| {
                        v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
                    })
                {
                    // Generate auto value based on config
                    let generated_value = generate_auto_value(&state, &table, auto_config).await?;
                    map.insert(field_name.clone(), Value::String(generated_value));
                }
            }
        }
    }

    // Final validation: ensure we have columns to insert
    if let Value::Object(ref map) = data {
        if map.is_empty() {
            return Ok(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some("No writable columns in INSERT data".to_string()),
                warning: None,
                results: None,
            });
        }
    }

    // Build parameterized column list, "?" placeholders, and bind values.
    let (columns_vec, placeholders_vec, values) = build_insert_data(&data)?;
    let columns = columns_vec.join(", ");
    let placeholders = placeholders_vec.join(", ");

    let query = format!(
        "INSERT INTO {} ({}) VALUES ({})",
        table, columns, placeholders
    );

    // Log the query (with bound parameters); logging failure is non-fatal.
    let params_json = serde_json::to_value(&values).ok();
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            Utc::now(),
            username,
            Some(session.power),
            &query,
            params_json.as_ref(),
            None,
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    // Execute the query (None binds SQL NULL).
    let mut sqlx_query = sqlx::query(&query);
    for value in values {
        match value {
            Some(v) => sqlx_query = sqlx_query.bind(v),
            None => sqlx_query = sqlx_query.bind(Option::<String>::None),
        }
    }

    // Capture the auto-increment id before committing — this function owns
    // the transaction.
    let result = sqlx_query.execute(&mut *tx).await?;
    let insert_id = result.last_insert_id();
    tx.commit().await?;

    Ok(QueryResponse {
        success: true,
        data: Some(serde_json::json!(insert_id)),
        rows_affected: Some(result.rows_affected()),
        error: None,
        warning: None, // INSERT queries don't have a LIMIT, so no cap warning
        results: None,
    })
}
|
|
|
+
|
|
|
/// Execute a single UPDATE inside the supplied transaction and commit it.
///
/// Pipeline, in order (the order is security-relevant):
///   1. validate required payload fields (table, data, WHERE clause),
///   2. enforce the per-power-level WHERE-condition cap,
///   3. strip columns the caller's power level may not write,
///   4. fill the table's auto-generated field (system fields bypass write
///      protection by design — see the SECURITY comment below),
///   5. build parameterized SQL, audit-log it, bind values, execute, commit.
///
/// Policy violations (too many WHERE conditions, nothing writable left)
/// return `Ok` with `success: false`; `Err` is reserved for hard failures
/// (missing payload fields, SQL-build errors, database errors).
async fn execute_update_with_tx(
    request_id: &str,
    mut tx: sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    state: &AppState,
    session: &crate::models::Session,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    // Cloned because the column-filtering / auto-generation steps below
    // mutate the data object in place.
    let mut data = payload
        .data
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Data is required for UPDATE"))?
        .clone();

    let where_clause = payload
        .where_clause
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for UPDATE"))?;

    // Enforce query limits from config (power-level specific with fallback to defaults)
    let max_limit = state.config.get_max_limit(session.power);
    let max_where = state.config.get_max_where_conditions(session.power);

    // Enforce WHERE clause complexity. Only a JSON object counts; any other
    // shape is treated as zero conditions here (build_where_clause validates
    // the actual structure later).
    let condition_count = if let Some(w) = &payload.where_clause {
        if let serde_json::Value::Object(map) = w {
            map.len()
        } else {
            0
        }
    } else {
        0
    };
    if condition_count > max_where as usize {
        // Log security violation (best-effort: a logging failure must not
        // mask the policy rejection itself).
        let timestamp = chrono::Utc::now();
        if let Err(log_err) = state
            .logging
            .log_error(
                &request_id,
                timestamp,
                &format!(
                    "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
                    condition_count, max_where, session.power
                ),
                Some("query_limits"),
                Some(&session.username),
                Some(session.power),
            )
            .await
        {
            error!(
                "[{}] Failed to log WHERE limit violation: {}",
                request_id, log_err
            );
        }

        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Too many WHERE conditions ({}) > max {} [request_id: {}]",
                condition_count, max_where, request_id
            )),
            warning: None,
            results: None,
        });
    }

    // SECURITY: Apply column-level write filtering FIRST (before auto-generation)
    if let Value::Object(ref mut map) = data {
        let all_columns: Vec<String> = map.keys().cloned().collect();
        let writable_columns =
            state
                .config
                .filter_writable_columns(session.power, &table, &all_columns);

        // Remove columns that user cannot write to
        map.retain(|key, _| writable_columns.contains(key));

        // Check for auto-generation (system-generated fields bypass write protection)
        if let Some(auto_config) = state.config.get_auto_generation_config(&table) {
            if auto_config.on_action == "update" || auto_config.on_action == "both" {
                let field_name = &auto_config.field;
                // Fill the field when it is absent, null, or an empty string.
                if !map.contains_key(field_name)
                    || map.get(field_name).map_or(true, |v| {
                        v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
                    })
                {
                    let generated_value = generate_auto_value(&state, &table, auto_config).await?;
                    map.insert(field_name.clone(), Value::String(generated_value));
                }
            }
        }
    }

    // Final validation: ensure we have columns to update
    if let Value::Object(ref map) = data {
        if map.is_empty() {
            return Ok(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some("No writable columns in UPDATE data".to_string()),
                warning: None,
                results: None,
            });
        }
    }

    let (set_clause, mut values) = build_update_set_clause(&data)?;
    let (where_sql, where_values) = build_where_clause(where_clause)?;
    // Convert where_values to Option<String> to match set values
    values.extend(where_values.into_iter().map(Some));

    let mut query = format!("UPDATE {} SET {} WHERE {}", table, set_clause, where_sql);

    // Enforce LIMIT and track if it was capped.
    // `was_capped` can only be true when the caller supplied a limit, so the
    // `requested_limit.unwrap()` in the warning branch below cannot panic.
    let requested_limit = payload.limit;
    let limit = requested_limit.unwrap_or(max_limit);
    let was_capped = limit > max_limit;
    let limit = if limit > max_limit { max_limit } else { limit };
    query.push_str(&format!(" LIMIT {}", limit));

    let limit_warning = if was_capped {
        Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]",
            requested_limit.unwrap(), max_limit, max_limit, request_id))
    } else if requested_limit.is_none() {
        Some(format!(
            "No LIMIT specified, using default {} based on power level [request_id: {}]",
            max_limit, request_id
        ))
    } else {
        None
    };

    // Audit-log the query with its bound parameters (best-effort).
    let params_json = serde_json::to_value(&values).ok();
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            Utc::now(),
            username,
            Some(session.power),
            &query,
            params_json.as_ref(),
            None,
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    // Execute the query; None values are bound as SQL NULL.
    let mut sqlx_query = sqlx::query(&query);
    for value in values {
        match value {
            Some(v) => sqlx_query = sqlx_query.bind(v),
            None => sqlx_query = sqlx_query.bind(Option::<String>::None),
        }
    }

    let result = sqlx_query.execute(&mut *tx).await?;
    tx.commit().await?;

    Ok(QueryResponse {
        success: true,
        data: None,
        rows_affected: Some(result.rows_affected()),
        error: None,
        warning: limit_warning,
        results: None,
    })
}
|
|
|
+
|
|
|
/// Execute a single DELETE inside the supplied transaction and commit it.
///
/// A WHERE clause is mandatory (a DELETE without one is rejected up front),
/// the per-power WHERE-condition cap is enforced, and a LIMIT is always
/// appended — either the caller's (capped to the power-level maximum) or the
/// configured default. Policy violations return `Ok` with `success: false`;
/// `Err` is reserved for hard failures (missing fields, SQL-build or DB errors).
async fn execute_delete_with_tx(
    request_id: &str,
    mut tx: sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    state: &AppState,
    session: &crate::models::Session,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    let where_clause = payload
        .where_clause
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("WHERE clause is required for DELETE"))?;

    // Enforce query limits from config (power-level specific with fallback to defaults)
    let max_limit = state.config.get_max_limit(session.power);
    let max_where = state.config.get_max_where_conditions(session.power);

    // Enforce WHERE clause complexity. Only a JSON object counts; any other
    // shape is treated as zero conditions here.
    let condition_count = if let Some(w) = &payload.where_clause {
        if let serde_json::Value::Object(map) = w {
            map.len()
        } else {
            0
        }
    } else {
        0
    };
    if condition_count > max_where as usize {
        // Log security violation (best-effort; a logging failure must not
        // mask the policy rejection).
        let timestamp = chrono::Utc::now();
        if let Err(log_err) = state
            .logging
            .log_error(
                &request_id,
                timestamp,
                &format!(
                    "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
                    condition_count, max_where, session.power
                ),
                Some("query_limits"),
                Some(&session.username),
                Some(session.power),
            )
            .await
        {
            error!(
                "[{}] Failed to log WHERE limit violation: {}",
                request_id, log_err
            );
        }

        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Too many WHERE conditions ({}) > max {} [request_id: {}]",
                condition_count, max_where, request_id
            )),
            warning: None,
            results: None,
        });
    }

    let (where_sql, values) = build_where_clause(where_clause)?;

    let mut query = format!("DELETE FROM {} WHERE {}", table, where_sql);

    // Enforce LIMIT and track if it was capped.
    // `was_capped` implies the caller supplied a limit, so the
    // `requested_limit.unwrap()` in the warning branch cannot panic.
    let requested_limit = payload.limit;
    let limit = requested_limit.unwrap_or(max_limit);
    let was_capped = limit > max_limit;
    let limit = if limit > max_limit { max_limit } else { limit };
    query.push_str(&format!(" LIMIT {}", limit));

    let limit_warning = if was_capped {
        Some(format!("Requested LIMIT {} exceeded maximum {} for your power level, capped to {} [request_id: {}]",
            requested_limit.unwrap(), max_limit, max_limit, request_id))
    } else if requested_limit.is_none() {
        Some(format!(
            "No LIMIT specified, using default {} based on power level [request_id: {}]",
            max_limit, request_id
        ))
    } else {
        None
    };

    // Audit-log the query with its bound parameters (best-effort).
    let params_json = serde_json::to_value(&values).ok();
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            Utc::now(),
            username,
            Some(session.power),
            &query,
            params_json.as_ref(),
            None,
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    // Execute the query. Unlike UPDATE, the WHERE values here are plain
    // Strings (no NULL binding path is needed).
    let mut sqlx_query = sqlx::query(&query);
    for value in values {
        sqlx_query = sqlx_query.bind(value);
    }

    let result = sqlx_query.execute(&mut *tx).await?;
    tx.commit().await?;

    Ok(QueryResponse {
        success: true,
        data: None,
        rows_affected: Some(result.rows_affected()),
        error: None,
        warning: limit_warning,
        results: None,
    })
}
|
|
|
+
|
|
|
/// Execute a `SELECT COUNT(*)` inside the supplied transaction and commit it.
///
/// Requires SELECT permission on the target table and on every joined table.
/// WHERE-condition limits count both structured `filter` conditions
/// (recursively, through AND/OR/NOT) and legacy `where_clause` object keys.
/// When both `filter` and `where_clause` are present, `filter` wins.
/// No LIMIT is applied — COUNT returns exactly one row.
async fn execute_count_with_tx(
    request_id: &str,
    mut tx: sqlx::Transaction<'_, sqlx::MySql>,
    payload: &QueryRequest,
    username: &str,
    session: &crate::models::Session,
    state: &AppState,
) -> anyhow::Result<QueryResponse> {
    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Table is required"))?;

    // Check read permissions for the table
    if !state.rbac.check_permission(
        &state.config,
        session.power,
        table,
        &crate::models::QueryAction::Select,
    ) {
        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Insufficient permissions to COUNT from table '{}' [request_id: {}]",
                table, request_id
            )),
            warning: None,
            results: None,
        });
    }

    // Helper to count conditions in filter/where (same as select function).
    // Simple filters count as 1; logical AND/OR nodes sum their children
    // recursively; NOT counts its inner condition.
    fn count_conditions(
        filter: &Option<crate::models::FilterCondition>,
        where_clause: &Option<serde_json::Value>,
    ) -> usize {
        use crate::models::FilterCondition;
        fn count_filter(cond: &FilterCondition) -> usize {
            match cond {
                FilterCondition::Simple { .. } => 1,
                FilterCondition::Logical {
                    and_conditions,
                    or_conditions,
                } => {
                    and_conditions
                        .as_ref()
                        .map_or(0, |conds| conds.iter().map(count_filter).sum())
                        + or_conditions
                            .as_ref()
                            .map_or(0, |conds| conds.iter().map(count_filter).sum())
                }
                FilterCondition::Not { not } => count_filter(not),
            }
        }
        let mut count = 0;
        if let Some(f) = filter {
            count += count_filter(f);
        }
        if let Some(w) = where_clause {
            if let serde_json::Value::Object(map) = w {
                count += map.len();
            }
        }
        count
    }

    // Enforce query limits from config (power-level specific with fallback to defaults)
    let max_where = state.config.get_max_where_conditions(session.power);

    // Enforce WHERE clause complexity
    let condition_count = count_conditions(&payload.filter, &payload.where_clause);
    if condition_count > max_where as usize {
        // Log security violation (best-effort; a logging failure must not
        // mask the policy rejection).
        let timestamp = chrono::Utc::now();
        if let Err(log_err) = state
            .logging
            .log_error(
                &request_id,
                timestamp,
                &format!(
                    "Too many WHERE conditions: {} exceeds maximum {} for power level {}",
                    condition_count, max_where, session.power
                ),
                Some("query_limits"),
                Some(&session.username),
                Some(session.power),
            )
            .await
        {
            error!(
                "[{}] Failed to log WHERE limit violation: {}",
                request_id, log_err
            );
        }

        return Ok(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Too many WHERE/filter conditions ({}) > max {} [request_id: {}]",
                condition_count, max_where, request_id
            )),
            warning: None,
            results: None,
        });
    }

    let mut query = format!("SELECT COUNT(*) as count FROM {}", table);
    let mut values = Vec::new();

    // Add JOIN clauses if provided - validates permissions for all joined tables
    if let Some(joins) = &payload.joins {
        // Validate user has read permission for all joined tables
        for join in joins {
            if !state.rbac.check_permission(
                &state.config,
                session.power,
                &join.table,
                &crate::models::QueryAction::Select,
            ) {
                return Ok(QueryResponse {
                    success: false,
                    data: None,
                    rows_affected: None,
                    error: Some(format!(
                        "Insufficient permissions to JOIN with table '{}'",
                        join.table
                    )),
                    warning: None,
                    results: None,
                });
            }
        }
        let join_sql = crate::sql::build_join_clause(joins, &state.config)?;
        query.push_str(&join_sql);
    }

    // Add WHERE conditions (filter takes precedence over where_clause if both are provided)
    if let Some(filter) = &payload.filter {
        let (where_sql, where_values) = crate::sql::build_filter_clause(filter)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    } else if let Some(where_clause) = &payload.where_clause {
        let (where_sql, where_values) = build_where_clause(where_clause)?;
        query.push_str(&format!(" WHERE {}", where_sql));
        values.extend(where_values.into_iter().map(Some));
    }

    // Audit-log the query with its bound parameters (best-effort).
    let params_json = serde_json::to_value(&values).ok();
    if let Err(e) = state
        .logging
        .log_query(
            request_id,
            chrono::Utc::now(),
            username,
            Some(session.power),
            &query,
            params_json.as_ref(),
            None,
        )
        .await
    {
        error!("[{}] Failed to log query: {}", request_id, e);
    }

    // Execute the query; None values are bound as SQL NULL.
    let mut sqlx_query = sqlx::query(&query);
    for value in values {
        match value {
            Some(v) => sqlx_query = sqlx_query.bind(v),
            None => sqlx_query = sqlx_query.bind(Option::<String>::None),
        }
    }

    let result = sqlx_query.fetch_one(&mut *tx).await?;
    tx.commit().await?;

    // Read back the aliased `count` column from the single result row.
    let count: i64 = result.try_get("count")?;

    Ok(QueryResponse {
        success: true,
        data: Some(serde_json::json!(count)),
        rows_affected: None,
        error: None,
        warning: None,
        results: None,
    })
}
|
|
|
+
|
|
|
/// Execute multiple queries in a single transaction.
///
/// All batch entries share the parent request's `action` and `table`, so
/// table-name validation and the RBAC check run exactly once. The whole
/// batch runs inside one transaction; `rollback_on_error` (default false)
/// decides whether a failed-but-Ok response is committed or rolled back.
/// Hard execution errors (`Err`) always roll back.
///
/// Errors are reported in-band (`success: false` with HTTP 200) except for
/// missing action/table (400) and commit/rollback failures (500).
async fn execute_batch_mode(
    state: AppState,
    session: crate::models::Session,
    request_id: String,
    timestamp: chrono::DateTime<chrono::Utc>,
    client_ip: String,
    payload: &QueryRequest,
) -> Result<Json<QueryResponse>, StatusCode> {
    // NOTE(review): panics if `queries` is None — presumably the dispatcher
    // only routes here when the payload carries a batch; confirm at the caller.
    let queries = payload.queries.as_ref().unwrap();

    // Check if batch is empty
    if queries.is_empty() {
        return Ok(Json(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some("Batch request cannot be empty".to_string()),
            warning: None,
            results: Some(vec![]),
        }));
    }

    info!(
        "[{}] Batch query request from user {} (power {}): {} queries",
        request_id,
        session.username,
        session.power,
        queries.len()
    );

    // Log the batch request (log full payload including action/table)
    if let Err(e) = state
        .logging
        .log_request(
            &request_id,
            timestamp,
            &client_ip,
            Some(&session.username),
            Some(session.power),
            "/query",
            &serde_json::to_value(payload).unwrap_or_default(),
        )
        .await
    {
        error!("[{}] Failed to log batch request: {}", request_id, e);
    }

    // Get action and table from parent request (all batch queries share these)
    let action = payload
        .action
        .as_ref()
        .ok_or_else(|| StatusCode::BAD_REQUEST)?;

    let table = payload
        .table
        .as_ref()
        .ok_or_else(|| StatusCode::BAD_REQUEST)?;

    // Validate table name once for the entire batch
    if let Err(e) = validate_table_name(table, &state.config) {
        return Ok(Json(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!("Invalid table name: {}", e)),
            warning: None,
            results: Some(vec![]),
        }));
    }

    // Check RBAC permission once for the entire batch
    if !state
        .rbac
        .check_permission(&state.config, session.power, table, action)
    {
        return Ok(Json(QueryResponse {
            success: false,
            data: None,
            rows_affected: None,
            error: Some(format!(
                "Insufficient permissions for {} on table '{}'",
                match action {
                    QueryAction::Select => "SELECT",
                    QueryAction::Insert => "INSERT",
                    QueryAction::Update => "UPDATE",
                    QueryAction::Delete => "DELETE",
                    QueryAction::Count => "COUNT",
                },
                table
            )),
            warning: None,
            results: Some(vec![]),
        }));
    }

    info!(
        "[{}] Validated batch: {} x {:?} on table '{}'",
        request_id,
        queries.len(),
        action,
        table
    );
    super::log_info_async(
        &state.logging,
        &request_id,
        &format!(
            "Validated batch: {} x {:?} on table '{}'",
            queries.len(),
            action,
            table
        ),
        Some("query"),
        Some(&session.username),
        Some(session.power),
    );

    // Fast-fail while the circuit breaker flag says the DB is down.
    if !state.database.is_available() {
        warn!(
            "[{}] Database marked unavailable before batch execution",
            request_id
        );
        log_database_unavailable_event(
            &state.logging,
            &request_id,
            Some(&session.username),
            Some(session.power),
            "Database flagged unavailable before batch",
        )
        .await;
        return Ok(Json(database_unavailable_batch_response(&request_id)));
    }

    // Start a SINGLE transaction for the batch - proper atomic operation.
    // A successful begin also clears the unavailability flag.
    let mut tx = match state.database.pool().begin().await {
        Ok(tx) => {
            state.database.mark_available();
            tx
        }
        Err(e) => {
            state.database.mark_unavailable();
            error!("[{}] Failed to begin batch transaction: {}", request_id, e);
            log_database_unavailable_event(
                &state.logging,
                &request_id,
                Some(&session.username),
                Some(session.power),
                &format!("Failed to begin batch transaction: {}", e),
            )
            .await;
            return Ok(Json(database_unavailable_batch_response(&request_id)));
        }
    };

    // Set user context and request ID in transaction (session variables
    // consumed by DB-side triggers/auditing — TODO confirm against schema).
    if let Err(e) = sqlx::query("SET @current_user_id = ?, @request_id = ?")
        .bind(session.user_id)
        .bind(&request_id)
        .execute(&mut *tx)
        .await
    {
        state.database.mark_unavailable();
        error!(
            "[{}] Failed to set current user context and request ID: {}",
            request_id, e
        );
        log_database_unavailable_event(
            &state.logging,
            &request_id,
            Some(&session.username),
            Some(session.power),
            &format!("Failed to set batch user context: {}", e),
        )
        .await;
        return Ok(Json(database_unavailable_batch_response(&request_id)));
    }

    let rollback_on_error = payload.rollback_on_error.unwrap_or(false);

    info!(
        "[{}] Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})",
        request_id,
        queries.len(),
        action,
        table,
        rollback_on_error
    );
    super::log_info_async(
        &state.logging,
        &request_id,
        &format!(
            "Executing NATIVE batch: {} x {:?} on '{}' (rollback_on_error={})",
            queries.len(),
            action,
            table,
            rollback_on_error
        ),
        Some("query"),
        Some(&session.username),
        Some(session.power),
    );

    // Execute as SINGLE native batch query based on action type
    let result = match action {
        QueryAction::Insert => {
            execute_batch_insert(
                &request_id,
                &mut tx,
                queries,
                table,
                action,
                &session.username,
                &state,
                &session,
            )
            .await
        }
        QueryAction::Update => {
            execute_batch_update(
                &request_id,
                &mut tx,
                queries,
                table,
                action,
                &session.username,
                &state,
                &session,
            )
            .await
        }
        QueryAction::Delete => {
            execute_batch_delete(
                &request_id,
                &mut tx,
                queries,
                table,
                action,
                &session.username,
                &state,
                &session,
            )
            .await
        }
        QueryAction::Select => {
            // SELECT batches are less common but we'll execute them individually
            // (combining SELECTs into one query doesn't make sense as they return different results)
            execute_batch_selects(
                &request_id,
                &mut tx,
                queries,
                table,
                action,
                &session.username,
                &session,
                &state,
            )
            .await
        }
        QueryAction::Count => {
            // COUNT batches execute individually (each returns different results)
            execute_batch_counts(
                &request_id,
                &mut tx,
                queries,
                table,
                action,
                &session.username,
                &session,
                &state,
            )
            .await
        }
    };

    match result {
        Ok(response) => {
            // Commit when the batch succeeded, or when the caller opted
            // out of atomic rollback (partial results are kept).
            if response.success || !rollback_on_error {
                // Commit the transaction
                tx.commit().await.map_err(|e| {
                    error!("[{}] Failed to commit batch transaction: {}", request_id, e);
                    StatusCode::INTERNAL_SERVER_ERROR
                })?;

                info!(
                    "[{}] Native batch committed: {} operations",
                    request_id,
                    queries.len()
                );
                super::log_info_async(
                    &state.logging,
                    &request_id,
                    &format!("Native batch committed: {} operations", queries.len()),
                    Some("query"),
                    Some(&session.username),
                    Some(session.power),
                );
                Ok(Json(response))
            } else {
                // Rollback on error
                error!(
                    "[{}] Rolling back batch transaction due to error",
                    request_id
                );
                tx.rollback().await.map_err(|e| {
                    error!("[{}] Failed to rollback transaction: {}", request_id, e);
                    StatusCode::INTERNAL_SERVER_ERROR
                })?;
                Ok(Json(response))
            }
        }
        Err(e) => {
            // Hard failures always roll back regardless of rollback_on_error.
            error!("[{}] Batch execution failed: {}", request_id, e);
            tx.rollback().await.map_err(|e2| {
                error!("[{}] Failed to rollback after error: {}", request_id, e2);
                StatusCode::INTERNAL_SERVER_ERROR
            })?;

            Ok(Json(QueryResponse {
                success: false,
                data: None,
                rows_affected: None,
                error: Some(format!("Batch execution failed: {}", e)),
                warning: None,
                results: Some(vec![]),
            }))
        }
    }
}
|
|
|
+
|
|
|
+// ===== NATIVE BATCH EXECUTION FUNCTIONS =====
|
|
|
+
|
|
|
+/// Execute batch INSERT using MySQL multi-value INSERT
|
|
|
+async fn execute_batch_insert(
|
|
|
+ request_id: &str,
|
|
|
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
|
|
|
+ queries: &Vec<crate::models::BatchQuery>,
|
|
|
+ table: &str,
|
|
|
+ _action: &QueryAction,
|
|
|
+ _username: &str,
|
|
|
+ state: &AppState,
|
|
|
+ _session: &crate::models::Session,
|
|
|
+) -> anyhow::Result<QueryResponse> {
|
|
|
+ use serde_json::Value;
|
|
|
+
|
|
|
+ // Extract all data objects, apply auto-generation, and validate they have the same columns
|
|
|
+ let mut all_data = Vec::new();
|
|
|
+ let mut column_set: Option<std::collections::HashSet<String>> = None;
|
|
|
+
|
|
|
+ // Check for auto-generation config
|
|
|
+ let auto_config = state.config.get_auto_generation_config(&table);
|
|
|
+
|
|
|
+ for (idx, query) in queries.iter().enumerate() {
|
|
|
+ let mut data = query
|
|
|
+ .data
|
|
|
+ .as_ref()
|
|
|
+ .ok_or_else(|| anyhow::anyhow!("Query {} missing data field for INSERT", idx + 1))?
|
|
|
+ .clone();
|
|
|
+
|
|
|
+ // Apply auto-generation if configured for INSERT
|
|
|
+ if let Some(ref auto_cfg) = auto_config {
|
|
|
+ if auto_cfg.on_action == "insert" || auto_cfg.on_action == "both" {
|
|
|
+ if let Value::Object(ref mut map) = data {
|
|
|
+ let field_name = &auto_cfg.field;
|
|
|
+
|
|
|
+ if !map.contains_key(field_name)
|
|
|
+ || map.get(field_name).map_or(true, |v| {
|
|
|
+ v.is_null() || v.as_str().map_or(true, |s| s.is_empty())
|
|
|
+ })
|
|
|
+ {
|
|
|
+ // Generate auto value based on config
|
|
|
+ let generated_value = generate_auto_value(&state, &table, auto_cfg).await?;
|
|
|
+ map.insert(field_name.clone(), Value::String(generated_value));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if let Value::Object(map) = data {
|
|
|
+ let cols: std::collections::HashSet<String> = map.keys().cloned().collect();
|
|
|
+
|
|
|
+ if let Some(ref expected_cols) = column_set {
|
|
|
+ if *expected_cols != cols {
|
|
|
+ anyhow::bail!("All INSERT queries must have the same columns. Query {} has different columns", idx + 1);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ column_set = Some(cols);
|
|
|
+ }
|
|
|
+
|
|
|
+ all_data.push(map);
|
|
|
+ } else {
|
|
|
+ anyhow::bail!("Query {} data must be an object", idx + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let columns: Vec<String> = column_set.unwrap().into_iter().collect();
|
|
|
+
|
|
|
+ // Validate column names
|
|
|
+ for col in &columns {
|
|
|
+ validate_column_name(col)?;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Build multi-value INSERT: INSERT INTO table (col1, col2) VALUES (?, ?), (?, ?), ...
|
|
|
+ let column_list = columns.join(", ");
|
|
|
+ let value_placeholder = format!("({})", vec!["?"; columns.len()].join(", "));
|
|
|
+ let values_clause = vec![value_placeholder; all_data.len()].join(", ");
|
|
|
+
|
|
|
+ let sql = format!(
|
|
|
+ "INSERT INTO {} ({}) VALUES {}",
|
|
|
+ table, column_list, values_clause
|
|
|
+ );
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "[{}] Native batch INSERT: {} rows into {}",
|
|
|
+ request_id,
|
|
|
+ all_data.len(),
|
|
|
+ table
|
|
|
+ );
|
|
|
+ super::log_info_async(
|
|
|
+ &state.logging,
|
|
|
+ &request_id,
|
|
|
+ &format!(
|
|
|
+ "Native batch INSERT: {} rows into {}",
|
|
|
+ all_data.len(),
|
|
|
+ table
|
|
|
+ ),
|
|
|
+ Some("query"),
|
|
|
+ Some(&_session.username),
|
|
|
+ Some(_session.power),
|
|
|
+ );
|
|
|
+
|
|
|
+ // Bind all values
|
|
|
+ let mut query = sqlx::query(&sql);
|
|
|
+ for data_map in &all_data {
|
|
|
+ for col in &columns {
|
|
|
+ let value = data_map.get(col).and_then(|v| match v {
|
|
|
+ Value::String(s) => Some(s.clone()),
|
|
|
+ Value::Number(n) => Some(n.to_string()),
|
|
|
+ Value::Bool(b) => Some(b.to_string()),
|
|
|
+ Value::Null => None,
|
|
|
+ _ => Some(v.to_string()),
|
|
|
+ });
|
|
|
+ query = query.bind(value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Execute the batch INSERT
|
|
|
+ let result = query.execute(&mut **tx).await?;
|
|
|
+ let rows_affected = result.rows_affected();
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "[{}] Batch INSERT affected {} rows",
|
|
|
+ request_id, rows_affected
|
|
|
+ );
|
|
|
+ super::log_info_async(
|
|
|
+ &state.logging,
|
|
|
+ &request_id,
|
|
|
+ &format!("Batch INSERT affected {} rows", rows_affected),
|
|
|
+ Some("query"),
|
|
|
+ Some(&_session.username),
|
|
|
+ Some(_session.power),
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(QueryResponse {
|
|
|
+ success: true,
|
|
|
+ data: None,
|
|
|
+ rows_affected: Some(rows_affected),
|
|
|
+ error: None,
|
|
|
+ warning: None,
|
|
|
+ results: None,
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+/// Execute batch UPDATE (executes individually for now, could be optimized with CASE statements)
|
|
|
+async fn execute_batch_update(
|
|
|
+ request_id: &str,
|
|
|
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
|
|
|
+ queries: &Vec<crate::models::BatchQuery>,
|
|
|
+ table: &str,
|
|
|
+ action: &QueryAction,
|
|
|
+ username: &str,
|
|
|
+ state: &AppState,
|
|
|
+ session: &crate::models::Session,
|
|
|
+) -> anyhow::Result<QueryResponse> {
|
|
|
+ let mut total_rows = 0u64;
|
|
|
+
|
|
|
+ for (idx, batch_query) in queries.iter().enumerate() {
|
|
|
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
|
|
|
+ let query_req = QueryRequest {
|
|
|
+ action: Some(action.clone()),
|
|
|
+ table: Some(table.to_string()),
|
|
|
+ columns: batch_query.columns.clone(),
|
|
|
+ data: batch_query.data.clone(),
|
|
|
+ where_clause: batch_query.where_clause.clone(),
|
|
|
+ filter: batch_query.filter.clone(),
|
|
|
+ joins: None,
|
|
|
+ limit: batch_query.limit,
|
|
|
+ offset: batch_query.offset,
|
|
|
+ order_by: batch_query.order_by.clone(),
|
|
|
+ queries: None,
|
|
|
+ rollback_on_error: None,
|
|
|
+ };
|
|
|
+
|
|
|
+ let result = execute_update_core(
|
|
|
+ &format!("{}-{}", request_id, idx + 1),
|
|
|
+ tx,
|
|
|
+ &query_req,
|
|
|
+ username,
|
|
|
+ state,
|
|
|
+ session,
|
|
|
+ )
|
|
|
+ .await?;
|
|
|
+
|
|
|
+ if let Some(rows) = result.rows_affected {
|
|
|
+ total_rows += rows;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "[{}] Batch UPDATE affected {} total rows",
|
|
|
+ request_id, total_rows
|
|
|
+ );
|
|
|
+ super::log_info_async(
|
|
|
+ &state.logging,
|
|
|
+ &request_id,
|
|
|
+ &format!("Batch UPDATE affected {} total rows", total_rows),
|
|
|
+ Some("query"),
|
|
|
+ Some(&session.username),
|
|
|
+ Some(session.power),
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(QueryResponse {
|
|
|
+ success: true,
|
|
|
+ data: None,
|
|
|
+ rows_affected: Some(total_rows),
|
|
|
+ error: None,
|
|
|
+ warning: None,
|
|
|
+ results: None,
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+/// Execute batch DELETE using IN clause when possible
|
|
|
+async fn execute_batch_delete(
|
|
|
+ request_id: &str,
|
|
|
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
|
|
|
+ queries: &Vec<crate::models::BatchQuery>,
|
|
|
+ table: &str,
|
|
|
+ action: &QueryAction,
|
|
|
+ username: &str,
|
|
|
+ state: &AppState,
|
|
|
+ session: &crate::models::Session,
|
|
|
+) -> anyhow::Result<QueryResponse> {
|
|
|
+ // For now, execute individually
|
|
|
+ // TODO: Optimize by detecting simple ID-based deletes and combining with IN clause
|
|
|
+ let mut total_rows = 0u64;
|
|
|
+
|
|
|
+ for (idx, batch_query) in queries.iter().enumerate() {
|
|
|
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
|
|
|
+ let query_req = QueryRequest {
|
|
|
+ action: Some(action.clone()),
|
|
|
+ table: Some(table.to_string()),
|
|
|
+ columns: batch_query.columns.clone(),
|
|
|
+ data: batch_query.data.clone(),
|
|
|
+ where_clause: batch_query.where_clause.clone(),
|
|
|
+ filter: batch_query.filter.clone(),
|
|
|
+ joins: None,
|
|
|
+ limit: batch_query.limit,
|
|
|
+ offset: batch_query.offset,
|
|
|
+ order_by: batch_query.order_by.clone(),
|
|
|
+ queries: None,
|
|
|
+ rollback_on_error: None,
|
|
|
+ };
|
|
|
+
|
|
|
+ let result = execute_delete_core(
|
|
|
+ &format!("{}-{}", request_id, idx + 1),
|
|
|
+ tx,
|
|
|
+ &query_req,
|
|
|
+ username,
|
|
|
+ state,
|
|
|
+ session,
|
|
|
+ )
|
|
|
+ .await?;
|
|
|
+
|
|
|
+ if let Some(rows) = result.rows_affected {
|
|
|
+ total_rows += rows;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "[{}] Batch DELETE affected {} total rows",
|
|
|
+ request_id, total_rows
|
|
|
+ );
|
|
|
+ super::log_info_async(
|
|
|
+ &state.logging,
|
|
|
+ &request_id,
|
|
|
+ &format!("Batch DELETE affected {} total rows", total_rows),
|
|
|
+ Some("query"),
|
|
|
+ Some(&session.username),
|
|
|
+ Some(session.power),
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(QueryResponse {
|
|
|
+ success: true,
|
|
|
+ data: None,
|
|
|
+ rows_affected: Some(total_rows),
|
|
|
+ error: None,
|
|
|
+ warning: None,
|
|
|
+ results: None,
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+/// Execute batch SELECT (executes individually since they return different results)
|
|
|
+async fn execute_batch_selects(
|
|
|
+ request_id: &str,
|
|
|
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
|
|
|
+ queries: &Vec<crate::models::BatchQuery>,
|
|
|
+ table: &str,
|
|
|
+ action: &QueryAction,
|
|
|
+ username: &str,
|
|
|
+ session: &crate::models::Session,
|
|
|
+ state: &AppState,
|
|
|
+) -> anyhow::Result<QueryResponse> {
|
|
|
+ let mut results = Vec::new();
|
|
|
+
|
|
|
+ for (idx, batch_query) in queries.iter().enumerate() {
|
|
|
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
|
|
|
+ let query_req = QueryRequest {
|
|
|
+ action: Some(action.clone()),
|
|
|
+ table: Some(table.to_string()),
|
|
|
+ columns: batch_query.columns.clone(),
|
|
|
+ data: batch_query.data.clone(),
|
|
|
+ where_clause: batch_query.where_clause.clone(),
|
|
|
+ filter: batch_query.filter.clone(),
|
|
|
+ joins: None,
|
|
|
+ limit: batch_query.limit,
|
|
|
+ offset: batch_query.offset,
|
|
|
+ order_by: batch_query.order_by.clone(),
|
|
|
+ queries: None,
|
|
|
+ rollback_on_error: None,
|
|
|
+ };
|
|
|
+
|
|
|
+ let result = execute_select_core(
|
|
|
+ &format!("{}-{}", request_id, idx + 1),
|
|
|
+ tx,
|
|
|
+ &query_req,
|
|
|
+ username,
|
|
|
+ session,
|
|
|
+ state,
|
|
|
+ )
|
|
|
+ .await?;
|
|
|
+
|
|
|
+ results.push(result);
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "[{}] Batch SELECT executed {} queries",
|
|
|
+ request_id,
|
|
|
+ results.len()
|
|
|
+ );
|
|
|
+ super::log_info_async(
|
|
|
+ &state.logging,
|
|
|
+ &request_id,
|
|
|
+ &format!("Batch SELECT executed {} queries", results.len()),
|
|
|
+ Some("query"),
|
|
|
+ Some(&session.username),
|
|
|
+ Some(session.power),
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(QueryResponse {
|
|
|
+ success: true,
|
|
|
+ data: None,
|
|
|
+ rows_affected: None,
|
|
|
+ error: None,
|
|
|
+ warning: None,
|
|
|
+ results: Some(results),
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+/// Execute batch COUNT (executes individually since they return different results)
|
|
|
+async fn execute_batch_counts(
|
|
|
+ request_id: &str,
|
|
|
+ tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
|
|
|
+ queries: &Vec<crate::models::BatchQuery>,
|
|
|
+ table: &str,
|
|
|
+ action: &QueryAction,
|
|
|
+ username: &str,
|
|
|
+ session: &crate::models::Session,
|
|
|
+ state: &AppState,
|
|
|
+) -> anyhow::Result<QueryResponse> {
|
|
|
+ let mut results = Vec::new();
|
|
|
+
|
|
|
+ for (idx, batch_query) in queries.iter().enumerate() {
|
|
|
+ // Convert BatchQuery to QueryRequest by adding inherited action/table
|
|
|
+ let query_req = QueryRequest {
|
|
|
+ action: Some(action.clone()),
|
|
|
+ table: Some(table.to_string()),
|
|
|
+ columns: batch_query.columns.clone(),
|
|
|
+ data: batch_query.data.clone(),
|
|
|
+ where_clause: batch_query.where_clause.clone(),
|
|
|
+ filter: batch_query.filter.clone(),
|
|
|
+ joins: None,
|
|
|
+ limit: batch_query.limit,
|
|
|
+ offset: batch_query.offset,
|
|
|
+ order_by: batch_query.order_by.clone(),
|
|
|
+ queries: None,
|
|
|
+ rollback_on_error: None,
|
|
|
+ };
|
|
|
+
|
|
|
+ let result = execute_count_core(
|
|
|
+ &format!("{}:{}", request_id, idx),
|
|
|
+ tx,
|
|
|
+ &query_req,
|
|
|
+ username,
|
|
|
+ session,
|
|
|
+ state,
|
|
|
+ )
|
|
|
+ .await?;
|
|
|
+
|
|
|
+ results.push(result);
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(
|
|
|
+ "[{}] Batch COUNT executed {} queries",
|
|
|
+ request_id,
|
|
|
+ results.len()
|
|
|
+ );
|
|
|
+ super::log_info_async(
|
|
|
+ &state.logging,
|
|
|
+ &request_id,
|
|
|
+ &format!("Batch COUNT executed {} queries", results.len()),
|
|
|
+ Some("query"),
|
|
|
+ Some(&session.username),
|
|
|
+ Some(session.power),
|
|
|
+ );
|
|
|
+
|
|
|
+ Ok(QueryResponse {
|
|
|
+ success: true,
|
|
|
+ data: None,
|
|
|
+ rows_affected: None,
|
|
|
+ error: None,
|
|
|
+ warning: None,
|
|
|
+ results: Some(results),
|
|
|
+ })
|
|
|
+}
|