|
|
@@ -1,20 +1,43 @@
|
|
|
package com.giantan.data.mds.task.impl;
|
|
|
|
|
|
-import com.giantan.data.mds.service.IHybridSearch;
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
+import com.giantan.ai.common.util.JsonUtil;
|
|
|
+import com.giantan.data.mds.bot.GChatClient;
|
|
|
+import com.giantan.data.mds.chunk.MdChunk;
|
|
|
+import com.giantan.data.index.IHybridSearch;
|
|
|
import com.giantan.data.mds.service.IMdChunksService;
|
|
|
import com.giantan.data.mds.service.IVectorization;
|
|
|
+import com.giantan.data.index.dto.DocReq;
|
|
|
+import com.giantan.data.index.dto.DocResp;
|
|
|
import com.giantan.data.tasks.TaskContext;
|
|
|
import com.giantan.data.tasks.TaskType;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
|
public class ChunksTaskHandler extends BaseTaskHandler {
|
|
|
|
|
|
+ private static final org.slf4j.Logger log
|
|
|
+ = org.slf4j.LoggerFactory.getLogger(ChunksTaskHandler.class);
|
|
|
+
|
|
|
IMdChunksService mdChunksService;
|
|
|
IVectorization vectorizationService;
|
|
|
IHybridSearch hybridSearch;
|
|
|
+ GChatClient gChatClient;
|
|
|
+
|
|
|
+ public ChunksTaskHandler(IMdChunksService mdChunksService,
|
|
|
+ IVectorization vectorizationService,
|
|
|
+ IHybridSearch hybridSearch,
|
|
|
+ GChatClient gChatClient
|
|
|
+ ) {
|
|
|
+ this.mdChunksService = mdChunksService;
|
|
|
+ this.vectorizationService = vectorizationService;
|
|
|
+ this.hybridSearch = hybridSearch;
|
|
|
+ this.gChatClient = gChatClient;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public TaskType getType() {
|
|
|
@@ -23,22 +46,27 @@ public class ChunksTaskHandler extends BaseTaskHandler {
|
|
|
|
|
|
protected void preProcess(final TaskContext context) {
|
|
|
List<Object> objects = new ArrayList<>();
|
|
|
+ context.setObjectIds(objects);
|
|
|
+
|
|
|
String coll = context.getCollection();
|
|
|
Map<String, Object> payload = context.getParams();
|
|
|
|
|
|
if (payload.containsKey(CHUNK_IDS)) {
|
|
|
- objects = (List<Object>) payload.get(CHUNK_IDS);
|
|
|
+ Object o = payload.get(CHUNK_IDS);
|
|
|
//context.setObjectIds(objects);
|
|
|
+ if (o != null && o instanceof List) {
|
|
|
+ objects = (List<Object>) o;
|
|
|
+ }
|
|
|
} else if (payload.containsKey(CHUNK_START_ID) && payload.containsKey(CHUNK_END_ID)) {
|
|
|
int from = (int) payload.get(CHUNK_START_ID);
|
|
|
int to = (int) payload.get(CHUNK_END_ID);
|
|
|
- objects = new ArrayList<>();
|
|
|
+ //objects = new ArrayList<>();
|
|
|
//for (int i = from; i <= to; i++) {
|
|
|
for (int i = from; i < to; i++) {
|
|
|
objects.add(i);
|
|
|
}
|
|
|
//context.setObjectIds(objects);
|
|
|
- }else if (payload.containsKey(MD_IDS)) {
|
|
|
+ } else if (payload.containsKey(MD_IDS)) {
|
|
|
List<Object> mdIds = (List<Object>) payload.get(MD_IDS);
|
|
|
for (Object mdId : mdIds) {
|
|
|
List<Map<String, Object>> rets = mdChunksService.getKeywordsByMdId(coll, toInt(mdId));
|
|
|
@@ -49,7 +77,7 @@ public class ChunksTaskHandler extends BaseTaskHandler {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- }else if (payload.containsKey(MD_START_ID) && payload.containsKey(MD_END_ID)) {
|
|
|
+ } else if (payload.containsKey(MD_START_ID) && payload.containsKey(MD_END_ID)) {
|
|
|
int from = (int) payload.get(MD_START_ID);
|
|
|
int to = (int) payload.get(MD_END_ID);
|
|
|
for (int i = from; i < to; i++) {
|
|
|
@@ -68,6 +96,254 @@ public class ChunksTaskHandler extends BaseTaskHandler {
|
|
|
@Override
|
|
|
public void doing(TaskContext context, Object objectId) {
|
|
|
List<String> operations = context.getOperations();
|
|
|
+ for (String operation : operations) {
|
|
|
+ doing(context, objectId, operation);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void doing(TaskContext context, Object objectId, String operation) {
|
|
|
+ if (operation.equalsIgnoreCase("keywords")) {
|
|
|
+ extractKeywords(context, objectId, "keywords");
|
|
|
+ } else if (operation.equalsIgnoreCase("vectorization")) {
|
|
|
+ vectorization(context, objectId, "vectorization");
|
|
|
+ } else if (operation.equalsIgnoreCase("indexCreate")) {
|
|
|
+ indexCreate(context, objectId, "indexCreate");
|
|
|
+ } else if (operation.equalsIgnoreCase("indexDelete")) {
|
|
|
+ indexDelete(context, objectId, "indexDelete");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void indexDelete(TaskContext context, Object objectId, String operation) {
|
|
|
+ try {
|
|
|
+ String coll = context.getCollection();
|
|
|
+ Long chunkId = toLong(objectId);
|
|
|
+ MdChunk chunk = mdChunksService.findById(coll, chunkId);
|
|
|
+ String uid = chunk.getSectionPath();
|
|
|
+ hybridSearch.delete(coll, List.of(uid));
|
|
|
+ context.logSuccess(objectId.toString(), operation);
|
|
|
+ } catch (Exception e) {
|
|
|
+ context.logFailure(objectId.toString(), operation, e.getMessage());
|
|
|
+ e.printStackTrace();
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void indexCreate(TaskContext context, Object objectId, String operation) {
|
|
|
+ try {
|
|
|
+ String coll = context.getCollection();
|
|
|
+ Map<String, Object> params = context.getParams();
|
|
|
+ Long chunkId = toLong(objectId);
|
|
|
+ MdChunk chunk = mdChunksService.findById(coll, chunkId);
|
|
|
+
|
|
|
+ String plainText = chunk.getPlainText();
|
|
|
+ Object o = params.get("embeddingMetadata");
|
|
|
+
|
|
|
+ Map<String, Object> extra = null;
|
|
|
+ if (o != null && o instanceof Map) {
|
|
|
+ Map<String, String> mapping = (Map<String, String>) o;
|
|
|
+ //Map<String, String> kvs = getChunkMetadata(mapping, chunkMetadata);
|
|
|
+ extra = buildExtra(mapping, chunk);
|
|
|
+ }
|
|
|
|
|
|
+ Boolean skipChatGptIfExists = (Boolean) params.getOrDefault("skipChatGptIfExists", true);
|
|
|
+
|
|
|
+ callIndex(coll, chunkId, chunk, extra);
|
|
|
+ context.logSuccess(objectId.toString(), operation);
|
|
|
+ } catch (Exception e) {
|
|
|
+ context.logFailure(objectId.toString(), operation, e.getMessage());
|
|
|
+ e.printStackTrace();
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void vectorization(TaskContext context, Object objectId, String operation) {
|
|
|
+ try {
|
|
|
+ String coll = context.getCollection();
|
|
|
+ Map<String, Object> params = context.getParams();
|
|
|
+ Long chunkId = toLong(objectId);
|
|
|
+ MdChunk chunk = mdChunksService.findById(coll, chunkId);
|
|
|
+ String plainText = chunk.getPlainText();
|
|
|
+
|
|
|
+ Object o = params.get("embeddingMetadata");
|
|
|
+
|
|
|
+ Map<String, Object> extra = null;
|
|
|
+ if (o != null && o instanceof Map) {
|
|
|
+ Map<String, String> mapping = (Map<String, String>) o;
|
|
|
+ //Map<String, String> kvs = getChunkMetadata(mapping, chunkMetadata);
|
|
|
+ extra = buildExtra(mapping, chunk);
|
|
|
+ }
|
|
|
+
|
|
|
+ //System.out.println("kvs=" + kvs);
|
|
|
+
|
|
|
+ Boolean skipChatGptIfExists = (Boolean) params.getOrDefault("skipChatGptIfExists", true);
|
|
|
+ //System.out.println("skipChatGptIfExists=" + skipChatGptIfExists);
|
|
|
+
|
|
|
+ String embedding = chunk.getEmbedding();
|
|
|
+ if (skipChatGptIfExists) {
|
|
|
+ if (embedding != null) {
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // 调用LLM
|
|
|
+ callEmbedding(coll, chunkId, plainText, extra);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 调用LLM
|
|
|
+ callEmbedding(coll, chunkId, plainText, extra);
|
|
|
+ }
|
|
|
+ context.logSuccess(objectId.toString(), operation);
|
|
|
+ } catch (Exception e) {
|
|
|
+ context.logFailure(objectId.toString(), operation, e.getMessage());
|
|
|
+ e.printStackTrace();
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ private void extractKeywords(TaskContext context, Object objectId, String operation) {
|
|
|
+ try {
|
|
|
+ String coll = context.getCollection();
|
|
|
+ Map<String, Object> params = context.getParams();
|
|
|
+ Long chunkId = toLong(objectId);
|
|
|
+ MdChunk chunk = mdChunksService.findById(coll, chunkId);
|
|
|
+ //System.out.println("chunk=" + chunk);
|
|
|
+ Map<String, Object> metadata = chunk.getMetadata();
|
|
|
+ //System.out.println("metadata=" + metadata);
|
|
|
+ String plainText = chunk.getPlainText();
|
|
|
+ //System.out.println("plainText=" + plainText);
|
|
|
+
|
|
|
+ Map<String, String> kvs = new HashMap<>();
|
|
|
+
|
|
|
+ Object o = params.get("chunkMetadata");
|
|
|
+ if (o != null && o instanceof Map) {
|
|
|
+ Map<String, String> mapping = (Map<String, String>) o;
|
|
|
+ kvs = getChunkMetadata(mapping, metadata);
|
|
|
+ }
|
|
|
+
|
|
|
+ Boolean skipChatGptIfExists = (Boolean) params.getOrDefault("skipChatGptIfExists", true);
|
|
|
+ //System.out.println("skipChatGptIfExists=" + skipChatGptIfExists);
|
|
|
+ List<String> keywords = chunk.getKeywords();
|
|
|
+ if (skipChatGptIfExists) {
|
|
|
+ if (keywords != null && keywords.size() > 0) {
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // 调用LLM
|
|
|
+ callLLM(coll, chunkId, plainText, kvs);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 调用LLM
|
|
|
+ callLLM(coll, chunkId, plainText, kvs);
|
|
|
+ }
|
|
|
+ context.logSuccess(objectId.toString(), operation);
|
|
|
+ } catch (Exception e) {
|
|
|
+ context.logFailure(objectId.toString(), operation, e.getMessage());
|
|
|
+ e.printStackTrace();
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void callLLM(String coll, Long chunkId, String plainText, Map<String, String> kvs) throws JsonProcessingException {
|
|
|
+ //long t = System.currentTimeMillis();
|
|
|
+ Map<String, Object> kws = gChatClient.getKeywordsAndQuestions(plainText, kvs);
|
|
|
+ //t = System.currentTimeMillis() - t;
|
|
|
+ //System.out.println("kws=" + kws);
|
|
|
+ //System.out.println("used time = " + t);
|
|
|
+ if (kws != null && kws.size() > 0) {
|
|
|
+ List<String> keywords = null;
|
|
|
+ List<String> questions = null;
|
|
|
+ Object o = kws.get("keywords");
|
|
|
+ if (o != null && o instanceof List) {
|
|
|
+ keywords = (List<String>) o;
|
|
|
+ }
|
|
|
+ o = kws.get("questions");
|
|
|
+ if (o != null && o instanceof List) {
|
|
|
+ questions = (List<String>) o;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (keywords != null && questions != null) {
|
|
|
+ int i = mdChunksService.updateKeywordsOrMetadata(coll, chunkId, keywords, Map.of("llm_questions", questions));
|
|
|
+ //System.out.println("i=" + i);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private Map<String, String> getChunkMetadata(Map<String, String> mapping, Map<String, Object> metadata) {
|
|
|
+ Map<String, String> m2 = new HashMap<String, String>();
|
|
|
+ mapping.forEach((k, v) -> {
|
|
|
+ Object o = metadata.get(k);
|
|
|
+ if (o != null) {
|
|
|
+ if (o instanceof String) {
|
|
|
+ m2.put(v, (String) o);
|
|
|
+ } else if (o instanceof List) {
|
|
|
+ List l = (List) o;
|
|
|
+ if (l.size() > 0) {
|
|
|
+ m2.put(v, String.join(",", l));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return m2;
|
|
|
+ }
|
|
|
+
|
|
|
+ private int callEmbedding(String coll, Long chunkId, String plainText, Map<String, Object> extra) throws IOException, InterruptedException {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ extra.forEach((k, v) -> {
|
|
|
+ sb.append(k).append(": ").append(v).append("\n");
|
|
|
+ });
|
|
|
+
|
|
|
+ sb.append(plainText);
|
|
|
+ float[] fs = vectorizationService.singleVectorization(sb.toString());
|
|
|
+ if (fs != null) {
|
|
|
+ String js = JsonUtil.toJsonString(fs);
|
|
|
+ int r = mdChunksService.setEmbedding(coll, chunkId, js);
|
|
|
+ return r;
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void callIndex(String coll, Long chunkId, MdChunk chunk, Map<String, Object> extra) throws IOException, InterruptedException {
|
|
|
+
|
|
|
+ Map<String, Object> metadata = chunk.getMetadata();
|
|
|
+ metadata.put("chunkId", chunk.getChunkIndex());
|
|
|
+ metadata.put("mdId", chunk.getMdId());
|
|
|
+ metadata.put("iid", chunk.getId());
|
|
|
+ metadata.put("offsetStart", chunk.getOffsetStart());
|
|
|
+ metadata.put("offsetEnd", chunk.getOffsetEnd());
|
|
|
+ metadata.put("content", chunk.getContent());
|
|
|
+
|
|
|
+ DocReq doc = new DocReq();
|
|
|
+ doc.setId(chunk.getSectionPath());
|
|
|
+ doc.setText(chunk.getPlainText());
|
|
|
+ doc.setEmbedding(chunk.getEmbedding());
|
|
|
+ doc.setMetadata(metadata);
|
|
|
+
|
|
|
+ List<DocResp> ret = hybridSearch.add(coll, List.of(doc));
|
|
|
+ //System.out.println(ret);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, Object> buildExtra(Map<String, String> mapping, MdChunk chunk) {
|
|
|
+ Map<String, Object> extra = new HashMap<String, Object>();
|
|
|
+ Map<String, Object> metadata = chunk.getMetadata();
|
|
|
+ mapping.forEach((k, v) -> {
|
|
|
+ if (k.equals("keywords")) {
|
|
|
+ List<String> keywords = chunk.getKeywords();
|
|
|
+ if (keywords != null && keywords.size() > 0) {
|
|
|
+ extra.put(v, String.join(",", keywords));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Object o = metadata.get(k);
|
|
|
+ if (o != null) {
|
|
|
+ if (o instanceof String) {
|
|
|
+ extra.put(v, (String) o);
|
|
|
+ } else if (o instanceof List) {
|
|
|
+ List l = (List) o;
|
|
|
+ if (l.size() > 0) {
|
|
|
+ extra.put(v, String.join("\n", l));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return extra;
|
|
|
+ }
|
|
|
+
|
|
|
}
|