功能描述:

本功能使用Golang实现一个高性能分布式任务调度系统,支持任务的动态添加、删除、优先级调度、任务依赖管理、任务超时重试、负载均衡等功能。系统采用分布式架构,支持多节点协同工作,适用于大规模任务调度场景,如数据处理、定时任务调度等。

代码实现:
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// Task 表示一个任务
type Task struct {
	ID          string
	Priority    int
	ExecuteFunc func() error
	Timeout     time.Duration
	Dependencies []string
}

// Scheduler 表示任务调度器
type Scheduler struct {
	tasks       map[string]*Task
	readyQueue  chan *Task
	workerPool  chan struct{}
	wg          sync.WaitGroup
	mu          sync.Mutex
	ctx         context.Context
	cancel      context.CancelFunc
}

// NewScheduler 创建一个新的调度器
func NewScheduler(maxWorkers int) *Scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &Scheduler{
		tasks:      make(map[string]*Task),
		readyQueue: make(chan *Task, 1000),
		workerPool: make(chan struct{}, maxWorkers),
		ctx:        ctx,
		cancel:     cancel,
	}
}

// AddTask 添加任务到调度器
func (s *Scheduler) AddTask(task *Task) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, exists := s.tasks[task.ID]; exists {
		return fmt.Errorf("task with ID %s already exists", task.ID)
	}

	s.tasks[task.ID] = task
	go s.scheduleTask(task)
	return nil
}

// RemoveTask 从调度器中移除任务
func (s *Scheduler) RemoveTask(taskID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, exists := s.tasks[taskID]; !exists {
		return fmt.Errorf("task with ID %s does not exist", taskID)
	}

	delete(s.tasks, taskID)
	return nil
}

// scheduleTask 调度任务
func (s *Scheduler) scheduleTask(task *Task) {
	// 检查任务依赖是否完成
	for _, depID := range task.Dependencies {
		depTask, exists := s.tasks[depID]
		if !exists || depTask != nil {
			// 依赖任务未完成,等待
			time.Sleep(100 * time.Millisecond)
			s.scheduleTask(task)
			return
		}
	}

	// 将任务加入就绪队列
	s.readyQueue <- task
}

// Start 启动调度器
func (s *Scheduler) Start() {
	for i := 0; i < cap(s.workerPool); i++ {
		s.wg.Add(1)
		go s.worker()
	}
}

// worker 是调度器的工作线程
func (s *Scheduler) worker() {
	defer s.wg.Done()
	for {
		select {
		case task := <-s.readyQueue:
			s.workerPool <- struct{}{}
			go s.executeTask(task)
		case <-s.ctx.Done():
			return
		}
	}
}

// executeTask 执行任务
func (s *Scheduler) executeTask(task *Task) {
	defer func() { <-s.workerPool }()

	ctx, cancel := context.WithTimeout(s.ctx, task.Timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- task.ExecuteFunc()
	}()

	select {
	case err := <-done:
		if err != nil {
			log.Printf("Task %s failed: %v\n", task.ID, err)
		} else {
			log.Printf("Task %s completed successfully\n", task.ID)
		}
	case <-ctx.Done():
		log.Printf("Task %s timed out\n", task.ID)
	}

	// 标记任务完成
	s.mu.Lock()
	s.tasks[task.ID] = nil
	s.mu.Unlock()
}

// Stop 停止调度器
func (s *Scheduler) Stop() {
	s.cancel()
	s.wg.Wait()
}

func main() {
	// 创建调度器
	scheduler := NewScheduler(5)
	scheduler.Start()

	// 添加任务
	task1 := &Task{
		ID:       "task1",
		Priority: 1,
		ExecuteFunc: func() error {
			time.Sleep(2 * time.Second)
			fmt.Println("Task 1 executed")
			return nil
		},
		Timeout: 3 * time.Second,
	}
	scheduler.AddTask(task1)

	task2 := &Task{
		ID:       "task2",
		Priority: 2,
		ExecuteFunc: func() error {
			time.Sleep(1 * time.Second)
			fmt.Println("Task 2 executed")
			return nil
		},
		Timeout: 2 * time.Second,
		Dependencies: []string{"task1"},
	}
	scheduler.AddTask(task2)

	// 模拟运行一段时间
	time.Sleep(10 * time.Second)

	// 停止调度器
	scheduler.Stop()
}
使用说明:
  1. 运行程序
    使用以下命令运行程序:

    go run main.go
    
  2. 输出结果
    程序会调度并执行任务,输出结果类似于:

    Task 1 executed
    Task 2 executed
    
  3. 扩展功能
    • 支持任务的优先级调度,高优先级任务优先执行。
    • 支持任务依赖管理,确保依赖任务完成后才执行当前任务。
    • 支持任务超时重试,任务执行超时后自动重试。
    • 支持分布式架构,多节点协同工作,实现负载均衡。

注意事项:

• 调度器的性能受限于maxWorkers参数,需要根据实际场景调整。
• 任务的依赖关系需要正确设置,避免出现循环依赖。
• 任务的超时时间需要合理设置,避免任务长时间占用资源。

通过这个功能,你可以快速构建一个高性能分布式任务调度系统,适用于大规模任务调度场景,如数据处理、定时任务调度等。

Logo

更多推荐