Initial commit
This commit is contained in:
89
doc_convert/ReadMe.md
Normal file
89
doc_convert/ReadMe.md
Normal file
@@ -0,0 +1,89 @@
|
||||
## doc_convert 模块
|
||||
|
||||
独立文档转换服务,用于消费 MQ 任务,将 Word 转为分页 HTML、上传 OSS,并生成 ZIP 包供前端下载。
|
||||
|
||||
### 功能概览
|
||||
- MQ 消费任务(长耗时任务容器)
|
||||
- Word -> HTML(按页切分)
|
||||
- HTML 与资源上传 OSS
|
||||
- 打包 ZIP 上传 OSS
|
||||
- 回写结果消息(MQ callback)
|
||||
- 幂等/去重/版本过期处理(Redis)
|
||||
|
||||
### 依赖准备
|
||||
- 将 `aspose-words-23.1.jar` 放入 `api/doc_convert/lib/` 并确保合法授权
|
||||
- 配置 Redis(用于 OSS 配置和幂等/去重)
|
||||
- 配置 RabbitMQ
|
||||
|
||||
### 任务消息示例(JSON)
|
||||
```json
|
||||
{
|
||||
"taskId": "t-10001",
|
||||
"docId": "doc-888",
|
||||
"version": 3,
|
||||
"ossKey": "upload/documents/xxx.docx",
|
||||
"ossConfigKey": "default",
|
||||
"fileHash": "sha256...",
|
||||
"splitPages": 10,
|
||||
"options": {
|
||||
"splitPages": 10,
|
||||
"embedImages": false,
|
||||
"extractImages": true,
|
||||
"cssInline": true,
|
||||
"outputPrefix": "doc_convert"
|
||||
},
|
||||
"callback": {
|
||||
"exchange": "doc_convert.result.exchange",
|
||||
"routingKey": "doc_convert.result"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 结果消息示例(JSON)
|
||||
```json
|
||||
{
|
||||
"taskId": "t-10001",
|
||||
"docId": "doc-888",
|
||||
"version": 3,
|
||||
"status": "ready",
|
||||
"message": "ok",
|
||||
"pageCount": 42,
|
||||
"splitPages": 10,
|
||||
"zipUrl": "https://oss.xxx/doc_convert/doc-888/3/zip/doc-888_3.zip",
|
||||
"manifestUrl": "https://oss.xxx/doc_convert/doc-888/3/manifest.json",
|
||||
"parts": [
|
||||
{
|
||||
"startPage": 1,
|
||||
"endPage": 10,
|
||||
"htmlKey": "doc_convert/doc-888/3/parts/part_1-10.html",
|
||||
"htmlUrl": "https://oss.xxx/doc_convert/doc-888/3/parts/part_1-10.html",
|
||||
"assetsPrefix": "doc_convert/doc-888/3/assets/part_1-10"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### 打包说明
|
||||
ZIP 包结构示例:
|
||||
```
|
||||
manifest.json
|
||||
parts/
|
||||
part_1-10.html
|
||||
part_1-10.css
|
||||
assets/
|
||||
part_1-10/
|
||||
image1.png
|
||||
```
|
||||
|
||||
### 配置
|
||||
参考 `api/doc_convert/src/main/resources/application-*.yml`。
|
||||
|
||||
- `doc-convert.default-split-pages`:默认每份 HTML 页数
|
||||
- `doc-convert.output-prefix`:OSS 输出前缀
|
||||
- `doc-convert.task-ttl-hours`:幂等任务缓存 TTL
|
||||
- `doc-convert.hash-ttl-days`:文件 Hash 去重 TTL
|
||||
|
||||
### 运行
|
||||
```bash
|
||||
mvn -pl doc_convert -am clean package -P dev
|
||||
```
|
||||
20
doc_convert/lib/Aspose.license.lic
Normal file
20
doc_convert/lib/Aspose.license.lic
Normal file
@@ -0,0 +1,20 @@
|
||||
<License>
|
||||
<Data>
|
||||
<LicensedTo>iMedRIS Data Corporation</LicensedTo>
|
||||
<EmailTo>whs@imedris.com</EmailTo>
|
||||
<LicenseType>Developer OEM</LicenseType>
|
||||
<LicenseNote>Limited to 1 developer, unlimited physical locations</LicenseNote>
|
||||
<OrderID>190103154649</OrderID>
|
||||
<UserID>289601</UserID>
|
||||
<OEM>This is a redistributable license</OEM>
|
||||
<Products>
|
||||
<Product>Aspose.Total for Java</Product>
|
||||
</Products>
|
||||
<EditionType>Enterprise</EditionType>
|
||||
<SerialNumber>7f645a78-5687-4fad-ab7f-9b4141651d27</SerialNumber>
|
||||
<SubscriptionExpiry>20200403</SubscriptionExpiry>
|
||||
<LicenseVersion>3.0</LicenseVersion>
|
||||
<LicenseInstructions>https://purchase.aspose.com/policies/use-license</LicenseInstructions>
|
||||
</Data>
|
||||
<Signature>cqB2T9ic/d8/1ICIPO2PoYfvwQaljWeY3om8xXMo4zllu3h/xL1CbF8iFc6xk1RXXq2UOd3XKfJL9QnoRtjm3IHod2iINB+DC1lolh5GumiJkR/C8oighnumtCIDJHZyEQvyAK1oqELhTWVlLcpyXOkIwdhXDI6l4/uhptYGz3E=</Signature>
|
||||
</License>
|
||||
BIN
doc_convert/lib/aspose-words-20.12-jdk17-cracked.jar
Normal file
BIN
doc_convert/lib/aspose-words-20.12-jdk17-cracked.jar
Normal file
Binary file not shown.
BIN
doc_convert/lib/aspose.pdf-20.3.jar
Normal file
BIN
doc_convert/lib/aspose.pdf-20.3.jar
Normal file
Binary file not shown.
159
doc_convert/pom.xml
Normal file
159
doc_convert/pom.xml
Normal file
@@ -0,0 +1,159 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-vue-plus</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>doc_convert</artifactId>
|
||||
<name>Archetype - doc_convert</name>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>local</id>
|
||||
<properties>
|
||||
<profiles.active>local</profiles.active>
|
||||
<logging.level>info</logging.level>
|
||||
<monitor.username>ruoyi</monitor.username>
|
||||
<monitor.password>123456</monitor.password>
|
||||
</properties>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>dev</id>
|
||||
<properties>
|
||||
<profiles.active>dev</profiles.active>
|
||||
<logging.level>info</logging.level>
|
||||
<monitor.username>ruoyi</monitor.username>
|
||||
<monitor.password>123456</monitor.password>
|
||||
</properties>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>prod</id>
|
||||
<properties>
|
||||
<profiles.active>prod</profiles.active>
|
||||
<logging.level>warn</logging.level>
|
||||
<monitor.username>ruoyi</monitor.username>
|
||||
<monitor.password>123456</monitor.password>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.aspose</groupId>
|
||||
<artifactId>aspose-pdf</artifactId>
|
||||
<version>20.3</version>
|
||||
<scope>system</scope>
|
||||
<systemPath>${project.basedir}/lib/aspose.pdf-20.3.jar</systemPath>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.pdfbox</groupId>
|
||||
<artifactId>pdfbox</artifactId>
|
||||
<version>2.0.29</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-rabbitmq</artifactId>
|
||||
<version>5.4.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-oss</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>ruoyi-common-json</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.aspose</groupId>
|
||||
<artifactId>aspose-words</artifactId>
|
||||
<version>23.1</version>
|
||||
<scope>system</scope>
|
||||
<systemPath>${project.basedir}/lib/aspose-words-20.12-jdk17-cracked.jar</systemPath>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<build>
|
||||
<finalName>${project.artifactId}</finalName>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/resources</directory>
|
||||
</resource>
|
||||
<resource>
|
||||
<directory>lib</directory>
|
||||
<includes>
|
||||
<include>Aspose.license.lic</include>
|
||||
</includes>
|
||||
<targetPath>.</targetPath>
|
||||
</resource>
|
||||
</resources>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>3.4.7</version>
|
||||
<configuration>
|
||||
<includeSystemScope>true</includeSystemScope>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.4.2</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-war-plugin</artifactId>
|
||||
<version>3.4.0</version>
|
||||
<configuration>
|
||||
<failOnMissingWebXml>false</failOnMissingWebXml>
|
||||
<warName>${project.artifactId}</warName>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@@ -0,0 +1,16 @@
|
||||
package org.dromara.docConvert;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
|
||||
@SpringBootApplication
|
||||
@ConfigurationPropertiesScan
|
||||
@ComponentScan(basePackages = "org.dromara")
|
||||
public class DocConvertApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(DocConvertApplication.class, args);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package org.dromara.docConvert.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
@Data
|
||||
@ConfigurationProperties(prefix = "doc-convert")
|
||||
public class DocConvertProperties {
|
||||
|
||||
/**
|
||||
* 本地临时目录(为空则使用系统临时目录)
|
||||
*/
|
||||
private String tempDir;
|
||||
|
||||
/**
|
||||
* 默认每份HTML包含的页数
|
||||
*/
|
||||
private int defaultSplitPages = 10;
|
||||
|
||||
/**
|
||||
* 默认是否将图片内嵌为base64
|
||||
*/
|
||||
private boolean defaultEmbedImages = false;
|
||||
|
||||
/**
|
||||
* 默认是否将图片单独导出
|
||||
*/
|
||||
private boolean defaultExtractImages = true;
|
||||
|
||||
/**
|
||||
* 默认是否使用CSS内联
|
||||
*/
|
||||
private boolean defaultCssInline = true;
|
||||
|
||||
/**
|
||||
* 结果默认OSS前缀
|
||||
*/
|
||||
private String outputPrefix = "doc_convert";
|
||||
|
||||
/**
|
||||
* Aspose 字体目录,优先使用该配置
|
||||
*/
|
||||
private String fontsDir;
|
||||
|
||||
/**
|
||||
* Aspose.PDF license 文件路径
|
||||
*/
|
||||
private String pdfLicensePath;
|
||||
|
||||
/**
|
||||
* 任务幂等缓存TTL(小时)
|
||||
*/
|
||||
private int taskTtlHours = 24;
|
||||
|
||||
/**
|
||||
* Hash去重缓存TTL(天)
|
||||
*/
|
||||
private int hashTtlDays = 7;
|
||||
|
||||
/**
|
||||
* 结果缓存TTL(天)
|
||||
*/
|
||||
private int resultTtlDays = 7;
|
||||
}
|
||||
@@ -0,0 +1,248 @@
|
||||
package org.dromara.docConvert.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.rabbitmq.constant.RabbitMQConstants;
|
||||
import org.dromara.docConvert.config.DocConvertProperties;
|
||||
import org.dromara.docConvert.domain.*;
|
||||
import org.dromara.docConvert.service.DocConvertService;
|
||||
import org.dromara.docConvert.service.DocConvertStateService;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class AIConvertConsumer {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
private final DocConvertService docConvertService;
|
||||
private final DocConvertStateService stateService;
|
||||
private final DocConvertProperties properties;
|
||||
|
||||
@RabbitListener(
|
||||
queues = RabbitMQConstants.AI_DOC_CONVERT_QUEUE,
|
||||
containerFactory = "docConvertRabbitListenerContainerFactory"
|
||||
)
|
||||
public void handleMessage(String message,
|
||||
org.springframework.amqp.core.Message amqpMessage,
|
||||
Channel channel) throws IOException {
|
||||
log.info("[ai-doc-convert-consumer] message received, payloadLength={}, deliveryTag={}",
|
||||
message == null ? 0 : message.length(),
|
||||
amqpMessage.getMessageProperties().getDeliveryTag());
|
||||
DocConvertTaskMessage task = parseTask(message);
|
||||
if (task == null) {
|
||||
log.warn("Invalid ai convert message payload, skip. payload={}", message);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
log.info("[ai-doc-convert-consumer] task parsed, taskId={}, docId={}, version={}, fileHash={}",
|
||||
task.getTaskId(), task.getDocId(), task.getVersion(), task.getFileHash());
|
||||
|
||||
DocConvertResultMessage result = new DocConvertResultMessage();
|
||||
result.setTaskId(task.getTaskId());
|
||||
result.setDocId(task.getDocId());
|
||||
result.setVersion(task.getVersion());
|
||||
|
||||
try {
|
||||
if (task.getTaskId() == null || task.getTaskId().isEmpty()) {
|
||||
result.setStatus(DocConvertStatus.FAILED);
|
||||
result.setMessage("taskId is required");
|
||||
sendResult(task.getCallback(), result);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
|
||||
stateService.updateLatestVersion(task.getDocId(), task.getVersion());
|
||||
|
||||
if (stateService.isDuplicateTask(task.getTaskId())) {
|
||||
result.setStatus(DocConvertStatus.SKIPPED);
|
||||
result.setMessage("duplicate task");
|
||||
sendResult(task.getCallback(), result);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (stateService.isStale(task.getDocId(), task.getVersion())) {
|
||||
result.setStatus(DocConvertStatus.STALE);
|
||||
result.setMessage("stale version");
|
||||
sendResult(task.getCallback(), result);
|
||||
stateService.markTaskDone(task.getTaskId());
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
|
||||
result.setStatus(DocConvertStatus.PROCESSING);
|
||||
result.setMessage("processing");
|
||||
sendResult(task.getCallback(), result);
|
||||
log.info("[ai-doc-convert-consumer] processing started, taskId={}", task.getTaskId());
|
||||
|
||||
if (allowHashReuse(task) && task.getFileHash() != null && !task.getFileHash().isEmpty()) {
|
||||
DocConvertResultMessage cached = stateService.getResultByHash(task.getFileHash());
|
||||
if (cached != null) {
|
||||
cached.setTaskId(task.getTaskId());
|
||||
cached.setDocId(task.getDocId());
|
||||
cached.setVersion(task.getVersion());
|
||||
cached.setStatus(DocConvertStatus.READY);
|
||||
cached.setMessage("reuse by fileHash");
|
||||
sendResult(task.getCallback(), cached);
|
||||
afterReady(task, cached);
|
||||
stateService.markTaskDone(task.getTaskId());
|
||||
log.info("[ai-doc-convert-consumer] hash reuse hit, taskId={}, docId={}, fileHash={}",
|
||||
task.getTaskId(), task.getDocId(), task.getFileHash());
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[ai-doc-convert-consumer] convert start, taskId={}", task.getTaskId());
|
||||
DocConvertResultMessage convertResult = docConvertService.convert(task);
|
||||
convertResult.setStatus(DocConvertStatus.READY);
|
||||
sendResult(task.getCallback(), convertResult);
|
||||
afterReady(task, convertResult);
|
||||
log.info("[ai-doc-convert-consumer] convert ready, taskId={}, manifestKey={}, zipKey={}, timeCostMs={}",
|
||||
task.getTaskId(), convertResult.getManifestKey(), convertResult.getZipKey(), convertResult.getTimeCostMs());
|
||||
|
||||
if (convertResult.getFileHash() != null) {
|
||||
stateService.saveResultByHash(convertResult.getFileHash(), convertResult);
|
||||
}
|
||||
stateService.markTaskDone(task.getTaskId());
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
log.info("[ai-doc-convert-consumer] ack success, taskId={}", task.getTaskId());
|
||||
} catch (Exception e) {
|
||||
log.error("AI convert failed. taskId={}", task.getTaskId(), e);
|
||||
result.setStatus(DocConvertStatus.FAILED);
|
||||
result.setErrorCode("CONVERT_ERROR");
|
||||
result.setErrorMessage(e.getMessage());
|
||||
result.setRetryable(false);
|
||||
sendResult(task.getCallback(), result);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
log.info("[ai-doc-convert-consumer] ack after failure, taskId={}", task.getTaskId());
|
||||
}
|
||||
}
|
||||
|
||||
private void sendResult(DocConvertCallback callback, DocConvertResultMessage result) {
|
||||
if (callback == null || callback.getExchange() == null || callback.getRoutingKey() == null) {
|
||||
log.warn("[ai-doc-convert-consumer] callback missing, skip result send, taskId={}, status={}",
|
||||
result == null ? null : result.getTaskId(), result == null ? null : result.getStatus());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(
|
||||
callback.getExchange(),
|
||||
callback.getRoutingKey(),
|
||||
objectMapper.writeValueAsString(result)
|
||||
);
|
||||
log.info("[ai-doc-convert-consumer] result sent, taskId={}, status={}, exchange={}, routingKey={}",
|
||||
result.getTaskId(), result.getStatus(), callback.getExchange(), callback.getRoutingKey());
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to send ai convert result callback", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void afterReady(DocConvertTaskMessage task, DocConvertResultMessage result) {
|
||||
DocConvertReadyState currentState = new DocConvertReadyState();
|
||||
currentState.setDocId(task.getDocId());
|
||||
currentState.setVersion(task.getVersion());
|
||||
currentState.setOssConfigKey(task.getOssConfigKey());
|
||||
currentState.setOutputPrefix(resolveStoragePrefix(task, result));
|
||||
|
||||
DocConvertReadyState previousState = stateService.swapLatestReadyState(currentState);
|
||||
if (shouldCleanup(previousState, currentState)) {
|
||||
sendCleanup(previousState);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendCleanup(DocConvertReadyState state) {
|
||||
DocConvertCleanupMessage message = new DocConvertCleanupMessage();
|
||||
message.setDocId(state.getDocId());
|
||||
message.setVersion(state.getVersion());
|
||||
message.setOssConfigKey(state.getOssConfigKey());
|
||||
message.setOutputPrefix(state.getOutputPrefix());
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(
|
||||
RabbitMQConstants.DOC_CONVERT_CLEANUP_EXCHANGE,
|
||||
RabbitMQConstants.DOC_CONVERT_CLEANUP_ROUTING_KEY,
|
||||
objectMapper.writeValueAsString(message)
|
||||
);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to send ai convert cleanup message for docId={}, version={}",
|
||||
state.getDocId(), state.getVersion(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldCleanup(DocConvertReadyState previousState, DocConvertReadyState currentState) {
|
||||
if (previousState == null || currentState == null) {
|
||||
return false;
|
||||
}
|
||||
if (previousState.getOutputPrefix() == null || previousState.getOutputPrefix().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (currentState.getOutputPrefix() == null || currentState.getOutputPrefix().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (previousState.getOutputPrefix().equals(currentState.getOutputPrefix())) {
|
||||
return false;
|
||||
}
|
||||
return previousState.getVersion() == null
|
||||
|| currentState.getVersion() == null
|
||||
|| currentState.getVersion() > previousState.getVersion();
|
||||
}
|
||||
|
||||
private boolean allowHashReuse(DocConvertTaskMessage task) {
|
||||
DocConvertTaskOptions options = task.getOptions();
|
||||
return options == null || options.getAllowHashReuse() == null || options.getAllowHashReuse();
|
||||
}
|
||||
|
||||
private String resolveStoragePrefix(DocConvertTaskMessage task, DocConvertResultMessage result) {
|
||||
if (result != null && result.getManifestKey() != null && !result.getManifestKey().isEmpty()) {
|
||||
String manifestKey = result.getManifestKey();
|
||||
int idx = manifestKey.lastIndexOf("/manifest.json");
|
||||
if (idx > 0) {
|
||||
return manifestKey.substring(0, idx);
|
||||
}
|
||||
}
|
||||
String prefix = null;
|
||||
DocConvertTaskOptions options = task.getOptions();
|
||||
if (options != null && options.getOutputPrefix() != null && !options.getOutputPrefix().isEmpty()) {
|
||||
prefix = options.getOutputPrefix();
|
||||
}
|
||||
if (prefix == null || prefix.isEmpty()) {
|
||||
prefix = properties.getOutputPrefix();
|
||||
}
|
||||
String docId = task.getDocId() != null ? task.getDocId() : "doc";
|
||||
String version = task.getVersion() != null ? String.valueOf(task.getVersion()) : "v1";
|
||||
return prefix + "/" + docId + "/" + version;
|
||||
}
|
||||
|
||||
private DocConvertTaskMessage parseTask(String message) {
|
||||
try {
|
||||
JsonNode root = objectMapper.readTree(message);
|
||||
if (root.has("businessData")) {
|
||||
JsonNode node = root.get("businessData");
|
||||
if (node.isTextual()) {
|
||||
return objectMapper.readValue(node.asText(), DocConvertTaskMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(node, DocConvertTaskMessage.class);
|
||||
}
|
||||
if (root.has("content")) {
|
||||
JsonNode node = root.get("content");
|
||||
if (node.isTextual()) {
|
||||
return objectMapper.readValue(node.asText(), DocConvertTaskMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(node, DocConvertTaskMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(root, DocConvertTaskMessage.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to parse ai convert message payload", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
package org.dromara.docConvert.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.rabbitmq.constant.RabbitMQConstants;
|
||||
import org.dromara.docConvert.domain.DocConvertCleanupMessage;
|
||||
import org.dromara.docConvert.service.DocConvertService;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DocConvertCleanupConsumer {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final DocConvertService docConvertService;
|
||||
|
||||
@RabbitListener(
|
||||
queues = RabbitMQConstants.DOC_CONVERT_CLEANUP_QUEUE,
|
||||
containerFactory = "longTaskRabbitListenerContainerFactory"
|
||||
)
|
||||
public void handleMessage(String message,
|
||||
org.springframework.amqp.core.Message amqpMessage,
|
||||
Channel channel) throws java.io.IOException {
|
||||
DocConvertCleanupMessage cleanupMessage = parseMessage(message);
|
||||
if (cleanupMessage == null) {
|
||||
log.warn("Invalid doc convert cleanup payload, skip. payload={}", message);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
docConvertService.cleanupOutputPrefix(cleanupMessage.getOssConfigKey(), cleanupMessage.getOutputPrefix());
|
||||
log.info("Doc convert cleanup finished. docId={}, version={}, prefix={}",
|
||||
cleanupMessage.getDocId(), cleanupMessage.getVersion(), cleanupMessage.getOutputPrefix());
|
||||
} catch (Exception e) {
|
||||
log.warn("Doc convert cleanup failed. docId={}, version={}, prefix={}",
|
||||
cleanupMessage.getDocId(), cleanupMessage.getVersion(), cleanupMessage.getOutputPrefix(), e);
|
||||
} finally {
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
|
||||
private DocConvertCleanupMessage parseMessage(String message) {
|
||||
try {
|
||||
JsonNode root = objectMapper.readTree(message);
|
||||
if (root.has("businessData")) {
|
||||
JsonNode node = root.get("businessData");
|
||||
if (node.isTextual()) {
|
||||
return objectMapper.readValue(node.asText(), DocConvertCleanupMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(node, DocConvertCleanupMessage.class);
|
||||
}
|
||||
if (root.has("content")) {
|
||||
JsonNode node = root.get("content");
|
||||
if (node.isTextual()) {
|
||||
return objectMapper.readValue(node.asText(), DocConvertCleanupMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(node, DocConvertCleanupMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(root, DocConvertCleanupMessage.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to parse doc convert cleanup payload", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,272 @@
|
||||
package org.dromara.docConvert.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.dromara.common.rabbitmq.constant.RabbitMQConstants;
|
||||
import org.dromara.docConvert.config.DocConvertProperties;
|
||||
import org.dromara.docConvert.domain.DocConvertCallback;
|
||||
import org.dromara.docConvert.domain.DocConvertCleanupMessage;
|
||||
import org.dromara.docConvert.domain.DocConvertReadyState;
|
||||
import org.dromara.docConvert.domain.DocConvertResultMessage;
|
||||
import org.dromara.docConvert.domain.DocConvertStatus;
|
||||
import org.dromara.docConvert.domain.DocConvertTaskMessage;
|
||||
import org.dromara.docConvert.domain.DocConvertTaskOptions;
|
||||
import org.dromara.docConvert.service.DocConvertService;
|
||||
import org.dromara.docConvert.service.DocConvertStateService;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DocConvertConsumer {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
private final DocConvertService docConvertService;
|
||||
private final DocConvertStateService stateService;
|
||||
private final DocConvertProperties properties;
|
||||
|
||||
@RabbitListener(
|
||||
queues = RabbitMQConstants.DOC_CONVERT_QUEUE,
|
||||
containerFactory = "docConvertRabbitListenerContainerFactory"
|
||||
)
|
||||
public void handleMessage(String message,
|
||||
org.springframework.amqp.core.Message amqpMessage,
|
||||
Channel channel) throws IOException {
|
||||
System.out.println("[doc-convert-consumer] message received, payloadLength="
|
||||
+ (message == null ? 0 : message.length())
|
||||
+ ", deliveryTag=" + amqpMessage.getMessageProperties().getDeliveryTag());
|
||||
DocConvertTaskMessage task = parseTask(message);
|
||||
if (task == null) {
|
||||
System.out.println("[doc-convert-consumer] invalid payload, ack and skip");
|
||||
log.warn("Invalid message payload, skip. payload={}", message);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
System.out.println("[doc-convert-consumer] task parsed, taskId=" + task.getTaskId()
|
||||
+ ", docId=" + task.getDocId()
|
||||
+ ", version=" + task.getVersion()
|
||||
+ ", fileHash=" + task.getFileHash());
|
||||
|
||||
DocConvertResultMessage result = new DocConvertResultMessage();
|
||||
result.setTaskId(task.getTaskId());
|
||||
result.setDocId(task.getDocId());
|
||||
result.setVersion(task.getVersion());
|
||||
|
||||
try {
|
||||
if (task.getTaskId() == null || task.getTaskId().isEmpty()) {
|
||||
System.out.println("[doc-convert-consumer] taskId missing, mark failed and ack");
|
||||
result.setStatus(DocConvertStatus.FAILED);
|
||||
result.setMessage("taskId is required");
|
||||
sendResult(task.getCallback(), result);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
|
||||
stateService.updateLatestVersion(task.getDocId(), task.getVersion());
|
||||
System.out.println("[doc-convert-consumer] latest version updated, docId=" + task.getDocId()
|
||||
+ ", version=" + task.getVersion());
|
||||
|
||||
if (stateService.isDuplicateTask(task.getTaskId())) {
|
||||
System.out.println("[doc-convert-consumer] duplicate task, taskId=" + task.getTaskId());
|
||||
result.setStatus(DocConvertStatus.SKIPPED);
|
||||
result.setMessage("duplicate task");
|
||||
sendResult(task.getCallback(), result);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (stateService.isStale(task.getDocId(), task.getVersion())) {
|
||||
System.out.println("[doc-convert-consumer] stale task, taskId=" + task.getTaskId()
|
||||
+ ", docId=" + task.getDocId() + ", version=" + task.getVersion());
|
||||
result.setStatus(DocConvertStatus.STALE);
|
||||
result.setMessage("stale version");
|
||||
sendResult(task.getCallback(), result);
|
||||
stateService.markTaskDone(task.getTaskId());
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
|
||||
result.setStatus(DocConvertStatus.PROCESSING);
|
||||
result.setMessage("processing");
|
||||
sendResult(task.getCallback(), result);
|
||||
System.out.println("[doc-convert-consumer] processing started, taskId=" + task.getTaskId());
|
||||
|
||||
if (allowHashReuse(task) && task.getFileHash() != null && !task.getFileHash().isEmpty()) {
|
||||
DocConvertResultMessage cached = stateService.getResultByHash(task.getFileHash());
|
||||
if (cached != null) {
|
||||
System.out.println("[doc-convert-consumer] hash reuse hit, taskId=" + task.getTaskId()
|
||||
+ ", docId=" + task.getDocId() + ", fileHash=" + task.getFileHash());
|
||||
cached.setTaskId(task.getTaskId());
|
||||
cached.setDocId(task.getDocId());
|
||||
cached.setVersion(task.getVersion());
|
||||
cached.setStatus(DocConvertStatus.READY);
|
||||
cached.setMessage("reuse by fileHash");
|
||||
sendResult(task.getCallback(), cached);
|
||||
afterReady(task, cached);
|
||||
stateService.markTaskDone(task.getTaskId());
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
System.out.println("[doc-convert-consumer] convert start, taskId=" + task.getTaskId());
|
||||
DocConvertResultMessage convertResult = docConvertService.convert(task);
|
||||
convertResult.setStatus(DocConvertStatus.READY);
|
||||
sendResult(task.getCallback(), convertResult);
|
||||
afterReady(task, convertResult);
|
||||
System.out.println("[doc-convert-consumer] convert ready, taskId=" + task.getTaskId()
|
||||
+ ", manifestKey=" + convertResult.getManifestKey()
|
||||
+ ", zipKey=" + convertResult.getZipKey()
|
||||
+ ", timeCostMs=" + convertResult.getTimeCostMs());
|
||||
|
||||
if (convertResult.getFileHash() != null) {
|
||||
stateService.saveResultByHash(convertResult.getFileHash(), convertResult);
|
||||
}
|
||||
stateService.markTaskDone(task.getTaskId());
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
System.out.println("[doc-convert-consumer] ack success, taskId=" + task.getTaskId());
|
||||
} catch (Exception e) {
|
||||
System.out.println("[doc-convert-consumer] convert failed, taskId=" + task.getTaskId()
|
||||
+ ", error=" + e.getMessage());
|
||||
log.error("Doc convert failed. taskId={}", task.getTaskId(), e);
|
||||
result.setStatus(DocConvertStatus.FAILED);
|
||||
result.setErrorCode("CONVERT_ERROR");
|
||||
result.setErrorMessage(e.getMessage());
|
||||
result.setRetryable(false);
|
||||
sendResult(task.getCallback(), result);
|
||||
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
|
||||
System.out.println("[doc-convert-consumer] ack after failure, taskId=" + task.getTaskId());
|
||||
}
|
||||
}
|
||||
|
||||
private void sendResult(DocConvertCallback callback, DocConvertResultMessage result) {
|
||||
if (callback == null || callback.getExchange() == null || callback.getRoutingKey() == null) {
|
||||
System.out.println("[doc-convert-consumer] callback missing, skip result send, taskId="
|
||||
+ (result == null ? null : result.getTaskId())
|
||||
+ ", status=" + (result == null ? null : result.getStatus()));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
String payload = objectMapper.writeValueAsString(result);
|
||||
rabbitTemplate.convertAndSend(callback.getExchange(), callback.getRoutingKey(), payload);
|
||||
System.out.println("[doc-convert-consumer] result sent, taskId=" + result.getTaskId()
|
||||
+ ", status=" + result.getStatus()
|
||||
+ ", exchange=" + callback.getExchange()
|
||||
+ ", routingKey=" + callback.getRoutingKey());
|
||||
} catch (Exception e) {
|
||||
System.out.println("[doc-convert-consumer] result send failed, taskId="
|
||||
+ (result == null ? null : result.getTaskId()) + ", error=" + e.getMessage());
|
||||
log.warn("Failed to send result callback", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void afterReady(DocConvertTaskMessage task, DocConvertResultMessage result) {
|
||||
DocConvertReadyState currentState = new DocConvertReadyState();
|
||||
currentState.setDocId(task.getDocId());
|
||||
currentState.setVersion(task.getVersion());
|
||||
currentState.setOssConfigKey(task.getOssConfigKey());
|
||||
currentState.setOutputPrefix(resolveStoragePrefix(task, result));
|
||||
|
||||
DocConvertReadyState previousState = stateService.swapLatestReadyState(currentState);
|
||||
if (shouldCleanup(previousState, currentState)) {
|
||||
sendCleanup(previousState);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendCleanup(DocConvertReadyState state) {
|
||||
DocConvertCleanupMessage message = new DocConvertCleanupMessage();
|
||||
message.setDocId(state.getDocId());
|
||||
message.setVersion(state.getVersion());
|
||||
message.setOssConfigKey(state.getOssConfigKey());
|
||||
message.setOutputPrefix(state.getOutputPrefix());
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(
|
||||
RabbitMQConstants.DOC_CONVERT_CLEANUP_EXCHANGE,
|
||||
RabbitMQConstants.DOC_CONVERT_CLEANUP_ROUTING_KEY,
|
||||
objectMapper.writeValueAsString(message)
|
||||
);
|
||||
System.out.println("[doc-convert-consumer] cleanup sent, docId=" + state.getDocId()
|
||||
+ ", version=" + state.getVersion()
|
||||
+ ", outputPrefix=" + state.getOutputPrefix());
|
||||
} catch (Exception e) {
|
||||
System.out.println("[doc-convert-consumer] cleanup send failed, docId=" + state.getDocId()
|
||||
+ ", version=" + state.getVersion() + ", error=" + e.getMessage());
|
||||
log.warn("Failed to send cleanup message for docId={}, version={}", state.getDocId(), state.getVersion(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldCleanup(DocConvertReadyState previousState, DocConvertReadyState currentState) {
|
||||
if (previousState == null || currentState == null) {
|
||||
return false;
|
||||
}
|
||||
if (previousState.getOutputPrefix() == null || previousState.getOutputPrefix().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (currentState.getOutputPrefix() == null || currentState.getOutputPrefix().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (previousState.getOutputPrefix().equals(currentState.getOutputPrefix())) {
|
||||
return false;
|
||||
}
|
||||
return previousState.getVersion() == null
|
||||
|| currentState.getVersion() == null
|
||||
|| currentState.getVersion() > previousState.getVersion();
|
||||
}
|
||||
|
||||
private boolean allowHashReuse(DocConvertTaskMessage task) {
|
||||
DocConvertTaskOptions options = task.getOptions();
|
||||
return options == null || options.getAllowHashReuse() == null || options.getAllowHashReuse();
|
||||
}
|
||||
|
||||
private String resolveStoragePrefix(DocConvertTaskMessage task, DocConvertResultMessage result) {
|
||||
if (result != null && result.getManifestKey() != null && !result.getManifestKey().isEmpty()) {
|
||||
String manifestKey = result.getManifestKey();
|
||||
int idx = manifestKey.lastIndexOf("/manifest.json");
|
||||
if (idx > 0) {
|
||||
return manifestKey.substring(0, idx);
|
||||
}
|
||||
}
|
||||
String prefix = null;
|
||||
DocConvertTaskOptions options = task.getOptions();
|
||||
if (options != null && options.getOutputPrefix() != null && !options.getOutputPrefix().isEmpty()) {
|
||||
prefix = options.getOutputPrefix();
|
||||
}
|
||||
if (prefix == null || prefix.isEmpty()) {
|
||||
prefix = properties.getOutputPrefix();
|
||||
}
|
||||
String docId = task.getDocId() != null ? task.getDocId() : "doc";
|
||||
String version = task.getVersion() != null ? String.valueOf(task.getVersion()) : "v1";
|
||||
return prefix + "/" + docId + "/" + version;
|
||||
}
|
||||
|
||||
private DocConvertTaskMessage parseTask(String message) {
|
||||
try {
|
||||
JsonNode root = objectMapper.readTree(message);
|
||||
if (root.has("businessData")) {
|
||||
JsonNode node = root.get("businessData");
|
||||
if (node.isTextual()) {
|
||||
return objectMapper.readValue(node.asText(), DocConvertTaskMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(node, DocConvertTaskMessage.class);
|
||||
}
|
||||
if (root.has("content")) {
|
||||
JsonNode node = root.get("content");
|
||||
if (node.isTextual()) {
|
||||
return objectMapper.readValue(node.asText(), DocConvertTaskMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(node, DocConvertTaskMessage.class);
|
||||
}
|
||||
return objectMapper.treeToValue(root, DocConvertTaskMessage.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to parse message payload", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DocConvertCallback {
|
||||
private String exchange;
|
||||
private String routingKey;
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DocConvertCleanupMessage {
|
||||
private String docId;
|
||||
private Integer version;
|
||||
private String ossConfigKey;
|
||||
private String outputPrefix;
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DocConvertManifestPart {
|
||||
private int startPage;
|
||||
private int endPage;
|
||||
private String htmlFile;
|
||||
private String assetsDir;
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DocConvertPart {
|
||||
private int startPage;
|
||||
private int endPage;
|
||||
private String htmlKey;
|
||||
private String htmlUrl;
|
||||
private String assetsPrefix;
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DocConvertReadyState {
|
||||
private String docId;
|
||||
private Integer version;
|
||||
private String ossConfigKey;
|
||||
private String outputPrefix;
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class DocConvertResultMessage {
|
||||
private String taskId;
|
||||
private String docId;
|
||||
private Integer version;
|
||||
private String status;
|
||||
private String message;
|
||||
private Integer pageCount;
|
||||
private Integer splitPages;
|
||||
private List<DocConvertPart> parts;
|
||||
private String zipKey;
|
||||
private String zipUrl;
|
||||
private String manifestKey;
|
||||
private String manifestUrl;
|
||||
private String fileHash;
|
||||
private Long timeCostMs;
|
||||
private String errorCode;
|
||||
private String errorMessage;
|
||||
private Boolean retryable;
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
public final class DocConvertStatus {
|
||||
public static final String PROCESSING = "processing";
|
||||
public static final String READY = "ready";
|
||||
public static final String FAILED = "failed";
|
||||
public static final String STALE = "stale";
|
||||
public static final String SKIPPED = "skipped";
|
||||
|
||||
private DocConvertStatus() {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DocConvertTaskMessage {
|
||||
private String taskId;
|
||||
private String docId;
|
||||
private Integer version;
|
||||
private String ossKey;
|
||||
private String ossUrl;
|
||||
private String ossConfigKey;
|
||||
private String fileName;
|
||||
private String fileHash;
|
||||
private String tenantId;
|
||||
private Integer splitPages;
|
||||
private DocConvertTaskOptions options;
|
||||
private DocConvertCallback callback;
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package org.dromara.docConvert.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class DocConvertTaskOptions {
|
||||
private Integer splitPages;
|
||||
private Boolean embedImages;
|
||||
private Boolean extractImages;
|
||||
private Boolean cssInline;
|
||||
private Boolean gzipHtml;
|
||||
private Boolean allowHashReuse;
|
||||
private String outputPrefix;
|
||||
private String catalogueJson;
|
||||
/**
|
||||
* 历史兼容字段,统一实现下不再用于切换转换服务。
|
||||
*/
|
||||
private Boolean useAiConvert;
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,121 @@
|
||||
package org.dromara.docConvert.service;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.docConvert.domain.DocConvertReadyState;
|
||||
import org.dromara.docConvert.config.DocConvertProperties;
|
||||
import org.dromara.docConvert.domain.DocConvertResultMessage;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DocConvertStateService {
|
||||
|
||||
private static final String KEY_TASK = "doc_convert:task:";
|
||||
private static final String KEY_LATEST = "doc_convert:latest:";
|
||||
private static final String KEY_HASH = "doc_convert:hash:";
|
||||
private static final String KEY_READY = "doc_convert:ready:";
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final DocConvertProperties properties;
|
||||
|
||||
public boolean isDuplicateTask(String taskId) {
|
||||
if (taskId == null || taskId.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
String key = KEY_TASK + taskId;
|
||||
return RedisUtils.getCacheObject(key) != null;
|
||||
}
|
||||
|
||||
public void markTaskDone(String taskId) {
|
||||
if (taskId == null || taskId.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
String key = KEY_TASK + taskId;
|
||||
RedisUtils.setCacheObject(key, "done", Duration.ofHours(properties.getTaskTtlHours()));
|
||||
}
|
||||
|
||||
public boolean isStale(String docId, Integer version) {
|
||||
if (docId == null || version == null) {
|
||||
return false;
|
||||
}
|
||||
Integer latest = RedisUtils.getCacheObject(KEY_LATEST + docId);
|
||||
return latest != null && version < latest;
|
||||
}
|
||||
|
||||
public void updateLatestVersion(String docId, Integer version) {
|
||||
if (docId == null || version == null) {
|
||||
return;
|
||||
}
|
||||
String key = KEY_LATEST + docId;
|
||||
Integer latest = RedisUtils.getCacheObject(key);
|
||||
if (latest == null || version > latest) {
|
||||
RedisUtils.setCacheObject(key, version, Duration.ofDays(properties.getResultTtlDays()));
|
||||
}
|
||||
}
|
||||
|
||||
public DocConvertResultMessage getResultByHash(String fileHash) {
|
||||
if (fileHash == null || fileHash.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
String json = RedisUtils.getCacheObject(KEY_HASH + fileHash);
|
||||
if (json == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return objectMapper.readValue(json, DocConvertResultMessage.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to deserialize hash result", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void saveResultByHash(String fileHash, DocConvertResultMessage result) {
|
||||
if (fileHash == null || fileHash.isEmpty() || result == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
String json = objectMapper.writeValueAsString(result);
|
||||
RedisUtils.setCacheObject(KEY_HASH + fileHash, json, Duration.ofDays(properties.getHashTtlDays()));
|
||||
} catch (JsonProcessingException e) {
|
||||
log.warn("Failed to serialize hash result", e);
|
||||
}
|
||||
}
|
||||
|
||||
public DocConvertReadyState swapLatestReadyState(DocConvertReadyState newState) {
|
||||
if (newState == null || newState.getDocId() == null || newState.getDocId().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
String key = KEY_READY + newState.getDocId();
|
||||
DocConvertReadyState previous = getLatestReadyState(newState.getDocId());
|
||||
try {
|
||||
String json = objectMapper.writeValueAsString(newState);
|
||||
RedisUtils.setCacheObject(key, json, Duration.ofDays(properties.getResultTtlDays()));
|
||||
} catch (JsonProcessingException e) {
|
||||
log.warn("Failed to serialize latest ready state", e);
|
||||
}
|
||||
return previous;
|
||||
}
|
||||
|
||||
public DocConvertReadyState getLatestReadyState(String docId) {
|
||||
if (docId == null || docId.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
String json = RedisUtils.getCacheObject(KEY_READY + docId);
|
||||
if (json == null || json.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return objectMapper.readValue(json, DocConvertReadyState.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to deserialize latest ready state", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package org.dromara.docConvert.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.MessageDigest;
|
||||
|
||||
public final class HashUtils {
|
||||
|
||||
private HashUtils() {
|
||||
}
|
||||
|
||||
public static String sha256(Path file) throws IOException {
|
||||
MessageDigest digest;
|
||||
try {
|
||||
digest = MessageDigest.getInstance("SHA-256");
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("SHA-256 not available", e);
|
||||
}
|
||||
try (InputStream in = Files.newInputStream(file)) {
|
||||
byte[] buffer = new byte[8192];
|
||||
int read;
|
||||
while ((read = in.read(buffer)) > 0) {
|
||||
digest.update(buffer, 0, read);
|
||||
}
|
||||
}
|
||||
return toHex(digest.digest());
|
||||
}
|
||||
|
||||
private static String toHex(byte[] bytes) {
|
||||
StringBuilder sb = new StringBuilder(bytes.length * 2);
|
||||
for (byte b : bytes) {
|
||||
sb.append(String.format("%02x", b));
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package org.dromara.docConvert.util;
|
||||
|
||||
public final class PathUtils {
|
||||
|
||||
private PathUtils() {
|
||||
}
|
||||
|
||||
public static String join(String... parts) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (String part : parts) {
|
||||
if (part == null || part.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
String cleaned = part.replace("\\", "/");
|
||||
if (sb.length() > 0 && sb.charAt(sb.length() - 1) != '/') {
|
||||
sb.append('/');
|
||||
}
|
||||
if (cleaned.startsWith("/")) {
|
||||
cleaned = cleaned.substring(1);
|
||||
}
|
||||
if (cleaned.endsWith("/")) {
|
||||
cleaned = cleaned.substring(0, cleaned.length() - 1);
|
||||
}
|
||||
if (!cleaned.isEmpty()) {
|
||||
sb.append(cleaned);
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package org.dromara.docConvert.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
public final class ZipUtils {
|
||||
|
||||
private ZipUtils() {
|
||||
}
|
||||
|
||||
public static void zipDirectory(Path sourceDir, Path zipFile) throws IOException {
|
||||
try (OutputStream fos = Files.newOutputStream(zipFile);
|
||||
ZipOutputStream zos = new ZipOutputStream(fos)) {
|
||||
Files.walkFileTree(sourceDir, new SimpleFileVisitor<Path>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||
Path relPath = sourceDir.relativize(file);
|
||||
ZipEntry entry = new ZipEntry(relPath.toString().replace("\\", "/"));
|
||||
zos.putNextEntry(entry);
|
||||
Files.copy(file, zos);
|
||||
zos.closeEntry();
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
68
doc_convert/src/main/resources/application-dev.yml
Normal file
68
doc_convert/src/main/resources/application-dev.yml
Normal file
@@ -0,0 +1,68 @@
|
||||
---
|
||||
spring:
|
||||
rabbitmq:
|
||||
host: 123.60.156.158
|
||||
port: 5672
|
||||
username: guest
|
||||
password: guest
|
||||
virtual-host: ai_review_test_cjh
|
||||
# 开启发送确认
|
||||
publisher-confirm-type: correlated
|
||||
# 开启发送失败退回
|
||||
publisher-returns: true
|
||||
# 心跳检测间隔(秒)。Spring AMQP 在独立后台线程发送心跳,不受消费者业务逻辑阻塞影响。
|
||||
# 必须设置正值:0 会与 RabbitMQ 服务端协商为无心跳,导致连接被防火墙/负载均衡器在长时间
|
||||
# 空闲后强制断开,即 pika.exceptions.ConnectionClosed / channel already closed 类错误。
|
||||
requested-heartbeat: 60
|
||||
# 连接超时(毫秒)
|
||||
connection-timeout: 60000
|
||||
# 开启消费者手动确认
|
||||
listener:
|
||||
simple:
|
||||
acknowledge-mode: manual
|
||||
# 长任务消费者每次只取1个消息,避免大文件处理时占用太多消息
|
||||
prefetch: 1
|
||||
# 消息被拒绝时不重新入队(避免死循环),改走死信队列
|
||||
default-requeue-rejected: false
|
||||
# 空闲消费者心跳事件间隔(毫秒),用于监控,不影响连接保活
|
||||
idle-event-interval: 3600000 # 1小时
|
||||
# 注:forceCloseChannel=false 通过 Java 代码在 longTaskRabbitListenerContainerFactory 中配置
|
||||
# Spring AMQP 无对应的 yml 属性,在此仅做说明
|
||||
|
||||
---
|
||||
spring.data:
|
||||
redis:
|
||||
# 地址
|
||||
host: 8.163.1.126
|
||||
# 端口,默认为6379
|
||||
port: 6379
|
||||
# 数据库索引
|
||||
database: 3
|
||||
# redis 密码必须配置
|
||||
password: lyzbim2016
|
||||
# 连接超时时间
|
||||
timeout: 30s
|
||||
# 是否开启ssl
|
||||
ssl.enabled: false
|
||||
|
||||
redisson:
|
||||
# redis key前缀
|
||||
keyPrefix:
|
||||
# 线程池数量
|
||||
threads: 4
|
||||
# Netty线程池数量
|
||||
nettyThreads: 8
|
||||
# 单节点配置
|
||||
singleServerConfig:
|
||||
# 客户端名称 不能用中文
|
||||
clientName: RuoYi-Vue-Plus
|
||||
# 最小空闲连接数
|
||||
connectionMinimumIdleSize: 8
|
||||
# 连接池大小
|
||||
connectionPoolSize: 32
|
||||
# 连接空闲超时,单位:毫秒
|
||||
idleConnectionTimeout: 10000
|
||||
# 命令等待超时,单位:毫秒
|
||||
timeout: 30000
|
||||
# 发布和订阅连接池大小
|
||||
subscriptionConnectionPoolSize: 50
|
||||
64
doc_convert/src/main/resources/application-local.yml
Normal file
64
doc_convert/src/main/resources/application-local.yml
Normal file
@@ -0,0 +1,64 @@
|
||||
---
|
||||
spring:
|
||||
rabbitmq:
|
||||
host: 123.60.156.158
|
||||
port: 5672
|
||||
username: guest
|
||||
password: guest
|
||||
virtual-host: ai_review_test
|
||||
# 开启发送确认
|
||||
publisher-confirm-type: correlated
|
||||
# 开启发送失败退回
|
||||
publisher-returns: true
|
||||
# 开启消费者手动确认
|
||||
listener:
|
||||
simple:
|
||||
acknowledge-mode: manual
|
||||
# 每次从队列中取1个消息处理(避免一次取太多导致内存占用过高)
|
||||
prefetch: 1
|
||||
# 消费者数量配置(会被@RabbitListener的concurrency覆盖)
|
||||
# concurrency: 3
|
||||
# max-concurrency: 10
|
||||
# 消费者ACK超时时间(30分钟),避免长时间AI调用导致超时
|
||||
# 如果处理时间超过这个值,RabbitMQ会认为消费者已死,重新投递消息
|
||||
default-requeue-rejected: false
|
||||
# 设置空闲时消费者的超时时间(单位:毫秒)
|
||||
idle-event-interval: 1200000 # 30分钟
|
||||
|
||||
---
|
||||
spring.data:
|
||||
redis:
|
||||
# 地址
|
||||
host: 8.163.1.126
|
||||
# 端口,默认为6379
|
||||
port: 6379
|
||||
# 数据库索引
|
||||
database: 4
|
||||
# redis 密码必须配置
|
||||
password: lyzbim2016
|
||||
# 连接超时时间
|
||||
timeout: 30s
|
||||
# 是否开启ssl
|
||||
ssl.enabled: false
|
||||
|
||||
redisson:
|
||||
# redis key前缀
|
||||
keyPrefix:
|
||||
# 线程池数量
|
||||
threads: 4
|
||||
# Netty线程池数量
|
||||
nettyThreads: 8
|
||||
# 单节点配置
|
||||
singleServerConfig:
|
||||
# 客户端名称 不能用中文
|
||||
clientName: RuoYi-Vue-Plus
|
||||
# 最小空闲连接数
|
||||
connectionMinimumIdleSize: 8
|
||||
# 连接池大小
|
||||
connectionPoolSize: 32
|
||||
# 连接空闲超时,单位:毫秒
|
||||
idleConnectionTimeout: 10000
|
||||
# 命令等待超时,单位:毫秒
|
||||
timeout: 30000
|
||||
# 发布和订阅连接池大小
|
||||
subscriptionConnectionPoolSize: 50
|
||||
64
doc_convert/src/main/resources/application-prod.yml
Normal file
64
doc_convert/src/main/resources/application-prod.yml
Normal file
@@ -0,0 +1,64 @@
|
||||
---
|
||||
spring:
|
||||
rabbitmq:
|
||||
host: 123.60.156.158
|
||||
port: 5672
|
||||
username: guest
|
||||
password: guest
|
||||
virtual-host: ai_review_prod
|
||||
publisher-confirm-type: correlated
|
||||
publisher-returns: true
|
||||
# 心跳检测间隔(秒)。Spring AMQP 在独立后台线程发送心跳,不受消费者业务逻辑阻塞影响。
|
||||
# 必须设置正值:0 会与 RabbitMQ 服务端协商为无心跳,导致连接被防火墙/负载均衡器在长时间
|
||||
# 空闲后强制断开,即 pika.exceptions.ConnectionClosed / channel already closed 类错误。
|
||||
requested-heartbeat: 60
|
||||
# 连接超时(毫秒)
|
||||
connection-timeout: 60000
|
||||
listener:
|
||||
simple:
|
||||
acknowledge-mode: manual
|
||||
retry:
|
||||
enabled: true
|
||||
max-attempts: 3
|
||||
initial-interval: 1000
|
||||
prefetch: 5
|
||||
default-requeue-rejected: false
|
||||
idle-event-interval: 3600000 # 1小时
|
||||
|
||||
---
|
||||
spring.data:
|
||||
redis:
|
||||
# 地址
|
||||
host: 192.168.1.112
|
||||
# 端口,默认为6379
|
||||
port: 6379
|
||||
# 数据库索引
|
||||
database: 2
|
||||
# redis 密码必须配置
|
||||
password: lyz_2026..
|
||||
# 连接超时时间
|
||||
timeout: 100s
|
||||
# 是否开启ssl
|
||||
ssl.enabled: false
|
||||
|
||||
redisson:
|
||||
# redis key前缀
|
||||
keyPrefix:
|
||||
# 线程池数量
|
||||
threads: 16
|
||||
# Netty线程池数量
|
||||
nettyThreads: 32
|
||||
# 单节点配置
|
||||
singleServerConfig:
|
||||
# 客户端名称 不能用中文
|
||||
clientName: RuoYi-Vue-Plus
|
||||
# 最小空闲连接数
|
||||
connectionMinimumIdleSize: 32
|
||||
# 连接池大小
|
||||
connectionPoolSize: 64
|
||||
# 连接空闲超时,单位:毫秒
|
||||
idleConnectionTimeout: 10000
|
||||
# 命令等待超时,单位:毫秒
|
||||
timeout: 60000
|
||||
# 发布和订阅连接池大小
|
||||
subscriptionConnectionPoolSize: 50
|
||||
33
doc_convert/src/main/resources/application.yml
Normal file
33
doc_convert/src/main/resources/application.yml
Normal file
@@ -0,0 +1,33 @@
|
||||
server:
|
||||
port: 19081
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: doc-convert
|
||||
main:
|
||||
web-application-type: none
|
||||
profiles:
|
||||
active: ${SPRING_PROFILES_ACTIVE:dev}
|
||||
|
||||
doc-convert:
|
||||
default-split-pages: 10
|
||||
default-embed-images: false
|
||||
default-extract-images: true
|
||||
default-css-inline: true
|
||||
output-prefix: doc_convert
|
||||
fonts-dir: /Users/lyz/Desktop/ai_app/fonts
|
||||
pdf-license-path: ${DOC_CONVERT_PDF_LICENSE_PATH:${ASPOSE_PDF_LICENSE_PATH:}}
|
||||
task-ttl-hours: 24
|
||||
hash-ttl-days: 7
|
||||
result-ttl-days: 7
|
||||
|
||||
file:
|
||||
local:
|
||||
path: ${java.io.tmpdir}/doc_convert
|
||||
|
||||
rabbitmq:
|
||||
ai-review:
|
||||
enabled: false
|
||||
doc-convert-queue:
|
||||
message-ttl: 86400000
|
||||
consumer-timeout: 14400000
|
||||
Reference in New Issue
Block a user