redis入门

安装gcc

  • Redis在linux上的安装首先必须先安装gcc,这个是用来编译redis的源文件的。首先需要先切换的到root用户:

    [chenjx@localhost ~]$ su
    Password: 
    [root@localhost chenjx]#
    
  • 然后开始安装gcc:

    //这个命令是在线安装的,所以在这之前你的PC必须能够上网
    [root@localhost /]# yum install gcc-c++
    

解压redis的源文件

[root@localhost chenjx]# cd Desktop/
[root@localhost Desktop]# ls
redis-4.0.1.tar.gz
[root@localhost Desktop]# tar zxvf redis-4.0.1.tar.gz 
redis-4.0.1/
redis-4.0.1/.gitignore
redis-4.0.1/00-RELEASENOTES
redis-4.0.1/BUGS
redis-4.0.1/CONTRIBUTING

进入redis的解压目录

[root@localhost Desktop]# ls
redis-4.0.1  redis-4.0.1.tar.gz
[root@localhost Desktop]# cd redis-4.0.1/

使用make命令编译

[root@localhost redis-4.0.1]# make

//出现以下信息即是编译成功
CC notify.o
CC setproctitle.o
CC blocked.o
CC hyperloglog.o
CC latency.o
CC sparkline.o
CC redis-check-rdb.o
CC redis-check-aof.o
CC geo.o
CC lazyfree.o
CC module.o
CC evict.o
CC expire.o
CC geohash.o
CC geohash_helper.o
CC childinfo.o
CC defrag.o
CC siphash.o
CC rax.o
LINK redis-server
INSTALL redis-sentinel
CC redis-cli.o
LINK redis-cli
CC redis-benchmark.o
LINK redis-benchmark
INSTALL redis-check-rdb
INSTALL redis-check-aof

进入解压的src目录下

[root@localhost redis-4.0.1]# cd src

运行make test测试是否可以安装,检查的一大堆,都是绿色的ok。这就可以了

安装

[root@localhost src]# make PREFIX=/usr/local/redis install 

安装成功后,去/usr/local/redis/bin目录看,会发现如下文件:

都是一个个工具命令

然后把解压的redis路径下的redis.conf文件拷贝到安装路径/usr/local/redis下面

[root@localhost redis-4.0.1]# cp redis.conf /usr/local/redis
[root@localhost redis-4.0.1]# cd /usr/local/redis
[root@localhost redis]# ls
bin  redis.conf

到这里redis已经安装成功了。

启动redis

第一种方法:进入安装路径下的bin

[root@localhost redis]# cd bin
[root@localhost bin]# ./redis-server

但是这属于前端启动,启动redis之后,我们的控制台就不能进行任何操作了。只能ctrl+c停止启动。

第二种方法:后端启动

  • 首先编辑redis.conf

    [root@localhost redis]# vim redis.conf

  • 找到daemonize no将其改为yes

  • 再次启动

    [root@localhost redis]# ls
    bin  redis.conf
    [root@localhost redis]# ./bin/redis-server ./redis.conf
    //这里加载配置文件
    
    80167:C 30 Jul 16:01:58.145 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
    80167:C 30 Jul 16:01:58.145 # Redis version=4.0.1, bits=64, commit=00000000, modified=0, pid=80167, just started
    80167:C 30 Jul 16:01:58.145 # Configuration loaded
    

这样redis就启动了

可以通过
ps -ef|grep redis
来查看是否启动

关闭redis

[root@localhost redis]# ./bin/redis-cli shutdown

简单的使用

//首先链接客户端
[root@localhost redis]# ./bin/redis-cli
//检查网络是否可以
127.0.0.1:6379> ping
PONG
//设置一个键值对
127.0.0.1:6379> set name chenjx
OK
//获取刚刚设置的键值对
127.0.0.1:6379> get name
"chenjx"
//查看所有的键
127.0.0.1:6379> keys *
1) "name"
//删除name这个键
127.0.0.1:6379> del name
(integer) 1
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>

设置登录密码

redis在生产环境中通常都会设置密码以保证一定的安全性

首先修改redis.conf

打开redis.conf文件,搜索requirepass关键字

################################## SECURITY ###################################

# Require clients to issue AUTH <PASSWORD> before processing any other
# commands.  This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# Warning: since Redis is pretty fast an outside user can try up to
# 150k passwords per second against a good box. This means that you should
# use a very strong password otherwise it will be very easy to break.
#
# requirepass foobared

关注标记的那一行,#requirepass foobared。设置密码的方法就是去掉注释的#,把foobared替换成自己的密码即可,例如将密码设置为123456:

################################## SECURITY ###################################

# Require clients to issue AUTH <PASSWORD> before processing any other
# commands.  This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# Warning: since Redis is pretty fast an outside user can try up to
# 150k passwords per second against a good box. This means that you should
# use a very strong password otherwise it will be very easy to break.
#
requirepass 123456

修改完成后重启redis,再次通过redis客户端redis-cli登录并操作可以发现会报一个身份认证错误:

这就说明我们已经成功的设置了密码,所以通过客户端连接的话必须加上密码参数才能正常连接:

如上图所示,加了-a参数之后即可正常连接并操作redis。

jedis连接设置密码

当我们用Java客户端连接redis时会遇到同样的问题,下面看一段简单的jedis连接redis的测试代码:

@Test
public void testTwo() {
    Jedis jedis = new Jedis("192.168.145.10");
    System.out.println("Connection to server sucessfully");
    // 查看服务是否运行
    System.out.println("Server is running: " + jedis.ping());
}

运行junit后我们发现报异常了:

redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.
at redis.clients.jedis.Protocol.processError(Protocol.java:117)
at redis.clients.jedis.Protocol.process(Protocol.java:142)
at redis.clients.jedis.Protocol.read(Protocol.java:196)

由于我们设置了密码但在这里又没有指定密码,所以报了和刚才相同的错误,Jedis的父类BinaryJedis提供了这样一样方法:

public String auth(final String password) {
    checkIsInMulti();
    client.auth(password);
    return client.getStatusCodeReply();
}

所以在创建了Jedis的实例后再加上一行jedis.auth(“123456”); 即可

@Test
public void testTwo() {
    Jedis jedis = new Jedis("192.168.145.10");
    jedis.auth("123456");
    System.out.println("Connection to server sucessfully");
    // 查看服务是否运行
    System.out.println("Server is running: " + jedis.ping());
}

spring-data-redis设置密码

通常情况下在实际的java项目中我们会选择Spring提供的spring-data-redis来操作redis,spring的封装可以给我们提供很多便捷之处。那么spring-data-redis又是如何设置密码的呢?首先定义一个redis.properties配置文件,定义一组redis属性供spring加载使用,其中就包含密码(redis.password):

# Redis settings  
redis.host=192.168.145.10 
redis.port=6379  
redis.password=123456
redis.database=2
redis.timeout=100000  
redis.maxTotal=300  
redis.maxIdle=100
redis.minIdle=10
redis.maxWaitMillis=1000  
redis.testOnBorrow=true  

然后在由Spring封装的JedisConnectionFactory中来设置密码属性即可,下面是完整redis配置:

<!-- redis配置 -->
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxIdle" value="${redis.maxIdle}" />
    <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
    <property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>

<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
    p:host-name="${redis.host}" p:port="${redis.port}" 
    p:password="${redis.password}" p:pool-config-ref="poolConfig" />

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
</bean>

Redis常用数据类型介绍、使用场景及其操作命令

Redis目前支持5种数据类型,分别是:

  • String(字符串)
  • List(列表)
  • Hash(字典)
  • Set(集合)
  • Sorted Set(有序集合)

下面就分别介绍这五种数据类型及其相应的操作命令。

String(字符串)

String是简单的 key-value 键值对,value 不仅可以是 String,也可以是数字。String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr,decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

String在redis内部存储默认就是一个字符串,被redisObject所引用,当遇到incr,decr等操作时会转成数值型进行计算,此时redisObject的encoding字段为int。

  • 应用场景

String是最常用的一种数据类型,普通的key/value存储都可以归为此类,这里就不所做解释了。

  • 相关命令

    SET key value                   设置key=value
    GET key                         或者键key对应的值
    GETRANGE key start end          得到字符串的子字符串存放在一个键
    GETSET key value                设置键的字符串值,并返回旧值
    GETBIT key offset               返回存储在键位值的字符串值的偏移
    MGET key1 [key2..]              得到所有的给定键的值
    SETBIT key offset value         设置或清除该位在存储在键的字符串值偏移
    SETEX key seconds value         键到期时设置值
    SETNX key value                 设置键的值,只有当该键不存在
    SETRANGE key offset value       覆盖字符串的一部分从指定键的偏移
    STRLEN key                      得到存储在键的值的长度
    MSET key value [key value...]   设置多个键和多个值
    MSETNX key value [key value...] 设置多个键多个值,只有在当没有按键的存在时
    PSETEX key milliseconds value   设置键的毫秒值和到期时间
    INCR key                        增加键的整数值一次
    INCRBY key increment            由给定的数量递增键的整数值
    INCRBYFLOAT key increment       由给定的数量递增键的浮点值
    DECR key                        递减键一次的整数值
    DECRBY key decrement            由给定数目递减键的整数值
    APPEND key value                追加值到一个键
    
  • 其中用于操作管理键的命令有:

    DEL key                         如果存在删除键
    DUMP key                        返回存储在指定键的值的序列化版本
    EXISTS key                      此命令检查该键是否存在
    EXPIRE key seconds              指定键的过期时间
    EXPIREAT key timestamp          指定的键过期时间。在这里,时间是在Unix时间戳格式
    PEXPIRE key milliseconds        设置键以毫秒为单位到期
    PEXPIREAT key milliseconds-timestamp        设置键在Unix时间戳指定为毫秒到期
    KEYS pattern                    查找与指定模式匹配的所有键
    MOVE key db                     移动键到另一个数据库
    PERSIST key                     移除过期的键
    PTTL key                        以毫秒为单位获取剩余时间的到期键。
    TTL key                         获取键到期的剩余时间。
    RANDOMKEY                       从Redis返回随机键
    RENAME key newkey               更改键的名称
    RENAMENX key newkey             重命名键,如果新的键不存在
    TYPE key                        返回存储在键的数据类型的值。
    
  • 实例:

    redis 127.0.0.1:6379> set baidu http://www.baidu
    OK
    redis 127.0.0.1:6379> append baidu .com
    (integer) 20
    redis 127.0.0.1:6379> get baidu
    "http://www.baidu.com"
    redis 127.0.0.1:6379> set visitors 0
    OK
    redis 127.0.0.1:6379> incr visitors
    (integer) 1
    redis 127.0.0.1:6379> incr visitors
    (integer) 2
    redis 127.0.0.1:6379> get visitors
    "2"
    redis 127.0.0.1:6379> incrby visitors 100
    (integer) 102
    redis 127.0.0.1:6379> get visitors
    "102"
    redis 127.0.0.1:6379> type baidu
    string
    redis 127.0.0.1:6379> type visitors
    string
    redis 127.0.0.1:6379> ttl baidu
    (integer) -1
    redis 127.0.0.1:6379> rename baidu baidu-site
    OK
    redis 127.0.0.1:6379> get baidu
    (nil)
    redis 127.0.0.1:6379> get baidu-site
    "http://www.baidu.com"
    

List(列表)

Redis list的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销,Redis内部的很多实现,包括发送缓冲队列等也都是用的这个数据结构。可以从头部或尾部向Redis列表添加元素。列表的最大长度为2^32 - 1,也即每个列表支持超过40亿个元素。

  • 应用场景

Redis list的应用场景非常多,也是Redis最重要的数据结构之一,比如twitter的关注列表、粉丝列表等都可以用Redis的list结构来实现,再比如有的应用使用Redis的list类型实现一个简单的轻量级消息队列,生产者push,消费者pop

  • 相关命令

    BLPOP
    BLPOP key1 [key2 ] timeout 取出并获取列表中的第一个元素,或阻塞,直到有可用
    BRPOP
    BRPOP key1 [key2 ] timeout 取出并获取列表中的最后一个元素,或阻塞,直到有可用
    BRPOPLPUSH
    BRPOPLPUSH source destination timeout 从列表中弹出一个值,它推到另一个列表并返回它;或阻塞,直到有可用
    LINDEX
    LINDEX key index 从一个列表其索引获取对应的元素
    LINSERT
    LINSERT key BEFORE|AFTER pivot value 在列表中的其他元素之后或之前插入一个元素
    LLEN
    LLEN key 获取列表的长度
    LPOP
    LPOP key 获取并取出列表中的第一个元素
    LPUSH
    LPUSH key value1 [value2] 在前面加上一个或多个值的列表
    LPUSHX
    LPUSHX key value 在前面加上一个值列表,仅当列表中存在
    LRANGE
    LRANGE key start stop 从一个列表获取各种元素
    LREM
    LREM key count value 从列表中删除元素
    LSET
    LSET key index value 在列表中的索引设置一个元素的值
    LTRIM
    LTRIM key start stop 修剪列表到指定的范围内
    RPOP
    RPOP key 取出并获取列表中的最后一个元素
    RPOPLPUSH
    RPOPLPUSH source destination 删除最后一个元素的列表,将其附加到另一个列表并返回它
    RPUSH
    RPUSH key value1 [value2] 添加一个或多个值到列表头部
    RPUSHX
    RPUSHX key value 添加一个值列表,仅当列表中存在
    
  • 使用示例

    redis 127.0.0.1:6379> lpush list1 redis
    (integer) 1
    redis 127.0.0.1:6379> lpush list1 hello
    (integer) 2
    redis 127.0.0.1:6379> rpush list1 world
    (integer) 3
    redis 127.0.0.1:6379> llen list1
    (integer) 3
    redis 127.0.0.1:6379> lrange list1 0 3
    1) "hello"
    2) "redis"
    3) "world"
    redis 127.0.0.1:6379> lpop list1
    "hello"
    redis 127.0.0.1:6379> rpop list1
    "world"
    redis 127.0.0.1:6379> lrange list1 0 3
    1) "redis"
    

Hash(字典,哈希表)

类似C#中的dict类型或者C++中的hash_map类型

Redis Hash对应Value内部实际就是一个HashMap,实际这里会有2种不同实现,这个Hash的成员比较少时Redis为了节省内存会采用类似一维数组的方式来紧凑存储,而不会采用真正的HashMap结构,对应的value redisObject的encoding为zipmap,当成员数量增大时会自动转成真正的HashMap

  • 应用场景

假设有多个用户及对应的用户信息,可以用来存储以用户ID为key,将用户信息序列化为比如json格式做为value进行保存。

  • 相关命令

    HDEL
    HDEL key field[field...] 删除对象的一个或几个属性域,不存在的属性将被忽略
    HEXISTS
    HEXISTS key field 查看对象是否存在该属性域
    HGET
    HGET key field 获取对象中该field属性域的值
    HGETALL
    HGETALL key 获取对象的所有属性域和值
    HINCRBY
    HINCRBY key field value 将该对象中指定域的值增加给定的value,原子自增操作,只能是integer的属性值可以使用
    HINCRBYFLOAT
    HINCRBYFLOAT key field increment 将该对象中指定域的值增加给定的浮点数
    HKEYS
    HKEYS key 获取对象的所有属性字段
    HVALS
    HVALS key 获取对象的所有属性值
    HLEN
    HLEN key 获取对象的所有属性字段的总数
    HMGET
    HMGET key field[field...] 获取对象的一个或多个指定字段的值
    HSET
    HSET key field value 设置对象指定字段的值
    HMSET
    HMSET key field value [field value ...] 同时设置对象中一个或多个字段的值
    HSETNX
    HSETNX key field value 只在对象不存在指定的字段时才设置字段的值
    HSTRLEN
    HSTRLEN key field 返回对象指定field的value的字符串长度,如果该对象或者field不存在,返回0.
    HSCAN
    HSCAN key cursor [MATCH pattern] [COUNT count] 类似SCAN命令
    
  • 使用示例

    127.0.0.1:6379> hset person name jack
    (integer) 1
    127.0.0.1:6379> hset person age 20
    (integer) 1
    127.0.0.1:6379> hset person sex famale
    (integer) 1
    127.0.0.1:6379> hgetall person
    1) "name"
    2) "jack"
    3) "age"
    4) "20"
    5) "sex"
    6) "famale"
    127.0.0.1:6379> hkeys person
    1) "name"
    2) "age"
    3) "sex"
    127.0.0.1:6379> hvals person
    1) "jack"
    2) "20"
    3) "famale"
    

Set(集合)

可以理解为一堆值不重复的列表,类似数学领域中的集合概念,且Redis也提供了针对集合的求交集、并集、差集等操作。

set 的内部实现是一个 value永远为null的HashMap,实际就是通过计算hash的方式来快速排重的,这也是set能提供判断一个成员是否在集合内的原因。

  • 应用场景

Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。

  • 相关命令

    SADD
    SADD key member [member ...] 添加一个或者多个元素到集合(set)里
    SACRD
    SCARD key 获取集合里面的元素数量
    SDIFF
    SDIFF key [key ...] 获得队列不存在的元素
    SDIFFSTORE
    SDIFFSTORE destination key [key ...] 获得队列不存在的元素,并存储在一个关键的结果集
    SINTER
    SINTER key [key ...] 获得两个集合的交集
    SINTERSTORE
    SINTERSTORE destination key [key ...] 获得两个集合的交集,并存储在一个集合中
    SISMEMBER
    SISMEMBER key member 确定一个给定的值是一个集合的成员
    SMEMBERS
    SMEMBERS key 获取集合里面的所有key
    SMOVE
    SMOVE source destination member 移动集合里面的一个key到另一个集合
    SPOP
    SPOP key [count] 获取并删除一个集合里面的元素
    SRANDMEMBER
    SRANDMEMBER key [count] 从集合里面随机获取一个元素
    SREM
    SREM key member [member ...] 从集合里删除一个或多个元素,不存在的元素会被忽略
    SUNION
    SUNION key [key ...] 添加多个set元素
    SUNIONSTORE
    SUNIONSTORE destination key [key ...] 合并set元素,并将结果存入新的set里面
    SSCAN
    SSCAN key cursor [MATCH pattern] [COUNT count] 迭代set里面的元素
    
  • 使用示例

    redis> SADD myset "Hello"
    (integer) 1
    redis> SADD myset "World"
    (integer) 1
    redis> SMEMBERS myset
    1) "World"
    2) "Hello"
    redis> SADD myset "one"
    (integer) 1
    redis> SISMEMBER myset "one"
    (integer) 1
    redis> SISMEMBER myset "two"
    (integer) 0
    

Sorted Set(有序集合)

Redis有序集合类似Redis集合,不同的是增加了一个功能,即集合是有序的。一个有序集合的每个成员带有分数,用于进行排序。

Redis有序集合添加、删除和测试的时间复杂度均为O(1)(固定时间,无论里面包含的元素集合的数量)。列表的最大长度为2^32- 1元素(4294967295,超过40亿每个元素的集合)。

Redis sorted set的内部使用HashMap和跳跃表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。

  • 使用场景

Redis sorted set的使用场景与set类似,区别是set不是自动有序的,而sorted set可以通过用户额外提供一个优先级(score)的参数来为成员排序,并且是插入有序的,即自动排序。当你需要一个有序的并且不重复的集合列表,那么可以选择sorted set数据结构.

比如用户的积分排行榜需求就可以通过有序集合实现。还有上面介绍的使用List实现轻量级的消息队列,其实也可以通过Sorted Set实现有优先级或按权重的队列。

  • 相关命令

    ZADD
    ZADD key score1 member1 [score2 member2] 添加一个或多个成员到有序集合,或者如果它已经存在更新其分数
    ZCARD
    ZCARD key 得到的有序集合成员的数量
    ZCOUNT
    ZCOUNT key min max 计算一个有序集合成员与给定值范围内的分数
    ZINCRBY
    ZINCRBY key increment member 在有序集合增加成员的分数
    ZINTERSTORE
    ZINTERSTORE destination numkeys key [key ...] 多重交叉排序集合,并存储生成一个新的键有序集合。
    ZLEXCOUNT
    ZLEXCOUNT key min max 计算一个给定的字典范围之间的有序集合成员的数量
    ZRANGE
    ZRANGE key start stop [WITHSCORES] 由索引返回一个成员范围的有序集合(从低到高)
    ZRANGEBYLEX
    ZRANGEBYLEX key min max [LIMIT offset count]返回一个成员范围的有序集合(由字典范围)
    ZRANGEBYSCORE
    ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT] 返回有序集key中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员,有序集成员按 score 值递增(从小到大)次序排列
    ZRANK
    ZRANK key member 确定成员的索引中有序集合
    ZREM
    ZREM key member [member ...] 从有序集合中删除一个或多个成员,不存在的成员将被忽略
    ZREMRANGEBYLEX
    ZREMRANGEBYLEX key min max 删除所有成员在给定的字典范围之间的有序集合
    ZREMRANGEBYRANK
    ZREMRANGEBYRANK key start stop 在给定的索引之内删除所有成员的有序集合
    ZREMRANGEBYSCORE
    ZREMRANGEBYSCORE key min max 在给定的分数之内删除所有成员的有序集合
    ZREVRANGE
    ZREVRANGE key start stop [WITHSCORES] 返回一个成员范围的有序集合,通过索引,以分数排序,从高分到低分
    ZREVRANGEBYSCORE
    ZREVRANGEBYSCORE key max min [WITHSCORES] 返回一个成员范围的有序集合,以socre排序从高到低
    ZREVRANK
    ZREVRANK key member 确定一个有序集合成员的索引,以分数排序,从高分到低分
    ZSCORE
    ZSCORE key member 获取给定成员相关联的分数在一个有序集合
    ZUNIONSTORE
    ZUNIONSTORE destination numkeys key [key ...] 添加多个集排序,所得排序集合存储在一个新的键
    ZSCAN
    ZSCAN key cursor [MATCH pattern] [COUNT count] 增量迭代排序元素集和相关的分数
    
  • 使用示例

    redis 127.0.0.1:6379> zadd dbs 100 redis
    (integer) 1
    redis 127.0.0.1:6379> zadd dbs 98 memcached
    (integer) 1
    redis 127.0.0.1:6379> zadd dbs 99 mongodb
    (integer) 1
    redis 127.0.0.1:6379> zadd dbs 99 leveldb
    (integer) 1
    redis 127.0.0.1:6379> zcard dbs
    (integer) 4
    redis 127.0.0.1:6379> zcount dbs 10 99
    (integer) 3
    redis 127.0.0.1:6379> zrank dbs leveldb
    (integer) 1
    redis 127.0.0.1:6379> zrank dbs other
    (nil)
    redis 127.0.0.1:6379> zrangebyscore dbs 98 100
    1) "memcached"
    2) "leveldb"
    3) "mongodb"
    4) "redis"
    

kafka集群

Kafka初识

  • Kafka使用背景

    在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:

    • 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位
    • 我想对用户的搜索关键词进行统计,分析出当前的流行趋势
    • 有些数据,存储数据库浪费,直接存储硬盘效率又低
  • Kafka的定义

    它是一个分布式消息系统,具有高水平扩展和高吞吐量的特点。

Kafka相关概念

  • AMQP协议:是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。

  • 一些基本的概念

    • 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
    • 生产者:(Producer):向broker发布消息的应用程序
    • AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例

kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)

  • Kafka架构

    生产者生产消息、kafka集群、消费者获取消息这样一种架构,如下图:

    kafka集群中的消息,是通过Topic(主题)来进行组织的,如下图:

    一些基本的概念:

    • 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
    • 分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。

      kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

      工作图:

      备份(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。

Kafka集群搭建

  • 软件环境

    • 已经搭建好的zookeeper集群(文章最后另写)
    • 软件版本kafka_2.11-0.9.0.1.tgz
    • 软件版本kafka_2.11-0.9.0.1.tgz
  • 创建目录并下载安装软件

    #下载软件
    wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
    #解压软件
    tar -xvf kafka_2.11-0.9.0.1.tgz
    
  • 修改配置文件

    主要关注config/server.properties

    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    port=9092 #当前kafka对外提供服务的端口默认是9092
    host.name = 120.76.230.134 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题
    #申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置
    listeners = PLAINTEXT://120.76.230.134:9092 
    advertised.host.name = 120.76.230.134
    advertised.listeners=PLAINTEXT://120.76.230.134:9092
    log.dirs=/data/kafka/kafkalogs  #消息存放的目录
    #在log.retention.hours=168 下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    zookeeper.connect=120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181 #zookeeper配置地址
    
每个机子的broker.id、host.name都是不一样的,主要是关注以上几项参数配置。
  • 启动Kafka集群并测试

    启动服务

    #从后台启动Kafka集群(3台都需要启动)
    cd /data/kafka/kafka_2.11-0.11.0.1/bin #进入到kafka的bin目录 
    ./kafka-server-start.sh -daemon ../config/server.properties
    

    检查服务是否启动

    #执行命令jps
    22246 Bootstrap
    26393 Kafka
    

    创建Topic来验证是否创建成功

    #创建Topic
    ./kafka-topics.sh --create --zookeeper 120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181 --replication-factor 2 --partitions 1 --topic test
    
    #解释
    --replication-factor 2   #复制两份
    --partitions 1 #创建1个分区
    --topic #主题为test
    
    #在一台服务器上创建一个发布者
    #创建一个broker,发布者
    ./kafka-console-producer.sh --broker-list 120.76.230.134:9092 --topic test
    
    #在一台服务器上创建一个订阅者
    ./kafka-console-consumer.sh --bootstrap-server 120.76.230.134:9092 --topic test --from-beginning
    

    测试(在发布者那里发布消息看看订阅者那里是否能正常收到~):

  • 其他命令

    大部分命令可以去官方文档查看

    • 查看topic

      ./kafka-topics.sh --list --zookeeper 120.76.230.134:5181,120.76.193.192:5181,120.25.239.155:5181
      #就会显示我们创建的所有topic
      

      kafka集群搭建完毕

zookeeper集群部署

  • 为什么是基数

    • 集群必须有一半以上的机器统一才能成为leader
    • 一半的机器挂掉 整个集群挂掉
  • 去官网下载好zookeeper包,解压

  • 修改配置/conf/zoo.cfg文件

    # 基本事件单元,以毫秒为单位,用来控制心跳和超时
    tickTime=2000
    # 参数设定了允许所有跟随者与领导者进行连接并同步的时间,如果在设定的时间段内,半数以上的跟随者未能完成同步,领导者便会宣布放弃领导地位,进行另一次的领导选举
    # 如果zk集群环境数量确实很大,同步数据的时间会变长,因此这种情况下可以适当调大该参数
    #集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)
    initLimit=10
    # 参数设定了允许一个跟随者与一个领导者进行同步的时间,如果在设定的时间段内,跟随者未完成同步,它将会被集群丢弃
    # 所有关联到这个跟随者的客户端将连接到另外一个跟随着
    syncLimit=5
    # 存储持久数据的本地文件系统位置
    dataDir=/data/zookeeper-3.4.9/data
    dataLogDir=/data/zookeeper-3.4.9/logs
    # 监听客户端连接的端口
    clientPort=5181
    
    server.1=120.76.230.134:2888:3888
    server.2=120.76.193.192:2888:3888
    server.3=120.25.239.155:2888:3888
    
    #server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
    #120.76.230.134为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
    

    zoo_sample.cfg 这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。

  • 创建myid文件

    /data/zookeeper-3.4.9/data目录下要配置myid文件,内容跟server.id相对应,比如:1

    ./zkServer.sh star启动zookeeper,每台都要启动一遍

  • 重要配置说明

    • myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。

    • zoo.cfg 文件是zookeeper配置文件 在conf目录里。

快速排序

快速排序

介绍

在数组中找一个支点(任意),经过一趟排序后,支点左边的数都要比支点小,支点右边的数都要比支点大!

现在我们有一个数组:int arr[]={1,4,5,67,2,7,8,6,9,44};

经过一趟排序之后,如果我选择数组中间的数作为支点:7(任意的),那么第一趟排序后的结果是这样的:{1,4,5,6,2,7,8,67,9,44}

那么就实现了支点左边的数比支点小,支点右边的数比支点大

递归分析与代码实现

现在我们的数组是这样的:{1,4,5,6,2,7,8,67,9,44},既然我们比7小的在左边,比7大的在右边,那么我们只要将”左边“的排好顺序,又将”右边“的排好序,那整个数组是不是就有序了?

回顾一下递归:”左边“的排好顺序,”右边“的排好序,跟我们第一趟排序的做法是一样的。

只不过是参数不一样:第一趟排序是任选了一个支点,比支点小的在左边,比支点大的在右边。那么,我们想要”左边“的排好顺序,只要在”左边“部分找一个支点,比支点小的在左边,比支点大的在右边。

递归出口也很容易找到:如果数组只有一个元素时,那么就不用排序了

代码如下:

public static void main(String[] args) {
    int[] arr = {1, 4, 5, 67, 2, 7, 8, 6, 9, 44};

    quickSort(arr, 0, 9);

    System.out.println(arr);


}

/**
 * 快速排序
 *
 * @param arr
 * @param L   指向数组第一个元素
 * @param R   指向数组最后一个元素
 */
public static void quickSort(int[] arr, int L, int R) {
    int i = L;
    int j = R;

    //支点
    int pivot = arr[(L + R) / 2];

    //左右两端进行扫描,只要两端还没有交替,就一直扫描
    while (i <= j) {

        //寻找直到比支点大的数
        while (pivot > arr[i])
            i++;

        //寻找直到比支点小的数
        while (pivot < arr[j])
            j--;

        //此时已经分别找到了比支点小的数(右边)、比支点大的数(左边),它们进行交换
        if (i <= j) {
            int temp = arr[i];
            arr[i] = arr[j];
            arr[j] = temp;
            i++;
            j--;
        }
    }
    //上面一个while保证了第一趟排序支点的左边比支点小,支点的右边比支点大了。


    //“左边”再做排序,直到左边剩下一个数(递归出口)
    if (L < j)
        quickSort(arr, L, j);

    //“右边”再做排序,直到右边剩下一个数(递归出口)
    if (i < R)
        quickSort(arr, i, R);
}

冒泡排序

冒泡排序就这么简单

排序对我们来说是一点也不陌生了,当你打Dota的时候也有天梯分,从高往下数,这个排名是有规律的,就是一种排序。

冒泡排序的实现

冒泡排序是一种简单的排序算法。它重复地走访过要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。走访数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完成。这个算法的名字由来是因为越大的元素会经由交换慢慢“浮”到数列的顶端,故名冒泡排序。

算法描述:

  • i从0开始,i与i+1比较,如果i>i+1,那么就互换
  • i不断增加,直到i<n-1(n是数组元素的个数,n-1是数组已经最后一个元素) ,一趟下来,可以让数组元素中最大值排在数组的最后面

从最简单开始,首先我们创建一个数组,该数组有6位数字:

int[] arrays = {8, 3, 2, 6, 7, 9};

6位数的数组需要5躺排序的,每躺排序之后次数减1(因为前一趟已经把前一趟数的最大值确定下来了)!

于是我们可以根据for循环和变量将上面的代码进行排序:

int temp;

//外层循环是排序的趟数
for (int i = 0; i < arrays.length - 1 ; i++) {

    //内层循环是当前趟数需要比较的次数
    for (int j = 0; j < arrays.length - i - 1; j++) {

        //前一位与后一位与前一位比较,如果前一位比后一位要大,那么交换
        if (arrays[j] > arrays[j + 1]) {
            temp = arrays[j];
            arrays[j] = arrays[j + 1];
            arrays[j + 1] = temp;
        }
    }
}

冒泡排序优化

从上面的例子我们可以看出来,如果数据足够乱的情况下是需要经过5躺比较才能将数组完整排好序。但是我们在第二躺比较后就已经得到排好序的数组了。

但是,我们的程序在第二趟排序后仍会执行第三趟、第四趟排序。这是没有必要的,因此我们可以对其进行优化一下:

  • 如果在某躺排序中没有发生交换位置,那么我们可以认为该数组已经排好序了
    • 因为我们每趟排序的目的就是将当前趟最大的数置换到对应的位置上,没有发生置换说明就已经排好序了

代码如下:

//装载临时变量
int temp;

//记录是否发生了置换, 0 表示没有发生置换、 1 表示发生了置换
int isChange;

//外层循环是排序的趟数
for (int i = 0; i < arrays.length - 1; i++) {

    //每比较一趟就重新初始化为0
    isChange = 0;

    //内层循环是当前趟数需要比较的次数
    for (int j = 0; j < arrays.length - i - 1; j++) {

        //前一位与后一位与前一位比较,如果前一位比后一位要大,那么交换
        if (arrays[j] > arrays[j + 1]) {
            temp = arrays[j];
            arrays[j] = arrays[j + 1];
            arrays[j + 1] = temp;

            //如果进到这里面了,说明发生置换了
            isChange = 1;

        }
    }
    //如果比较完一趟没有发生置换,那么说明已经排好序了,不需要再执行下去了
    if (isChange == 0) {
        break;
    }
}

递归

递归介绍

递归在程序语言中简单的理解是:方法自己调用自己。

递归其实和循环是非常像的,循环都可以改写成递归,递归未必能改写成循环,这是一个充分不必要的条件。

  • 那么,有了循环,为什么还要用递归呢??在某些情况下(费波纳切数列,汉诺塔),使用递归会比循环简单很多很多

想要用递归必须知道两个条件:

  • 递归出口(终止递归的条件)
  • 递归表达式(规律)

技巧:在递归中常常是将问题切割成两个部分(1和整体的思想),这能够让我们快速找到递归表达式(规律)

求和

如果我们使用for循环来进行求和1+2+3+4+….+100,那是很简单的:

int sum = 0;
for (int i = 1; i <= 100; i++) {

    sum = sum + i;

}
System.out.println("公众号:Java3y:" + sum);

for循环都可以使用递归来进行改写,而使用递归必须要知道两个条件:

  • 递归出口
  • 递归表达式(规律)

我们来找出它的规律:1+2+3+…+n,这是一个求和的运算,那么我们可以假设X=1+2+3+…+n,可以将1+2+3+…+(n-1)看成是一个整体。而这个整体做的事又和我们的初始目的(求和)相同。以我们的高中数学知识我们又可以将上面的式子看成X=sum(n-1)+n

我们找到我们的递归表达式(规律),它就是sum(n-1)+n,那递归出口呢,这个题目的递归出口就有很多了,我列举一下:

如果n=1时,那么就返回1
如果n=2时,那么就返回(1+2)=3
如果n=3时,那么就返回(1+2+3)=6

递归表达式和递归出口我们都找到了,下面就代码演示:

递归出口为1:

public static void main(String[] args) {
    System.out.println(sum(100));
}

/**
 *
 * @param n 要加到的数字,比如题目的100
 * @return
 */
public static int sum(int n) {

    if (n == 1) {
        return 1;
    } else {
        return sum(n - 1) + n;
    }
}

数组内部的最大值

使用的是循环,那么我们通常这样实现:

int[] arrays = {2, 3, 4, 5, 1, 5, 2, 9, 5, 6, 8, 3, 2};

//将数组的第一个假设是最大值
int max = arrays[0];

for (int i = 1; i < arrays.length; i++) {

    if (arrays[i] > max) {
        max = arrays[i];
    }
}

System.out.println(max);

如果我们用递归的话,那怎么用弄呢?首先还是先要找到递归表达式(规律)和递归出口

  • 我们又可以运用1和整体的思想来找到规律
    • 将数组第一个数->2与数组后面的数->{3, 4, 5, 1, 5, 2, 9, 5, 6, 8, 3, 2}进行切割,将数组后面的数看成是一个整体X={3, 4, 5, 1, 5, 2, 9, 5, 6, 8, 3, 2},那么我们就可以看成是第一个数和一个整体进行比较if(2>X) return 2 else(2<X) return X
    • 而我们要做的就是找出这个整体的最大值与2进行比较。找出整体的最大值又是和我们的初始目的(找出最大值)是一样的
    • 也就可以写成if( 2>findMax() )return 2 else return findMax()
  • 递归出口,如果数组只有1个元素时,那么这个数组最大值就是它了。

使用到数组的时候,我们通常为数组设定左边界和右边界,这样比较好地进行切割。

  • L表示左边界,往往表示的是数组第一个元素,也就会赋值为0(角标为0是数组的第一个元素)
  • R表示右边界,往往表示的是数组的长度,也就会赋值为arrays.length-1(长度-1在角标中才是代表最后一个元素)

那么我们递归的写法就是:

public static void main(String[] args) {

      int[] arrays = {2, 3, 4, 5, 1, 5, 2, 9, 5, 6, 8, 3, 1};

      System.out.println(findMax(arrays, 0, arrays.length - 1));

  }


  /**
   * 递归,找出数组最大的值
   * @param arrays 数组
   * @param L      左边界,第一个数
   * @param R      右边界,数组的长度
   * @return
   */

  public static int findMax(int[] arrays, int L, int R) {

      //如果该数组只有一个数,那么最大的就是该数组第一个值了
      if (L == R) {
          return arrays[L];
      } else {

          int a = arrays[L];
          int b = findMax(arrays, L + 1, R);//找出整体的最大值

          if (a > b) {
              return a;
          } else {
              return b;
          }
      }

  }

斐波那契数列

菲波那切数列长这个样子:{1 1 2 3 5 8 13 21 34 55….. n }

数学好的同学可能很容易就找到规律了:前两项之和等于第三项

如果让我们求出第n项是多少,那么我们就可以很简单写出对应的递归表达式了:Z = (n-2) + (n-1)

递归出口在本题目是需要有两个的,因为它是前两项加起来才得出第三项的值

同样地,那么我们的递归出口可以写成这样:

if(n==1) retrun 1;
if(n==2) return 2;

看一下完整的代码:

public static void main(String[] args) {

    int[] arrays = {1, 1, 2, 3, 5, 8, 13, 21};
    //bubbleSort(arrays, 0, arrays.length - 1);

    int fibonacci = fibonacci(10);
    System.out.println(fibonacci);

}

public static int fibonacci(int n) {
    if (n == 1) {
        return 1;
    } else if (n == 2) {
        return 1;
    } else {
        return (fibonacci(n - 1) + fibonacci(n - 2));
    }

}

总结

要使用递归首先要知道两件事:

  • 递归出口(终止递归的条件)
  • 递归表达式(规律)