diff --git a/config/worker.yaml b/config/worker.yaml index 8e7fb1c..5dcce4d 100644 --- a/config/worker.yaml +++ b/config/worker.yaml @@ -8,7 +8,7 @@ limit: cpu: 1 memory: 512 -process: 16 +process: 4 name: "猫猫" diff --git a/internal/worker/RunCppTask.go b/internal/worker/RunCppTask.go index bfec4f6..bc998f5 100644 --- a/internal/worker/RunCppTask.go +++ b/internal/worker/RunCppTask.go @@ -2,7 +2,9 @@ package worker import ( "context" + "encoding/json" "fmt" + "log" "os" "path/filepath" "time" @@ -13,7 +15,50 @@ import ( "github.com/docker/docker/client" ) -func (w *Worker) RunCppTask(ctx context.Context, task *model.Paste) error { +func monitorMemory(ctx context.Context, cli *client.Client, containerID string, maxMem *uint64) { + // 创建独立上下文用于内存监控 + memCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ticker := time.NewTicker(64 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-memCtx.Done(): + return + case <-ticker.C: + // 检查容器是否仍在运行 + _, err := cli.ContainerInspect(memCtx, containerID) + if err != nil { + return + } + + statsResp, err := cli.ContainerStats(memCtx, containerID, false) + if err != nil { + continue // 忽略临时错误 + } + + var statsJSON container.StatsResponse + if err := json.NewDecoder(statsResp.Body).Decode(&statsJSON); err != nil { + statsResp.Body.Close() + continue + } + statsResp.Body.Close() + + if statsJSON.MemoryStats.Stats != nil { + if currentMem, ok := statsJSON.MemoryStats.Stats["anon"]; ok { + log.Println(currentMem) + if currentMem > *maxMem { + *maxMem = currentMem + } + } + } + } + } +} + +func (w *Worker) RunCppTask(ctx context.Context, task *model.Paste, cli *client.Client) error { // Create temporary workspace tmpDir, err := os.MkdirTemp("", "cpp_compile_") if err != nil { @@ -29,47 +74,42 @@ func (w *Worker) RunCppTask(ctx context.Context, task *model.Paste) error { // Get execution limits from config timeout := time.Duration(w.cfg.Limit.Time) * time.Second - ctx, cancel := context.WithTimeout(ctx, timeout) + compliteCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // Initialize Docker client - cli, err := client.NewClientWithOpts(client.FromEnv) - if err != nil { - return fmt.Errorf("failed to create docker client: %v", err) - } - hostConfig := &container.HostConfig{ Binds: []string{tmpDir + ":/app"}, Resources: container.Resources{ Memory: int64(w.cfg.Limit.Memory * 1024 * 1024), CPUQuota: int64(w.cfg.Limit.Cpu * 100000), }, - AutoRemove: true, NetworkMode: "none", } // Create compile container configuration - resp, err := cli.ContainerCreate(ctx, &container.Config{ + resp, err := cli.ContainerCreate(compliteCtx, &container.Config{ Image: "gcc:14", Cmd: []string{"sh", "-c", "g++ -std=c++20 /app/main.cpp -o /app/output > /app/compile.txt 2>&1"}, }, hostConfig, nil, nil, filepath.Base(tmpDir)+"_builder") if err != nil { return fmt.Errorf("create compile container error: %v", err) } + defer cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ + Force: true, + }) // Start compile container execution - if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { + if err := cli.ContainerStart(compliteCtx, resp.ID, container.StartOptions{}); err != nil { return fmt.Errorf("failed to start compile container: %v", err) } // Wait for compile container completion - statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) - + statusCh, errCh := cli.ContainerWait(compliteCtx, resp.ID, container.WaitConditionNotRunning) // Handle compile container execution results select { case <-statusCh: // Get compile container detailed state - containerState, err := cli.ContainerInspect(ctx, resp.ID) + containerState, err := cli.ContainerInspect(compliteCtx, resp.ID) if err != nil { return fmt.Errorf("container state inspection error: %v", err) } @@ -86,7 +126,7 @@ func (w *Worker) RunCppTask(ctx context.Context, task *model.Paste) error { } case err := <-errCh: return err - case <-ctx.Done(): + case <-compliteCtx.Done(): task.Status = model.StatusCompileError task.CompileLog = "Compile process exceeded time limit" return nil @@ -98,41 +138,61 @@ func (w *Worker) RunCppTask(ctx context.Context, task *model.Paste) error { return fmt.Errorf("write input file error: %v", err) } + runCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + // Create runner container configuration - resp, err = cli.ContainerCreate(ctx, &container.Config{ + resp, err = cli.ContainerCreate(runCtx, &container.Config{ Image: "gcc:14", Cmd: []string{"sh", "-c", "sh -c \"/app/output < /app/input.txt > /app/stdout.txt\" > /app/stderr.txt 2>&1"}, }, hostConfig, nil, nil, filepath.Base(tmpDir)+"_runner") if err != nil { return fmt.Errorf("create runner container error: %v", err) } + defer cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ + Force: true, + }) // Start runner container execution - if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { + if err := cli.ContainerStart(runCtx, resp.ID, container.StartOptions{}); err != nil { return fmt.Errorf("failed to start runner container: %v", err) } - // Wait for container completion - statusCh, errCh = cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) + // 启动内存监控协程(使用独立上下文) + var maxMem uint64 + go monitorMemory(context.Background(), cli, resp.ID, &maxMem) - // Handle container execution results + // 等待容器完成 + statusCh, errCh = cli.ContainerWait(runCtx, resp.ID, container.WaitConditionNotRunning) + + // 处理执行结果 select { case <-statusCh: // Get container detailed state - containerState, err := cli.ContainerInspect(ctx, resp.ID) + containerState, err := cli.ContainerInspect(runCtx, resp.ID) if err != nil { return fmt.Errorf("container state inspection error: %v", err) } - // Non-zero exit code indicates compilation failure - if containerState.State.ExitCode != 0 { + startTime, startErr := time.Parse(time.RFC3339Nano, containerState.State.StartedAt) + finishTime, finishErr := time.Parse(time.RFC3339Nano, containerState.State.FinishedAt) + + if startErr == nil && finishErr == nil && !startTime.IsZero() && !finishTime.IsZero() { + timeUsage := finishTime.Sub(startTime) + task.ExecutionTimeMs = int(timeUsage.Milliseconds()) + } + + // 非零退出码表示运行时错误 + if containerState.State.OOMKilled { + task.Status = model.StatusMemoryLimitExceed + } else if containerState.State.ExitCode != 0 { task.Status = model.StatusRuntimeError } else { task.Status = model.StatusCompleted } case err := <-errCh: return err - case <-ctx.Done(): + case <-runCtx.Done(): task.Status = model.StatusTimeLimitExceed } @@ -148,5 +208,17 @@ func (w *Worker) RunCppTask(ctx context.Context, task *model.Paste) error { task.Stderr = string(outData) } + // 确保内存统计有效性(至少1MB) + if maxMem > 0 { + task.MemoryUsageKb = int(maxMem / 1024) + } else { + // 如果统计失败,尝试从OOM状态获取 + if task.Status == model.StatusMemoryLimitExceed { + task.MemoryUsageKb = int(w.cfg.Limit.Memory * 1024) // 使用配置的内存限制值 + } else { + task.MemoryUsageKb = -1 // 表示统计不可用 + } + } + return nil } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index a06213f..8eb4dfa 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -9,6 +9,8 @@ import ( "runbin/internal/config" "runbin/internal/model" "runbin/internal/repository" + + "github.com/docker/docker/client" ) type Worker struct { @@ -39,6 +41,11 @@ func (w *Worker) processTasks(ctx context.Context) { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() + cli, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + log.Fatalf("Failed to create docker client: %v", err) + } + log.Println("Thread start!") for { @@ -48,13 +55,12 @@ func (w *Worker) processTasks(ctx context.Context) { if task == nil { continue } - if err := w.handleTask(ctx, task); err != nil { + if err := w.handleTask(ctx, task, cli); err != nil { log.Printf("Worker error at PasteID: %s, error: %v\n", task.ID, err) } if err := w.repo.Update(task); err != nil { log.Printf("Update error at PasteID: %s, error: %v\n", task.ID, err) } - log.Printf("Judged task %s", task.ID) } else { log.Printf("Worker get task error: %v\n", err) } @@ -64,19 +70,18 @@ func (w *Worker) processTasks(ctx context.Context) { } } -func (w *Worker) handleTask(ctx context.Context, task *model.Paste) error { +func (w *Worker) handleTask(ctx context.Context, task *model.Paste, cli *client.Client) error { log.Printf("Hangling task %s for language %s", task.ID, task.Language) task.Status = model.StatusRunning task.BackEnd = w.cfg.Name w.repo.Update(task) - var err error switch task.Language { case "c++20": - err = w.RunCppTask(ctx, task) + err = w.RunCppTask(ctx, task, cli) default: err = fmt.Errorf("Unsupported language '%s'", task.Language) } @@ -85,5 +90,8 @@ func (w *Worker) handleTask(ctx context.Context, task *model.Paste) error { task.Status = model.StatusUnknownError task.CompileLog = err.Error() } + + log.Printf("Judged task %s, status: %s, runtime: %dms, memory: %dkb", task.ID, task.Status, task.ExecutionTimeMs, task.MemoryUsageKb) + return err }