言成言成啊 | Kit Chen's Blog

Hazelcast Distributed Computing and IMap

Published 2022-02-09 22:42:19, updated 2022-02-28 22:30:02. Tags: java, hazelcast. This article is revised over time; when reposting, please cite the source: https://meethigher.top/blog

References

  1. Quickstart
  2. GitHub - hazelcast/hazelcast-code-samples: Hazelcast Code Samples

Everything below is built on Hazelcast. In a Maven project, add the following dependency.

<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-all</artifactId>
    <version>4.2.2</version>
</dependency>

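If Hazelcast is not configured explicitly, it falls back to the default configuration, hazelcast-default.xml. You can confirm this in the log output when the project starts.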

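1. Distributed Computing

This section covers three commonly used executor types:

  1. IExecutorService: distributed tasks. Submitted tasks are automatically distributed across the cluster for execution.
  2. DurableExecutorService: durable distributed tasks. Tasks are backed up on other cluster members, so the result is not lost if one member goes down. When all members are up, the tasks are distributed among them.
  3. IScheduledExecutorService: distributed scheduled tasks. Supports both periodic execution and one-off delayed execution.

From my recent experience with Hazelcast, a leading "I" in a type name generally marks the distributed version, as in IMap and IExecutorService.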

Each HazelcastInstance represents one node (member). How a node joins the cluster can be changed through configuration.

Hazelcast.newHazelcastInstance();

To test how tasks are distributed across several nodes, start one more Hazelcast node with the following code.

import com.hazelcast.core.Hazelcast;

public class SlaveMember {

    public static void main(String[] args) {
        // Starts a member that joins the cluster and waits for work
        Hazelcast.newHazelcastInstance();
    }
}

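1.1 Distributed Tasks

A distributed task is submitted once and automatically assigned to a member of the cluster for execution.

Create the Runnable task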

import java.io.Serializable;

// The task must be Serializable so that it can be sent to other members
public class EchoTask implements Runnable, Serializable {

    private final String msg;

    public EchoTask(String msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        try {
            // Simulate 5 seconds of work
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }

        System.out.println("Echo: " + msg);
    }
}

Run the task

import com.hazelcast.config.Config;
import com.hazelcast.config.ExecutorConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

public class MasterMember {

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        ExecutorConfig executorConfig = config.getExecutorConfig("executor");
        // Number of threads in the pool
        executorConfig.setPoolSize(2);
        // Queue capacity; it can also be set in the configuration file. 0 means Integer.MAX_VALUE
        executorConfig.setQueueCapacity(3);

        // Once started, the instance stays alive until it is shut down explicitly
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
        IExecutorService executor = hz.getExecutorService("executor");

        for (int i = 1; i <= 4; i++) {
            System.out.println("Creating task: " + i);
            executor.execute(new EchoTask("" + i));
        }
        System.out.println("MasterMember finished!");

        // executor.shutdown();
    }
}

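If the number of submitted tasks is greater than or equal to the pool size plus the queue capacity, submission fails with an "overloaded" error. To avoid this, run more members in the cluster, or increase the pool size or the queue capacity.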
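
The same kind of overload can be reproduced with the JDK alone. The sketch below is an analogy, not Hazelcast's implementation: it uses java.util.concurrent.ThreadPoolExecutor with a bounded queue, and the class and method names (BoundedExecutorDemo, countRejected) are made up for illustration. The exact threshold at which Hazelcast rejects a task may differ from the JDK's.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only analogy for a fixed pool with a bounded queue (hypothetical names).
class BoundedExecutorDemo {

    /** Submits `tasks` blocking tasks and returns how many of them were rejected. */
    static int countRejected(int poolSize, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until all tasks are submitted
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // pool and queue are both full
                }
            }
        } finally {
            release.countDown();
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected with 5 tasks: " + countRejected(2, 3, 5));
        System.out.println("rejected with 6 tasks: " + countRejected(2, 3, 6));
    }
}
```

With a pool of 2 and a queue of 3, the JDK executor holds 2 running plus 3 queued tasks and rejects the next one while the workers are still busy.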

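If the node that created the tasks dies at this point, the tasks are lost. To keep them, use durable tasks.

Submitting to all members

Methods

  • executeOnAllMembers(Runnable command): runs the task on all known cluster members.
  • submitToAllMembers(Callable<T> task): submits the task to all cluster members and returns a map from each member to the Future of its task.
  • submitToAllMembers(Callable<T> task, MultiExecutionCallback callback): submits the task to all cluster members, with a callback.
  • submitToAllMembers(Runnable task, MultiExecutionCallback callback): submits the task to all cluster members, with a callback.

The submit variants that take a callback have no return value.

The master node is simulated below; the Slave node code is unchanged.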

import com.hazelcast.cluster.Member;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;

public class MasterMember {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Map<String, Integer> map = hz.getMap("map");
        for (int i = 0; i < 5; i++) {
            map.put(UUID.randomUUID().toString(), 1);
        }
        IExecutorService executor = hz.getExecutorService("executor");

        // One Future per member; add up what each member returns
        Map<Member, Future<Integer>> result = executor.submitToAllMembers(new SumTask());
        int sum = 0;
        for (Future<Integer> future : result.values()) {
            sum += future.get();
        }

        // Future<Integer> submit = executor.submit(new SumTask());
        // Integer sum = submit.get();
        //
        System.out.println("Result: " + sum);
    }
}

Create the Callable

import java.io.Serializable;
import java.util.concurrent.Callable;

public class SumTask implements Callable<Integer>, Serializable {

    @Override
    public Integer call() throws Exception {
        System.out.println("Running the task");
        return 1;
    }
}

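The result is shown in the figure: both nodes ran the task.

Submitting to the owner of a key

Methods:

  • executeOnKeyOwner(Runnable command, Object key): runs the task on the member that owns the given key.
  • submitToKeyOwner(Callable<T> task, Object key): submits the task to the owner of the given key and returns a Future for it.
  • submitToKeyOwner(Callable<T> task, Object key, ExecutionCallback<T> callback): submits the task to the owner of the given key, with a callback.
  • submitToKeyOwner(Runnable task, Object key, ExecutionCallback callback): submits the task to the owner of the given key, with a callback.

The Slave node is unchanged; the master node is simulated below.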

// VerifyTask.java
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.map.IMap;

import java.io.Serializable;

public class VerifyTask implements Runnable, Serializable, HazelcastInstanceAware {

    private final String key;
    // Injected on the member that runs the task; never serialized
    private transient HazelcastInstance hz;

    public VerifyTask(String key) {
        this.key = key;
    }

    @Override
    public void setHazelcastInstance(HazelcastInstance hz) {
        this.hz = hz;
    }

    @Override
    public void run() {
        IMap<String, String> map = hz.getMap("map");
        boolean localKey = map.localKeySet().contains(key);
        System.out.println("Key " + key + " is local: " + localKey);
    }
}


// MasterMember.java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

import java.util.Map;
import java.util.UUID;

public class MasterMember {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Map<String, String> map = hz.getMap("map");
        for (int i = 0; i < 10; i++) {
            map.put(UUID.randomUUID().toString(), "");
        }

        IExecutorService executor = hz.getExecutorService("executor");
        for (String key : map.keySet()) {
            // The task runs on the node that stores this key
            executor.executeOnKeyOwner(new VerifyTask(key), key);
        }
    }
}

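The final output is shown in the figure. The keys of an IMap are spread across different nodes, and map.localKeySet() returns all keys stored on the local node. Every key passed to a task was found on the node that ran it, which shows that executeOnKeyOwner runs the task on the node that owns the key.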
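
To make "the node that owns the key" more concrete, here is a deliberately simplified, JDK-only model of the key → partition → owner lookup. It rests on assumptions: Hazelcast hashes the serialized key rather than calling hashCode(), and it assigns partitions to members through a partition table rather than a modulo. Only the default partition count of 271 is taken from Hazelcast; KeyOwnerSketch, partitionOf and ownerOf are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of routing a key to its owning member (not Hazelcast's real algorithm).
class KeyOwnerSketch {

    // Hazelcast's default number of partitions
    static final int PARTITION_COUNT = 271;

    /** Maps a key to a partition id in [0, PARTITION_COUNT). */
    static int partitionOf(Object key) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    /** Picks the member that "owns" the key's partition (round-robin assumption). */
    static String ownerOf(Object key, List<String> members) {
        return members.get(partitionOf(key) % members.size());
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("member-1", "member-2");
        for (String key : Arrays.asList("a", "b", "order-42")) {
            System.out.println(key + " -> partition " + partitionOf(key)
                    + " -> " + ownerOf(key, members));
        }
    }
}
```

The point of the model is that the owner is a pure function of the key and the current member list, so any member can route a task to the right place without asking the others.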

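Submitting to specific members

Methods:

  • executeOnMember(Runnable command, Member member): runs the task on a single given member.
  • executeOnMembers(Runnable command, Collection<Member> members): runs the task on several given members.
  • executeOnMembers(Runnable command, MemberSelector memberSelector): runs the task on the members chosen by a selector.
  • submitToMember(Callable<T> task, Member member): submits the task to the given member and returns a Future for it.
  • submitToMember(Callable<T> task, Member member, ExecutionCallback<T> callback): submits the task to the given member; the callback runs automatically on success or failure.
  • submitToMember(Runnable task, Member member, ExecutionCallback callback): submits the task to the given member; the callback runs automatically on success or failure.
  • submitToMembers(Callable<T> task, Collection<Member> members): submits the task to the given members and returns a Map<Member, Future<T>>; each Future yields the result from its member.
  • submitToMembers(Callable<T> task, Collection<Member> members, MultiExecutionCallback callback): submits the task to several given members; the callback runs automatically on success or failure.
  • submitToMembers(Callable<T> task, MemberSelector memberSelector): submits the task to the members chosen by a selector and returns a Map<Member, Future<T>> with the pending result for each member.
  • submitToMembers(Callable<T> task, MemberSelector memberSelector, MultiExecutionCallback callback): same as above, with a callback.
  • submitToMembers(Runnable task, Collection<Member> members, MultiExecutionCallback callback): submits the task to several given members, with a callback.
  • submitToMembers(Runnable task, MemberSelector memberSelector, MultiExecutionCallback callback): uses a selector, with a callback.

None of the submit variants that take a callback have a return value.

A simple example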

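import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.MemberSelector;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

import java.io.Serializable;

public class MemberSelectionExample {

    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        Cluster cluster = instance.getCluster();
        IExecutorService executorService = instance.getExecutorService("test");

        // A trivial placeholder task (the original listing passed null here for brevity);
        // it is Serializable so that it can be sent to other members
        Runnable task = (Runnable & Serializable) () -> System.out.println("task executed");

        // Run on a single member
        executorService.executeOnMember(task, cluster.getLocalMember());
        // Run on several members
        executorService.executeOnMembers(task, cluster.getMembers());
        // Run on the members picked by a selector: here, members whose "name" attribute is "向晚"
        MemberSelector chooseMyAngel = member -> "向晚".equals(member.getAttribute("name"));
        executorService.executeOnMembers(task, chooseMyAngel);
    }
}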

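Submitting to lite members

A lite member is a stripped-down node: it owns no partitions and stores no data. It is mainly used to run computation-heavy tasks and return the results to the cluster.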

// ComputationHeavyTask.java: simulates a computation-heavy task
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;

import java.io.Serializable;
import java.util.concurrent.Callable;

public class ComputationHeavyTask implements Callable<Integer>, Serializable, HazelcastInstanceAware {

    private transient HazelcastInstance hz;

    @Override
    public void setHazelcastInstance(HazelcastInstance hz) {
        this.hz = hz;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println("Running a computation heavy task on " + hz.getCluster().getLocalMember());
        return 0;
    }
}


// StartLiteMember.java: starts a lite member
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;

public class StartLiteMember {

    public static void main(String[] args) {
        Config config = new Config();
        config.setLiteMember(true);

        Hazelcast.newHazelcastInstance(config);
    }
}

// StartDataMember.java: starts the master (data) member
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

import java.util.concurrent.Future;

public class StartDataMember {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IExecutorService executor = hz.getExecutorService("executor");

        // Use a selector to pick a lite member
        Future<Integer> future = executor.submit(new ComputationHeavyTask(), MemberSelectors.LITE_MEMBER_SELECTOR);

        System.out.println("Result: " + future.get());
    }
}

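The result is shown in the figure.

1.2 Durable Distributed Tasks

Durable distributed tasks are backed up on other cluster members, so the result is not lost if one member goes down. When all members are up, the tasks are distributed among them.

Running a Runnable

Create the Runnable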

import java.io.Serializable;

public class EchoTask implements Runnable, Serializable {

    private final String msg;

    // How long the task pretends to work, in milliseconds
    private final int delayMs;

    EchoTask(String msg, int delayMs) {
        this.msg = msg;
        this.delayMs = delayMs;
    }

    @Override
    public void run() {
        System.out.println("Runnable started: " + msg);
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }

        System.out.println("Runnable finished: " + msg);
    }
}

Start a SlaveMember first, then run the code below. It puts tasks into the pool and then exits abruptly, so we can check whether the tasks are lost.

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.durableexecutor.DurableExecutorService;
import com.hazelcast.durableexecutor.DurableExecutorServiceFuture;

public class BasicConfiguration {

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.getDurableExecutorConfig("exec")
                .setCapacity(200)
                // Number of backups kept in the cluster for each submitted task. The default is 1.
                .setDurability(1)
                .setPoolSize(8);

        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        DurableExecutorService executorService = instance.getDurableExecutorService("exec");

        for (int i = 1; i <= 4; i++) {
            System.out.println("Creating task: " + i);
            DurableExecutorServiceFuture<?> submit = executorService.submit(new EchoTask("" + i, 5000));
            // submit.get();
        }

        // Hazelcast.shutdownAll();
        // instance.shutdown();
        // Simulate an abnormal shutdown
        System.exit(0);
    }
}

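If neither node is shut down, the tasks are split between the two and none of them runs twice, as shown in the figure.

If one node is shut down, all tasks still complete on the other node, as shown in the figure.

Running a Callable

First, the idea behind the following example.

A durable task runs on an arbitrary member of the cluster. To make the test predictable, the task is forced onto the master node, which prints the taskId to the console and then dies while the task is still running. The Slave node detects that the master did not finish the task and runs it itself; entering the taskId on the Slave then retrieves the final result of the task.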
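
The key idea in this flow is that the result is addressed by a task id rather than by the Future object returned at submit time, so anyone who knows the id can ask for the result later. The sketch below models only that idea inside a single JVM with JDK classes; it has no backups and no cluster, and ResultStoreSketch and its methods are hypothetical names, not Hazelcast API.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Single-JVM stand-in for "submit now, fetch the result later by id" (hypothetical names).
class ResultStoreSketch {

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> results = new ConcurrentHashMap<>();

    /** Runs the task asynchronously and returns an id that identifies its result. */
    long submit(Callable<?> task) {
        long taskId = nextId.incrementAndGet();
        CompletableFuture<Object> result = new CompletableFuture<>();
        results.put(taskId, result);
        CompletableFuture.runAsync(() -> {
            try {
                result.complete(task.call());
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return taskId;
    }

    /** Looks the result up by id; the caller does not need the original future. */
    CompletableFuture<Object> retrieveResult(long taskId) {
        CompletableFuture<Object> result = results.get(taskId);
        if (result == null) {
            throw new IllegalArgumentException("Unknown task id: " + taskId);
        }
        return result;
    }

    public static void main(String[] args) {
        ResultStoreSketch store = new ResultStoreSketch();
        long taskId = store.submit(() -> "DurableExecutor");
        System.out.println("taskId = " + taskId);
        // Only the id is needed to get the result back
        System.out.println("result = " + store.retrieveResult(taskId).join());
    }
}
```

In the real DurableExecutorService the stored task and result are additionally backed up on other members, which is what lets another node finish the work after the submitting node dies.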

Create the Callable

import java.io.Serializable;
import java.util.concurrent.Callable;

public class BasicTask implements Callable<String>, Serializable {

    private final String msg;

    BasicTask(String msg) {
        this.msg = msg;
    }

    @Override
    public String call() throws Exception {
        // Count down for 5 seconds so there is time to kill the node mid-task
        int i = 5;
        while (i > 0) {
            System.out.println("Waiting 1 second, remaining: " + i);
            Thread.sleep(1000);
            --i;
        }
        return msg;
    }
}

Simulated Slave node

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.durableexecutor.DurableExecutorService;

import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SlaveMember {

    // volatile: written by the input thread, read by the main thread
    public static volatile Long taskId = null;

    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        // Start a thread that blocks until a task id is typed into the console
        new Thread(() -> {
            if (taskId == null) {
                Scanner scanner = new Scanner(System.in);
                System.out.print("Enter the id of the task to recover: ");
                taskId = scanner.nextLong();
            }
        }).start();
        // Print the task's return value once the id is known
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (taskId != null) {
                DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec");
                Future<String> objectFuture = exec.retrieveResult(taskId);
                try {
                    String o = objectFuture.get();
                    System.out.println("Recovered task result: " + o);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                break;
            }
        }

        System.out.println("Test finished");
    }
}

Simulated master node

import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.Member;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.durableexecutor.DurableExecutorService;
import com.hazelcast.durableexecutor.DurableExecutorServiceFuture;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionService;

public class RetrieveLostTask {

    public static void main(String[] args) throws Exception {
        // Hazelcast.newHazelcastInstance();
        //
        // HazelcastInstance client = HazelcastClient.newHazelcastClient();
        // DurableExecutorService executorService = client.getDurableExecutorService("exec");
        // DurableExecutorServiceFuture<String> future = executorService.submit(new BasicTask("DurableExecutor"));
        // long taskId = future.getTaskId();
        // client.shutdown();
        //
        // System.out.println("Client shut down");
        //
        // HazelcastInstance newClient = HazelcastClient.newHazelcastClient();
        // DurableExecutorService newExecutorService = newClient.getDurableExecutorService("exec");
        // Future<Object> retrieveResultFuture = newExecutorService.retrieveResult(taskId);
        // Object result = retrieveResultFuture.get();
        // System.out.println("Result: " + result);
        //
        // HazelcastClient.shutdownAll();
        // Hazelcast.shutdownAll();

        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec");
        // A plain submit picks an arbitrary machine in the cluster to run the task
        // DurableExecutorServiceFuture<String> submit = exec.submit(new BasicTask("test the node of executing task is disabled"));

        /*
         * Idea:
         * force the task to run on the current node, then let this node die while the task is running.
         * The other node uses the taskId printed before the crash to re-run the task and fetch its result.
         */

        // To force execution on the current node with submitToKeyOwner, find a key owned by this node
        Cluster cluster = hazelcastInstance.getCluster();
        Member localMember = cluster.getLocalMember();
        PartitionService partitionService = hazelcastInstance.getPartitionService();
        Integer key = null;
        int i = 0;
        while (true) {
            // Partition that the candidate key i maps to
            Partition partition = partitionService.getPartition(i);
            Member owner = partition.getOwner();
            // Compare with equals(), not ==: the two Member objects need not be the same instance
            if (localMember.equals(owner)) {
                key = i;
                break;
            }
            i++;
        }
        DurableExecutorServiceFuture<String> future = exec.submitToKeyOwner(
                new BasicTask("test the node of executing task is disabled"), key);
        System.out.println(future.getTaskId());
        // Simulate the node dying while the task is still running
        System.exit(0);
    }
}

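The run result is shown in the figure.

1.3 Distributed Scheduled Tasks

Periodic tasks

Start a periodic task that first fires at a given time of day and then runs every 5 seconds.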

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.TaskUtils;

import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class SlaveMember {

    private final static String START_TIME = "10:05:30";

    /**
     * Calculates the number of seconds from now until the given time of day.
     *
     * @param startTimeString time of day in HH:mm:ss format
     * @return delay in seconds until the next occurrence of that time
     */
    private static long calculateInitialDelay(String startTimeString) {
        LocalTime startTime = LocalTime.parse(startTimeString);
        LocalTime nowTime = LocalTime.now();
        long initialDelay;
        if (nowTime.isAfter(startTime)) {
            // The start time has already passed today, so wait until tomorrow
            LocalDate dateTime = LocalDate.now();
            LocalDateTime startDayTime = LocalDateTime.of(dateTime.plusDays(1L), startTime);
            initialDelay = ChronoUnit.SECONDS.between(LocalDateTime.of(dateTime, nowTime), startDayTime);
        } else {
            initialDelay = ChronoUnit.SECONDS.between(nowTime, startTime);
        }
        return initialDelay;
    }

    public static void main(String[] args) {
        long initialDelay = calculateInitialDelay(START_TIME);
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst");
        // Giving the task a name ensures that only one node in the cluster runs it,
        // so the scheduled job no longer needs a lock as it did before
        tst.scheduleAtFixedRate(TaskUtils.named("task",
                new Runnable() {
                    @Override
                    public void run() {
                        String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                        System.out.println(format);
                    }
                }), initialDelay, 5L, TimeUnit.SECONDS);
    }
}

The output is shown in the figure.
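
The initial-delay calculation is the part of this example most worth checking in isolation. The sketch below is one possible variant using java.time.Duration, with the current time passed in as a parameter so that the result is reproducible; InitialDelaySketch and secondsUntil are hypothetical names, and the behavior is meant to match the listing above (a start time equal to "now" gives a delay of 0).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Testable variant of the initial-delay calculation (hypothetical names).
class InitialDelaySketch {

    /** Seconds from `now` until the next occurrence of `startTime`, today or tomorrow. */
    static long secondsUntil(LocalDateTime now, LocalTime startTime) {
        LocalDateTime next = now.toLocalDate().atTime(startTime);
        if (next.isBefore(now)) {
            // Already passed today: schedule for the same time tomorrow
            next = next.plusDays(1);
        }
        return Duration.between(now, next).getSeconds();
    }

    public static void main(String[] args) {
        LocalTime start = LocalTime.parse("10:05:30");
        LocalDateTime before = LocalDateTime.of(2022, 2, 9, 10, 0, 0);
        LocalDateTime after = LocalDateTime.of(2022, 2, 9, 11, 0, 0);
        System.out.println("before start: " + secondsUntil(before, start) + " s");
        System.out.println("after start:  " + secondsUntil(after, start) + " s");
    }
}
```

Passing the clock value in, instead of calling LocalTime.now() inside the method, is a design choice that makes both branches easy to exercise.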

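Delayed tasks

Compared with a periodic task, a Hazelcast delayed task runs only once instead of repeating. However, its resources are not released even after it has finished.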
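
Why this matters becomes clear once the executor has a capacity limit: a finished task that was never disposed still occupies a slot. The following JDK-only toy model illustrates that bookkeeping under stated assumptions; it does not use or reproduce Hazelcast's scheduler, and CapacitySketch with its methods is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: finished tasks keep their slot until they are disposed (hypothetical names).
class CapacitySketch {

    private final int capacity;
    // task name -> whether the task has finished
    private final Map<String, Boolean> finishedByName = new HashMap<>();

    CapacitySketch(int capacity) {
        this.capacity = capacity;
    }

    /** Registers a task; returns false when every slot is taken, finished tasks included. */
    boolean trySchedule(String name) {
        if (finishedByName.size() >= capacity) {
            return false;
        }
        finishedByName.put(name, false);
        return true;
    }

    /** Marks a registered task as finished; its slot stays occupied. */
    void markDone(String name) {
        finishedByName.replace(name, true);
    }

    /** Frees the slots of all finished tasks and returns how many were freed. */
    int disposeFinished() {
        int before = finishedByName.size();
        finishedByName.values().removeIf(done -> done);
        return before - finishedByName.size();
    }

    public static void main(String[] args) {
        CapacitySketch scheduler = new CapacitySketch(1);
        System.out.println("first scheduled:  " + scheduler.trySchedule("first"));
        scheduler.markDone("first");
        System.out.println("second scheduled: " + scheduler.trySchedule("second"));
        System.out.println("disposed:         " + scheduler.disposeFinished());
        System.out.println("second scheduled: " + scheduler.trySchedule("second"));
    }
}
```

The sweep in disposeFinished() corresponds to checking isDone() on each future and calling dispose() on the finished ones.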

import com.hazelcast.cluster.Member;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.IScheduledFuture;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ScheduledDelayTask {

    private final static String START_TIME = "10:33:30";

    /**
     * Creates a delayed task.
     *
     * @param tst          the scheduled executor to use
     * @param initialDelay delay in seconds before the task runs
     * @return the future of the scheduled task
     */
    private static IScheduledFuture<Object> createDelayTask(IScheduledExecutorService tst, Long initialDelay) {

        // A delayed task created through Hazelcast
        return tst.schedule(new Runnable() {
                                @Override
                                public void run() {
                                    System.out.println(String.format("Delayed task executed: %s", START_TIME));
                                }
                            },
                initialDelay,
                TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Long initialDelay = calculateInitialDelay(START_TIME);
        Config config = new Config();
        config.getScheduledExecutorConfig("tst")
                .setPoolSize(1)
                .setCapacity(1);

        Long initialDelay = 5L;
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
        IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst");

        IScheduledFuture<Object> delayTask = createDelayTask(tst, initialDelay);

        // Blocks the main thread; execution continues only after the task has run
        Object o = delayTask.get();
        System.out.println(o);

        // Release the resources held by the finished task
        delayTask.dispose();
        // Comment out the line above and inspect the map below at a breakpoint:
        // the task is still there even though it has finished, i.e. nothing was released
        Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures = tst.getAllScheduledFutures();

        // Without the dispose() above, this call fails immediately
        createDelayTask(tst, 5L);

        System.out.println("Done");
    }
}

Hazelcast's built-in way to auto-dispose a delayed task

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.IScheduledFuture;
import com.hazelcast.scheduledexecutor.TaskUtils;

import java.util.concurrent.TimeUnit;

/**
 * Managing delayed tasks.
 * <p>
 * A Hazelcast delayed task keeps holding its resources until dispose() is called.
 * <p>
 * References:
 * 1.https://docs.hazelcast.com/imdg/latest/computing/scheduled-executor-service
 * 2.https://github.com/hazelcast/hazelcast/issues/11221
 * 3.https://stackoverflow.com/questions/42040046/hazelcast-scheduledexecutorservice
 * <p>
 * Because the cluster also has to be taken into account, my current understanding is that
 * the way to clean up is a cluster-wide periodic task that fetches every Future in the cluster,
 * checks isDone() and calls dispose() manually.
 * <p>
 * Hazelcast provides automatic disposal through com.hazelcast.scheduledexecutor.TaskUtils#autoDisposable(java.lang.Runnable).
 * This project uses version 4.2.2, where auto-dispose has a bug; the latest version at the time of writing, 5.0-BETA-1, has not fixed it yet.
 * <p>
 * Reply from the Hazelcast team about this bug: https://github.com/hazelcast/hazelcast/issues/19622
 */
public class ScheduledDelayAutoDisposeTask {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.getScheduledExecutorConfig("tst")
                .setPoolSize(1)
                .setCapacity(1);
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        IScheduledExecutorService tst = instance.getScheduledExecutorService("tst");
        /*
         * Auto-dispose of a Runnable: buggy in Hazelcast so far, the task is not released.
         * Auto-dispose of a Callable: works at the moment.
         */
        IScheduledFuture<Object> hhh = tst.schedule(TaskUtils.autoDisposable(new Runnable() {
            @Override
            public void run() {
                System.out.println("First run");
            }
        }), 5L, TimeUnit.SECONDS);
        Object o = hhh.get();
        tst.schedule(TaskUtils.autoDisposable(new Runnable() {
            @Override
            public void run() {
                System.out.println("Second run");
            }
        }), 5L, TimeUnit.SECONDS);
    }
}

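At present, however, Hazelcast's automatic disposal does not work for a Runnable. The best workaround for now is to fetch all tasks periodically and dispose of the ones that have finished, as follows.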

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IScheduledExecutorService tst = instance.getScheduledExecutorService("tst");
// Poll every 5 seconds and release the resources of finished tasks
tst.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        // Futures of all scheduled tasks, grouped by the member that holds them
        Map<Member, List<IScheduledFuture<Object>>> map = tst.getAllScheduledFutures();
        Set<Member> members = map.keySet();
        for (Member member : members) {
            List<IScheduledFuture<Object>> futures = map.get(member);
            futures.forEach(x -> {
                if (x.isDone()) {
                    x.dispose();
                }
            });
        }
    }
}, 5L, 5L, TimeUnit.SECONDS);

Link: https://meethigher.top/blog/2022/the-map-and-computing-of-hazelcast/