许可优化
产品
解决方案
服务支持
关于
软件库
当前位置:服务支持 >  软件文章 >  PostgreSQL数据库高可用patroni源码学习——DCS类

PostgreSQL数据库高可用patroni源码学习——DCS类

阅读数 5
点赞 0
article_banner

DCS类代码位于patroni/dcs/__init__.py下,这一篇博客我们主要关注patroni组件的DCS类和如何利用ETCD DCS存储集群状态。
在这里插入图片描述在这里插入图片描述

get_dcs函数

Patroni子类的构造函数调用get_dcs(self.config)函数获取dcs类,其代码如下所示。dcs_modules函数根据执行环境获取 DCS 模块的名称。 如果使用 PyInstaller 打包,则无法通过扫描源目录动态发现模块,因为 FrozenImporter 没有实现 iter_modules 方法。 但是仍然可以通过迭代toc找到所有潜在的DCS模块,其中包含所有“冻结”资源的列表。

def dcs_modules():
    """Get names of DCS modules, depending on execution environment. If being packaged with PyInstaller,
    modules aren't discoverable dynamically by scanning source directory because `FrozenImporter` doesn't
    implement `iter_modules` method. But it is still possible to find all potential DCS modules by
    iterating through `toc`, which contains list of all "frozen" resources."""
    dcs_dirname = os.path.dirname(__file__)
    module_prefix = __package__ + '.'
    if getattr(sys, 'frozen', False):
        toc = set()
        # dcs_dirname may contain a dot, which causes pkgutil.iter_importers()
        # to misinterpret the path as a package name. This can be avoided
        # altogether by not passing a path at all, because PyInstaller's
        # FrozenImporter is a singleton and registered as top-level finder.
        for importer in pkgutil.iter_importers():
            if hasattr(importer, 'toc'):
                toc |= importer.toc
        return [module for module in toc if module.startswith(module_prefix) and module.count('.') == 2]
    else:
        return [module_prefix + name for _, name, is_pkg in pkgutil.iter_modules([dcs_dirname]) if not is_pkg]
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

get_dcs函数首先遍历执行环境获取到的 DCS 模块名称,如果配置文件中指定了,则使用importlib包的import_module函数导入这些模块(可以看看importlib如果动态导入包的代码,后续利用python开发可以使用)。

def get_dcs(config):
    modules = dcs_modules()
    for module_name in modules:
        name = module_name.split('.')[-1]
        if name in config:  # we will try to import only modules which have configuration section in the config file
            try:
                module = importlib.import_module(module_name)
                for key, item in module.__dict__.items():  # iterate through the module content
                    # try to find implementation of AbstractDCS interface, class name must match with module_name
                    if key.lower() == name and inspect.isclass(item) and issubclass(item, AbstractDCS):
                        # propagate some parameters
                        config[name].update({p: config[p] for p in ('namespace', 'name', 'scope', 'loop_wait', 'patronictl', 'ttl', 'retry_timeout') if p in config})
                        return item(config[name])
            except ImportError:
                logger.debug('Failed to import %s', module_name)

    available_implementations = []
    for module_name in modules:
        name = module_name.split('.')[-1]
        try:
            module = importlib.import_module(module_name)
            available_implementations.extend(name for key, item in module.__dict__.items() if key.lower() == name
                                             and inspect.isclass(item) and issubclass(item, AbstractDCS))
        except ImportError:
            logger.info('Failed to import %s', module_name)
    raise PatroniFatalException("""Can not find suitable configuration of distributed configuration store
Available implementations: """ + ', '.join(sorted(set(available_implementations))))
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

AbstractDCS接口

AbstractDCS接口的定义如下@six.add_metaclass(abc.ABCMeta) class AbstractDCS(object),如下是它的构造函数和成员函数。构造函数主要是初始化一些成员变量。

    def __init__(self, config):
        """
        :param config: dict, reference to config section of selected DCS.
            i.e.: `zookeeper` for zookeeper, `etcd` for etcd, etc...
        """
        self._name = config['name']
        self._base_path = re.sub('/+', '/', '/'.join(['', config.get('namespace', 'service'), config['scope']]))
        self._set_loop_wait(config.get('loop_wait', 10))
        self._ctl = bool(config.get('patronictl', False))
        self._cluster = None
        self._cluster_valid_till = 0
        self._cluster_thread_lock = Lock()
        self._last_lsn = ''
        self._last_seen = 0
        self._last_status = {}
        self.event = Event()
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

python多线程之事件(Event):小伙伴a,b,c围着吃火锅,当菜上齐了,请客的主人说:开吃!,于是小伙伴一起动筷子,这种场景如何实现?
事件处理的机制:全局定义了一个内置标志Flag,如果Flag值为 False,那么当程序执行 event.wait方法时就会阻塞,如果Flag值为True,那么event.wait 方法时便不再阻塞。Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

  • set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
  • clear(): 将标志设为False。
  • wait(timeout): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
  • isSet(): 获取内置标志状态,返回True或False。
成员函数 类型 作用
client_path(self, path) ‘/’.join([self._base_path, path.lstrip(’/’)]) 拼接客户路径
initialize_path(self) property self.client_path(self._INITIALIZE)
config_path(self) property self.client_path(self._CONFIG)
members_path(self) property self.client_path(self._MEMBERS)
member_path(self) property self.client_path(self._MEMBERS + self._name)
leader_path(self) property self.client_path(self._LEADER)
failover_path(self) property self.client_path(self._FAILOVER)
history_path(self) property self.client_path(self._HISTORY)
status_path(self) property self.client_path(self._STATUS)
leader_optime_path(self) property self.client_path(self._LEADER_OPTIME)
sync_path(self) property self.client_path(self._SYNC)
set_ttl(self, ttl) abstractmethod 设置leader key的ttl新值
ttl(self) abstractmethod 获取ttl新值
set_retry_timeout(self, retry_timeout) abstractmethod 设置retry_timeout的新值
_set_loop_wait(self, loop_wait) self._loop_wait = loop_wait
reload_config(self, config) self._set_loop_wait(config[‘loop_wait’]) self.set_ttl(config[‘ttl’]) self.set_retry_timeout(config[‘retry_timeout’])
loop_wait(self) property return self._loop_wait
last_seen(self) property return self._last_seen
_load_cluster(self) abc.abstractmethod 在内部,此方法应构建“集群”对象,该对象表示 DCS 中集群的当前状态和拓扑。 此方法应该仅由 get_cluster 方法调用。 如果 DCS 出现通信或其他问题,请引发 ~DCSError。 如果当前节点作为主节点运行并引发异常,则实例将被降级。
_bypass_caches(self) 仅由zookeeper使用
get_cluster(self, force=False)
cluster(self) property
reset_cluster(self)
_write_leader_optime(self, last_lsn) abc.abstractmethod 将当前 WAL LSN 写入 DCS 中的 /optime/leader 键 参数 last_lsn:绝对 WAL LSN(以字节为单位) 成功时返回:!True
write_leader_optime(self, last_lsn) self.write_status({self._OPTIME: last_lsn})
_write_status(self, value) abc.abstractmethod 将当前 WAL LSN 和永久槽的confirmed_flush_lsn 写入DCS 中的/status 键 参数值:以 JSON 形式序列化的状态 成功时返回:!True
_update_leader(self) abc.abstractmethod 如果领导者密钥(或会话)已成功更新,则更新领导者密钥(或会话)ttl:返回:!True。 如果不是,则必须返回 !False 并且当前实例将被降级。 您必须使用 CAS(比较和交换)操作来更新领导者密钥,例如 etcd 必须使用“prevValue”参数。
write_status(self, value)
update_leader(self, last_lsn, slots=None) 更新领导者密钥(或会话)ttl 和 optime/领导者 参数 last_lsn:绝对 WAL LSN(以字节为单位) 参数槽:带有永久槽的字典confirmed_flush_lsn 如果领导者密钥(或会话)已成功更新,则返回:!True。 如果不是,则必须返回 !False 并且当前实例将被降级
attempt_to_acquire_leader(self, permanent=False) abc.abstractmethod 尝试获取领导者锁 此方法应使用 value=~self._name 创建 /leader 键 :param 永久:如果设置为 !True,领导者密钥将永不过期。 用于外部主控的 patriotictl :returns: !True 如果密钥创建成功。 必须以原子方式创建密钥。 如果密钥已经存在,则不应该 被覆盖并且必须返回 !False
set_failover_value(self, value, index=None) abc.abstractmethod 创建或更新/failover key
manual_failover(self, leader, candidate, scheduled_at=None, index=None)
set_config_value(self, value, index=None) abc.abstractmethod 创建或更新/config key
abc.abstractmethod touch_member(self, data, permanent=False) 更新 DCS 中的成员密钥。 此方法应使用名称 = ‘/members/’ + ~self._name 创建或更新密钥并且值 = 给定 DCS 中的数据。:param data:关于实例的信息(包括连接字符串) :param ttl: ttl 为成员键,可选参数。 如果是 None ~self.member_ttl will be used :param 永久:如果设置为 !True,则成员密钥永不过期。 用于外部主设备的 patriotictl。 :returns: !True 成功否则 !False
take_leader(self) abc.abstractmethod 此方法应创建值 = ~self._name 和 ttl=~self.ttl 的领导键 由于只能在初始集群引导时调用它,因此无论如何它都可以创建此密钥, 必要时覆盖密钥
touch_member(self, data, permanent=False) abc.abstractmethod 更新 DCS 中的成员密钥。 此方法应使用名称 = ‘/members/’ + ~self._name 创建或更新密钥并且值 = 给定 DCS 中的数据。 :param data:关于实例的信息(包括连接字符串) :param ttl: ttl 为成员键,可选参数。 如果是 None ~self.member_ttl will be used :param 永久:如果设置为 !True,则成员密钥永不过期。 用于外部主设备的 patriotictl。 :returns: !True 成功否则 !False
initialize(self, create_new=True, sysid="") abc.abstractmethod 集群初始化竞赛。 :param create_new: 如果密钥应该已经存在则为 False(在我们设置 system_id 的情况下) :param sysid: PostgreSQL 集群系统标识符,如果指定,则写入 key :returns: !True 如果密钥创建成功。 这个方法应该创建原子初始化键并返回 !True 否则它应该返回 !False
_delete_leader(self) abc.abstractmethod 从 DCS 中删除领导者密钥。 如果当前实例是领导者,则此方法应删除领导者密钥
delete_leader(self, last_lsn=None) abc.abstractmethod 更新 optime/leader 并主动从 DCS 中移除 leader key。 如果当前实例是领导者,则此方法应删除领导者密钥。 :param last_lsn: 最新的检查点位置(以字节为单位)
cancel_initialization(self) abc.abstractmethod Removes the initialize key for a cluster
delete_cluster(self) abc.abstractmethod Delete cluster from DCS
sync_state(leader, sync_standby) staticmethod return {‘leader’: leader, ‘sync_standby’: sync_standby and ‘,’.join(sorted(sync_standby)) or None}
write_sync_state(self, leader, sync_standby, index=None)
set_history_value(self, value) abc.abstractmethod
set_sync_state_value(self, value, index=None) abc.abstractmethod
delete_sync_state(self, index=None) abc.abstractmethod
watch(self, leader_index, timeout) 如果当前节点是主节点,它应该只是休眠。 任何其他节点都应该在给定的超时时间内观察领导者密钥的变化 :param leader_index: 领导键的索引 :param timeout: 超时秒数 :returns: !True 如果你想重新安排下一次运行 ha 循环

watch函数如果当前节点是主节点,它应该只是休眠。任何其他节点都应该在给定的超时时间内观察领导者key的变化。

    def watch(self, leader_index, timeout):
        """If the current node is a master it should just sleep.
        Any other node should watch for changes of leader key with a given timeout
        :param leader_index: index of a leader key
        :param timeout: timeout in seconds
        :returns: `!True` if you would like to reschedule the next run of ha cycle"""
        self.event.wait(timeout)
        return self.event.isSet()
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Cluster类

代表 PostgreSQL 集群的不可变对象 (namedtuple)。 由以下字段组成:
:param initialize:显示该集群是否有存储在 DC 中的初始化密钥。
:param config: 全局动态配置,引用 ClusterConfig 对象
:param leader: Leader 对象,代表集群的当前领导者
:param last_lsn: int 或 long 对象,包含最后一个已知领导者 LSN 的位置。
此值存储在 /status 键或 /optime/leader(旧版)键中
:param members:Member对象列表,包括leader在内的所有PostgreSQL集群成员
:param 故障转移:对“故障转移”对象的引用
:param sync: 引用 SyncState 对象,最后观察到的同步复制状态。
:param history: 引用 TimelineHistory 对象
:param slot:主节点上永久逻辑复制槽的状态,格式为:{“slot_name”: int}
在这里插入图片描述
namedtuple是Python中存储数据类型,比较常见的数据类型还有有list和tuple数据类型。相比于list,tuple中的元素不可修改,在映射中可以当键使用。namedtuple类位于collections模块,有了namedtuple后通过属性访问数据能够让我们的代码更加的直观更好维护。
namedtuple能够用来创建类似于元祖的数据类型,除了能够用索引来访问数据,能够迭代,还能够方便的通过属性名来访问数据。接下来通过本文给大家分享python namedtuple()的使用,一起看看吧!

基本定义

collections.namedtuple(typename, field_names, *, rename=False, defaults=None, module=None)
(1)返回一个名为typename的新元组子类
(2)新的子类用于创建类似元组的对象,这些对象具有可通过属性查找访问的字段以及可索引和可​​迭代的字段field_names
typename
(1)typename表示这个子类的名字,比如C++、python、Java中的类名
field_names
(1)field_names是一个字符串序列,例如[‘x’,‘y’]
(2)field_names可以是单个字符串,每个字段名都用空格或逗号分隔,例如’x y’或’x,y’
others
(1)其它的参数并不常用,这里不再介绍啦

基本样例

from collections import namedtuple
# 基本例子
Point = namedtuple('Point',['x','y']) # 类名为Point,属性有'x'和'y'  
p = Point(11, y=22) # 用位置或关键字参数实例化,因为'x'在'y'前,所以x=11,和函数参数赋值一样的
print(p[0]+p[1]) # 我们也可以使用下标来访问
# 33  
x, y = p # 也可以像一个元组那样解析
print(x,y)
# (11, 22)  
print(p.x+p.y) # 也可以通过属性名来访问
# 33  
print(p) # 通过内置的__repr__函数,显示该对象的信息
# Point(x=11, y=22)
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

classmethod somenamedtuple._make(iterable)
(1)从一个序列或者可迭代对象中直接对field_names中的属性直接赋值,返回一个对象

t = [11, 22] # 列表 list
p = Point._make(t) # 从列表中直接赋值,返回对象
print(Point(x=11, y=22))
# Point(x=11, y=22)
python
运行
  • 1
  • 2
  • 3
  • 4

classmethod somenamedtuple._asdict()
(1)之前也说过了,说它是元组,感觉更像一个带名字的字典
(2)我们也可以直接使用_asdict()将它解析为一个字典dict

p = Point(x=11, y=22) # 新建一个对象
d = p._asdict() # 解析并返回一个字典对象
print(d)
# {'x': 11, 'y': 22}
python
运行
  • 1
  • 2
  • 3
  • 4

classmethod somenamedtuple._replace(**kwargs)
(1)这是对某些属性的值,进行修改的,从replace这个单词就可以看出来
(2)注意该函数返回的是一个新的对象,而不是对原始对象进行修改

p = Point(x=11, y=22) # x=11,y=22
print(p)
# Point(x=11, y=22)
d = p._replace(x=33) # x=33,y=22 新的对象
print(p)
# Point(x=11, y=22)
print(d)
# Point(x=33, y=22)
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

classmethod somenamedtuple._fields
(1)该方法返回该对象的所有属性名,以元组的形式
(2)因为是元组,因此支持加法操作

print(p._fields) # 查看属性名
# ('x', 'y')
  
Color = namedtuple('Color', 'red green blue')
Pixel = namedtuple('Pixel', Point._fields + Color._fields) # 新建一个子类,使用多个属性名
q = Pixel(11, 22, 128, 255, 0)
print(q)
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

classmethod somenamedtuple._field_defaults
(1)该方法是python3.8新增的函数,因为我的版本是3.6,无法验证其正确性
(2)下面给出官方的示例

Account = namedtuple('Account', ['type', 'balance'], defaults=[0])
print(Account._field_defaults)
#{'balance': 0}
print(Account('premium'))
#Account(type='premium', balance=0)
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5

getattr()函数

(1)用来获得属性的值

print(getattr(p, 'x'))
# 11
python
运行
  • 1
  • 2

字典创建namedtuple()
(1)从字典来构建namedtuple的对象

d = {'x': 11, 'y': 22} # 字典
p = Point(**d) # 双星号是重点
print(p)
# Point(x=11, y=22)
python
运行
  • 1
  • 2
  • 3
  • 4

CSV OR Sqlite3

(1)同样可以将从csv文件或者数据库中读取的文件存储到namedtuple中
(2)这里每次存的都是一行的内容

EmployeeRecord = namedtuple('EmployeeRecord', 'name, age, title, department, paygrade')
import csv
for emp in map(EmployeeRecord._make, csv.reader(open("employees.csv", "r"))):
 # 这里每行返回一个对象 注意!
 print(emp.name, emp.title)
import sqlite3
conn = sqlite3.connect('/companydata') # 连接数据库
cursor = conn.cursor()
cursor.execute('SELECT name, age, title, department, paygrade FROM employees')
for emp in map(EmployeeRecord._make, cursor.fetchall()):
 # 每行返回一个对象 注意!
 print(emp.name, emp.title)
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

类的继承

(1)接下来用deepmind的开源项目graph_nets中的一段代码来介绍

NODES = "nodes"
EDGES = "edges"
RECEIVERS = "receivers"
SENDERS = "senders"
GLOBALS = "globals"
N_NODE = "n_node"
N_EDGE = "n_edge" 
GRAPH_DATA_FIELDS = (NODES, EDGES, RECEIVERS, SENDERS, GLOBALS)
GRAPH_NUMBER_FIELDS = (N_NODE, N_EDGE)  
class GraphsTuple(
    # 定义元组子类名 以及字典形式的键名(属性名)
 collections.namedtuple("GraphsTuple",
    GRAPH_DATA_FIELDS + GRAPH_NUMBER_FIELDS)):   
    # 这个函数用来判断依赖是否满足,和我们的namedtuple关系不大
    def _validate_none_fields(self):
        """Asserts that the set of `None` fields in the instance is valid."""
        if self.n_node is None:
         raise ValueError("Field `n_node` cannot be None")
        if self.n_edge is None:
         raise ValueError("Field `n_edge` cannot be None")
        if self.receivers is None and self.senders is not None:
         raise ValueError(
             "Field `senders` must be None as field `receivers` is None")
        if self.senders is None and self.receivers is not None:
         raise ValueError(
             "Field `receivers` must be None as field `senders` is None")
        if self.receivers is None and self.edges is not None:
         raise ValueError(
             "Field `edges` must be None as field `receivers` and `senders` are "
             "None")
    # 用来初始化一些参数 不是重点
    def __init__(self, *args, **kwargs):
        del args, kwargs
        # The fields of a `namedtuple` are filled in the `__new__` method.
        # `__init__` does not accept parameters.
        super(GraphsTuple, self).__init__()
        self._validate_none_fields()  
    # 这就用到了_replace()函数,注意只要修改了属性值
    # 那么就返回一个新的对象
    def replace(self, **kwargs):
        output = self._replace(**kwargs) # 返回一个新的实例 
        output._validate_none_fields() # pylint: disable=protected-access 验证返回的新实例是否满足要求
        return output
    # 这是为了针对tensorflow1版本的函数
    # 返回一个拥有相同属性的对象,但是它的属性值是输入的大小和类型
    def map(self, field_fn, fields=GRAPH_FEATURE_FIELDS): # 对每个键应用函数
    """Applies `field_fn` to the fields `fields` of the instance.
    `field_fn` is applied exactly once per field in `fields`. The result must
    satisfy the `GraphsTuple` requirement w.r.t. `None` fields, i.e. the
    `SENDERS` cannot be `None` if the `EDGES` or `RECEIVERS` are not `None`,
    etc.
    Args:
     field_fn: A callable that take a single argument.
     fields: (iterable of `str`). An iterable of the fields to apply
        `field_fn` to.
    Returns:
     A copy of the instance, with the fields in `fields` replaced by the result
     of applying `field_fn` to them.
    """
    return self.replace(**{k: field_fn(getattr(self, k)) for k in fields}) # getattr(self, k) 获取的是键值对中的值, k表示键
python
运行
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

AbstractEtcd类

AbstractEtcd类是AbstractDCS接口的实现类


免责声明:本文系网络转载或改编,未找到原创作者,版权归原作者所有。如涉及版权,请联系删
相关文章
QR Code
微信扫一扫,欢迎咨询~

联系我们
武汉格发信息技术有限公司
湖北省武汉市经开区科技园西路6号103孵化器
电话:155-2731-8020 座机:027-59821821
邮件:tanzw@gofarlic.com
Copyright © 2023 Gofarsoft Co.,Ltd. 保留所有权利
遇到许可问题?该如何解决!?
评估许可证实际采购量? 
不清楚软件许可证使用数据? 
收到软件厂商律师函!?  
想要少购买点许可证,节省费用? 
收到软件厂商侵权通告!?  
有正版license,但许可证不够用,需要新购? 
联系方式 155-2731-8020
预留信息,一起解决您的问题
* 姓名:
* 手机:

* 公司名称:

姓名不为空

手机不正确

公司不为空