这篇博客是我周末花了两天时间摸索着写出来的,负载均衡以及Kafka的一些策略简直就是天坑。还好有周杰伦的“嚯嚯嚯嚯”陪着我!
之所以产出这一博客,是因为公司的项目上线了集群之后出现了问题。
大佬排查之后,发现我写的代码存在一点问题,所以就趁周末时间,进行了修改与测试,产出了meethigher/distributed-hazelcast-lock: 基于Hazelcast及Kafka实现的集群分布式锁和负载均衡
一、分布式锁
参考文章
先来看下流程图
准备三台节点,每台节点上面都有相同的定时任务,将三台节点部署成一个集群,定时任务同时启动,经过分布式锁的过滤,每个任务只有拿到锁的那台机器进行执行。
HazelcastConfig.java
1 2 3 4 5 6 7
| @Configuration public class HazelcastConfig { @Bean public HazelcastInstance hazelcastInstance() { return Hazelcast.newHazelcastInstance(); } }
|
TaskExecutorConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Configuration @EnableAsync public class TaskExecutorConfig { @Bean("taskExecutor") public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(8); executor.setKeepAliveSeconds(60); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("task-thread-"); return executor; } }
|
TaskEnum.java
1 2 3 4 5 6 7 8 9 10 11 12 13
| public enum TaskEnum { FIRST(0, "一级任务"), SECOND(1, "二级任务"), THIRD(2, "三级任务"), FORTH(3, "四级任务"); public final int code; public final String desc;
TaskEnum(int code, String desc) { this.code = code; this.desc = desc; } }
|
DailyTask.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Component public class DailyTask { Logger log = LoggerFactory.getLogger(DailyTask.class); @Autowired HazelcastInstance hazelcastInstance;
private final static String API_MONITOR_TASK_MAP_NAME = "api_monitor_task_map";
@Async("taskExecutor") @Scheduled(cron = "0 10 14 * * ? ") public void firstTask() { doTask(TaskEnum.FIRST); }
@Async("taskExecutor") @Scheduled(cron = "0 10 14 * * ?") public void secondTask() { doTask(TaskEnum.SECOND); }
@Async("taskExecutor") @Scheduled(cron = "0 10 14 * * ?") public void thirdTask() { doTask(TaskEnum.THIRD); }
@Async("taskExecutor") @Scheduled(cron = "0 10 14 * * ?") public void forthTask() { doTask(TaskEnum.FORTH); }
public void doTask(TaskEnum task) { if (!ObjectUtils.isEmpty(task)) { IMap<Integer, String> map = hazelcastInstance.getMap(API_MONITOR_TASK_MAP_NAME); map.put(task.code, task.desc); boolean canLocked = map.tryLock(task.code); if(canLocked){ System.out.println("本次抢到锁,执行任务..."); log.info(task.desc); map.unlock(task.code); }else{ System.out.println("本次不抢锁!"); } } } }
|
最终执行结果
缺点:同一时刻的任务,有可能全部被同一台机器抢到,其他两台机器会空闲,这种极限情况下的分配存在问题。不过我目前的项目同时只有一条任务,够用。
二、负载均衡
参考文章
先看下流程图
我是准备了两台机器提供Kafka集群。具体配置过程
- 两台机器均启动zookeeper,保持zookeeper默认配置即可
- 配置Kafka的配置文件。
- 每台kafka的brokerId保持唯一。
- 每台kafka的zookeeper.connect配置为zookeeper集群。
- 每台kafka的advertised.listeners配置PLAINTEXT://当前节点ip:9092,好像不用配也可以。
通过Kafka实现的负载均衡可以解决上面的那个问题,哪怕所有锁都被他自己抢到了,也无所谓,也就是抢到锁的节点只需要将工作内容抛给Kafka,经过Kafka,然后均衡地分配给下面的各个节点进行消费,从而达到负载均衡。
运行结果
三、存在的问题
demo里,上面两个例子,就已经很好很好了。达到了我满意并想要的效果。
实际上还是存在问题的。
在大型项目中,比如公司里的项目,就出现了问题。多节点启动之后,虽然定时任务都是同时的,但是由于机器性能的差异,导致会有延迟。
类似于下图。
还有一个问题,没有用try..finally释放锁,出现了问题。大概类似于上面的问题,没排查出原因来,最后处于妥善,还是用finally吧。
如上图所示,不加finally来释放锁时,他会等待别人释放锁,再去拿锁,这都给我整蒙了。
最后完善版的版本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public void doTask(TaskEnum task) { if (!ObjectUtils.isEmpty(task)) { IMap<Integer, Long> map = hazelcastInstance.getMap(API_MONITOR_TASK_MAP_NAME); Long temp = map.get(task.code); Long lastExecTime = ObjectUtils.isEmpty(temp) ? 0L : temp; Long currentTime = System.currentTimeMillis(); Long intervalTime = currentTime - lastExecTime; log.info("【" + task.desc + "】,lastExecTime=" + lastExecTime + ",currentTime=" + currentTime + ",intervalTime=" + intervalTime); boolean canLocked = map.tryLock(task.code); if (intervalTime > DISTRIBUTE_TASK_TIME_INTERVAL && canLocked) { log.info("抢到【" + task.desc + "】分发权限!"); try { map.put(task.code, currentTime); doMonitorTask(task); } catch (Exception e) { e.printStackTrace(); } finally { map.unlock(task.code); log.info("释放【" + task.desc + "】"); } } else { log.info("没有抢到【" + task.desc + "】权限"); } } }
|