go-redis源码分析:连接池

更新时间:2023-05-12 01:49:19 阅读: 评论:0

go-redis源码分析:连接池
笔者最近在项⽬中基于 go-redis 实现 Redis 缓存优化性能。go-redis 是⼀个 Go 语⾔实现的 Redis 客户端,既然是⽹络服务的客户端,为了⾼效利⽤有限资源,避免重复创建和销毁⽹络连接,就必需对其进⾏管理。⽽资源管理⼜是编程领域中的⼀个重点难点,抱着对是否能利⽤Go 语⾔语法简洁的特点来优雅实现连接池的好奇,笔者决定阅读并分析 go-redis 连接池部分的源码,⼀探究竟。以下是对源码的分析,分为接⼝与结构体、连接池管理、建⽴与关闭连接、获取与放回连接、监控统计等5⼤部分,。
接⼝与结构体
连接结构体:
type Conn struct {
netConn net.Conn  // 基于 tcp 的⽹络连接
rd *proto.Reader // 根据 Redis 通信协议实现的 Reader
wr *proto.Writer // 根据 Redis 通信协议实现的 Writer
Inited    bool // 是否完成初始化
pooled    bool // 是否放进连接池
createdAt time.Time // 创建时间
udAt    int64 // 使⽤时间,atomic
}
连接池接⼝:
type Pooler interface {
NewConn(context.Context) (*Conn, error) // 创建连接
CloConn(*Conn) error // 关闭连接
Get(context.Context) (*Conn, error) // 获取连接
Put(*Conn) // 放回连接
Remove(*Conn, error) // 移除连接
Len() int // 连接池长度
IdleLen() int // 空闲连接数量
Stats() *Stats // 连接池统计
Clo() error // 关闭连接池
}
连接池结构体:
type ConnPool struct {
opt *Options // 连接池配置
dialErrorsNum uint32 // 连接错误次数,atomic
lastDialErrorMu sync.RWMutex // 上⼀次连接错误锁,读写锁
lastDialError  error // 上⼀次连接错误
queue chan struct{} // ⼯作连接队列
connsMu      sync.Mutex // 连接队列锁
conns        []*Conn // 连接队列
idleConns    []*Conn // 空闲连接队列
poolSize    int // 连接池⼤⼩
idleConnsLen int // 空闲连接队列长度
stats Stats // 连接池统计
_clod  uint32 // 连接池关闭标志,atomic
clodCh chan struct{} // 通知连接池关闭通道
}
连接池管理
初始化
var _ Pooler = (*ConnPool)(nil)
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
queue:    make(chan struct{}, opt.PoolSize),
conns:    make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
clodCh:  make(chan struct{}),
}
p.checkMinIdleConns()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
aper(opt.IdleCheckFrequency)
}
return p
}
1. 创建连接池,传⼊连接池配置选项参数 opt,⼯⼚函数根据 opt 创建连接池实例。连接池主要依靠以下四个数据结构实现管理和通信:
queue:存储⼯作连接的缓冲通道
conns:存储所有连接的切⽚
idleConns:存储空闲连接的切⽚
clod:⽤于通知所有协程连接池已经关闭的通道
2. 检查连接池的空闲连接数量是否满⾜最⼩空闲连接数量要求,若不满⾜,则创建⾜够的空闲连接。
3. 若连接池配置选项规定了空闲连接超时和检查空闲连接频率,则开启⼀个清理空闲连接的协程。
关闭
func (p *ConnPool) Clo() error {
if !atomic.CompareAndSwapUint32(&p._clod, 0, 1) {
return ErrClod
}
clo(p.clodCh)
var firstErr error
for _, cn := s {
if err := p.cloConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
p.poolSize = 0
p.idleConns = nil
p.idleConnsLen = 0
return firstErr
}
1. 原⼦性检查连接池是否已经关闭,若没关闭,则将关闭标志置为1
2. 关闭 clodCh 通道,连接池中的所有协程都可以通过判断该通道是否关闭来确定连接池是否已经关闭。
3. 连接队列锁上锁,关闭队列中的所有连接,并置空所有维护连接池状态的数据结构,解锁。
过滤
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
var firstErr error
for _, cn := s {
if fn(cn) {
if err := p.cloConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
}
return firstErr
}
实质上是遍历连接池中的所有连接,并调⽤传⼊的 fn 过滤函数作⽤在每个连接上,过滤出符合业务要求的连接。
清理
func (p *ConnPool) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
lect {
ca <-ticker.C:
// It is possible that ticker and clodCh arrive together,
// and lect pudo-randomly pick ticker ca, we double
// check here to prevent being executed after clod.
if p.clod() {
return
}
_, err := p.ReapStaleConns()
if err != nil {
internal.Logger.Printf("ReapStaleConns failed: %s", err)
continue
}
ca <-p.clodCh:
return
}
}
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
cn := p.reapStaleConn()
p.freeTurn()
if cn != nil {
_ = p.cloConn(cn)
n++
} el {
break
}
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
return n, nil
}
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
return cn
}
1. 开启⼀个⽤于检查并清理过期连接的 goroutine 每隔 frequency 时间遍历检查连接池中是否存在过期连接,并清理。
2. 创建⼀个时间间隔为 frequency 的计时器,在连接池关闭时关闭该计时器
3. 循环判断计时器是否到时和连接池是否关闭
4. 移除空闲连接队列中的过期连接
建⽴与关闭连接
建⽴连接
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {    cn, err := p.dialConn(ctx, pooled)
if err != nil {
return nil, err
}
if pooled {
// If pool is full remove the cn on next Put.
if p.poolSize >= p.opt.PoolSize {
cn.pooled = fal
} el {
p.poolSize++
}
}
return cn, nil
}
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {    if p.clod() {
return nil, ErrClod
}
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
return nil, p.getLastDialError()
}
netConn, err := p.opt.Dialer(ctx)
if err != nil {
p.tLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
Dial()
}
return nil, err
}
cn := NewConn(netConn)
cn.pooled = pooled
return cn, nil
}
func (p *ConnPool) tryDial() {
for {
if p.clod() {
return
}
conn, err := p.opt.Dialer(context.Background())
if err != nil {
p.tLastDialError(err)
time.Sleep(time.Second)
continue
}
atomic.StoreUint32(&p.dialErrorsNum, 0)
_ = conn.Clo()
return
}
}
创建连接流程图:
newConn流程图.png
DialConn流程图.png
移除与关闭连接
func (p *ConnPool) Remove(cn *Conn, reason error) {
p.freeTurn()
_ = p.cloConn(cn)
}
func (p *ConnPool) CloConn(cn *Conn) error {
return p.cloConn(cn)
}
func (p *ConnPool) removeConnWithLock(cn *Conn) {
}
func (p *ConnPool) removeConn(cn *Conn) {
for i, c := s {
if c == cn {
if cn.pooled {
p.poolSize--
p.checkMinIdleConns()
}
return
}
}
}
func (p *ConnPool) cloConn(cn *Conn) error {
if p.opt.OnClo != nil {
_ = p.opt.OnClo(cn)
}
return cn.Clo()
}
连接池⽆论移除还是关闭连接,底层调⽤的都是 removeConnWithLock 函数。removeConnWithLock 函数的⼯作流程如下:
1. 连接队列上锁
2. 遍历连接队列找到要关闭的连接,并将其移除出连接队列
3. 更新连接池统计数据
4. 检查连接池最⼩空闲连接数量
5. 连接队列解锁
6. 关闭连接,先执⾏关闭连接时的回调函数(创建连接池时的配置选项传⼊),再关闭连接
获取与放回连接
获取
// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
if p.clod() {
return nil, ErrClod
}
err := p.waitTurn(ctx)
if err != nil {
return nil, err
}
for {
cn := p.popIdle()
if cn == nil {
break
}
if p.isStaleConn(cn) {
_ = p.CloConn(cn)
continue

本文发布于:2023-05-12 01:49:19,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/593165.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:连接   关闭   空闲   队列   是否   创建
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图
  • 我要关灯
    我要开灯
  • 返回顶部