Golang实现一个高性能分布式任务调度系统
Golang实现一个高性能分布式任务调度系统
·
功能描述:
本功能使用Golang实现一个高性能分布式任务调度系统,支持任务的动态添加、删除、优先级调度、任务依赖管理、任务超时重试、负载均衡等功能。系统采用分布式架构,支持多节点协同工作,适用于大规模任务调度场景,如数据处理、定时任务调度等。
代码实现:
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// Task 表示一个任务
type Task struct {
ID string
Priority int
ExecuteFunc func() error
Timeout time.Duration
Dependencies []string
}
// Scheduler 表示任务调度器
type Scheduler struct {
tasks map[string]*Task
readyQueue chan *Task
workerPool chan struct{}
wg sync.WaitGroup
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
}
// NewScheduler 创建一个新的调度器
func NewScheduler(maxWorkers int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
tasks: make(map[string]*Task),
readyQueue: make(chan *Task, 1000),
workerPool: make(chan struct{}, maxWorkers),
ctx: ctx,
cancel: cancel,
}
}
// AddTask 添加任务到调度器
func (s *Scheduler) AddTask(task *Task) error {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.tasks[task.ID]; exists {
return fmt.Errorf("task with ID %s already exists", task.ID)
}
s.tasks[task.ID] = task
go s.scheduleTask(task)
return nil
}
// RemoveTask 从调度器中移除任务
func (s *Scheduler) RemoveTask(taskID string) error {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.tasks[taskID]; !exists {
return fmt.Errorf("task with ID %s does not exist", taskID)
}
delete(s.tasks, taskID)
return nil
}
// scheduleTask 调度任务
func (s *Scheduler) scheduleTask(task *Task) {
// 检查任务依赖是否完成
for _, depID := range task.Dependencies {
depTask, exists := s.tasks[depID]
if !exists || depTask != nil {
// 依赖任务未完成,等待
time.Sleep(100 * time.Millisecond)
s.scheduleTask(task)
return
}
}
// 将任务加入就绪队列
s.readyQueue <- task
}
// Start 启动调度器
func (s *Scheduler) Start() {
for i := 0; i < cap(s.workerPool); i++ {
s.wg.Add(1)
go s.worker()
}
}
// worker 是调度器的工作线程
func (s *Scheduler) worker() {
defer s.wg.Done()
for {
select {
case task := <-s.readyQueue:
s.workerPool <- struct{}{}
go s.executeTask(task)
case <-s.ctx.Done():
return
}
}
}
// executeTask 执行任务
func (s *Scheduler) executeTask(task *Task) {
defer func() { <-s.workerPool }()
ctx, cancel := context.WithTimeout(s.ctx, task.Timeout)
defer cancel()
done := make(chan error, 1)
go func() {
done <- task.ExecuteFunc()
}()
select {
case err := <-done:
if err != nil {
log.Printf("Task %s failed: %v\n", task.ID, err)
} else {
log.Printf("Task %s completed successfully\n", task.ID)
}
case <-ctx.Done():
log.Printf("Task %s timed out\n", task.ID)
}
// 标记任务完成
s.mu.Lock()
s.tasks[task.ID] = nil
s.mu.Unlock()
}
// Stop 停止调度器
func (s *Scheduler) Stop() {
s.cancel()
s.wg.Wait()
}
func main() {
// 创建调度器
scheduler := NewScheduler(5)
scheduler.Start()
// 添加任务
task1 := &Task{
ID: "task1",
Priority: 1,
ExecuteFunc: func() error {
time.Sleep(2 * time.Second)
fmt.Println("Task 1 executed")
return nil
},
Timeout: 3 * time.Second,
}
scheduler.AddTask(task1)
task2 := &Task{
ID: "task2",
Priority: 2,
ExecuteFunc: func() error {
time.Sleep(1 * time.Second)
fmt.Println("Task 2 executed")
return nil
},
Timeout: 2 * time.Second,
Dependencies: []string{"task1"},
}
scheduler.AddTask(task2)
// 模拟运行一段时间
time.Sleep(10 * time.Second)
// 停止调度器
scheduler.Stop()
}
使用说明:
-
运行程序:
使用以下命令运行程序:go run main.go
-
输出结果:
程序会调度并执行任务,输出结果类似于:Task 1 executed Task 2 executed
-
扩展功能:
• 支持任务的优先级调度,高优先级任务优先执行。
• 支持任务依赖管理,确保依赖任务完成后才执行当前任务。
• 支持任务超时重试,任务执行超时后自动重试。
• 支持分布式架构,多节点协同工作,实现负载均衡。
注意事项:
• 调度器的性能受限于maxWorkers
参数,需要根据实际场景调整。
• 任务的依赖关系需要正确设置,避免出现循环依赖。
• 任务的超时时间需要合理设置,避免任务长时间占用资源。
通过这个功能,你可以快速构建一个高性能分布式任务调度系统,适用于大规模任务调度场景,如数据处理、定时任务调度等。
更多推荐
所有评论(0)