Zookeeper从单机到集群分布式
Zookeeper从单机到集群分布式场景安装及配置解压tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz重命名mv apache-zookeeper-3.6.0-bin zookeeper在/opt/zookeeper/这个目录上创建zkData和zkLog目录mkdir zkDatamkdir zkLog进入/opt/zookeeper/conf这个路径,复制
Zookeeper从单机到集群分布式及源码分析
场景
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
Zookeeper可以理解为一个文件系统+一个通知机制
特点
- 是一个leader和多个follower来组成的集群
- 集群中只要有半数以上的节点存活,Zookeeper就能正常工作
- 全局数据一致性,每台服务器都保存一份相同的数据副本,无论client连接哪台server,数据都是一致的
- 数据更新原子性,一次数据要么成功,要么失败
- 实时性,在一定时间范围内,client能读取到最新数据
数据结构
- Zookeeper的文件结构看起来像一棵树,和Linux的文件结构类似
- 每一个节点上可以存储1MB的容量,可以看成k-v的键值对形式
负载均衡
Zookeeper会记录每一台服务器的访问数量,争取做到雨露均沾
选举机制
- Zookeeper只要超过半数以上的服务器正常启动着,整个集群就会存活,因此推荐集群中搭建的服务器数量是单数
- 整个集群中会有一个Leader服务器,通过选举投票产生,获得半数以上的票则该服务器为Leader
1. 五台服务器,每台服务器只有一票,从左到右启动,每一台服务器会把票第一时间投给自己
2. 第一台服务器把票投给自己得到的票数不超过半数以上,第二台也是一样
3. 第一台和第二台互相告知,第一台发现第二台的id比自己大,于是把自己的票转移给第二台
4. 第二台得到两票后,仍不满足,继续等待其他服务器启动
5. 第三台服务器把票投给自己发现不满足leader条件,但第二台发现id没有第三台大,于是把票都给了第三台
6. 第三台拿到三票后已经超过了半数投票,成为集群中的Leader
7. 第四台和第五台尽管自己的id比第三台大,但是第三台已经成为Leader,则无可奈何只能选择成为Follwer
8. 如果第三台服务器挂掉,则重新选举,谁的id大谁就最有可能成为Leader
9. 影响成为Leader的三个因素:id、启动顺序、服务器的数量
节点类型
- 持久性
- 持久性节点:客户端与Zookeeper断开连接后,由此客户端创建的节点依然存在
- 持久性带编号节点:客户端与Zookeeper断开连接后,不仅由此客户端创建的节点依然存在,创建的节点还会编排好编号类似于mysql的auto_increment
- 持久性节点:客户端与Zookeeper断开连接后,由此客户端创建的节点依然存在
- 短暂性
- 短暂性节点:客户端与Zookeeper断开连接后,由此客户端创建的节点删除
- 短暂性带编号节点:客户端与Zookeeper断开连接后,不仅由此客户端创建的节点删除,创建的节点还会编排好编号类似于mysql的auto_increment
- 短暂性节点:客户端与Zookeeper断开连接后,由此客户端创建的节点删除
监听原理
- Zookeeper的客户端启动后会创建两个线程,一个用来做数据的通信传输,一个用来监听某节点变化
- Zookeeper可以通过命令addWatch选择监听的节点
- 如果节点改变则会告知客户端中用于监听节点的线程
- 此线程无论是节点的数据还是节点的路径发生变化都会被告知
- 被告知后线程会在内部调用process方法
事务操作流程
- 如果客户端直接找的Leader服务器,则Leader服务器广播给每个Follwer
- 每个Follwer需要确定写入之前的数据和Leader一致,如果一致则告知Leader
- Leader如果收到一半以上的告知(包括Leader本身),则会同意本次写入操作
- Leader自身写入后会告知客户端,并通知其余Follwer
- 其余Follwer通过Leader保持数据的一致性
- 如果客户端找到是Follwer服务器,则此Follwer会先告知Leader
- Leader服务器广播给每个Follwer需要进行写入操作
- 每个Follwer需要确定写入之前的数据和Leader一致,如果一致则告知Leader
- Leader如果收到一半以上的告知(包括Leader本身),则会同意本次写入操作,并告知与客户端联系的Follwer
- 与客户端联系的Follwer会将消息告诉客户端,此时Leader会通知其余Follwer
- 其余Follwer通过Leader保持数据的一致性
安装及配置
-
解压
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
-
重命名
mv apache-zookeeper-3.6.0-bin zookeeper
-
在/opt/zookeeper/这个目录上创建zkData和zkLog目录
mkdir zkData mkdir zkLog
-
进入/opt/zookeeper/conf这个路径,复制一份 zoo_sample.cfg 文件并命名为 zoo.cfg
cd conf cp zoo_sample.cfg zoo.cfg
-
编辑zoo.cfg文件,修改dataDir路径:
dataDir=/opt/zookeeper/zkData dataLogDir=/opt/zookeeper/zkLog
-
zoo.cfg配置文件中的参数详解
tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。 initLimit =10:LF初始通信时限,集群中的Follower跟随者服务器与Leader领导者服务器之间,启动时能容忍的最多心跳数 10 * tickTime(10个心跳时间)如果领导和跟随者没有发出心跳通信,就视为失效的连接,领导和跟随者彻底断开。 syncLimit =5:LF同步通信时限,集群启动后,Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime->10秒,Leader就认为Follwer已经死掉,会将Follwer从服务器列表中删除。 dataDir:数据文件目录+数据持久化路径,主要用于保存Zookeeper中的数据。 dataLogDir:日志文件目录。 clientPort =2181:客户端连接端口,监听客户端连接的端口。
常用命令
-
启动Zookeeper
./zkServer.sh start
-
查看进程是否启动(QuorumPeerMain:是zookeeper集群的启动入口类,是用来加载配置启动QuorumPeer线程的)
jps
-
查看状态
./zkServer.sh status
-
启动客户端
./zkCli.sh
-
退出客户端
quit
-
查看根节点下的一级节点
ls /
-
查看节点的详细数据
老版本:ls2 / 新版本:ls -s /
- cZxid:创建节点的事务
每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
事务ID是ZooKeeper中所有修改总的次序。
每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。 - ctime:被创建的毫秒数(从1970年开始)
- mZxid:最后更新的事务zxid
- mtime:最后修改的毫秒数(从1970年开始)
- pZxid:最后更新的子节点zxid
- cversion:创建版本号,子节点修改次数
- dataVersion:数据变化版本号
- aclVersion:权限版本号
- ephemeralOwner:如果是临时节点,这个是znode拥有者的session id。如果不是临时节点
则是0。 - dataLength:数据长度
- numChildren:子节点数
- cZxid:创建节点的事务
-
创建节点
创建节点:create /xx 创建节点并赋值:create /xx xxx 创建多级节点:create /xx/xx 创建短暂节点:create -e /xx 创建排序节点:create -s /xx 创建短暂并排序节点:create -e -s /xx
-
查看节点的值
get /xx
-
修改节点的值
set /xx xxx
-
删除节点
删除节点:delete /xx 递归删除存在子节点的节点:deleteall /xx
-
监听节点变化
addwatch /xxx 如果被监听的节点修改内容后则:WatchedEvent state:SyncConnected type:NodeDataChanged path:/xxx 如果被监听的节点创建子节点后则:WatchedEvent state:SyncConnected type:NodeCreated path:/xxx/yyy
集群部署
假设有三台服务器,集群部署Zookeeper(先搞定一台,再克隆两台)
-
在/zkData目录下创建myid文件
vim myid 1(编号,其余两台机器分别写2和3)
-
配置zoo.cfg文件
vim zoo.cfg #######################cluster########################## server.1=192.168.126.134:2888:3888 server.2=192.168.126.137:2888:3888 server.3=192.168.126.131:2888:3888
- 解读配置的内容 server.A=B:C:D
- A:一个数字,表示第几号服务器
集群模式下配置的/opt/zookeeper/zkData/myid文件里面的数据就是A的值 - B:服务器的ip地址
- C:与集群中Leader服务器交换信息的端口
- D:选举时专用端口,万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选
出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口
- A:一个数字,表示第几号服务器
- 解读配置的内容 server.A=B:C:D
-
克隆其余两台服务器
- 在虚拟机数据目录vms下,创建zk02
- 将本台服务器数据目录下的.vmx文件和所有的.vmdk文件分别拷贝zk02下
- 虚拟机->文件->打开 (选择zk02下的.vmx文件)
- 开启此虚拟机,弹出对话框,选择“我已复制该虚拟机”
- 进入系统后,修改linux中的ip,修改/opt/zookeeper/zkData/myid中的数值为2
- 第三台服务器重复如上步骤 -
关闭每台机器的防火墙
临时关闭
systemctl stop firewalld.service
- 启动第一台并检查状态
./zkServer.sh start
./zkServer.sh status
得到结果:
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Error contacting service. It is probably not running.
因为没有超过半数以上的服务器,所以集群失败 (防火墙没有关闭也会导致失败)
- 当启动第2台服务器时
查看第1台的状态:Mode: follower
查看第2台的状态:Mode: leader
关联Java
-
创建Maven工程
-
添加pom文件
<dependencies> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
- 在resources下创建log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/zk.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
- 创建Zookeeper客户端
private String connStr = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181"; /** * session超时 60秒:一定不能太少,因为连接zookeeper和加载集群环境会因为性能原因延迟略高 * 如果时间太少,还没有创建好客户端,就开始操作节点,会报错的 */ private int sessionTimeout = 10 * 1000; private ZooKeeper zkCli; @Before public void init() throws Exception { zkCli = new ZooKeeper(connStr, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("监听到了"); System.out.println(watchedEvent.getType()); } }); } /** * 创建节点 * @throws Exception */ @Test public void create() throws Exception{ String value = "zookeeperStudy"; byte[] bytes = value.getBytes(); String nodeCreated = zkCli.create("/lagou", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("nodeCreated = " + nodeCreated); } @Test public void get() throws Exception{ byte[] data = zkCli.getData("/lagou", false, new Stat()); String str = new String(data); System.out.println("str = " + str); } @Test public void set() throws Exception{ Stat lagou = zkCli.setData("/lagou", "zookeeperA".getBytes(), 0); System.out.println("lagou = " + lagou); } @Test public void delete() throws Exception{ zkCli.delete("/test", 1); System.out.println("删除成功"); } @Test public void childRen() throws Exception{ List<String> children = zkCli.getChildren("/", true); for (String child : children) { System.out.println("child = " + child); } //顺便监听子节点变化 System.in.read(); } @Test public void exist() throws Exception{ Stat exists = zkCli.exists("/lagouafds", true); System.out.println(exists == null ? "不存在" : "存在"); }
商家消费者模型
需求
- 模拟美团服务平台,商家营业通知,商家打烊通知
商家类
/**
* @author 张哲
*/
public class ShopServer {
private String connectString = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181";
private static int sessionTimeout = 10 * 1000;
private ZooKeeper zk = null;
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
}
});
}
// 注册到集群
public void register(String ShopName) throws Exception {
// 一定是"EPHEMERAL_SEQUENTIAL短暂有序型"的节点,才能给shop编号,shop1. shop2...”
String create = zk.create("/meituan/Shop", ShopName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("【" + ShopName + "】 开始营业! " + create);
}
// 业务功能
public void business(String ShopName) throws Exception {
System.out.println("【"+ShopName+"】 正在营业中 ...");
System.in.read();
}
public static void main(String[] args) throws Exception {
ShopServer shop = new ShopServer();
// 1.连接zookeeper集群(和美团取得联系)
shop.getConnect();
// 2.将服务器节点注册(入住美团)
shop.register(args[0]);
// 3.业务逻辑处理(做生意)
shop.business(args[0]);
}
}
消费者
/**
* @author 张哲
*/
public class Customers {
private String connectString = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181";
private static int sessionTimeout = 10 * 1000;
private ZooKeeper zk = null;
// 创建到zk的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
// 再次获取所有商家
try {
getShopList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 获取服务器列表信息
public void getShopList() throws Exception {
// 1获取服务器子节点信息,并且对父节点进行监听
List<String> shops = zk.getChildren("/meituan", true);
// 2存储服务器信息列表
ArrayList<String> shoplist = new ArrayList();
// 3遍历所有节点,获取节点中的主机名称信息
for (String shop : shops) {
System.out.println(zk);
byte[] data = zk.getData("/meituan/" + shop, false, new Stat());
shoplist.add(new String(data));
}
// 4打印服务器列表信息
System.out.println(shoplist);
}
// 业务功能
public void business() throws Exception {
System.out.println("客户正在浏览商家 ...");
System.in.read();
}
public static void main(String[] args) throws Exception{
Customers client = new Customers();
client.getConnect();
// 2.获取/meituan的子节点信息,从中获取服务器信息列表(从美团中获取商家列表)
client.getShopList();
// 3.业务进程启动 (对比商家,点餐)
client.business();
}
}
运行流程
-
IDEA中配置args参数
-
首先在linux中添加一个商家,然后观察客户端的控制台输出(商家列表会立刻更新出最新商
家),多添加几个,也会实时输出商家列表create /meituan/KFC "KFC" create /meituan/BKC "BurgerKing" create /meituan/baozi "baozi"
-
在linux中删除商家,在客户端的控制台也会实时看到商家移除后的最新商家列表
delete /meituan/baozi
分布式锁(核心代码)
非锁写法:
@Autowired
private ProductService productService;
@GetMapping("/product/reduce")
@ResponseBody
public Object reduceStock(int id) throws Exception{
productService.reduceStock(id);
return "ok";
}
加锁写法:
@Autowired
private ProductService productService;
private static String connectString = "192.168.126.131:2181,192.168.126.134:2181,192.168.126.137:2181";
@GetMapping("/product/reduce")
@ResponseBody
public Object reduceStock(int id) throws Exception{
// 重试策略 (1000毫秒试1次,最多试3次)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//1.创建curator工具对象
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
//2.根据工具对象创建“内部互斥锁”
InterProcessMutex lock = new InterProcessMutex(client, "/product_"+id);
try {
//3.加锁
lock.acquire();
productService.reduceStock(id);
}catch(Exception e){
if(e instanceof RuntimeException){
throw e;
}
}finally{
//4.释放锁
lock.release();
}
return "ok";
}
更多推荐
所有评论(0)