摘要

简单记录下,在实际工作中使用Hazelcast的心得。

正文

参考

  1. Quickstart
  2. GitHub - hazelcast/hazelcast-code-samples: Hazelcast Code Samples

以下所有的操作,都是基于Hazelcast完成的,在maven项目里面,需要引入该依赖。

xml
1
2
3
4
5
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-all</artifactId>
    <version>4.2.2</version>
</dependency>

如果不对Hazelcast进行配置的话,会使用默认配置hazelcast-default.xml。可以在项目启动时查看日志输出。

一、分布式计算

主要记录三个常用的任务类型

  1. IExecutorService:分布式任务,提交的任务可以自动在集群中分配执行。
  2. DurableExecutorService:分布式持久任务,可以在集群成员中备份任务。如果其中一个成员宕机,运算结果也不会丢失。如果成员都没有挂机,则会在所有成员中分配执行。
  3. IScheduledExecutorService:分布式定时任务,可以设置定时循环,也可以设置延时执行

根据最近使用Hazelcast的心得,带I的一般就是表示分布式,如IMap、IExceutorService

一个HazelcastInstance实例,就代表一个节点,通过配置可以修改节点的加入方式。

java
1
Hazelcast.newHazelcastInstance();

如果想要测试多节点任务分配,可以通过以下代码,另起一个Hazelcast节点。

java
1
2
3
4
5
6
public class SlaveMember {

    public static void main(String[] args) {
        Hazelcast.newHazelcastInstance();
    }
}

1.1 分布式任务

分布式任务,提交的任务可以自动在集群中分配执行。

创建任务Runnable

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class EchoTask implements Runnable, Serializable {

    private final String msg;

    public EchoTask(String msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Echo: " + msg);
    }
}

执行任务

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class MasterMember {

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        ExecutorConfig executorConfig = config.getExecutorConfig("executor");
        //线程大小
        executorConfig.setPoolSize(2);
        //队列,这里是可以在配置文件里配置策略的,0表示Integer的最大值
        executorConfig.setQueueCapacity(3);


        //启动之后,就会一直存活,除非主动shutdown关闭
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
        IExecutorService executor = hz.getExecutorService("executor");



        for (int i = 1; i <= 4; i++) {
            System.out.println("创建任务: " + i);
            executor.execute(new EchoTask("" + i));
        }
        System.out.println("MasterMember finished!");
        
//        executor.shutdown();

    }
}

如果任务的数量>=线程数量+队列数量,就会报错,overloaded。除非是集群或者是加大任务数量或者队列数量。

如果此时创建任务的节点挂掉,那么任务就丢失了,想要任务不丢失,就需要使用持久式任务。

提交给所有节点

方法

  • executeOnAllMembers(Runnable command):在所有已知的集群成员上执行任务。
  • submitToAllMembers(Callable task):将任务提交给所有群集成员,并返回每个节点对应任务的键值对。
  • submitToAllMembers(Callable task, MultiExecutionCallback callback):将任务提交给所有群集成员,带回调函数
  • submitToAllMembers(Runnable task, MultiExecutionCallback callback):将任务提交给所有群集成员,带回调函数

submit带回调函数的没有返回值

模拟主节点,Slave节点代码不变

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MasterMember {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Map<String, Integer> map = hz.getMap("map");
        for (int i = 0; i < 5; i++) {
            map.put(UUID.randomUUID().toString(), 1);
        }
        IExecutorService executor = hz.getExecutorService("executor");


        Map<Member, Future<Integer>> result = executor.submitToAllMembers(new SumTask());
        int sum = 0;
        for (Future<Integer> future : result.values()) {
            sum += future.get();
        }

//        Future<Integer> submit = executor.submit(new SumTask());
//        Integer sum = submit.get();
//
        System.out.println("Result: " + sum);

    }
}

创建Callable

java
1
2
3
4
5
6
7
8
public class SumTask implements Callable<Integer>, Serializable {

    @Override
    public Integer call() throws Exception {
        System.out.println("我要执行任务");
        return 1;
    }
}

执行结果如图,两个节点都执行了。

4.jpg

提交给指定键所属节点

方法:

  • executeOnKeyOwner(Runnable command, Object key):在指定key的所属节点上执行任务。
  • submitToKeyOwner(Callable task, Object key):将任务提交给指定键的所有者,并返回该任务的Future
  • submitToKeyOwner(Callable task, Object key, ExecutionCallback callback):在指定key的所属节点上执行任务。
  • submitToKeyOwner(Runnable task, Object key, ExecutionCallback callback):在指定key的所属节点上执行任务。

Slave节点保持不变,模拟主节点

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class VerifyTask implements Runnable, Serializable, HazelcastInstanceAware {

    private final String key;
    private transient HazelcastInstance hz;

    public VerifyTask(String key) {
        this.key = key;
    }

    @Override
    public void setHazelcastInstance(HazelcastInstance hz) {
        this.hz = hz;
    }

    @Override
    public void run() {
        IMap map = hz.getMap("map");
        boolean localKey = map.localKeySet().contains(key);
        System.out.println("Key " + key + " is local: " + localKey);
    }
}


public class MasterMember {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Map<String, String> map = hz.getMap("map");
        for (int i = 0; i < 10; i++) {
            map.put(UUID.randomUUID().toString(), "");
        }

        IExecutorService executor = hz.getExecutorService("executor");
        for (String key : map.keySet()) {
            //该任务在key所存储的节点上执行
            executor.executeOnKeyOwner(new VerifyTask(key), key);
        }
    }
}

最后输出结果如图。可以看到IMap中的key是分布在不同节点上的,通过map.localKeySet,可以拿到本机存储的所有key,所有执行时传过来的key都在该节点上,由此可知,executeOnKeyOwner实现了在key所在节点上执行的功能。时

5.jpg

提交给指定节点

方法:

  • executeOnMember(Runnable command, Member member):指定单个成员执行任务。

  • executeOnMembers(Runnable command, Collection members):指定多个成员执行任务。

  • executeOnMembers(Runnable command, MemberSelector memberSelector):使用选择器,指定成员执行任务。

  • submitToMember(Callable task, Member member):向指定成员提交任务,并返回表示该任务的未来

  • submitToMember(Callable task, Member member, ExecutionCallback callback):向指定成员提交任务,带有回调函数,成功或者失败后自动执行逻辑

  • submitToMember(Runnable task, Member member, ExecutionCallback callback):向指定成员提交任务,带有回调函数,成功或者失败后自动执行逻辑

  • submitToMembers(Callable task, Collection members):向给定成员提交任务,并返回Map<Member,Future>表示某个成员的Future,通过Future可以拿到最后执行结果。

  • submitToMembers(Callable task, Collection members, MultiExecutionCallback callback):指定多个成员执行任务,带有回调函数,成功或者失败后自动执行逻辑

  • submitToMembers(Callable task, MemberSelector memberSelector):使用选择器,选定成员提交任务,并返回Map<Member,Future>,表示每个成员上任务的待定完成。

  • submitToMembers(Callable task, MemberSelector memberSelector, MultiExecutionCallback callback):同上,带回调函数

  • submitToMembers(Runnable task, Collection members, MultiExecutionCallback callback):指定多个成员执行任务,带回调函数

  • submitToMembers(Runnable task, MemberSelector memberSelector, MultiExecutionCallback callback):使用选择器,带回调函数

submit带回调函数的都没有返回值

简单示例

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
    HazelcastInstance instance = Hazelcast.newHazelcastInstance();
    Cluster cluster = instance.getCluster();
    IExecutorService executorService = instance.getExecutorService("test");
    
    //runnable就直接用null代替了
    //指定单个成员执行
    executorService.executeOnMember(null, cluster.getLocalMember());
    //指定多个成员执行
    executorService.executeOnMembers(null, cluster.getMembers());
    //使用选择器
    MemberSelector chooseMyAngel = new MemberSelector() {
        @Override
        public boolean select(Member member) {
            String attribute = member.getAttribute("name");
            if ("向晚".equals(attribute)) {
                return true;
            } else {
                return false;
            }
        }
    };
    executorService.executeOnMembers(null, chooseMyAngel);
}

提交给阉割过的节点

阉割过的,其实就是精简后的节点,上面没有分区,不存储内容。主要用来计算一些比较繁琐的任务,然后将计算结果返回给集群。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//模拟一个计算繁琐的任务
public class ComputationHeavyTask implements Callable<Integer>, Serializable, HazelcastInstanceAware {

    private transient HazelcastInstance hz;

    public void setHazelcastInstance(HazelcastInstance hz) {
        this.hz = hz;
    }

    public Integer call() throws Exception {
        System.out.println("Running a computation heavy task on " + hz.getCluster().getLocalMember());
        return 0;
    }
}


//创建精简节点
public class StartLiteMember {

    public static void main(String[] args) {
        Config config = new Config();
        config.setLiteMember(true);

        Hazelcast.newHazelcastInstance(config);
    }
}

//创建主节点
public class StartDataMember {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IExecutorService executor = hz.getExecutorService("executor");

        //使用选择器选择精简节点
        Future<Integer> future = executor.submit(new ComputationHeavyTask(), MemberSelectors.LITE_MEMBER_SELECTOR);

        System.out.println("Result: " + future.get());
    }
}

运行结果如图

6.jpg

1.2 分布式持久任务

分布式持久任务,可以在集群成员中备份任务。如果其中一个成员宕机,运算结果也不会丢失。如果成员都没有挂机,则会在所有成员中分配执行。

执行Runnable

创建Runnable

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class EchoTask implements Runnable, Serializable {

    private final String msg;

    private final Integer delayMs;

    EchoTask(String msg,Integer delayMs) {
        this.msg = msg;
        this.delayMs=delayMs;
    }

    @Override
    public void run() {
        System.out.println("runnable开始执行"+msg);
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("runnable执行结束:"+msg);
    }
}

先启动一个SlaveMember。再启动下面代码,模拟把任务丢到任务池,然后关闭,查看任务是否会丢失。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class BasicConfiguration {

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.getDurableExecutorConfig("exec")
                .setCapacity(200)
                //集群中已提交任务的备份数量。它的默认值为 1。
                .setDurability(1)
                .setPoolSize(8);

        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        DurableExecutorService executorService = instance.getDurableExecutorService("exec");

        for (int i = 1; i <= 4; i++) {
            System.out.println("创建任务: " + i);
            DurableExecutorServiceFuture<?> submit = executorService.submit(new EchoTask("" + i,5000));
//            submit.get();
        }

//        Hazelcast.shutdownAll();
//        instance.shutdown();
        //模拟异常关闭
        System.exit(0);
    }
}

如果两个节点都没关闭,则会分配执行,不存在重复执行的情况。如图。

1.jpg

如果某个节点关闭了,则所有的任务还能再另一个节点顺利执行。如图。

2.jpg

执行Callable

先说一下,下面这个例子的思路。

因为持久式任务,是会在节点中,随机一台机器执行的。为了方便测试,强制让主节点执行,并在执行时挂掉,在挂掉前,把taskId输出控制台。Slave节点监测到主节点没有成功执行任务,就会自动执行任务,输入taskId,获取最后的任务执行结果。

创建Callable

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class BasicTask implements Callable<String>, Serializable {

    private final String msg;

    BasicTask(String msg) {
        this.msg = msg;
    }

    @Override
    public String call() throws Exception {
        int i=5;
        while(i>0) {
            System.out.println("等待1秒"+i);
            Thread.sleep(1000);
            --i;
        }
        return msg;
    }
}

模拟Slave节点

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SlaveMember {

    public static Long taskId = null;

    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        //开启一个线程阻塞,直到获取到控制台输入内容
        new Thread(() -> {
            if (taskId == null) {
                Scanner scanner = new Scanner(System.in);
                System.out.print("快点输入要寻回的任务id:");
                long l = scanner.nextLong();
                taskId = l;
            }
        }).start();
        //输出任务返回值
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (taskId != null) {
                DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec");
                Future<String> objectFuture = exec.retrieveResult(taskId);
                try {
                    String o = objectFuture.get();
                    System.out.println("找回任务的返回值:" + o);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                break;
            }
        }

        System.out.println("测试结束");
    }
}

模拟主节点

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class RetrieveLostTask {

    public static void main(String[] args) throws Exception {
//        Hazelcast.newHazelcastInstance();
//
//        HazelcastInstance client = HazelcastClient.newHazelcastClient();
//        DurableExecutorService executorService = client.getDurableExecutorService("exec");
//        DurableExecutorServiceFuture<String> future = executorService.submit(new BasicTask("DurableExecutor"));
//        long taskId = future.getTaskId();
//        client.shutdown();
//
//        System.out.println("关闭client");
//
//        HazelcastInstance newClient = HazelcastClient.newHazelcastClient();
//        DurableExecutorService newExecutorService = newClient.getDurableExecutorService("exec");
//        Future<Object> retrieveResultFuture = newExecutorService.retrieveResult(taskId);
//        Object result = retrieveResultFuture.get();
//        System.out.println("Result: " + result);
//
//        HazelcastClient.shutdownAll();
//        Hazelcast.shutdownAll();

        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec");
        //直接submit,会随机在集群上找一台机器执行
//        DurableExecutorServiceFuture<String> submit = exec.submit(new BasicTask("test the node of executing task is disabled"));


        /**
         * 想法:
         * 强制让任务在当前节点执行,然后让该节点在执行时挂掉。
         * 另一个节点通过该节点挂掉之前的输出的taskId,重新执行任务获取任务执行结果
         */

        //强制在当前节点执行submitToKeyOwner,需要获取当前节点的key
        Cluster cluster = hazelcastInstance.getCluster();
        Member localMember = cluster.getLocalMember();
        Integer key = null;
        int i = 0;
        while (true) {
            PartitionService partitionService = hazelcastInstance.getPartitionService();
            Partition partition = partitionService.getPartition(i);
            Member owner = partition.getOwner();
            if (owner != null && owner == localMember) {
                key = i;
                break;
            }
            i++;
        }
        DurableExecutorServiceFuture<String> future = exec.submitToKeyOwner(new BasicTask("test the node of executing task is disabled"),
                key);
        System.out.println(future.getTaskId());
        System.exit(0);
    }
}

运行过如图

3.jpg

1.3 分布式定时任务

定时任务

开启一个定时任务,指定时间后开始,之后每隔5秒执行一次。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class SlaveMember {

    public static Long taskId = null;

    private final static String START_TIME = "10:05:30";

    /**
     * 计算当前时间到指定时间的间隔秒数
     *
     * @param startTimeString HH:mm:ss
     * @return
     */
    private static Long calculateInitialDelay(String startTimeString) {
        LocalTime startTime = LocalTime.parse(startTimeString);
        LocalTime nowTime = LocalTime.now();
        Long initialDelay = null;
        if (nowTime.isAfter(startTime)) {
            LocalDate dateTime = LocalDate.now();
            LocalDateTime startDayTime = LocalDateTime.of(dateTime.plusDays(1L), startTime);
            initialDelay = ChronoUnit.SECONDS.between(LocalDateTime.of(dateTime, nowTime), startDayTime);
        } else {
            initialDelay = ChronoUnit.SECONDS.between(nowTime, startTime);
        }
        return initialDelay;
    }

    public static void main(String[] args) {
        Long initialDelay = calculateInitialDelay(START_TIME);
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst");
        //通过配置名称,可以保证在集群中,只有一个节点执行任务。而不用像之前一样,定时任务还要加锁
        tst.scheduleAtFixedRate(TaskUtils.named("task",
                new Runnable() {
                    @Override
                    public void run() {
                        String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                        System.out.println(format);
                    }
                }), initialDelay, 5L, TimeUnit.SECONDS);
    }
}

输出结果如图

7.jpg

延时任务

hazelcast的延时任务,跟定时任务相比,就是只执行一次,不会循环执行,但是即使执行完了,也不会释放资源的。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class ScheduledDelayTask {

    private final static String START_TIME = "10:33:30";


    /**
     * 创建延时任务
     *
     * @param tst
     * @param initialDelay
     * @return
     */
    private static IScheduledFuture<Object> createDelayTask(IScheduledExecutorService tst, Long initialDelay) {

        //hazelcast创建的延时任务
        return tst.schedule(new Runnable() {
                                @Override
                                public void run() {
                                    System.out.println(String.format("延时任务执行:%s", START_TIME));
                                }
                            },
                initialDelay,
                TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        Long initialDelay = calculateInitialDelay(START_TIME);
        Config config = new Config();
        config.getScheduledExecutorConfig("tst")
                .setPoolSize(1)
                .setCapacity(1);

        Long initialDelay = 5L;
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
        IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst");

        IScheduledFuture<Object> delayTask = createDelayTask(tst, initialDelay);

        //阻塞主线程,任务执行完之后,才会继续执行
        Object o = delayTask.get();
        System.out.println(o);


        //释放资源
        delayTask.dispose();
        //将上一行代码注释放开,查看断点查看下面这行内容,可以知道即使任务执行完,也并没有释放
        Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures = tst.getAllScheduledFutures();

        //如果上面没执行释放,到这一步就直接报错了
        createDelayTask(tst, 5L);

        System.out.println("结束");


    }
}

hazelcast提供的自动释放延时任务的写法

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * 管理延时任务
 * <p>
 * <p>
 * 由于Hazelcast的延时任务,需要调用dispose方法来释放资源,否则就会一直占用
 * </p>
 * <p>
 * <p>
 * <p>
 * 参考:
 * 1.https://docs.hazelcast.com/imdg/latest/computing/scheduled-executor-service
 * 2.https://github.com/hazelcast/hazelcast/issues/11221
 * 3.https://stackoverflow.com/questions/42040046/hazelcast-scheduledexecutorservice
 * <p>
 * 因为还要考虑到集群问题,所以根据目前掌握的知识来看,
 * 通过一个集群定时任务,拿到集群中所有的Future,然后isDone判断,来手动dispose
 * <p>
 * hazelcast在com.hazelcast.scheduledexecutor.TaskUtils#autoDisposable(java.lang.Runnable)提供了自动释放资源
 * 本项目使用的是4.2.2版本,不过自动释放存在Bug,目前最新的5.0-BETA-1尚未解决该Bug
 * <p>
 * hazelcast官方人员对该Bug的回复:https://github.com/hazelcast/hazelcast/issues/19622
 */
public class ScheduledDelayAutoDisposeTask {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.getScheduledExecutorConfig("tst")
                .setPoolSize(1)
                .setCapacity(1);
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        IScheduledExecutorService tst = instance.getScheduledExecutorService("tst");
        /**
         * Runnable的自动释放,目前为止的Hazelcast有bug,并没有释放掉。
         * Callable的自动释放,目前是可以的
         */
        IScheduledFuture<Object> hhh = tst.schedule(TaskUtils.autoDisposable(new Runnable() {
            @Override
            public void run() {
                System.out.println("第一次执行");
            }
        }), 5L, TimeUnit.SECONDS);
        Object o = hhh.get();
        tst.schedule(TaskUtils.autoDisposable(new Runnable() {
            @Override
            public void run() {
                System.out.println("第二次执行");
            }
        }), 5L, TimeUnit.SECONDS);
    }
}

不过目前的Hazelcast的自动释放资源,对于Runnable是不好使的。目前最好的办法,就是去拿所有的任务判断是否执行完成。如下

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IScheduledExecutorService tst = instance.getScheduledExecutorService("tst");
//轮询释放资源
tst.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        Map<Member, List<IScheduledFuture<Object>>> map = tst.getAllScheduledFutures();
        Set<Member> members = map.keySet();
        for (Member member : members) {
            List<IScheduledFuture<Object>> futures = map.get(member);
            futures.forEach(x -> {
                if (x.isDone()) {
                    x.dispose();
                }
            });
        }
    }
}, 5L, 5L, TimeUnit.SECONDS);