فهرست منبع

原来的toSqlArray(),connection没释放

dwp 7 ماه پیش
والد
کامیت
63fbebdc8d
54فایلهای تغییر یافته به همراه1696 افزوده شده و 336 حذف شده
  1. 1 1
      server/pom.xml
  2. 2 2
      server/src/main/java/com/giantan/ai/common/config/DataSourceConfig.java
  3. 33 1
      server/src/main/java/com/giantan/ai/common/util/JsonUtil.java
  4. 212 61
      server/src/main/java/com/giantan/data/kvs/repository/GDynamicRepository.java
  5. 16 6
      server/src/main/java/com/giantan/data/kvs/repository/GRepository.java
  6. 1 1
      server/src/main/java/com/giantan/data/mds/MdsApplication.java
  7. 3 3
      server/src/main/java/com/giantan/data/mds/bot/ExtractPrompts.java
  8. 34 7
      server/src/main/java/com/giantan/data/mds/bot/GChatClient.java
  9. 214 40
      server/src/main/java/com/giantan/data/mds/chunk/DynamicChunkRepository.java
  10. 0 27
      server/src/main/java/com/giantan/data/mds/chunk/MdChunkController.java
  11. 5 4
      server/src/main/java/com/giantan/data/mds/chunk/MdChunkRepository.java
  12. 1 1
      server/src/main/java/com/giantan/data/mds/chunk/Readme.java
  13. 0 29
      server/src/main/java/com/giantan/data/mds/chunk/SqlArrayUtils.java
  14. 25 1
      server/src/main/java/com/giantan/data/mds/config/TaskConfiguration.java
  15. 9 9
      server/src/main/java/com/giantan/data/mds/controller/ChatController.java
  16. 59 0
      server/src/main/java/com/giantan/data/mds/controller/ChunkController.java
  17. 1 2
      server/src/main/java/com/giantan/data/mds/controller/MdCollectionsController.java
  18. 3 0
      server/src/main/java/com/giantan/data/mds/controller/MdDocsController.java
  19. 17 18
      server/src/main/java/com/giantan/data/mds/controller/TaskController.java
  20. 1 1
      server/src/main/java/com/giantan/data/mds/controller/TaxonomyController.java
  21. 3 1
      server/src/main/java/com/giantan/data/mds/repository/MdDynamicChunkRepository.java
  22. 12 0
      server/src/main/java/com/giantan/data/mds/service/IHybridSearch.java
  23. 19 2
      server/src/main/java/com/giantan/data/mds/service/IMdChunksService.java
  24. 15 0
      server/src/main/java/com/giantan/data/mds/service/IVectorization.java
  25. 0 38
      server/src/main/java/com/giantan/data/mds/service/TaskStatus.java
  26. 58 0
      server/src/main/java/com/giantan/data/mds/service/impl/DocReq.java
  27. 2 1
      server/src/main/java/com/giantan/data/mds/service/impl/DynamicTaxonomyService2.java
  28. 2 2
      server/src/main/java/com/giantan/data/mds/service/impl/FileProcessingService.java
  29. 69 0
      server/src/main/java/com/giantan/data/mds/service/impl/HybridSearch.java
  30. 1 1
      server/src/main/java/com/giantan/data/mds/service/impl/MdCache.java
  31. 37 1
      server/src/main/java/com/giantan/data/mds/service/impl/MdChunksService.java
  32. 2 1
      server/src/main/java/com/giantan/data/mds/service/impl/MdCollectionsService.java
  33. 2 1
      server/src/main/java/com/giantan/data/mds/service/impl/MdCores.java
  34. 3 2
      server/src/main/java/com/giantan/data/mds/service/impl/MdDocsService.java
  35. 3 1
      server/src/main/java/com/giantan/data/mds/service/impl/MdFilesService.java
  36. 2 1
      server/src/main/java/com/giantan/data/mds/service/impl/TaskStatusManager.java
  37. 145 0
      server/src/main/java/com/giantan/data/mds/service/impl/Vectorization.java
  38. 82 4
      server/src/main/java/com/giantan/data/mds/task/impl/BaseTaskHandler.java
  39. 73 0
      server/src/main/java/com/giantan/data/mds/task/impl/ChunksTaskHandler.java
  40. 140 0
      server/src/main/java/com/giantan/data/mds/task/impl/EmbeddingTaskHandler.java
  41. 139 12
      server/src/main/java/com/giantan/data/mds/task/impl/KeywordsTaskHandler.java
  42. 4 6
      server/src/main/java/com/giantan/data/mds/task/impl/SliceTaskHandler.java
  43. 24 10
      server/src/main/java/com/giantan/data/tasks/TaskContext.java
  44. 25 8
      server/src/main/java/com/giantan/data/tasks/TaskEventListener.java
  45. 20 1
      server/src/main/java/com/giantan/data/tasks/TaskManager.java
  46. 1 1
      server/src/main/java/com/giantan/data/tasks/TaskObjectStatus.java
  47. 9 0
      server/src/main/java/com/giantan/data/tasks/TaskState.java
  48. 32 5
      server/src/main/java/com/giantan/data/tasks/TaskStatus.java
  49. 1 1
      server/src/main/java/com/giantan/data/tasks/TaskType.java
  50. 2 2
      server/src/main/java/com/giantan/data/tasks/repository/TaskStatusHistory.java
  51. 2 2
      server/src/main/java/com/giantan/data/tasks/repository/TaskStatusHistoryRepository.java
  52. 56 0
      server/src/main/java/com/giantan/data/util/JdbcUtils.java
  53. 11 9
      server/src/main/resources/application.yml
  54. 63 9
      server/src/test/java/com/giantan/data/mds/MdsApplicationTests.java

+ 1 - 1
server/pom.xml

@@ -9,7 +9,7 @@
         <version>1.0.0</version>
     </parent>
 
-    <version>1.2.1</version>
+    <version>1.6.3</version>
     <artifactId>mdserver</artifactId>
 
     <properties>

+ 2 - 2
server/src/main/java/com/giantan/ai/common/config/DataSourceConfig.java

@@ -11,10 +11,10 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 
-@Configuration
+//@Configuration
 public class DataSourceConfig {
 
-    @Autowired
+    //@Autowired
     DataSourceProperties dataSourceProperties;
 
     public DataSourceConfig() {

+ 33 - 1
server/src/main/java/com/giantan/ai/common/util/JsonUtil.java

@@ -1,13 +1,45 @@
 package com.giantan.ai.common.util;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import java.util.List;
+import java.util.Map;
+
 public class JsonUtil {
+    static ObjectMapper objectMapper = new ObjectMapper();
 
     public static String toJsonString(Object obj) throws JsonProcessingException {
-        ObjectMapper objectMapper = new ObjectMapper();
         String s = objectMapper.writeValueAsString(obj);
         return s;
     }
+
+    public static Map<String, Object> fromJsonString(String json) throws JsonProcessingException {
+        Map<String, Object> map = objectMapper.readValue(json, new TypeReference<Map<String, Object>>() {
+        });
+        return map;
+    }
+
+    public static List<Double> toDoubleList(String js) throws JsonProcessingException {
+        List<Double> list = objectMapper.readValue(js, new TypeReference<List<Double>>() {});
+        return list;
+    }
+
+    public static List<Float> toFloatList(String js) throws JsonProcessingException {
+        List<Float> list = objectMapper.readValue(js, new TypeReference<List<Float>>() {});
+        return list;
+    }
+
+    public static List<List<Float>> toListFloatList(String js) throws JsonProcessingException {
+        List<List<Float>> ret = objectMapper.readValue(js, new TypeReference<List<List<Float>>>() {});
+        return ret;
+    }
+
+    // 检查并清理控制字符
+    public static String cleanControlChars(String input) {
+        if (input == null) return null;
+        return input.replaceAll("[\\x00-\\x1F]", ""); // 删除所有 JSON 不允许的控制字符
+    }
+    //
 }

+ 212 - 61
server/src/main/java/com/giantan/data/kvs/repository/GDynamicRepository.java

@@ -8,6 +8,8 @@ import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.kvs.kvstore.IGDynamicRepository;
 import com.giantan.ai.util.id.IdGenerator;
 import com.giantan.ai.util.id.UuidGenerator;
+import com.giantan.data.util.JdbcUtils;
+import org.springframework.jdbc.core.ConnectionCallback;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.sql.Array;
@@ -171,17 +173,26 @@ public class GDynamicRepository implements IGDynamicRepository {
         }
     }
 
-    private Array toSqlArray(List<String> ls) {
-        if (ls == null) {
-            return null;
-        }
-        try {
-            // 使用 JdbcTemplate 连接的 DataSource 创建 Array
-            return jdbc.getDataSource().getConnection().createArrayOf("TEXT", ls.toArray(new String[0]));
-        } catch (SQLException e) {
-            throw new RuntimeException("Error creating SQL Array", e);
-        }
-    }
+//    private Array toSqlArray(List<String> ls) {
+//        if (ls == null) {
+//            return null;
+//        }
+//        try {
+//            // 使用 JdbcTemplate 连接的 DataSource 创建 Array
+//            return jdbc.getDataSource().getConnection().createArrayOf("TEXT", ls.toArray(new String[0]));
+//        } catch (SQLException e) {
+//            throw new RuntimeException("Error creating SQL Array", e);
+//        }
+//    }
+
+//    private Array toSqlArray(List<String> ls) {
+//        if (ls == null) {
+//            return null;
+//        }
+//        return jdbc.execute((ConnectionCallback<Array>) conn ->
+//                conn.createArrayOf("TEXT", ls.toArray(new String[0]))
+//        );
+//    }
 
     // 这个比用  new GRowMapper() 高效
     private GEntity toGEntry(ResultSet rs) throws SQLException {
@@ -255,12 +266,15 @@ public class GDynamicRepository implements IGDynamicRepository {
                 (PreparedStatement ps, GEntity entity) -> {
                     ps.setString(1, entity.getGid());
                     ps.setString(2, entity.getName());
-                    ps.setArray(3, toSqlArray(entity.getAltlabels()));
+                    //ps.setArray(3, toSqlArray(entity.getAltlabels()));
                     ps.setInt(4, toMark(entity.getMark()));
                     ps.setString(5, entity.getDescription());
-                    ps.setArray(6, toSqlArray(entity.getTags()));
+                    //ps.setArray(6, toSqlArray(entity.getTags()));
                     ps.setString(7, entity.getPath());
                     ps.setString(8, toJson(entity.getAttributes())); // 必须是合法 JSON 字符串
+
+                    JdbcUtils.setStringArray(ps,3,entity.getAltlabels());
+                    JdbcUtils.setStringArray(ps,6,entity.getTags());
                 });
 
         List<Integer> rets = new ArrayList<>();
@@ -393,6 +407,7 @@ public class GDynamicRepository implements IGDynamicRepository {
                 .collect(Collectors.joining(","));
 
         String sql = String.format("DELETE FROM %s.%s WHERE id IN (" + placeholders + ")", schema, tableName(collId));
+        ////QQQ
         Object[] params = ids.toArray();
         return jdbc.update(sql, params);
     }
@@ -403,6 +418,48 @@ public class GDynamicRepository implements IGDynamicRepository {
         return jdbc.update(sql);
     }
 
+// toArray() 有 Connection未释放问题
+//    private GEntity updating(String collId, GBaseKeyValue updateFields) throws Throwable {
+//        Object id = updateFields.remove("id");
+//        Object gid = updateFields.remove("gid");
+//
+//        String sql1 = String.format("UPDATE %s.%s SET ", schema, tableName(collId));
+//        StringBuilder sql = new StringBuilder(sql1);
+//
+//        List<Object> objs = new ArrayList<>();
+//        updateFields.forEach((key, value) -> {
+//                    if (GEntityConfig.UPDATABLE_FIELDS_SET.contains(key)) {
+//                        if ("attributes".equals(key)) {
+//                            sql.append(key).append(" = ?::jsonb").append(", ");
+//                            objs.add(toJson((Map<String, Object>) value));
+//                        } else if ("altlabels".equals(key) || "tags".equals(key)) {
+//                            sql.append(key).append(" = ?").append(", ");
+//                            objs.add(toArray((List) value));
+//                        } else {
+//                            sql.append(key).append(" = ?").append(", ");
+//                            objs.add(value);
+//                        }
+//                    }
+//                }
+//        );
+//        // 移除最后的逗号和空格
+//        sql.setLength(sql.length() - 2);
+//        if (id != null) {
+//            sql.append(" WHERE id = ? RETURNING * ;");
+//            objs.add(id);
+//        } else {
+//            sql.append(" WHERE gid = ? RETURNING * ;");
+//            objs.add(gid);
+//        }
+//
+//        GEntity ret = jdbc.queryForObject(sql.toString(),
+//                objs.toArray(new Object[0]),
+//                (rs, rowNum) -> toGEntry(rs)
+//        );
+//
+//        return ret;
+//    }
+
 
     private GEntity updating(String collId, GBaseKeyValue updateFields) throws Throwable {
         Object id = updateFields.remove("id");
@@ -411,40 +468,63 @@ public class GDynamicRepository implements IGDynamicRepository {
         String sql1 = String.format("UPDATE %s.%s SET ", schema, tableName(collId));
         StringBuilder sql = new StringBuilder(sql1);
 
-        List<Object> objs = new ArrayList<>();
+        List<String> keys = new ArrayList<>();
         updateFields.forEach((key, value) -> {
-                    if (GEntityConfig.UPDATABLE_FIELDS_SET.contains(key)) {
-                        if ("attributes".equals(key)) {
-                            sql.append(key).append(" = ?::jsonb").append(", ");
-                            objs.add(toJson((Map<String, Object>) value));
-                        } else if ("altlabels".equals(key) || "tags".equals(key)) {
-                            sql.append(key).append(" = ?").append(", ");
-                            objs.add(toArray((List) value));
-                        } else {
-                            sql.append(key).append(" = ?").append(", ");
-                            objs.add(value);
-                        }
-                    }
+            if (GEntityConfig.UPDATABLE_FIELDS_SET.contains(key)) {
+                sql.append(key);
+                if ("attributes".equals(key)) {
+                    sql.append(" = ?::jsonb");
+                } else if ("altlabels".equals(key) || "tags".equals(key)) {
+                    sql.append(" = ?");
+                } else {
+                    sql.append(" = ?");
                 }
-        );
-        // 移除最后的逗号和空格
-        sql.setLength(sql.length() - 2);
+                sql.append(", ");
+                keys.add(key);
+            }
+        });
+        sql.setLength(sql.length() - 2); // 移除最后逗号
+
         if (id != null) {
             sql.append(" WHERE id = ? RETURNING * ;");
-            objs.add(id);
         } else {
             sql.append(" WHERE gid = ? RETURNING * ;");
-            objs.add(gid);
         }
 
-        GEntity ret = jdbc.queryForObject(sql.toString(),
-                objs.toArray(new Object[0]),
-                (rs, rowNum) -> toGEntry(rs)
-        );
+        List<GEntity> rets = jdbc.query(connection -> {
+            PreparedStatement ps = connection.prepareStatement(sql.toString());
 
-        return ret;
+            int index = 1;
+            for (String key : keys) {
+                Object value = updateFields.get(key);
+                if ("attributes".equals(key)) {
+                    ps.setString(index++, toJson((Map<String, Object>) value));
+                } else if ("altlabels".equals(key) || "tags".equals(key)) {
+                    JdbcUtils.setStringArray(ps, index++, (List<String>) value);
+                } else {
+                    ps.setObject(index++, value);
+                }
+            }
+
+            // 最后绑定 id/gid
+            if (id != null) {
+                ps.setObject(index, id);
+            } else {
+                ps.setObject(index, gid);
+            }
+
+            return ps;
+        }, (rs, rowNum) -> toGEntry(rs));
+        if (rets!= null && !rets.isEmpty()) {
+            return rets.get(0);
+        }
+        return null;
     }
 
+
+
+
+
     @Override
     public GBaseKeyValue update(String collId, GBaseKeyValue kv) throws Throwable {
         if (kv.size() == 0) {
@@ -591,50 +671,121 @@ public class GDynamicRepository implements IGDynamicRepository {
     }
 
 
+//    @Override
+//    public List<String> appendArrayField(String collId, Integer id, String field, List<String> values) {
+//        if (field == null || values == null || values.size() == 0) {
+//            return Collections.emptyList();
+//        }
+//
+//        Array array1 = toSqlArray(values);
+//        String sql = String.format("UPDATE %s.%s SET %s = %s || ?::text[] WHERE id = ? RETURNING %s", schema, tableName(collId), field, field, field);
+//        List<String> rets = jdbc.queryForObject(
+//                sql,
+//                new Object[]{
+//                        array1,
+//                        id
+//                },
+//                (rs, rowNum) -> {
+//                    Array array = rs.getArray(field);
+//                    return toList(array);
+//                }
+//        );
+//        return rets;
+//    }
+
     @Override
     public List<String> appendArrayField(String collId, Integer id, String field, List<String> values) {
-        if (field == null || values == null || values.size() == 0) {
+        if (field == null || values == null || values.isEmpty()) {
             return Collections.emptyList();
         }
 
-        Array array1 = toSqlArray(values);
-        String sql = String.format("UPDATE %s.%s SET %s = %s || ?::text[] WHERE id = ? RETURNING %s", schema, tableName(collId), field, field, field);
-        List<String> rets = jdbc.queryForObject(
-                sql,
-                new Object[]{
-                        array1,
-                        id
+        String sql = String.format(
+                "UPDATE %s.%s SET %s = %s || ?::text[] WHERE id = ? RETURNING %s",
+                schema,
+                tableName(collId),
+                field,
+                field,
+                field
+        );
+
+        return jdbc.query(
+                con -> {
+                    PreparedStatement ps = con.prepareStatement(sql);
+                    // 创建 SQL Array 并绑定到参数
+                    //Array sqlArray = con.createArrayOf("text", values.toArray(new String[0]));
+                    //ps.setArray(1, sqlArray);
+                    JdbcUtils.setStringArray(ps, 1, values);
+                    ps.setInt(2, id);
+                    return ps;
                 },
-                (rs, rowNum) -> {
-                    Array array = rs.getArray(field);
-                    return toList(array);
+                (rs) -> {
+                    if (rs.next()) {
+                        Array array = rs.getArray(field);
+                        return toList(array);
+                    }
+                    return Collections.<String>emptyList();
                 }
         );
-        return rets;
     }
 
+
+
+
+//    @Override
+//    public List<String> setArrayField1(String collId, Integer id, String field, List<String> values) {
+//        if (field == null || values == null) {
+//            return Collections.emptyList();
+//        }
+//
+//        Array array1 = toSqlArray(values);
+//        String sql = String.format("UPDATE %s.%s SET %s = ? WHERE id = ? RETURNING %s", schema, tableName(collId), field, field);
+//        List<String> rets = jdbc.queryForObject(
+//                sql,
+//                new Object[]{
+//                        array1,
+//                        id
+//                },
+//                (rs, rowNum) -> {
+//                    Array array = rs.getArray(field);
+//                    return toList(array);
+//                }
+//        );
+//        return rets;
+//    }
+
     @Override
     public List<String> setArrayField(String collId, Integer id, String field, List<String> values) {
         if (field == null || values == null) {
             return Collections.emptyList();
         }
 
-        Array array1 = toSqlArray(values);
-        String sql = String.format("UPDATE %s.%s SET %s = ? WHERE id = ? RETURNING %s", schema, tableName(collId), field, field);
-        List<String> rets = jdbc.queryForObject(
-                sql,
-                new Object[]{
-                        array1,
-                        id
-                },
-                (rs, rowNum) -> {
-                    Array array = rs.getArray(field);
-                    return toList(array);
-                }
+        String sql = String.format(
+                "UPDATE %s.%s SET %s = ? WHERE id = ? RETURNING %s",
+                schema, tableName(collId), field, field
         );
-        return rets;
+
+        return jdbc.query(con -> {
+            PreparedStatement ps = con.prepareStatement(sql);
+
+            // 第 1 个参数:安全设置 array
+            JdbcUtils.setStringArray(ps, 1, values);
+
+            // 第 2 个参数:id
+            ps.setInt(2, id);
+
+            return ps;
+        }, rs -> {
+            if (rs.next()) {
+                Array array = rs.getArray(field);
+                return toList(array);
+            } else {
+                return Collections.emptyList();
+            }
+        });
     }
 
+
+
     @Override
     public List<String> removeArrayField(String collId, Integer id, String field, List<String> values) {
         if (field == null || values == null || values.size() == 0) {

+ 16 - 6
server/src/main/java/com/giantan/data/kvs/repository/GRepository.java

@@ -37,6 +37,7 @@ import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.kvs.kvstore.IGkvRepository;
 import com.giantan.ai.util.id.IdGenerator;
 import com.giantan.ai.util.id.UuidGenerator;
+import org.springframework.jdbc.core.ConnectionCallback;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.sql.Array;
@@ -145,16 +146,25 @@ public class GRepository implements IGkvRepository {
         }
     }
 
+//    private Array toSqlArray(List<String> ls) {
+//        if (ls == null) {
+//            return null;
+//        }
+//        try {
+//            // 使用 JdbcTemplate 连接的 DataSource 创建 Array
+//            return jdbc.getDataSource().getConnection().createArrayOf("TEXT", ls.toArray(new String[0]));
+//        } catch (SQLException e) {
+//            throw new RuntimeException("Error creating SQL Array", e);
+//        }
+//    }
+
     private Array toSqlArray(List<String> ls) {
         if (ls == null) {
             return null;
         }
-        try {
-            // 使用 JdbcTemplate 连接的 DataSource 创建 Array
-            return jdbc.getDataSource().getConnection().createArrayOf("TEXT", ls.toArray(new String[0]));
-        } catch (SQLException e) {
-            throw new RuntimeException("Error creating SQL Array", e);
-        }
+        return jdbc.execute((ConnectionCallback<Array>) conn ->
+                conn.createArrayOf("TEXT", ls.toArray(new String[0]))
+        );
     }
 
     // 这个比用  new GRowMapper() 高效

+ 1 - 1
server/src/main/java/com/giantan/data/mds/MdsApplication.java

@@ -16,7 +16,7 @@ public class MdsApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(MdsApplication.class, args);
-        log.info("Mds server started. Version 1.2.1.1");
+        log.info("Mds server started. Version 1.6.3");
     }
 
 }

+ 3 - 3
server/src/main/java/com/giantan/data/mds/bot/ExtractPrompts.java

@@ -144,7 +144,7 @@ public class ExtractPrompts {
 
     static String QUESTION_DEFAULT = "若干";
 
-    public static String toMetadataStr(Map<String, Object> metadata) {
+    public static String toMetadataStr(Map<String, String> metadata) {
         if (metadata == null || metadata.size() <= 0) return "";
         StringBuilder sb = new StringBuilder();
 
@@ -155,11 +155,11 @@ public class ExtractPrompts {
         return sb.toString();
     }
 
-    public static String getKeywordAndQuestionPrompt(String text, Map<String, Object> metadata) {
+    public static String getKeywordAndQuestionPrompt(String text, Map<String, String> metadata) {
         return getKeywordAndQuestionPrompt(text, metadata, QUESTION_DEFAULT, KEYWORD_DEFAULT, KEYWORD_QUESTION_GEN_TMPL_OUTPUT_FORMAT_ZH);
     }
 
-    public static String getKeywordAndQuestionPrompt(String text, Map<String, Object> metadata, String num_questions, String num_keywords, String output_format) {
+    public static String getKeywordAndQuestionPrompt(String text, Map<String, String> metadata, String num_questions, String num_keywords, String output_format) {
         PromptTemplate promptTemplate = PromptTemplate.builder()
                 .renderer(StTemplateRenderer.builder().startDelimiterToken('{').endDelimiterToken('}').build())
                 .template(KEYWORD_QUESTION_GEN_TMPL_ZH)

+ 34 - 7
server/src/main/java/com/giantan/data/mds/bot/GChatClient.java

@@ -1,5 +1,7 @@
 package com.giantan.data.mds.bot;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.giantan.ai.common.util.JsonUtil;
 import org.springframework.ai.chat.client.ChatClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.DefaultResourceLoader;
@@ -7,6 +9,9 @@ import org.springframework.core.io.Resource;
 import org.springframework.core.io.ResourceLoader;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+import java.util.Map;
+
 @Service
 public class GChatClient {
 
@@ -48,10 +53,22 @@ public class GChatClient {
     @Autowired
     ChatClient openAiChatClient;
 
+    //
     public String ask(String question) {
         //SystemPromptTemplate st = new SystemPromptTemplate();
         ChatClient.CallResponseSpec ret = deepSeekChatClient.prompt(question).call();
-        return ret.content();
+        String s = ret.content();
+        if (s != null){
+            boolean b = s.startsWith("```json");
+            if (b){
+                s = s.substring("```json".length());
+            }
+            b = s.endsWith("```");
+            if (b){
+                s = s.substring(0, s.length()-3);
+            }
+        }
+        return s;
     }
 
     public String askOpenai(String question) {
@@ -59,13 +76,23 @@ public class GChatClient {
         return ret.content();
     }
 
-    public String askOpenaiForAijiu(String question) {
-        //SystemPromptTemplate st = new SystemPromptTemplate();
-        String prompt = JingluoPrompts.aijiuPropmt(question);
-        //ChatClient.CallResponseSpec ret = openAiChatClient.prompt(prompt).call();
+//    public String askOpenaiForAijiu(String question) {
+//        //SystemPromptTemplate st = new SystemPromptTemplate();
+//        String prompt = JingluoPrompts.aijiuPropmt(question);
+//        //ChatClient.CallResponseSpec ret = openAiChatClient.prompt(prompt).call();
+//        String ret = ask(prompt);
+//        return ret;
+//    }
+
+    public Map<String, Object> getKeywordsAndQuestions(String text,Map<String,String> metadata) throws JsonProcessingException {
+        String prompt = ExtractPrompts.getKeywordAndQuestionPrompt(text, metadata);
+        //String ret = askOpenai(prompt);
         String ret = ask(prompt);
-        return ret;
+        if (ret == null) {
+            return null;
+        }
+        //System.out.println(ret);
+        return JsonUtil.fromJsonString(ret);
     }
 
-
 }

+ 214 - 40
server/src/main/java/com/giantan/data/mds/chunk/DynamicChunkRepository.java

@@ -4,12 +4,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.giantan.ai.util.id.IdGenerator;
 import com.giantan.ai.util.id.UuidGenerator;
+import com.giantan.data.kvs.kvstore.GBaseKeyValue;
+import com.giantan.data.util.JdbcUtils;
 import org.springframework.jdbc.core.JdbcTemplate;
 
-import java.sql.Array;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.sql.*;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -87,6 +88,16 @@ public class DynamicChunkRepository {
         jdbc.update(sql);
     }
 
+    public List<MdChunk> findAll(String collId) {
+        String sql = String.format("SELECT * FROM %s ORDER BY chunk_index",tableName(collId));
+        return jdbc.query(sql, this::mapRow);
+    }
+
+    public MdChunk findById(String collId, Long id) {
+        String sql = String.format("SELECT * FROM %s WHERE id = ?",tableName(collId));
+        List<MdChunk> rets = jdbc.query(sql, new Object[]{id}, this::mapRow);
+        return rets.isEmpty() ? null : rets.get(0);
+    }
 
     public long deleteByMdId(String collId, Integer mdId) {
         //String sql = "DELETE FROM %s WHERE md_id = ?";
@@ -94,35 +105,91 @@ public class DynamicChunkRepository {
         return jdbc.update(sql, mdId);
     }
 
+//    public Integer save(String collId, MdChunk chunk) {
+//        String sql1 = """
+//                    INSERT INTO %s (
+//                        md_id, chunk_index, content, plain_text, embedding, chunk_type,
+//                        paragraph_start, paragraph_end, offset_start, offset_end, section_path,
+//                        keywords, metadata, extra
+//                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
+//                """;
+//        String sql = String.format(sql1, tableName(collId));
+//
+//        int updated = jdbc.update(sql,
+//                chunk.getMdId(),
+//                chunk.getChunkIndex(),
+//                chunk.getContent(),
+//                chunk.getPlainText(),
+//                chunk.getEmbedding(),
+//                chunk.getChunkType(),
+//                chunk.getParagraphStart(),
+//                chunk.getParagraphEnd(),
+//                chunk.getOffsetStart(),
+//                chunk.getOffsetEnd(),
+//                chunk.getSectionPath(),
+//                toSqlArray(chunk.getKeywords()),
+//                toJson(chunk.getMetadata()),
+//                toJson(chunk.getExtra())
+//        );
+//        return updated;
+//    }
+
+
     public Integer save(String collId, MdChunk chunk) {
-        String sql1 = """
-                    INSERT INTO %s (
-                        md_id, chunk_index, content, plain_text, embedding, chunk_type,
-                        paragraph_start, paragraph_end, offset_start, offset_end, section_path,
-                        keywords, metadata, extra
-                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
-                """;
-        String sql = String.format(sql1, tableName(collId));
+        String sqlTemplate = """
+            INSERT INTO %s (
+                md_id, chunk_index, content, plain_text, embedding, chunk_type,
+                paragraph_start, paragraph_end, offset_start, offset_end, section_path,
+                keywords, metadata, extra
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::jsonb, ?::jsonb)
+            """;
+        String sql = String.format(sqlTemplate, tableName(collId));
 
-        int updated = jdbc.update(sql,
-                chunk.getMdId(),
-                chunk.getChunkIndex(),
-                chunk.getContent(),
-                chunk.getPlainText(),
-                chunk.getEmbedding(),
-                chunk.getChunkType(),
-                chunk.getParagraphStart(),
-                chunk.getParagraphEnd(),
-                chunk.getOffsetStart(),
-                chunk.getOffsetEnd(),
-                chunk.getSectionPath(),
-                toSqlArray(chunk.getKeywords()),
-                toJson(chunk.getMetadata()),
-                toJson(chunk.getExtra())
-        );
-        return updated;
+        return jdbc.update(con -> {
+            PreparedStatement ps = con.prepareStatement(sql);
+
+            ps.setLong(1, chunk.getMdId());
+            ps.setInt(2, chunk.getChunkIndex());
+            ps.setString(3, chunk.getContent());
+            ps.setString(4, chunk.getPlainText());
+            ps.setObject(5, chunk.getEmbedding()); // 假设是 bytea 或 vector
+            ps.setString(6, chunk.getChunkType());
+            ps.setInt(7, chunk.getParagraphStart());
+            ps.setInt(8, chunk.getParagraphEnd());
+            ps.setInt(9, chunk.getOffsetStart());
+            ps.setInt(10, chunk.getOffsetEnd());
+            ps.setString(11, chunk.getSectionPath());
+
+            // keywords → text[]
+            if (chunk.getKeywords() != null) {
+                Array sqlArray = con.createArrayOf("text", chunk.getKeywords().toArray(new String[0]));
+                ps.setArray(12, sqlArray);
+            } else {
+                ps.setNull(12, Types.ARRAY);
+            }
+
+            // metadata → jsonb
+            String metadataJson = toJson(chunk.getMetadata());
+            if (metadataJson != null) {
+                ps.setString(13, metadataJson);
+            } else {
+                ps.setNull(13, Types.VARCHAR);
+            }
+
+            // extra → jsonb
+            String extraJson = toJson(chunk.getExtra());
+            if (extraJson != null) {
+                ps.setString(14, extraJson);
+            } else {
+                ps.setNull(14, Types.VARCHAR);
+            }
+
+            return ps;
+        });
     }
 
+
+
     public List<Integer> saveAll(String collId, List<MdChunk> chunks) {
         String sql1 = """
                     INSERT INTO %s (
@@ -151,9 +218,11 @@ public class DynamicChunkRepository {
                 ps.setObject(9, chunk.getOffsetStart());
                 ps.setObject(10, chunk.getOffsetEnd());
                 ps.setObject(11, chunk.getSectionPath());
-                ps.setArray(12, toSqlArray(chunk.getKeywords()));
+                //ps.setArray(12, toSqlArray(chunk.getKeywords()));
                 ps.setObject(13, toJson(chunk.getMetadata()));
                 ps.setObject(14, toJson(chunk.getExtra()));
+
+                JdbcUtils.setStringArray(ps,12,chunk.getKeywords());
             });
             for (int j = 0; j < batched.length; j++) {
                 for (int k = 0; k < batched[j].length; k++) {
@@ -164,16 +233,120 @@ public class DynamicChunkRepository {
         return rets;
     }
 
-    private Array toSqlArray(List<String> ls) {
-        if (ls == null) {
-            return null;
-        }
-        try {
-            // 使用 JdbcTemplate 连接的 DataSource 创建 Array
-            return jdbc.getDataSource().getConnection().createArrayOf("TEXT", ls.toArray(new String[0]));
-        } catch (SQLException e) {
-            throw new RuntimeException("Error creating SQL Array", e);
-        }
+
+//    private Array toSqlArray(List<String> ls) {
+//        if (ls == null) {
+//            return null;
+//        }
+//        try (Connection conn = jdbc.getDataSource().getConnection()) {
+//            return conn.createArrayOf("TEXT", ls.toArray(new String[0]));
+//        } catch (SQLException e) {
+//            throw new RuntimeException("Error creating SQL Array", e);
+//        }
+//    }
+
+    public List<Map<String,Object>> getKeywordsByMdId(String collId, Integer mdId) {
+        String sql1 = "SELECT id,keywords FROM %s WHERE md_id = ?";
+        String sql = String.format(sql1, tableName(collId));
+        List<Map<String,Object>> keywordsList = jdbc.query(
+                sql,
+                new Object[]{mdId},
+                (rs, rowNum) -> {
+                    Map<String,Object> map = new HashMap<>();
+                    long id = rs.getLong("id");
+                    map.put("id", id);
+                    Array array = rs.getArray("keywords");
+                    if (array != null) {
+                        map.put("keywords",(String[]) array.getArray());
+                    }else{
+                        map.put("keywords",null);
+                    }
+                    //return new String[0];
+                    return map;
+                }
+        );
+        return keywordsList;
+    }
+
+//    public int updateKeywords(String collId, Long id, List<String> keywords, Map<String, Object> metadata) {
+//        String sql1 = """
+//                    UPDATE %s
+//                    SET
+//                        keywords = COALESCE(?, keywords),
+//                        metadata = CASE
+//                                      WHEN ?::jsonb IS NOT NULL
+//                                      THEN metadata || ?::jsonb
+//                                      ELSE metadata
+//                                   END
+//                    WHERE id = ?
+//                """;
+//
+//        String sql = String.format(sql1, tableName(collId));
+//        String json = toJson(metadata);
+//        int updated = jdbc.update(sql,
+//                toSqlArray(keywords),       // 可传 null
+//                json,   // 可传 null
+//                json,   // 占位符重复,合并用
+//                id
+//        );
+//
+//        return updated;
+//    }
+
+
+    public int updateKeywords(String collId, Long id, List<String> keywords, Map<String, Object> metadata) {
+        String sqlTemplate = """
+            UPDATE %s
+            SET 
+                keywords = COALESCE(?, keywords),
+                metadata = CASE 
+                              WHEN ?::jsonb IS NOT NULL 
+                              THEN metadata || ?::jsonb
+                              ELSE metadata
+                           END
+            WHERE id = ?
+            """;
+
+        String sql = String.format(sqlTemplate, tableName(collId));
+        String json = toJson(metadata); // 可能为 null
+
+        return jdbc.update(con -> {
+            PreparedStatement ps = con.prepareStatement(sql);
+
+            // keywords → text[]
+            if (keywords != null) {
+                Array sqlArray = con.createArrayOf("text", keywords.toArray(new String[0]));
+                ps.setArray(1, sqlArray);
+            } else {
+                ps.setNull(1, Types.ARRAY);
+            }
+
+            // metadata → jsonb
+            if (json != null) {
+                ps.setString(2, json);
+                ps.setString(3, json);
+            } else {
+                ps.setNull(2, Types.VARCHAR); // 仍然能匹配 ?::jsonb
+                ps.setNull(3, Types.VARCHAR);
+            }
+
+            ps.setLong(4, id);
+            return ps;
+        });
+    }
+
+
+    public int setEmbedding(String collId, Long id, String es) {
+        String sql1 = """
+                    UPDATE %s
+                    SET 
+                        embedding = ?
+                    WHERE id = ?
+                """;
+
+        String sql = String.format(sql1, tableName(collId));
+        int updated = jdbc.update(sql, es, id);
+        return updated;
     }
 
     private MdChunk mapRow(ResultSet rs, int rowNum) throws SQLException {
@@ -190,7 +363,7 @@ public class DynamicChunkRepository {
         c.setOffsetStart(rs.getInt("offset_start"));
         c.setOffsetEnd(rs.getInt("offset_end"));
         c.setSectionPath(rs.getString("section_path"));
-        c.setKeywords(SqlArrayUtils.fromStringArray(rs.getArray("keywords")));
+        c.setKeywords(JdbcUtils.fromStringArray(rs.getArray("keywords")));
         c.setMetadata(fromJson(rs.getString("metadata")));
         c.setExtra(fromJson(rs.getString("extra")));
         c.setCreatedAt(rs.getTimestamp("created_at").toInstant());
@@ -214,4 +387,5 @@ public class DynamicChunkRepository {
         }
     }
 
+
 }

+ 0 - 27
server/src/main/java/com/giantan/data/mds/chunk/MdChunkController.java

@@ -1,27 +0,0 @@
-package com.giantan.data.mds.chunk;
-
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import java.util.List;
-
-//@RestController
-//@RequestMapping("/api/md-chunks")
-public class MdChunkController {
-    private final MdChunkRepository repository;
-
-    public MdChunkController(MdChunkRepository repository) {
-        this.repository = repository;
-    }
-
-    @PostMapping
-    public ResponseEntity<?> saveChunk(@RequestBody MdChunk chunk) {
-        repository.save(chunk);
-        return ResponseEntity.ok().build();
-    }
-
-    @GetMapping("/by-md/{mdId}")
-    public List<MdChunk> getByMdId(@PathVariable int mdId) {
-        return repository.findByMdId(mdId);
-    }
-}

+ 5 - 4
server/src/main/java/com/giantan/data/mds/chunk/MdChunkRepository.java

@@ -2,10 +2,12 @@ package com.giantan.data.mds.chunk;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.giantan.data.util.JdbcUtils;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Repository;
 
 import java.sql.Array;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
@@ -52,9 +54,8 @@ public class MdChunkRepository {
         if (ls == null) {
             return null;
         }
-        try {
-            // 使用 JdbcTemplate 连接的 DataSource 创建 Array
-            return jdbcTemplate.getDataSource().getConnection().createArrayOf("TEXT", ls.toArray(new String[0]));
+        try (Connection conn = jdbcTemplate.getDataSource().getConnection()) {
+            return conn.createArrayOf("TEXT", ls.toArray(new String[0]));
         } catch (SQLException e) {
             throw new RuntimeException("Error creating SQL Array", e);
         }
@@ -79,7 +80,7 @@ public class MdChunkRepository {
         c.setOffsetStart(rs.getInt("offset_start"));
         c.setOffsetEnd(rs.getInt("offset_end"));
         c.setSectionPath(rs.getString("section_path"));
-        c.setKeywords(SqlArrayUtils.fromStringArray(rs.getArray("keywords")));
+        c.setKeywords(JdbcUtils.fromStringArray(rs.getArray("keywords")));
         c.setMetadata(fromJson(rs.getString("metadata")));
         c.setExtra(fromJson(rs.getString("extra")));
         c.setCreatedAt(rs.getTimestamp("created_at").toInstant());

+ 1 - 1
server/src/main/java/com/giantan/data/mds/chunk/Readme.java

@@ -14,7 +14,7 @@ package com.giantan.data.mds.chunk;
 //    paragraph_end INT,                             -- 段落结束编号
 //    offset_start INT,                              -- 起始字符偏移
 //    offset_end INT,                                -- 结束字符偏移
-//    section_path TEXT,                             -- 如 "# 第一章 > ## 第二节"
+//    section_path TEXT,                             -- 如 "# 第一章 > ## 第二节" 现在以用于 id (ulid)
 //
 //    keywords TEXT[],                                -- 关键词数组
 //    metadata JSONB,                                -- chunk 元信息(来源、评分、解析方式等)

+ 0 - 29
server/src/main/java/com/giantan/data/mds/chunk/SqlArrayUtils.java

@@ -1,29 +0,0 @@
-package com.giantan.data.mds.chunk;
-
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class SqlArrayUtils {
-//    public static java.sql.Array toPgArray(List<?> list) {
-//        if (list == null) return null;
-//        try (Connection conn = DataSourceUtils.getConnection(new DriverManagerDataSource())) {
-//            return conn.createArrayOf("float8", list.toArray());
-//        } catch (Exception e) {
-//            throw new RuntimeException(e);
-//        }
-//    }
-
-    public static List<Double> fromPgArray(java.sql.Array array) throws SQLException {
-        if (array == null) return null;
-        Object[] objArray = (Object[]) array.getArray();
-        return Arrays.stream(objArray).map(o -> (Double) o).collect(Collectors.toList());
-    }
-
-    public static List<String> fromStringArray(java.sql.Array array) throws SQLException {
-        if (array == null) return null;
-        Object[] objArray = (Object[]) array.getArray();
-        return Arrays.stream(objArray).map(Object::toString).collect(Collectors.toList());
-    }
-}

+ 25 - 1
server/src/main/java/com/giantan/data/mds/config/TaskConfiguration.java

@@ -1,7 +1,12 @@
 package com.giantan.data.mds.config;
 
+import com.giantan.data.mds.bot.GChatClient;
+import com.giantan.data.mds.service.IHybridSearch;
 import com.giantan.data.mds.service.IMdChunksService;
 import com.giantan.data.mds.service.IMdFilesService;
+import com.giantan.data.mds.service.IVectorization;
+import com.giantan.data.mds.task.impl.EmbeddingTaskHandler;
+import com.giantan.data.mds.task.impl.IndexTaskHandler;
 import com.giantan.data.tasks.TaskEventListener;
 import com.giantan.data.tasks.ITaskHandler;
 import com.giantan.data.tasks.TaskHandlerRegistry;
@@ -32,6 +37,15 @@ class TaskConfiguration {
     @Autowired
     IMdChunksService mdChunksService;
 
+    @Autowired
+    GChatClient gChatClient;
+
+    @Autowired
+    IVectorization vectorizationService;
+
+    @Autowired
+    IHybridSearch hybridSearch;
+
     @Bean
     public Executor taskExecutor() {
         //return Executors.newFixedThreadPool(10);
@@ -72,6 +86,16 @@ class TaskConfiguration {
 
     @Bean
     public KeywordsTaskHandler keywordsTaskHandler(){
-        return new KeywordsTaskHandler();
+        return new KeywordsTaskHandler(mdChunksService,gChatClient);
+    }
+
+    @Bean
+    public EmbeddingTaskHandler embeddingTaskHandler(){
+        return new EmbeddingTaskHandler(mdChunksService,vectorizationService);
+    }
+
+    @Bean
+    public IndexTaskHandler indexTaskHandler(){
+        return new IndexTaskHandler(mdChunksService,hybridSearch);
     }
 }

+ 9 - 9
server/src/main/java/com/giantan/data/mds/controller/ChatController.java

@@ -52,14 +52,14 @@ public class ChatController {
         return Map.of("generation", ret);
     }
 
-    @GetMapping("/ask2")
-    public String ask2(@RequestParam(value = "question", defaultValue = "Tell me a joke") String question) {
-//        String ret = chatModel.call(question);
-//        return Map.of("generation", ret);
-        //System.out.println(question);
-        log.info(question);
-        String ret = deepseek2.askOpenaiForAijiu(question);
-        return ret;
-    }
+//    @GetMapping("/ask2")
+//    public String ask2(@RequestParam(value = "question", defaultValue = "Tell me a joke") String question) {
+////        String ret = chatModel.call(question);
+////        return Map.of("generation", ret);
+//        //System.out.println(question);
+//        log.info(question);
+//        String ret = deepseek2.askOpenaiForAijiu(question);
+//        return ret;
+//    }
 
 }

+ 59 - 0
server/src/main/java/com/giantan/data/mds/controller/ChunkController.java

@@ -0,0 +1,59 @@
+package com.giantan.data.mds.controller;
+
+import com.giantan.ai.common.reponse.R;
+import com.giantan.data.kvs.constant.KvConstants;
+import com.giantan.data.mds.chunk.MdChunk;
+import com.giantan.data.mds.service.impl.MdChunksService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+import java.util.Map;
+
+@RestController
+@RequestMapping(KvConstants.API_PREFIX + "/collections/{coll}/chunks")
+public class ChunkController {
+
+    @Autowired
+    MdChunksService mdChunksService;
+
+    @GetMapping("/all")
+    public R<List<MdChunk>> getAll(@PathVariable String coll) {
+        List<MdChunk> ret = null;
+        try {
+            ret = mdChunksService.findAll(coll);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+        return R.data(ret);
+    }
+
+    @GetMapping("/by-md/{mdId}")
+    public R<List<Map<String, Object>>> getKeywordsByMdId(@PathVariable String coll, @PathVariable Integer mdId) {
+        List<Map<String, Object>> ret = null;
+        try {
+            ret = mdChunksService.getKeywordsByMdId(coll, mdId);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+        return R.data(ret);
+    }
+
+    @GetMapping("/{id}")
+    public R<MdChunk> getById(@PathVariable String coll, @PathVariable Long id) {
+        MdChunk ret = null;
+        try {
+            ret = mdChunksService.findById(coll, id);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+        return R.data(ret);
+    }
+
+    @DeleteMapping("/all")
+    public R<Map<String, Object>> deleteAll( @PathVariable String coll) throws Exception {
+        mdChunksService.deleteAll(coll);
+        return R.data(Map.of("deleted",true));
+    }
+
+}

+ 1 - 2
server/src/main/java/com/giantan/data/mds/controller/MdCollectionsController.java

@@ -3,8 +3,7 @@ package com.giantan.data.mds.controller;
 
 import com.giantan.ai.common.reponse.R;
 import com.giantan.data.kvs.constant.KvConstants;
-import com.giantan.data.mds.service.MdCollectionsService;
-import com.giantan.data.mds.service.MdDocsService;
+import com.giantan.data.mds.service.impl.MdCollectionsService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;

+ 3 - 0
server/src/main/java/com/giantan/data/mds/controller/MdDocsController.java

@@ -4,6 +4,9 @@ import com.giantan.ai.common.reponse.R;
 import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.kvs.constant.KvConstants;
 import com.giantan.data.mds.service.*;
+import com.giantan.data.mds.service.impl.FileProcessingService;
+import com.giantan.data.mds.service.impl.TaskStatusManager;
+import com.giantan.data.tasks.TaskStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;

+ 17 - 18
server/src/main/java/com/giantan/data/mds/controller/TaskController.java

@@ -3,14 +3,13 @@ package com.giantan.data.mds.controller;
 import com.giantan.data.kvs.constant.KvConstants;
 import com.giantan.data.tasks.TaskContext;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.bind.annotation.*;
 
 import com.giantan.data.tasks.TaskManager;
 import com.giantan.data.tasks.TaskType;
-import com.giantan.data.tasks.TaskStatus;
+import com.giantan.data.tasks.TaskState;
 
 import java.util.*;
 
@@ -40,25 +39,25 @@ public class TaskController {
 
         TaskType type = TaskType.valueOf(t);
 
-        List<Object> objects = null;
-
-        if (payload.containsKey("objectIds")) {
-            objects = (List<Object>) payload.remove("objectIds");
-        } else if (payload.containsKey("fromId") && payload.containsKey("toId")) {
-            int from = (int) payload.remove("fromId");
-            int to = (int) payload.remove("toId");
-            objects = new ArrayList<>();
-            for (int i = from; i <= to; i++) {
-                objects.add(i);
-            }
-        } else {
-            throw new IllegalArgumentException("必须提供 objectIds 或 fromId/toId");
-        }
+//        List<Object> objects = null;
+//
+//        if (payload.containsKey("objectIds")) {
+//            objects = (List<Object>) payload.remove("objectIds");
+//        } else if (payload.containsKey("fromId") && payload.containsKey("toId")) {
+//            int from = (int) payload.remove("fromId");
+//            int to = (int) payload.remove("toId");
+//            objects = new ArrayList<>();
+//            for (int i = from; i <= to; i++) {
+//                objects.add(i);
+//            }
+//        } else {
+//            throw new IllegalArgumentException("必须提供 objectIds 或 fromId/toId");
+//        }
 
         //Map<String, Object> params = (Map<String, Object>) payload.getOrDefault("params", new HashMap<>());
         Map<String, Object> params = new HashMap<>(payload);
 
-        String ret = manager.submit(collId, type, objects, params);
+        String ret = manager.submit(collId, type, params);
         return Map.of("taskId", ret);
     }
 
@@ -75,7 +74,7 @@ public class TaskController {
     }
 
     @GetMapping("/{id}/status")
-    public Map<String, TaskStatus> status(@PathVariable String collId, @PathVariable String id) {
+    public Map<String, TaskState> status(@PathVariable String collId, @PathVariable String id) {
         TaskContext ctx = manager.getTask(collId, id);
         return ctx != null ? ctx.getObjectStatus() : Collections.emptyMap();
     }

+ 1 - 1
server/src/main/java/com/giantan/data/mds/controller/TaxonomyController.java

@@ -3,7 +3,7 @@ package com.giantan.data.mds.controller;
 import com.giantan.ai.common.reponse.R;
 import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.kvs.constant.KvConstants;
-import com.giantan.data.mds.service.DynamicTaxonomyService2;
+import com.giantan.data.mds.service.impl.DynamicTaxonomyService2;
 import com.giantan.data.taxonomy.model.TaxonomyNode;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.ResponseEntity;

+ 3 - 1
server/src/main/java/com/giantan/data/mds/repository/MdDynamicChunkRepository.java

@@ -1,11 +1,14 @@
 package com.giantan.data.mds.repository;
 
+import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.mds.chunk.DynamicChunkRepository;
 import jakarta.annotation.PostConstruct;
 
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Repository;
 
+import java.util.List;
+
 @Repository
 public class MdDynamicChunkRepository extends DynamicChunkRepository {
     private static final org.slf4j.Logger log
@@ -21,5 +24,4 @@ public class MdDynamicChunkRepository extends DynamicChunkRepository {
         setSchema("mddb", "chunks_");
     }
 
-
 }

+ 12 - 0
server/src/main/java/com/giantan/data/mds/service/IHybridSearch.java

@@ -0,0 +1,12 @@
+package com.giantan.data.mds.service;
+
+import com.giantan.data.mds.service.impl.DocReq;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface IHybridSearch {
+    List<DocReq> add(String coll, List<DocReq> docs) throws IOException, InterruptedException;
+
+    int delete(String coll, List<String> ids) throws IOException, InterruptedException;
+}

+ 19 - 2
server/src/main/java/com/giantan/data/mds/service/IMdChunksService.java

@@ -1,15 +1,32 @@
 package com.giantan.data.mds.service;
 
+import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.mds.chunk.MdChunk;
 import org.cnnlp.data.document.GDocument;
 
 import java.util.List;
+import java.util.Map;
 
 public interface IMdChunksService {
 
-    long deleteByMdId(String collId, Integer mdId);
+    long deleteByMdId(String coll, Integer mdId);
 
-    List<Integer> saveAll(String collId, List<MdChunk> chunks) throws Throwable;
+    List<Integer> saveAll(String coll, List<MdChunk> chunks) throws Throwable;
 
     Integer save(String coll, MdChunk chunk) throws Throwable;
+
+    List<Map<String, Object>> getKeywordsByMdId(String coll, Integer mdId);
+
+    int updateKeywordsOrMetadata(String coll, Long id, List<String> keywords, Map<String, Object> metadata);
+
+    List<MdChunk> findAll(String coll);
+
+    MdChunk findById(String coll, Long id);
+
+    int setEmbedding(String coll, Long id, String es);
+
+    void deleteAll(String coll);
+
+    //String getEmbedding(String coll, Long id);
+
 }

+ 15 - 0
server/src/main/java/com/giantan/data/mds/service/IVectorization.java

@@ -0,0 +1,15 @@
+package com.giantan.data.mds.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface IVectorization {
+    List<float[]> batchVectorization(List<String> chunkList) throws IOException, InterruptedException;
+
+    float[] singleVectorization(String chunk) throws IOException, InterruptedException;
+
+    List<float[]> embed(Map<String, Object> inputs) throws IOException, InterruptedException;
+}

+ 0 - 38
server/src/main/java/com/giantan/data/mds/service/TaskStatus.java

@@ -1,38 +0,0 @@
-package com.giantan.data.mds.service;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-//@AllArgsConstructor
-//@NoArgsConstructor
-public class TaskStatus {
-    private String collection;        // "处理中"、"成功"、"失败"
-    private String taskId;
-
-    private String status;        // "处理中"、"成功"、"失败"
-    private String message;
-    private long startTime;
-    private long endTime;
-
-    public TaskStatus(){
-
-    }
-//    public TaskStatus(String status, String message, long startTime, long endTime) {
-//        this.status = status;
-//        this.message = message;
-//        this.startTime = startTime;
-//        this.endTime = endTime;
-//    }
-
-    public TaskStatus(String collection, String taskId, String status, String message, long startTime, long endTime) {
-        this.collection = collection;
-        this.taskId = taskId;
-        this.status = status;
-        this.message = message;
-        this.startTime = startTime;
-        this.endTime = endTime;
-    }
-
-}

+ 58 - 0
server/src/main/java/com/giantan/data/mds/service/impl/DocReq.java

@@ -0,0 +1,58 @@
+package com.giantan.data.mds.service.impl;
+
+import lombok.Data;
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class DocReq {
+    private String id;
+    private String text;
+    private String embedding;
+    private List<String> tags;
+    private Map<String, Object> metadata;
+
+    public DocReq() {
+    }
+
+    public DocReq(String text) {
+        this.text = text;
+    }
+
+    public DocReq(String id, String text, List<String> tags, Map<String, Object> metadata) {
+        this.id = id;
+        this.text = text;
+        this.tags = tags;
+        this.metadata = metadata;
+    }
+
+    public DocReq(String id, String text, List<String> tags, Map<String, Object> metadata,String embedding) {
+        this.id = id;
+        this.text = text;
+        this.tags = tags;
+        this.metadata = metadata;
+        this.embedding = embedding;
+    }
+
+    public static DocReq fromMap(Map<String, Object> map) {
+        DocReq req = new DocReq();
+        req.id = map.get("id").toString();
+        req.text = (String) map.get("text");
+        Object o = map.get("embedding");
+        if (o != null) {
+            req.embedding = (String) o;
+        }
+
+        o = map.get("tags");
+        if (o != null) {
+            req.tags = (List<String>) o;
+        }
+
+        o = map.get("metadata");
+        if (o != null) {
+            req.metadata = (Map<String, Object>) o;
+        }
+        return req;
+    }
+
+}

+ 2 - 1
server/src/main/java/com/giantan/data/mds/service/DynamicTaxonomyService2.java → server/src/main/java/com/giantan/data/mds/service/impl/DynamicTaxonomyService2.java

@@ -1,7 +1,8 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
 import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.mds.repository.MdDynamicRepository;
+import com.giantan.data.mds.service.IMdDocsService;
 import com.giantan.data.taxonomy.model.TaxonomyNode;
 import com.giantan.data.taxonomy.repository.DynamicTaxonomyRepository;
 import com.giantan.gfs.service.impl.S3GkbService;

+ 2 - 2
server/src/main/java/com/giantan/data/mds/service/FileProcessingService.java → server/src/main/java/com/giantan/data/mds/service/impl/FileProcessingService.java

@@ -1,6 +1,6 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
-import com.giantan.data.tasks.TaskManager;
+import com.giantan.data.tasks.TaskStatus;
 import com.giantan.gfs.service.impl.S3GkbService;
 import com.giantan.gfs.storer.util.FileUtil;
 import com.giantan.gfs.storer.util.J7Zip;

+ 69 - 0
server/src/main/java/com/giantan/data/mds/service/impl/HybridSearch.java

@@ -0,0 +1,69 @@
+package com.giantan.data.mds.service.impl;
+
+
+import com.giantan.ai.common.util.JsonUtil;
+import com.giantan.data.mds.service.IHybridSearch;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class HybridSearch implements IHybridSearch {
+
+    @Value("${qas.url}")
+    String url = "http://120.78.4.46:7387/v1/collections/";
+    //String url = "http://120.78.4.46:7387/v1/embeddings/embed";
+
+    @Override
+    public List<DocReq> add(String coll, List<DocReq> docs) throws IOException, InterruptedException {
+        String body = JsonUtil.toJsonString(docs);
+
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(url+coll+"/documents/insert"))
+                .header("Content-Type", "application/json")
+                .header("User-Agent", "myClient/11.0.2")
+                .method("POST", HttpRequest.BodyPublishers.ofString(body))
+                .build();
+        HttpResponse<String> response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
+        //System.out.println(response.body());
+        Map<String, Object> ret = JsonUtil.fromJsonString(response.body());
+        Object o = ret.get("data");
+        List<DocReq> ls = new ArrayList<>();
+        if (o != null && o instanceof List) {
+            List<Map<String, Object>> docList = (List<Map<String, Object>>) o;
+            for (Map<String, Object> doc : docList) {
+                DocReq dr = DocReq.fromMap(doc);
+                ls.add(dr);
+            }
+        }
+        return ls;
+    }
+
+
+    @Override
+    public int delete(String coll, List<String> ids) throws IOException, InterruptedException {
+        String body = JsonUtil.toJsonString(ids);
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(url+coll+"/documents"))
+                .header("Content-Type", "application/json")
+                .header("User-Agent", "myClient/11.0.2")
+                .method("DELETE", HttpRequest.BodyPublishers.ofString(body))
+                .build();
+        HttpResponse<String> response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
+        //System.out.println(response.body());
+        Map<String, Object> ret = JsonUtil.fromJsonString(response.body());
+        Object o = ret.get("data");
+        if (o != null && o instanceof Integer) {
+            return (Integer) o;
+        }
+        return 0;
+    }
+}

+ 1 - 1
server/src/main/java/com/giantan/data/mds/service/MdCache.java → server/src/main/java/com/giantan/data/mds/service/impl/MdCache.java

@@ -1,4 +1,4 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
 
 import org.cnnlp.data.md.MdSearcher;

+ 37 - 1
server/src/main/java/com/giantan/data/mds/service/MdChunksService.java → server/src/main/java/com/giantan/data/mds/service/impl/MdChunksService.java

@@ -1,12 +1,14 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
 import com.giantan.data.mds.chunk.MdChunk;
 import com.giantan.data.mds.repository.MdDynamicChunkRepository;
+import com.giantan.data.mds.service.IMdChunksService;
 import jakarta.annotation.PostConstruct;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Map;
 
 @Service
 public class MdChunksService implements IMdChunksService {
@@ -46,7 +48,41 @@ public class MdChunksService implements IMdChunksService {
         return ret;
     }
 
+    @Override
+    public List<Map<String, Object>> getKeywordsByMdId(String coll, Integer mdId) {
+        int collId = mdCollectionsService.getCollectionId(coll);
+        return mdDynamicChunkRepository.getKeywordsByMdId(Integer.toString(collId), mdId);
+    }
+
+    @Override
+    public int updateKeywordsOrMetadata(String coll, Long id, List<String> keywords, Map<String, Object> metadata) {
+        int collId = mdCollectionsService.getCollectionId(coll);
+        return mdDynamicChunkRepository.updateKeywords(Integer.toString(collId), id, keywords, metadata);
+    }
+
+    @Override
+    public List<MdChunk> findAll(String coll) {
+        int collId = mdCollectionsService.getCollectionId(coll);
+        return mdDynamicChunkRepository.findAll(Integer.toString(collId));
+    }
 
+    @Override
+    public MdChunk findById(String coll, Long id) {
+        int collId = mdCollectionsService.getCollectionId(coll);
+        return mdDynamicChunkRepository.findById(Integer.toString(collId), id);
+    }
+
+    @Override
+    public int setEmbedding(String coll, Long id, String es) {
+        int collId = mdCollectionsService.getCollectionId(coll);
+        return mdDynamicChunkRepository.setEmbedding(Integer.toString(collId), id, es);
+    }
+
+    @Override
+    public void deleteAll(String coll) {
+        int collId = mdCollectionsService.getCollectionId(coll);
+        mdDynamicChunkRepository.deleteAll(Integer.toString(collId));
+    }
 
 //    private MdChunk mapRow(GDocument rs) throws SQLException {
 //        MdChunk c = new MdChunk();

+ 2 - 1
server/src/main/java/com/giantan/data/mds/service/MdCollectionsService.java → server/src/main/java/com/giantan/data/mds/service/impl/MdCollectionsService.java

@@ -1,4 +1,4 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
 import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.kvs.repository.GEntityConfig;
@@ -6,6 +6,7 @@ import com.giantan.data.kvs.repository.GRepository;
 //import com.giantan.data.kvs.service.ICollection;
 import com.giantan.data.mds.repository.*;
 //import com.giantan.data.taxonomy.repository.DynamicTaxonomyRepository;
+import com.giantan.data.mds.service.CollectionInstance;
 import com.giantan.gfs.service.impl.S3GkbService;
 import jakarta.annotation.PostConstruct;
 import org.springframework.beans.factory.annotation.Autowired;

+ 2 - 1
server/src/main/java/com/giantan/data/mds/service/MdCores.java → server/src/main/java/com/giantan/data/mds/service/impl/MdCores.java

@@ -1,5 +1,6 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
+import com.giantan.data.mds.service.CollectionInstance;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;

+ 3 - 2
server/src/main/java/com/giantan/data/mds/service/MdDocsService.java → server/src/main/java/com/giantan/data/mds/service/impl/MdDocsService.java

@@ -1,7 +1,8 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
 import com.giantan.data.kvs.kvstore.GBaseKeyValue;
 import com.giantan.data.mds.repository.MdDynamicRepository;
+import com.giantan.data.mds.service.IMdDocsService;
 import com.giantan.gfs.service.impl.S3GkbService;
 import com.giantan.gfs.storer.util.FileUtil;
 import org.cnnlp.data.md.MdSearcher;
@@ -21,7 +22,7 @@ import java.util.Map;
 import static com.giantan.gfs.service.impl.S3GkbService.getObjectPath;
 
 @Service
-public class MdDocsService implements IMdDocsService{
+public class MdDocsService implements IMdDocsService {
 
     private static final org.slf4j.Logger log
             = org.slf4j.LoggerFactory.getLogger(MdDocsService.class);

+ 3 - 1
server/src/main/java/com/giantan/data/mds/service/MdFilesService.java → server/src/main/java/com/giantan/data/mds/service/impl/MdFilesService.java

@@ -1,6 +1,8 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
 import com.giantan.data.kvs.kvstore.GBaseKeyValue;
+import com.giantan.data.mds.service.IMdDocsService;
+import com.giantan.data.mds.service.IMdFilesService;
 import com.giantan.gfs.service.impl.S3GkbService;
 import org.apache.commons.io.IOUtils;
 import org.cnnlp.data.md.MdSearcher;

+ 2 - 1
server/src/main/java/com/giantan/data/mds/service/TaskStatusManager.java → server/src/main/java/com/giantan/data/mds/service/impl/TaskStatusManager.java

@@ -1,5 +1,6 @@
-package com.giantan.data.mds.service;
+package com.giantan.data.mds.service.impl;
 
+import com.giantan.data.tasks.TaskStatus;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.springframework.stereotype.Component;

+ 145 - 0
server/src/main/java/com/giantan/data/mds/service/impl/Vectorization.java

@@ -0,0 +1,145 @@
+package com.giantan.data.mds.service.impl;
+
+import com.giantan.ai.common.util.JsonUtil;
+import com.giantan.data.mds.service.IVectorization;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class Vectorization implements IVectorization {
+
+    @Value("${m3.url}")
+    String url = "http://120.78.4.46:7387/v1/embeddings/embed";
+
+//    String url = "http://120.78.4.46:7387/v1/embeddings/vectorize";
+//    String batchUrl = "http://120.78.4.46:7387/v1/embeddings/vectorize2";
+//    String gurl = "http://120.78.4.46:7387/v1/embeddings/embed";
+
+//    @Override
+//    public List<float[]> batchVectorization(List<String> chunkList) throws IOException, InterruptedException {
+//        String formatted = JsonUtil.toJsonString(chunkList);
+//        HttpRequest request = HttpRequest.newBuilder()
+//                .uri(URI.create(batchUrl))
+//                .header("Content-Type", "application/json")
+//                .header("User-Agent", "mdclient/11.0.2")
+//                .method("POST", HttpRequest.BodyPublishers.ofString(formatted))
+//                .build();
+//        HttpResponse<String> response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
+//
+//        String data = response.body();
+//        if (data != null) {
+//            List<List<Float>> lss = JsonUtil.toListFloatList(data);
+//
+//            List<float[]> lf = new ArrayList<float[]>();
+//            for (List<Float> m : lss) {
+//                float[] arr = new float[m.size()];
+//                for (int i = 0; i < m.size(); i++) {
+//                    arr[i] = m.get(i);
+//                }
+//                lf.add(arr);
+//            }
+//
+//            return lf;
+//        }
+//
+//        return null;
+//    }
+//
+//    @Override
+//    public float[] singleVectorization(String chunk) throws IOException, InterruptedException {
+//        String s = """
+//                {
+//                   "input": "%s"
+//                }
+//                """;
+//        String formatted = s.formatted(chunk);
+//
+//        HttpRequest request = HttpRequest.newBuilder()
+//                .uri(URI.create(url))
+//                .header("Content-Type", "application/json")
+//                .header("User-Agent", "mdclient/11.0.2")
+//                .method("POST", HttpRequest.BodyPublishers.ofString(formatted))
+//                .build();
+//        HttpResponse<String> response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
+//
+//        String data = response.body();
+//
+//        if (data != null) {
+//            List<Float> m = JsonUtil.toFloatList(data);
+//            float[] arr = new float[m.size()];
+//            for (int i = 0; i < m.size(); i++) {
+//                arr[i] = m.get(i);
+//            }
+//            return arr;
+//        }
+//
+//        return null;
+//    }
+
+    @Override
+    public List<float[]> batchVectorization(List<String> chunkList) throws IOException, InterruptedException {
+        return embed(Map.of("inputs",chunkList));
+    }
+
+    @Override
+    public float[] singleVectorization(String chunk) throws IOException, InterruptedException {
+        List<float[]> rets = embed(Map.of("inputs", List.of(chunk)));
+        if (rets != null && rets.size() > 0) {
+            return rets.get(0);
+        }
+        return null;
+    }
+
+    @Override
+    public List<float[]> embed(Map<String, Object> inputs) throws IOException, InterruptedException {
+        String requestBody = """
+                {
+                   "inputs": %s
+                }
+                """;
+        Object o = inputs.get("inputs");
+        String json = JsonUtil.toJsonString(o);
+
+        String formatted = String.format(requestBody, JsonUtil.cleanControlChars(json));
+        return getEmdedding(formatted);
+    }
+
+    public List<float[]> getEmdedding(String formatted) throws IOException, InterruptedException {
+
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(url))
+                .header("Content-Type", "application/json")
+                .header("User-Agent", "mdclient/11.0.2")
+                .method("POST", HttpRequest.BodyPublishers.ofString(formatted))
+                .build();
+        HttpResponse<String> response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
+
+        String data = response.body();
+        if (data != null) {
+            List<List<Float>> lss = JsonUtil.toListFloatList(data);
+
+            List<float[]> lf = new ArrayList<float[]>();
+            for (List<Float> m : lss) {
+                float[] arr = new float[m.size()];
+                for (int i = 0; i < m.size(); i++) {
+                    arr[i] = m.get(i);
+                }
+                lf.add(arr);
+            }
+
+            return lf;
+        }
+
+        return null;
+    }
+
+}

+ 82 - 4
server/src/main/java/com/giantan/data/mds/task/impl/BaseTaskHandler.java

@@ -2,20 +2,89 @@ package com.giantan.data.mds.task.impl;
 
 import com.giantan.data.tasks.ITaskHandler;
 import com.giantan.data.tasks.TaskContext;
-import com.giantan.data.tasks.TaskStatus;
+import com.giantan.data.tasks.TaskState;
 import com.giantan.data.tasks.TaskType;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 public abstract class BaseTaskHandler implements ITaskHandler {
+    public static final String TYPE = "type";
+    public static final String ACTION = "action";
+
+    public static final String MD_IDS = "mdIds";
+    public static final String MD_START_ID = "mdStartId";
+    public static final String MD_END_ID = "mdEndId";
+
+    public static final String CHUNK_IDS = "chunkIds";
+    public static final String CHUNK_START_ID = "chunkStartId";
+    public static final String CHUNK_END_ID = "chunkEndId";
+
 
     private static final org.slf4j.Logger log
             = org.slf4j.LoggerFactory.getLogger(BaseTaskHandler.class);
 
+    protected void preProcess(final TaskContext taskContext) {
+        List<Object> objects = null;
+        Map<String, Object> payload = taskContext.getParams();
+        if (payload.containsKey(MD_IDS)) {
+            objects = (List<Object>) payload.remove(MD_IDS);
+            taskContext.setObjectIds(objects);
+        } else if (payload.containsKey(MD_START_ID) && payload.containsKey(MD_END_ID)) {
+            int from = (int) payload.remove(MD_START_ID);
+            int to = (int) payload.remove(MD_END_ID);
+            objects = new ArrayList<>();
+            //for (int i = from; i <= to; i++) {
+            for (int i = from; i < to; i++) {
+                objects.add(i);
+            }
+            taskContext.setObjectIds(objects);
+        } else {
+            throw new IllegalArgumentException("必须提供 mdIds 或 mdStartId/mdEndId");
+        }
+    }
+
+    protected Long toLong(Object o) {
+        if (o instanceof Integer) {
+            int i1 = (Integer) o;
+            long l1 = i1;
+            return l1;
+        } else if (o instanceof Long) {
+            return (Long) o;
+        }
+        return Long.parseLong(o.toString());
+    }
+
+    protected Integer toInt(Object o) {
+        if (o instanceof Integer) {
+            int i1 = (Integer) o;
+            return i1;
+        } else if (o instanceof Long) {
+            Long l1 =  (Long) o;
+            int i1 = l1.intValue();
+            return i1;
+        }
+        return Integer.parseInt(o.toString());
+    }
+
     public void handle(TaskContext context) {
         boolean isCanceled = false;
+        long startTime = System.currentTimeMillis();
+        preProcess(context);
+
+        List<Object> objectIds = context.getObjectIds();
+        if (objectIds == null || objectIds.isEmpty()) {
+            //long endTime = System.currentTimeMillis()-startTime;
+            //log.info(getType() + " task: {} finished. Used time = {}", context.getTaskId(), endTime);
+            context.setStatus(TaskState.SUCCESS);
+            log.info(getType() + " task: {} finished. No mdIds provided.", context.getTaskId());
+            return;
+        }
         log.info(getType() + " task: {} started", context.getTaskId());
-        for (Object objectId : context.getObjectIds()) {
+        for (Object objectId : objectIds) {
             if (context.isCancelled()) {
-                context.setStatus(TaskStatus.CANCELLED);
+                context.setStatus(TaskState.CANCELLED);
                 log.info(getType() + " task: {} (objectId: {}) cancelled", context.getTaskId(), objectId);
                 isCanceled = true;
                 break;
@@ -29,7 +98,16 @@ public abstract class BaseTaskHandler implements ITaskHandler {
             }
         }
         if (!isCanceled) {
-            log.info(getType() + " task: {} finished", context.getTaskId());
+
+            Map<String, TaskState> objectStatus = context.getObjectStatus();
+            long endTime = System.currentTimeMillis()-startTime;
+            if (objectStatus.containsValue(TaskState.FAILED)){
+                context.setStatus(TaskState.FAILED);
+                log.info(getType() + " task: {} finished. Used time = {}", context.getTaskId(), endTime);
+            }else{
+                context.setStatus(TaskState.SUCCESS);
+                log.info(getType() + " task: {} succeed. Used time = {}", context.getTaskId(), endTime);
+            }
         }
     }
 

+ 73 - 0
server/src/main/java/com/giantan/data/mds/task/impl/ChunksTaskHandler.java

@@ -0,0 +1,73 @@
+package com.giantan.data.mds.task.impl;
+
+import com.giantan.data.mds.service.IHybridSearch;
+import com.giantan.data.mds.service.IMdChunksService;
+import com.giantan.data.mds.service.IVectorization;
+import com.giantan.data.tasks.TaskContext;
+import com.giantan.data.tasks.TaskType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ChunksTaskHandler extends BaseTaskHandler {
+
+    IMdChunksService mdChunksService;
+    IVectorization vectorizationService;
+    IHybridSearch hybridSearch;
+
+    @Override
+    public TaskType getType() {
+        return TaskType.CHUNK;
+    }
+
+    protected void preProcess(final TaskContext context) {
+        List<Object> objects = new ArrayList<>();
+        String coll = context.getCollection();
+        Map<String, Object> payload = context.getParams();
+
+        if (payload.containsKey(CHUNK_IDS)) {
+            objects = (List<Object>) payload.get(CHUNK_IDS);
+            //context.setObjectIds(objects);
+        } else if (payload.containsKey(CHUNK_START_ID) && payload.containsKey(CHUNK_END_ID)) {
+            int from = (int) payload.get(CHUNK_START_ID);
+            int to = (int) payload.get(CHUNK_END_ID);
+            objects = new ArrayList<>();
+            //for (int i = from; i <= to; i++) {
+            for (int i = from; i < to; i++) {
+                objects.add(i);
+            }
+            //context.setObjectIds(objects);
+        }else if (payload.containsKey(MD_IDS)) {
+            List<Object> mdIds = (List<Object>) payload.get(MD_IDS);
+            for (Object mdId : mdIds) {
+                List<Map<String, Object>> rets = mdChunksService.getKeywordsByMdId(coll, toInt(mdId));
+                if (rets != null) {
+                    for (Map<String, Object> ret : rets) {
+                        Object o = ret.get("id");
+                        objects.add(o);
+                    }
+                }
+            }
+        }else if (payload.containsKey(MD_START_ID) && payload.containsKey(MD_END_ID)) {
+            int from = (int) payload.get(MD_START_ID);
+            int to = (int) payload.get(MD_END_ID);
+            for (int i = from; i < to; i++) {
+                List<Map<String, Object>> rets = mdChunksService.getKeywordsByMdId(coll, i);
+                if (rets != null) {
+                    for (Map<String, Object> ret : rets) {
+                        Object o = ret.get("id");
+                        objects.add(o);
+                    }
+                }
+            }
+        }
+        context.setObjectIds(objects);
+    }
+
+    @Override
+    public void doing(TaskContext context, Object objectId) {
+        List<String> operations = context.getOperations();
+
+    }
+}

+ 140 - 0
server/src/main/java/com/giantan/data/mds/task/impl/EmbeddingTaskHandler.java

@@ -0,0 +1,140 @@
+package com.giantan.data.mds.task.impl;
+
+import com.giantan.ai.common.util.JsonUtil;
+import com.giantan.data.mds.chunk.MdChunk;
+import com.giantan.data.mds.service.IMdChunksService;
+import com.giantan.data.mds.service.IVectorization;
+import com.giantan.data.tasks.TaskContext;
+import com.giantan.data.tasks.TaskType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EmbeddingTaskHandler extends BaseTaskHandler {
+    private static final org.slf4j.Logger log
+            = org.slf4j.LoggerFactory.getLogger(EmbeddingTaskHandler.class);
+
+
+    IMdChunksService mdChunksService;
+
+    IVectorization vectorizationService;
+
+    public EmbeddingTaskHandler(IMdChunksService mdChunksService, IVectorization vectorizationService) {
+        this.mdChunksService = mdChunksService;
+        this.vectorizationService = vectorizationService;
+    }
+
+    @Override
+    public TaskType getType() {
+        return TaskType.EMBEDDING;
+    }
+
+    protected void preProcess(final TaskContext context) {
+        List<Object> objects = new ArrayList<>();
+        String coll = context.getCollection();
+        Map<String, Object> payload = context.getParams();
+        if (payload.containsKey(MD_IDS)) {
+
+            List<Object> mdIds = (List<Object>) payload.get(MD_IDS);
+            for (Object mdId : mdIds) {
+                List<Map<String, Object>> rets = mdChunksService.getKeywordsByMdId(coll, toInt(mdId));
+                if (rets != null) {
+                    for (Map<String, Object> ret : rets) {
+                        Object o = ret.get("id");
+                        objects.add(o);
+                    }
+                }
+            }
+        }
+        context.setObjectIds(objects);
+    }
+
+    @Override
+    public void doing(TaskContext context, Object objectId) {
+        try {
+            String coll = context.getCollection();
+            Map<String, Object> params = context.getParams();
+            Long chunkId = toLong(objectId);
+            MdChunk chunk = mdChunksService.findById(coll, chunkId);
+            //System.out.println("chunk=" + chunk);
+            // Map<String, Object> chunkMetadata = chunk.getMetadata();
+            //System.out.println("chunkMetadata=" + chunkMetadata);
+            String plainText = chunk.getPlainText();
+            //System.out.println("plainText=" + plainText);
+            Object o = params.get("embeddingMetadata");
+
+            Map<String, Object> extra = null;
+            if (o != null && o instanceof Map) {
+                Map<String, String> mapping = (Map<String, String>) o;
+                //Map<String, String> kvs = getChunkMetadata(mapping, chunkMetadata);
+                extra = buildExtra(mapping, chunk);
+            }
+
+            //System.out.println("kvs=" + kvs);
+
+            Boolean skipChatGptIfExists = (Boolean) params.getOrDefault("skipChatGptIfExists", true);
+            //System.out.println("skipChatGptIfExists=" + skipChatGptIfExists);
+
+            String embedding = chunk.getEmbedding();
+            if (skipChatGptIfExists) {
+                if (embedding != null) {
+
+                } else {
+                    // 调用LLM
+                    callEmbedding(coll, chunkId, plainText, extra);
+                }
+            } else {
+                // 调用LLM
+                callEmbedding(coll, chunkId, plainText, extra);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private int callEmbedding(String coll, Long chunkId, String plainText, Map<String, Object> extra) throws IOException, InterruptedException {
+        StringBuilder sb = new StringBuilder();
+        extra.forEach((k, v) -> {
+            sb.append(k).append(": ").append(v).append("\n");
+        });
+
+        sb.append(plainText);
+        float[] fs = vectorizationService.singleVectorization(sb.toString());
+        if (fs != null) {
+            String js = JsonUtil.toJsonString(fs);
+            int r = mdChunksService.setEmbedding(coll, chunkId, js);
+            return r;
+        }
+        return 0;
+    }
+
+    private Map<String, Object> buildExtra(Map<String, String> mapping, MdChunk chunk) {
+        Map<String, Object> extra = new HashMap<String, Object>();
+        Map<String, Object> metadata = chunk.getMetadata();
+        mapping.forEach((k, v) -> {
+            if (k.equals("keywords")) {
+                List<String> keywords = chunk.getKeywords();
+                if (keywords != null && keywords.size() > 0) {
+                    extra.put(v, String.join(",", keywords));
+                }
+            } else {
+                Object o = metadata.get(k);
+                if (o != null) {
+                    if (o instanceof String) {
+                        extra.put(v, (String) o);
+                    } else if (o instanceof List) {
+                        List l = (List) o;
+                        if (l.size() > 0) {
+                            extra.put(v, String.join("\n", l));
+                        }
+                    }
+                }
+            }
+        });
+        return extra;
+    }
+
+}

+ 139 - 12
server/src/main/java/com/giantan/data/mds/task/impl/KeywordsTaskHandler.java

@@ -1,26 +1,153 @@
 package com.giantan.data.mds.task.impl;
 
-import com.giantan.data.tasks.ITaskHandler;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.giantan.data.mds.bot.GChatClient;
+import com.giantan.data.mds.chunk.MdChunk;
+import com.giantan.data.mds.service.IMdChunksService;
 import com.giantan.data.tasks.TaskContext;
 import com.giantan.data.tasks.TaskType;
 
-public class KeywordsTaskHandler implements ITaskHandler {
-    @Override
-    public void handle(TaskContext context) {
-        for (Object objectId : context.getObjectIds()) {
-            if (context.isCancelled()) break;
-            try {
-                System.out.println("Extract keywords: " + objectId);
-                Thread.sleep(300); // simulate
-                context.logSuccess(objectId.toString());
-            } catch (Exception e) {
-                context.logFailure(objectId.toString(), e.getMessage());
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KeywordsTaskHandler extends BaseTaskHandler {
+    private static final org.slf4j.Logger log
+            = org.slf4j.LoggerFactory.getLogger(KeywordsTaskHandler.class);
+
+
+    IMdChunksService mdChunksService;
+    GChatClient gChatClient;
+
+    public KeywordsTaskHandler(IMdChunksService mdChunksService, GChatClient gChatClient) {
+        this.mdChunksService = mdChunksService;
+        this.gChatClient = gChatClient;
+    }
+//    @Override
+//    public void handle(TaskContext context) {
+//        for (Object objectId : context.getObjectIds()) {
+//            if (context.isCancelled()) break;
+//            try {
+//                System.out.println("Extract keywords: " + objectId);
+//                Thread.sleep(300); // simulate
+//                context.logSuccess(objectId.toString());
+//            } catch (Exception e) {
+//                context.logFailure(objectId.toString(), e.getMessage());
+//            }
+//        }
+//    }
+
+    // mdIds  chunkIds
+    protected void preProcess(final TaskContext context) {
+        List<Object> objects = new ArrayList<>();
+        String coll = context.getCollection();
+        Map<String, Object> payload = context.getParams();
+
+
+        if (payload.containsKey(MD_IDS)) {
+            List<Object> mdIds = (List<Object>) payload.get(MD_IDS);
+            for (Object mdId : mdIds) {
+                List<Map<String, Object>> rets = mdChunksService.getKeywordsByMdId(coll, toInt(mdId));
+                if (rets != null){
+                    for (Map<String, Object> ret : rets) {
+                        Object o = ret.get("id");
+                        objects.add(o);
+                    }
+                }
             }
         }
+        context.setObjectIds(objects);
     }
 
     @Override
     public TaskType getType() {
         return TaskType.EXTRACT_KEYWORDS;
     }
+
+    @Override
+    public void doing(TaskContext context, Object objectId) {
+        try {
+            String coll = context.getCollection();
+            Map<String, Object> params = context.getParams();
+            Long chunkId = toLong(objectId);
+            MdChunk chunk = mdChunksService.findById(coll, chunkId);
+            //System.out.println("chunk=" + chunk);
+            Map<String, Object> metadata = chunk.getMetadata();
+            //System.out.println("metadata=" + metadata);
+            String plainText = chunk.getPlainText();
+            //System.out.println("plainText=" + plainText);
+
+            Map<String, String> kvs = new HashMap<>();
+
+            Object o = params.get("chunkMetadata");
+            if (o != null && o instanceof Map) {
+                Map<String, String> mapping = (Map<String, String>) o;
+                kvs = getChunkMetadata(mapping, metadata);
+            }
+
+            Boolean skipChatGptIfExists = (Boolean) params.getOrDefault("skipChatGptIfExists", true);
+            //System.out.println("skipChatGptIfExists=" + skipChatGptIfExists);
+            List<String> keywords = chunk.getKeywords();
+            if (skipChatGptIfExists) {
+                if (keywords != null && keywords.size() > 0) {
+
+                } else {
+                    // 调用LLM
+                    callLLM(coll,chunkId,plainText, kvs);
+                }
+            } else {
+                // 调用LLM
+                callLLM(coll,chunkId,plainText, kvs);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    private void callLLM(String coll, Long chunkId, String plainText, Map<String, String> kvs) throws JsonProcessingException {
+        //long t = System.currentTimeMillis();
+        Map<String, Object> kws = gChatClient.getKeywordsAndQuestions(plainText, kvs);
+        //t = System.currentTimeMillis() - t;
+        //System.out.println("kws=" + kws);
+        //System.out.println("used time = " + t);
+        if (kws != null && kws.size() > 0) {
+            List<String> keywords = null;
+            List<String> questions = null;
+            Object o = kws.get("keywords");
+            if (o != null && o instanceof List) {
+                keywords = (List<String>) o;
+            }
+            o = kws.get("questions");
+            if (o != null && o instanceof List) {
+                questions = (List<String>) o;
+            }
+
+            if (keywords != null && questions != null) {
+                int i = mdChunksService.updateKeywordsOrMetadata(coll, chunkId, keywords, Map.of("llm_questions", questions));
+                //System.out.println("i=" + i);
+            }
+        }
+    }
+
+
+    private Map<String, String> getChunkMetadata(Map<String, String> mapping, Map<String, Object> metadata) {
+        Map<String, String> m2 = new HashMap<String, String>();
+        mapping.forEach((k, v) -> {
+            Object o = metadata.get(k);
+            if (o != null) {
+                if (o instanceof String) {
+                    m2.put(v, (String) o);
+                } else if (o instanceof List) {
+                    List l = (List) o;
+                    if (l.size() > 0) {
+                        m2.put(v, String.join(",", l));
+                    }
+                }
+            }
+        });
+        return m2;
+    }
 }

+ 4 - 6
server/src/main/java/com/giantan/data/mds/task/impl/SliceTaskHandler.java

@@ -26,10 +26,13 @@ public class SliceTaskHandler extends BaseTaskHandler {
     private static final String MD_TYPE = "mdType";
     private static final String CHUNK_METADATA = "chunkMetadata";
     private static final String FILE_NAME = "_fileName";
+    //private static final String CHUNK_ULID = "_ulid";
 
     IMdFilesService mdFilesService;
     IMdChunksService mdChunksService;
 
+    //IdGenerator idGen = new UlidGenerator();
+
     public SliceTaskHandler(IMdFilesService mdFilesService, IMdChunksService mdChunksService) {
         this.mdFilesService = mdFilesService;
         this.mdChunksService = mdChunksService;
@@ -137,12 +140,6 @@ public class SliceTaskHandler extends BaseTaskHandler {
 //        return docs;
 //    }
 
-    private Integer toInt(Object o) {
-        if (o instanceof Integer) {
-            return (Integer) o;
-        }
-        return Integer.parseInt(o.toString());
-    }
 
     protected MdChunk toChunk(Integer mdId, GDocument doc, int idx, String baseName, Map<String, Object> userMetadata) {
         MdChunk chunk = new MdChunk();
@@ -188,6 +185,7 @@ public class SliceTaskHandler extends BaseTaskHandler {
         metadata1.put(FILE_NAME, baseName);
         metadata1.putAll(metadata);
 
+        //metadata1.put(CHUNK_ULID, idGen.generateId());
         return chunk;
     }
 

+ 24 - 10
server/src/main/java/com/giantan/data/tasks/TaskContext.java

@@ -17,12 +17,12 @@ public class TaskContext implements Serializable {
     private final String taskId;
     private final String collection;
     private final TaskType type;
-    private final List<Object> objectIds;
-
+    private List<Object> objectIds;
+    private List<String> operations;
     private final Map<String, Object> params;
     private final AtomicBoolean cancelled = new AtomicBoolean(false);
-    private volatile TaskStatus status;
-    private final Map<String, TaskStatus> objectStatus = new ConcurrentHashMap<>();
+    private volatile TaskState status;
+    private final Map<String, TaskState> objectStatus = new ConcurrentHashMap<>();
     private Map<String, Object> extra = new HashMap<>();
     private volatile String error;
 
@@ -39,7 +39,7 @@ public class TaskContext implements Serializable {
         this.objectIds = objectIds;
         this.params = params;
         this.createdAt = Instant.now();
-        this.status = TaskStatus.PENDING;
+        this.status = TaskState.PENDING;
     }
 
     public static TaskContext from(TaskStatusHistory history) {
@@ -67,14 +67,28 @@ public class TaskContext implements Serializable {
         return ctx;
     }
 
+    public record Pair<L, R>(L left, R right) {}
+
     public String getTaskId() {
         return taskId;
     }
 
+    public void setObjectIds(List<Object> objectIds) {
+        this.objectIds = objectIds;
+    }
+
     public List<Object> getObjectIds() {
         return objectIds;
     }
 
+    public List<String> getOperations() {
+        return operations;
+    }
+
+    public void setOperations(List<String> operations) {
+        this.operations = operations;
+    }
+
     public String getCollection() {
         return collection;
     }
@@ -91,11 +105,11 @@ public class TaskContext implements Serializable {
         return createdAt;
     }
 
-    public TaskStatus getStatus() {
+    public TaskState getStatus() {
         return status;
     }
 
-    public void setStatus(TaskStatus status) {
+    public void setStatus(TaskState status) {
         this.status = status;
         if (isTerminal()) {
             markCompleted();
@@ -151,14 +165,14 @@ public class TaskContext implements Serializable {
     }
 
     public void logSuccess(String objectId) {
-        objectStatus.put(objectId, TaskStatus.SUCCESS);
+        objectStatus.put(objectId, TaskState.SUCCESS);
     }
 
     public void logFailure(String objectId, String reason) {
-        objectStatus.put(objectId, TaskStatus.FAILED);
+        objectStatus.put(objectId, TaskState.FAILED);
     }
 
-    public Map<String, TaskStatus> getObjectStatus() {
+    public Map<String, TaskState> getObjectStatus() {
         return objectStatus;
     }
 

+ 25 - 8
server/src/main/java/com/giantan/data/tasks/TaskEventListener.java

@@ -2,11 +2,15 @@ package com.giantan.data.tasks;
 
 import com.google.common.eventbus.Subscribe;
 
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class TaskEventListener {
+    private static final org.slf4j.Logger log
+            = org.slf4j.LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private final TaskHandlerRegistry registry;
     private final TaskManager manager;
@@ -44,32 +48,45 @@ public class TaskEventListener {
 
     @Subscribe
     public void onTask(TaskEvent event) {
-        TaskContext context = manager.getTask(event.collection,event.taskId);
+        TaskContext context = manager.getTask(event.collection, event.taskId);
         if (context == null || context.isCancelled()) return;
 
         ITaskHandler handler = registry.get(event.type);
         if (handler == null) {
-            context.setStatus(TaskStatus.UNKNOWN);
+            context.setStatus(TaskState.UNKNOWN);
             context.setError("No handler found");
             return;
         }
 
         try {
-            context.setStatus(TaskStatus.RUNNING);
+            context.setStatus(TaskState.RUNNING);
             handler.handle(context);
-            context.setStatus(TaskStatus.SUCCESS);
+
+            //context.setStatus(TaskState.SUCCESS);
+            // 下面 细化一下
+            if (context.getStatus() == TaskState.RUNNING) {
+                context.setStatus(TaskState.SUCCESS);
+                Map<String, TaskState> objectStatus = context.getObjectStatus();
+                if (objectStatus != null && objectStatus.size() > 0) {
+                    if (objectStatus.containsValue(TaskState.FAILED)) {
+                        context.setStatus(TaskState.FAILED);
+                    }
+                }
+            }
         } catch (Exception e) {
             context.incrementRetry();
             context.setError(e.getMessage());
-
+            log.error("Task: {} failed. {}", context.getTaskId(), e.getMessage());
             if (context.canRetry()) {
-                System.out.println("任务失败,准备重试:" + context.getTaskId() + ",第 " + context.getRetryCount() + " 次");
+                log.error("任务失败,准备重试:" + context.getTaskId() + ",第 " + context.getRetryCount() + " 次");
 
                 // 延迟后重试(可使用 scheduler 或线程池)
                 scheduleRetry(event, context.getRetryDelayMillis());
             } else {
-                context.setStatus(TaskStatus.FAILED);
-                System.out.println("任务失败,超过最大重试次数:" + context.getTaskId());
+                context.setStatus(TaskState.FAILED);
+                if (context.canRetry() && context.getMaxRetries() > 0) {
+                    log.error("任务失败,超过最大重试次数:" + context.getTaskId());
+                }
             }
         }
     }

+ 20 - 1
server/src/main/java/com/giantan/data/tasks/TaskManager.java

@@ -35,6 +35,25 @@ public class TaskManager implements ITaskManager {
     }
 
 
+    public String submit(String coll, TaskType type, Map<String, Object> payload) {
+        List<Object> objects = null;
+
+//        if (payload.containsKey("objectIds")) {
+//            objects = (List<Object>) payload.remove("objectIds");
+//        }
+//        else if (payload.containsKey("fromId") && payload.containsKey("toId")) {
+//            int from = (int) payload.remove("fromId");
+//            int to = (int) payload.remove("toId");
+//            objects = new ArrayList<>();
+//            for (int i = from; i <= to; i++) {
+//                objects.add(i);
+//            }
+//        } else {
+//            throw new IllegalArgumentException("必须提供 objectIds 或 fromId/toId");
+//        }
+        return submit(coll,type,objects,payload);
+    }
+
     public String submit(String coll, TaskType type, List<Object> objectIds, Map<String, Object> params) {
         String taskId = UUID.randomUUID().toString();
         TaskContext context = new TaskContext(taskId, coll, type, objectIds, params);
@@ -64,7 +83,7 @@ public class TaskManager implements ITaskManager {
 
     @Override
     public Collection<TaskContext> findByStatus(String coll, String status) {
-        TaskStatus taskType = TaskStatus.valueOf(status.toUpperCase());
+        TaskState taskType = TaskState.valueOf(status.toUpperCase());
         Collection<TaskContext> values = tasks.values();
         Collection<TaskContext> values2 = new ArrayList<>();
 

+ 1 - 1
server/src/main/java/com/giantan/data/tasks/TaskObjectStatus.java

@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 public class TaskObjectStatus {
-    private TaskStatus status;
+    private TaskState status;
     private String error;
     private int attempt;
 }

+ 9 - 0
server/src/main/java/com/giantan/data/tasks/TaskState.java

@@ -0,0 +1,9 @@
+package com.giantan.data.tasks;
+
+public enum TaskState {
+    PENDING, RUNNING, SUCCESS, FAILED, CANCELLED, UNKNOWN;
+
+    public boolean isTerminal() {
+        return this == SUCCESS || this == FAILED || this == CANCELLED;
+    }
+}

+ 32 - 5
server/src/main/java/com/giantan/data/tasks/TaskStatus.java

@@ -1,9 +1,36 @@
 package com.giantan.data.tasks;
 
-public enum TaskStatus {
-    PENDING, RUNNING, SUCCESS, FAILED, CANCELLED, UNKNOWN;
+import lombok.Data;
+
+@Data
+//@AllArgsConstructor
+//@NoArgsConstructor
+public class TaskStatus {
+    private String collection;        // "处理中"、"成功"、"失败"
+    private String taskId;
+
+    private String status;        // "处理中"、"成功"、"失败"
+    private String message;
+    private long startTime;
+    private long endTime;
+
+    public TaskStatus(){
 
-    public boolean isTerminal() {
-        return this == SUCCESS || this == FAILED || this == CANCELLED;
     }
-}
+//    public TaskStatus(String status, String message, long startTime, long endTime) {
+//        this.status = status;
+//        this.message = message;
+//        this.startTime = startTime;
+//        this.endTime = endTime;
+//    }
+
+    public TaskStatus(String collection, String taskId, String status, String message, long startTime, long endTime) {
+        this.collection = collection;
+        this.taskId = taskId;
+        this.status = status;
+        this.message = message;
+        this.startTime = startTime;
+        this.endTime = endTime;
+    }
+
+}

+ 1 - 1
server/src/main/java/com/giantan/data/tasks/TaskType.java

@@ -1,5 +1,5 @@
 package com.giantan.data.tasks;
 
 public enum TaskType {
-    UPLOAD, SLICE, INDEX, EXTRACT_KEYWORDS
+    UPLOAD, SLICE, CHUNK,EXTRACT_KEYWORDS, EMBEDDING, INDEX
 }

+ 2 - 2
server/src/main/java/com/giantan/data/tasks/repository/TaskStatusHistory.java

@@ -1,7 +1,7 @@
 package com.giantan.data.tasks.repository;
 
 import com.giantan.data.tasks.TaskObjectStatus;
-import com.giantan.data.tasks.TaskStatus;
+import com.giantan.data.tasks.TaskState;
 import lombok.Data;
 
 import java.time.Instant;
@@ -38,7 +38,7 @@ public class TaskStatusHistory {
     private Map<String, Object> params;
     private Map<String, Object> extra; // ✅ 新增字段
 
-    private TaskStatus status;
+    private TaskState status;
     private String error;
     private int retryCount;
     private int maxRetries;

+ 2 - 2
server/src/main/java/com/giantan/data/tasks/repository/TaskStatusHistoryRepository.java

@@ -2,7 +2,7 @@ package com.giantan.data.tasks.repository;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.giantan.data.tasks.TaskObjectStatus;
-import com.giantan.data.tasks.TaskStatus;
+import com.giantan.data.tasks.TaskState;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.RowMapper;
 
@@ -96,7 +96,7 @@ public class TaskStatusHistoryRepository {
             task.setObjectStatuses(jsonMapper.fromJson(rs.getString("object_statuses"), new TypeReference<>() {}));
             task.setParams(jsonMapper.fromJson(rs.getString("params"), new TypeReference<>() {}));
             task.setExtra(jsonMapper.fromJson(rs.getString("extra"), new TypeReference<>() {})); // ✅ 反序列化 extra
-            task.setStatus(TaskStatus.valueOf(rs.getString("status")));
+            task.setStatus(TaskState.valueOf(rs.getString("status")));
             task.setError(rs.getString("error"));
             task.setRetryCount(rs.getInt("retry_count"));
             task.setMaxRetries(rs.getInt("max_retries"));

+ 56 - 0
server/src/main/java/com/giantan/data/util/JdbcUtils.java

@@ -0,0 +1,56 @@
+package com.giantan.data.util;
+
+import java.sql.Array;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class JdbcUtils {
+
+    /**
+     * 安全地给 PreparedStatement 设置 TEXT[] 参数
+     * 兼容老版本 JDBC 驱动(Array 不实现 AutoCloseable)
+     *
+     * @param ps    PreparedStatement
+     * @param index 参数下标
+     * @param list  字符串列表
+     * @throws SQLException
+     */
+    public static void setStringArray(PreparedStatement ps, int index, List<String> list) throws SQLException {
+        if (list == null) {
+            ps.setNull(index, java.sql.Types.ARRAY);
+            return;
+        }
+
+        Array sqlArray = null;
+        try {
+            sqlArray = ps.getConnection().createArrayOf("TEXT", list.toArray(new String[0]));
+            ps.setArray(index, sqlArray);
+        } finally {
+            // Array.free() 用于释放资源,不会关闭连接
+            if (sqlArray != null) {
+                try {
+                    sqlArray.free();
+                } catch (SQLException ignored) {
+                    // 忽略释放异常
+                }
+            }
+        }
+    }
+
+
+    public static List<Double> fromPgArray(java.sql.Array array) throws SQLException {
+        if (array == null) return null;
+        Object[] objArray = (Object[]) array.getArray();
+        return Arrays.stream(objArray).map(o -> (Double) o).collect(Collectors.toList());
+    }
+
+    public static List<String> fromStringArray(java.sql.Array array) throws SQLException {
+        if (array == null) return null;
+        Object[] objArray = (Object[]) array.getArray();
+        return Arrays.stream(objArray).map(Object::toString).collect(Collectors.toList());
+    }
+
+}

+ 11 - 9
server/src/main/resources/application.yml

@@ -14,14 +14,13 @@ spring:
     username: postgres
     password: 123456
     driver-class-name: org.postgresql.Driver
-
-  hikari:
-    maximum-pool-size: 10
-    minimum-idle: 2
-    idle-timeout: 600000
-    max-lifetime: 1800000
-    connection-timeout: 30000
-    pool-name: MyHikariPool
+    hikari:
+      maximum-pool-size: 30
+      minimum-idle: 5
+      idle-timeout: 30000
+      max-lifetime: 1740000
+      connection-timeout: 10000
+      pool-name: MyHikariPool
 
   ai:
     deepseek:
@@ -58,4 +57,7 @@ task:
     expire-minutes: 10
     keep-last: 10
 
-
+m3:
+  url: "http://120.78.4.46:7387/v1/embeddings/embed"
+qas:
+  url: "http://120.78.4.46:7387/v1/collections/"

+ 63 - 9
server/src/test/java/com/giantan/data/mds/MdsApplicationTests.java

@@ -1,29 +1,83 @@
 package com.giantan.data.mds;
 
+import com.giantan.ai.common.util.JsonUtil;
+import com.giantan.ai.util.id.IdGenerator;
+import com.giantan.ai.util.id.UlidGenerator;
 import com.giantan.data.mds.bot.GChatClient;
+import com.giantan.data.mds.service.impl.DocReq;
+import com.giantan.data.mds.service.impl.HybridSearch;
+import com.giantan.data.mds.service.impl.MdChunksService;
+import com.giantan.data.mds.service.impl.Vectorization;
+import com.github.f4b6a3.ulid.Ulid;
+import com.github.f4b6a3.ulid.UlidCreator;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 @SpringBootTest
 class MdsApplicationTests {
 
 	@Autowired
 	GChatClient deepseek2;
 
+	@Autowired
+	MdChunksService mdChunksService;
+
+	@Autowired
+	Vectorization vectorization;
+
+	@Autowired
+	HybridSearch hybridSearch;
+
 	@Test
-	void contextLoads() {
+	void contextLoads() throws IOException, InterruptedException {
 		System.out.println("Hello World");
 		//fetchInfo();
 
-		String s="发烧";
-		long t= System.currentTimeMillis();
-		System.out.println(s);
-		//String ret = deepseek2.askOpenaiForAijiu(s);
-		String ret = deepseek2.ask(s);
-		t = System.currentTimeMillis()-t;
-		System.out.println(ret);
-		System.out.println("used time = "+t);
+//		String s="发烧";
+//		long t= System.currentTimeMillis();
+//		System.out.println(s);
+//		//String ret = deepseek2.askOpenaiForAijiu(s);
+//		String ret = deepseek2.ask(s);
+//		t = System.currentTimeMillis()-t;
+//		System.out.println(ret);
+//		System.out.println("used time = "+t);
+
+//		List<Map<String,Object>> demo11 = mdChunksService.getKeywordsByMdId("demo11", 2);
+//		System.out.println(demo11);
+
+		int i = mdChunksService.updateKeywordsOrMetadata("demo11", 316l, List.of("demo1", "测试"), Map.of("m1", "11"));
+		System.out.println(i);
+
+//		List<MdChunk> chunks = mdChunksService.findAll("demo11");
+//		System.out.println(chunks);
+
+//		float[] fs = vectorization.singleVectorization("花儿为什么这样红");
+//		System.out.println(fs);
+
+//		List<float[]> floats = vectorization.embed(Map.of("inputs",List.of("先天虽定,后天可调。", "补肾填精、培元固本")));
+//		System.out.println(floats);
+
+//		UlidGenerator uid = new UlidGenerator();
+//		float[] fs = vectorization.singleVectorization("先天虽定,后天可调。");
+//		System.out.println(fs[0]);
+//		DocReq doc = new DocReq();
+//		doc.setText("先天虽定,后天可调。");
+//		doc.setId(UlidCreator.getUlid().toLowerCase());
+//        doc.setEmbedding(JsonUtil.toJsonString(fs));
+//		doc.setTags(List.of("demo"));
+//		doc.setMetadata(new HashMap<>());
+//		hybridSearch.add("demo11",List.of(doc));
+
+		//"01k2p4c41mbprwk822r2b86ntd",
+		//"01k2p4henhmydpraw76ktnjzdg"
+//		int deleted = hybridSearch.delete("demo11", List.of("01k2p4c41mbprwk822r2b86ntd"));
+//		System.out.println(deleted);
 	}
 
 }