Skip to content

Executor Crate

The faber-executor crate provides the task execution logic for Faber, coordinating between the API layer and the sandbox system. It manages task scheduling, execution, monitoring, and result collection.

The executor crate is responsible for:

  • Task Scheduling: Managing the task execution queue and priorities
  • Execution Coordination: Coordinating between API and sandbox
  • Resource Management: Monitoring and enforcing resource limits
  • Result Collection: Gathering execution results and statistics
  • Error Handling: Managing execution errors and timeouts
  • Concurrency Control: Managing multiple concurrent executions

The executor crate follows a service-oriented architecture:

executor/
├── lib.rs # Main library entry point
├── task_executor.rs # Core task execution logic
├── scheduler.rs # Task scheduling and queue management
├── monitor.rs # Resource monitoring and limits
├── result_collector.rs # Result gathering and processing
└── error.rs # Executor-specific error handling

The main TaskExecutor struct manages task execution:

/// Entry point of the executor: bundles the configuration with shared
/// handles to the components a task passes through.
pub struct TaskExecutor {
    /// Executor-wide settings.
    pub config: ExecutorConfig,
    /// Sandbox manager used to create, run commands in, and clean up containers.
    pub sandbox: Arc<SandboxManager>,
    /// Scheduler that validated tasks are submitted to.
    pub scheduler: Arc<Scheduler>,
    /// Monitor that samples resource usage while a task runs.
    pub monitor: Arc<ResourceMonitor>,
    /// Collector that stores finished task results.
    pub result_collector: Arc<ResultCollector>,
}

Configuration for the executor:

/// Settings that control how the executor schedules and limits tasks.
///
/// The field set matches what the `Default` implementation and the usage
/// example populate, including `worker_pool_size` and `queue_size`.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Upper bound on tasks running at the same time.
    pub max_concurrent_tasks: usize,
    /// Timeout applied to a task that does not specify its own.
    pub default_timeout: Duration,
    /// Memory limit applied by default, in bytes (the default is 512 MB).
    pub default_memory_limit: u64,
    /// CPU-time limit applied by default.
    /// NOTE(review): the default of 30_000_000_000 is annotated "30 seconds",
    /// which implies nanoseconds — confirm the unit.
    pub default_cpu_limit: u64,
    /// Whether resource monitoring is enabled.
    pub enable_monitoring: bool,
    /// Capacity of the result cache, in entries.
    pub result_cache_size: usize,
    /// Number of workers in the pool.
    pub worker_pool_size: usize,
    /// Capacity of the task queue.
    pub queue_size: usize,
}
/// Runs a batch of tasks end to end: validate, submit to the scheduler, then
/// wait for the submitted executions to complete.
///
/// # Errors
///
/// Returns the first error produced by validation, submission, or waiting.
pub async fn execute_tasks(
    &self,
    tasks: Vec<Task>,
) -> Result<Vec<TaskResult>, ExecutorError> {
    // Step 1: validation turns raw tasks into `ValidatedTask`s.
    let accepted = self.validate_tasks(tasks)?;

    // Step 2: the scheduler hands back the execution ids it assigned.
    let ids = self.scheduler.submit_tasks(accepted).await?;

    // Step 3: wait on those ids and return whatever results come back.
    let outcome = self.wait_for_completion(ids).await?;
    Ok(outcome)
}
/// Validates every task and wraps it in a `ValidatedTask`.
///
/// Each accepted task gets its resource limits from `apply_default_limits`
/// and a fresh id from `generate_execution_id`.
///
/// # Errors
///
/// Stops at the first offending task and returns
/// `ExecutorError::Validation` for an empty command or
/// `ExecutorError::Security` for a command rejected by
/// `is_dangerous_command`.
fn validate_tasks(&self, tasks: Vec<Task>) -> Result<Vec<ValidatedTask>, ExecutorError> {
    tasks
        .into_iter()
        .map(|task| {
            // Guard: an empty command can never be executed.
            if task.command.is_empty() {
                return Err(ExecutorError::Validation("Command cannot be empty".into()));
            }

            // Guard: refuse anything flagged as dangerous.
            if self.is_dangerous_command(&task.command) {
                return Err(ExecutorError::Security("Dangerous command not allowed".into()));
            }

            // Limits are computed from a borrow before `task` is moved below.
            let resource_limits = self.apply_default_limits(&task);
            Ok(ValidatedTask {
                task,
                resource_limits,
                execution_id: generate_execution_id(),
            })
        })
        // Collecting into `Result` short-circuits on the first `Err`.
        .collect()
}
/// Enqueues validated tasks one by one and returns the ids the queue assigned.
///
/// # Errors
///
/// Returns `ExecutorError::Resource` as soon as the monitor refuses a task,
/// or the error from the queue if adding a task fails.
///
/// NOTE(review): tasks enqueued before the failing one stay in the queue, but
/// their ids are not returned to the caller — confirm this is intended.
pub async fn submit_tasks(
    &self,
    tasks: Vec<ValidatedTask>,
) -> Result<Vec<String>, ExecutorError> {
    let mut ids = Vec::new();

    for pending in tasks {
        // Ask the monitor whether this task's limits can currently be met.
        let has_capacity = self.monitor.can_accept_task(&pending.resource_limits);
        if !has_capacity {
            return Err(ExecutorError::Resource("Insufficient resources".into()));
        }

        // Enqueue and remember the id the queue hands back.
        ids.push(self.queue.add_task(pending).await?);
    }

    Ok(ids)
}
/// Runs one validated task inside a fresh sandbox container and builds its
/// `TaskResult`.
///
/// Steps: create the container, start resource monitoring, execute the
/// command, stop monitoring, clean up the container, then assemble the result.
/// Monitoring is stopped and the container is cleaned up even when the
/// execution itself fails, so a failed task does not leak either.
///
/// # Errors
///
/// Returns the error from container creation or from starting the monitor.
/// After that, errors are reported in this order of precedence: execution,
/// stopping the monitor, container cleanup.
async fn execute_single_task(
    &self,
    task: ValidatedTask,
) -> Result<TaskResult, ExecutorError> {
    // Create sandbox container
    let container = self.sandbox.create_container(&task.resource_limits).await?;

    // Start resource monitoring. If that fails the container already exists,
    // so release it before bailing out.
    let monitor_handle = match self
        .monitor
        .start_monitoring(container.id.clone(), &task.resource_limits)
        .await
    {
        Ok(handle) => handle,
        Err(err) => {
            // Best-effort cleanup: the monitoring error is the one worth reporting.
            let _ = self.sandbox.cleanup_container(container).await;
            return Err(err.into());
        }
    };

    // Execute in sandbox. Do not use `?` yet: teardown below must always run.
    let execution = self
        .sandbox
        .execute_in_container(
            &container,
            &task.task.command,
            task.task.args.as_deref().unwrap_or(&[]),
            &task.task.env.as_deref().unwrap_or(&[]),
        )
        .await;

    // Tear down unconditionally, then surface errors.
    let monitoring = self.monitor.stop_monitoring(monitor_handle).await;
    let cleanup = self.sandbox.cleanup_container(container).await;

    let execution_result = execution?;
    let resource_usage = monitoring?;
    cleanup?;

    // Both values borrow `resource_usage`, so compute them before it is moved
    // into the result.
    let status = self.determine_status(&execution_result, &resource_usage);
    let resource_limits_exceeded =
        self.check_limits_exceeded(&resource_usage, &task.resource_limits);

    Ok(TaskResult {
        status,
        error: execution_result.error,
        exit_code: Some(execution_result.exit_code),
        stdout: execution_result.stdout,
        stderr: execution_result.stderr,
        resource_usage,
        resource_limits_exceeded,
    })
}

The ResourceMonitor tracks resource usage:

/// Tracks resource usage of running containers.
pub struct ResourceMonitor {
    /// Handles of the monitors currently registered, keyed by container id.
    pub active_monitors: Arc<RwLock<HashMap<String, MonitorHandle>>>,
    /// Resource limits held by the monitor itself.
    pub resource_limits: Arc<RwLock<ResourceLimits>>,
    /// Recorded usage samples, oldest at the front.
    pub usage_history: Arc<RwLock<VecDeque<ResourceUsage>>>,
}
/// Registers a monitor for `container_id` and starts a background task that
/// samples the container's usage every 100 ms into `usage_history`.
///
/// The history is capped at the most recent 1000 samples. The background
/// task is detached and exits on its own once the container's entry is no
/// longer present in `active_monitors`.
///
/// NOTE(review): this assumes `stop_monitoring` removes the entry from
/// `active_monitors`. If it does not, the sampler keeps running, which is
/// what the previous implementation always did — confirm.
///
/// # Errors
///
/// Currently always returns `Ok`. The `Result` is kept for interface stability.
pub async fn start_monitoring(
    &self,
    container_id: String,
    limits: &ResourceLimits,
) -> Result<MonitorHandle, ExecutorError> {
    const SAMPLE_INTERVAL: Duration = Duration::from_millis(100);
    const MAX_HISTORY: usize = 1000;

    let handle = MonitorHandle {
        container_id: container_id.clone(),
        start_time: Instant::now(),
        limits: limits.clone(),
    };

    // Register before spawning: the sampler treats a missing entry as its
    // signal to stop, and the interval's first tick fires immediately.
    self.active_monitors
        .write()
        .await
        .insert(container_id.clone(), handle.clone());

    let active_monitors = self.active_monitors.clone();
    let usage_history = self.usage_history.clone();

    // Detached on purpose: dropping the JoinHandle does not cancel the task.
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(SAMPLE_INTERVAL);
        loop {
            interval.tick().await;

            // Stop sampling once this container is no longer registered.
            if !active_monitors.read().await.contains_key(&container_id) {
                break;
            }

            // A failed sample is skipped; the next tick retries.
            if let Ok(usage) = collect_container_usage(&container_id).await {
                let mut history = usage_history.write().await;
                history.push_back(usage);
                // Keep only the most recent MAX_HISTORY entries.
                if history.len() > MAX_HISTORY {
                    history.pop_front();
                }
            }
        }
    });

    Ok(handle)
}

The ResultCollector manages execution results:

/// Stores finished task results and aggregate statistics.
pub struct ResultCollector {
    /// Every collected result, keyed by execution id.
    pub results: Arc<RwLock<HashMap<String, TaskResult>>>,
    /// LRU cache of results, keyed by execution id.
    pub cache: Arc<RwLock<LruCache<String, TaskResult>>>,
    /// Running statistics, updated with each collected result.
    pub statistics: Arc<RwLock<ExecutionStatistics>>,
}
/// Records a finished task's result in the result map, the statistics, and
/// the cache.
///
/// Each lock is held only for its own statement. The statistics are updated
/// from a borrow before the cache takes ownership, so the result is cloned
/// once (for the result map) instead of twice.
///
/// # Errors
///
/// Currently always returns `Ok(())`.
pub async fn collect_result(
    &self,
    execution_id: String,
    result: TaskResult,
) -> Result<(), ExecutorError> {
    // Store result (the map needs its own copy of the key and value).
    self.results
        .write()
        .await
        .insert(execution_id.clone(), result.clone());

    // Update statistics while `result` can still be borrowed.
    self.statistics.write().await.update_with_result(&result);

    // Update cache last so it can take ownership of the id and result.
    self.cache.write().await.put(execution_id, result);

    Ok(())
}

The executor provides comprehensive error handling:

/// Errors produced by the executor.
///
/// Every variant carries a human-readable detail message, which is rendered
/// after a category prefix such as `"Validation error: "`.
#[derive(Debug, thiserror::Error)]
pub enum ExecutorError {
    /// A task failed validation, e.g. its command was empty.
    #[error("Validation error: {0}")]
    Validation(String),

    /// A task was rejected because its command is considered dangerous.
    #[error("Security error: {0}")]
    Security(String),

    /// Not enough resources were available to accept a task.
    #[error("Resource error: {0}")]
    Resource(String),

    /// Execution failure category.
    #[error("Execution error: {0}")]
    Execution(String),

    /// Timeout failure category.
    #[error("Timeout error: {0}")]
    Timeout(String),

    /// Sandbox failure category.
    #[error("Sandbox error: {0}")]
    Sandbox(String),

    /// Monitoring failure category.
    #[error("Monitoring error: {0}")]
    Monitoring(String),

    /// Configuration failure category.
    #[error("Configuration error: {0}")]
    Configuration(String),
}
/// Task queue plus the pool of workers that drain it.
///
/// `Clone` is derived because `start_workers` gives each spawned worker its
/// own copy of the scheduler. Every field is an `Arc` or a plain integer, so
/// a clone is cheap and shares the same queue, task map and worker list.
#[derive(Clone)]
pub struct Scheduler {
    /// Pending tasks; workers pop from the front.
    pub queue: Arc<RwLock<VecDeque<ValidatedTask>>>,
    /// Handles of tracked tasks, keyed by execution id.
    pub active_tasks: Arc<RwLock<HashMap<String, TaskHandle>>>,
    /// Number of workers spawned by `start_workers`.
    pub max_concurrent: usize,
    /// Join handles of the spawned worker tasks.
    pub worker_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
}
/// Spawns `max_concurrent` worker tasks, each running `worker_loop` on its
/// own clone of the scheduler, and records their join handles.
///
/// All workers are spawned first and their handles are stored under a single
/// write-lock acquisition, not one acquisition per worker.
///
/// NOTE(review): nothing here guards against repeated calls; each call adds
/// another `max_concurrent` workers — confirm callers invoke it once.
///
/// # Errors
///
/// Currently always returns `Ok(())`.
pub async fn start_workers(&self) -> Result<(), ExecutorError> {
    let spawned: Vec<_> = (0..self.max_concurrent)
        .map(|worker_id| {
            let scheduler = self.clone();
            tokio::spawn(async move {
                scheduler.worker_loop(worker_id).await;
            })
        })
        .collect();

    self.worker_handles.write().await.extend(spawned);
    Ok(())
}
/// Worker body: repeatedly pops the next task from the queue, executes it,
/// stores a successful result, and removes the task from `active_tasks`.
/// When the queue is empty the worker sleeps 10 ms before polling again.
///
/// The loop never returns. Only the execution id is cloned; the task itself
/// is moved into `execute_single_task`, which avoids a deep copy per task.
///
/// NOTE(review): a failed execution and a failed `collect_result` are both
/// dropped silently, so no result is recorded for that execution id. Confirm
/// whether a failure result should be stored or logged instead.
async fn worker_loop(&self, worker_id: usize) {
    const IDLE_BACKOFF: Duration = Duration::from_millis(10);

    loop {
        // Take the next task; the queue lock is released at the end of this statement.
        let next = self.queue.write().await.pop_front();

        match next {
            Some(task) => {
                // Keep the id: the task is consumed by execution below.
                let execution_id = task.execution_id.clone();

                // Execute task and store the result on success.
                if let Ok(task_result) = self.execute_single_task(task).await {
                    self.result_collector
                        .collect_result(execution_id.clone(), task_result)
                        .await
                        .ok();
                }

                // Remove from active tasks whether or not execution succeeded.
                self.active_tasks.write().await.remove(&execution_id);
            }
            // No tasks available, wait a bit
            None => tokio::time::sleep(IDLE_BACKOFF).await,
        }
    }
}
/// Complete executor configuration.
///
/// Derives `Debug` and `Clone` so a configuration can be logged and handed
/// to several components.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Upper bound on tasks running at the same time.
    pub max_concurrent_tasks: usize,
    /// Timeout applied to a task that does not specify its own.
    pub default_timeout: Duration,
    /// Memory limit applied by default, in bytes (the default is 512 MB).
    pub default_memory_limit: u64,
    /// CPU-time limit applied by default.
    /// NOTE(review): the default of 30_000_000_000 is annotated "30 seconds",
    /// which implies nanoseconds — confirm the unit.
    pub default_cpu_limit: u64,
    /// Whether resource monitoring is enabled.
    pub enable_monitoring: bool,
    /// Capacity of the result cache, in entries.
    pub result_cache_size: usize,
    /// Number of workers in the pool.
    pub worker_pool_size: usize,
    /// Capacity of the task queue.
    pub queue_size: usize,
}
impl Default for ExecutorConfig {
    /// Default settings: 10 concurrent tasks, a 30-second timeout, 512 MB of
    /// memory, 30 seconds of CPU time, monitoring on, 1000 cached results,
    /// 4 workers and a queue of 100.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        // The CPU limit of 30_000_000_000 is annotated as 30 seconds, so one
        // second corresponds to 1e9 units.
        const NANOS_PER_SEC: u64 = 1_000_000_000;

        Self {
            max_concurrent_tasks: 10,
            default_timeout: Duration::from_secs(30),
            default_memory_limit: 512 * MIB,
            default_cpu_limit: 30 * NANOS_PER_SEC,
            enable_monitoring: true,
            result_cache_size: 1000,
            worker_pool_size: 4,
            queue_size: 100,
        }
    }
}
use faber_executor::{TaskExecutor, ExecutorConfig};
/// Example: builds a `TaskExecutor` from an explicit configuration
/// (20 concurrent tasks, 60-second timeout, 1 GB memory, 8 workers).
async fn create_executor() -> Result<TaskExecutor, ExecutorError> {
    TaskExecutor::new(ExecutorConfig {
        max_concurrent_tasks: 20,
        default_timeout: Duration::from_secs(60),
        default_memory_limit: 1024 * 1024 * 1024, // 1GB
        default_cpu_limit: 60 * 1_000_000_000,    // 60 seconds
        enable_monitoring: true,
        result_cache_size: 2000,
        worker_pool_size: 8,
        queue_size: 200,
    })
    .await
}
use faber_core::{Task, TaskResult};
/// Example: submits two tasks — an `echo` and a Python one-liner with a
/// custom `PYTHONPATH` — and returns their results.
async fn execute_tasks(executor: &TaskExecutor) -> Result<Vec<TaskResult>, ExecutorError> {
    // Plain command with arguments and no environment overrides.
    let echo = Task {
        command: String::from("echo"),
        args: Some(vec![String::from("hello"), String::from("world")]),
        env: None,
        files: None,
    };

    // Command that also sets an environment variable.
    let python_env = HashMap::from([(String::from("PYTHONPATH"), String::from("/tmp"))]);
    let python = Task {
        command: String::from("python"),
        args: Some(vec![
            String::from("-c"),
            String::from("print('Hello from Python')"),
        ]),
        env: Some(python_env),
        files: None,
    };

    executor.execute_tasks(vec![echo, python]).await
}

The executor crate includes comprehensive tests:

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a task running `command` with `args`, without env or files.
    fn task(command: &str, args: &[&str]) -> Task {
        Task {
            command: command.to_string(),
            args: Some(args.iter().map(|arg| arg.to_string()).collect()),
            env: None,
            files: None,
        }
    }

    /// A single `echo` task yields exactly one successful result.
    #[tokio::test]
    async fn test_task_execution() {
        let executor = create_test_executor().await.unwrap();

        let results = executor
            .execute_tasks(vec![task("echo", &["test"])])
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status, TaskStatus::Success);
    }

    /// A task sleeping for 10 seconds is expected to end as a timeout.
    ///
    /// NOTE(review): this relies on `create_test_executor` configuring a
    /// timeout shorter than 10 seconds — confirm.
    #[tokio::test]
    async fn test_resource_limits() {
        let executor = create_test_executor().await.unwrap();

        let results = executor
            .execute_tasks(vec![task("python", &["-c", "import time; time.sleep(10)"])])
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status, TaskStatus::Timeout);
    }
}

The executor crate uses the following dependencies:

# Dependencies of the executor crate. Entries marked `workspace = true`
# take their version from the workspace manifest.
[dependencies]
# Async runtime: task spawning, timers and sleeps.
tokio = { workspace = true }
tracing = { workspace = true }
# Provides the derive used by ExecutorError.
thiserror = { workspace = true }
serde = { workspace = true }
# Presumably used to generate execution ids — TODO confirm.
uuid = { workspace = true }
# Provides the LruCache used by ResultCollector.
lru = "0.12"
faber-core = { path = "../core" }
# NOTE(review): the crate is named faber-container but lives in ../sandbox — confirm the name.
faber-container = { path = "../sandbox" }
faber-config = { path = "../config" }
Best practices when using the executor:

  1. Use appropriate timeouts: Set reasonable timeouts for all tasks
  2. Monitor resource usage: Track and limit resource consumption
  3. Handle errors gracefully: Provide clear error messages
  4. Use worker pools: Scale execution with worker pools
  5. Cache results: Cache frequently requested results
  6. Validate inputs: Always validate task parameters
  7. Monitor performance: Track execution statistics
  8. Clean up resources: Properly clean up after execution