Skip to content

Commit

Permalink
eventstore time change
Browse files Browse the repository at this point in the history
  • Loading branch information
anruence committed May 31, 2023
1 parent 14aaa2c commit ae905a6
Show file tree
Hide file tree
Showing 52 changed files with 245 additions and 399 deletions.
67 changes: 37 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@

### `enode`启动配置

```java
```
@SpringBootApplication
@EnableEnode(value = "org.enodeframework.tests")
@ComponentScan(value = "org.enodeframework.tests")
Expand Down Expand Up @@ -112,7 +112,7 @@ spring.enode.mq.topic.event=EnodeBankEventTopic
#### producer

```java
```
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
Expand All @@ -134,7 +134,7 @@ public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, Strin

#### consumer

```java
```
@Value("${spring.enode.mq.topic.command}")
private String commandTopic;
Expand Down Expand Up @@ -239,17 +239,17 @@ public class DbConfig {

注意有两个唯一索引,这个是实现幂等的常用思路,因为我们认为大部分情况下不会出现重复写问题

#### `MySQL` & `TiDB`
#### `MySQL`

```sql
CREATE TABLE event_stream (
id BIGINT AUTO_INCREMENT NOT NULL,
aggregate_root_type_name VARCHAR(256) NOT NULL,
aggregate_root_id VARCHAR(36) NOT NULL,
aggregate_root_id VARCHAR(64) NOT NULL,
version INT NOT NULL,
command_id VARCHAR(36) NOT NULL,
gmt_create DATETIME NOT NULL,
command_id VARCHAR(64) NOT NULL,
events MEDIUMTEXT NOT NULL,
create_at BIGINT NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uk_aggregate_root_id_version (aggregate_root_id, version),
UNIQUE KEY uk_aggregate_root_id_command_id (aggregate_root_id, command_id)
Expand All @@ -259,12 +259,14 @@ CREATE TABLE published_version (
id BIGINT AUTO_INCREMENT NOT NULL,
processor_name VARCHAR(128) NOT NULL,
aggregate_root_type_name VARCHAR(256) NOT NULL,
aggregate_root_id VARCHAR(36) NOT NULL,
aggregate_root_id VARCHAR(64) NOT NULL,
version INT NOT NULL,
gmt_create DATETIME NOT NULL,
create_at BIGINT NOT NULL,
update_at BIGINT NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uk_processor_name_aggregate_root_id (processor_name, aggregate_root_id)
UNIQUE KEY uk_aggregate_root_id_version_processor_name (aggregate_root_id, version, processor_name)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

```

#### `postgresql`
Expand All @@ -273,11 +275,11 @@ CREATE TABLE published_version (
CREATE TABLE event_stream (
id bigserial,
aggregate_root_type_name varchar(256),
aggregate_root_id varchar(36),
aggregate_root_id varchar(64),
version integer,
command_id varchar(36),
gmt_create date,
command_id varchar(64),
events text,
create_at bigint,
PRIMARY KEY (id),
CONSTRAINT uk_aggregate_root_id_version UNIQUE (aggregate_root_id, version),
CONSTRAINT uk_aggregate_root_id_command_id UNIQUE (aggregate_root_id, command_id)
Expand All @@ -287,20 +289,22 @@ CREATE TABLE published_version (
id bigserial,
processor_name varchar(128),
aggregate_root_type_name varchar(256),
aggregate_root_id varchar(36),
aggregate_root_id varchar(64),
version integer,
gmt_create date,
create_at bigint,
update_at bigint,
PRIMARY KEY (id),
CONSTRAINT uk_processor_name_aggregate_root_id UNIQUE (processor_name, aggregate_root_id)
CONSTRAINT uk_aggregate_root_id_version_processor_name UNIQUE (aggregate_root_id, version, processor_name)
);

```

#### `MongoDB`

```js
```bash
db.event_stream.createIndex({aggregateRootId:1,commandId:1},{unique:true})
db.event_stream.createIndex({aggregateRootId:1,version:1},{unique:true})
db.published_version.createIndex({processorName:1,aggregateRootId:1},{unique:true})
db.published_version.createIndex({aggregateRootId:1,version:1,processorName:1,},{unique:true})
```

### 编程模型
Expand Down Expand Up @@ -348,14 +352,17 @@ class ChangeNoteTitleCommandHandler {
}
```

```java
@Subscribe
public CompletableFuture<BankAccount> handleAsync(CommandContext context, AddTransactionPreparationCommand command) {
CompletableFuture<BankAccount> future = context.getAsync(command.getAggregateRootId(), BankAccount.class);
future.thenAccept(bankAccount -> {
bankAccount.addTransactionPreparation(command.transactionId, command.transactionType, command.preparationType, command.amount);
});
return future;
```
@Command
public class BankAccountCommandHandler {
@Subscribe
public CompletableFuture<BankAccount> handleAsync(CommandContext context, AddTransactionPreparationCommand command) {
CompletableFuture<BankAccount> future = context.getAsync(command.getAggregateRootId(), BankAccount.class);
future.thenAccept(bankAccount -> {
bankAccount.addTransactionPreparation(command.transactionId, command.transactionType, command.preparationType, command.amount);
});
return future;
}
}
```

Expand All @@ -367,7 +374,7 @@ CompletableFuture<CommandResult> future = commandService.executeAsync(createNote

命令处理:

```java
```
/**
* 银行账户相关命令处理
* CommandHandler<CreateAccountCommand>, //开户
Expand Down Expand Up @@ -450,7 +457,7 @@ public class DepositTransactionProcessManager {
}

@Subscribe
public CompletableFuture<Boolean> handleAsync(TransactionPreparationAddedEvent evnt) {
public CompletableFuture<SendMessageResult> handleAsync(TransactionPreparationAddedEvent evnt) {
if (evnt.transactionPreparation.transactionType == TransactionType.DEPOSIT_TRANSACTION && evnt.transactionPreparation.preparationType == PreparationType.CREDIT_PREPARATION) {
ConfirmDepositPreparationCommand command = new ConfirmDepositPreparationCommand(evnt.transactionPreparation.transactionId);
command.setId(evnt.getId());
Expand All @@ -460,14 +467,14 @@ public class DepositTransactionProcessManager {
}

@Subscribe
public CompletableFuture<Boolean> handleAsync(DepositTransactionPreparationCompletedEvent evnt) {
public CompletableFuture<SendMessageResult> handleAsync(DepositTransactionPreparationCompletedEvent evnt) {
CommitTransactionPreparationCommand command = new CommitTransactionPreparationCommand(evnt.accountId, evnt.getAggregateRootId());
command.setId(evnt.getId());
return (commandBus.sendAsync(command));
}

@Subscribe
public CompletableFuture<Boolean> handleAsync(TransactionPreparationCommittedEvent evnt) {
public CompletableFuture<SendMessageResult> handleAsync(TransactionPreparationCommittedEvent evnt) {
if (evnt.transactionPreparation.transactionType == TransactionType.DEPOSIT_TRANSACTION && evnt.transactionPreparation.preparationType == PreparationType.CREDIT_PREPARATION) {
ConfirmDepositCommand command = new ConfirmDepositCommand(evnt.transactionPreparation.transactionId);
command.setId(evnt.getId());
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<version>${revision}</version>
<packaging>pom</packaging>
<properties>
<revision>1.1.5</revision>
<revision>1.1.6</revision>
<maven.flatten.version>1.5.0</maven.flatten.version>
<maven.gpg.version>1.6</maven.gpg.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DefaultCommandProcessor(
val aggregateRootId = processingCommand.message.aggregateRootId
Assert.nonNullOrEmpty(
aggregateRootId,
String.format("aggregateRootId of command, commandId: %s", processingCommand.message.id)
"aggregateRootId of command, commandId: ${processingCommand.message.id}"
)
var mailbox = mailboxDict.computeIfAbsent(aggregateRootId) { x: String ->
ProcessingCommandMailbox(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,8 @@ class DefaultProcessingCommandHandler(
override fun handleAsync(processingCommand: ProcessingCommand): CompletableFuture<Boolean> {
val command = processingCommand.message
if (Strings.isNullOrEmpty(command.aggregateRootId)) {
val errorMessage = String.format(
"The aggregateRootId of command cannot be null or empty. commandType:%s, commandId:%s",
command.javaClass.name,
command.id
)
val errorMessage =
"The aggregateRootId of command cannot be null or empty. commandType:${command.javaClass.name}, commandId:${command.id}"
logger.error(errorMessage)
return completeCommand(processingCommand, CommandStatus.Failed, String::class.java.name, errorMessage)
}
Expand Down Expand Up @@ -100,11 +97,8 @@ class DefaultProcessingCommandHandler(
}

HandlerFindStatus.NotFound -> {
val errorMessage = String.format(
"No command handler found of command. commandType:%s, commandId:%s",
command.javaClass.name,
command.id
)
val errorMessage =
"No command handler found of command. commandType:${command.javaClass.name}, commandId:${command.id}"
logger.error(errorMessage)
return completeCommand(processingCommand, CommandStatus.Failed, String::class.java.name, errorMessage)
}
Expand Down Expand Up @@ -191,13 +185,7 @@ class DefaultProcessingCommandHandler(
}
}
}, {
String.format(
"[command:[id:%s,type:%s],handlerType:%s,aggregateRootId:%s]",
command.id,
command.javaClass.name,
commandHandler.getInnerObject().javaClass.name,
command.aggregateRootId
)
"[command:[id:${command.id},type:${command.javaClass.name}],handlerType:${commandHandler.getInnerObject().javaClass.name},aggregateRootId:${command.aggregateRootId}]"
}, { ex: Throwable, errorMessage: String ->
handleExceptionAsync(
processingCommand, commandHandler, ex, errorMessage, 0
Expand All @@ -218,11 +206,8 @@ class DefaultProcessingCommandHandler(
if (events.size > 0) {
dirtyAggregateRootCount++
if (dirtyAggregateRootCount > 1) {
val errorMessage = String.format(
"Detected more than one aggregate created or modified by command. commandType:%s, commandId:%s",
command.javaClass.name,
command.id
)
val errorMessage =
"Detected more than one aggregate created or modified by command. commandType:${command.javaClass.name}, commandId:${command.id}"
logger.error(errorMessage)
return completeCommand(
processingCommand, CommandStatus.Failed, String::class.java.name, errorMessage
Expand Down Expand Up @@ -280,7 +265,7 @@ class DefaultProcessingCommandHandler(
).whenComplete { _, _ -> future.complete(true) }
}
}, {
String.format("[commandId:%s]", command.id)
"[commandId: ${command.id}]"
}, null, retryTimes, true)
return future
}
Expand Down Expand Up @@ -322,14 +307,7 @@ class DefaultProcessingCommandHandler(
}
}
}, {
String.format(
"[command:[id:%s,type:%s],handlerType:%s,aggregateRootId:%s] %s",
command.id,
command.javaClass.name,
commandHandler.getInnerObject().javaClass.name,
command.aggregateRootId,
errorMessage
)
"[command:[id:${command.id},type:${command.javaClass.name}],handlerType:${commandHandler.getInnerObject().javaClass.name},aggregateRootId:${command.aggregateRootId}] $errorMessage"
}, null, retryTimes, true)
return future
}
Expand Down Expand Up @@ -391,13 +369,7 @@ class DefaultProcessingCommandHandler(
processingCommand, CommandStatus.Success, message.javaClass.name, serializeService.serialize(message)
).whenComplete { _, _ -> future.complete(true) }
}, {
String.format(
"[application message:[id:%s,type:%s],command:[id:%s,type:%s]]",
message.id,
message.javaClass.name,
command.id,
command.javaClass.name
)
"[application message:[id:${message.id},type:${message.javaClass.name}],command:[id:${command.id},type:${command.javaClass.name}]]"
}, null, retryTimes, true)
return future
}
Expand Down
4 changes: 2 additions & 2 deletions enode/src/main/java/org/enodeframework/common/io/IOHelper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object IOHelper {
return try {
func.apply()
} catch (ex: Exception) {
throw IORuntimeException(String.format("%s failed.", funcName), ex)
throw IORuntimeException("$funcName failed.", ex)
}
}

Expand Down Expand Up @@ -136,7 +136,7 @@ object IOHelper {
getContextInfo(contextInfoFunc),
currentRetryTimes, ex
)
executeFailedAction(ex, String.format("Task '%s' was cancelled.", actionName))
executeFailedAction(ex, "Task '${actionName}' was cancelled.")
null
}
return
Expand Down
3 changes: 3 additions & 0 deletions enode/src/main/java/org/enodeframework/common/io/Task.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ object Task {
@JvmField
var completedTask: CompletableFuture<Boolean> = CompletableFuture.completedFuture(true)

@JvmField
var emptyTask: CompletableFuture<Nothing?> = CompletableFuture.completedFuture(null)

@JvmStatic
fun await(latch: CountDownLatch) {
try {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ class DefaultAggregateRootInternalHandlerProvider : AssemblyInitializer {
}
}
throw HandlerNotFoundException(
String.format(
"Could not find event handler for [%s] of [%s]",
eventType.javaClass.name,
aggregateRootType.javaClass.name
)
"Could not find event handler for [${eventType.javaClass.name}] of [${aggregateRootType.javaClass.name}]"
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ class DefaultAggregateSnapshotter(private val aggregateRepositoryProvider: Aggre
{ aggregateRepository.getAsync(aggregateRootId) },
{ result: T -> taskSource.complete(result) },
{
String.format(
"aggregateRepository.getAsync has unknown exception, aggregateRepository: %s, aggregateRootTypeName: %s, aggregateRootId: %s",
aggregateRepository.javaClass.name,
aggregateRootType.name,
aggregateRootId
)
"aggregateRepository.getAsync has unknown exception, aggregateRepository: ${aggregateRepository.javaClass.name}, aggregateRootTypeName: ${aggregateRootType.name}, aggregateRootId: $aggregateRootId"
},
null,
retryTimes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ class DefaultMemoryCache(
}
if (aggregateRootInfo.aggregateRoot.javaClass != aggregateRootType) {
throw AggregateRootTypeNotMatchException(
String.format(
"Incorrect aggregate root type, aggregateRootId:%s, type:%s, expecting type:%s",
aggregateRootId,
aggregateRootInfo.aggregateRoot.javaClass,
aggregateRootType
)
"Incorrect aggregate root type, aggregateRootId:$aggregateRootId, type:${aggregateRootInfo.aggregateRoot.javaClass}, expecting type:$aggregateRootType"
)
}
if (aggregateRootInfo.aggregateRoot.changes.isNotEmpty()) {
Expand Down
Loading

0 comments on commit ae905a6

Please sign in to comment.