言成言成啊 | Kit Chen's Blog

基于Hazelcast及Kafka实现的集群分布式锁和负载均衡

发布于2021-07-17 14:34:47,更新于2021-08-08 22:57:50,标签:java open hazelcast  文章会持续修订,转载请注明来源地址:https://meethigher.top/blog

这篇博客是我周末花了两天时间摸索着写出来的,负载均衡以及Kafka的一些策略简直就是天坑。还好有周杰伦的“嚯嚯嚯嚯”陪着我!

之所以产出这一博客,是因为公司的项目上线了集群之后出现了问题。

大佬排查之后,发现我写的代码存在一点问题,所以就趁周末时间,进行了修改与测试,产出了meethigher/distributed-hazelcast-lock: 基于Hazelcast及Kafka实现的集群分布式锁和负载均衡

一、分布式锁

参考文章

先来看下流程图

准备三台节点,每台节点上面都有相同的定时任务,将三台节点部署成一个集群,定时任务同时启动,经过分布式锁的过滤,每个任务只有拿到锁的那台机器进行执行。

HazelcastConfig.java

1
2
3
4
5
6
7
@Configuration
public class HazelcastConfig {
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
}

TaskExecutorConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
@EnableAsync
public class TaskExecutorConfig {
@Bean("taskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数
executor.setCorePoolSize(10);
//最大线程数
executor.setMaxPoolSize(20);
//队列的长度
executor.setQueueCapacity(8);
//线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(60);
//线程是对拒绝任务的处理策略,也就是没有线程可用的时候
//CallerRunsPolicy在任务被拒绝添加后,会在调用execute方法的的线程来执行被拒绝的任务。除非executor被关闭,否则任务不会被丢弃。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//任务执行器的前缀,打印日志时输出
executor.setThreadNamePrefix("task-thread-");
return executor;
}
}

TaskEnum.java

1
2
3
4
5
6
7
8
9
10
11
12
13
public enum TaskEnum {
FIRST(0, "一级任务"),
SECOND(1, "二级任务"),
THIRD(2, "三级任务"),
FORTH(3, "四级任务");
public final int code;
public final String desc;

TaskEnum(int code, String desc) {
this.code = code;
this.desc = desc;
}
}

DailyTask.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Component
public class DailyTask {
Logger log = LoggerFactory.getLogger(DailyTask.class);
@Autowired
HazelcastInstance hazelcastInstance;


private final static String API_MONITOR_TASK_MAP_NAME = "api_monitor_task_map";

@Async("taskExecutor")
@Scheduled(cron = "0 10 14 * * ? ")
public void firstTask() {
doTask(TaskEnum.FIRST);
}


@Async("taskExecutor")
@Scheduled(cron = "0 10 14 * * ?")
public void secondTask() {
doTask(TaskEnum.SECOND);
}

@Async("taskExecutor")
@Scheduled(cron = "0 10 14 * * ?")
public void thirdTask() {
doTask(TaskEnum.THIRD);
}

@Async("taskExecutor")
@Scheduled(cron = "0 10 14 * * ?")
public void forthTask() {
doTask(TaskEnum.FORTH);
}


public void doTask(TaskEnum task) {
if (!ObjectUtils.isEmpty(task)) {
IMap<Integer, String> map = hazelcastInstance.getMap(API_MONITOR_TASK_MAP_NAME);
map.put(task.code, task.desc);
//判断任务是否能锁
boolean canLocked = map.tryLock(task.code);
if(canLocked){
System.out.println("本次抢到锁,执行任务...");
log.info(task.desc);
map.unlock(task.code);
}else{
System.out.println("本次不抢锁!");
}
}
}
}

最终执行结果

缺点:同一时刻的任务,有可能全部被同一台机器抢到,其他两台机器会空闲,这种极限情况下的分配存在问题。不过我目前的项目同时只有一条任务,够用。

二、负载均衡

参考文章

先看下流程图

我是准备了两台机器提供Kafka集群。具体配置过程

  1. 两台机器均启动zookeeper,保持zookeeper默认配置即可
  2. 配置Kafka的配置文件。
    • 每台kafka的brokerId保持唯一。
    • 每台kafka的zookeeper.connect配置为zookeeper集群。
    • 每台kafka的advertised.listeners配置PLAINTEXT://当前节点ip:9092,好像不用配也可以。

通过Kafka实现的负载均衡可以解决上面的那个问题,哪怕所有锁都被他自己抢到了,也无所谓,也就是抢到锁的节点只需要将工作内容抛给Kafka,经过Kafka,然后均衡地分配给下面的各个节点进行消费,从而达到负载均衡。

运行结果

三、存在的问题

demo里,上面两个例子,就已经很好很好了。达到了我满意并想要的效果。

实际上还是存在问题的。

在大型项目中,比如公司里的项目,就出现了问题。多节点启动之后,虽然定时任务都是同时的,但是由于机器性能的差异,导致会有延迟。

类似于下图。

还有一个问题,没有用try..finally释放锁,出现了问题。大概类似于上面的问题,没排查出原因来,最后处于妥善,还是用finally吧。

如上图所示,不加finally来释放锁时,他会等待别人释放锁,再去拿锁,这都给我整蒙了。

最后完善版的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void doTask(TaskEnum task) {
if (!ObjectUtils.isEmpty(task)) {
IMap<Integer, Long> map = hazelcastInstance.getMap(API_MONITOR_TASK_MAP_NAME);
Long temp = map.get(task.code);
Long lastExecTime = ObjectUtils.isEmpty(temp) ? 0L : temp;
Long currentTime = System.currentTimeMillis();
Long intervalTime = currentTime - lastExecTime;
log.info("【" + task.desc + "】,lastExecTime=" + lastExecTime + ",currentTime=" + currentTime + ",intervalTime=" + intervalTime);
//判断任务是否能锁
boolean canLocked = map.tryLock(task.code);
if (intervalTime > DISTRIBUTE_TASK_TIME_INTERVAL && canLocked) {
log.info("抢到【" + task.desc + "】分发权限!");
try {
map.put(task.code, currentTime);
doMonitorTask(task);
} catch (Exception e) {
e.printStackTrace();
} finally {
map.unlock(task.code);
log.info("释放【" + task.desc + "】");
}
} else {
log.info("没有抢到【" + task.desc + "】权限");
}
}
}
发布:2021-07-17 14:34:47
修改:2021-08-08 22:57:50
链接:https://meethigher.top/blog/2021/distributed-hazelcast-lock/
标签:java open hazelcast 
付款码 打赏 分享
Shift+Ctrl+1 可控制工具栏