Java多线程基础:供应链系统为什么需要并发处理

Java 多线程不是为了把代码写复杂,而是为了在合适的业务场景里提高吞吐、降低等待时间、充分利用 CPU 和 IO 资源。供应链系统里有大量典型场景:订单创建后要校验库存、校验客户信用、计算运费、匹配促销、写操作日志;仓库出库时要生成波次、分配库位、通知 WMS、刷新库存快照。这些动作有些可以并行,有些必须串行,多线程的价值就是把这两类工作区分清楚。

线程和进程

进程是操作系统资源分配的基本单位,线程是 CPU 调度执行的基本单位。一个 Java 应用进程里可以有很多线程,比如 Tomcat 请求线程、业务线程池线程、GC 线程、定时任务线程。

在供应链系统中,一个订单接口请求通常由一个 Web 容器线程处理。如果接口里所有操作都串行执行,请求耗时会被每一步累加。如果某些步骤没有依赖关系,就可以并行执行。

线程生命周期

Java 线程常见状态包括:

  • NEW:线程对象已创建但未启动。
  • RUNNABLE:可运行,等待 CPU 调度或正在执行。
  • BLOCKED:等待进入 synchronized 临界区。
  • WAITING:无限期等待其他线程通知。
  • TIMED_WAITING:限时等待,比如 sleep、带超时的 wait
  • TERMINATED:线程执行结束。

排查供应链批处理卡顿时,线程状态很关键。大量 BLOCKED 说明锁竞争严重,大量 WAITING 可能是队列无任务或等待条件未满足,大量 RUNNABLE 但 CPU 很高可能是计算任务过重或死循环。

创建线程的方式

最原始的方式是直接创建 Thread

1
2
3
4
Thread thread = new Thread(() -> {
System.out.println("同步库存快照");
});
thread.start();

这种方式适合理解原理,不适合业务系统大量使用。真实项目应该使用线程池,避免频繁创建和销毁线程。

Runnable 没有返回值,适合只执行动作:

1
2
Runnable task = () -> inventorySyncService.syncWarehouse(8L);
new Thread(task).start();

Callable 有返回值,可以配合 Future

1
2
3
4
Callable<Integer> task = () -> inventoryQueryService.countAvailableSku(8L);
FutureTask<Integer> future = new FutureTask<>(task);
new Thread(future).start();
Integer count = future.get();

但业务里更常用线程池提交任务。

供应链订单校验的并行 demo

创建销售订单时,下面三个校验互不依赖:

  • 查询库存是否足够。
  • 查询客户信用额度。
  • 查询承运商是否覆盖收货地址。

串行写法:

1
2
3
4
5
6
public OrderCheckResult checkOrder(OrderCreateCommand command) {
InventoryCheckResult inventory = inventoryService.check(command.items());
CreditCheckResult credit = creditService.check(command.customerId(), command.amount());
CarrierCheckResult carrier = carrierService.check(command.address());
return OrderCheckResult.merge(inventory, credit, carrier);
}

如果三个调用分别耗时 80ms、60ms、50ms,总耗时至少接近 190ms。可以用线程池并行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public OrderCheckResult checkOrder(OrderCreateCommand command) throws Exception {
Future<InventoryCheckResult> inventoryFuture =
executor.submit(() -> inventoryService.check(command.items()));

Future<CreditCheckResult> creditFuture =
executor.submit(() -> creditService.check(command.customerId(), command.amount()));

Future<CarrierCheckResult> carrierFuture =
executor.submit(() -> carrierService.check(command.address()));

return OrderCheckResult.merge(
inventoryFuture.get(),
creditFuture.get(),
carrierFuture.get()
);
}

并行后,请求耗时接近最慢的那个任务,而不是三个任务之和。这里的收益来自 IO 等待重叠,而不是 CPU 被神奇地加速。

多线程不适合什么

多线程不是默认选项。以下场景要谨慎:

  • 任务之间有严格顺序,比如先扣库存再生成库存流水。
  • 数据共享复杂,容易产生竞态。
  • 单个任务非常小,线程切换成本超过收益。
  • 下游系统已经限流,并发只会把压力打爆。
  • 业务要求强事务一致性,不能拆成并发步骤。

比如订单扣库存不能简单把每个 SKU 丢给不同线程同时扣。如果一个订单内多个 SKU 需要同一个事务保证全部成功或全部失败,并行拆分反而会增加补偿复杂度。

小结

Java 多线程首先是一种工程手段,不是语法技巧。供应链系统使用多线程,核心收益是提升吞吐、缩短 IO 密集型流程耗时、让后台批处理更高效。使用前必须判断任务是否独立、是否共享状态、是否需要事务一致性。能并行的校验和查询可以并行,涉及最终库存和单据状态的数据修改必须保持清晰的事务边界。

redis入门

安装gcc

  • Redis在linux上的安装首先必须先安装gcc,这个是用来编译redis的源文件的。首先需要先切换的到root用户:

      [chenjx@localhost ~]$ su
      Password: 
      [root@localhost chenjx]#
    
  • 然后开始安装gcc:

      //这个命令是在线安装的,所以在这之前你的PC必须能够上网
      [root@localhost /]# yum install gcc-c++
    

解压redis的源文件

[root@localhost chenjx]# cd Desktop/
[root@localhost Desktop]# ls
redis-4.0.1.tar.gz
[root@localhost Desktop]# tar zxvf redis-4.0.1.tar.gz 
redis-4.0.1/
redis-4.0.1/.gitignore
redis-4.0.1/00-RELEASENOTES
redis-4.0.1/BUGS
redis-4.0.1/CONTRIBUTING

进入redis的解压目录

[root@localhost Desktop]# ls
redis-4.0.1  redis-4.0.1.tar.gz
[root@localhost Desktop]# cd redis-4.0.1/

使用make命令编译

[root@localhost redis-4.0.1]# make

//出现以下信息即是编译成功
CC notify.o
CC setproctitle.o
CC blocked.o
CC hyperloglog.o
CC latency.o
CC sparkline.o
CC redis-check-rdb.o
CC redis-check-aof.o
CC geo.o
CC lazyfree.o
CC module.o
CC evict.o
CC expire.o
CC geohash.o
CC geohash_helper.o
CC childinfo.o
CC defrag.o
CC siphash.o
CC rax.o
LINK redis-server
INSTALL redis-sentinel
CC redis-cli.o
LINK redis-cli
CC redis-benchmark.o
LINK redis-benchmark
INSTALL redis-check-rdb
INSTALL redis-check-aof

进入解压的src目录下

[root@localhost redis-4.0.1]# cd src

运行make test测试是否可以安装,检查的一大堆,都是绿色的ok。这就可以了

安装

[root@localhost src]# make PREFIX=/usr/local/redis install 

安装成功后,去/usr/local/redis/bin目录看,会发现如下文件:

都是一个个工具命令

然后把解压的redis路径下的redis.conf文件拷贝到安装路径/usr/local/redis下面

[root@localhost redis-4.0.1]# cp redis.conf /usr/local/redis
[root@localhost redis-4.0.1]# cd /usr/local/redis
[root@localhost redis]# ls
bin  redis.conf

到这里redis已经安装成功了。

启动redis

第一种方法:进入安装路径下的bin

[root@localhost redis]# cd bin
[root@localhost bin]# ./redis-server

但是这属于前端启动,启动redis之后,我们的控制台就不能进行任何操作了。只能ctrl+c停止启动。

第二种方法:后端启动

  • 首先编辑redis.conf

    [root@localhost redis]# vim redis.conf

  • 找到daemonize no将其改为yes

  • 再次启动

      [root@localhost redis]# ls
      bin  redis.conf
      [root@localhost redis]# ./bin/redis-server ./redis.conf
      //这里加载配置文件
      
      80167:C 30 Jul 16:01:58.145 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
      80167:C 30 Jul 16:01:58.145 # Redis version=4.0.1, bits=64, commit=00000000, modified=0, pid=80167, just started
      80167:C 30 Jul 16:01:58.145 # Configuration loaded
    

这样redis就启动了

可以通过
ps -ef|grep redis
来查看是否启动

关闭redis

[root@localhost redis]# ./bin/redis-cli shutdown

简单的使用

//首先链接客户端
[root@localhost redis]# ./bin/redis-cli
//检查网络是否可以
127.0.0.1:6379> ping
PONG
//设置一个键值对
127.0.0.1:6379> set name chenjx
OK
//获取刚刚设置的键值对
127.0.0.1:6379> get name
"chenjx"
//查看所有的键
127.0.0.1:6379> keys *
1) "name"
//删除name这个键
127.0.0.1:6379> del name
(integer) 1
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>

设置登录密码

redis在生产环境中通常都会设置密码以保证一定的安全性

首先修改redis.conf

打开redis.conf文件,搜索requirepass关键字

################################## SECURITY ###################################

# Require clients to issue AUTH <PASSWORD> before processing any other
# commands.  This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# Warning: since Redis is pretty fast an outside user can try up to
# 150k passwords per second against a good box. This means that you should
# use a very strong password otherwise it will be very easy to break.
#
# requirepass foobared

关注标记的那一行,#requirepass foobared。设置密码的方法就是去掉注释的#,把foobared替换成自己的密码即可,例如将密码设置为123456:

################################## SECURITY ###################################

# Require clients to issue AUTH <PASSWORD> before processing any other
# commands.  This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# Warning: since Redis is pretty fast an outside user can try up to
# 150k passwords per second against a good box. This means that you should
# use a very strong password otherwise it will be very easy to break.
#
requirepass 123456

修改完成后重启redis,再次通过redis客户端redis-cli登录并操作可以发现会报一个身份认证错误:

这就说明我们已经成功的设置了密码,所以通过客户端连接的话必须加上密码参数才能正常连接:

如上图所示,加了-a参数之后即可正常连接并操作redis。

jedis连接设置密码

当我们用Java客户端连接redis时会遇到同样的问题,下面看一段简单的jedis连接redis的测试代码:

@Test
public void testTwo() {
    Jedis jedis = new Jedis("192.168.145.10");
    System.out.println("Connection to server sucessfully");
    // 查看服务是否运行
    System.out.println("Server is running: " + jedis.ping());
}

运行junit后我们发现报异常了:

redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.
at redis.clients.jedis.Protocol.processError(Protocol.java:117)
at redis.clients.jedis.Protocol.process(Protocol.java:142)
at redis.clients.jedis.Protocol.read(Protocol.java:196)

由于我们设置了密码但在这里又没有指定密码,所以报了和刚才相同的错误,Jedis的父类BinaryJedis提供了这样一样方法:

public String auth(final String password) {
    checkIsInMulti();
    client.auth(password);
    return client.getStatusCodeReply();
}

所以在创建了Jedis的实例后再加上一行jedis.auth(“123456”); 即可

@Test
public void testTwo() {
    Jedis jedis = new Jedis("192.168.145.10");
	jedis.auth("123456");
    System.out.println("Connection to server sucessfully");
    // 查看服务是否运行
    System.out.println("Server is running: " + jedis.ping());
}

spring-data-redis设置密码

通常情况下在实际的java项目中我们会选择Spring提供的spring-data-redis来操作redis,spring的封装可以给我们提供很多便捷之处。那么spring-data-redis又是如何设置密码的呢?首先定义一个redis.properties配置文件,定义一组redis属性供spring加载使用,其中就包含密码(redis.password):

# Redis settings  
redis.host=192.168.145.10 
redis.port=6379  
redis.password=123456
redis.database=2
redis.timeout=100000  
redis.maxTotal=300  
redis.maxIdle=100
redis.minIdle=10
redis.maxWaitMillis=1000  
redis.testOnBorrow=true  

然后在由Spring封装的JedisConnectionFactory中来设置密码属性即可,下面是完整redis配置:

<!-- redis配置 -->
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxIdle" value="${redis.maxIdle}" />
    <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
    <property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>

<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
    p:host-name="${redis.host}" p:port="${redis.port}" 
    p:password="${redis.password}" p:pool-config-ref="poolConfig" />

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
</bean>

Redis常用数据类型介绍、使用场景及其操作命令

Redis目前支持5种数据类型,分别是:

  • String(字符串)
  • List(列表)
  • Hash(字典)
  • Set(集合)
  • Sorted Set(有序集合)

下面就分别介绍这五种数据类型及其相应的操作命令。

String(字符串)

String是简单的 key-value 键值对,value 不仅可以是 String,也可以是数字。String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr,decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr,decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

  • 应用场景

String是最常用的一种数据类型,普通的key/value存储都可以归为此类,这里就不所做解释了。

  • 相关命令

      SET key value                   设置key=value
      GET key                         或者键key对应的值
      GETRANGE key start end          得到字符串的子字符串存放在一个键
      GETSET key value                设置键的字符串值,并返回旧值
      GETBIT key offset               返回存储在键位值的字符串值的偏移
      MGET key1 [key2..]              得到所有的给定键的值
      SETBIT key offset value         设置或清除该位在存储在键的字符串值偏移
      SETEX key seconds value         键到期时设置值
      SETNX key value                 设置键的值,只有当该键不存在
      SETRANGE key offset value       覆盖字符串的一部分从指定键的偏移
      STRLEN key                      得到存储在键的值的长度
      MSET key value [key value...]   设置多个键和多个值
      MSETNX key value [key value...] 设置多个键多个值,只有在当没有按键的存在时
      PSETEX key milliseconds value   设置键的毫秒值和到期时间
      INCR key                        增加键的整数值一次
      INCRBY key increment            由给定的数量递增键的整数值
      INCRBYFLOAT key increment       由给定的数量递增键的浮点值
      DECR key                        递减键一次的整数值
      DECRBY key decrement            由给定数目递减键的整数值
      APPEND key value                追加值到一个键
    
  • 其中用于操作管理键的命令有:

      DEL key                         如果存在删除键
      DUMP key                        返回存储在指定键的值的序列化版本
      EXISTS key                      此命令检查该键是否存在
      EXPIRE key seconds              指定键的过期时间
      EXPIREAT key timestamp          指定的键过期时间。在这里,时间是在Unix时间戳格式
      PEXPIRE key milliseconds        设置键以毫秒为单位到期
      PEXPIREAT key milliseconds-timestamp        设置键在Unix时间戳指定为毫秒到期
      KEYS pattern                    查找与指定模式匹配的所有键
      MOVE key db                     移动键到另一个数据库
      PERSIST key                     移除过期的键
      PTTL key                        以毫秒为单位获取剩余时间的到期键。
      TTL key                         获取键到期的剩余时间。
      RANDOMKEY                       从Redis返回随机键
      RENAME key newkey               更改键的名称
      RENAMENX key newkey             重命名键,如果新的键不存在
      TYPE key                        返回存储在键的数据类型的值。
    
  • 实例:

      redis 127.0.0.1:6379> set baidu http://www.baidu
      OK
      redis 127.0.0.1:6379> append baidu .com
      (integer) 20
      redis 127.0.0.1:6379> get baidu
      "http://www.baidu.com"
      redis 127.0.0.1:6379> set visitors 0
      OK
      redis 127.0.0.1:6379> incr visitors
      (integer) 1
      redis 127.0.0.1:6379> incr visitors
      (integer) 2
      redis 127.0.0.1:6379> get visitors
      "2"
      redis 127.0.0.1:6379> incrby visitors 100
      (integer) 102
      redis 127.0.0.1:6379> get visitors
      "102"
      redis 127.0.0.1:6379> type baidu
      string
      redis 127.0.0.1:6379> type visitors
      string
      redis 127.0.0.1:6379> ttl baidu
      (integer) -1
      redis 127.0.0.1:6379> rename baidu baidu-site
      OK
      redis 127.0.0.1:6379> get baidu
      (nil)
      redis 127.0.0.1:6379> get baidu-site
      "http://www.baidu.com"
    

List(列表)

Redis list的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销,Redis内部的很多实现,包括发送缓冲队列等也都是用的这个数据结构。可以从头部或尾部向Redis列表添加元素。列表的最大长度为2^32 - 1,也即每个列表支持超过40亿个元素。

  • 应用场景

Redis list的应用场景非常多,也是Redis最重要的数据结构之一,比如twitter的关注列表、粉丝列表等都可以用Redis的list结构来实现,再比如有的应用使用Redis的list类型实现一个简单的轻量级消息队列,生产者push,消费者pop

  • 相关命令

      BLPOP
      BLPOP key1 [key2 ] timeout 取出并获取列表中的第一个元素,或阻塞,直到有可用
      BRPOP
      BRPOP key1 [key2 ] timeout 取出并获取列表中的最后一个元素,或阻塞,直到有可用
      BRPOPLPUSH
      BRPOPLPUSH source destination timeout 从列表中弹出一个值,它推到另一个列表并返回它;或阻塞,直到有可用
      LINDEX
      LINDEX key index 从一个列表其索引获取对应的元素
      LINSERT
      LINSERT key BEFORE|AFTER pivot value 在列表中的其他元素之后或之前插入一个元素
      LLEN
      LLEN key 获取列表的长度
      LPOP
      LPOP key 获取并取出列表中的第一个元素
      LPUSH
      LPUSH key value1 [value2] 在前面加上一个或多个值的列表
      LPUSHX
      LPUSHX key value 在前面加上一个值列表,仅当列表中存在
      LRANGE
      LRANGE key start stop 从一个列表获取各种元素
      LREM
      LREM key count value 从列表中删除元素
      LSET
      LSET key index value 在列表中的索引设置一个元素的值
      LTRIM
      LTRIM key start stop 修剪列表到指定的范围内
      RPOP
      RPOP key 取出并获取列表中的最后一个元素
      RPOPLPUSH
      RPOPLPUSH source destination 删除最后一个元素的列表,将其附加到另一个列表并返回它
      RPUSH
      RPUSH key value1 [value2] 添加一个或多个值到列表头部
      RPUSHX
      RPUSHX key value 添加一个值列表,仅当列表中存在
    
  • 使用示例

      redis 127.0.0.1:6379> lpush list1 redis
      (integer) 1
      redis 127.0.0.1:6379> lpush list1 hello
      (integer) 2
      redis 127.0.0.1:6379> rpush list1 world
      (integer) 3
      redis 127.0.0.1:6379> llen list1
      (integer) 3
      redis 127.0.0.1:6379> lrange list1 0 3
      1) "hello"
      2) "redis"
      3) "world"
      redis 127.0.0.1:6379> lpop list1
      "hello"
      redis 127.0.0.1:6379> rpop list1
      "world"
      redis 127.0.0.1:6379> lrange list1 0 3
      1) "redis"
    

Hash(字典,哈希表)

类似C#中的dict类型或者C++中的hash_map类型

Redis Hash对应Value内部实际就是一个HashMap,实际这里会有2种不同实现,这个Hash的成员比较少时Redis为了节省内存会采用类似一维数组的方式来紧凑存储,而不会采用真正的HashMap结构,对应的value redisObject的encoding为zipmap,当成员数量增大时会自动转成真正的HashMap

  • 应用场景

假设有多个用户及对应的用户信息,可以用来存储以用户ID为key,将用户信息序列化为比如json格式做为value进行保存。

  • 相关命令

      HDEL
      HDEL key field[field...] 删除对象的一个或几个属性域,不存在的属性将被忽略
      HEXISTS
      HEXISTS key field 查看对象是否存在该属性域
      HGET
      HGET key field 获取对象中该field属性域的值
      HGETALL
      HGETALL key 获取对象的所有属性域和值
      HINCRBY
      HINCRBY key field value 将该对象中指定域的值增加给定的value,原子自增操作,只能是integer的属性值可以使用
      HINCRBYFLOAT
      HINCRBYFLOAT key field increment 将该对象中指定域的值增加给定的浮点数
      HKEYS
      HKEYS key 获取对象的所有属性字段
      HVALS
      HVALS key 获取对象的所有属性值
      HLEN
      HLEN key 获取对象的所有属性字段的总数
      HMGET
      HMGET key field[field...] 获取对象的一个或多个指定字段的值
      HSET
      HSET key field value 设置对象指定字段的值
      HMSET
      HMSET key field value [field value ...] 同时设置对象中一个或多个字段的值
      HSETNX
      HSETNX key field value 只在对象不存在指定的字段时才设置字段的值
      HSTRLEN
      HSTRLEN key field 返回对象指定field的value的字符串长度,如果该对象或者field不存在,返回0.
      HSCAN
      HSCAN key cursor [MATCH pattern] [COUNT count] 类似SCAN命令
    
  • 使用示例

      127.0.0.1:6379> hset person name jack
      (integer) 1
      127.0.0.1:6379> hset person age 20
      (integer) 1
      127.0.0.1:6379> hset person sex famale
      (integer) 1
      127.0.0.1:6379> hgetall person
      1) "name"
      2) "jack"
      3) "age"
      4) "20"
      5) "sex"
      6) "famale"
      127.0.0.1:6379> hkeys person
      1) "name"
      2) "age"
      3) "sex"
      127.0.0.1:6379> hvals person
      1) "jack"
      2) "20"
      3) "famale"
    

Set(集合)

可以理解为一堆值不重复的列表,类似数学领域中的集合概念,且Redis也提供了针对集合的求交集、并集、差集等操作。

set 的内部实现是一个 value永远为null的HashMap,实际就是通过计算hash的方式来快速排重的,这也是set能提供判断一个成员是否在集合内的原因。

  • 应用场景

Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。

  • 相关命令

      SADD
      SADD key member [member ...] 添加一个或者多个元素到集合(set)里
      SACRD
      SCARD key 获取集合里面的元素数量
      SDIFF
      SDIFF key [key ...] 获得队列不存在的元素
      SDIFFSTORE
      SDIFFSTORE destination key [key ...] 获得队列不存在的元素,并存储在一个关键的结果集
      SINTER
      SINTER key [key ...] 获得两个集合的交集
      SINTERSTORE
      SINTERSTORE destination key [key ...] 获得两个集合的交集,并存储在一个集合中
      SISMEMBER
      SISMEMBER key member 确定一个给定的值是一个集合的成员
      SMEMBERS
      SMEMBERS key 获取集合里面的所有key
      SMOVE
      SMOVE source destination member 移动集合里面的一个key到另一个集合
      SPOP
      SPOP key [count] 获取并删除一个集合里面的元素
      SRANDMEMBER
      SRANDMEMBER key [count] 从集合里面随机获取一个元素
      SREM
      SREM key member [member ...] 从集合里删除一个或多个元素,不存在的元素会被忽略
      SUNION
      SUNION key [key ...] 添加多个set元素
      SUNIONSTORE
      SUNIONSTORE destination key [key ...] 合并set元素,并将结果存入新的set里面
      SSCAN
      SSCAN key cursor [MATCH pattern] [COUNT count] 迭代set里面的元素
    
  • 使用示例

      redis> SADD myset "Hello"
      (integer) 1
      redis> SADD myset "World"
      (integer) 1
      redis> SMEMBERS myset
      1) "World"
      2) "Hello"
      redis> SADD myset "one"
      (integer) 1
      redis> SISMEMBER myset "one"
      (integer) 1
      redis> SISMEMBER myset "two"
      (integer) 0
    

Sorted Set(有序集合)

Redis有序集合类似Redis集合,不同的是增加了一个功能,即集合是有序的。一个有序集合的每个成员带有分数,用于进行排序。

Redis有序集合添加、删除和测试的时间复杂度均为O(1)(固定时间,无论里面包含的元素集合的数量)。列表的最大长度为2^32- 1元素(4294967295,超过40亿每个元素的集合)。

Redis sorted set的内部使用HashMap和跳跃表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。

  • 使用场景

Redis sorted set的使用场景与set类似,区别是set不是自动有序的,而sorted set可以通过用户额外提供一个优先级(score)的参数来为成员排序,并且是插入有序的,即自动排序。当你需要一个有序的并且不重复的集合列表,那么可以选择sorted set数据结构.

比如用户的积分排行榜需求就可以通过有序集合实现。还有上面介绍的使用List实现轻量级的消息队列,其实也可以通过Sorted Set实现有优先级或按权重的队列。

  • 相关命令

      ZADD
      ZADD key score1 member1 [score2 member2] 添加一个或多个成员到有序集合,或者如果它已经存在更新其分数
      ZCARD
      ZCARD key 得到的有序集合成员的数量
      ZCOUNT
      ZCOUNT key min max 计算一个有序集合成员与给定值范围内的分数
      ZINCRBY
      ZINCRBY key increment member 在有序集合增加成员的分数
      ZINTERSTORE
      ZINTERSTORE destination numkeys key [key ...] 多重交叉排序集合,并存储生成一个新的键有序集合。
      ZLEXCOUNT
      ZLEXCOUNT key min max 计算一个给定的字典范围之间的有序集合成员的数量
      ZRANGE
      ZRANGE key start stop [WITHSCORES] 由索引返回一个成员范围的有序集合(从低到高)
      ZRANGEBYLEX
      ZRANGEBYLEX key min max [LIMIT offset count]返回一个成员范围的有序集合(由字典范围)
      ZRANGEBYSCORE
      ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT] 返回有序集key中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员,有序集成员按 score 值递增(从小到大)次序排列
      ZRANK
      ZRANK key member 确定成员的索引中有序集合
      ZREM
      ZREM key member [member ...] 从有序集合中删除一个或多个成员,不存在的成员将被忽略
      ZREMRANGEBYLEX
      ZREMRANGEBYLEX key min max 删除所有成员在给定的字典范围之间的有序集合
      ZREMRANGEBYRANK
      ZREMRANGEBYRANK key start stop 在给定的索引之内删除所有成员的有序集合
      ZREMRANGEBYSCORE
      ZREMRANGEBYSCORE key min max 在给定的分数之内删除所有成员的有序集合
      ZREVRANGE
      ZREVRANGE key start stop [WITHSCORES] 返回一个成员范围的有序集合,通过索引,以分数排序,从高分到低分
      ZREVRANGEBYSCORE
      ZREVRANGEBYSCORE key max min [WITHSCORES] 返回一个成员范围的有序集合,以socre排序从高到低
      ZREVRANK
      ZREVRANK key member 确定一个有序集合成员的索引,以分数排序,从高分到低分
      ZSCORE
      ZSCORE key member 获取给定成员相关联的分数在一个有序集合
      ZUNIONSTORE
      ZUNIONSTORE destination numkeys key [key ...] 添加多个集排序,所得排序集合存储在一个新的键
      ZSCAN
      ZSCAN key cursor [MATCH pattern] [COUNT count] 增量迭代排序元素集和相关的分数
    
  • 使用示例

      redis 127.0.0.1:6379> zadd dbs 100 redis
      (integer) 1
      redis 127.0.0.1:6379> zadd dbs 98 memcached
      (integer) 1
      redis 127.0.0.1:6379> zadd dbs 99 mongodb
      (integer) 1
      redis 127.0.0.1:6379> zadd dbs 99 leveldb
      (integer) 1
      redis 127.0.0.1:6379> zcard dbs
      (integer) 4
      redis 127.0.0.1:6379> zcount dbs 10 99
      (integer) 3
      redis 127.0.0.1:6379> zrank dbs leveldb
      (integer) 1
      redis 127.0.0.1:6379> zrank dbs other
      (nil)
      redis 127.0.0.1:6379> zrangebyscore dbs 98 100
      1) "memcached"
      2) "leveldb"
      3) "mongodb"
      4) "redis"
    

kafka集群

Kafka初识

  • Kafka使用背景

    在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:

    • 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位
    • 我想对用户的搜索关键词进行统计,分析出当前的流行趋势
    • 有些数据,存储数据库浪费,直接存储硬盘效率又低
  • Kafka的定义

    它是一个分布式消息系统,具有高水平扩展和高吞吐量的特点。

Kafka相关概念

  • AMQP协议:是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。

  • 一些基本的概念

    • 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
    • 生产者:(Producer):向broker发布消息的应用程序
    • AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例

kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)

  • Kafka架构

    生产者生产消息、kafka集群、消费者获取消息这样一种架构,如下图:

    kafka集群中的消息,是通过Topic(主题)来进行组织的,如下图:

    一些基本的概念:

    • 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
    • 分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。

    kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

    工作图:

    备份(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。

Kafka集群搭建

  • 软件环境

    • 已经搭建好的zookeeper集群(文章最后另写)
    • 软件版本kafka_2.11-0.9.0.1.tgz
    • 软件版本kafka_2.11-0.9.0.1.tgz
  • 创建目录并下载安装软件
    #下载软件
    wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
    #解压软件
    tar -xvf kafka_2.11-0.9.0.1.tgz

  • 修改配置文件

    主要关注config/server.properties

      broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
      port=9092 #当前kafka对外提供服务的端口默认是9092
      host.name = 120.76.230.134 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题
      #申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置
      listeners = PLAINTEXT://120.76.230.134:9092 
      advertised.host.name = 120.76.230.134
      advertised.listeners=PLAINTEXT://120.76.230.134:9092
      log.dirs=/data/kafka/kafkalogs  #消息存放的目录
      #在log.retention.hours=168 下面新增下面三项
      message.max.byte=5242880
      default.replication.factor=2
      replica.fetch.max.bytes=5242880
      zookeeper.connect=120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181 #zookeeper配置地址
    

    每个机子的broker.id、host.name都是不一样的,主要是关注以上几项参数配置。

  • 启动Kafka集群并测试

    启动服务

      #从后台启动Kafka集群(3台都需要启动)
      cd /data/kafka/kafka_2.11-0.11.0.1/bin #进入到kafka的bin目录 
      ./kafka-server-start.sh -daemon ../config/server.properties
    

    检查服务是否启动

      #执行命令jps
      22246 Bootstrap
      26393 Kafka
    

    创建Topic来验证是否创建成功

      #创建Topic
      ./kafka-topics.sh --create --zookeeper 120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181 --replication-factor 2 --partitions 1 --topic test
      
      #解释
      --replication-factor 2   #复制两份
      --partitions 1 #创建1个分区
      --topic #主题为test
      
      #在一台服务器上创建一个发布者
      #创建一个broker,发布者
      ./kafka-console-producer.sh --broker-list 120.76.230.134:9092 --topic test
      
      #在一台服务器上创建一个订阅者
      ./kafka-console-consumer.sh --bootstrap-server 120.76.230.134:9092 --topic test --from-beginning
    

    测试(在发布者那里发布消息看看订阅者那里是否能正常收到~):

  • 其他命令

    大部分命令可以去官方文档查看

    • 查看topic

        ./kafka-topics.sh --list --zookeeper 120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181
        #就会显示我们创建的所有topic
      

    kafka集群搭建完毕

zookeeper集群部署

  • 为什么是基数

    • 集群必须有一半以上的机器统一才能成为leader
    • 一半的机器挂掉 整个集群挂掉
  • 去官网下载好zookeeper包,解压

  • 修改配置/conf/zoo.cfg文件

      # 基本事件单元,以毫秒为单位,用来控制心跳和超时
      tickTime=2000
      # 参数设定了允许所有跟随者与领导者进行连接并同步的时间,如果在设定的时间段内,半数以上的跟随者未能完成同步,领导者便会宣布放弃领导地位,进行另一次的领导选举
      # 如果zk集群环境数量确实很大,同步数据的时间会变长,因此这种情况下可以适当调大该参数
      #集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)
      initLimit=10
      # 参数设定了允许一个跟随者与一个领导者进行同步的时间,如果在设定的时间段内,跟随者未完成同步,它将会被集群丢弃
      # 所有关联到这个跟随者的客户端将连接到另外一个跟随着
      syncLimit=5
      # 存储持久数据的本地文件系统位置
      dataDir=/data/zookeeper-3.4.9/data
      dataLogDir=/data/zookeeper-3.4.9/logs
      # 监听客户端连接的端口
      clientPort=5181
      
      server.1=120.76.230.134:2888:3888
      server.2=120.76.193.192:2888:3888
      server.3=120.25.239.155:2888:3888
    
      #server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
      #120.76.230.134为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
    

    zoo_sample.cfg 这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。

  • 创建myid文件

    /data/zookeeper-3.4.9/data目录下要配置myid文件,内容跟server.id相对应,比如:1

    ./zkServer.sh star启动zookeeper,每台都要启动一遍

  • 重要配置说明

    • myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。

    • zoo.cfg 文件是zookeeper配置文件 在conf目录里。

快速排序

快速排序

介绍

在数组中找一个支点(任意),经过一趟排序后,支点左边的数都要比支点小,支点右边的数都要比支点大!

现在我们有一个数组:int arr[]={1,4,5,67,2,7,8,6,9,44};

经过一趟排序之后,如果我选择数组中间的数作为支点:7(任意的),那么第一趟排序后的结果是这样的:{1,4,5,6,2,7,8,67,9,44}

那么就实现了支点左边的数比支点小,支点右边的数比支点大

递归分析与代码实现

现在我们的数组是这样的:{1,4,5,6,2,7,8,67,9,44},既然我们比7小的在左边,比7大的在右边,那么我们只要将”左边“的排好顺序,又将”右边“的排好序,那整个数组是不是就有序了?

回顾一下递归:”左边“的排好顺序,”右边“的排好序,跟我们第一趟排序的做法是一样的。

只不过是参数不一样:第一趟排序是任选了一个支点,比支点小的在左边,比支点大的在右边。那么,我们想要”左边“的排好顺序,只要在”左边“部分找一个支点,比支点小的在左边,比支点大的在右边。

递归出口也很容易找到:如果数组只有一个元素时,那么就不用排序了

代码如下:

public static void main(String[] args) {
    int[] arr = {1, 4, 5, 67, 2, 7, 8, 6, 9, 44};

    quickSort(arr, 0, 9);

    System.out.println(arr);


}

/**
 * 快速排序
 *
 * @param arr
 * @param L   指向数组第一个元素
 * @param R   指向数组最后一个元素
 */
public static void quickSort(int[] arr, int L, int R) {
    int i = L;
    int j = R;

    //支点
    int pivot = arr[(L + R) / 2];

    //左右两端进行扫描,只要两端还没有交替,就一直扫描
    while (i <= j) {

        //寻找直到比支点大的数
        while (pivot > arr[i])
            i++;

        //寻找直到比支点小的数
        while (pivot < arr[j])
            j--;

        //此时已经分别找到了比支点小的数(右边)、比支点大的数(左边),它们进行交换
        if (i <= j) {
            int temp = arr[i];
            arr[i] = arr[j];
            arr[j] = temp;
            i++;
            j--;
        }
    }
    //上面一个while保证了第一趟排序支点的左边比支点小,支点的右边比支点大了。


    //“左边”再做排序,直到左边剩下一个数(递归出口)
    if (L < j)
        quickSort(arr, L, j);

    //“右边”再做排序,直到右边剩下一个数(递归出口)
    if (i < R)
        quickSort(arr, i, R);
}

冒泡排序

冒泡排序就这么简单

排序对我们来说是一点也不陌生了,当你打Dota的时候也有天梯分,从高往下数,这个排名是有规律的,就是一种排序。

冒泡排序的实现

冒泡排序是一种简单的排序算法。它重复地走访过要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。走访数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完成。这个算法的名字由来是因为越大的元素会经由交换慢慢“浮”到数列的顶端,故名冒泡排序。

算法描述:

  • i从0开始,i与i+1比较,如果i>i+1,那么就互换
  • i不断增加,直到i<n-1(n是数组元素的个数,n-1是数组已经最后一个元素) ,一趟下来,可以让数组元素中最大值排在数组的最后面

从最简单开始,首先我们创建一个数组,该数组有6位数字:

int[] arrays = {8, 3, 2, 6, 7, 9};

6位数的数组需要5躺排序的,每躺排序之后次数减1(因为前一趟已经把前一趟数的最大值确定下来了)!

于是我们可以根据for循环和变量将上面的代码进行排序:

int temp;

//外层循环是排序的趟数
for (int i = 0; i < arrays.length - 1 ; i++) {

    //内层循环是当前趟数需要比较的次数
    for (int j = 0; j < arrays.length - i - 1; j++) {

        //前一位与后一位与前一位比较,如果前一位比后一位要大,那么交换
        if (arrays[j] > arrays[j + 1]) {
            temp = arrays[j];
            arrays[j] = arrays[j + 1];
            arrays[j + 1] = temp;
        }
    }
}

冒泡排序优化

从上面的例子我们可以看出来,如果数据足够乱的情况下是需要经过5躺比较才能将数组完整排好序。但是我们在第二躺比较后就已经得到排好序的数组了。

但是,我们的程序在第二趟排序后仍会执行第三趟、第四趟排序。这是没有必要的,因此我们可以对其进行优化一下:

  • 如果在某躺排序中没有发生交换位置,那么我们可以认为该数组已经排好序了
    • 因为我们每趟排序的目的就是将当前趟最大的数置换到对应的位置上,没有发生置换说明就已经排好序了

代码如下:

//装载临时变量
int temp;

//记录是否发生了置换, 0 表示没有发生置换、 1 表示发生了置换
int isChange;

//外层循环是排序的趟数
for (int i = 0; i < arrays.length - 1; i++) {

    //每比较一趟就重新初始化为0
    isChange = 0;

    //内层循环是当前趟数需要比较的次数
    for (int j = 0; j < arrays.length - i - 1; j++) {

        //前一位与后一位与前一位比较,如果前一位比后一位要大,那么交换
        if (arrays[j] > arrays[j + 1]) {
            temp = arrays[j];
            arrays[j] = arrays[j + 1];
            arrays[j + 1] = temp;

            //如果进到这里面了,说明发生置换了
            isChange = 1;

        }
    }
    //如果比较完一趟没有发生置换,那么说明已经排好序了,不需要再执行下去了
    if (isChange == 0) {
        break;
    }
}