技术开发 频道

Android网络业务的封装与调度

  【IT168技术】手机客户端程序由于网络宽带的约束,尤其在GPRS网络环境下,大数据量的网络交互很大程度上降低应用的响应,影响用户体验。比如,如果做一个手机网盘客户端,在后台上传文件时(大数据量的交互),获取文件列表(命令类的交互)这个过程就显得太别慢。而我们的要求是希望这些命令类操作能尽快得到响应。

  通常,在手机客户端,我们设计一个网络操作的管理器,来统一管理这些需要联网的操作。具体做法是把网络操作封装成一个Command(或者说是Task),管理器实现特定的调度规则来调度运行这些Task。

  这样做的好处至少有三:

  一. 用Command封装了网络操作,使得这些操作与上传的业务分离,解除了强耦合。

  二. 可以根据网络情况来确定来采用不同的调度规则,提高用户体验。

  三. 重用,这些Task和TaskManager的代码在别的手机应用上基本上能照搬过去。

  四. 扩展,当应用需要扩展新的业务时,只有扩展一个新的Command(或者说是Task),接受调度即可,易于扩展。

  例子:

  还是以上文提到的微盘为例,可以概括我们对管理器的设计要求有:

  在Wifi网络环境下:

  一:各种网络操作可以并行运行。

  在GPRS网络环境下:

  二:支持优先级抢占调度,命令类操作的优先级比数据传输类的优先级高,当命令类的Task(获取文件列表)提交后,打断数据传输的Task(如上传,下载),等命令类的任务运行完毕,再接着运行数据类任务(断点上传,下载)。

  二:同一个优先级的任务可以并行运行,如多个命令一起在网络上传输。

  实现思路:

  TaskManager :

  1. TaskManager开辟一个后台线程进行调度工作。

  2. 由于要支持多个优先级的抢占调度,我们需要两个队列来维护运行中的Task和等待中的Task。

  3. 由于Task的调度是基于优先级的,我们可以使用优先级队列,运行队列采用PriorityQueue,等待队列使用PriorityBlockingQueue,当没有网络业务需要运行时,调度线程阻塞挂起,避免空转。

  4. TaskManager设计为单一实例(单一模式)。

  5. 每个Task被调度运行时,该Task被从等待队列移动运行队列,当Task执行完毕时,从运行队列删除,唤醒调度线程进行新的调度。

  下面是简单的设计代码:

  public final class TaskEngine implements Runnable{

  private PriorityQueue runningQueue;//运行的task队列

  private PriorityBlockingQueue readyQueue;//就绪的task队列,准备接受调度的task列表

  private final AtomicLong taskIdProducer = new AtomicLong(1);//Task Id生成器

  private Object sheduleLock = new Object();//同步锁

  private static TaskEngine instance;

  public long addTask(BusinessObject bo){

  Task task = new Task(bo);

  long newTaskId = taskIdProducer.incrementAndGet();

  task.setTaskId(newTaskId);

  if(this.isWifiNetWork()){ //WIFI网络

  synchronized(sheduleLock){

  runningQueue.add(task);

  }

  new Thread(task).start();

  }else{ //GPRS网络

  if(readyQueue.offer(task)){ //task入就绪队列

  final ReentrantLock lock = this.lock;

  lock.lock();

  try{

  needSchedule.signal(); //唤醒调度线程重新调度

  }finally{

  lock.unlock();}

  }

  }

  return newTaskId;

  }

  public final void run(){//task调度逻辑

  ....

  ....

  }

  //挂起调度线程 当不需要调度时

  private void waitUntilNeedSchedule() throws InterruptedException

  {

  .....

  }

  }

  Task:

  1. 对要执行的网络操作的封装。

  2. Task执行完毕时,发Task结束信号,唤醒调度线程进行新的调度

  3. Task要实现Comparable接口,才能让TaskManager的两个队列自动其包含的Task排序。

  下面是简单的设计代码:

  public class Task implements Runnable,Comparable{

  private long taskId;

  private BusinessObject bo;//封装网络操作的业务对象的抽象父类,

  //它封装了具体的Command,保证了业务扩展中Task的接口不变

  @Override

  public void run() {

  this.onTaskStart();

  this.bo.execute();

  this.onTaskEnd();

  }

  private voidonTaskStart()

  {...}

  public int getPriority()

  {...}

  public void setPriority(intpriority)

  {...}

  @Override

  public int compareTo(Task object1) {

  return this.getPriority()>object1.getPriority()?-1:1;

  }

  }

  小注意事项:

  Android对PriorityQueue的实现和Jdk中的实现有点不一样。

  (PriorityQueue.java在android中的代码路径是usr\dalvik\libcore\luni\src\main\java\java\util)

  PriorityQueue.remove(Object o) ;//从PriorityQueue中删除一个元素。

  对于完成这个删除操作android和jdk都是分两个过程实现,一,找出待删除元素的索引index,二,删除index所在元素。

  在JDK中,是通过调用元素的equals方法来找到待删除元素的索引,

  private int indexOf(Object o) {

  if (o != null) {

  for (int i = 0; i < size; i++)

  if (o.equals(queue[i]))

  return i;

  }

  return -1;

  }

  在android中,是间接调用元素的compareTo方法判断结果是否为0来找到待删除元素的索引,

  int targetIndex;

  for (targetIndex = 0; targetIndex < size; targetIndex++) {

  if (0 == this.compare((E) o, elements[targetIndex])) {

  break;

  }

  }

  private int compare(E o1, E o2) {

  if (null != comparator) {

  return comparator.compare(o1, o2);

  }

  return ((Comparable) o1).compareTo(o2);

  }

  所以为了Task能在执行完毕时从PriorityQueue找到这个Task并删除之,需要在compareTo方法里在优先级相等时

  返回0。

  @Override

  public int compareTo(Task object1) {

  if(this.getPriority()==object1.getPriority())

  return 0;

  return this.getPriority()>object1.getPriority()?-1:1;

  }

  当是这样运行PriorityQueue.remove(Object o) 逻辑上只能删除PriorityQueue里第一个优先级与被删除的元素

  优先级相等的元素(可能是待删除的元素也可能不是),有误删的可能,需要做如下修改:

  @Override

  public int compareTo(Task object1) {

  if(this.getPriority()==object1.getPriority() && this.equals(object1))

  return 0;

  return this.getPriority()>object1.getPriority()?-1:1;

  }

  这样才能正确执行remove(Object o),删除指定的对象o。

  个人觉得android这样设计使得remove(Object o)复杂化了,不然JDK中那么简洁。语义上也不是那么好懂了,

  因为按通常理解,equals是判断两个对象是否相等,compareTo可能是对象的某个属性的比较(类别数据库中的order by),

  而现在执行PriorityQueue.remove(Object o),这个对象o明明在容器PriorityQueue中,却删除不了,除非去翻看android中PriorityQueue的实现代码,然后重写compareTo这个方法。这样的API使用时比较容易出错,应该不符号良好的API设计规范吧。

  完整的代码实现如下:

* @类名:TaskEngine
  
* @创建:baiyingjun (devXiaobai@gmail.com)
  
* @创建日期:2011-7-7
  
* @说明:task调度引擎
  
***************************************************/
public final class TaskEngine implements Runnable{

    
private static final String TAG=Log.makeTag(TaskEngine.class);
    
    
private PriorityQueue<Task> runningQueue;//运行的task队列
    
    
private PriorityBlockingQueue<Task> readyQueue;//就绪的task队列,准备接受调度的task列表
    
    
private final AtomicLong taskIdProducer = new AtomicLong(1);
    
    
private Object sheduleLock = new Object();//调度锁
    
    
private static TaskEngine instance;
    
    
private final ReentrantLock lock = new ReentrantLock(true);
    
    
private final Condition needSchedule = lock.newCondition();
    
    
private Task currentTask;//准备接受调度的task
    
    
private Context mAppContext;
    
    
/**
      
* add BusinessObject to taskEngine
*/
    
public long addTask(BusinessObject bo) throws NetworkNotConnectException{
         Task task
= new Task(bo);
        
long newTaskId = taskIdProducer.incrementAndGet();
         task.setTaskId(newTaskId);
        
if(Log.DBG){
            
Log.d(TAG, "Add task with task id "+newTaskId+", priority "+task.getPriority());
         }
        
        
if(this.isWifiNetWork()){ //WIFI网络
             synchronized(sheduleLock){
                 runningQueue.add(task);
             }
            
new Thread(task).start();
         }
else{ //GPRS网络
            
if(readyQueue.offer(task)){ //task入就绪队列
                
if(Log.DBG)
                    
Log.d(TAG, "add task " +task.bo.methodName+" "+task.taskId+" to ready queue");
                 final ReentrantLock lock
= this.lock;
                 lock.lock();
                 try{
                     needSchedule.signal();
//唤醒调度线程重新调度
                 }finally{
                     lock.unlock();
                 }
             }
            
//schedule();
         }
         return newTaskId;
     }
    
    
private TaskEngine(Context context){
         mAppContext
= context;
         runningQueue
= new PriorityQueue<Task>();
         readyQueue
= new PriorityBlockingQueue<Task>();
        
new Thread(this).start();
        
Log.i(TAG, "shedule thread working");
     }
    
    
public synchronized static TaskEngine getInstance(Context context){
         Context appContext
= context.getApplicationContext();
        
if(instance==null || instance.mAppContext!=appContext){
             instance
=new TaskEngine(appContext);
         }
         return instance;
     }
    
     protected
boolean isWifiNetWork() throws NetworkNotConnectException{
         return NetworkManager.isWIFINetWork(mAppContext);
     }
    
    
/**
      
* task调度逻辑
*/
    
public final void run(){
         Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
        
while(true){
             try {
                
if(this.isWifiNetWork()){
                     Task task
= this.readyQueue.take();
                    
if(task !=null){
                         synchronized(sheduleLock){
                             runningQueue.add(task);
                         }
                        
new Thread(task).start();
                     }
                 }
                
else{//非wifi网络
//空就绪队列,空运行队列,等待直到有任务到来
                    
if(this.readyQueue.size()==0 && runningQueue.size()==0){
                         currentTask
=readyQueue.take();
                         synchronized(sheduleLock){
                             runningQueue.add(currentTask);
                         }
                        
new Thread(currentTask).start();
                     }
                    
//抢占式调度(就绪队列非空,运行队列优先级比就绪队列优先级低)
                    
else if(readyQueue.size()>0 &&
                             this.readyQueue.element().getPriority()
>=this.getMaxPriority()){
                         currentTask
= readyQueue.take();
                        
if(currentTask.getPriority()>this.getMaxPriority()){//暂停低优先级的任务运行
                             synchronized(sheduleLock){
                                
for(int i=0;i<runningQueue.size();i++){
                                     Task toStopTask
=runningQueue.remove();
                                    
//因为任务调度,将低优先级的任务暂时给冻结起来
                                     toStopTask.setState(Task.STATE_FROST);
                                     readyQueue.add(toStopTask);
                                 }
                             }
                         }
                        
//运行被调度的任务
                         runningQueue.add(currentTask);
                        
new Thread(currentTask).start();
                     }
else {//等高优先级的任务运行完毕
                         waitUntilNeedSchedule();
                     }
                 }
            
             }catch (InterruptedException e) {
                
Log.e(TAG, "Schedule error "+e.getMessage());
             } catch (NetworkNotConnectException e) {
                
// TODO Auto-generated catch block
                 e.printStackTrace();
             }
         }
     }

    
/*
      
* 等待,直到就绪队列里的最高优先级比当前运行优先级高,或者就绪队列为空时等待运行队列运行完毕
*/
    
private void waitUntilNeedSchedule() throws InterruptedException{
          final ReentrantLock lock
= this.lock;
          lock.lockInterruptibly();
             try {
                 try{
                    
while ((readyQueue.size()>0
                            
&& readyQueue.element().getPriority()<getMaxPriority())//等高优先级的任务运行完毕
                             || (readyQueue.size()
==0 && runningQueue.size()>0) ){//或者等运行队列运行完毕
                        
if(Log.DBG)
                            
Log.d(TAG, "waiting sheduling........");
                         needSchedule.await();
                     }
                 }
                 catch (InterruptedException ie) {
                     needSchedule.signal();
// propagate to non-interrupted thread
                     throw ie;
                 }
             } finally {
                 lock.unlock();
             }
     }
    
    
/**
      
* Hand the specified task ,such as pause,delete and so on
*/
    
public boolean handTask(long taskId,int handType) {
        
Log.i(TAG, "set task`s state with taskId "+taskId);
         synchronized(this.sheduleLock){
            
//如果在运行队列里,取消该任务
             Iterator
<Task> runningItor= this.runningQueue.iterator();
            
while(runningItor.hasNext()){
                 Task task
= runningItor.next();
                
boolean b = task.equals(this);
                
if(task.getTaskId()==taskId){
                     runningQueue.remove(task);
                     task.setState(handType);
                    
Log.i(TAG, "set runningQueue taskId = "+taskId + " state " + handType);
                     return
true;
                 }
             }
            
//如果在就绪队列里,删除
             Iterator
<Task> readyItor= this.readyQueue.iterator();
            
while(readyItor.hasNext()){
                 Task task
= readyItor.next();
                
if(task.getTaskId()==taskId){
//                    readyQueue.remove(task);
                     task.setState(handType);
                    
Log.i(TAG, "set readyQueue taskId = "+taskId + " state " + handType);
                     return
true;
                 }
             }
             return
false;
         }
     }
    
/***
      
* 获取运行队列任务的最高优先级
*/
    
private int getMaxPriority(){
        
if(this.runningQueue==null || this.runningQueue.size()==0)
             return
-1;
        
else{
             return this.runningQueue.element().getPriority();
         }
     }
        
    
/***************************************************
      
* @类名:Task
      
* @创建:baiyingjun (devXiaobai@gmail.com)
      
* @创建日期:2011-7-7
      
* @说明:业务对象的包装成可运行实体
      
***************************************************/
    
public class Task implements Runnable,Comparable<Task>{

        
//运行
        
public static final int STATE_INIT = -1;
        
//运行
        
public static final int STATE_RUN = 0;
        
//停止
        
public static final int STATE_STOP = 1;
        
//暂停
        
public static final int STATE_PAUSE = 2;
        
//取消
        
public static final int STATE_CANCLE = 3;
        
//冻结,如果在GPRS下,因为线程调度的时候低优先级的被放readyqueue里的时候,要把这个任务暂时给“冻结”起来
        
public static final int STATE_FROST = 4;
        
        
private long taskId;
        
        
private BusinessObject bo;
        
        
public Task(){
            
         }
        
        
public Task(BusinessObject bo){
             this.bo
=bo;
         }
        
         @Override
        
public void run() {
             this.onTaskStart();
             this.bo.execute();
             this.onTaskEnd();
         }

        
private void onTaskStart(){
             this.bo.setmState(STATE_RUN);
         }
        
        
public long getTaskId() {
             return taskId;
         }

        
public void setTaskId(long taskId) {
             this.taskId
= taskId;
         }

        
public int getPriority() {
             return this.bo.getPriority();
         }

        
public void setPriority(int priority) {
             this.bo.setPriority(priority);
         }

        
/*
          
* compare task priority
*/
         @Override
        
public int compareTo(Task object1) {
            
if(this.getPriority()==object1.getPriority()&& this.equals(object1))
                 return
0;
             return this.getPriority()
>object1.getPriority()?-1:1;
         }
        
        
public void setState(int state){//设置当前运行的task的state
             this.bo.setmState(state);
            
Log.d(TAG, "Set task "+this.bo.methodName+" "+this.taskId + " state " + state);
         }
        
        
private void onTaskEnd(){//运行完毕后从taskengine运行队列里删除
            
if(Log.DBG){
                
Log.d(TAG, "task "+this.bo.methodName+" "+taskId+" End");
             }
            
if(this.bo.getmState() == STATE_FROST)//因为调度停止了该业务
                 return;
            
             final ReentrantLock lock
= TaskEngine.this.lock;
             lock.lock();
             try{
                
boolean removed = runningQueue.remove(this); //remove from running queue
                 assert removed;
                
if(Log.DBG)
                    
Log.d(TAG, this.bo.methodName+" "+this.taskId+" remove from runningQueue");
                 needSchedule.signal();
//唤醒调度线程重新调度
             }finally{
                 lock.unlock();
             }
         }

         @Override
        
public boolean equals(Object o) {
            
// TODO Auto-generated method stub
            
if(this==o){
                 return
true;
             }
            
if(o instanceof Task){
                 return taskId
==((Task)o).taskId;
             }
             return
false;
         }
     }
    
}

 

  拾漏补遗:

  1.补充最初的设计类图(可能与代码不太一致,但能说明问题)

  2. BusinessObject的实现代码:

/***************************************************
  
* @类名:BusinessObject
  
* @创建:baiyingjun (devXiaobai@gmail.com)
  
* @创建日期:2011-7-6
  
* @说明:抽象的业务,扩展的网络业务要继承这个类并实现抽象方法execute()
  
***************************************************/
public abstract class BusinessObject {
    
    
public static final int LOWEST_PRIORITY=1;//最低优先级
    
    
public static final int LOW_PRIORITY=2;//低优先级
    
    
public static final int NORMAL_PRIORITY=3;//正常优先级
    
    
public static final int HIGH_PRIORITY=4;//高优先级
    
    
public static final int HIGHEST_PRIORITY=5;//最高优先级
    
     protected BusinessListener listnener;
//运行业务的回调
    
     protected Context mContext;
    
    
private long taskId;
    
     protected Map
<String,Object> params;//运行业务需要的参数
    
    
private int priority;
    
    
public int getPriority() {
         return priority;
     }

    
public void setPriority(int priority) {
         this.priority
= priority;
     }

    
//设置回调
    
public void addBusinessListener(BusinessListener listnener){
         this.listnener
=listnener;
     }
    
    
public long doBusiness() throws NetworkNotConnectException{
         taskId
= TaskEngine.getInstance(mContext).addTask(this);
         return taskId;
     }
    
    
public abstract void execute();
}
0
相关文章