对象池化管理
概述
很多常见组件的 SDK 或者 API 的使用方法,均是实例化一个 XXXClient 对象,然后利用该对象对具体的组件进行操作。例如:AWS S3 中的 S3AsyncClient
,InfluxDB 中的 InfluxDBClient
等等。常规使用中,通常是将此类对象实例化为一个单例,或者 Spring Boot 中的 Bean,然后在代码中统一使用。
一般来说,这种使用方式没有任何问题。但是随着技术的发展以及对性能和吞吐能力需求的提升,大量产品或者组件的 SDK 慢慢的逐步改为异步调用方式。异步操作就意味着原来的阻塞式逻辑变为多线程逻辑,那么原有单例方式实例化核心对象的操作方式就很难满足并发需求。
为了解决这个问题,Dante Cloud 对于此类对象的使用均改为"池"化对象方式。即构建一个对象池,在使用时生成多个核心对象实例,随用随取。类似于连接数据库的线程池,这样可以有效规避异步操作带来的时序不一致以及单一实例多线程操作不稳定问题。同时,减少反复创建和关闭连接带来的损耗。
[一]使用方法
Dante Cloud 池化对象封装类,主要有两个:
cn.herodotus.stirrup.core.definition.support.AbstractObjectPool
cn.herodotus.stirrup.core.definition.support.AbstractPooledObjectService
使用方法主要有以下几个步骤:
- 定义自己的
BasePooledObjectFactory
实现类 - 定义自己的
AbstractObjectPool
实现类 - 将
AbstractObjectPool
实现类注册为 Bean - 编写自己的
AbstractPooledObjectService
实现类,在其中实现自己的业务逻辑。
如果要同时实现对象池的配置参数,可以在自己的属性定义类中,增加一个 cn.herodotus.stirrup.core.definition.domain.Pool
属性即可。
[二]应用案例
下面以 Dante Cloud 中 InfluxDB2
模块的实现作为案例,来说明池化对象的具体使用方法。
[1]定义配置属性
如前文所述,在自己的配置属性类型,添加 cn.herodotus.stirrup.core.definition.domain.Pool
对象作为属性,实现对象池的配置。
ConfigurationProperties(prefix = InfluxDB2Constants.PROPERTY_NOSQL_INFLUXDB2)
public class InfluxDB2Properties {
/**
* Influxdb2 连接访问 URL
*/
private String url = "http://localhost:8086";
/**
* Influxdb2 Token。Influxdb2 不再使用用户名和密码访问服务端
*/
private String token = "kGTzf2ey86guP0mkwF5s7tfRFdtGyVYcG-QVbTQyP8zmQLigb1i6guHbQ2DGB9Wbadu93b4151NJjKt0vWXQfQ==";
/**
* Influxdb2 组织, 默认值:herodotus
*/
private String organization = "herodotus";
/**
* Influxdb2 存储桶。相比V1 移除了database 和 RP,增加了bucket
*/
private String bucket = "herodotus-bucket";
/**
* 对象池设置
*/
private Pool pool = new Pool();
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getToken() {
return token;
}
public void setToken(String token) {
this.token = token;
}
public String getOrganization() {
return organization;
}
public void setOrganization(String organization) {
this.organization = organization;
}
public String getBucket() {
return bucket;
}
public void setBucket(String bucket) {
this.bucket = bucket;
}
public Pool getPool() {
return pool;
}
public void setPool(Pool pool) {
this.pool = pool;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("url", url)
.add("token", token)
.add("organization", organization)
.add("bucket", bucket)
.add("pool", pool)
.toString();
}
}
[2]定义 BasePooledObjectFactory
实现类
定义一个 InfluxDB2ClientPooledObjectFactory
类,作为 BasePooledObjectFactory
实现类
public class InfluxDB2ClientPooledObjectFactory extends BasePooledObjectFactory<InfluxDBClient> {
private final InfluxDB2Properties influxdb2Properties;
public InfluxDB2ClientPooledObjectFactory(InfluxDB2Properties influxdb2Properties) {
this.influxdb2Properties = influxdb2Properties;
}
@Override
public InfluxDBClient create() throws Exception {
return InfluxDBClientFactory.create(influxdb2Properties.getUrl(),
influxdb2Properties.getToken().toCharArray(),
influxdb2Properties.getOrganization(),
influxdb2Properties.getBucket());
}
@Override
public PooledObject<InfluxDBClient> wrap(InfluxDBClient influxDBClient) {
return new DefaultPooledObject<>(influxDBClient);
}
@Override
public void destroyObject(PooledObject<InfluxDBClient> p) throws Exception {
p.getObject().close();
}
}
提示
BasePooledObjectFactory
(池化对象工厂)类,主要实现对象池中,对象的生成和回收。实现必要的方法即可,其它内容根据实际需求添加即可。
[3]定义 AbstractObjectPool
实现类
定义一个 InfluxDB2ClientObjectPool
作为 AbstractObjectPool
实现类。
public class InfluxDB2ClientObjectPool extends AbstractObjectPool<InfluxDBClient> {
public InfluxDB2ClientObjectPool(InfluxDB2ClientPooledObjectFactory influxdb2ClientPooledObjectFactory, InfluxDB2Properties influxdb2Properties) {
super(influxdb2ClientPooledObjectFactory, influxdb2Properties.getPool());
}
}
提示
AbstractObjectPool
类,对象池的主要操作类。实际的业务代码中,可以直接使用 InfluxDB2ClientObjectPool
类,或者该类注入到其它类中,例如:XXXService 类中进行使用。
[三]注意事项
对象池的操作很类似于 JDBC,需要自己手动生成对象,并且手动关闭对象。
重要
每次操作完,一定要手动关闭对象。这里的关闭并不是真正关闭,而是将使用后的对象放回对象池。如果忘记关闭,就会像 JDBC 一样,链接被耗尽导致使用一段时间后出现操作失败问题。
下面以 Dante Cloud 对象存储为例,展示具体的操作方式:
[1]定义服务的通用操作
public abstract class AbstractS3AsyncClientService extends AbstractPooledObjectService<S3AsyncClient> {
public AbstractS3AsyncClientService(S3AsyncClientObjectPool objectPool) {
super(objectPool);
}
protected <T> CompletableFuture<T> toFuture(Function<S3AsyncClient, CompletableFuture<T>> operate) {
S3AsyncClient client = getClient();
CompletableFuture<T> future = operate.apply(client);
close(client);
return future;
}
}
注意
手动创建对象,完成操作后,关闭对象。
[2]实现具体的业务操作
@Service
public class S3AsyncClientBucketService extends AbstractS3AsyncClientService {
public S3AsyncClientBucketService(S3AsyncClientObjectPool objectPool) {
super(objectPool);
}
public CompletableFuture<HeadBucketResponse> headBucket(HeadBucketRequest headBucketRequest) {
return toFuture(client -> client.headBucket(headBucketRequest));
}
public CompletableFuture<CreateBucketResponse> createBucket(CreateBucketRequest createBucketRequest) {
return toFuture(client -> client.createBucket(createBucketRequest));
}
public CompletableFuture<DeleteBucketResponse> deleteBucket(DeleteBucketRequest deleteBucketRequest) {
return toFuture(client -> client.deleteBucket(deleteBucketRequest));
}
public CompletableFuture<ListBucketsResponse> listBuckets(ListBucketsRequest listBucketsRequest) {
return toFuture(client -> client.listBuckets(listBucketsRequest));
}
}