Function bodies 169 total
main.projectCmd function · go · L2861-L2882 (22 LOC)cmd/anvil/main.go
func projectCmd(args []string) {
if len(args) == 0 {
fmt.Fprintf(os.Stderr, "usage: anvil project <subcommand> [options]\n")
fmt.Fprintf(os.Stderr, "Run 'anvil help' for more information.\n")
os.Exit(1)
}
switch args[0] {
case "create":
projectCreateCmd(args[1:])
case "ls":
projectLsCmd(args[1:])
case "get":
projectGetCmd(args[1:])
case "rm":
projectRmCmd(args[1:])
default:
fmt.Fprintf(os.Stderr, "unknown project command: %s\n", args[0])
fmt.Fprintf(os.Stderr, "Run 'anvil help' for more information.\n")
os.Exit(1)
}
}main.projectCreateCmd function · go · L2884-L2904 (21 LOC)cmd/anvil/main.go
func projectCreateCmd(args []string) {
path := "."
if len(args) > 0 {
path = args[0]
}
abs, err := filepath.Abs(path)
if err != nil {
log.Fatalf("bad path: %v", err)
}
// Initialize project .anvil/ structure
if err := project.Init(abs, tools.FS); err != nil {
log.Fatalf("failed to init project: %v", err)
}
fmt.Printf("created %s\n", abs)
// Register with daemon (watch)
registerProject(abs)
}main.projectRmCmd function · go · L2906-L2952 (47 LOC)cmd/anvil/main.go
func projectRmCmd(args []string) {
path := "."
clean := false
var filtered []string
for _, a := range args {
switch a {
case "--clean":
clean = true
default:
filtered = append(filtered, a)
}
}
if len(filtered) > 0 {
path = filtered[0]
}
abs, err := filepath.Abs(path)
if err != nil {
log.Fatalf("bad path: %v", err)
}
hash := projectHash(abs)
watchDir := filepath.Join(config.WatchedDir(), hash)
if _, err := os.Stat(watchDir); os.IsNotExist(err) {
fmt.Printf("not watching %s\n", abs)
return
}
if err := os.RemoveAll(watchDir); err != nil {
log.Fatalf("failed to unwatch: %v", err)
}
fmt.Printf("unwatched %s\n", abs)
if clean {
anvilDir := filepath.Join(abs, ".anvil")
if _, err := os.Stat(anvilDir); os.IsNotExist(err) {
return
}
if err := os.RemoveAll(anvilDir); err != nil {
log.Fatalf("failed to clean .anvil/: %v", err)
}
fmt.Printf("removed %s\n", anvilDir)
}
}main.projectLsCmd function · go · L2954-L3078 (125 LOC)cmd/anvil/main.go
func projectLsCmd(args []string) {
allProjects := false
jsonOutput := false
for _, a := range args {
if a == "--all" || a == "-a" {
allProjects = true
}
if a == "--json" {
jsonOutput = true
}
}
watched, err := loadAllWatched()
if err != nil {
log.Fatalf("failed to read watched: %v", err)
}
if !allProjects {
// Scope to current directory
abs, err := filepath.Abs(".")
if err != nil {
log.Fatalf("bad path: %v", err)
}
var filtered []watchFrontmatter
for _, w := range watched {
if w.Path == abs || strings.HasPrefix(w.Path, abs+"/") {
filtered = append(filtered, w)
}
}
watched = filtered
}
if len(watched) == 0 {
if jsonOutput {
fmt.Println("[]")
} else {
fmt.Println("no watched projects")
}
return
}
// Get running tasks from daemon
var runningTasks []daemon.TaskInfo
if daemon.IsDaemonRunning() {
runningTasks, _ = daemon.SendPsRequest()
}
// Count running tasks per project
runningByProject := make(map[string]imain.projectGetCmd function · go · L3080-L3163 (84 LOC)cmd/anvil/main.go
func projectGetCmd(args []string) {
path := "."
if len(args) > 0 {
path = args[0]
}
abs, err := filepath.Abs(path)
if err != nil {
log.Fatalf("bad path: %v", err)
}
// Check if project is initialized
if _, err := os.Stat(filepath.Join(abs, ".anvil", "todos")); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "not an anvil project: %s\n", abs)
os.Exit(1)
}
// Check watch status
watched := "no"
hash := projectHash(abs)
watchDir := filepath.Join(config.WatchedDir(), hash)
if entries, err := os.ReadDir(watchDir); err == nil && len(entries) > 0 {
watched = "yes"
}
// Load todos and count by priority
proj, err := project.Load(abs)
if err != nil {
log.Fatalf("failed to load project: %v", err)
}
todos, err := proj.LoadTodos()
if err != nil {
log.Fatalf("failed to load todos: %v", err)
}
priorityCounts := make(map[int]int)
for _, t := range todos {
priorityCounts[t.Priority]++
}
// Print project details
fmt.Printf("Path: %s\n", abs)
fmt.Printf("main.followLog function · go · L3167-L3231 (65 LOC)cmd/anvil/main.go
func followLog(sessionPath string, projectPath string, taskName string) {
// Wait for the file to exist (task may not have started yet)
for {
if _, err := os.Stat(sessionPath); err == nil {
break
}
fmt.Fprintf(os.Stderr, "waiting for log file...\n")
time.Sleep(500 * time.Millisecond)
}
f, err := os.Open(sessionPath)
if err != nil {
log.Fatalf("failed to open session log: %v", err)
}
defer f.Close()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigCh)
buf := make([]byte, 4096)
stableCount := 0
for {
select {
case <-sigCh:
return
default:
}
n, readErr := f.Read(buf)
if n > 0 {
os.Stdout.Write(buf[:n])
stableCount = 0
continue
}
if readErr != nil && readErr != io.EOF {
return
}
// No new data — check periodically if task has finished
stableCount++
if stableCount%10 == 0 {
taskRunning := false
if daemon.IsDaemonRunning() {
tasks, psErr := daemon.SendPsmain.logsCmd function · go · L3239-L3383 (145 LOC)cmd/anvil/main.go
func logsCmd(args []string) {
if !daemon.IsDaemonRunning() {
fmt.Fprintln(os.Stderr, "daemon not running")
os.Exit(1)
}
// Parse --runs N flag
runsCount := 1
var filteredArgs []string
for i := 0; i < len(args); i++ {
if args[i] == "--runs" && i+1 < len(args) {
n, err := strconv.Atoi(args[i+1])
if err != nil || n < 1 {
fmt.Fprintln(os.Stderr, "invalid --runs value: must be a positive integer")
os.Exit(1)
}
runsCount = n
i++ // skip the value
} else if strings.HasPrefix(args[i], "--runs=") {
val := strings.TrimPrefix(args[i], "--runs=")
n, err := strconv.Atoi(val)
if err != nil || n < 1 {
fmt.Fprintln(os.Stderr, "invalid --runs value: must be a positive integer")
os.Exit(1)
}
runsCount = n
} else {
filteredArgs = append(filteredArgs, args[i])
}
}
if len(filteredArgs) == 0 {
logsMultiplex()
return
}
// Single task mode: anvil logs <name> [--runs N]
name := filteredArgs[0]
abs, err := filepath.Abs(".")
if err All rows above produced by Repobility · https://repobility.com
main.logsMultiplex function · go · L3387-L3538 (152 LOC)cmd/anvil/main.go
func logsMultiplex() {
tasks, err := daemon.SendPsRequest()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get running tasks: %v\n", err)
os.Exit(1)
}
// Collect tasks that have a raw log path
type taskState struct {
name string
logPath string
file *os.File
offset int64
buf []byte // partial line buffer
}
var states []*taskState
for _, t := range tasks {
if t.LogPath == "" {
continue
}
f, err := os.Open(t.LogPath)
if err != nil {
continue
}
// Display name: strip project path prefix for readability
displayName := t.Name
if idx := strings.LastIndex(displayName, "/"); idx >= 0 {
displayName = displayName[idx+1:]
}
// Strip .md suffix if present
displayName = strings.TrimSuffix(displayName, ".md")
states = append(states, &taskState{
name: displayName,
logPath: t.LogPath,
file: f,
})
}
if len(states) == 0 {
fmt.Println("no tasks running")
return
}
defer func() {
for _, s := range states {
main.findTodo function · go · L3540-L3550 (11 LOC)cmd/anvil/main.go
func findTodo(todos []project.Todo, name string) *project.Todo {
if !strings.HasSuffix(name, ".md") {
name += ".md"
}
for i := range todos {
if todos[i].Name == name {
return &todos[i]
}
}
return nil
}main.updateCmd function · go · L3552-L3666 (115 LOC)cmd/anvil/main.go
func updateCmd(args []string) {
checkOnly := false
for _, a := range args {
if a == "--check" {
checkOnly = true
}
}
// Fetch latest release from GitHub
resp, err := http.Get("https://api.github.com/repos/johnjansen/anvil/releases/latest")
if err != nil {
fmt.Fprintf(os.Stderr, "update check failed: %v\n", err)
os.Exit(1)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "update check failed: HTTP %d\n", resp.StatusCode)
os.Exit(1)
}
var release struct {
TagName string `json:"tag_name"`
}
if err := json.NewDecoder(resp.Body).Decode(&release); err != nil {
fmt.Fprintf(os.Stderr, "failed to parse release info: %v\n", err)
os.Exit(1)
}
latest := release.TagName
if latest == "" {
fmt.Fprintf(os.Stderr, "no releases found\n")
os.Exit(1)
}
// Compare versions (strip leading 'v' for comparison)
normalizedCurrent := strings.TrimPrefix(version, "v")
normalizedLatest := strings.TrimPrefix(latest, "v")
if normain.formatTokens function · go · L3886-L3897 (12 LOC)cmd/anvil/main.go
func formatTokens(n int) string {
if n == 0 {
return "N/A"
}
if n >= 1_000_000 {
return fmt.Sprintf("%.1fM", float64(n)/1_000_000)
}
if n >= 1_000 {
return fmt.Sprintf("%.1fK", float64(n)/1_000)
}
return fmt.Sprintf("%d", n)
}main.formatCost function · go · L3899-L3907 (9 LOC)cmd/anvil/main.go
func formatCost(c float64) string {
if c == 0 {
return "N/A"
}
if c < 0.01 {
return fmt.Sprintf("$%.4f", c)
}
return fmt.Sprintf("$%.2f", c)
}main.loadAllWatched function · go · L3921-L3979 (59 LOC)cmd/anvil/main.go
func loadAllWatched() ([]watchFrontmatter, error) {
watchedDir := config.WatchedDir()
dirs, err := os.ReadDir(watchedDir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
var result []watchFrontmatter
for _, d := range dirs {
if !d.IsDir() {
continue
}
dirPath := filepath.Join(watchedDir, d.Name())
entries, err := os.ReadDir(dirPath)
if err != nil {
continue
}
// Sort entries, take the latest .md file
sort.Slice(entries, func(i, j int) bool {
return entries[i].Name() > entries[j].Name()
})
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".md") {
continue
}
data, err := os.ReadFile(filepath.Join(dirPath, e.Name()))
if err != nil {
break
}
content := string(data)
start := strings.Index(content, "---\n")
if start == -1 {
break
}
end := strings.Index(content[start+4:], "\n---")
if end == -1 {
break
}
var fm watchFrontmatter
if err config.RetentionConfig.UnmarshalYAML method · go · L37-L57 (21 LOC)internal/config/config.go
func (r *RetentionConfig) UnmarshalYAML(value *yaml.Node) error {
// Decode into a raw map to handle max_log_size as a string
var raw struct {
MaxAge time.Duration `yaml:"max_age"`
MaxRuns int `yaml:"max_runs"`
MaxLogSize string `yaml:"max_log_size"`
}
if err := value.Decode(&raw); err != nil {
return err
}
r.MaxAge = raw.MaxAge
r.MaxRuns = raw.MaxRuns
if raw.MaxLogSize != "" {
size, err := ParseByteSize(raw.MaxLogSize)
if err != nil {
return fmt.Errorf("parsing max_log_size: %w", err)
}
r.MaxLogSize = size
}
return nil
}config.ParseByteSize function · go · L62-L99 (38 LOC)internal/config/config.go
func ParseByteSize(s string) (int64, error) {
s = strings.TrimSpace(strings.ToLower(s))
if s == "" || s == "0" {
return 0, nil
}
// Check longest suffixes first to avoid "b" matching before "mb"
suffixes := []struct {
suffix string
mult int64
}{
{"gb", 1024 * 1024 * 1024},
{"mb", 1024 * 1024},
{"kb", 1024},
{"b", 1},
}
for _, entry := range suffixes {
if strings.HasSuffix(s, entry.suffix) {
numStr := strings.TrimSpace(s[:len(s)-len(entry.suffix)])
if numStr == "" {
return 0, fmt.Errorf("invalid byte size %q: missing number", s)
}
n, err := strconv.ParseFloat(numStr, 64)
if err != nil {
return 0, fmt.Errorf("invalid byte size %q: %w", s, err)
}
return int64(n * float64(entry.mult)), nil
}
}
// Plain number (bytes)
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid byte size %q: %w", s, err)
}
return n, nil
}Powered by Repobility — scan your code at https://repobility.com
config.Default function · go · L107-L114 (8 LOC)internal/config/config.go
func Default() *Config {
return &Config{
TickInterval: 10 * time.Second,
Runner: "echo",
Timeout: 5 * time.Minute,
MaxWorkers: 1,
}
}config.EnsureDir function · go · L137-L142 (6 LOC)internal/config/config.go
func EnsureDir() error {
if err := os.MkdirAll(Dir(), 0755); err != nil {
return err
}
return os.MkdirAll(WatchedDir(), 0755)
}config.EnsureConfig function · go · L147-L164 (18 LOC)internal/config/config.go
func EnsureConfig() (bool, error) {
if err := EnsureDir(); err != nil {
return false, err
}
p := Path()
if _, err := os.Stat(p); err == nil {
return false, nil // already exists
}
defaults := `runners:
- claude
max_workers: 10
timeout: 15m
tick_interval: 10s
`
return true, os.WriteFile(p, []byte(defaults), 0644)
}config.Load function · go · L166-L197 (32 LOC)internal/config/config.go
func Load() (*Config, error) {
data, err := os.ReadFile(Path())
if err != nil {
if os.IsNotExist(err) {
return Default(), nil
}
return nil, fmt.Errorf("reading config: %w", err)
}
cfg := Default()
if err := yaml.Unmarshal(data, cfg); err != nil {
return nil, fmt.Errorf("parsing config: %w", err)
}
if cfg.TickInterval <= 0 {
cfg.TickInterval = 10 * time.Second
}
// Backwards compatibility: max_todos maps to max_workers
if cfg.MaxWorkers <= 0 && cfg.MaxTodos > 0 {
cfg.MaxWorkers = cfg.MaxTodos
}
if cfg.MaxWorkers <= 0 {
cfg.MaxWorkers = 1
}
// Backwards compatibility: if Runners is empty but Runner is set, use Runner as single entry
if len(cfg.Runners) == 0 && cfg.Runner != "" {
cfg.Runners = []string{cfg.Runner}
}
return cfg, nil
}cron.Parse function · go · L41-L76 (36 LOC)internal/cron/parser.go
func Parse(expr string) (*Parser, error) {
fields := strings.Fields(expr)
if len(fields) != 5 {
return nil, fmt.Errorf("expected 5 fields, got %d", len(fields))
}
p := &Parser{}
var err error
p.minute, err = parseField(fields[0], fieldBounds[0])
if err != nil {
return nil, fmt.Errorf("minute field: %w", err)
}
p.hour, err = parseField(fields[1], fieldBounds[1])
if err != nil {
return nil, fmt.Errorf("hour field: %w", err)
}
p.dayOfMonth, err = parseField(fields[2], fieldBounds[2])
if err != nil {
return nil, fmt.Errorf("day-of-month field: %w", err)
}
p.month, err = parseField(fields[3], fieldBounds[3])
if err != nil {
return nil, fmt.Errorf("month field: %w", err)
}
p.dayOfWeek, err = parseField(fields[4], fieldBounds[4])
if err != nil {
return nil, fmt.Errorf("day-of-week field: %w", err)
}
return p, nil
}cron.parseField function · go · L79-L91 (13 LOC)internal/cron/parser.go
func parseField(field string, b bounds) (Field, error) {
f := Field{values: make(map[int]bool)}
// Handle multiple parts separated by comma
parts := strings.Split(field, ",")
for _, part := range parts {
if err := parsePart(part, b, f.values); err != nil {
return Field{}, err
}
}
return f, nil
}cron.parsePart function · go · L94-L155 (62 LOC)internal/cron/parser.go
func parsePart(part string, b bounds, values map[int]bool) error {
// Handle step values (*/n or n-m/s)
step := 1
if idx := strings.Index(part, "/"); idx != -1 {
stepStr := part[idx+1:]
var err error
step, err = strconv.Atoi(stepStr)
if err != nil || step <= 0 {
return fmt.Errorf("invalid step value: %s", stepStr)
}
part = part[:idx]
}
var rangeStart, rangeEnd int
switch {
case part == "*":
rangeStart = b.min
rangeEnd = b.max
case strings.Contains(part, "-"):
// Range: n-m
parts := strings.Split(part, "-")
if len(parts) != 2 {
return fmt.Errorf("invalid range: %s", part)
}
var err error
rangeStart, err = strconv.Atoi(parts[0])
if err != nil {
return fmt.Errorf("invalid range start: %s", parts[0])
}
rangeEnd, err = strconv.Atoi(parts[1])
if err != nil {
return fmt.Errorf("invalid range end: %s", parts[1])
}
default:
// Single value
val, err := strconv.Atoi(part)
if err != nil {
return fmt.Errorf("invalid value: %s", pcron.Parser.Matches method · go · L158-L164 (7 LOC)internal/cron/parser.go
func (p *Parser) Matches(t time.Time) bool {
return p.minute.values[t.Minute()] &&
p.hour.values[t.Hour()] &&
p.dayOfMonth.values[t.Day()] &&
p.month.values[int(t.Month())] &&
p.dayOfWeek.values[int(t.Weekday())]
}Want this analysis on your repo? https://repobility.com/scan/
cron.Parser.Next method · go · L168-L185 (18 LOC)internal/cron/parser.go
func (p *Parser) Next(after time.Time) (time.Time, error) {
// Start from the next minute
t := after.Add(time.Minute).Truncate(time.Minute)
// Limit search to 5 years to prevent infinite loops
end := after.AddDate(5, 0, 0)
for t.Before(end) {
if p.Matches(t) {
return t, nil
}
// Advance time
t = t.Add(time.Minute)
}
return time.Time{}, fmt.Errorf("no matching time found within 5 years")
}cron.Parser.Prev method · go · L189-L206 (18 LOC)internal/cron/parser.go
func (p *Parser) Prev(before time.Time) (time.Time, error) {
// Start from the previous minute
t := before.Add(-time.Minute).Truncate(time.Minute)
// Limit search to 5 years to prevent infinite loops
end := before.AddDate(-5, 0, 0)
for t.After(end) {
if p.Matches(t) {
return t, nil
}
// Go back in time
t = t.Add(-time.Minute)
}
return time.Time{}, fmt.Errorf("no matching time found within 5 years")
}cron.Parser.CountMissed method · go · L210-L226 (17 LOC)internal/cron/parser.go
func (p *Parser) CountMissed(from time.Time, to time.Time) (int, error) {
if !to.After(from) {
return 0, nil
}
count := 0
t := from.Truncate(time.Minute).Add(time.Minute) // Start from next minute after 'from'
for !t.After(to) {
if p.Matches(t) {
count++
}
t = t.Add(time.Minute)
}
return count, nil
}cron.Parser.String method · go · L229-L237 (9 LOC)internal/cron/parser.go
func (p *Parser) String() string {
return fmt.Sprintf("minute=%v, hour=%v, dom=%v, month=%v, dow=%v",
setToSlice(p.minute.values),
setToSlice(p.hour.values),
setToSlice(p.dayOfMonth.values),
setToSlice(p.month.values),
setToSlice(p.dayOfWeek.values),
)
}cron.Matches function · go · L241-L247 (7 LOC)internal/cron/parser.go
func Matches(expr string, t time.Time) bool {
p, err := Parse(expr)
if err != nil {
return false
}
return p.Matches(t)
}cron.setToSlice function · go · L250-L257 (8 LOC)internal/cron/parser.go
func setToSlice(m map[int]bool) []int {
result := make([]int, 0, len(m))
for v := range m {
result = append(result, v)
}
// Sort not needed for display, but could be added
return result
}daemon.version function · go · L36-L41 (6 LOC)internal/daemon/daemon.go
func version() string {
if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "" && info.Main.Version != "(devel)" {
return info.Main.Version
}
return "dev"
}daemon.checkAndWritePID function · go · L48-L77 (30 LOC)internal/daemon/daemon.go
func checkAndWritePID() error {
pidPath := config.PidFile()
// Check if PID file exists
if _, err := os.Stat(pidPath); err == nil {
// PID file exists, check if the process is still running
data, readErr := os.ReadFile(pidPath)
if readErr == nil {
pidStr := strings.TrimSpace(string(data))
pid, parseErr := strconv.Atoi(pidStr)
if parseErr == nil {
// Try to send signal 0 (check if process exists)
proc, err := os.FindProcess(pid)
if err == nil {
// Signal 0 checks if process exists without sending a signal
if err := proc.Signal(syscall.Signal(0)); err == nil {
// Process is running
return fmt.Errorf("%w (PID %d)", ErrDaemonAlreadyRunning, pid)
}
// Process not found or dead, continue to cleanup
}
}
}
// Stale PID file, remove it
os.Remove(pidPath)
}
// Write our PID file
return os.WriteFile(pidPath, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0644)
}Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
daemon.New function · go · L180-L204 (25 LOC)internal/daemon/daemon.go
func New(cfg *config.Config) *Daemon {
poolSize := cfg.MaxWorkers
if poolSize < 1 {
poolSize = 1
}
return &Daemon{
config: cfg,
runner: runner.New(cfg.Runners, cfg.Timeout),
workQueue: make(chan workItem, poolSize*4),
inFlight: make(map[string]int),
stop: make(chan struct{}),
done: make(chan struct{}),
reload: make(chan struct{}, 1),
socketPath: filepath.Join(config.Dir(), "daemon.sock"),
tasks: make(map[string]*RunningTask),
drainedTasks: make(map[string]bool),
persistentFailures: make(map[string]int),
persistentCooldowns: make(map[string]time.Time),
starvationTrackers: make(map[string]time.Time),
runnerCooldowns: make(map[int]time.Time),
pendingTasks: make(map[string]string),
stoppedTasks: make(map[string]bool),
persistentBudgetUsed: make(map[string]time.Duration),
}
}daemon.Daemon.Run method · go · L206-L300 (95 LOC)internal/daemon/daemon.go
func (d *Daemon) Run() {
defer close(d.done)
// Write PID file on startup
if err := checkAndWritePID(); err != nil {
if errors.Is(err, ErrDaemonAlreadyRunning) {
dlog.Fatal("%s", err.Error())
} else {
dlog.Fatal("failed to write PID file: %v", err)
}
return
}
// Clean up PID file on shutdown
defer removePIDFile()
// Set up SIGHUP handler for config reload
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
go func() {
for range sigChan {
select {
case d.reload <- struct{}{}:
default:
// reload channel already has a pending signal
}
}
}()
poolSize := d.config.MaxWorkers
if poolSize < 1 {
poolSize = 1
}
ticker := time.NewTicker(d.config.TickInterval)
defer ticker.Stop()
dlog.Startup(d.config.TickInterval.String(), strings.Join(d.config.Runners, ", "), poolSize)
// Check for updates on startup if auto_update is enabled
if d.config.AutoUpdate {
currentVersion := version()
latestVersion, err := updater.Chdaemon.Daemon.Stop method · go · L302-L307 (6 LOC)internal/daemon/daemon.go
func (d *Daemon) Stop() {
d.stopOnce.Do(func() {
close(d.stop)
})
<-d.done
}daemon.Daemon.reloadConfig method · go · L317-L362 (46 LOC)internal/daemon/daemon.go
func (d *Daemon) reloadConfig() {
newConfig, err := config.Load()
if err != nil {
dlog.Warn("failed to reload config: %v", err)
return
}
var changes []string
// max_workers: can grow or shrink
if newConfig.MaxWorkers != d.config.MaxWorkers {
oldVal := d.config.MaxWorkers
d.config.MaxWorkers = newConfig.MaxWorkers
changes = append(changes, fmt.Sprintf("max_workers %d->%d", oldVal, newConfig.MaxWorkers))
}
// timeout: apply to new tasks
if newConfig.Timeout != d.config.Timeout {
oldVal := d.config.Timeout
d.config.Timeout = newConfig.Timeout
changes = append(changes, fmt.Sprintf("timeout %v->%v", oldVal, newConfig.Timeout))
}
// runners: apply to new tasks
if len(newConfig.Runners) > 0 {
oldRunners := strings.Join(d.config.Runners, ", ")
newRunners := strings.Join(newConfig.Runners, ", ")
if oldRunners != newRunners {
d.config.Runners = newConfig.Runners
changes = append(changes, fmt.Sprintf("runners %s->%s", oldRunners, newRunners))
}
}
/daemon.Daemon.worker method · go · L365-L374 (10 LOC)internal/daemon/daemon.go
func (d *Daemon) worker(id int) {
dlog.WorkerStarted(id)
for item := range d.workQueue {
projName := filepath.Base(item.project.Path)
dlog.WorkerPickup(id, projName, item.todo.Name, item.todo.Priority)
d.runTask(id, item.project, item.todo)
dlog.WorkerIdle(id)
}
dlog.WorkerStopped(id)
}daemon.Daemon.runHook method · go · L762-L790 (29 LOC)internal/daemon/daemon.go
func (d *Daemon) runHook(hookName, command, projectPath string, t project.Todo, logPath, sessionID string, startTime time.Time, elapsed time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
hookCmd := exec.CommandContext(ctx, "sh", "-c", command)
hookCmd.Dir = projectPath
exitCode := "0"
if hookName == "on_failure" {
exitCode = "1"
}
hookCmd.Env = append(os.Environ(),
"ANVIL_TASK_NAME="+t.Name,
"ANVIL_EXIT_CODE="+exitCode,
"ANVIL_LOG_PATH="+logPath,
"ANVIL_PROJECT="+projectPath,
"ANVIL_SESSION_ID="+sessionID,
"ANVIL_START_TIME="+startTime.Format(time.RFC3339),
"ANVIL_END_TIME="+time.Now().Format(time.RFC3339),
fmt.Sprintf("ANVIL_ELAPSED_MS=%d", elapsed.Milliseconds()),
)
if hookErr := hookCmd.Run(); hookErr != nil {
dlog.Warn("%s hook failed for %s: %v", hookName, t.Name, hookErr)
} else {
dlog.Info("%s hook completed for %s", hookName, t.Name)
}
}daemon.captureOutputSummary function · go · L794-L807 (14 LOC)internal/daemon/daemon.go
func captureOutputSummary(logPath string) (string, error) {
const maxLines = 3
data, err := os.ReadFile(logPath)
if err != nil {
return "", err
}
lines := strings.Split(string(data), "\n")
if len(lines) <= maxLines*2 {
return string(data), nil
}
first := strings.Join(lines[:maxLines], "\n")
last := strings.Join(lines[len(lines)-maxLines:], "\n")
return first + "\n...\n" + last, nil
}daemon.truncateLog function · go · L812-L851 (40 LOC)internal/daemon/daemon.go
func truncateLog(logPath string, maxSize int64) {
info, err := os.Stat(logPath)
if err != nil || info.Size() <= maxSize {
return
}
f, err := os.Open(logPath)
if err != nil {
return
}
// Seek to keep the last maxSize bytes (minus room for the marker)
marker := []byte("\n--- [log truncated: exceeded max_log_size] ---\n")
keepSize := maxSize - int64(len(marker))
if keepSize < 0 {
keepSize = 0
}
offset := info.Size() - keepSize
if _, err := f.Seek(offset, io.SeekStart); err != nil {
f.Close()
return
}
tail, err := io.ReadAll(f)
f.Close()
if err != nil {
return
}
// Skip to the next newline to avoid a partial first line
if idx := bytes.IndexByte(tail, '\n'); idx >= 0 && idx < len(tail)-1 {
tail = tail[idx+1:]
}
// Write truncated content back
if err := os.WriteFile(logPath, append(marker, tail...), 0644); err != nil {
dlog.Warn("failed to truncate log %s: %v", logPath, err)
}
}All rows above produced by Repobility · https://repobility.com
daemon.Daemon.startSocketServer method · go · L853-L889 (37 LOC)internal/daemon/daemon.go
func (d *Daemon) startSocketServer() {
// Remove any existing socket file
os.Remove(d.socketPath)
// Create unix socket
listener, err := net.Listen("unix", d.socketPath)
if err != nil {
dlog.SocketStartFailed(err)
return
}
defer listener.Close()
// Set socket permissions for read/write by user
os.Chmod(d.socketPath, 0600)
mux := http.NewServeMux()
mux.HandleFunc("/ps", d.handlePs)
mux.HandleFunc("/kill", d.handleKill)
mux.HandleFunc("/drain", d.handleDrain)
mux.HandleFunc("/drain/task", d.handleDrainTask)
mux.HandleFunc("/run", d.handleRun)
mux.HandleFunc("/status", d.handleStatus)
mux.HandleFunc("/timeout", d.handleTimeout)
mux.HandleFunc("/queue", d.handleQueue)
mux.HandleFunc("/reload", d.handleReload)
mux.HandleFunc("/stop", d.handleStopTask)
mux.HandleFunc("/start", d.handleStartTask)
d.httpServer = &http.Server{
Handler: mux,
}
dlog.SocketListening(d.socketPath)
if err := d.httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {daemon.Daemon.handlePs method · go · L891-L935 (45 LOC)internal/daemon/daemon.go
func (d *Daemon) handlePs(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
d.tasksMu.RLock()
tasks := make([]*RunningTask, 0, len(d.tasks))
for _, task := range d.tasks {
tasks = append(tasks, task)
}
d.tasksMu.RUnlock()
var result []TaskInfo
now := time.Now()
for _, task := range tasks {
elapsed := now.Sub(task.Started)
// Use per-task timeout if set, otherwise fall back to global config
timeout := task.Timeout
if timeout == 0 {
timeout = d.config.Timeout
}
result = append(result, TaskInfo{
Project: task.Project,
Name: task.Name,
PID: task.PID,
Started: task.Started.Format(time.RFC3339),
Elapsed: elapsed.Round(time.Second).String(),
Timeout: timeout.String(),
TimeRemaining: (timeout - elapsed).String(),
PercentUsed: elapsed.Round(time.Second).Seconds() / timeout.Seconds() * 100,
LogPadaemon.Daemon.handleTimeout method · go · L937-L980 (44 LOC)internal/daemon/daemon.go
func (d *Daemon) handleTimeout(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
d.tasksMu.RLock()
tasks := make([]*RunningTask, 0, len(d.tasks))
for _, task := range d.tasks {
tasks = append(tasks, task)
}
d.tasksMu.RUnlock()
var result []TaskInfo
now := time.Now()
for _, task := range tasks {
elapsed := now.Sub(task.Started)
// Use per-task timeout if set, otherwise fall back to global config
timeout := task.Timeout
if timeout == 0 {
timeout = d.config.Timeout
}
result = append(result, TaskInfo{
Project: task.Project,
Name: task.Name,
PID: task.PID,
Started: task.Started.Format(time.RFC3339),
Elapsed: elapsed.Round(time.Second).String(),
Timeout: timeout.String(),
TimeRemaining: (timeout - elapsed).String(),
PercentUsed: elapsed.Round(time.Second).Seconds() / timeout.Seconds() * 100,
daemon.Daemon.handleQueue method · go · L982-L1037 (56 LOC)internal/daemon/daemon.go
func (d *Daemon) handleQueue(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var result []TaskQueueInfo
// First, get running tasks
d.tasksMu.RLock()
for _, task := range d.tasks {
result = append(result, TaskQueueInfo{
Project: task.Project,
Name: task.Name,
Status: "running",
})
}
d.tasksMu.RUnlock()
// Get pending/skipped tasks from the last tick
d.pendingTasksMu.RLock()
for taskKey, skipReason := range d.pendingTasks {
// Parse taskKey as project/path
parts := strings.Split(taskKey, "/")
projectPath := ""
taskName := ""
if len(parts) >= 2 {
projectPath = strings.Join(parts[:len(parts)-1], "/")
taskName = parts[len(parts)-1]
} else {
taskName = taskKey
}
status := "pending"
if skipReason != "" {
status = "skipped"
}
result = append(result, TaskQueueInfo{
Project: projectPath,
Name: taskName,
Status:daemon.Daemon.handleReload method · go · L1039-L1053 (15 LOC)internal/daemon/daemon.go
func (d *Daemon) handleReload(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
// Trigger reload via the channel
select {
case d.reload <- struct{}{}:
fmt.Fprintf(w, "config reload triggered")
default:
// Channel already has a pending signal
fmt.Fprintf(w, "config reload already in progress")
}
}daemon.Daemon.handleKill method · go · L1055-L1093 (39 LOC)internal/daemon/daemon.go
func (d *Daemon) handleKill(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var req KillRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
d.tasksMu.Lock()
defer d.tasksMu.Unlock()
// Find task by name or UUID (ID field contains the todo ID)
var found *RunningTask
for _, task := range d.tasks {
if task.TaskID == req.ID || task.Name == req.ID || task.Project == req.ID {
found = task
break
}
}
if found == nil {
http.Error(w, "task not found", http.StatusNotFound)
return
}
// Cancel the task's context
found.Cancel()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "killed", "name": found.Name})
}daemon.Daemon.handleStatus method · go · L1100-L1110 (11 LOC)internal/daemon/daemon.go
func (d *Daemon) handleStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
status := DaemonStatus{
Draining: atomic.LoadInt32(&d.draining) == 1,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(status)
}daemon.Daemon.handleDrain method · go · L1112-L1134 (23 LOC)internal/daemon/daemon.go
func (d *Daemon) handleDrain(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
atomic.StoreInt32(&d.draining, 1)
dlog.Info("stop-on-idle activated — daemon will drain and exit when all tasks finish")
// If nothing is running or queued right now, stop immediately.
d.tasksMu.RLock()
tasksRunning := len(d.tasks)
d.tasksMu.RUnlock()
d.inFlightMu.Lock()
queued := len(d.inFlight)
d.inFlightMu.Unlock()
if tasksRunning == 0 && queued == 0 {
dlog.Info("drain complete — no tasks running, stopping daemon")
go d.Stop()
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "draining"})
}Powered by Repobility — scan your code at https://repobility.com
daemon.Daemon.handleDrainTask method · go · L1148-L1171 (24 LOC)internal/daemon/daemon.go
func (d *Daemon) handleDrainTask(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var req DrainTaskRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
d.drainedMu.Lock()
d.drainedTasks[req.ID] = true
d.drainedMu.Unlock()
dlog.Info("stop-on-idle set for task %s — will not reschedule after current run", req.ID)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "drained", "id": req.ID})
}daemon.Daemon.handleRun method · go · L1173-L1270 (98 LOC)internal/daemon/daemon.go
func (d *Daemon) handleRun(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var req RunRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
if req.ProjectPath == "" || req.TaskID == "" {
http.Error(w, "project_path and task_id are required", http.StatusBadRequest)
return
}
// Check if the task is already running
d.tasksMu.Lock()
var runningTask *RunningTask
for _, task := range d.tasks {
if task.TaskID == req.TaskID {
runningTask = task
break
}
}
d.tasksMu.Unlock()
// Load the project and find the todo
proj, err := project.Load(req.ProjectPath)
if err != nil {
http.Error(w, fmt.Sprintf("failed to load project: %v", err), http.StatusInternalServerError)
return
}
allTodos, err := proj.LoadTodos()
if err != nil {
http.Error(w, fmt.Sprintf("failed to load todos: %vdaemon.Daemon.handleStopTask method · go · L1284-L1327 (44 LOC)internal/daemon/daemon.go
func (d *Daemon) handleStopTask(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var req StopRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
// Mark the task as stopped so it won't be re-dispatched
d.stoppedMu.Lock()
d.stoppedTasks[req.ID] = true
d.stoppedMu.Unlock()
// Kill the running instance if any
d.tasksMu.Lock()
var found *RunningTask
for _, task := range d.tasks {
if task.TaskID == req.ID || task.Name == req.ID {
found = task
break
}
}
if found != nil {
found.Cancel()
}
d.tasksMu.Unlock()
name := req.ID
if found != nil {
name = found.Name
}
dlog.Info("persistent task %s stopped — will not be re-dispatched until started", name)
w.Header().Set("Content-Type", "appli