Executor Crate
The faber-executor crate provides the task execution logic for Faber, coordinating between the API layer and the sandbox system. It manages task scheduling, execution, monitoring, and result collection.
Overview
The executor crate is responsible for:
- Task Scheduling: Managing task execution queue and priorities
- Execution Coordination: Coordinating between API and sandbox
- Resource Management: Monitoring and enforcing resource limits
- Result Collection: Gathering execution results and statistics
- Error Handling: Managing execution errors and timeouts
- Concurrency Control: Managing multiple concurrent executions
Architecture
The executor crate is organized into modules, one per responsibility:
```
executor/
├── lib.rs              # Main library entry point
├── task_executor.rs    # Core task execution logic
├── scheduler.rs        # Task scheduling and queue management
├── monitor.rs          # Resource monitoring and limits
├── result_collector.rs # Result gathering and processing
└── error.rs            # Executor-specific error handling
```
Key Components
Task Executor
The main TaskExecutor struct manages task execution:
```rust
use std::sync::Arc;

pub struct TaskExecutor {
    pub config: ExecutorConfig,
    pub sandbox: Arc<SandboxManager>,
    pub scheduler: Arc<Scheduler>,
    pub monitor: Arc<ResourceMonitor>,
    pub result_collector: Arc<ResultCollector>,
}
```
Executor Configuration
The core configuration fields are shown below. The complete struct, including worker pool and queue sizing and the default values, is listed under Configuration.
```rust
use std::time::Duration;

pub struct ExecutorConfig {
    pub max_concurrent_tasks: usize,
    pub default_timeout: Duration,
    pub default_memory_limit: u64, // bytes
    pub default_cpu_limit: u64,    // CPU time in nanoseconds
    pub enable_monitoring: bool,
    pub result_cache_size: usize,
}
```
Task Execution Flow
1. Task Submission
```rust
pub async fn execute_tasks(
    &self,
    tasks: Vec<Task>,
) -> Result<Vec<TaskResult>, ExecutorError> {
    // Validate tasks and apply default resource limits
    let validated_tasks = self.validate_tasks(tasks)?;

    // Submit to scheduler
    let execution_ids = self.scheduler.submit_tasks(validated_tasks).await?;

    // Wait for every submitted task to complete
    let results = self.wait_for_completion(execution_ids).await?;

    Ok(results)
}
```
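The page does not show wait_for_completion. Workers can finish tasks in any order, so one plausible job for this step is to return results in submission order. The following is a std-only sketch of that idea. It uses threads and an mpsc channel in place of the real scheduler, and `run_and_collect` and the `(id, output)` message shape are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for `wait_for_completion`: each job runs on its own
/// thread and reports `(execution_id, output)`. Results are returned in the
/// order of submission, not the order of completion.
pub fn run_and_collect(jobs: Vec<(String, u64)>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let execution_ids: Vec<String> = jobs.iter().map(|(id, _)| id.clone()).collect();

    for (id, delay_ms) in jobs {
        let tx = tx.clone();
        thread::spawn(move || {
            // Simulate work of varying length so completion order differs
            thread::sleep(std::time::Duration::from_millis(delay_ms));
            let _ = tx.send((id.clone(), format!("done:{id}")));
        });
    }
    // Drop the original sender so the receiver ends once all workers finish
    drop(tx);

    // Gather results keyed by execution ID as they arrive
    let mut by_id: HashMap<String, String> = rx.into_iter().collect();

    // Reassemble them in submission order
    execution_ids
        .iter()
        .filter_map(|id| by_id.remove(id))
        .collect()
}
```

Keying results by execution ID means the collector does not depend on workers finishing in a particular order.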
2. Task Validation
```rust
fn validate_tasks(&self, tasks: Vec<Task>) -> Result<Vec<ValidatedTask>, ExecutorError> {
    let mut validated_tasks = Vec::new();

    // The whole batch is rejected on the first invalid task
    for task in tasks {
        // Validate command
        if task.command.is_empty() {
            return Err(ExecutorError::Validation("Command cannot be empty".into()));
        }

        // Reject commands flagged as dangerous
        if self.is_dangerous_command(&task.command) {
            return Err(ExecutorError::Security("Dangerous command not allowed".into()));
        }

        // Fill in any limits the task did not specify
        let resource_limits = self.apply_default_limits(&task);

        validated_tasks.push(ValidatedTask {
            task,
            resource_limits,
            execution_id: generate_execution_id(),
        });
    }

    Ok(validated_tasks)
}
```
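validate_tasks relies on two helpers that the page does not show: is_dangerous_command and apply_default_limits. Below is a minimal std-only sketch of both. It assumes a simple denylist of program names and a ResourceLimits struct whose per-task values are optional. The denylist entries, field names, and override semantics are all hypothetical.

```rust
use std::path::Path;
use std::time::Duration;

/// Hypothetical resolved limits for one task.
#[derive(Debug, Clone, PartialEq)]
pub struct ResourceLimits {
    pub timeout: Duration,
    pub memory_bytes: u64,
}

/// Hypothetical denylist of program names that are rejected outright.
const DENYLIST: &[&str] = &["rm", "shutdown", "reboot", "mkfs", "dd"];

/// Compare only the final path component, so `/bin/rm` is caught as `rm`.
pub fn is_dangerous_command(command: &str) -> bool {
    let program = Path::new(command)
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(command);
    DENYLIST.contains(&program)
}

/// Fill any limit the task left unset (`None`) from the executor defaults.
pub fn apply_default_limits(
    timeout: Option<Duration>,
    memory_bytes: Option<u64>,
    defaults: &ResourceLimits,
) -> ResourceLimits {
    ResourceLimits {
        timeout: timeout.unwrap_or(defaults.timeout),
        memory_bytes: memory_bytes.unwrap_or(defaults.memory_bytes),
    }
}
```

A denylist like this is easy to bypass, for example by wrapping a command in `sh -c`. It works as a first filter at best, and the sandbox remains the real security boundary.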
3. Task Scheduling
```rust
pub async fn submit_tasks(
    &self,
    tasks: Vec<ValidatedTask>,
) -> Result<Vec<String>, ExecutorError> {
    let mut execution_ids = Vec::with_capacity(tasks.len());

    for task in tasks {
        // Check resource availability before queueing. Tasks queued earlier
        // in this batch stay queued if a later one is rejected.
        if !self.monitor.can_accept_task(&task.resource_limits) {
            return Err(ExecutorError::Resource("Insufficient resources".into()));
        }

        // The execution ID was already assigned during validation
        execution_ids.push(task.execution_id.clone());

        // Append to the scheduler's queue (Arc<RwLock<VecDeque<ValidatedTask>>>)
        self.queue.write().await.push_back(task);
    }

    Ok(execution_ids)
}
```
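submit_tasks asks the monitor whether a task fits before it queues the task. One way can_accept_task could work is to track the memory reserved by admitted tasks against a fixed capacity. In the sketch below, the `Admission` type and its `try_reserve`/`release` methods are hypothetical. A real implementation would also have to release each reservation when its task finishes.

```rust
use std::sync::Mutex;

/// Hypothetical admission controller that tracks memory reserved by admitted tasks.
pub struct Admission {
    capacity_bytes: u64,
    reserved_bytes: Mutex<u64>,
}

impl Admission {
    pub fn new(capacity_bytes: u64) -> Self {
        Self { capacity_bytes, reserved_bytes: Mutex::new(0) }
    }

    /// Check and reserve under one lock, so two callers cannot both
    /// claim the last free slice of memory.
    pub fn try_reserve(&self, memory_bytes: u64) -> bool {
        let mut reserved = self.reserved_bytes.lock().unwrap();
        match reserved.checked_add(memory_bytes) {
            Some(total) if total <= self.capacity_bytes => {
                *reserved = total;
                true
            }
            _ => false,
        }
    }

    /// Return a reservation once the task's container has been cleaned up.
    pub fn release(&self, memory_bytes: u64) {
        let mut reserved = self.reserved_bytes.lock().unwrap();
        *reserved = reserved.saturating_sub(memory_bytes);
    }
}
```

Combining the check and the reservation in one locked step closes the window in which two submissions could both see enough headroom. A separate can_accept_task call followed by a later queue insert leaves that window open.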
4. Task Execution
```rust
async fn execute_single_task(
    &self,
    task: ValidatedTask,
) -> Result<TaskResult, ExecutorError> {
    // Create a sandbox container sized to the task's resource limits
    let container = self.sandbox.create_container(&task.resource_limits).await?;

    // Start resource monitoring for the container
    let monitor_handle = self
        .monitor
        .start_monitoring(container.id.clone(), &task.resource_limits)
        .await?;

    // Execute in the sandbox. The Result is kept (not `?`-ed) so the container
    // is still cleaned up when execution fails.
    // `env` is an Option<HashMap<String, String>>, which has no `as_deref`;
    // this assumes execute_in_container takes Option<&HashMap<String, String>>.
    let execution_result = self
        .sandbox
        .execute_in_container(
            &container,
            &task.task.command,
            task.task.args.as_deref().unwrap_or(&[]),
            task.task.env.as_ref(),
        )
        .await;

    // Stop monitoring and clean up before propagating any execution error
    let resource_usage = self.monitor.stop_monitoring(monitor_handle).await;
    self.sandbox.cleanup_container(container).await?;
    let execution_result = execution_result?;
    let resource_usage = resource_usage?;

    // Compute derived fields first, because `resource_usage` is moved into the result
    let status = self.determine_status(&execution_result, &resource_usage);
    let resource_limits_exceeded =
        self.check_limits_exceeded(&resource_usage, &task.resource_limits);

    Ok(TaskResult {
        status,
        error: execution_result.error,
        exit_code: Some(execution_result.exit_code),
        stdout: execution_result.stdout,
        stderr: execution_result.stderr,
        resource_usage,
        resource_limits_exceeded,
    })
}
```
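execute_single_task delegates to determine_status and check_limits_exceeded, which the page does not define. The sketch below shows one plausible precedence: a limit breach outranks the exit code, and a timeout outranks a memory breach. `Success` and `Timeout` appear in this page's tests. The `Failure` and `MemoryExceeded` variants, the ResourceUsage fields, the `Vec<&str>` return type, and the precedence itself are assumptions.

```rust
use std::time::Duration;

// `Success` and `Timeout` appear in the crate's tests; the other variants are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    Success,
    Failure,
    Timeout,
    MemoryExceeded,
}

/// Hypothetical measured usage for one task.
pub struct ResourceUsage {
    pub wall_time: Duration,
    pub peak_memory_bytes: u64,
}

/// Hypothetical limits applied to one task.
pub struct ResourceLimits {
    pub timeout: Duration,
    pub memory_bytes: u64,
}

/// Names of every limit the task exceeded (empty if none).
pub fn check_limits_exceeded(usage: &ResourceUsage, limits: &ResourceLimits) -> Vec<&'static str> {
    let mut exceeded = Vec::new();
    if usage.wall_time > limits.timeout {
        exceeded.push("timeout");
    }
    if usage.peak_memory_bytes > limits.memory_bytes {
        exceeded.push("memory");
    }
    exceeded
}

/// Limit breaches take precedence over the exit code, because a process killed
/// for exceeding a limit usually also exits non-zero.
pub fn determine_status(exit_code: i32, usage: &ResourceUsage, limits: &ResourceLimits) -> TaskStatus {
    if usage.wall_time > limits.timeout {
        TaskStatus::Timeout
    } else if usage.peak_memory_bytes > limits.memory_bytes {
        TaskStatus::MemoryExceeded
    } else if exit_code == 0 {
        TaskStatus::Success
    } else {
        TaskStatus::Failure
    }
}
```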
Resource Monitoring
Resource Monitor
The ResourceMonitor tracks resource usage:
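```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct ResourceMonitor {
    pub active_monitors: Arc<RwLock<HashMap<String, MonitorHandle>>>,
    pub resource_limits: Arc<RwLock<ResourceLimits>>,
    pub usage_history: Arc<RwLock<VecDeque<ResourceUsage>>>,
}
```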
Monitoring Implementation
```rust
pub async fn start_monitoring(
    &self,
    container_id: String,
    limits: &ResourceLimits,
) -> Result<MonitorHandle, ExecutorError> {
    let handle = MonitorHandle {
        container_id: container_id.clone(),
        start_time: Instant::now(),
        limits: limits.clone(),
    };

    // Register the handle before spawning, so the sampler's first tick sees it
    self.active_monitors
        .write()
        .await
        .insert(container_id.clone(), handle.clone());

    // Background sampling task. Without an exit condition it would run forever.
    let active_monitors = self.active_monitors.clone();
    let usage_history = self.usage_history.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(100));

        loop {
            interval.tick().await;

            // Stop once stop_monitoring has removed this container's entry
            // (assumes stop_monitoring removes it from active_monitors)
            if !active_monitors.read().await.contains_key(&container_id) {
                break;
            }

            if let Ok(usage) = collect_container_usage(&container_id).await {
                let mut history = usage_history.write().await;
                history.push_back(usage);

                // Keep only the last 1000 entries
                if history.len() > 1000 {
                    history.pop_front();
                }
            }
        }
    });

    Ok(handle)
}
```
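The sampler bounds its history by dropping the oldest sample once the buffer passes 1000 entries. The same pattern is shown in isolation below, with a peak query of the kind an executor might use when filling in resource usage. `BoundedHistory` and its methods are hypothetical. Note that usage_history above is shared by all containers, so per-container peaks would need a history keyed by container ID.

```rust
use std::collections::VecDeque;

/// Fixed-capacity sample buffer: the oldest sample is evicted when it is full.
pub struct BoundedHistory {
    samples: VecDeque<u64>,
    capacity: usize,
}

impl BoundedHistory {
    pub fn new(capacity: usize) -> Self {
        Self { samples: VecDeque::with_capacity(capacity), capacity }
    }

    pub fn push(&mut self, sample: u64) {
        if self.capacity == 0 {
            return;
        }
        // Evict before pushing so the length never exceeds the capacity
        if self.samples.len() == self.capacity {
            self.samples.pop_front();
        }
        self.samples.push_back(sample);
    }

    pub fn len(&self) -> usize {
        self.samples.len()
    }

    /// Highest retained sample, e.g. peak memory over the window.
    pub fn peak(&self) -> Option<u64> {
        self.samples.iter().copied().max()
    }
}
```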
Result Collection
Result Collector
The ResultCollector manages execution results:
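```rust
use std::collections::HashMap;
use std::sync::Arc;
use lru::LruCache;
use tokio::sync::RwLock;

pub struct ResultCollector {
    pub results: Arc<RwLock<HashMap<String, TaskResult>>>,
    pub cache: Arc<RwLock<LruCache<String, TaskResult>>>,
    pub statistics: Arc<RwLock<ExecutionStatistics>>,
}
```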
Result Processing
```rust
pub async fn collect_result(
    &self,
    execution_id: String,
    result: TaskResult,
) -> Result<(), ExecutorError> {
    // Store result
    self.results
        .write()
        .await
        .insert(execution_id.clone(), result.clone());

    // Update cache
    self.cache.write().await.put(execution_id, result.clone());

    // Update statistics
    self.statistics.write().await.update_with_result(&result);

    Ok(())
}
```
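collect_result folds each result into ExecutionStatistics through update_with_result, but the page does not list that struct's fields. The minimal sketch below assumes the statistics track per-outcome counts and a running mean of execution time. All field names and the method's parameters are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical aggregate statistics over completed tasks.
#[derive(Debug, Default)]
pub struct ExecutionStatistics {
    pub total: u64,
    pub succeeded: u64,
    pub failed: u64,
    pub mean_duration: Duration,
}

impl ExecutionStatistics {
    /// Incremental mean: new_mean = old_mean + (x - old_mean) / n,
    /// so no per-task history has to be kept.
    pub fn update_with_result(&mut self, succeeded: bool, duration: Duration) {
        self.total += 1;
        if succeeded {
            self.succeeded += 1;
        } else {
            self.failed += 1;
        }
        let n = self.total as f64;
        let mean = self.mean_duration.as_secs_f64();
        // Clamp guards against a tiny negative value from float rounding
        let updated = (mean + (duration.as_secs_f64() - mean) / n).max(0.0);
        self.mean_duration = Duration::from_secs_f64(updated);
    }
}
```

An incremental mean keeps the update O(1) and the struct a fixed size, which suits a value that is updated under a write lock on every result.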
Error Handling
ExecutorError has one variant for each category of failure the executor can report:
```rust
#[derive(Debug, thiserror::Error)]
pub enum ExecutorError {
    #[error("Validation error: {0}")]
    Validation(String),

    #[error("Security error: {0}")]
    Security(String),

    #[error("Resource error: {0}")]
    Resource(String),

    #[error("Execution error: {0}")]
    Execution(String),

    #[error("Timeout error: {0}")]
    Timeout(String),

    #[error("Sandbox error: {0}")]
    Sandbox(String),

    #[error("Monitoring error: {0}")]
    Monitoring(String),

    #[error("Configuration error: {0}")]
    Configuration(String),
}
```
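Each `#[error(...)]` attribute tells thiserror to derive a Display implementation from that format string, and the derive also implements std::error::Error. For readers unfamiliar with the crate, hand-written equivalent code for two of the variants looks roughly like this. It is a std-only illustration, not the derive's literal expansion.

```rust
use std::fmt;

#[derive(Debug)]
pub enum ExecutorError {
    Validation(String),
    Timeout(String),
}

// Roughly what `#[error("Validation error: {0}")]` and friends produce.
impl fmt::Display for ExecutorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ExecutorError::Validation(msg) => write!(f, "Validation error: {msg}"),
            ExecutorError::Timeout(msg) => write!(f, "Timeout error: {msg}"),
        }
    }
}

// thiserror also implements std::error::Error, so the type works with `?`
// and can be boxed as `Box<dyn std::error::Error>`.
impl std::error::Error for ExecutorError {}
```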
Task Scheduling
Scheduler Implementation
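```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;

// Cloning is cheap (shared Arcs plus a usize). start_workers relies on
// Clone to give each worker its own handle to the same queue and maps.
#[derive(Clone)]
pub struct Scheduler {
    pub queue: Arc<RwLock<VecDeque<ValidatedTask>>>,
    pub active_tasks: Arc<RwLock<HashMap<String, TaskHandle>>>,
    pub max_concurrent: usize,
    pub worker_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
}
```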
Worker Pool
```rust
pub async fn start_workers(&self) -> Result<(), ExecutorError> {
    let num_workers = self.max_concurrent;

    for worker_id in 0..num_workers {
        // Each worker gets a clone that shares the same queue and task maps
        let scheduler = self.clone();
        let worker_handle = tokio::spawn(async move {
            scheduler.worker_loop(worker_id).await;
        });

        self.worker_handles.write().await.push(worker_handle);
    }

    Ok(())
}
```
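```rust
async fn worker_loop(&self, worker_id: usize) {
    loop {
        // Pop the next task; the write guard is dropped at the end of this statement
        let task = self.queue.write().await.pop_front();

        let Some(task) = task else {
            // No tasks available, wait a bit before polling again
            tokio::time::sleep(Duration::from_millis(10)).await;
            continue;
        };

        // Capture the ID so the task itself can be moved into execution
        let execution_id = task.execution_id.clone();

        // Execute the task and record the outcome; failures are logged, not dropped
        match self.execute_single_task(task).await {
            Ok(task_result) => {
                if let Err(err) = self
                    .result_collector
                    .collect_result(execution_id.clone(), task_result)
                    .await
                {
                    tracing::warn!(worker_id, %execution_id, error = %err, "failed to store result");
                }
            }
            Err(err) => {
                tracing::warn!(worker_id, %execution_id, error = %err, "task execution failed");
            }
        }

        // Remove from active tasks
        self.active_tasks.write().await.remove(&execution_id);
    }
}
```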
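The scheduler runs a fixed number of workers that all pull from one shared queue. The sketch below shows the same shape with OS threads and std::sync::mpsc. Here a channel replaces polling the VecDeque every 10 ms: a worker blocks in `recv` until a task arrives and exits once the sender is dropped. `run_pool` and the squaring "task" are hypothetical stand-ins for the scheduler and execute_single_task. In the Tokio-based scheduler, the analogous change would be an async channel in place of the polled queue.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `tasks` on `num_workers` threads that share one queue, and returns
/// `(task_index, output)` pairs sorted by index.
pub fn run_pool(num_workers: usize, tasks: Vec<u64>) -> Vec<(usize, u64)> {
    let (task_tx, task_rx) = mpsc::channel::<(usize, u64)>();
    let (result_tx, result_rx) = mpsc::channel::<(usize, u64)>();
    // mpsc receivers are single-consumer, so workers share one behind a Mutex
    let task_rx = Arc::new(Mutex::new(task_rx));

    let mut handles = Vec::with_capacity(num_workers);
    for _worker_id in 0..num_workers {
        let task_rx = Arc::clone(&task_rx);
        let result_tx = result_tx.clone();
        handles.push(thread::spawn(move || loop {
            // Hold the lock only while receiving, not while executing
            let next = task_rx.lock().unwrap().recv();
            match next {
                // Stand-in for execute_single_task
                Ok((index, input)) => {
                    let _ = result_tx.send((index, input * input));
                }
                // Channel closed and drained: no more tasks
                Err(_) => break,
            }
        }));
    }
    drop(result_tx);

    for (index, task) in tasks.into_iter().enumerate() {
        task_tx.send((index, task)).unwrap();
    }
    // Closing the task channel lets idle workers exit
    drop(task_tx);

    for handle in handles {
        handle.join().unwrap();
    }
    let mut results: Vec<(usize, u64)> = result_rx.into_iter().collect();
    results.sort_by_key(|&(index, _)| index);
    results
}
```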
Configuration
Executor Configuration
```rust
use std::time::Duration;

pub struct ExecutorConfig {
    pub max_concurrent_tasks: usize,
    pub default_timeout: Duration,
    pub default_memory_limit: u64, // bytes
    pub default_cpu_limit: u64,    // CPU time in nanoseconds
    pub enable_monitoring: bool,
    pub result_cache_size: usize,
    pub worker_pool_size: usize,
    pub queue_size: usize,
}

impl Default for ExecutorConfig {
    fn default() -> Self {
        Self {
            max_concurrent_tasks: 10,
            default_timeout: Duration::from_secs(30),
            default_memory_limit: 536_870_912, // 512 MiB
            default_cpu_limit: 30_000_000_000, // 30 seconds
            enable_monitoring: true,
            result_cache_size: 1000,
            worker_pool_size: 4,
            queue_size: 100,
        }
    }
}
```
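The default limits are raw integers. default_memory_limit is in bytes, and default_cpu_limit, judging by its comments, is in nanoseconds. Small helpers make that arithmetic explicit. The constant and function names below are hypothetical, not part of faber-executor.

```rust
use std::time::Duration;

pub const MIB: u64 = 1024 * 1024;
pub const GIB: u64 = 1024 * MIB;

/// CPU time limit in nanoseconds, the unit `default_cpu_limit` appears to use.
pub fn cpu_limit_nanos(limit: Duration) -> u64 {
    // as_nanos returns u128; saturate instead of truncating on overflow
    u64::try_from(limit.as_nanos()).unwrap_or(u64::MAX)
}
```

With these helpers, the defaults could be written as `default_memory_limit: 512 * MIB` and `default_cpu_limit: cpu_limit_nanos(Duration::from_secs(30))` inside `Default::default()`.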
Usage Examples
Creating an Executor
```rust
use std::time::Duration;

use faber_executor::{ExecutorConfig, ExecutorError, TaskExecutor};

async fn create_executor() -> Result<TaskExecutor, ExecutorError> {
    let config = ExecutorConfig {
        max_concurrent_tasks: 20,
        default_timeout: Duration::from_secs(60),
        default_memory_limit: 1_073_741_824, // 1 GiB
        default_cpu_limit: 60_000_000_000,   // 60 seconds
        enable_monitoring: true,
        result_cache_size: 2000,
        worker_pool_size: 8,
        queue_size: 200,
    };

    TaskExecutor::new(config).await
}
```
Executing Tasks
```rust
use std::collections::HashMap;

use faber_core::{Task, TaskResult};
use faber_executor::{ExecutorError, TaskExecutor};

async fn execute_tasks(executor: &TaskExecutor) -> Result<Vec<TaskResult>, ExecutorError> {
    let tasks = vec![
        Task {
            command: "echo".to_string(),
            args: Some(vec!["hello".to_string(), "world".to_string()]),
            env: None,
            files: None,
        },
        Task {
            command: "python".to_string(),
            args: Some(vec![
                "-c".to_string(),
                "print('Hello from Python')".to_string(),
            ]),
            env: Some(HashMap::from([(
                "PYTHONPATH".to_string(),
                "/tmp".to_string(),
            )])),
            files: None,
        },
    ];

    executor.execute_tasks(tasks).await
}
```
Testing
The crate's tests cover successful execution and timeout handling, for example:
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_task_execution() {
        let executor = create_test_executor().await.unwrap();

        let tasks = vec![Task {
            command: "echo".to_string(),
            args: Some(vec!["test".to_string()]),
            env: None,
            files: None,
        }];

        let results = executor.execute_tasks(tasks).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status, TaskStatus::Success);
    }

    #[tokio::test]
    async fn test_resource_limits() {
        // Passes only if create_test_executor sets a timeout shorter than 10 seconds
        let executor = create_test_executor().await.unwrap();

        let tasks = vec![Task {
            command: "python".to_string(),
            args: Some(vec![
                "-c".to_string(),
                "import time; time.sleep(10)".to_string(),
            ]),
            env: None,
            files: None,
        }];

        let results = executor.execute_tasks(tasks).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status, TaskStatus::Timeout);
    }
}
```
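test_resource_limits expects a task that sleeps for 10 seconds to come back as TaskStatus::Timeout. A std-only sketch of enforcing such a deadline follows: the work runs on a separate thread, and the caller waits with `recv_timeout`. `run_with_timeout` and `Outcome` are hypothetical. On timeout the thread is abandoned rather than killed, so a real executor would still need the sandbox to terminate the container.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum Outcome<T> {
    Completed(T),
    TimedOut,
    Panicked,
}

/// Runs `work` on its own thread and waits at most `timeout` for the result.
/// On timeout the thread keeps running detached; nothing here stops it.
pub fn run_with_timeout<T, F>(timeout: Duration, work: F) -> Outcome<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may be gone after a timeout, so ignore a send error
        let _ = tx.send(work());
    });
    match rx.recv_timeout(timeout) {
        Ok(value) => Outcome::Completed(value),
        Err(mpsc::RecvTimeoutError::Timeout) => Outcome::TimedOut,
        // The sender was dropped without sending, i.e. `work` panicked
        Err(mpsc::RecvTimeoutError::Disconnected) => Outcome::Panicked,
    }
}
```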
Dependencies
The executor crate uses the following dependencies:
```toml
[dependencies]
tokio = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
serde = { workspace = true }
uuid = { workspace = true }
lru = "0.12"
faber-core = { path = "../core" }
faber-container = { path = "../sandbox" }
faber-config = { path = "../config" }
```
Best Practices
- Use appropriate timeouts: Set a timeout on every task so a hung process cannot occupy a worker indefinitely
- Monitor resource usage: Track and limit memory and CPU consumption per container
- Handle errors gracefully: Return clear, specific error messages
- Use worker pools: Run tasks on a fixed-size worker pool to scale execution while bounding concurrency
- Cache results: Cache frequently requested results
- Validate inputs: Validate every task before it is scheduled
- Monitor performance: Track execution statistics
- Clean up resources: Remove containers after execution, including when execution fails