Quellcode durchsuchen

重构了TaskManager,md和qa 可以分别构建自己的异步任务管理

dwp vor 7 Monaten
Ursprung
Commit
ffabd88ea0

+ 1 - 1
server/src/main/java/com/giantan/ai/util/SpringContextUtil.java

@@ -5,7 +5,7 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Component;
 
-@Component
+//Component
 public class SpringContextUtil implements ApplicationContextAware {
     private static ApplicationContext context;
 

+ 20 - 0
server/src/main/java/com/giantan/data/common/SpringContextUtil.java

@@ -0,0 +1,20 @@
+package com.giantan.data.common;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SpringContextUtil implements ApplicationContextAware {
+    private static ApplicationContext context;
+
+    @Override
+    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
+        context = ctx;
+    }
+
+    public static <T> T getBean(Class<T> clazz) {
+        return context.getBean(clazz);
+    }
+}

+ 1 - 1
server/src/main/java/com/giantan/data/mds/MdsApplication.java

@@ -8,7 +8,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
 
 @SpringBootApplication
 @EnableAsync
-@ComponentScan(basePackages = {"com.giantan.data.mds", "com.giantan.data.index", "com.giantan.data.qa"})
+@ComponentScan(basePackages = {"com.giantan.data.mds", "com.giantan.data.index", "com.giantan.data.qa", "com.giantan.data.common"})
 //@ComponentScan("com.giantan.data.se")
 public class MdsApplication {
     private static final org.slf4j.Logger log

+ 47 - 50
server/src/main/java/com/giantan/data/mds/config/TaskConfiguration.java

@@ -5,12 +5,9 @@ import com.giantan.data.index.IHybridSearch;
 import com.giantan.data.mds.service.IMdChunksService;
 import com.giantan.data.mds.service.IMdFilesService;
 import com.giantan.data.mds.service.IVectorization;
-import com.giantan.data.mds.task.impl.*;
 import com.giantan.data.qa.service.IQaDocsService;
 import com.giantan.data.qa.service.task.QasTaskHandler;
-import com.giantan.data.tasks.*;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
+
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -37,56 +34,56 @@ class TaskConfiguration {
     @Autowired
     IHybridSearch hybridSearch;
 
-    @Autowired
-    IPersistentTaskService persistentTaskService;
+//    @Autowired
+//    IPersistentTaskService persistentTaskService;
 
     @Autowired
     IQaDocsService qaDocsService;
 
-    @Bean
-    public Executor taskExecutor() {
-        //return Executors.newFixedThreadPool(10);
-        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
-        taskExecutor.setCorePoolSize(5);
-        taskExecutor.setMaxPoolSize(20);
-        taskExecutor.setQueueCapacity(40);
-        taskExecutor.initialize();
-        return taskExecutor;
-    }
-
-    @Bean
-    public EventBus eventBus(Executor taskExecutor) {
-        return new AsyncEventBus(taskExecutor);
-    }
-
-    @Bean
-    public TaskManager taskManager(EventBus eventBus) {
-        TaskManager taskManager = new TaskManager(eventBus);
-        taskManager.setPersistentTaskService(persistentTaskService);
-        return taskManager;
-    }
-
-    @Bean
-    public TaskHandlerRegistry taskHandlerRegistry(List<ITaskHandler> handlers) {
-        return new TaskHandlerRegistry(handlers);
-    }
-
-    @Bean
-    public TaskEventListener taskEventListener(TaskHandlerRegistry registry, TaskManager manager, EventBus eventBus) {
-        TaskEventListener listener = new TaskEventListener(registry, manager);
-        eventBus.register(listener); // 注册监听器
-        return listener;
-    }
-
-    @Bean
-    public MdsTaskHandler mdTaskHandler() {
-        return new MdsTaskHandler(mdFilesService,mdChunksService);
-    }
-
-    @Bean
-    public ChunksTaskHandler chunkTaskHandler() {
-        return new ChunksTaskHandler(mdChunksService,vectorizationService,hybridSearch,gChatClient);
-    }
+//    @Bean
+//    public Executor taskExecutor() {
+//        //return Executors.newFixedThreadPool(10);
+//        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+//        taskExecutor.setCorePoolSize(5);
+//        taskExecutor.setMaxPoolSize(20);
+//        taskExecutor.setQueueCapacity(40);
+//        taskExecutor.initialize();
+//        return taskExecutor;
+//    }
+//
+//    @Bean
+//    public EventBus eventBus(Executor taskExecutor) {
+//        return new AsyncEventBus(taskExecutor);
+//    }
+//
+//    @Bean
+//    public TaskManager taskManager(EventBus eventBus) {
+//        TaskManager taskManager = new TaskManager(eventBus);
+//        taskManager.setPersistentTaskService(persistentTaskService);
+//        return taskManager;
+//    }
+//
+//    @Bean
+//    public TaskHandlerRegistry taskHandlerRegistry(List<ITaskHandler> handlers) {
+//        return new TaskHandlerRegistry(handlers);
+//    }
+//
+//    @Bean
+//    public TaskEventListener taskEventListener(TaskHandlerRegistry registry, TaskManager manager, EventBus eventBus) {
+//        TaskEventListener listener = new TaskEventListener(registry, manager);
+//        eventBus.register(listener); // 注册监听器
+//        return listener;
+//    }
+
+//    @Bean
+//    public MdsTaskHandler mdTaskHandler() {
+//        return new MdsTaskHandler(mdFilesService,mdChunksService);
+//    }
+//
+//    @Bean
+//    public ChunksTaskHandler chunkTaskHandler() {
+//        return new ChunksTaskHandler(mdChunksService,vectorizationService,hybridSearch,gChatClient);
+//    }
 
     @Bean
     public QasTaskHandler qasTaskHandler() {

+ 2 - 2
server/src/main/java/com/giantan/data/mds/controller/MdDocsController.java

@@ -5,7 +5,7 @@ import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.mds.constant.MdConstants;
 import com.giantan.data.mds.service.*;
 import com.giantan.data.mds.service.impl.FileProcessingService;
-import com.giantan.data.mds.service.impl.TaskStatusManager;
+import com.giantan.data.mds.service.impl.MdFileTaskStatusManager;
 import com.giantan.data.tasks.TaskStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
@@ -59,7 +59,7 @@ public class MdDocsController {
     private FileProcessingService fileProcessingService;
 
     @Autowired
-    private TaskStatusManager taskStatusManager;
+    private MdFileTaskStatusManager taskStatusManager;
 
 //    @Autowired
 //    ICollectionService collectionService;

+ 5 - 3
server/src/main/java/com/giantan/data/mds/controller/TaskController.java

@@ -1,6 +1,7 @@
 package com.giantan.data.mds.controller;
 
 import com.giantan.data.mds.constant.MdConstants;
+import com.giantan.data.mds.service.impl.MdTaskManager;
 import com.giantan.data.tasks.*;
 import com.giantan.data.tasks.repository.TaskStatusHistory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -16,7 +17,8 @@ import java.util.*;
 public class TaskController {
 
     @Autowired
-    private TaskManager manager;
+    //private TaskManager manager;
+    private MdTaskManager manager;
 
     /*
     {
@@ -115,7 +117,7 @@ public class TaskController {
         if (createdAtEnd != null) {
             endTime = LocalDateTime.parse(createdAtEnd);
         }
-        return manager.getHistoryTasks(collId,startTime, endTime, status);
+        return manager.getHistoryTasks(collId, startTime, endTime, status);
     }
 
     @DeleteMapping("/history/cleanup")
@@ -133,7 +135,7 @@ public class TaskController {
         if (createdAtEnd != null) {
             endTime = LocalDateTime.parse(createdAtEnd);
         }
-        return manager.deleteHistoryTasks(collId,startTime, endTime, status);
+        return manager.deleteHistoryTasks(collId, startTime, endTime, status);
     }
 
 }

+ 1 - 1
server/src/main/java/com/giantan/data/mds/service/impl/FileProcessingService.java

@@ -26,7 +26,7 @@ public class FileProcessingService {
             = org.slf4j.LoggerFactory.getLogger(FileProcessingService.class);
 
     @Autowired
-    private TaskStatusManager taskStatusManager;
+    private MdFileTaskStatusManager taskStatusManager;
 
 //    @Autowired
 //    TaskManager taskManager;

+ 1 - 1
server/src/main/java/com/giantan/data/mds/service/impl/TaskStatusManager.java → server/src/main/java/com/giantan/data/mds/service/impl/MdFileTaskStatusManager.java

@@ -11,7 +11,7 @@ import java.util.concurrent.TimeUnit;
 
 
 @Component
-public class TaskStatusManager {
+public class MdFileTaskStatusManager {
 
     private final Map<String, TaskStatus> processingMap = new ConcurrentHashMap<>();
 

+ 137 - 0
server/src/main/java/com/giantan/data/mds/service/impl/MdTaskManager.java

@@ -0,0 +1,137 @@
+package com.giantan.data.mds.service.impl;
+
+import com.giantan.data.index.IHybridSearch;
+import com.giantan.data.mds.bot.GChatClient;
+import com.giantan.data.mds.repository.MdDynamicTaskRepository;
+import com.giantan.data.mds.service.IMdChunksService;
+import com.giantan.data.mds.service.IMdFilesService;
+import com.giantan.data.mds.service.IVectorization;
+import com.giantan.data.mds.task.MdPersistentTaskService;
+import com.giantan.data.mds.task.impl.ChunksTaskHandler;
+import com.giantan.data.mds.task.impl.MdsTaskHandler;
+import com.giantan.data.tasks.ITaskHandler;
+import com.giantan.data.tasks.TaskEventListener;
+import com.giantan.data.tasks.TaskHandlerRegistry;
+import com.giantan.data.tasks.TaskManager;
+import com.google.common.eventbus.AsyncEventBus;
+import jakarta.annotation.PostConstruct;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+//task:
+//  executor:
+//    corePoolSize: 2
+//    maxPoolSize: 20
+//    queueCapacity: 40
+//    keepAliveSeconds: 60
+
+@Service
+public class MdTaskManager extends TaskManager {
+    @Value("${task.executor.corePoolSize}")
+    int corePoolSize = 5;
+    @Value("${task.executor.maxPoolSize}")
+    int maximumPoolSize = 20;
+    @Value("${task.executor.keepAliveSeconds}")
+    int keepAliveSeconds = 60;
+    @Value("${task.executor.queueCapacity}")
+    int queueCapacity = 40;
+
+    @Autowired
+    IMdFilesService mdFilesService;
+
+    @Autowired
+    IMdChunksService mdChunksService;
+
+    @Autowired
+    GChatClient gChatClient;
+
+    @Autowired
+    IVectorization vectorizationService;
+
+    @Autowired
+    IHybridSearch hybridSearch;
+
+    @Autowired
+    MdCollectionsService mdCollectionsService;
+
+    @Autowired
+    MdDynamicTaskRepository mdDynamicTaskRepository;
+
+//
+//    @Bean
+//    public EventBus eventBus(Executor taskExecutor) {
+//        return new AsyncEventBus(taskExecutor);
+//    }
+//
+//    @Bean
+//    public TaskManager taskManager(EventBus eventBus) {
+//        TaskManager taskManager = new TaskManager(eventBus);
+//        taskManager.setPersistentTaskService(persistentTaskService);
+//        return taskManager;
+//    }
+//
+//    @Bean
+//    public TaskHandlerRegistry taskHandlerRegistry(List<ITaskHandler> handlers) {
+//        return new TaskHandlerRegistry(handlers);
+//    }
+//
+//    @Bean
+//    public TaskEventListener taskEventListener(TaskHandlerRegistry registry, TaskManager manager, EventBus eventBus) {
+//        TaskEventListener listener = new TaskEventListener(registry, manager);
+//        eventBus.register(listener); // 注册监听器
+//        return listener;
+//    }
+
+    public MdTaskManager() {
+
+    }
+
+    @PostConstruct
+    public void init() {
+        Executor executor = taskExecutor();
+        AsyncEventBus eventBus = new AsyncEventBus(executor);
+        setEventBus(eventBus);
+
+        MdPersistentTaskService persistentTaskService = new MdPersistentTaskService(mdCollectionsService,mdDynamicTaskRepository);
+        setPersistentTaskService(persistentTaskService);
+
+        TaskHandlerRegistry taskHandlerRegistry = getTaskHandlerRegistry();
+        //eventBus.register(taskHandlerRegistry);
+
+        TaskEventListener listener = getTaskEventListener(taskHandlerRegistry, this);
+        eventBus.register(listener); // 注册监听器
+    }
+
+    public Executor taskExecutor() {
+        //return Executors.newFixedThreadPool(10);
+        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+        taskExecutor.setCorePoolSize(corePoolSize);
+        taskExecutor.setMaxPoolSize(maximumPoolSize);
+        taskExecutor.setQueueCapacity(queueCapacity);
+        taskExecutor.setKeepAliveSeconds(keepAliveSeconds);
+        taskExecutor.initialize();
+        return taskExecutor;
+    }
+
+    public TaskEventListener getTaskEventListener(TaskHandlerRegistry registry, TaskManager manager) {
+        TaskEventListener listener = new TaskEventListener(registry, manager);
+        return listener;
+    }
+
+    protected TaskHandlerRegistry getTaskHandlerRegistry(){
+        List<ITaskHandler> handlers = new ArrayList<>();
+        MdsTaskHandler mdsTaskHandler = new MdsTaskHandler(mdFilesService, mdChunksService);
+        handlers.add(mdsTaskHandler);
+        ChunksTaskHandler chunksTaskHandler = new ChunksTaskHandler(mdChunksService, vectorizationService, hybridSearch, gChatClient);
+        handlers.add(chunksTaskHandler);
+        TaskHandlerRegistry registry = new TaskHandlerRegistry(handlers);
+        return registry;
+    }
+
+}

+ 11 - 7
server/src/main/java/com/giantan/data/mds/task/PersistentTaskService.java → server/src/main/java/com/giantan/data/mds/task/MdPersistentTaskService.java

@@ -6,22 +6,26 @@ import com.giantan.data.tasks.IPersistentTaskService;
 import com.giantan.data.tasks.TaskContext;
 import com.giantan.data.tasks.repository.TaskConverter;
 import com.giantan.data.tasks.repository.TaskStatusHistory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+
 
 import java.time.LocalDateTime;
 import java.util.List;
 
-@Service
-public class PersistentTaskService implements IPersistentTaskService {
+//@Service
+public class MdPersistentTaskService implements IPersistentTaskService {
 
-    @Autowired
+    //@Autowired
     MdCollectionsService mdCollectionsService;
 
-    @Autowired
+    //@Autowired
     MdDynamicTaskRepository mdDynamicTaskRepository;
 
-    public PersistentTaskService() {
+    public MdPersistentTaskService() {
+    }
+
+    public MdPersistentTaskService(MdCollectionsService mdCollectionsService, MdDynamicTaskRepository mdDynamicTaskRepository) {
+        this.mdCollectionsService = mdCollectionsService;
+        this.mdDynamicTaskRepository = mdDynamicTaskRepository;
     }
 
     @Override

+ 12 - 1
server/src/main/java/com/giantan/data/tasks/TaskManager.java

@@ -2,6 +2,7 @@ package com.giantan.data.tasks;
 
 
 import com.giantan.data.tasks.repository.TaskStatusHistory;
+import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
 
 import java.time.Duration;
@@ -20,7 +21,7 @@ public class TaskManager implements ITaskManager {
 
     private IPersistentTaskService persistentTaskService;
 
-    private final EventBus eventBus;
+    private EventBus eventBus;
 
     //@Value("${task.cleanup.expire-minutes:10}")
     private long expireMinutes = 30;
@@ -31,6 +32,11 @@ public class TaskManager implements ITaskManager {
     private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
 
 
+    public TaskManager() {
+        this.eventBus = new AsyncEventBus(Executors.newFixedThreadPool(5));
+        cleaner.scheduleAtFixedRate(this::cleanupTasks, 1, expireMinutes, TimeUnit.MINUTES);
+    }
+
     public TaskManager(EventBus eventBus) {
         this.eventBus = eventBus;
         cleaner.scheduleAtFixedRate(this::cleanupTasks, 1, expireMinutes, TimeUnit.MINUTES);
@@ -40,6 +46,11 @@ public class TaskManager implements ITaskManager {
         return eventBus;
     }
 
+
+    public void setEventBus(EventBus eventBus) {
+        this.eventBus = eventBus;
+    }
+
     public IPersistentTaskService getPersistentTaskService() {
         return persistentTaskService;
     }

+ 6 - 0
server/src/main/resources/application.yml

@@ -53,6 +53,12 @@ oss:
 
 
 task:
+  executor:
+    corePoolSize: 2
+    maxPoolSize: 20
+    queueCapacity: 40
+    keepAliveSeconds: 60
+
   cleanup:
     expire-minutes: 10
     keep-last: 10