You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
4.1 KiB

6 days ago
  1. // AnalysisStreamService.java
  2. package com.chenhai.chenhaiai.service;
  3. import com.chenhai.chenhaiai.entity.WeekPlanResponse;
  4. import com.chenhai.chenhaiai.utils.TextFormatUtils;
  5. import com.fasterxml.jackson.databind.ObjectMapper;
  6. import org.springframework.ai.chat.client.ChatClient;
  7. import org.springframework.stereotype.Service;
  8. import reactor.core.publisher.FluxSink;
  9. import java.util.Map;
  10. import java.util.Optional;
  11. /**
  12. * 流式分析服务
  13. */
  14. @Service
  15. public class AnalysisStreamService {
  16. private final ObjectMapper objectMapper = new ObjectMapper();
  17. /**
  18. * 发送流式分析结果
  19. */
  20. public void sendStreamAnalysis(
  21. FluxSink<String> sink,
  22. String analysisContent,
  23. ChatClient chatClient,
  24. String prompt) {
  25. try {
  26. // 使用 StringBuilder 收集完整响应
  27. StringBuilder fullResponse = new StringBuilder();
  28. chatClient.prompt()
  29. .user(prompt)
  30. .stream()
  31. .content()
  32. .subscribe(
  33. chunk -> {
  34. // 收集所有chunk
  35. fullResponse.append(chunk);
  36. },
  37. error -> {
  38. sink.next(TextFormatUtils.formatMessage("error",
  39. "分析失败: " + error.getMessage()));
  40. sink.complete();
  41. },
  42. () -> {
  43. try {
  44. // 在流式响应完成后,按模块分割发送
  45. String completeResponse = fullResponse.toString();
  46. sendFormattedModules(sink, completeResponse);
  47. } catch (Exception e) {
  48. sink.next(TextFormatUtils.formatMessage("error",
  49. "格式化输出失败: " + e.getMessage()));
  50. sink.complete();
  51. }
  52. }
  53. );
  54. } catch (Exception e) {
  55. sink.next(TextFormatUtils.formatMessage("error", "流式分析失败: " + e.getMessage()));
  56. sink.complete();
  57. }
  58. }
  59. /**
  60. * 发送格式化模块
  61. */
  62. private void sendFormattedModules(FluxSink<String> sink, String analysisContent) {
  63. // 使用工具类分割模块
  64. String[] modules = TextFormatUtils.splitAnalysisModules(analysisContent);
  65. for (String module : modules) {
  66. String trimmedModule = module.trim();
  67. if (!trimmedModule.isEmpty()) {
  68. // 发送完整的模块
  69. sink.next(TextFormatUtils.formatMessage("content", trimmedModule));
  70. // 每个模块之间稍微延迟,让前端有时间渲染
  71. try {
  72. Thread.sleep(200);
  73. } catch (InterruptedException e) {
  74. Thread.currentThread().interrupt();
  75. break;
  76. }
  77. }
  78. }
  79. // 发送完成消息
  80. sink.next(TextFormatUtils.formatMessage("complete", "分析完成"));
  81. sink.complete();
  82. }
  83. /**
  84. * 准备分析提示词
  85. */
  86. public String prepareAnalysisPrompt(String promptTemplate, WeekPlanResponse fullData) {
  87. try {
  88. String jsonData = objectMapper.writeValueAsString(fullData);
  89. return promptTemplate.replace("{jsonData}", jsonData);
  90. } catch (Exception e) {
  91. throw new RuntimeException("准备分析提示词失败", e);
  92. }
  93. }
  94. /**
  95. * 获取数据概览消息
  96. */
  97. public String getDataSummary(WeekPlanResponse fullData) {
  98. int planCount = fullData.getPlanDetails() != null ? fullData.getPlanDetails().size() : 0;
  99. int dailyCount = fullData.getDailyPapers() != null ? fullData.getDailyPapers().size() : 0;
  100. return String.format("获取到 %d 个计划任务和 %d 条日报记录", planCount, dailyCount);
  101. }
  102. }