参考
Quickstart GitHub - hazelcast/hazelcast-code-samples: Hazelcast Code Samples 以下所有的操作,都是基于Hazelcast完成的,在maven项目里面,需要引入该依赖。
1 2 3 4 5 <dependency > <groupId > com.hazelcast</groupId > <artifactId > hazelcast-all</artifactId > <version > 4.2.2</version > </dependency >
如果不对Hazelcast进行配置的话,会使用默认配置hazelcast-default.xml。可以在项目启动时查看日志输出。
一、分布式计算 主要记录三个常用的任务类型
IExecutorService:分布式任务,提交的任务可以自动在集群中分配执行。 DurableExecutorService:分布式持久任务,可以在集群成员中备份任务。如果其中一个成员宕机,运算结果也不会丢失。如果成员都没有挂机,则会在所有成员中分配执行。 IScheduledExecutorService:分布式定时任务,可以设置定时循环,也可以设置延时执行 根据最近使用Hazelcast的心得,带I的一般就是表示分布式,如IMap、IExceutorService
一个HazelcastInstance实例,就代表一个节点,通过配置可以修改节点的加入方式。
1 Hazelcast.newHazelcastInstance();
如果想要测试多节点任务分配,可以通过以下代码,另起一个Hazelcast节点。
1 2 3 4 5 6 public class SlaveMember { public static void main (String[] args) { Hazelcast.newHazelcastInstance(); } }
1.1 分布式任务 分布式任务,提交的任务可以自动在集群中分配执行。
创建任务Runnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class EchoTask implements Runnable , Serializable { private final String msg; public EchoTask (String msg) { this .msg = msg; } @Override public void run () { try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Echo: " + msg); } }
执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class MasterMember { public static void main (String[] args) throws Exception { Config config = new Config(); ExecutorConfig executorConfig = config.getExecutorConfig("executor" ); executorConfig.setPoolSize(2 ); executorConfig.setQueueCapacity(3 ); HazelcastInstance hz = Hazelcast.newHazelcastInstance(config); IExecutorService executor = hz.getExecutorService("executor" ); for (int i = 1 ; i <= 4 ; i++) { System.out.println("创建任务: " + i); executor.execute(new EchoTask("" + i)); } System.out.println("MasterMember finished!" ); } }
如果任务的数量>=线程数量+队列数量,就会报错,overloaded。除非是集群或者是加大任务数量或者队列数量。
如果此时创建任务的节点挂掉,那么任务就丢失了,想要任务不丢失,就需要使用持久式任务。
提交给所有节点 方法
executeOnAllMembers(Runnable command):在所有已知的集群成员上执行任务。 submitToAllMembers(Callabletask):将任务提交给所有群集成员,并返回每个节点对应任务的键值对。 submitToAllMembers(Callabletask, MultiExecutionCallback callback):将任务提交给所有群集成员,带回调函数 submitToAllMembers(Runnable task, MultiExecutionCallback callback):将任务提交给所有群集成员,带回调函数 submit带回调函数的没有返回值
模拟主节点,Slave节点代码不变
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class MasterMember { public static void main (String[] args) throws Exception { HazelcastInstance hz = Hazelcast.newHazelcastInstance(); Map<String, Integer> map = hz.getMap("map" ); for (int i = 0 ; i < 5 ; i++) { map.put(UUID.randomUUID().toString(), 1 ); } IExecutorService executor = hz.getExecutorService("executor" ); Map<Member, Future<Integer>> result = executor.submitToAllMembers(new SumTask()); int sum = 0 ; for (Future<Integer> future : result.values()) { sum += future.get(); } System.out.println("Result: " + sum); } }
创建Callable
1 2 3 4 5 6 7 8 public class SumTask implements Callable <Integer >, Serializable { @Override public Integer call () throws Exception { System.out.println("我要执行任务" ); return 1 ; } }
执行结果如图,两个节点都执行了。
提交给指定键所属节点 方法:
executeOnKeyOwner(Runnable command, Object key):在指定key的所属节点上执行任务。 submitToKeyOwner(Callabletask, Object key):将任务提交给指定键的所有者,并返回该任务的Future submitToKeyOwner(Callabletask, Object key, ExecutionCallbackcallback):在指定key的所属节点上执行任务。 submitToKeyOwner(Runnable task, Object key, ExecutionCallbackcallback):在指定key的所属节点上执行任务。 Slave节点保持不变,模拟主节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class VerifyTask implements Runnable , Serializable , HazelcastInstanceAware { private final String key; private transient HazelcastInstance hz; public VerifyTask (String key) { this .key = key; } @Override public void setHazelcastInstance (HazelcastInstance hz) { this .hz = hz; } @Override public void run () { IMap map = hz.getMap("map" ); boolean localKey = map.localKeySet().contains(key); System.out.println("Key " + key + " is local: " + localKey); } } public class MasterMember { public static void main (String[] args) throws Exception { HazelcastInstance hz = Hazelcast.newHazelcastInstance(); Map<String, String> map = hz.getMap("map" ); for (int i = 0 ; i < 10 ; i++) { map.put(UUID.randomUUID().toString(), "" ); } IExecutorService executor = hz.getExecutorService("executor" ); for (String key : map.keySet()) { executor.executeOnKeyOwner(new VerifyTask(key), key); } } }
最后输出结果如图。可以看到IMap中的key是分布在不同节点上的,通过map.localKeySet,可以拿到本机存储的所有key,所有执行时传过来的key都在该节点上,由此可知,executeOnKeyOwner实现了在key所在节点上执行的功能。时
提交给指定节点 方法:
executeOnMember(Runnable command, Member member):指定单个成员执行任务。
executeOnMembers(Runnable command, Collectionmembers):指定多个成员执行任务。
executeOnMembers(Runnable command, MemberSelector memberSelector):使用选择器,指定成员执行任务。
submitToMember(Callabletask, Member member):向指定成员提交任务,并返回表示该任务的未来
submitToMember(Callabletask, Member member, ExecutionCallbackcallback):向指定成员提交任务,带有回调函数,成功或者失败后自动执行逻辑
submitToMember(Runnable task, Member member, ExecutionCallbackcallback):向指定成员提交任务,带有回调函数,成功或者失败后自动执行逻辑
submitToMembers(Callabletask, Collectionmembers):向给定成员提交任务,并返回Map<Member,Future>表示某个成员的Future,通过Future可以拿到最后执行结果。
submitToMembers(Callabletask, Collectionmembers, MultiExecutionCallback callback):指定多个成员执行任务,带有回调函数,成功或者失败后自动执行逻辑
submitToMembers(Callabletask, MemberSelector memberSelector):使用选择器,选定成员提交任务,并返回Map<Member,Future>,表示每个成员上任务的待定完成。
submitToMembers(Callabletask, MemberSelector memberSelector, MultiExecutionCallback callback):同上,带回调函数
submitToMembers(Runnable task, Collectionmembers, MultiExecutionCallback callback):指定多个成员执行任务,带回调函数
submitToMembers(Runnable task, MemberSelector memberSelector, MultiExecutionCallback callback):使用选择器,带回调函数
submit带回调函数的都没有返回值
简单示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) { HazelcastInstance instance = Hazelcast.newHazelcastInstance(); Cluster cluster = instance.getCluster(); IExecutorService executorService = instance.getExecutorService("test" ); executorService.executeOnMember(null , cluster.getLocalMember()); executorService.executeOnMembers(null , cluster.getMembers()); MemberSelector chooseMyAngel = new MemberSelector() { @Override public boolean select (Member member) { String attribute = member.getAttribute("name" ); if ("向晚" .equals(attribute)) { return true ; } else { return false ; } } }; executorService.executeOnMembers(null , chooseMyAngel); }
提交给阉割过的节点 阉割过的,其实就是精简后的节点,上面没有分区,不存储内容。主要用来计算一些比较繁琐的任务,然后将计算结果返回给集群。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class ComputationHeavyTask implements Callable <Integer >, Serializable , HazelcastInstanceAware { private transient HazelcastInstance hz; public void setHazelcastInstance (HazelcastInstance hz) { this .hz = hz; } public Integer call () throws Exception { System.out.println("Running a computation heavy task on " + hz.getCluster().getLocalMember()); return 0 ; } } public class StartLiteMember { public static void main (String[] args) { Config config = new Config(); config.setLiteMember(true ); Hazelcast.newHazelcastInstance(config); } } public class StartDataMember { public static void main (String[] args) throws Exception { HazelcastInstance hz = Hazelcast.newHazelcastInstance(); IExecutorService executor = hz.getExecutorService("executor" ); Future<Integer> future = executor.submit(new ComputationHeavyTask(), MemberSelectors.LITE_MEMBER_SELECTOR); System.out.println("Result: " + future.get()); } }
运行结果如图
1.2 分布式持久任务 分布式持久任务,可以在集群成员中备份任务。如果其中一个成员宕机,运算结果也不会丢失。如果成员都没有挂机,则会在所有成员中分配执行。
执行Runnable 创建Runnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class EchoTask implements Runnable , Serializable { private final String msg; private final Integer delayMs; EchoTask(String msg,Integer delayMs) { this .msg = msg; this .delayMs=delayMs; } @Override public void run () { System.out.println("runnable开始执行" +msg); try { Thread.sleep(delayMs); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("runnable执行结束:" +msg); } }
先启动一个SlaveMember。再启动下面代码,模拟把任务丢到任务池,然后关闭,查看任务是否会丢失。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class BasicConfiguration { public static void main (String[] args) throws Exception { Config config = new Config(); config.getDurableExecutorConfig("exec" ) .setCapacity(200 ) .setDurability(1 ) .setPoolSize(8 ); HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); DurableExecutorService executorService = instance.getDurableExecutorService("exec" ); for (int i = 1 ; i <= 4 ; i++) { System.out.println("创建任务: " + i); DurableExecutorServiceFuture<?> submit = executorService.submit(new EchoTask("" + i,5000 )); } System.exit(0 ); } }
如果两个节点都没关闭,则会分配执行,不存在重复执行的情况。如图。
如果某个节点关闭了,则所有的任务还能再另一个节点顺利执行。如图。
执行Callable 先说一下,下面这个例子的思路。
因为持久式任务,是会在节点中,随机一台机器执行的。为了方便测试,强制让主节点执行,并在执行时挂掉,在挂掉前,把taskId输出控制台。Slave节点监测到主节点没有成功执行任务,就会自动执行任务,输入taskId,获取最后的任务执行结果。
创建Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class BasicTask implements Callable <String >, Serializable { private final String msg; BasicTask(String msg) { this .msg = msg; } @Override public String call () throws Exception { int i=5 ; while (i>0 ) { System.out.println("等待1秒" +i); Thread.sleep(1000 ); --i; } return msg; } }
模拟Slave节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class SlaveMember { public static Long taskId = null ; public static void main (String[] args) { HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(); new Thread(() -> { if (taskId == null ) { Scanner scanner = new Scanner(System.in); System.out.print("快点输入要寻回的任务id:" ); long l = scanner.nextLong(); taskId = l; } }).start(); while (true ) { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } if (taskId != null ) { DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec" ); Future<String> objectFuture = exec.retrieveResult(taskId); try { String o = objectFuture.get(); System.out.println("找回任务的返回值:" + o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } break ; } } System.out.println("测试结束" ); } }
模拟主节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class RetrieveLostTask { public static void main (String[] args) throws Exception { HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(); DurableExecutorService exec = hazelcastInstance.getDurableExecutorService("exec" ); Cluster cluster = hazelcastInstance.getCluster(); Member localMember = cluster.getLocalMember(); Integer key = null ; int i = 0 ; while (true ) { PartitionService partitionService = hazelcastInstance.getPartitionService(); Partition partition = partitionService.getPartition(i); Member owner = partition.getOwner(); if (owner != null && owner == localMember) { key = i; break ; } i++; } DurableExecutorServiceFuture<String> future = exec.submitToKeyOwner(new BasicTask("test the node of executing task is disabled" ), key); System.out.println(future.getTaskId()); System.exit(0 ); } }
运行过如图
1.3 分布式定时任务 定时任务 开启一个定时任务,指定时间后开始,之后每隔5秒执行一次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class SlaveMember { public static Long taskId = null ; private final static String START_TIME = "10:05:30" ; private static Long calculateInitialDelay (String startTimeString) { LocalTime startTime = LocalTime.parse(startTimeString); LocalTime nowTime = LocalTime.now(); Long initialDelay = null ; if (nowTime.isAfter(startTime)) { LocalDate dateTime = LocalDate.now(); LocalDateTime startDayTime = LocalDateTime.of(dateTime.plusDays(1L ), startTime); initialDelay = ChronoUnit.SECONDS.between(LocalDateTime.of(dateTime, nowTime), startDayTime); } else { initialDelay = ChronoUnit.SECONDS.between(nowTime, startTime); } return initialDelay; } public static void main (String[] args) { Long initialDelay = calculateInitialDelay(START_TIME); HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(); IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst" ); tst.scheduleAtFixedRate(TaskUtils.named("task" , new Runnable() { @Override public void run () { String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ).format(new Date()); System.out.println(format); } }), initialDelay, 5L , TimeUnit.SECONDS); } }
输出结果如图
延时任务 hazelcast的延时任务,跟定时任务相比,就是只执行一次,不会循环执行,但是即使执行完了,也不会释放资源的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class ScheduledDelayTask { private final static String START_TIME = "10:33:30" ; private static IScheduledFuture<Object> createDelayTask (IScheduledExecutorService tst, Long initialDelay) { return tst.schedule(new Runnable() { @Override public void run () { System.out.println(String.format("延时任务执行:%s" , START_TIME)); } }, initialDelay, TimeUnit.SECONDS); } public static void main (String[] args) throws ExecutionException, InterruptedException { Config config = new Config(); config.getScheduledExecutorConfig("tst" ) .setPoolSize(1 ) .setCapacity(1 ); Long initialDelay = 5L ; HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config); IScheduledExecutorService tst = hazelcastInstance.getScheduledExecutorService("tst" ); IScheduledFuture<Object> delayTask = createDelayTask(tst, initialDelay); Object o = delayTask.get(); System.out.println(o); delayTask.dispose(); Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures = tst.getAllScheduledFutures(); createDelayTask(tst, 5L ); System.out.println("结束" ); } }
hazelcast提供的自动释放延时任务的写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class ScheduledDelayAutoDisposeTask { public static void main (String[] args) throws Exception { Config config = new Config(); config.getScheduledExecutorConfig("tst" ) .setPoolSize(1 ) .setCapacity(1 ); HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); IScheduledExecutorService tst = instance.getScheduledExecutorService("tst" ); IScheduledFuture<Object> hhh = tst.schedule(TaskUtils.autoDisposable(new Runnable() { @Override public void run () { System.out.println("第一次执行" ); } }), 5L , TimeUnit.SECONDS); Object o = hhh.get(); tst.schedule(TaskUtils.autoDisposable(new Runnable() { @Override public void run () { System.out.println("第二次执行" ); } }), 5L , TimeUnit.SECONDS); } }
不过目前的Hazelcast的自动释放资源,对于Runnable是不好使的。目前最好的办法,就是去拿所有的任务判断是否执行完成。如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 HazelcastInstance instance = Hazelcast.newHazelcastInstance(); IScheduledExecutorService tst = instance.getScheduledExecutorService("tst" ); tst.scheduleAtFixedRate(new Runnable() { @Override public void run () { Map<Member, List<IScheduledFuture<Object>>> map = tst.getAllScheduledFutures(); Set<Member> members = map.keySet(); for (Member member : members) { List<IScheduledFuture<Object>> futures = map.get(member); futures.forEach(x -> { if (x.isDone()) { x.dispose(); } }); } } }, 5L , 5L , TimeUnit.SECONDS);