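Abstract
A brief record of lessons learned from using Hazelcast in real-world work.
Main text
References
Quickstart
GitHub - hazelcast/hazelcast-code-samples: Hazelcast Code Samples
Everything below is built on Hazelcast. In a Maven project, add the following dependency.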
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-all</artifactId>
    <version>4.2.2</version>
</dependency>
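If Hazelcast is not configured explicitly, it falls back to the default configuration, hazelcast-default.xml. You can confirm this in the log output at startup.
1. Distributed computing
This section covers three commonly used task types:
IExecutorService: distributed tasks. Submitted tasks are automatically distributed across the cluster for execution.
DurableExecutorService: durable distributed tasks. Tasks can be backed up across cluster members, so if one member goes down, the result is not lost. If no member goes down, tasks are distributed across all members.
IScheduledExecutorService: distributed scheduled tasks. A task can repeat at a fixed interval or run once after a delay.
In my recent experience with Hazelcast, a leading "I" in a type name usually marks a distributed structure, for example IMap and IExecutorService.
One HazelcastInstance represents one node. How a node joins the cluster can be changed through configuration.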
Hazelcast.newHazelcastInstance();
To test how tasks are distributed across multiple nodes, start another Hazelcast node with the following code.
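import com.hazelcast.core.Hazelcast;

public class SlaveMember {
    public static void main(String[] args) {
        // Starts one more member; it joins the cluster using the default configuration
        Hazelcast.newHazelcastInstance();
    }
}
1.1 Distributed tasks
Submitted tasks are automatically distributed across the cluster for execution.
Create the task (Runnable)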
import java.io.Serializable;

// The task must be Serializable so that it can be sent to other members
public class EchoTask implements Runnable, Serializable {
    private final String msg;

    public EchoTask(String msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        try {
            // Simulate 5 seconds of work
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Echo: " + msg);
    }
}
Run the task
import com.hazelcast.config.Config;
import com.hazelcast.config.ExecutorConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

public class MasterMember {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        ExecutorConfig executorConfig = config.getExecutorConfig("executor");
        // Number of threads in the pool
        executorConfig.setPoolSize(2);
        // Queue capacity; this can also be set in the configuration file. 0 means Integer.MAX_VALUE
        executorConfig.setQueueCapacity(3);
        // Once started, the instance stays alive until it is shut down explicitly
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
        IExecutorService executor = hz.getExecutorService("executor");
        for (int i = 1; i <= 4; i++) {
            System.out.println("Creating task: " + i);
            executor.execute(new EchoTask("" + i));
        }
        System.out.println("MasterMember finished!");
        // executor.shutdown();
    }
}
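If the number of tasks is >= pool size + queue capacity, submission fails with an "overloaded" error, unless the work is spread over a cluster or you increase the pool size or the queue capacity.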
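The overload condition can be reproduced locally with the JDK alone. The sketch below is an analogy, not Hazelcast code: it uses java.util.concurrent.ThreadPoolExecutor with a bounded queue, and the names OverloadSketch and countRejected are made up for illustration. With the JDK pool, rejection starts once the task count exceeds pool size plus queue capacity; the exact threshold inside Hazelcast may differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OverloadSketch {

    /** Submits taskCount blocking tasks to a bounded local pool and returns how many were rejected. */
    public static int countRejected(int poolSize, int queueCapacity, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        // Every task waits on this latch, so threads and queue slots stay occupied
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // No free thread and no free queue slot: the pool is overloaded
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads + 3 queue slots hold 5 tasks; the 6th is rejected
        System.out.println("rejected = " + countRejected(2, 3, 6));
    }
}
```

Blocking every task on a latch keeps the result independent of timing, which makes the threshold easy to observe.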
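If the node that created the tasks goes down at this point, the tasks are lost. To keep tasks from being lost, use durable tasks.
Submitting to all members
Methods:
executeOnAllMembers(Runnable command): runs the task on all known cluster members.
submitToAllMembers(Callable task): submits the task to all cluster members and returns a map from each member to the Future of its task.
submitToAllMembers(Callable task, MultiExecutionCallback callback): submits the task to all cluster members, with a callback.
submitToAllMembers(Runnable task, MultiExecutionCallback callback): submits the task to all cluster members, with a callback.
The submit variants that take a callback have no return value.
Code for the master node (the Slave node code is unchanged):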
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;

import com.hazelcast.cluster.Member;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

public class MasterMember {
    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Map<String, Integer> map = hz.getMap("map");
        for (int i = 0; i < 5; i++) {
            map.put(UUID.randomUUID().toString(), 1);
        }
        IExecutorService executor = hz.getExecutorService("executor");
        // One Future per cluster member
        Map<Member, Future<Integer>> result = executor.submitToAllMembers(new SumTask());
        int sum = 0;
        for (Future<Integer> future : result.values()) {
            sum += future.get();
        }
        // Future<Integer> submit = executor.submit(new SumTask());
        // Integer sum = submit.get();
        System.out.println("Result: " + sum);
    }
}
Create the Callable
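import java.io.Serializable;
import java.util.concurrent.Callable;

public class SumTask implements Callable<Integer>, Serializable {
    @Override
    public Integer call() throws Exception {
        System.out.println("Executing the task");
        // Each member contributes 1, so the sum equals the number of members
        return 1;
    }
}
The result is shown in the figure: both nodes executed the task.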
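Submitting to the owner of a given key
Methods:
executeOnKeyOwner(Runnable command, Object key): runs the task on the member that owns the given key.
submitToKeyOwner(Callable task, Object key): submits the task to the owner of the given key and returns a Future for the task.
submitToKeyOwner(Callable task, Object key, ExecutionCallback callback): runs the task on the member that owns the given key, with a callback.
submitToKeyOwner(Runnable task, Object key, ExecutionCallback callback): runs the task on the member that owns the given key, with a callback.
The Slave node is unchanged. Code for the master node: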
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.map.IMap;

public class VerifyTask implements Runnable, Serializable, HazelcastInstanceAware {
    private final String key;
    // Injected on the member that runs the task, so it is not serialized
    private transient HazelcastInstance hz;

    public VerifyTask(String key) {
        this.key = key;
    }

    @Override
    public void setHazelcastInstance(HazelcastInstance hz) {
        this.hz = hz;
    }

    @Override
    public void run() {
        IMap<String, String> map = hz.getMap("map");
        // localKeySet() returns only the keys owned by this member
        boolean localKey = map.localKeySet().contains(key);
        System.out.println("Key " + key + " is local: " + localKey);
    }
}

public class MasterMember {
    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Map<String, String> map = hz.getMap("map");
        for (int i = 0; i < 10; i++) {
            map.put(UUID.randomUUID().toString(), "");
        }
        IExecutorService executor = hz.getExecutorService("executor");
        for (String key : map.keySet()) {
            // The task runs on the node that stores this key
            executor.executeOnKeyOwner(new VerifyTask(key), key);
        }
    }
}
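The final output is shown in the figure. The keys of an IMap are spread across different nodes, and map.localKeySet() returns all keys stored on the local node. Every key passed to a task was found on the node that ran it, which shows that executeOnKeyOwner does run the task on the node that owns the key.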
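Why a key has an owner at all: Hazelcast maps every key to a partition, and each partition is owned by one member. The sketch below is a deliberately simplified model of that routing, not Hazelcast's implementation. 271 is Hazelcast's default partition count; the use of hashCode() and the round-robin assignment of partitions to members are assumptions made for illustration (real Hazelcast hashes the serialized key and keeps a partition table). KeyOwnerSketch, partitionId and ownerOf are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

public class KeyOwnerSketch {
    // Hazelcast's default partition count
    public static final int PARTITION_COUNT = 271;

    /** Simplified: derive the partition from the key's hashCode (an assumption, see above). */
    public static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    /** Simplified: partitions are handed out to members round-robin (an assumption, see above). */
    public static String ownerOf(Object key, List<String> members) {
        return members.get(partitionId(key) % members.size());
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("member-1", "member-2");
        for (String key : Arrays.asList("a", "b", "c")) {
            // The same key always lands on the same partition, hence the same owner
            System.out.println(key + " -> partition " + partitionId(key)
                    + " -> " + ownerOf(key, members));
        }
    }
}
```

Because the mapping depends only on the key, a task routed by key reaches the member that holds the key's data, which is what the VerifyTask output confirms.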
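Submitting to specific members
Methods:
executeOnMember(Runnable command, Member member): runs the task on a single given member.
executeOnMembers(Runnable command, Collection members): runs the task on several given members.
executeOnMembers(Runnable command, MemberSelector memberSelector): runs the task on the members chosen by a selector.
submitToMember(Callable task, Member member): submits the task to the given member and returns a Future representing the task.
submitToMember(Callable task, Member member, ExecutionCallback callback): submits the task to the given member with a callback that runs on success or failure.
submitToMember(Runnable task, Member member, ExecutionCallback callback): submits the task to the given member with a callback that runs on success or failure.
submitToMembers(Callable task, Collection members): submits the task to the given members and returns a Map<Member,Future> with one Future per member, from which the final result can be read.
submitToMembers(Callable task, Collection members, MultiExecutionCallback callback): runs the task on several given members with a callback that runs on success or failure.
submitToMembers(Callable task, MemberSelector memberSelector): submits the task to the members chosen by a selector and returns a Map<Member,Future> representing the pending completion of the task on each member.
submitToMembers(Callable task, MemberSelector memberSelector, MultiExecutionCallback callback): same as above, with a callback.
submitToMembers(Runnable task, Collection members, MultiExecutionCallback callback): runs the task on several given members, with a callback.
submitToMembers(Runnable task, MemberSelector memberSelector, MultiExecutionCallback callback): uses a selector, with a callback.
None of the submit variants that take a callback return a value.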
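The difference between the Future-returning variants and the callback variants can be sketched with the JDK alone. This is a local analogy, not Hazelcast code: ResultCallback is a hypothetical stand-in with a success method and a failure method, and submitWithFuture / submitWithCallback are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class CallbackVsFutureSketch {

    /** Hypothetical stand-in for a success/failure callback. */
    public interface ResultCallback<V> {
        void onResponse(V response);
        void onFailure(Throwable t);
    }

    /** Future style: the caller receives a handle and pulls the result from it. */
    public static <V> CompletableFuture<V> submitWithFuture(Supplier<V> task) {
        return CompletableFuture.supplyAsync(task);
    }

    /** Callback style: nothing is returned; the result is pushed to the callback. */
    public static <V> void submitWithCallback(Supplier<V> task, ResultCallback<V> callback) {
        CompletableFuture.supplyAsync(task).whenComplete((value, error) -> {
            if (error != null) {
                callback.onFailure(error);
            } else {
                callback.onResponse(value);
            }
        });
    }

    /** Runs both styles once and returns the two results joined by a comma. */
    public static String demo() {
        String pulled = submitWithFuture(() -> "future").join();
        CompletableFuture<String> pushed = new CompletableFuture<>();
        submitWithCallback(() -> "callback", new ResultCallback<String>() {
            @Override
            public void onResponse(String response) {
                pushed.complete(response);
            }

            @Override
            public void onFailure(Throwable t) {
                pushed.completeExceptionally(t);
            }
        });
        return pulled + "," + pushed.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "future,callback"
    }
}
```

With the callback style the caller has no handle to wait on, which is why those variants can be declared without a return value.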
Simple example
public static void main(String[] args) {
    HazelcastInstance instance = Hazelcast.newHazelcastInstance();
    Cluster cluster = instance.getCluster();
    IExecutorService executorService = instance.getExecutorService("test");
    // Reuse EchoTask from section 1.1; passing null here would fail at runtime
    Runnable task = new EchoTask("demo");
    // Run on a single given member
    executorService.executeOnMember(task, cluster.getLocalMember());
    // Run on several given members
    executorService.executeOnMembers(task, cluster.getMembers());
    // Use a selector: pick members whose "name" attribute is "向晚"
    MemberSelector chooseMyAngel = new MemberSelector() {
        @Override
        public boolean select(Member member) {
            String attribute = member.getAttribute("name");
            return "向晚".equals(attribute);
        }
    };
    executorService.executeOnMembers(task, chooseMyAngel);
}
Submitting to lite members
A lite member is a stripped-down node: it owns no partitions and stores no data. It is mainly used to run computation-heavy tasks and return the results to the cluster.
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;

// Simulates a computation-heavy task
public class ComputationHeavyTask implements Callable<Integer>, Serializable, HazelcastInstanceAware {
    private transient HazelcastInstance hz;

    @Override
    public void setHazelcastInstance(HazelcastInstance hz) {
        this.hz = hz;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println("Running a computation heavy task on " + hz.getCluster().getLocalMember());
        return 0;
    }
}

// Starts a lite member
public class StartLiteMember {
    public static void main(String[] args) {
        Config config = new Config();
        config.setLiteMember(true);
        Hazelcast.newHazelcastInstance(config);
    }
}

// Starts the master (data) node
public class StartDataMember {
    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IExecutorService executor = hz.getExecutorService("executor");
        // Use a selector to pick a lite member
        Future<Integer> future = executor.submit(new ComputationHeavyTask(), MemberSelectors.LITE_MEMBER_SELECTOR);
        System.out.println("Result: " + future.get());
    }
}
The run result is shown in the figure.
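1.2 Durable distributed tasks
Durable distributed tasks can be backed up across cluster members. If one member goes down, the result is not lost. If no member goes down, tasks are distributed across all members.
Running a Runnable
Create the Runnable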
import java.io.Serializable;

public class EchoTask implements Runnable, Serializable {
    private final String msg;
    private final Integer delayMs;

    EchoTask(String msg, Integer delayMs) {
        this.msg = msg;
        this.delayMs = delayMs;
    }

    @Override
    public void run() {
        System.out.println("Runnable started: " + msg);
        try {
            // Simulate work that takes delayMs milliseconds
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Runnable finished: " + msg);
    }
}
Start a SlaveMember first, then run the code below. It submits tasks to the executor and then exits abruptly, so you can check whether the tasks are lost.
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.durableexecutor.DurableExecutorService;
import com.hazelcast.durableexecutor.DurableExecutorServiceFuture;

public class BasicConfiguration {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.getDurableExecutorConfig("exec")
                .setCapacity(200)
                // Number of backups of a submitted task in the cluster. The default is 1.
                .setDurability(1)
                .setPoolSize(8);
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        DurableExecutorService executorService = instance.getDurableExecutorService("exec");
        for (int i = 1; i <= 4; i++) {
            System.out.println("Creating task: " + i);
            DurableExecutorServiceFuture<?> submit = executorService.submit(new EchoTask("" + i, 5000));
            // submit.get();
        }
        // Hazelcast.shutdownAll();
        // instance.shutdown();
        // Simulate an abnormal shutdown
        System.exit(0);
    }
}
If neither node shuts down, the tasks are distributed between them and none is executed twice, as shown in the figure.
If one node shuts down, all tasks still complete on the other node, as shown in the figure.
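Running a Callable
First, the idea behind the following example.
A durable task runs on a randomly chosen member of the cluster. To make testing easier, the example forces the task onto the master node and kills that node during execution, printing the taskId to the console before it dies. The Slave node detects that the master did not finish the task and runs it automatically. Entering the taskId on the Slave then retrieves the final result of the task.
Create the Callable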
import java.io.Serializable;
import java.util.concurrent.Callable;

public class BasicTask implements Callable<String>, Serializable {
    private final String msg;

    BasicTask(String msg) {
        this.msg = msg;
    }

    @Override
    public String call() throws Exception {
        // Count down for 5 seconds so there is time to kill the executing node
        int i = 5;
        while (i > 0) {
            System.out.println("Waiting 1 second: " + i);
            Thread.sleep(1000);
            --i;
        }
        return msg;
    }
}
Code for the Slave node
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.durableexecutor.DurableExecutorService;

public class SlaveMember {
    // volatile: written by the input thread, read by the main thread
    public static volatile Long taskId = null;

    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        // Start a thread that blocks until a task id is entered on the console
        new Thread(() -> {
            if (taskId == null) {
                Scanner scanner = new Scanner(System.in);
                System.out.print("Enter the id of the task to retrieve: ");
                taskId = scanner.nextLong();
            }
        }).start();
        // Print the task's return value
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (taskId != null) {
                DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec");
                Future<String> objectFuture = exec.retrieveResult(taskId);
                try {
                    String o = objectFuture.get();
                    System.out.println("Return value of the retrieved task: " + o);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                break;
            }
        }
        System.out.println("Test finished");
    }
}
Code for the master node
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.Member;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.durableexecutor.DurableExecutorService;
import com.hazelcast.durableexecutor.DurableExecutorServiceFuture;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionService;

public class RetrieveLostTask {
    public static void main(String[] args) throws Exception {
        // Hazelcast.newHazelcastInstance();
        //
        // HazelcastInstance client = HazelcastClient.newHazelcastClient();
        // DurableExecutorService executorService = client.getDurableExecutorService("exec");
        // DurableExecutorServiceFuture<String> future = executorService.submit(new BasicTask("DurableExecutor"));
        // long taskId = future.getTaskId();
        // client.shutdown();
        //
        // System.out.println("Client shut down");
        //
        // HazelcastInstance newClient = HazelcastClient.newHazelcastClient();
        // DurableExecutorService newExecutorService = newClient.getDurableExecutorService("exec");
        // Future<Object> retrieveResultFuture = newExecutorService.retrieveResult(taskId);
        // Object result = retrieveResultFuture.get();
        // System.out.println("Result: " + result);
        //
        // HazelcastClient.shutdownAll();
        // Hazelcast.shutdownAll();
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec");
        // A plain submit() picks a random member of the cluster to run the task
        // DurableExecutorServiceFuture<String> submit = exec.submit(new BasicTask("test the node of executing task is disabled"));
        /*
         * Idea:
         * Force the task to run on the current node, then let this node die while the task is running.
         * The other node re-runs the task, and the taskId printed before the crash is used there to fetch the result.
         */
        // To force execution on the current node via submitToKeyOwner, find a key owned by this node
        Cluster cluster = hazelcastInstance.getCluster();
        Member localMember = cluster.getLocalMember();
        PartitionService partitionService = hazelcastInstance.getPartitionService();
        Integer key = null;
        int i = 0;
        while (true) {
            Partition partition = partitionService.getPartition(i);
            Member owner = partition.getOwner();
            // equals() instead of ==, so the check does not depend on object identity
            if (localMember.equals(owner)) {
                key = i;
                break;
            }
            i++;
        }
        DurableExecutorServiceFuture<String> future = exec.submitToKeyOwner(
                new BasicTask("test the node of executing task is disabled"), key);
        System.out.println(future.getTaskId());
        // Simulate a crash while the task is still running
        System.exit(0);
    }
}
The run result is shown in the figure.
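1.3 Distributed scheduled tasks
Scheduled tasks
Start a scheduled task that first runs at a specified time of day and then repeats every 5 seconds.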
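import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.TimeUnit;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.TaskUtils;

public class SlaveMember {
    public static Long taskId = null;
    private final static String START_TIME = "10:05:30";

    /**
     * Computes the number of seconds from now until the given time of day.
     *
     * @param startTimeString start time in HH:mm:ss format
     * @return seconds until the next occurrence of that time
     */
    private static Long calculateInitialDelay(String startTimeString) {
        LocalTime startTime = LocalTime.parse(startTimeString);
        LocalTime nowTime = LocalTime.now();
        Long initialDelay;
        if (nowTime.isAfter(startTime)) {
            // The start time has already passed today, so target the same time tomorrow
            LocalDate dateTime = LocalDate.now();
            LocalDateTime startDayTime = LocalDateTime.of(dateTime.plusDays(1L), startTime);
            initialDelay = ChronoUnit.SECONDS.between(LocalDateTime.of(dateTime, nowTime), startDayTime);
        } else {
            initialDelay = ChronoUnit.SECONDS.between(nowTime, startTime);
        }
        return initialDelay;
    }

    /** Prints the current time. Serializable, because a task may be sent to another member. */
    private static class PrintTimeTask implements Runnable, Serializable {
        @Override
        public void run() {
            String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            System.out.println(format);
        }
    }

    public static void main(String[] args) {
        Long initialDelay = calculateInitialDelay(START_TIME);
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst");
        // Naming the task ensures that only one node in the cluster runs it,
        // so the scheduled task no longer needs a lock as it did before
        tst.scheduleAtFixedRate(TaskUtils.named("task", new PrintTimeTask()),
                initialDelay, 5L, TimeUnit.SECONDS);
    }
}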
The output is shown in the figure.
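The initial-delay calculation in the code above reads the system clock directly, which makes it hard to check. A minimal JDK-only variant, assuming the same rule (use today's occurrence unless it has already passed, otherwise tomorrow's), takes the current time as a parameter. InitialDelaySketch and secondsUntil are illustrative names, not part of the original code.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

public class InitialDelaySketch {

    /** Seconds from now until the next occurrence of target (0 if now is exactly at target). */
    public static long secondsUntil(LocalTime target, LocalDateTime now) {
        LocalDateTime next = now.toLocalDate().atTime(target);
        if (next.isBefore(now)) {
            // Today's occurrence has already passed, so use tomorrow's
            next = next.plusDays(1);
        }
        return Duration.between(now, next).getSeconds();
    }

    public static void main(String[] args) {
        LocalTime target = LocalTime.parse("10:05:30");
        // Before the target: 5 min 30 s = 330 s
        System.out.println(secondsUntil(target, LocalDateTime.of(2021, 1, 1, 10, 0, 0)));
        // After the target: 24 h minus 54 min 30 s = 83130 s
        System.out.println(secondsUntil(target, LocalDateTime.of(2021, 1, 1, 11, 0, 0)));
    }
}
```

Passing LocalDateTime.now() as the second argument gives the value to hand to scheduleAtFixedRate as the initial delay in seconds.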
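Delayed tasks
Compared with a scheduled task, a Hazelcast delayed task runs only once instead of repeating. Even after it has finished, however, its resources are not released.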
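import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.hazelcast.cluster.Member;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.IScheduledFuture;

public class ScheduledDelayTask {
    private final static String START_TIME = "10:33:30";

    /**
     * Creates a delayed task.
     *
     * @param tst          the scheduled executor to submit to
     * @param initialDelay delay in seconds before the task runs
     * @return the future of the scheduled task
     */
    private static IScheduledFuture<Object> createDelayTask(IScheduledExecutorService tst, Long initialDelay) {
        // A delayed task created through Hazelcast
        return tst.schedule(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(String.format("Delayed task executed: %s", START_TIME));
                    }
                },
                initialDelay,
                TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Long initialDelay = calculateInitialDelay(START_TIME);
        Config config = new Config();
        config.getScheduledExecutorConfig("tst")
                .setPoolSize(1)
                .setCapacity(1);
        Long initialDelay = 5L;
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
        IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst");
        IScheduledFuture<Object> delayTask = createDelayTask(tst, initialDelay);
        // Blocks the main thread until the task has finished
        Object o = delayTask.get();
        System.out.println(o);
        // Release the resources
        delayTask.dispose();
        // Comment out the dispose() call above and inspect the next line at a breakpoint:
        // the finished task is still listed, so it has not been released
        Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures = tst.getAllScheduledFutures();
        // Without the dispose() above, this call fails right here (the capacity of 1 is still occupied)
        createDelayTask(tst, 5L);
        System.out.println("Done");
    }
}
The auto-dispose style that Hazelcast provides for delayed tasks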
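import java.util.concurrent.TimeUnit;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.IScheduledFuture;
import com.hazelcast.scheduledexecutor.TaskUtils;

/**
 * Managing delayed tasks.
 * <p>
 * A Hazelcast delayed task keeps occupying resources until dispose() is called on it.
 * <p>
 * References:
 * 1. https://docs.hazelcast.com/imdg/latest/computing/scheduled-executor-service
 * 2. https://github.com/hazelcast/hazelcast/issues/11221
 * 3. https://stackoverflow.com/questions/42040046/hazelcast-scheduledexecutorservice
 * <p>
 * Because the cluster also has to be taken into account, my current understanding is that
 * a cluster-wide scheduled task can fetch all futures in the cluster, check isDone(),
 * and call dispose() manually.
 * <p>
 * Hazelcast offers automatic disposal through
 * com.hazelcast.scheduledexecutor.TaskUtils#autoDisposable(java.lang.Runnable).
 * This project uses version 4.2.2, where auto-dispose has a bug; the latest version at the
 * time of writing, 5.0-BETA-1, has not fixed it yet.
 * <p>
 * Reply from the Hazelcast maintainers about this bug: https://github.com/hazelcast/hazelcast/issues/19622
 */
public class ScheduledDelayAutoDisposeTask {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.getScheduledExecutorConfig("tst")
                .setPoolSize(1)
                .setCapacity(1);
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        IScheduledExecutorService tst = instance.getScheduledExecutorService("tst");
        /*
         * Auto-dispose of a Runnable: buggy in Hazelcast so far, the task is not released.
         * Auto-dispose of a Callable: works at present.
         */
        IScheduledFuture<Object> firstRun = tst.schedule(TaskUtils.autoDisposable(new Runnable() {
            @Override
            public void run() {
                System.out.println("First execution");
            }
        }), 5L, TimeUnit.SECONDS);
        // Wait for the first task to finish before scheduling the second one
        Object o = firstRun.get();
        tst.schedule(TaskUtils.autoDisposable(new Runnable() {
            @Override
            public void run() {
                System.out.println("Second execution");
            }
        }), 5L, TimeUnit.SECONDS);
    }
}
At present, however, Hazelcast's auto-dispose does not work for Runnable tasks. The best workaround for now is to fetch all scheduled futures, check whether each one has completed, and dispose the completed ones, as follows.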
HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IScheduledExecutorService tst = instance.getScheduledExecutorService("tst");
// Poll every 5 seconds and release finished tasks
tst.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        Map<Member, List<IScheduledFuture<Object>>> map = tst.getAllScheduledFutures();
        Set<Member> members = map.keySet();
        for (Member member : members) {
            List<IScheduledFuture<Object>> futures = map.get(member);
            futures.forEach(x -> {
                if (x.isDone()) {
                    x.dispose();
                }
            });
        }
    }
}, 5L, 5L, TimeUnit.SECONDS);
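The sweep pattern used above (walk over all tracked futures and release the finished ones) can be tried out with plain JDK futures. This is a local analogy under stated assumptions, not Hazelcast code: removing a future from the list stands in for IScheduledFuture.dispose(), and SweepDoneSketch, sweep and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class SweepDoneSketch {

    /** Removes every finished future from the list and returns how many were removed. */
    public static int sweep(List<Future<?>> tracked) {
        int removed = 0;
        for (Iterator<Future<?>> it = tracked.iterator(); it.hasNext(); ) {
            if (it.next().isDone()) {
                it.remove(); // stand-in for dispose(): frees the slot held by the finished task
                removed++;
            }
        }
        return removed;
    }

    /** Tracks one finished and one pending future, then sweeps once. */
    public static int demo() {
        List<Future<?>> tracked = new ArrayList<>();
        tracked.add(CompletableFuture.completedFuture("done"));
        tracked.add(new CompletableFuture<String>()); // never completed, must survive the sweep
        return sweep(tracked);
    }

    public static void main(String[] args) {
        System.out.println("removed = " + demo()); // prints "removed = 1"
    }
}
```

Only finished futures are touched, so running the sweep periodically is safe for tasks that are still pending.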