下面设计实现的是:交换机Hlr指令处理任务模块。当然,在后续的业务发展过程中,还可能出现,其他类型指令的任务处理,所以根据“开闭”原则的定义,要抽象出一个接口类:BusinessEvent /** * filename:BusinessEvent.java * * Newland Co. Ltd. All rights reserved. * * Description:业务事件任务接口定义 * author tangjie * version 1.0 * */ package newlandframework.batchtask.model; public interface BusinessEvent { // 执行具体批处理的任务 public int execute(Integer userId); }然后具体的Hlr指令发送任务模块HlrBusinessEvent要实现这个接口类的方法完成用户停复机Hlr指令的派发。代码如下/** * filename:HlrBusinessEvent.java * * Newland Co. Ltd. All rights reserved. * * Description:Hlr指令派发任务接口定义 * author tangjie * version 1.0 * */ package newlandframework.batchtask.model; import org.apache.commons.lang.math.RandomUtils; public class HlrBusinessEvent implements BusinessEvent { // 交换机上的指令执行成功失败标识0表示成功 1表示失败 public final static int TASKSUCC 0; public final static int TASKFAIL 1; private final static int ELAPSETIME 1000; Override public int execute(Integer userId) { // 这里为了举例,随机产生1000以内的随机数 int millis RandomUtils.nextInt(ELAPSETIME); // 简单模拟往交换机发送停机/复机的指令 try { Thread.sleep(millis); String strContent String.format( 线程标识[%s]用户标识:[%d]执行交换机指令工单耗时:[%d]毫秒, Thread .currentThread().getName(), userId, millis); System.out.println(strContent); // 这里为了演示直接简单根据随机数是不是偶数简单模拟交换机指令执行的结果 return (millis % 2 0) ? TASKSUCC : TASKFAIL; } catch (InterruptedException e) { e.printStackTrace(); return TASKFAIL; } } }实际运行情况中我们可能要监控一下指令发送的时长于是再设计一个针对Hlr指令发送任务模块HlrBusinessEvent切面嵌入代理的Hlr指令时长计算代理类HlrBusinessEventAdvisor具体的代码如下/** * filename:HlrBusinessEventAdvisor.java * * Newland Co. Ltd. All rights reserved. * * Description:Hlr指令派发时长计算代理类 * author tangjie * version 1.0 * */ package newlandframework.batchtask.model; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.lang.time.StopWatch; public class HlrBusinessEventAdvisor implements MethodInterceptor { public HlrBusinessEventAdvisor() { } Override public Object invoke(MethodInvocation invocation) throws Throwable { // 计算一下指令派发时长 StopWatch sw new StopWatch(); sw.start(); Object obj invocation.proceed(); sw.stop(); System.out.println(执行交换机指令工单耗时: [ sw.getTime() ] 毫秒); return obj; } }剩下的我们由于是要异步并行计算得到执行结果于是我们设计一个批处理Hlr任务执行模块HlrBusinessEventTask它要实现java.util.concurrent.Callable接口的方法call它会返回一个异步任务的执行结果。/** * filename:HlrBusinessEventTask.java * * Newland Co. Ltd. All rights reserved. * * Description:Hlr指令派任务执行类 * author tangjie * version 1.0 * */ package newlandframework.batchtask.model; import java.util.concurrent.Callable; import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.NameMatchMethodPointcutAdvisor; public class HlrBusinessEventTask implements CallableInteger { private NotifyUsers user null; private final static String MAPPERMETHODNAME execute; public HlrBusinessEventTask(NotifyUsers user) { this.user user; } Override public Integer call() throws Exception { synchronized (this) { ProxyFactory weaver new ProxyFactory(new HlrBusinessEvent()); NameMatchMethodPointcutAdvisor advisor new NameMatchMethodPointcutAdvisor(); advisor.setMappedName(MAPPERMETHODNAME); advisor.setAdvice(new HlrBusinessEventAdvisor()); weaver.addAdvisor(advisor); BusinessEvent proxyObject (BusinessEvent) weaver.getProxy(); Integer result new Integer(proxyObject.execute(user.getUserId())); // 返回执行结果 return result; } } }接下来我们要把并行异步加载的查询结果和并行异步处理任务执行的模块给它组合起来使用故重新封装一个通知用户批处理任务管理类模块NotifyUsersBatchTask。它的主要功能是批量并行异步加载查询待停复机的手机用户然后把它放入并行异步处理的线程池中进行异步处理。然后我们打印出本次批处理的任务一共有多少成功数和失败数分别是多少当然本文还给出了另外一种JMX方式的监控。NotifyTaskSuccCounter类主要是统计派发的任务中执行成功的任务的数量而与之相对应的类NotifyTaskFailCounter是用来统计执行失败的任务的数量。具体的代码如下/** * filename:NotifyUsersBatchTask.java * * Newland Co. Ltd. All rights reserved. * * Description:通知用户批处理任务管理类 * author tangjie * version 1.0 * */ package newlandframework.batchtask; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import javax.sql.DataSource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.commons.collections.Closure; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.IfClosure; import org.apache.commons.lang.StringUtils; import newlandframework.batchtask.jmx.BatchTaskMonitor; import newlandframework.batchtask.model.NotifyUsers; import newlandframework.batchtask.parallel.BatchQueryLoader; import newlandframework.batchtask.parallel.BatchTaskReactor; public class NotifyUsersBatchTask { public NotifyUsersBatchTask() { } private ArrayListDataSource dataSource; // 基于JMX的任务完成情况监控计数器 private BatchTaskMonitor monitor new BatchTaskMonitor(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME); // 支持同时加载多个数据源 public NotifyUsersBatchTask(ArrayListDataSource dataSource) { this.dataSource dataSource; } // 批处理任务执行成功计数器 class NotifyTaskSuccCounter implements Closure { public static final String NOTIFYTASKSUCCCOUNTER TASKSUCCCOUNTER; private int numberSucc 0; public void execute(Object input) { monitor.increaseBatchTaskCounter(NOTIFYTASKSUCCCOUNTER); numberSucc; } public int getSuccNumber() { return numberSucc; } } // 批处理任务执行失败计数器 class NotifyTaskFailCounter implements Closure { public static final String NOTIFYTASKFAILCOUNTER TASKFAILCOUNTER; private int numberFail 0; public void execute(Object input) { monitor.increaseBatchTaskCounter(NOTIFYTASKFAILCOUNTER); numberFail; } public int getFailNumber() { return numberFail; } } // 并行加载查询多个水平分库的数据集合 public ListNotifyUsers query() throws SQLException { BatchQueryLoader loader new BatchQueryLoader(); String strSQL select home_city, msisdn, user_id from notify_users; for (int i 0; i dataSource.size(); i) { Connection con dataSource.get(i).getConnection(); Statement st con.createStatement(); loader.attachLoadEnv(strSQL, st, con); } ListResultSet list loader.executeQuery(); System.out.println(查询出记录总数为: list.size()); final ListNotifyUsers listNotifyUsers new ArrayListNotifyUsers(); for (int i 0; i list.size(); i) { ResultSet rs list.get(i); while (rs.next()) { NotifyUsers users new NotifyUsers(); users.setHomeCity(rs.getInt(1)); users.setMsisdn(rs.getInt(2)); users.setUserId(rs.getInt(3)); listNotifyUsers.add(users); } } // 释放连接资源 loader.close(); return listNotifyUsers; } // 批处理数据集合,任务分派 public void batchNotify(ListNotifyUsers list, final ExecutorService excutor) { System.out.println(处理记录总数为: list.size()); System.out.println(StringUtils.center(记录明细如下, 40, -)); NotifyTaskSuccCounter cntSucc new NotifyTaskSuccCounter(); NotifyTaskFailCounter cntFail new NotifyTaskFailCounter(); BatchTaskPredicate predicate new BatchTaskPredicate(excutor); Closure batchAction new IfClosure(predicate, cntSucc, cntFail); CollectionUtils.forAllDo(list, batchAction); System.out.println(批处理一共处理: list.size() 记录,处理成功: cntSucc.getSuccNumber() 条记录,处理失败: cntFail.getFailNumber() 条记录); } }