kafka组件里面生产者和消费者的原理

开场个人观察

Kafka 这种组件,刚开始学的时候很容易记成几个名词:Producer、Consumer、Topic、Partition、Broker、Offset。真正用到项目里以后才会发现,Kafka 的难点不在于“会不会发消息”,而在于吞吐量、顺序性、可靠性和消费进度之间的取舍。

在供应链、订单、库存、ERP 同步这类系统里,Kafka 很常见。比如订单创建后通知库存系统预占库存,采购单状态变化后通知财务系统生成应付记录,仓库出库后通知报表系统刷新数据。这些业务都有一个共同点:消息量可能很大,但业务又不能随便丢。

所以理解 Kafka,不能只看 API,要看完整链路:生产者怎么把消息写进去,Broker 怎么存,消费者怎么拉取,offset 怎么提交,失败时怎么恢复。

Kafka生产者消费者工作流程

核心观点

Kafka 的核心设计可以概括成三句话。

第一,生产者不是一条一条傻发,而是会把消息按 topic 和 partition 组织起来,经过序列化、分区选择、批量缓存后再发送给 Broker。

第二,Broker 不是把消息存在一个普通队列里,而是把消息追加到分区日志中。分区是 Kafka 并行能力的基础,日志追加是它高吞吐的基础。

第三,消费者不是等 Broker 推消息,而是主动 poll 拉取。消费者属于某个 consumer group,同一个 group 里一个分区同一时刻通常只会分给一个消费者处理,处理进度通过 offset 记录。

Kafka 的吞吐量来自批量、顺序写、分区并行和零拷贝等机制;可靠性来自副本、ack、幂等、事务和 offset 提交策略。项目里真正要做的,是根据业务重要性选择合适参数,而不是一味追求“最快”。

实践方法

先看生产者。生产者发送一条消息时,大致会经历这些步骤:

  1. 业务代码构造消息,比如订单号、业务类型、变更时间。
  2. 序列化,把对象转成字节。
  3. 选择分区,如果指定 key,通常会根据 key hash 到固定 partition。
  4. 放入本地缓冲区,按 batch 组织。
  5. Sender 线程把 batch 发给对应 Broker。
  6. Broker 追加日志并根据 ack 策略返回结果。

如果要增大生产吞吐量,常见方向有几个。

batch.size 可以调大,让更多消息合并成一个批次。批量越充分,网络请求次数越少。

linger.ms 可以适当增加,让生产者多等几毫秒凑批次。它会牺牲一点延迟,换更高吞吐。

compression.type 可以使用 lz4snappyzstd,减少网络传输和磁盘占用。消息体较大时效果明显。

分区数要足够。一个 topic 如果只有一个 partition,再多消费者也无法在同一个 consumer group 内并行消费这个 topic。

生产者可以设置 acks=all、开启幂等 enable.idempotence=true,并合理配置重试。这样可以在 Broker 短暂失败时自动恢复,同时避免重试造成重复写入。

再看消费者。消费者是通过 poll 拉取消息,处理后提交 offset。这里最关键的是 offset 提交时机。

如果先提交 offset 再处理业务,消费者宕机后这批消息可能永远不会再处理,容易丢消息。

如果先处理业务再提交 offset,宕机后可能重复消费,但至少消息不会丢。大多数核心业务更接受“重复但可幂等”,而不是“直接丢”。

所以在订单、库存这类场景里,我更倾向于手动提交 offset:

1
2
3
4
poll 消息
执行业务处理
写入业务库或幂等表
处理成功后 commit offset

为了防止重复消费,业务侧要做幂等。比如用消息唯一 ID 建一张消费记录表,或者让订单状态流转本身具备幂等判断:已经处理过的状态不再重复扣减库存。

踩坑提醒

第一个坑,是只调大分区数,不看消费者处理能力。分区数增加能提高并行度,但也会带来更多文件句柄、更多 leader 选举和更复杂的再均衡。分区不是越多越好。

第二个坑,是为了吞吐把 acks 调成 0。这样生产者发出去就不管了,速度很快,但 Broker 是否收到并不确定。日志、埋点可以这么考虑,订单状态、库存变更就不应该这么随意。

第三个坑,是自动提交 offset。自动提交很方便,但它提交的是消费进度,不是业务成功。只要业务处理和 offset 提交之间没有绑定,就要接受消息丢失或重复的风险。

第四个坑,是忽略 rebalance。消费者数量变化、心跳超时、poll 时间过长,都可能触发再均衡。处理单条消息耗时很长时,要注意 max.poll.interval.ms 和批量大小,避免消费者被踢出 group。

第五个坑,是把 Kafka 当数据库。Kafka 适合做日志流和消息流,不适合承担复杂查询。业务状态仍然要落到数据库、缓存或搜索系统中。

总结

Kafka 的生产者负责高效、可靠地把消息写入分区日志;消费者负责按分区拉取消息、处理业务并提交 offset。吞吐量靠批量、压缩、分区和顺序写;可靠性靠副本、ack、幂等、事务和手动提交。

在真实项目里,我会按业务重要性分层:日志类消息可以优先吞吐,核心业务消息优先可靠;允许重复,但不能无声丢失。只要这条原则清楚,Kafka 参数就不会乱调。

SQL里面各种锁的原理

SQL 里面的锁,表面上看是数据库为了防止并发冲突做的限制,实际本质是数据库在并发读写之间做秩序管理。只要系统里存在多个事务同时读写同一批数据,就一定会遇到锁。锁设计得好,系统可以同时保证数据正确和较高吞吐;锁用得不好,轻则接口变慢,重则死锁、阻塞、库存扣错、订单状态错乱。

很多开发者第一次接触锁,是因为线上出现了 Lock wait timeout exceededDeadlock found。但真正理解锁,不能只背“共享锁、排他锁、行锁、表锁”这些名称,而要看清楚三个问题:锁保护的对象是什么、锁之间是否兼容、锁在事务什么时候加上和释放。

SQL 锁类型和事务执行流程

为什么数据库需要锁

数据库锁要解决的核心问题是并发一致性。

假设供应链系统里有一条库存记录:

1
2
3
sku_id = 1001
warehouse_id = 8
available_qty = 10

现在两个订单同时提交,每个订单都要锁定 8 件库存。如果两个事务都先读到 available_qty = 10,然后都认为库存足够,再分别把库存更新为 2,就会出现超卖。数据库必须让这两个更新按某种顺序执行,或者让其中一个事务发现条件已经不满足。

锁就是这个顺序的基础。它告诉数据库:某个事务正在读或写某个资源,其他事务能不能同时读、能不能同时写、要不要等待。

共享锁和排他锁

共享锁也叫 S 锁,英文是 Shared Lock。它表示当前事务要读取数据,并且希望读取期间数据不要被别人修改。

排他锁也叫 X 锁,英文是 Exclusive Lock。它表示当前事务要修改数据,其他事务不能同时修改,也通常不能再加共享锁读取同一行。

它们的兼容关系可以简单理解为:

已有锁 新申请共享锁 新申请排他锁
共享锁 兼容 不兼容
排他锁 不兼容 不兼容

共享锁之间兼容,是因为多个事务同时读同一行数据不会破坏数据。排他锁和任何锁都不兼容,是因为写操作必须独占资源。

在 MySQL InnoDB 中,可以通过下面的方式显式加锁:

1
2
3
SELECT * FROM inventory
WHERE sku_id = 1001
LOCK IN SHARE MODE;

或者:

1
2
3
SELECT * FROM inventory
WHERE sku_id = 1001
FOR UPDATE;

LOCK IN SHARE MODE 倾向于加共享锁,FOR UPDATE 会对命中的记录加排他锁。业务里更常见的是更新语句自动加排他锁:

1
2
3
4
UPDATE inventory
SET available_qty = available_qty - 8
WHERE sku_id = 1001
AND available_qty >= 8;

这条 SQL 在更新命中的记录时,会自动对相关记录加排他锁。

表锁和行锁

表锁保护的是整张表。一个事务锁住表以后,其他事务对这张表的读写可能都会受到影响。表锁粒度大,管理简单,但并发能力弱。

行锁保护的是某一行或某个索引范围。行锁粒度小,并发能力强,但实现复杂,也更容易出现死锁。

举个例子,ERP 系统里有一张 order 表。如果系统对整张订单表加表锁,那么一个用户修改订单时,其他用户可能连其他订单也无法修改。这对高并发系统非常不友好。

如果使用行锁,一个用户修改订单 A001,另一个用户修改订单 A002,两者互不影响。只有两个事务同时修改同一张订单时,才需要等待。

InnoDB 支持行级锁,但有一个非常重要的前提:行锁通常是加在索引上的。如果查询条件没有命中索引,数据库可能扫描大量记录,锁范围也会扩大,甚至表现得像锁了很多行。

例如:

1
2
3
UPDATE order_info
SET status = 'CLOSED'
WHERE order_no = 'SO20220917001';

如果 order_no 有唯一索引,InnoDB 可以精准锁住这一行。如果 order_no 没有索引,数据库需要扫描全表判断哪些行满足条件,锁冲突风险就会明显增加。

意向锁

意向锁是很多人容易忽略的一类锁。它不是直接锁某一行业务数据,而是用来协调表锁和行锁。

假设事务 A 已经对订单表中的某一行加了排他行锁。此时事务 B 想对整张订单表加表级排他锁。数据库必须知道表里是否已经有行锁,否则就要扫描整张表逐行检查,成本很高。

意向锁就是一个提示:某个事务打算在这张表里的某些行上加锁。

常见意向锁有:

  • IS,意向共享锁,表示事务准备在某些行上加共享锁。
  • IX,意向排他锁,表示事务准备在某些行上加排他锁。

当事务要给某行加共享锁时,会先在表上加 IS 锁。当事务要给某行加排他锁时,会先在表上加 IX 锁。

意向锁的价值是让表级锁判断冲突更快。它像是在表门口挂了一个牌子:里面已经有人在某些行上操作,整表加锁前先看看是否兼容。

记录锁

记录锁是 InnoDB 最容易理解的行锁,它锁住的是索引上的一条记录。

例如库存表有唯一索引:

1
UNIQUE KEY uk_sku_warehouse (sku_id, warehouse_id)

执行:

1
2
3
4
5
SELECT *
FROM inventory
WHERE sku_id = 1001
AND warehouse_id = 8
FOR UPDATE;

如果命中一条记录,InnoDB 会对这条索引记录加排他记录锁。其他事务再想更新同一条库存记录,就必须等待当前事务提交或回滚。

记录锁适合解决“同一行业务数据不能被并发修改”的问题,例如订单状态流转、库存数量变更、账户余额扣减。

间隙锁

间隙锁锁住的不是已经存在的记录,而是索引记录之间的空隙。它的目的主要是防止幻读。

假设库存预警表里已有预警阈值:

1
threshold: 10, 20, 50

一个事务执行范围查询:

1
2
3
4
SELECT *
FROM stock_warning_rule
WHERE threshold BETWEEN 10 AND 50
FOR UPDATE;

如果数据库只锁住 10、20、50 这几条已经存在的记录,另一个事务仍然可以插入 threshold = 30 的新记录。第一个事务再次查询时,就会发现多了一条之前不存在的数据,这就是幻读。

间隙锁会锁住索引范围中的空隙,让其他事务不能在这个范围里插入新记录。它牺牲了一部分并发能力,换取范围查询的一致性。

需要注意,间隙锁依赖索引范围。如果 SQL 没有合适索引,锁范围可能比预期大很多。

Next-Key Lock

Next-Key Lock 可以理解为记录锁加间隙锁。它既锁住已经存在的索引记录,也锁住记录前后的范围。

在 InnoDB 的可重复读隔离级别下,范围查询加锁时经常会使用 Next-Key Lock 来防止幻读。

例如:

1
2
3
4
5
SELECT *
FROM purchase_order
WHERE supplier_id = 88
AND amount BETWEEN 10000 AND 50000
FOR UPDATE;

如果 supplier_id, amount 上有联合索引,InnoDB 会锁定这个索引范围内的记录和间隙。其他事务不能随便插入符合这个范围的新采购单。

Next-Key Lock 的好处是一致性强,坏处是容易让范围更新、范围查询变得更容易互相阻塞。因此业务 SQL 要尽量让范围条件走合适索引,避免锁住过大的范围。

乐观锁

乐观锁不是数据库引擎内部固定的一种锁,而是一种并发控制思想。它假设冲突不常发生,所以不提前阻塞别人,而是在提交更新时检查数据有没有被别人改过。

最常见实现是版本号字段:

1
2
3
4
5
6
7
UPDATE inventory
SET available_qty = available_qty - 8,
version = version + 1
WHERE sku_id = 1001
AND warehouse_id = 8
AND available_qty >= 8
AND version = 12;

如果更新影响行数为 1,说明版本没变,扣减成功。如果影响行数为 0,说明数据已经被别人改过,当前事务需要重试或提示失败。

乐观锁适合读多写少、冲突概率低的场景。例如商品资料编辑、客户档案修改、配置项更新。它的优点是不会长时间阻塞,缺点是冲突发生时需要业务处理重试和失败提示。

悲观锁

悲观锁也是一种思想。它假设冲突很可能发生,所以在操作前先把数据锁住。

典型写法是:

1
2
3
4
5
SELECT *
FROM inventory
WHERE sku_id = 1001
AND warehouse_id = 8
FOR UPDATE;

当前事务拿到锁以后,再计算库存、写入订单、更新库存。其他事务想修改同一条库存记录,就必须等待。

悲观锁适合写冲突高、数据不能错的场景,例如库存扣减、余额扣减、核心单据状态流转。缺点也明显:事务时间越长,等待越多,吞吐越低。

所以悲观锁一定要控制事务范围。不要在持有锁期间调用外部接口、发送 MQ、请求第三方系统,也不要在事务里做复杂计算。

元数据锁

元数据锁也叫 MDL,Metadata Lock。它保护的是表结构,而不是具体业务行。

当一个事务正在查询或修改某张表时,数据库会持有这张表的元数据锁,防止另一个会话同时修改表结构。否则就可能出现一个事务读表读到一半,另一个事务把字段删了。

常见问题是:一个长事务一直不提交,导致 ALTER TABLE 等 DDL 操作被阻塞;DDL 又反过来阻塞后续普通查询,最后形成一串等待。

例如:

1
2
3
BEGIN;
SELECT * FROM order_info WHERE id = 1;
-- 长时间不提交

此时另一个会话执行:

1
ALTER TABLE order_info ADD COLUMN source_type varchar(32);

DDL 可能会等待前面的事务释放 MDL。后续新的查询又可能排在 DDL 后面,导致业务接口突然大面积变慢。

线上做 DDL 时,必须关注长事务和元数据锁等待。大表变更最好使用在线 DDL 工具或低峰期执行。

自增锁

自增锁用于处理自增主键分配。多个事务同时插入数据时,数据库要保证自增 ID 不重复。

InnoDB 对自增锁做过很多优化,不同配置下表现不同。简单理解,普通插入通常可以较快分配自增值;批量插入、INSERT ... SELECT 这类语句可能持有自增相关锁更久。

业务上不建议依赖自增 ID 的连续性。事务回滚、插入失败、并发插入都可能造成 ID 跳号。自增 ID 的目标是唯一和大体递增,不是绝对连续。

死锁是怎么发生的

死锁是两个或多个事务互相等待对方释放锁。

例如供应链系统里同时更新订单和库存:

事务 A:

1
2
1. 锁订单 O1001
2. 再锁库存 S1001

事务 B:

1
2
1. 锁库存 S1001
2. 再锁订单 O1001

事务 A 拿到了订单锁,等待库存锁。事务 B 拿到了库存锁,等待订单锁。双方都不释放,就形成死锁。

数据库通常会检测死锁,并主动回滚其中一个事务。业务系统看到的就是死锁异常。

减少死锁的关键方法是:

  • 多表更新保持固定顺序。
  • 批量更新时按主键排序。
  • 事务尽量短。
  • 查询条件命中索引,减少锁范围。
  • 避免在事务中做远程调用。
  • 捕获死锁异常,对幂等操作做有限重试。

怎么排查锁等待

线上出现锁等待时,不要只盯着慢 SQL。要看谁在等锁,谁持有锁,事务已经执行了多久。

MySQL 里常用的排查方向包括:

1
SHOW PROCESSLIST;

查看当前连接状态。

1
SHOW ENGINE INNODB STATUS;

查看最近死锁、锁等待、事务信息。

在 MySQL 8 中,也可以通过 performance_schemasys 库查看锁等待关系。

排查时重点看:

  • 哪个事务持有锁。
  • 持锁事务执行了多久。
  • 等待的 SQL 是什么。
  • 是否存在未提交长事务。
  • SQL 是否走了索引。
  • 是否有 DDL 和普通业务 SQL 互相阻塞。

实际开发里的用锁建议

第一,能用一条原子 SQL 解决的,不要拆成先查再改。

库存扣减推荐写成:

1
2
3
4
5
6
UPDATE inventory
SET available_qty = available_qty - 8,
locked_qty = locked_qty + 8
WHERE sku_id = 1001
AND warehouse_id = 8
AND available_qty >= 8;

然后根据影响行数判断是否成功。

第二,加锁查询必须有合适索引。

FOR UPDATE 不是魔法。如果条件没有索引,锁范围会扩大,性能和并发都会出问题。

第三,事务里只放必须保持一致的操作。

订单创建、库存锁定、订单主表写入可以放在事务里;短信通知、日志上报、消息推送应该放到事务提交后。

第四,统一更新顺序。

如果业务规定先锁订单,再锁库存,再锁财务单据,那么所有代码都要遵守这个顺序。不要一个接口先锁订单,另一个接口先锁库存。

第五,锁冲突高的热点数据要做业务拆分。

例如某个爆款 SKU 的库存行成为热点,可以按仓库、批次、库存桶拆分,降低单行竞争。

总结

SQL 里的锁不是孤立概念,而是一套并发控制体系。共享锁和排他锁决定读写是否兼容,表锁和行锁决定锁粒度,意向锁协调表级和行级锁,记录锁保护已有记录,间隙锁和 Next-Key Lock 保护索引范围,乐观锁和悲观锁是两种业务并发控制思想,元数据锁保护表结构,自增锁保证自增值分配。

真正写业务代码时,最重要的不是记住所有锁名,而是控制三个东西:索引、事务范围、更新顺序。索引决定锁得准不准,事务范围决定锁持有多久,更新顺序决定是否容易死锁。把这三点做好,绝大多数 SQL 锁问题都会少很多。

redis入门

安装gcc

  • Redis在linux上的安装首先必须先安装gcc,这个是用来编译redis的源文件的。首先需要先切换的到root用户:

    [chenjx@localhost ~]$ su
    Password: 
    [root@localhost chenjx]#
    
  • 然后开始安装gcc:

    //这个命令是在线安装的,所以在这之前你的PC必须能够上网
    [root@localhost /]# yum install gcc-c++
    

解压redis的源文件

[root@localhost chenjx]# cd Desktop/
[root@localhost Desktop]# ls
redis-4.0.1.tar.gz
[root@localhost Desktop]# tar zxvf redis-4.0.1.tar.gz 
redis-4.0.1/
redis-4.0.1/.gitignore
redis-4.0.1/00-RELEASENOTES
redis-4.0.1/BUGS
redis-4.0.1/CONTRIBUTING

进入redis的解压目录

[root@localhost Desktop]# ls
redis-4.0.1  redis-4.0.1.tar.gz
[root@localhost Desktop]# cd redis-4.0.1/

使用make命令编译

[root@localhost redis-4.0.1]# make

//出现以下信息即是编译成功
CC notify.o
CC setproctitle.o
CC blocked.o
CC hyperloglog.o
CC latency.o
CC sparkline.o
CC redis-check-rdb.o
CC redis-check-aof.o
CC geo.o
CC lazyfree.o
CC module.o
CC evict.o
CC expire.o
CC geohash.o
CC geohash_helper.o
CC childinfo.o
CC defrag.o
CC siphash.o
CC rax.o
LINK redis-server
INSTALL redis-sentinel
CC redis-cli.o
LINK redis-cli
CC redis-benchmark.o
LINK redis-benchmark
INSTALL redis-check-rdb
INSTALL redis-check-aof

进入解压的src目录下

[root@localhost redis-4.0.1]# cd src

运行make test测试是否可以安装,检查的一大堆,都是绿色的ok。这就可以了

安装

[root@localhost src]# make PREFIX=/usr/local/redis install 

安装成功后,去/usr/local/redis/bin目录看,会发现如下文件:

都是一个个工具命令

然后把解压的redis路径下的redis.conf文件拷贝到安装路径/usr/local/redis下面

[root@localhost redis-4.0.1]# cp redis.conf /usr/local/redis
[root@localhost redis-4.0.1]# cd /usr/local/redis
[root@localhost redis]# ls
bin  redis.conf

到这里redis已经安装成功了。

启动redis

第一种方法:进入安装路径下的bin

[root@localhost redis]# cd bin
[root@localhost bin]# ./redis-server

但是这属于前端启动,启动redis之后,我们的控制台就不能进行任何操作了。只能ctrl+c停止启动。

第二种方法:后端启动

  • 首先编辑redis.conf

    [root@localhost redis]# vim redis.conf

  • 找到daemonize no将其改为yes

  • 再次启动

    [root@localhost redis]# ls
    bin  redis.conf
    [root@localhost redis]# ./bin/redis-server ./redis.conf
    //这里加载配置文件
    
    80167:C 30 Jul 16:01:58.145 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
    80167:C 30 Jul 16:01:58.145 # Redis version=4.0.1, bits=64, commit=00000000, modified=0, pid=80167, just started
    80167:C 30 Jul 16:01:58.145 # Configuration loaded
    

这样redis就启动了

可以通过
ps -ef|grep redis
来查看是否启动

关闭redis

[root@localhost redis]# ./bin/redis-cli shutdown

简单的使用

//首先链接客户端
[root@localhost redis]# ./bin/redis-cli
//检查网络是否可以
127.0.0.1:6379> ping
PONG
//设置一个键值对
127.0.0.1:6379> set name chenjx
OK
//获取刚刚设置的键值对
127.0.0.1:6379> get name
"chenjx"
//查看所有的键
127.0.0.1:6379> keys *
1) "name"
//删除name这个键
127.0.0.1:6379> del name
(integer) 1
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>

设置登录密码

redis在生产环境中通常都会设置密码以保证一定的安全性

首先修改redis.conf

打开redis.conf文件,搜索requirepass关键字

################################## SECURITY ###################################

# Require clients to issue AUTH <PASSWORD> before processing any other
# commands.  This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# Warning: since Redis is pretty fast an outside user can try up to
# 150k passwords per second against a good box. This means that you should
# use a very strong password otherwise it will be very easy to break.
#
# requirepass foobared

关注标记的那一行,#requirepass foobared。设置密码的方法就是去掉注释的#,把foobared替换成自己的密码即可,例如将密码设置为123456:

################################## SECURITY ###################################

# Require clients to issue AUTH <PASSWORD> before processing any other
# commands.  This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# Warning: since Redis is pretty fast an outside user can try up to
# 150k passwords per second against a good box. This means that you should
# use a very strong password otherwise it will be very easy to break.
#
requirepass 123456

修改完成后重启redis,再次通过redis客户端redis-cli登录并操作可以发现会报一个身份认证错误:

这就说明我们已经成功的设置了密码,所以通过客户端连接的话必须加上密码参数才能正常连接:

如上图所示,加了-a参数之后即可正常连接并操作redis。

jedis连接设置密码

当我们用Java客户端连接redis时会遇到同样的问题,下面看一段简单的jedis连接redis的测试代码:

@Test
public void testTwo() {
    Jedis jedis = new Jedis("192.168.145.10");
    System.out.println("Connection to server sucessfully");
    // 查看服务是否运行
    System.out.println("Server is running: " + jedis.ping());
}

运行junit后我们发现报异常了:

redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.
at redis.clients.jedis.Protocol.processError(Protocol.java:117)
at redis.clients.jedis.Protocol.process(Protocol.java:142)
at redis.clients.jedis.Protocol.read(Protocol.java:196)

由于我们设置了密码但在这里又没有指定密码,所以报了和刚才相同的错误,Jedis的父类BinaryJedis提供了这样一样方法:

public String auth(final String password) {
    checkIsInMulti();
    client.auth(password);
    return client.getStatusCodeReply();
}

所以在创建了Jedis的实例后再加上一行jedis.auth(“123456”); 即可

@Test
public void testTwo() {
    Jedis jedis = new Jedis("192.168.145.10");
    jedis.auth("123456");
    System.out.println("Connection to server sucessfully");
    // 查看服务是否运行
    System.out.println("Server is running: " + jedis.ping());
}

spring-data-redis设置密码

通常情况下在实际的java项目中我们会选择Spring提供的spring-data-redis来操作redis,spring的封装可以给我们提供很多便捷之处。那么spring-data-redis又是如何设置密码的呢?首先定义一个redis.properties配置文件,定义一组redis属性供spring加载使用,其中就包含密码(redis.password):

# Redis settings  
redis.host=192.168.145.10 
redis.port=6379  
redis.password=123456
redis.database=2
redis.timeout=100000  
redis.maxTotal=300  
redis.maxIdle=100
redis.minIdle=10
redis.maxWaitMillis=1000  
redis.testOnBorrow=true  

然后在由Spring封装的JedisConnectionFactory中来设置密码属性即可,下面是完整redis配置:

<!-- redis配置 -->
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxIdle" value="${redis.maxIdle}" />
    <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
    <property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>

<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
    p:host-name="${redis.host}" p:port="${redis.port}" 
    p:password="${redis.password}" p:pool-config-ref="poolConfig" />

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
</bean>

Redis常用数据类型介绍、使用场景及其操作命令

Redis目前支持5种数据类型,分别是:

  • String(字符串)
  • List(列表)
  • Hash(字典)
  • Set(集合)
  • Sorted Set(有序集合)

下面就分别介绍这五种数据类型及其相应的操作命令。

String(字符串)

String是简单的 key-value 键值对,value 不仅可以是 String,也可以是数字。String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr,decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr,decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

  • 应用场景

String是最常用的一种数据类型,普通的key/value存储都可以归为此类,这里就不所做解释了。

  • 相关命令

    SET key value                   设置key=value
    GET key                         或者键key对应的值
    GETRANGE key start end          得到字符串的子字符串存放在一个键
    GETSET key value                设置键的字符串值,并返回旧值
    GETBIT key offset               返回存储在键位值的字符串值的偏移
    MGET key1 [key2..]              得到所有的给定键的值
    SETBIT key offset value         设置或清除该位在存储在键的字符串值偏移
    SETEX key seconds value         键到期时设置值
    SETNX key value                 设置键的值,只有当该键不存在
    SETRANGE key offset value       覆盖字符串的一部分从指定键的偏移
    STRLEN key                      得到存储在键的值的长度
    MSET key value [key value...]   设置多个键和多个值
    MSETNX key value [key value...] 设置多个键多个值,只有在当没有按键的存在时
    PSETEX key milliseconds value   设置键的毫秒值和到期时间
    INCR key                        增加键的整数值一次
    INCRBY key increment            由给定的数量递增键的整数值
    INCRBYFLOAT key increment       由给定的数量递增键的浮点值
    DECR key                        递减键一次的整数值
    DECRBY key decrement            由给定数目递减键的整数值
    APPEND key value                追加值到一个键
    
  • 其中用于操作管理键的命令有:

    DEL key                         如果存在删除键
    DUMP key                        返回存储在指定键的值的序列化版本
    EXISTS key                      此命令检查该键是否存在
    EXPIRE key seconds              指定键的过期时间
    EXPIREAT key timestamp          指定的键过期时间。在这里,时间是在Unix时间戳格式
    PEXPIRE key milliseconds        设置键以毫秒为单位到期
    PEXPIREAT key milliseconds-timestamp        设置键在Unix时间戳指定为毫秒到期
    KEYS pattern                    查找与指定模式匹配的所有键
    MOVE key db                     移动键到另一个数据库
    PERSIST key                     移除过期的键
    PTTL key                        以毫秒为单位获取剩余时间的到期键。
    TTL key                         获取键到期的剩余时间。
    RANDOMKEY                       从Redis返回随机键
    RENAME key newkey               更改键的名称
    RENAMENX key newkey             重命名键,如果新的键不存在
    TYPE key                        返回存储在键的数据类型的值。
    
  • 实例:

    redis 127.0.0.1:6379> set baidu http://www.baidu
    OK
    redis 127.0.0.1:6379> append baidu .com
    (integer) 20
    redis 127.0.0.1:6379> get baidu
    "http://www.baidu.com"
    redis 127.0.0.1:6379> set visitors 0
    OK
    redis 127.0.0.1:6379> incr visitors
    (integer) 1
    redis 127.0.0.1:6379> incr visitors
    (integer) 2
    redis 127.0.0.1:6379> get visitors
    "2"
    redis 127.0.0.1:6379> incrby visitors 100
    (integer) 102
    redis 127.0.0.1:6379> get visitors
    "102"
    redis 127.0.0.1:6379> type baidu
    string
    redis 127.0.0.1:6379> type visitors
    string
    redis 127.0.0.1:6379> ttl baidu
    (integer) -1
    redis 127.0.0.1:6379> rename baidu baidu-site
    OK
    redis 127.0.0.1:6379> get baidu
    (nil)
    redis 127.0.0.1:6379> get baidu-site
    "http://www.baidu.com"
    

List(列表)

Redis list的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销,Redis内部的很多实现,包括发送缓冲队列等也都是用的这个数据结构。可以从头部或尾部向Redis列表添加元素。列表的最大长度为2^32 - 1,也即每个列表支持超过40亿个元素。

  • 应用场景

Redis list的应用场景非常多,也是Redis最重要的数据结构之一,比如twitter的关注列表、粉丝列表等都可以用Redis的list结构来实现,再比如有的应用使用Redis的list类型实现一个简单的轻量级消息队列,生产者push,消费者pop

  • 相关命令

    BLPOP
    BLPOP key1 [key2 ] timeout 取出并获取列表中的第一个元素,或阻塞,直到有可用
    BRPOP
    BRPOP key1 [key2 ] timeout 取出并获取列表中的最后一个元素,或阻塞,直到有可用
    BRPOPLPUSH
    BRPOPLPUSH source destination timeout 从列表中弹出一个值,它推到另一个列表并返回它;或阻塞,直到有可用
    LINDEX
    LINDEX key index 从一个列表其索引获取对应的元素
    LINSERT
    LINSERT key BEFORE|AFTER pivot value 在列表中的其他元素之后或之前插入一个元素
    LLEN
    LLEN key 获取列表的长度
    LPOP
    LPOP key 获取并取出列表中的第一个元素
    LPUSH
    LPUSH key value1 [value2] 在前面加上一个或多个值的列表
    LPUSHX
    LPUSHX key value 在前面加上一个值列表,仅当列表中存在
    LRANGE
    LRANGE key start stop 从一个列表获取各种元素
    LREM
    LREM key count value 从列表中删除元素
    LSET
    LSET key index value 在列表中的索引设置一个元素的值
    LTRIM
    LTRIM key start stop 修剪列表到指定的范围内
    RPOP
    RPOP key 取出并获取列表中的最后一个元素
    RPOPLPUSH
    RPOPLPUSH source destination 删除最后一个元素的列表,将其附加到另一个列表并返回它
    RPUSH
    RPUSH key value1 [value2] 添加一个或多个值到列表头部
    RPUSHX
    RPUSHX key value 添加一个值列表,仅当列表中存在
    
  • 使用示例

    redis 127.0.0.1:6379> lpush list1 redis
    (integer) 1
    redis 127.0.0.1:6379> lpush list1 hello
    (integer) 2
    redis 127.0.0.1:6379> rpush list1 world
    (integer) 3
    redis 127.0.0.1:6379> llen list1
    (integer) 3
    redis 127.0.0.1:6379> lrange list1 0 3
    1) "hello"
    2) "redis"
    3) "world"
    redis 127.0.0.1:6379> lpop list1
    "hello"
    redis 127.0.0.1:6379> rpop list1
    "world"
    redis 127.0.0.1:6379> lrange list1 0 3
    1) "redis"
    

Hash(字典,哈希表)

类似C#中的dict类型或者C++中的hash_map类型

Redis Hash对应Value内部实际就是一个HashMap,实际这里会有2种不同实现,这个Hash的成员比较少时Redis为了节省内存会采用类似一维数组的方式来紧凑存储,而不会采用真正的HashMap结构,对应的value redisObject的encoding为zipmap,当成员数量增大时会自动转成真正的HashMap

  • 应用场景

假设有多个用户及对应的用户信息,可以用来存储以用户ID为key,将用户信息序列化为比如json格式做为value进行保存。

  • 相关命令

    HDEL
    HDEL key field[field...] 删除对象的一个或几个属性域,不存在的属性将被忽略
    HEXISTS
    HEXISTS key field 查看对象是否存在该属性域
    HGET
    HGET key field 获取对象中该field属性域的值
    HGETALL
    HGETALL key 获取对象的所有属性域和值
    HINCRBY
    HINCRBY key field value 将该对象中指定域的值增加给定的value,原子自增操作,只能是integer的属性值可以使用
    HINCRBYFLOAT
    HINCRBYFLOAT key field increment 将该对象中指定域的值增加给定的浮点数
    HKEYS
    HKEYS key 获取对象的所有属性字段
    HVALS
    HVALS key 获取对象的所有属性值
    HLEN
    HLEN key 获取对象的所有属性字段的总数
    HMGET
    HMGET key field[field...] 获取对象的一个或多个指定字段的值
    HSET
    HSET key field value 设置对象指定字段的值
    HMSET
    HMSET key field value [field value ...] 同时设置对象中一个或多个字段的值
    HSETNX
    HSETNX key field value 只在对象不存在指定的字段时才设置字段的值
    HSTRLEN
    HSTRLEN key field 返回对象指定field的value的字符串长度,如果该对象或者field不存在,返回0.
    HSCAN
    HSCAN key cursor [MATCH pattern] [COUNT count] 类似SCAN命令
    
  • 使用示例

    127.0.0.1:6379> hset person name jack
    (integer) 1
    127.0.0.1:6379> hset person age 20
    (integer) 1
    127.0.0.1:6379> hset person sex famale
    (integer) 1
    127.0.0.1:6379> hgetall person
    1) "name"
    2) "jack"
    3) "age"
    4) "20"
    5) "sex"
    6) "famale"
    127.0.0.1:6379> hkeys person
    1) "name"
    2) "age"
    3) "sex"
    127.0.0.1:6379> hvals person
    1) "jack"
    2) "20"
    3) "famale"
    

Set(集合)

可以理解为一堆值不重复的列表,类似数学领域中的集合概念,且Redis也提供了针对集合的求交集、并集、差集等操作。

set 的内部实现是一个 value永远为null的HashMap,实际就是通过计算hash的方式来快速排重的,这也是set能提供判断一个成员是否在集合内的原因。

  • 应用场景

Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。

  • 相关命令

    SADD
    SADD key member [member ...] 添加一个或者多个元素到集合(set)里
    SACRD
    SCARD key 获取集合里面的元素数量
    SDIFF
    SDIFF key [key ...] 获得队列不存在的元素
    SDIFFSTORE
    SDIFFSTORE destination key [key ...] 获得队列不存在的元素,并存储在一个关键的结果集
    SINTER
    SINTER key [key ...] 获得两个集合的交集
    SINTERSTORE
    SINTERSTORE destination key [key ...] 获得两个集合的交集,并存储在一个集合中
    SISMEMBER
    SISMEMBER key member 确定一个给定的值是一个集合的成员
    SMEMBERS
    SMEMBERS key 获取集合里面的所有key
    SMOVE
    SMOVE source destination member 移动集合里面的一个key到另一个集合
    SPOP
    SPOP key [count] 获取并删除一个集合里面的元素
    SRANDMEMBER
    SRANDMEMBER key [count] 从集合里面随机获取一个元素
    SREM
    SREM key member [member ...] 从集合里删除一个或多个元素,不存在的元素会被忽略
    SUNION
    SUNION key [key ...] 添加多个set元素
    SUNIONSTORE
    SUNIONSTORE destination key [key ...] 合并set元素,并将结果存入新的set里面
    SSCAN
    SSCAN key cursor [MATCH pattern] [COUNT count] 迭代set里面的元素
    
  • 使用示例

    redis> SADD myset "Hello"
    (integer) 1
    redis> SADD myset "World"
    (integer) 1
    redis> SMEMBERS myset
    1) "World"
    2) "Hello"
    redis> SADD myset "one"
    (integer) 1
    redis> SISMEMBER myset "one"
    (integer) 1
    redis> SISMEMBER myset "two"
    (integer) 0
    

Sorted Set(有序集合)

Redis有序集合类似Redis集合,不同的是增加了一个功能,即集合是有序的。一个有序集合的每个成员带有分数,用于进行排序。

Redis有序集合添加、删除和测试的时间复杂度均为O(1)(固定时间,无论里面包含的元素集合的数量)。列表的最大长度为2^32- 1元素(4294967295,超过40亿每个元素的集合)。

Redis sorted set的内部使用HashMap和跳跃表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。

  • 使用场景

Redis sorted set的使用场景与set类似,区别是set不是自动有序的,而sorted set可以通过用户额外提供一个优先级(score)的参数来为成员排序,并且是插入有序的,即自动排序。当你需要一个有序的并且不重复的集合列表,那么可以选择sorted set数据结构.

比如用户的积分排行榜需求就可以通过有序集合实现。还有上面介绍的使用List实现轻量级的消息队列,其实也可以通过Sorted Set实现有优先级或按权重的队列。

  • 相关命令

    ZADD
    ZADD key score1 member1 [score2 member2] 添加一个或多个成员到有序集合,或者如果它已经存在更新其分数
    ZCARD
    ZCARD key 得到的有序集合成员的数量
    ZCOUNT
    ZCOUNT key min max 计算一个有序集合成员与给定值范围内的分数
    ZINCRBY
    ZINCRBY key increment member 在有序集合增加成员的分数
    ZINTERSTORE
    ZINTERSTORE destination numkeys key [key ...] 多重交叉排序集合,并存储生成一个新的键有序集合。
    ZLEXCOUNT
    ZLEXCOUNT key min max 计算一个给定的字典范围之间的有序集合成员的数量
    ZRANGE
    ZRANGE key start stop [WITHSCORES] 由索引返回一个成员范围的有序集合(从低到高)
    ZRANGEBYLEX
    ZRANGEBYLEX key min max [LIMIT offset count]返回一个成员范围的有序集合(由字典范围)
    ZRANGEBYSCORE
    ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT] 返回有序集key中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员,有序集成员按 score 值递增(从小到大)次序排列
    ZRANK
    ZRANK key member 确定成员的索引中有序集合
    ZREM
    ZREM key member [member ...] 从有序集合中删除一个或多个成员,不存在的成员将被忽略
    ZREMRANGEBYLEX
    ZREMRANGEBYLEX key min max 删除所有成员在给定的字典范围之间的有序集合
    ZREMRANGEBYRANK
    ZREMRANGEBYRANK key start stop 在给定的索引之内删除所有成员的有序集合
    ZREMRANGEBYSCORE
    ZREMRANGEBYSCORE key min max 在给定的分数之内删除所有成员的有序集合
    ZREVRANGE
    ZREVRANGE key start stop [WITHSCORES] 返回一个成员范围的有序集合,通过索引,以分数排序,从高分到低分
    ZREVRANGEBYSCORE
    ZREVRANGEBYSCORE key max min [WITHSCORES] 返回一个成员范围的有序集合,以socre排序从高到低
    ZREVRANK
    ZREVRANK key member 确定一个有序集合成员的索引,以分数排序,从高分到低分
    ZSCORE
    ZSCORE key member 获取给定成员相关联的分数在一个有序集合
    ZUNIONSTORE
    ZUNIONSTORE destination numkeys key [key ...] 添加多个集排序,所得排序集合存储在一个新的键
    ZSCAN
    ZSCAN key cursor [MATCH pattern] [COUNT count] 增量迭代排序元素集和相关的分数
    
  • 使用示例

    redis 127.0.0.1:6379> zadd dbs 100 redis
    (integer) 1
    redis 127.0.0.1:6379> zadd dbs 98 memcached
    (integer) 1
    redis 127.0.0.1:6379> zadd dbs 99 mongodb
    (integer) 1
    redis 127.0.0.1:6379> zadd dbs 99 leveldb
    (integer) 1
    redis 127.0.0.1:6379> zcard dbs
    (integer) 4
    redis 127.0.0.1:6379> zcount dbs 10 99
    (integer) 3
    redis 127.0.0.1:6379> zrank dbs leveldb
    (integer) 1
    redis 127.0.0.1:6379> zrank dbs other
    (nil)
    redis 127.0.0.1:6379> zrangebyscore dbs 98 100
    1) "memcached"
    2) "leveldb"
    3) "mongodb"
    4) "redis"
    

kafka集群

Kafka初识

  • Kafka使用背景

    在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:

    • 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位
    • 我想对用户的搜索关键词进行统计,分析出当前的流行趋势
    • 有些数据,存储数据库浪费,直接存储硬盘效率又低
  • Kafka的定义

    它是一个分布式消息系统,具有高水平扩展和高吞吐量的特点。

Kafka相关概念

  • AMQP协议:是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。

  • 一些基本的概念

    • 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
    • 生产者:(Producer):向broker发布消息的应用程序
    • AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例

kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)

  • Kafka架构

    生产者生产消息、kafka集群、消费者获取消息这样一种架构,如下图:

    kafka集群中的消息,是通过Topic(主题)来进行组织的,如下图:

    一些基本的概念:

    • 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
    • 分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。

      kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

      工作图:

      备份(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。

Kafka集群搭建

  • 软件环境

    • 已经搭建好的zookeeper集群(文章最后另写)
    • 软件版本kafka_2.11-0.9.0.1.tgz
    • 软件版本kafka_2.11-0.9.0.1.tgz
  • 创建目录并下载安装软件

    #下载软件
    wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
    #解压软件
    tar -xvf kafka_2.11-0.9.0.1.tgz
    
  • 修改配置文件

    主要关注config/server.properties

    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    port=9092 #当前kafka对外提供服务的端口默认是9092
    host.name = 120.76.230.134 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题
    #申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置
    listeners = PLAINTEXT://120.76.230.134:9092 
    advertised.host.name = 120.76.230.134
    advertised.listeners=PLAINTEXT://120.76.230.134:9092
    log.dirs=/data/kafka/kafkalogs  #消息存放的目录
    #在log.retention.hours=168 下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    zookeeper.connect=120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181 #zookeeper配置地址
    
每个机子的broker.id、host.name都是不一样的,主要是关注以上几项参数配置。
  • 启动Kafka集群并测试

    启动服务

    #从后台启动Kafka集群(3台都需要启动)
    cd /data/kafka/kafka_2.11-0.11.0.1/bin #进入到kafka的bin目录 
    ./kafka-server-start.sh -daemon ../config/server.properties
    

    检查服务是否启动

    #执行命令jps
    22246 Bootstrap
    26393 Kafka
    

    创建Topic来验证是否创建成功

    #创建Topic
    ./kafka-topics.sh --create --zookeeper 120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181 --replication-factor 2 --partitions 1 --topic test
    
    #解释
    --replication-factor 2   #复制两份
    --partitions 1 #创建1个分区
    --topic #主题为test
    
    #在一台服务器上创建一个发布者
    #创建一个broker,发布者
    ./kafka-console-producer.sh --broker-list 120.76.230.134:9092 --topic test
    
    #在一台服务器上创建一个订阅者
    ./kafka-console-consumer.sh --bootstrap-server 120.76.230.134:9092 --topic test --from-beginning
    

    测试(在发布者那里发布消息看看订阅者那里是否能正常收到~):

  • 其他命令

    大部分命令可以去官方文档查看

    • 查看topic

      ./kafka-topics.sh --list --zookeeper 120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181
      #就会显示我们创建的所有topic
      

      kafka集群搭建完毕

zookeeper集群部署

  • 为什么是基数

    • 集群必须有一半以上的机器统一才能成为leader
    • 一半的机器挂掉 整个集群挂掉
  • 去官网下载好zookeeper包,解压

  • 修改配置/conf/zoo.cfg文件

    # 基本事件单元,以毫秒为单位,用来控制心跳和超时
    tickTime=2000
    # 参数设定了允许所有跟随者与领导者进行连接并同步的时间,如果在设定的时间段内,半数以上的跟随者未能完成同步,领导者便会宣布放弃领导地位,进行另一次的领导选举
    # 如果zk集群环境数量确实很大,同步数据的时间会变长,因此这种情况下可以适当调大该参数
    #集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)
    initLimit=10
    # 参数设定了允许一个跟随者与一个领导者进行同步的时间,如果在设定的时间段内,跟随者未完成同步,它将会被集群丢弃
    # 所有关联到这个跟随者的客户端将连接到另外一个跟随着
    syncLimit=5
    # 存储持久数据的本地文件系统位置
    dataDir=/data/zookeeper-3.4.9/data
    dataLogDir=/data/zookeeper-3.4.9/logs
    # 监听客户端连接的端口
    clientPort=5181
    
    server.1=120.76.230.134:2888:3888
    server.2=120.76.193.192:2888:3888
    server.3=120.25.239.155:2888:3888
    
    #server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
    #120.76.230.134为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
    

    zoo_sample.cfg 这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。

  • 创建myid文件

    /data/zookeeper-3.4.9/data目录下要配置myid文件,内容跟server.id相对应,比如:1

    ./zkServer.sh star启动zookeeper,每台都要启动一遍

  • 重要配置说明

    • myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。

    • zoo.cfg 文件是zookeeper配置文件 在conf目录里。

快速排序

快速排序

介绍

在数组中找一个支点(任意),经过一趟排序后,支点左边的数都要比支点小,支点右边的数都要比支点大!

现在我们有一个数组:int arr[]={1,4,5,67,2,7,8,6,9,44};

经过一趟排序之后,如果我选择数组中间的数作为支点:7(任意的),那么第一趟排序后的结果是这样的:{1,4,5,6,2,7,8,67,9,44}

那么就实现了支点左边的数比支点小,支点右边的数比支点大

递归分析与代码实现

现在我们的数组是这样的:{1,4,5,6,2,7,8,67,9,44},既然我们比7小的在左边,比7大的在右边,那么我们只要将”左边“的排好顺序,又将”右边“的排好序,那整个数组是不是就有序了?

回顾一下递归:”左边“的排好顺序,”右边“的排好序,跟我们第一趟排序的做法是一样的。

只不过是参数不一样:第一趟排序是任选了一个支点,比支点小的在左边,比支点大的在右边。那么,我们想要”左边“的排好顺序,只要在”左边“部分找一个支点,比支点小的在左边,比支点大的在右边。

递归出口也很容易找到:如果数组只有一个元素时,那么就不用排序了

代码如下:

public static void main(String[] args) {
    int[] arr = {1, 4, 5, 67, 2, 7, 8, 6, 9, 44};

    quickSort(arr, 0, 9);

    System.out.println(arr);


}

/**
 * 快速排序
 *
 * @param arr
 * @param L   指向数组第一个元素
 * @param R   指向数组最后一个元素
 */
public static void quickSort(int[] arr, int L, int R) {
    int i = L;
    int j = R;

    //支点
    int pivot = arr[(L + R) / 2];

    //左右两端进行扫描,只要两端还没有交替,就一直扫描
    while (i <= j) {

        //寻找直到比支点大的数
        while (pivot > arr[i])
            i++;

        //寻找直到比支点小的数
        while (pivot < arr[j])
            j--;

        //此时已经分别找到了比支点小的数(右边)、比支点大的数(左边),它们进行交换
        if (i <= j) {
            int temp = arr[i];
            arr[i] = arr[j];
            arr[j] = temp;
            i++;
            j--;
        }
    }
    //上面一个while保证了第一趟排序支点的左边比支点小,支点的右边比支点大了。


    //“左边”再做排序,直到左边剩下一个数(递归出口)
    if (L < j)
        quickSort(arr, L, j);

    //“右边”再做排序,直到右边剩下一个数(递归出口)
    if (i < R)
        quickSort(arr, i, R);
}