聊一聊 SpringBoot R2DBC

摘要

  • 本文聊聊SpringBoot R2dbc

  • 本文基于SpringBoot-3.1.2

  • Github代码地址 web-flux-mysql-redis-demo,webflux-mysql-multi-demo

打印SQL

  • 方式一: 开启 debug 日志

1
2
3
4
# 可以打印sql,但不能打印参数值
logging:
level:
org.springframework.r2dbc: DEBUG
  • 方式二: 使用 r2dbc-proxy

1
2
3
4
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-proxy</artifactId>
</dependency>
1
2
3
4
5
spring:
r2dbc:
# url: r2dbc:mysql://mysql.test.db:3306/springboot?useUnicode=true&characterEncoding=utf-8
# 开启代理,目的是打印sql,生产环境不建议,http://r2dbc.io/r2dbc-proxy/docs/current/docs/html/
url: r2dbc:proxy:mysql://mysql.test.db:3306/springboot?useUnicode=true&characterEncoding=utf-8&proxyListener=com.example.r2dbc.LogExecutionListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.example.r2dbc;

import io.r2dbc.proxy.core.QueryExecutionInfo;
import io.r2dbc.proxy.listener.ProxyMethodExecutionListener;
import io.r2dbc.proxy.support.QueryExecutionInfoFormatter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LogExecutionListener implements ProxyMethodExecutionListener {
private final QueryExecutionInfoFormatter queryFormatter = new QueryExecutionInfoFormatter()
.newLine()
.showTime().showType().showSuccess().showTransaction().showBatchSize().showConnection().showThread()
.newLine()
.showQuery()
.newLine()
.showBindingsSize().showBindings()
.newLine();

@Override
public void afterExecuteOnStatement(QueryExecutionInfo queryExecutionInfo) {
String stringBuilder = this.queryFormatter.format(queryExecutionInfo);
log.info(stringBuilder);
}
}

条件查询

  • springboot为我们提供了几个Repository,如R2dbcRepositoryReactiveCrudRepositoryReactiveSortingRepository等等,我们自己的Repository通过继承这些父接口,可以获得相应的CURD的能力,但是其没有对条件查询提供支持,此时可以通过自定义Repository接口及其实现类的方式实现扩展

  • 自定义Repository接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.example.r2dbc;

import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.data.relational.core.query.Criteria;
import org.springframework.data.relational.core.query.Query;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.r2dbc.core.DatabaseClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.Serializable;
import java.util.Map;
import java.util.function.BiFunction;

@NoRepositoryBean //接口不参与jpa的代理
public interface BaseR2dbcRepository<T, ID extends Serializable> extends R2dbcRepository<T, ID> {

R2dbcEntityOperations getR2dbcEntityOperations();

/**
* 分页查询
*/
Mono<Page<T>> pageByQuery(Criteria criteria, Pageable pageable);

Mono<Long> countByQuery(Criteria criteria);

Flux<T> findByQuery(Criteria criteria);

Flux<T> findByQuery(Criteria criteria, Sort sort);

Flux<T> findByQuery(Criteria criteria, int limit);

Flux<T> findByQuery(Criteria criteria, Sort sort, int limit);

Flux<T> findByQuery(Query query);

Mono<T> findOneByQuery(Query query);

Mono<T> findOneByQuery(Criteria criteria);


default <R> Mono<R> execSqlToMono(String sql, Map<String, Object> bindMap, BiFunction<Row, RowMetadata, R> mappingFunction) {
DatabaseClient.GenericExecuteSpec genericExecuteSpec = getR2dbcEntityOperations().getDatabaseClient().sql(sql);
if (bindMap != null) {
for (Map.Entry<String, Object> entry : bindMap.entrySet()) {
genericExecuteSpec = genericExecuteSpec.bind(entry.getKey(), entry.getValue());
}
}
return genericExecuteSpec.map(mappingFunction).first();
}

default <R> Flux<R> execSqlToFlux(String sql, Map<String, Object> bindMap, BiFunction<Row, RowMetadata, R> mappingFunction) {
DatabaseClient.GenericExecuteSpec genericExecuteSpec = getR2dbcEntityOperations().getDatabaseClient().sql(sql);
if (bindMap != null) {
for (Map.Entry<String, Object> entry : bindMap.entrySet()) {
genericExecuteSpec = genericExecuteSpec.bind(entry.getKey(), entry.getValue());
}
}
return genericExecuteSpec.map(mappingFunction).all();
}

}
  • 自定义Repository接口的实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.example.r2dbc;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.r2dbc.convert.R2dbcConverter;
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
import org.springframework.data.r2dbc.repository.support.SimpleR2dbcRepository;
import org.springframework.data.relational.core.query.Criteria;
import org.springframework.data.relational.core.query.Query;
import org.springframework.data.relational.repository.query.RelationalEntityInformation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.Serializable;

@Slf4j
public class SimpleBaseR2dbcRepository<T, ID extends Serializable> extends SimpleR2dbcRepository<T, ID> implements BaseR2dbcRepository<T, ID> {

private final R2dbcEntityOperations r2dbcEntityOperations;

private final RelationalEntityInformation<T, ID> entity;

public SimpleBaseR2dbcRepository(RelationalEntityInformation<T, ID> entity, R2dbcEntityOperations entityOperations, R2dbcConverter converter) {
super(entity, entityOperations, converter);
this.r2dbcEntityOperations = entityOperations;
this.entity = entity;
log.info("SimpleBaseR2dbcRepository");
}

@Override
public R2dbcEntityOperations getR2dbcEntityOperations() {
return r2dbcEntityOperations;
}

@Override
public Mono<Page<T>> pageByQuery(Criteria criteria, Pageable pageable) {
final Query query = Query.query(criteria);
return r2dbcEntityOperations.count(query, entity.getJavaType()).flatMap(total ->
r2dbcEntityOperations.select(query.with(pageable), entity.getJavaType())
.collectList()
.map(list -> new PageImpl<>(list, pageable, total)));
}

@Override
public Mono<Long> countByQuery(Criteria criteria) {
final Query query = Query.query(criteria);
return r2dbcEntityOperations.count(query, entity.getJavaType());
}

@Override
public Flux<T> findByQuery(Criteria criteria) {
final Query query = Query.query(criteria);
return r2dbcEntityOperations.select(query, entity.getJavaType());
}

@Override
public Flux<T> findByQuery(Criteria criteria, Sort sort) {
final Query query = Query.query(criteria).sort(sort);
return r2dbcEntityOperations.select(query, entity.getJavaType());
}

@Override
public Flux<T> findByQuery(Criteria criteria, int limit) {
final Query query = Query.query(criteria).limit(limit);
return r2dbcEntityOperations.select(query, entity.getJavaType());
}

@Override
public Flux<T> findByQuery(Criteria criteria, Sort sort, int limit) {
final Query query = Query.query(criteria).sort(sort).limit(limit);
return r2dbcEntityOperations.select(query, entity.getJavaType());
}

@Override
public Flux<T> findByQuery(Query query) {
return r2dbcEntityOperations.select(query, entity.getJavaType());
}

@Override
public Mono<T> findOneByQuery(Query query) {
return r2dbcEntityOperations.selectOne(query, entity.getJavaType());
}

@Override
public Mono<T> findOneByQuery(Criteria criteria) {
final Query query = Query.query(criteria);
return r2dbcEntityOperations.selectOne(query, entity.getJavaType());
}
}
  • 我们需要告知springboot使用我们自定义的Repository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.example.config;

import com.example.r2dbc.SimpleBaseR2dbcRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;

@Slf4j
@Configuration
// 这里必须指定 basePackages,否则 repositoryBaseClass 指定的实现类不能被关联到 各个 Repository 中
@EnableR2dbcRepositories(basePackages = "com.example.mysql", repositoryBaseClass = SimpleBaseR2dbcRepository.class)
public class ReactiveR2dbcConfig {

}
  • 此时我们在创建业务Repository时就可以继承我们自定义的Repository

1
2
3
public interface SysUserRepository extends BaseR2dbcRepository<SysUser, String> {

}
  • 可以看到我们自定义的Repository中,主要是通过Criteria来提供查询条件的封装,我了便于创建Criteria对象,这里提供了一个工具类 CustomCriteria,其主要功能是根据条件来拼接查询条件,代码比较多,就不在这里粘贴了,自行去Github代码地址中查看吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* <h1>CustomCriteria</h1>
* Criteria 构造器
* Created by hanqf on 2023/9/4 11:47.
*
*
* 示例:
* 1.and 关系: select * from tbl_table where (username like '%admin%' or username like 'lisi%') and enable = 1
* Criteria criteria = CustomCriteria.and()
* .like(true, "username", "%admin%", "lisi%")
* .eq(true, "enable", 1)
* .build();
*
* 2.or 关系: select * from tbl_table where (username like '%admin%' or username like 'lisi%') or enable = 1
* Criteria criteria = CustomCriteria.or()
* .like(true, "username", "%admin%", "lisi%")
* .eq(true, "enable", 1)
* .build();
*
* 3.复合关系:
* Criteria criteria1 = CustomCriteria.and()
* .like(true, "username", "%admin%", "lisi%")
* .eq(true, "enable", 1)
* .build();

* Criteria criteria2 = CustomCriteria.or()
* .like(true, "username", "%admin%", "lisi%")
* .eq(true, "enable", 1)
* .build();
*
* Criteria criteria = criteria1.and(criteria2); // or: criteria1.or(criteria2);
*
* 4.复杂查询建议直接使用 sql 进行查询,可以使用 BaseR2dbcRepository 中的 execSqlToMono 和 execSqlToFlux
* 示例:
* public Mono<TestOrder> getOne(String orderId) {
String sql = "select id, order_id from test_order where order_id = :orderId";

return testOrderRepository.execSqlToMono(sql, Map.of("orderId", orderId), (row, rowMetadata) -> {
final TestOrder testOrder = new TestOrder();
testOrder.setId(row.get("id", Long.class));
testOrder.setOrderId(row.get("order_id", String.class));
return testOrder;
});
}
*
*
* 说明:
* 1.eq(true, "enable", 1):第一个参数为真时当前条件加入查询,默认为真
* 2.支持条件方法详见代码
*/

新增

  • R2dbc提供的<S extends T> Mono<S> save(S entity);方法,要求table必须含有主键,其根据实体类中主键是否被填充来判断是新增还是修改操作,如果我们是自定义主键,而非数据库自动填充主键,此时就不能使用save方法新增记录,解决方法是在业务Repository中创建一个新增方法,比如:

1
2
3
4
5
6
7
8
9
10
11
/**
* 实体类在进行新增时会判断主键是否填充,如果没有填充就认为是新数据,采取真正的新增操作,主键需要数据库来自动填充;
* 如果主键存在值则认为是旧数据则调用更新操作。
* 对于自定义主键的情况,可以使用如下方式进行新增对象
*
* 对象形式传参::#{#对象名.字段名}
* 字段传参::字段名(@param定义)
*/
@Modifying
@Query("insert into sys_user (id,username,password,enable) values (:#{#sysUser.id},:#{#sysUser.username},:#{#sysUser.password},:#{#sysUser.enable})")
Mono<Integer> addSysUser(SysUser sysUser);

事务

  • 注解式事务

1
2
3
4
5
6
7
8
9
10
11
@Transactional
@RequestMapping("/tx")
public Mono<Integer> tx() {
final SysUser sysUser = new SysUser();
sysUser.setId(UUID.randomUUID().toString());
sysUser.setUsername("test");
sysUser.setPassword("123456");
sysUser.setEnable(true);
// 主键重复
return sysUserRepository.addSysUser(sysUser).then(sysUserRepository.addSysUser(sysUser));
}
  • 编程式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Autowired
private TransactionalOperator transactionalOperator;

@RequestMapping("/tx2")
public Mono<Integer> tx2() {
final SysUser sysUser = new SysUser();
sysUser.setId(UUID.randomUUID().toString());
sysUser.setUsername("test");
sysUser.setPassword("123456");
sysUser.setEnable(true);
// 主键重复
return sysUserRepository.addSysUser(sysUser).then(sysUserRepository.addSysUser(sysUser))
.as(transactionalOperator::transactional);
}

多数据源

  • application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
spring:
#r2dbc mysql
r2dbc:
one:
#url: r2dbc:mysql://mysql.test.db:3306/springboot?useUnicode=true&characterEncoding=utf-8
# 开启代理,目的是打印sql,生产环境不建议,http://r2dbc.io/r2dbc-proxy/docs/current/docs/html/
url: r2dbc:proxy:mysql://mysql.test.db:3306/springboot?useUnicode=true&characterEncoding=utf-8&proxyListener=com.example.r2dbc.LogExecutionListener
username: testUser
password: 123456
pool:
enabled: true
initial-size: 5
max-size: 20
max-idle-time: 30m
two:
#url: r2dbc:mysql://mysql.test.db:3306/mybatis?useUnicode=true&characterEncoding=utf-8
url: r2dbc:proxy:mysql://mysql.test.db:3306/mybatis?useUnicode=true&characterEncoding=utf-8&proxyListener=com.example.r2dbc.LogExecutionListener
username: testUser
password: 123456
pool:
enabled: true
initial-size: 5
max-size: 20
max-idle-time: 30m
  • one:配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package com.example.config;

import com.example.r2dbc.SimpleBaseR2dbcRepository;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Option;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;

import java.util.function.Predicate;
import java.util.function.Supplier;


@Slf4j
@Configuration
// 这里必须指定 basePackages,否则 repositoryBaseClass 指定的实现类不能被关联到 各个 Repository 中
@EnableR2dbcRepositories(basePackages = "com.example.mysql.one.repository", repositoryBaseClass = SimpleBaseR2dbcRepository.class, entityOperationsRef = "oneR2dbcEntityTemplate")
public class OneReactiveR2dbcConfig {

@Primary
@Bean("oneR2dbcProperties")
@ConfigurationProperties(prefix = "spring.r2dbc.one")
public R2dbcProperties oneR2dbcProperties() {
return new R2dbcProperties();
}

@Primary
@Bean("oneR2dbcEntityTemplate")
public R2dbcEntityTemplate oneR2dbcEntityTemplate() {
log.info("oneR2dbcEntityTemplate");
return new R2dbcEntityTemplate(oneConnectionFactory());
}

@Primary
@Bean("oneConnectionFactory")
public ConnectionFactory oneConnectionFactory() {
log.info("oneConnectionFactory");
R2dbcProperties r2dbcProperties = oneR2dbcProperties();
ConnectionFactoryOptions urlOptions = ConnectionFactoryOptions.parse(r2dbcProperties.getUrl());
ConnectionFactoryOptions.Builder optionsBuilder = urlOptions.mutate();

configureIf(optionsBuilder, urlOptions, ConnectionFactoryOptions.USER, r2dbcProperties::getUsername,
StringUtils::hasText);
configureIf(optionsBuilder, urlOptions, ConnectionFactoryOptions.PASSWORD, r2dbcProperties::getPassword,
StringUtils::hasText);
configureIf(optionsBuilder, urlOptions, ConnectionFactoryOptions.DATABASE,
() -> determineDatabaseName(r2dbcProperties), StringUtils::hasText);

if (r2dbcProperties.getProperties() != null) {
r2dbcProperties.getProperties()
.forEach((key, value) -> optionsBuilder.option(Option.valueOf(key), value));
}
final ConnectionFactoryOptions options = optionsBuilder.build();
return ConnectionFactories.get(options);
}

@Primary
@Bean("oneR2dbcTransactionManager")
public ReactiveTransactionManager oneR2dbcTransactionManager() {
return new R2dbcTransactionManager(oneConnectionFactory());
}

@Primary
@Bean("oneTransactionalOperator")
public TransactionalOperator oneTransactionalOperator() {
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
transactionDefinition.setName("oneTransactionalOperator");
// 这是默认行为
transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return TransactionalOperator.create(oneR2dbcTransactionManager(), transactionDefinition);
}


private <T extends CharSequence> void configureIf(ConnectionFactoryOptions.Builder optionsBuilder,
ConnectionFactoryOptions originalOptions, Option<T> option, Supplier<T> valueSupplier,
Predicate<T> setIf) {
if (originalOptions.hasOption(option)) {
return;
}
T value = valueSupplier.get();
if (setIf.test(value)) {
optionsBuilder.option(option, value);
}
}

private String determineDatabaseName(R2dbcProperties properties) {
if (properties.isGenerateUniqueName()) {
return properties.determineUniqueName();
}
if (StringUtils.hasLength(properties.getName())) {
return properties.getName();
}
return null;
}
}
  • two:配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package com.example.config;

import com.example.r2dbc.SimpleBaseR2dbcRepository;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Option;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;

import java.util.function.Predicate;
import java.util.function.Supplier;

@Slf4j
@Configuration
// 这里必须指定 basePackages,否则 repositoryBaseClass 指定的实现类不能被关联到 各个 Repository 中
@EnableR2dbcRepositories(basePackages = "com.example.mysql.two.repository", repositoryBaseClass = SimpleBaseR2dbcRepository.class, entityOperationsRef = "twoR2dbcEntityTemplate")
public class TwoReactiveR2dbcConfig {

@Bean("twoR2dbcProperties")
@ConfigurationProperties(prefix = "spring.r2dbc.two")
public R2dbcProperties twoR2dbcProperties() {
return new R2dbcProperties();
}

@Bean("twoR2dbcEntityTemplate")
public R2dbcEntityTemplate twoR2dbcEntityTemplate() {
log.info("twoR2dbcEntityTemplate");
return new R2dbcEntityTemplate(twoConnectionFactory());
}

@Bean("twoConnectionFactory")
public ConnectionFactory twoConnectionFactory() {
log.info("twoConnectionFactory");
R2dbcProperties r2dbcProperties = twoR2dbcProperties();
ConnectionFactoryOptions urlOptions = ConnectionFactoryOptions.parse(r2dbcProperties.getUrl());
ConnectionFactoryOptions.Builder optionsBuilder = urlOptions.mutate();

configureIf(optionsBuilder, urlOptions, ConnectionFactoryOptions.USER, r2dbcProperties::getUsername,
StringUtils::hasText);
configureIf(optionsBuilder, urlOptions, ConnectionFactoryOptions.PASSWORD, r2dbcProperties::getPassword,
StringUtils::hasText);
configureIf(optionsBuilder, urlOptions, ConnectionFactoryOptions.DATABASE,
() -> determineDatabaseName(r2dbcProperties), StringUtils::hasText);

if (r2dbcProperties.getProperties() != null) {
r2dbcProperties.getProperties()
.forEach((key, value) -> optionsBuilder.option(Option.valueOf(key), value));
}
final ConnectionFactoryOptions options = optionsBuilder.build();
return ConnectionFactories.get(options);
}


@Bean("twoR2dbcTransactionManager")
public ReactiveTransactionManager twoR2dbcTransactionManager() {
return new R2dbcTransactionManager(twoConnectionFactory());
}

@Bean("twoTransactionalOperator")
public TransactionalOperator twoTransactionalOperator() {
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
transactionDefinition.setName("twoTransactionalOperator");
// 这是默认行为
transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return TransactionalOperator.create(twoR2dbcTransactionManager(), transactionDefinition);
}

private <T extends CharSequence> void configureIf(ConnectionFactoryOptions.Builder optionsBuilder,
ConnectionFactoryOptions originalOptions, Option<T> option, Supplier<T> valueSupplier,
Predicate<T> setIf) {
if (originalOptions.hasOption(option)) {
return;
}
T value = valueSupplier.get();
if (setIf.test(value)) {
optionsBuilder.option(option, value);
}
}

private String determineDatabaseName(R2dbcProperties properties) {
if (properties.isGenerateUniqueName()) {
return properties.determineUniqueName();
}
if (StringUtils.hasLength(properties.getName())) {
return properties.getName();
}
return null;
}
}
  • 主要注意如下几点:

    • 基于配置文件中数据源的信息创建各自的 R2dbcEntityTemplateR2dbcTransactionManager
    • basePackages 指定不同的扫描路径
    • entityOperationsRef 指定各自的 R2dbcEntityTemplate
  • 完整代码参考 Github代码地址