Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 165 additions & 53 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "worker"
version = "0.1.0"
edition = "2021"
edition = "2024"

[[bin]]
name = "mock"
Expand All @@ -15,9 +15,10 @@ clap = { version = "4.0.27", features = ["derive", "string"] }
libc = "0.2.153"
serde = { version = "1.0.198", features = ["derive"] }
serde_json = "1.0.116"
sysinfo = "0.34.2"
toml = "0.8.12"
sysinfo = "0.38.4"
toml = "1.1.2"
itertools = "0.14.0"
avt = "0.17.0"

[dev-dependencies]
assert_cmd = "2.0"
Expand Down
31 changes: 22 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::{fs::File, path::PathBuf, str::FromStr};
use std::{
collections::hash_map::DefaultHasher,
fs::File,
hash::{Hash, Hasher},
path::PathBuf,
str::FromStr,
};

use anyhow::{anyhow, Context};
use anyhow::{Context, anyhow};
use itertools::{Either, Itertools};
use serde::Deserialize;

use crate::{
project::{Project, RunningProject, WorkerProject},
ActionArg, ActionArgRunning,
project::{Project, RunningProject, WorkerProject},
};

const CONFIG_FILE: &str = ".worker.toml";
Expand Down Expand Up @@ -93,7 +99,7 @@ impl FromStr for ActionArgRunning {
pub struct WorkerConfig {
pub projects: Vec<Project>,
state_dir: PathBuf,
log_dir: PathBuf,
sock_dir: PathBuf,
}

impl WorkerConfig {
Expand All @@ -102,23 +108,30 @@ impl WorkerConfig {
let config_string = std::fs::read_to_string(base_dir.join(CONFIG_FILE))?;

let state_dir = base_dir.join(".worker/state");
let log_dir = base_dir.join(".worker/log");
let sock_dir = base_dir.join(".worker/sock");

std::fs::create_dir_all(&state_dir)?;
std::fs::create_dir_all(&log_dir)?;
std::fs::create_dir_all(&sock_dir)?;

// Deserialize the TOML string into the Config struct
let config: Config = toml::from_str(&config_string)?;

Ok(Self {
projects: config.project,
state_dir,
log_dir,
sock_dir,
})
}

pub fn log_file<T: WorkerProject>(&self, project: &T) -> PathBuf {
self.log_dir.join(project.name())
pub fn sock_file<T: WorkerProject>(&self, project: &T) -> PathBuf {
// Unix socket paths are limited to ~104 chars on macOS.
// Use a hash of the full sock_dir + project name to create a short
// but unique path under /tmp.
let full_path = self.sock_dir.join(project.name());
let mut hasher = DefaultHasher::new();
full_path.hash(&mut hasher);
let hash = hasher.finish();
PathBuf::from(format!("/tmp/worker-{:016x}.sock", hash))
}

pub fn get_state(&self, name: &str) -> Result<Option<RunningProject>, anyhow::Error> {
Expand Down
79 changes: 71 additions & 8 deletions src/libc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::os::fd::{AsRawFd, RawFd};

use serde::{Deserialize, Serialize};
use sysinfo::System;

pub enum Fork {
Parent(libc::pid_t),
Expand Down Expand Up @@ -33,20 +34,82 @@ pub fn waitpid(pid: i32) -> Result<libc::pid_t, i32> {
}
}

/// Returns true if the child has exited.
pub fn has_child_exited(pid: i32) -> bool {
let mut status: i32 = 0;
unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) > 0 }
}

pub fn stop_pg(sid: i32, signal: &Signal) -> Result<(), i32> {
match unsafe { libc::killpg(sid, signal.to_owned() as i32) } {
0 => Ok(()),
e => Err(e),
}
}

pub fn has_processes_running(sid: libc::pid_t) -> bool {
let mut sys = System::new();
sys.refresh_all();
sys.processes().iter().any(|(_, p)| {
p.session_id()
.is_some_and(|session_id| session_id.as_u32() == sid as u32)
})
pub fn dup2(src: i32, dst: i32) -> i32 {
unsafe { libc::dup2(src, dst) }
}

pub fn signal(signum: i32, handler: usize) -> usize {
unsafe { libc::signal(signum, handler) }
}

pub fn set_nonblocking(fd: RawFd) {
unsafe {
let flags = libc::fcntl(fd, libc::F_GETFL);
libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
}
}

#[derive(Default)]
pub struct PollSet {
fds: Vec<libc::pollfd>,
}

pub enum PollResult {
Ready,
Timeout,
Interrupted,
Error,
}

impl PollSet {
pub fn add(&mut self, fd: &impl AsRawFd) -> usize {
let idx = self.fds.len();
self.fds.push(libc::pollfd {
fd: fd.as_raw_fd(),
events: libc::POLLIN,
revents: 0,
});
idx
}

pub fn wait(&mut self, timeout_ms: i32) -> PollResult {
let ret = unsafe {
libc::poll(
self.fds.as_mut_ptr(),
self.fds.len() as libc::nfds_t,
timeout_ms,
)
};
match ret {
_ if ret > 0 => PollResult::Ready,
0 => PollResult::Timeout,
_ if std::io::Error::last_os_error().raw_os_error() == Some(libc::EINTR) => {
PollResult::Interrupted
}
_ => PollResult::Error,
}
}

pub fn is_readable(&self, index: usize) -> bool {
self.fds[index].revents & libc::POLLIN != 0
}

pub fn is_hungup(&self, index: usize) -> bool {
self.fds[index].revents & libc::POLLHUP != 0 && self.fds[index].revents & libc::POLLIN == 0
}
}

#[derive(Deserialize, Clone, Debug, Serialize, Hash, PartialEq, Eq)]
Expand Down
101 changes: 70 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,51 @@
use std::time::{Duration, Instant};

use clap::{command, ArgGroup, Parser};
use clap::{ArgGroup, Parser, command};
use config::WorkerConfig;
use itertools::Itertools;
use project::Project;

use crate::project::RunningProject;
use crate::{
project::RunningProject,
pty::Message,
pty_client::{DetachReason, PtyClient},
};

pub mod config;
pub mod libc;
pub mod project;

fn start(config: &WorkerConfig, projects: Vec<Project>) -> Result<(), anyhow::Error> {
pub mod pty;
pub mod pty_client;
pub mod pty_server;

fn start(
config: &WorkerConfig,
projects: Vec<Project>,
verbose: bool,
) -> Result<(), anyhow::Error> {
let (running, not_running) = config.partition_projects(projects)?;

for project in running {
eprintln!("{} is already running", project);
}

for project in not_running {
if verbose {
eprintln!("Starting: {}", project.name);
eprintln!(" Command: {:?}", project.command);
eprintln!(" Cwd: {}", project.cwd);
eprintln!(" Socket: {}", config.sock_file(&project).display());
if let Some(ref envs) = project.envs {
eprintln!(" Envs: {:?}", envs);
}
}

project.start(config)?;

if verbose {
let client = PtyClient::connect(&config.sock_file(&project))?;
client.event_loop(true);
}
}

Ok(())
Expand Down Expand Up @@ -50,7 +76,11 @@ fn stop(config: &WorkerConfig, projects: Vec<RunningProject>) -> Result<(), anyh

fn restart(config: &WorkerConfig, projects: Vec<RunningProject>) -> Result<(), anyhow::Error> {
stop(config, projects.clone())?;
start(config, projects.into_iter().map(|p| p.into()).collect())?;
start(
config,
projects.into_iter().map(|p| p.into()).collect(),
false,
)?;

Ok(())
}
Expand All @@ -63,6 +93,24 @@ fn run(config: &WorkerConfig, project: Project) -> Result<(), anyhow::Error> {
Ok(())
}

fn attach(config: &WorkerConfig, args: AttachArgs) -> Result<(), anyhow::Error> {
let mut client = PtyClient::connect(&config.sock_file(&args.project))?;
client.send(Message::DumpScreen);

let reason = client.event_loop(false);

let msg = match reason {
DetachReason::UserDetach => "Detached from process (Ctrl+D)",
DetachReason::ProcessExited => "Process exited",
DetachReason::ConnectionLost => "Connection lost",
};
// Print on stdout (same fd as PTY output) so they don't interleave.
// Clear the current line in case output ended mid-line.
println!("\x1b[2K\r{}", msg);

Ok(())
}

fn status(config: &WorkerConfig, args: StatusArgs) -> Result<(), anyhow::Error> {
for project in config.running()? {
if args.quiet {
Expand All @@ -88,26 +136,9 @@ fn list(config: &WorkerConfig, args: ListArgs) -> Result<(), anyhow::Error> {
}

fn logs(config: &WorkerConfig, args: LogsArgs) -> Result<(), anyhow::Error> {
let mut cmd = std::process::Command::new("tail");

if args.follow {
cmd.arg("-f");
}

let mut child = cmd
.args(["-n", &args.number.to_string()])
.arg(config.log_file(&args.project))
.spawn()?;

if args.follow {
while args.project.is_running() {
std::thread::sleep(Duration::from_secs(2));
}
child.kill()?;
} else {
child.wait()?;
}

let mut client = PtyClient::connect(&config.sock_file(&args.project))?;
client.send(Message::DumpScreen);
client.event_loop(true);
Ok(())
}

Expand Down Expand Up @@ -163,6 +194,10 @@ struct StartArgs {
conflicts_with = "projects"
)]
name: Option<String>,

/// Print debug info and tail the log after starting
#[arg(short, long)]
verbose: bool,
}

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -200,11 +235,6 @@ struct RunArgs {
#[derive(Debug, Parser)]
struct LogsArgs {
project: RunningProject,
#[arg(short, long)]
follow: bool,

#[arg(short, long = "lines", default_value = "50")]
number: i32,
}

#[derive(Debug, Parser)]
Expand All @@ -219,6 +249,11 @@ struct ListArgs {
quiet: bool,
}

#[derive(Debug, Parser)]
struct AttachArgs {
project: RunningProject,
}

#[derive(Parser, Debug)]
enum SubCommands {
/// Start the specified project(s). E.g. `worker start foo bar`
Expand All @@ -235,6 +270,8 @@ enum SubCommands {
List(ListArgs),
/// Print out logs for the specified project.
Logs(LogsArgs),
/// Attach to a running process (Ctrl+D to detach)
Attach(AttachArgs),
}

#[derive(Parser, Debug)]
Expand All @@ -261,13 +298,14 @@ fn main() -> Result<(), anyhow::Error> {

match cli.subcommand {
SubCommands::Start(args) => {
let verbose = args.verbose;
let projects = match (args.projects, args.name, args.cmd) {
(Some(projects), None, None) => unique(projects),
(None, Some(name), Some(command)) => vec![Project::from_cmd(name, command)],
_ => unreachable!("Only one of project or command should be specified"),
};

start(&config, projects)?
start(&config, projects, verbose)?
}
SubCommands::Stop(args) => stop(
&config,
Expand Down Expand Up @@ -303,6 +341,7 @@ fn main() -> Result<(), anyhow::Error> {
SubCommands::Status(args) => status(&config, args)?,
SubCommands::List(args) => list(&config, args)?,
SubCommands::Logs(args) => logs(&config, args)?,
SubCommands::Attach(args) => attach(&config, args)?,
}

Ok(())
Expand Down
Loading
Loading