# 一、什么是?
一个阻塞的队列。继承自AbstractQueue,同时AbstractQueue又继承了AbstractColletcion并实现了Queue接口,所以它间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据,是一个基于数组的阻塞队列。
ArrayBlockingQueue是有边界值的,在创建ArrayBlockingQueue时就要确定好该队列的大小,一旦创建,该队列大小不可更改,容量大小是基于int类型的,即[1,Integer.MAX_VALUE]。内部的全局锁是使用的ReentrantLock。
# 二、在哪用?
- 先进先出(队首是最旧的元素;队尾是新添加的元素)
- 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
- 队列不支持空元素
# 三、怎么用?
# 1、构造方法
- capacity:队列初始化大小
- fair:表示该队列中的可重入锁是否公平,默认为false
- c:添加的集合
ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
1
2
3
2
3
# 2、add方法
在队列的尾部添加元素,返回true。当队列满的时候会抛出IllegalStateException异常
boolean add(E e)
1
# 3、offer方法
在队列的尾部添加元素,如果添加成功返回true,否则返回false
- e:添加的元素
- timeout:最多等待的时长
- unit:时间单位
boolean offer(E e)
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
1
2
2
# 4、put方法
在队列的尾部添加元素,如果队列已满,则会阻塞,等待队列有空闲位置,该方法可以被打断。
void put(E e) throws InterruptedException
1
# 5、take方法
获取队列中的元素,如果没有则会被阻塞
E take() throws InterruptedException
1
# 6、poll方法
获取队列中队首的元素,获取后钙元素就会移除该队列。
- timeout:等待时长
- unit:时间单位
//立即返回
E poll()
//如果不能立即取出,则等待一定时间
E poll(long timeout, TimeUnit unit) throws InterruptedException
1
2
3
4
2
3
4
# 7、peek方法
读取队列中队首的元素,不会删除该元素。
E peek()
1
# 8、remove方法
删除队列中指定元素。
boolean remove(Object o)
1
# 9、contains方法
查询队列中是否存在某个严肃
boolean contains(Object o)
1
10、remainingCapacity方法
返回队列中剩余空闲位置
int remainingCapacity()
1
# 11、drainTo
将队列中的元素排到指定集合中去
- c:目标集合
- maxElements:最多排几个元素
int drainTo(Collection<? super E> c)
int drainTo(Collection<? super E> c, int maxElements)
1
2
2
# 四、实际开发
/** 请求数据队列,用于存储请求 */
public static final ArrayBlockingQueue<RequestChatMessage> REQUEST_DATA_QUEUE = new ArrayBlockingQueue<>(2000);
// 利用while-true机制建立简单定时队列消费,同时利用Redis控制并发
@Async("commonThreadExecutor")
@Override
public void consumerModelDataQueue() {
while (true) {
try {
// 加锁10分钟
boolean lock = redisUtils.getLock(RedisKeyConstants.MODEL_QUEUE_CONSUMER_LIMIT, RedisKeyConstants.MODEL_QUEUE_CONSUMER_LIMIT, 600);
if (lock) {
RequestChatMessage message = QueueUtils.MODEL_CALL_QUEUE.poll();
if (Objects.nonNull(message)) {
// 缓存当前消费消息信息
BaseChatMessage chatMessage = new BaseChatMessage();
BeanUtils.copyProperties(message, chatMessage);
redisUtils.set(RedisKeyConstants.MODEL_QUEUE_CONSUMER_CURRENT_MSG_KEY, JSON.toJSONString(chatMessage));
// 获取策略处理类,存储消息
SparkUtil.sendRequest(message);
}
}
// log.info("consumerModelDataQueue handler running, 当前时间:{}", DateUtil.date());
} catch (Exception e) {
log.error("consumerModelDataQueue error, ", e);
} finally {
// 休眠200毫秒,勿删
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 五、应用注意
如果当前代码基于当前项目启动环境,遇到需要重启项目、项目更新发布等情况,需要设计好消息保存渠道及相应初始化加载,避免数据丢失。