grpc(三)之grpc客户端使用连接池
本文使用commons-pool2来实现连接池应用
1、定义一个产生连接池的工厂,需要继承BasePooledObjectFactory,其用处是生产和销毁连接池中保存的对象。根据需求,现在池子里保存的应该是grpc客户端对象。
GrpcClientFactory类:
pc;
import s.pool2.BaPooledObjectFactory;
import s.pool2.PooledObject;
import s.pool2.impl.DefaultPooledObject;
pc.client.GrpcClient;
utils.UtilFunctions;
/**
 * Factory used by the commons-pool2 object pool to create, wrap and destroy
 * the pooled {@link GrpcClient} instances.
 */
public class GrpcClientFactory extends BasePooledObjectFactory<GrpcClient> {

    /**
     * Creates a new client connected to the gRPC server at localhost:23333.
     *
     * @return a freshly constructed client
     */
    @Override
    public GrpcClient create() throws Exception {
        return new GrpcClient("localhost", 23333);
    }

    /**
     * Wraps a client in the default pooled-object holder.
     *
     * @param client the client to be managed by the pool
     * @return the pooled wrapper around {@code client}
     */
    @Override
    public PooledObject<GrpcClient> wrap(GrpcClient client) {
        return new DefaultPooledObject<>(client);
    }

    /**
     * Destroys a pooled client by shutting down its channel.
     *
     * <p>Without the shutdown the underlying connection is simply dropped when
     * the JVM exits, and the server logs a "Transport failed" IOException.
     *
     * @param p the pooled wrapper whose client is being discarded
     * @throws Exception if shutting the client down is interrupted
     */
    @Override
    public void destroyObject(PooledObject<GrpcClient> p) throws Exception {
        UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ====");
        try {
            p.getObject().shutdown();
        } finally {
            super.destroyObject(p);
        }
    }
}
2、连接池GrpcClientPool类
pc;
import s.pool2.impl.GenericObjectPool;
import s.pool2.impl.GenericObjectPoolConfig;
pc.client.GrpcClient;
utils.UtilFunctions;
public class GrpcClientPool {
private static GenericObjectPool<GrpcClient> objectPool = null;
static {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 池中的最⼤连接数
poolConfig.tMaxTotal(8);
// 最少的空闲连接数
poolConfig.tMinIdle(0);
// 最多的空闲连接数
poolConfig.tMaxIdle(8);
// 当连接池资源耗尽时,调⽤者最⼤阻塞的时间,超时时抛出异常单位:毫秒数
poolConfig.tMaxWaitMillis(-1);
/
/ 连接池存放池化对象⽅式,true放在空闲队列最前⾯,fal放在空闲队列最后
poolConfig.tLifo(true);
// 连接空闲的最⼩时间,达到此值后空闲连接可能会被移除,默认即为30分钟
poolConfig.tMinEvictableIdleTimeMillis(1000L * 60L * 30L);// 连接耗尽时是否阻塞,默认为true
poolConfig.tBlockWhenExhausted(true);
objectPool = new GenericObjectPool<>(new GrpcClientFactory(), poolConfig);
}
public static GrpcClient borrowObject() {
try {
GrpcClient client = objectPool.borrowObject();
UtilFunctions.log.info("=======total threads created: " + CreatedCount());
return client;
} catch (Exception e) {
("objectPool.borrowObject error, msg:{}, exception:{}", e.toString(), e);
}
return createClient();
}
public static void returnObject(GrpcClient client) {
try {
} catch (Exception e) {
("urnObject error, msg:{}, exception:{}", e.toString(), e);
}
}
private static GrpcClient createClient() {
return new GrpcClient("localhost", 23333);
}
}
3、客户端程序
这里仅仅简单列出了客户端GrpcClient类的代码,其他代码包括服务端代码见另一篇博客grpc(一)之helloworld。
pc.client;
import flect.Method;
import urrent.TimeUnit;
pc.BookServiceGrpc;
pc.GrpcClientPool;
pc.GrpcLib.GrpcReply;
pc.GrpcLib.addBookRequest;
UrByIdRequest;
pc.UrServiceGrpc;
utils.UtilFunctions;
pc.ManagedChannel;
pc.ManagedChannelBuilder;
pc.StatusRuntimeException;
public class GrpcClient {
public static String host = "localhost";
private final ManagedChannel channel;
private final UrServiceGrpc.UrServiceBlockingStub urBlockingStub;
private final BookServiceGrpc.BookServiceBlockingStub bookBlockingStub;
public GrpcClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port).uPlaintext().build();
urBlockingStub = wBlockingStub(channel);
bookBlockingStub = wBlockingStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(10, TimeUnit.SECONDS);
}
@SuppressWarnings({ "rawtypes" })
public static Object call(String rpcMethoddName, args) throws Exception {
UtilFunctions.log.info("=========== GrpcClient#call begin ===========");
GrpcClient client = null;
try {
client = GrpcClientPool.borrowObject();
// client = new GrpcClient(host, 23333);
Class[] argsTypes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
UtilFunctions.log.info("args types: {}", args[i].getClass());
argsTypes[i] = args[i].getClass();
}
Method method = Class().getMethod(rpcMethoddName, argsTypes);
Object result = method.invoke(client, args);
UtilFunctions.log.info("=========== GrpcClient#call end ===========");
return result;
} catch (Exception e) {
("GrpcClient#call error, msg:{}, exception:{}", e.toString(), e);
return null;
} finally {
if (client != null) {
/
/ client.shutdown();
}
}
}
// ============= Ur module =============
public Object getUrById(Integer id) {
UtilFunctions.log.info("=========== GrpcClient#getUrById begin ===========");
getUrByIdRequest request = wBuilder().tId(id).build();
GrpcReply respon;
try {
respon = UrById(request);
UtilFunctions.log.info("GrpcClient#getUrById respon, code:{}, data:{}", Code(),
} catch (StatusRuntimeException e) {
("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e);
return null;
}
return respon;
}
// ============= Book module =============
public Object addBook(Integer id, String name, Double price) {
UtilFunctions.log.info("=========== GrpcClient#addBook begin ===========");
addBookRequest request = wBuilder().tId(id).tName(name).tPrice(price).build();
GrpcReply respon;
try {
respon = bookBlockingStub.addBook(request);
UtilFunctions.log.info("GrpcClient#addBook respon, code:{}, data:{}", Code(),
UtilFunctions.log.info("=========== GrpcClient#addBook end ===========");
} catch (StatusRuntimeException e) {
("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e);
return null;
}
return respon;
}
}
4、客户端测试
pc.client;
pc.GrpcClientPool;
pc.GrpcLib.GrpcReply;
utils.UtilFunctions;
public class TestService {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
GrpcReply result = null;
try {
// result = (GrpcReply) GrpcClient.call("getUrById", Integer.valueOf("1"));
// result = (GrpcReply) GrpcClient.call("getUrById", 2);
result = (GrpcReply) GrpcClient.call("addBook", 1, "thinking in java", 50.0);
} catch (Exception e) {
e.printStackTrace();
}
if (result != null) {
UtilFunctions.log.info("client call interface, get code:{}, data:{}", Code(),
}
// 如果注释掉下⾯两句,则客户端程序结束后,服务端报java.io.IOException: 远程主机强迫关闭了⼀个现有的连接。 // UtilFunctions.log.info("TestService#main: objectPool ");
// ObjectPool().clo();
}
}).start();
}
}
}
运行TestService类的main()方法,客户端能正常调用grpc server得到数据,但是grpc服务端报错:
2019-04-11 14:29:30.458 INFO 1192 --- [-worker-ELG-3-1] tions : Transport failed
java.io.IOException: 远程主机强迫关闭了⼀个现有的连接。
我在GrpcClientFactory里面也实现了销毁方法:
/**
 * Destroys a pooled client by shutting down its channel.
 *
 * @param p the pooled wrapper whose client is being discarded
 * @throws Exception if shutting the client down is interrupted
 */
@Override
public void destroyObject(PooledObject<GrpcClient> p) throws Exception {
    UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ====");
    try {
        // Without this the channel is never shut down, even when the pool
        // does call destroyObject().
        p.getObject().shutdown();
    } finally {
        super.destroyObject(p);
    }
}
但是运行TestService类的main()方法结束后客户端程序就结束了,程序没有主动调用destroyObject()方法销毁池子中的对象,所以grpcClient也没有shutdown,所以报错。
5、启动客户端springboot项目来测试
;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
pc.client.TestService;
@SpringBootApplication
/**
 * Spring Boot entry point for the gRPC client demo.
 */
public class Grpc007ClientMainApplication {

    /**
     * Starts the Spring application, then runs the {@code TestService} driver
     * with the same command-line arguments.
     *
     * @param args command-line arguments forwarded to both calls
     */
    public static void main(String[] args) throws Exception {
        final Class<?> bootClass = Grpc007ClientMainApplication.class;
        SpringApplication.run(bootClass, args);
        TestService.main(args);
    }
}
但是这样当关闭客户端程序,还是出现同样的问题。其实很好理解,因为关闭客户端程序时,池中的对象还处于空闲状态,没有销毁,destroyObject()方法没有调用,所以grpcClient也没有shutdown。
6、解决方法
客户端程序关闭时,池也要close。
;
import javax.annotation.PreDestroy;
import s.pool2.impl.GenericObjectPool;
import org.springframework.stereotype.Controller;
pc.GrpcClientPool;
pc.client.GrpcClient;
utils.UtilFunctions;
/**
 * Spring-managed bean whose only job is to close the gRPC client pool when
 * the application context shuts down.
 */
@Controller
public class InitController {

    /**
     * Closes the shared client pool before the bean is destroyed, logging the
     * pool's created-object count first.
     */
    @PreDestroy
    public void destroy() {
        UtilFunctions.log.info("InitController#");
        GenericObjectPool<GrpcClient> objectPool = GrpcClientPool.getObjectPool();
        UtilFunctions.log.info("InitController#destroy, total threads created: " + objectPool.getCreatedCount());
        UtilFunctions.log.info("InitController#destroy objectPool ");
        objectPool.close();
    }
}