Java Advanced Async Future

In my previous post I showed an example of parallel calculation in C#, where we hand a task to a background thread and tell it which method to call when it completes. C# has one big advantage over Java here: it can pass a reference to a method (a delegate) as a parameter, which allowed us to pass in the callback method that is invoked when the result is ready. In Java (at least before Java 8) we cannot pass a method as a parameter to call it later, but there is a workaround, which I will show you today.

Task:

Execute a parallel task that invokes a specified method on completion and passes the result of its calculation to that method.

A bit of additional information:

We will approach this task through a real-life example. Let's assume we are in an office with several departments. We are Department Nr1, which prepares tasks for the other departments: the “Management department”, or “Caller”. Once we generate a new task we pass it to the desired department, the “Worker”, and we want to be notified when the “Worker” completes the job so that we can assign a new one. That means somebody from the “Workers” department has to come into our office (the “callback method”), tell us that the “Workers” are ready to take a new task, and hand over the result of the previous one. So let's look at an implementation of this idea.

Implementation:

I will start with the launcher:

package launcher;

import caller.Caller;

public class Launcher {

	public static void main(String[] args) {

        new Caller();
    }

}

There is nothing to comment on here.
Next we will create the interface:

package interfaces;

public interface CallBackInterface {

	public void OnCallBackMethod(String returnedResult);

}

This interface contains just one method, which will be our callback method.
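To make the "pass a method as a parameter" idea concrete, here is a minimal sketch, separate from the project code. The names `ResultCallback`, `CallbackSketch` and `compute` are made up for this illustration. It shows the same single-method interface handed over first as an anonymous class (works on any Java version) and then as a method reference (assuming Java 8 or later).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-method interface, standing in for CallBackInterface.
interface ResultCallback {
    void onResult(String result);
}

class CallbackSketch {

    // Accepts "a method" indirectly: any object that implements ResultCallback.
    static void compute(int a, int b, ResultCallback callback) {
        callback.onResult("Sum of " + a + " and " + b + " = " + (a + b));
    }

    // Calls compute() with two callback styles and collects what each callback received.
    static List<String> demo() {
        final List<String> received = new ArrayList<>();

        // Any Java version: anonymous inner class implementing the interface.
        compute(2, 3, new ResultCallback() {
            @Override
            public void onResult(String result) {
                received.add(result);
            }
        });

        // Java 8+ only: a method reference fits because the interface has exactly one method.
        compute(10, 20, received::add);

        return received;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Either way the caller decides what happens with the result, and `compute` only needs to know the interface.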
Now let's look at the implementation of the “Worker”:

package workers;

import java.util.Random;

public class SlowWorker {
	
	private int id;
	private int timeToSleep;
	public SlowWorker(int id)
	{
		this.id = id;
		timeToSleep = new Random().nextInt(5000) + 1000;
		
	}
	
	public String doWork()
	{
		try{
			Log("Worker ["+id+"] Starting Heavy Calculations");
			
            Thread.sleep(timeToSleep);
            int digit1 = new Random().nextInt(100);
            int digit2 = new Random().nextInt(100);
            int sum = digit1+digit2;
            
            Log("Worker ["+id+"] Finished after: "+timeToSleep);
            
            return "Worker: ["+id+"] Sum of "+digit1+" and "+digit2+" = "+sum;
		}
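		catch(InterruptedException e)
		{
			// Thread.sleep is the only call here that throws; restore the interrupt flag for the pool
			Thread.currentThread().interrupt();
			Log("I ["+id+"] was forcibly interrupted");
			return "Worker: ["+id+"] Crash: "+e;
		}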
	}
	
	public void Log(String msg)
	{
		System.out.println(msg);
	}

}

The worker accepts its own unique ID in the constructor, just so we can tell who is who. It also picks a random sleep time between 1 and 6 seconds, so each worker will be busy for a random amount of time.

In the doWork() method we just sleep, pretending to do heavy calculations, and then sum up two randomly generated numbers. Once that is done we return the answer as a String.

Now let's look at another worker: the one who will run from his “Workers” department to our “Management department”, notify us that the job is complete and give us the text report 🙂

package workers;

import interfaces.CallBackInterface;

import java.util.concurrent.Callable;

public class NotifierWorker implements Callable<Object>{
	
	
	private CallBackInterface callBack;
	private int id;
	
	public NotifierWorker(int id)
	{
		this.id = id;
	}
	
	@Override
	public Object call() throws Exception {
		callBack.OnCallBackMethod(new SlowWorker(id).doWork());
		return null;
	}
	
	public void setCallBack(CallBackInterface callBack)
	{
		this.callBack = callBack;
	}
	
	public CallBackInterface getCallBack()
	{
		return callBack;
	}

}

This worker, the “Notifier”, implements Callable, which requires us to implement the call() method. That is where all the async calculation happens. However, before we execute the async calculation we have to set the callback, which is our Management office, to tell the Notifier worker exactly where to find us (“Management”) when his department completes the job.

Let's have a look at that:

package caller;

import interfaces.CallBackInterface;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import workers.NotifierWorker;

public class Caller implements CallBackInterface{
	
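	// written by a pool thread in the callback and read by the main thread, so it must be volatile
	private volatile boolean workerComplete;
	private int nrOfWorkersToRun;
	private int jobsComplete;
	private static int id;
	
	public Caller()
	{
		workerComplete = false;
		nrOfWorkersToRun = 5;
		jobsComplete = 0;
		id = 1;
		startExecution();
	}
	
	// Runs on the pool thread that finished the job; synchronized keeps the counter consistent
	@Override
	public synchronized void OnCallBackMethod(String returnedResult) {
		Log("Main Thread - OnCallBack: "+returnedResult);
		jobsComplete++;
		
		if(jobsComplete == nrOfWorkersToRun)
			workerComplete = true;
	}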
	
	public void startExecution()
	{
		
        Log("Main Thread: Start Work @: "  + new java.util.Date());
        Log("");
        ExecutorService es = Executors.newFixedThreadPool(3);
        for(int i = 0; i < nrOfWorkersToRun; i++) {
        	int nr = getNextId();
        	Log("Main Thread: giving job to worker nr: ["+nr+"]");
	        NotifierWorker slaveNotifier = new NotifierWorker(nr);
	        slaveNotifier.setCallBack(this);
	        es.submit(slaveNotifier);
        }

        try
        {
        	while(!workerComplete)
        	{
        		Log("Main Thread: doing my own stuff.........");
        		Thread.sleep(800);
        	}
        }
        catch(Exception e)
        {
        	Log("Main Thread CRASHED: "+e);
        }
        
        Log("");
        Log("Main Thread: End work @: " + new java.util.Date());
        System.exit(0);
	}
	
	
	public int getNextId()
	{
		return id++;
	}
	
	public void Log(String msg)
	{
		System.out.println(msg);
	}

}

Like any management department, it is big, important and hard to follow what the hell it is doing. But we will try:
In short, we schedule 5 tasks for the workers, display the result of each task as soon as it is ready, and wait until all of them are complete. Also keep in mind that although we have 5 tasks, we limit our workers to only 3 concurrent tasks at a time to save on their salaries 🙂

1) ExecutorService es = Executors.newFixedThreadPool(3); – We create a thread pool of size three, which limits the number of background tasks running at the same time to three. (It is not a requirement, just more fun and an extra chance to demonstrate the thread pool queue.)
2) NotifierWorker slaveNotifier = new NotifierWorker(nr); – We create the notifiers and assign a unique ID to each of them.
3) slaveNotifier.setCallBack(this); – We tell each notifier where he can find us when the job is complete.
4) es.submit(slaveNotifier); – We send him to work on the given task.
5) Once his task is complete he tells us about it in OnCallBackMethod, where we keep track of how many tasks have completed out of those submitted.
6) Once the number of completed tasks equals the number of scheduled tasks, we consider the work done and exit.
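The bookkeeping in steps 1, 5 and 6 can also be sketched without a polling loop. The example below is only an illustration, not the project code. The class name `PoolLatchSketch` and the 100 ms sleep are assumptions, and it needs Java 8 or later for the lambda. It submits more tasks than the pool has threads, records how many ran at the same time, and uses a CountDownLatch to wait until every task has reported back.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PoolLatchSketch {

    // Returns {number of completion notifications, highest number of tasks running at once}.
    static int[] run(int tasks, int poolSize) {
        final CountDownLatch allDone = new CountDownLatch(tasks);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger maxRunning = new AtomicInteger();
        final AtomicInteger callbacks = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                maxRunning.accumulateAndGet(now, Math::max); // remember the peak concurrency
                try {
                    Thread.sleep(100); // pretend to do heavy calculations
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    callbacks.incrementAndGet(); // stands in for calling the callback method
                    allDone.countDown();         // one job less to wait for
                }
            });
        }

        try {
            allDone.await(); // blocks until all tasks have counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] { callbacks.get(), maxRunning.get() };
    }

    public static void main(String[] args) {
        int[] r = run(5, 3);
        System.out.println("Callbacks: " + r[0] + ", max concurrent: " + r[1]);
    }
}
```

With 5 tasks and a pool of 3, the peak concurrency never exceeds 3. The remaining tasks wait in the pool's queue, which is the same effect the console output shows for workers 4 and 5.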

And here is the console output:

Main Thread: Start Work @: Fri Mar 14 15:47:29 GMT 2014

Main Thread: giving job to worker nr: [1]
Main Thread: giving job to worker nr: [2]
Worker [1] Starting Heavy Calculations
Main Thread: giving job to worker nr: [3]
Worker [2] Starting Heavy Calculations
Main Thread: giving job to worker nr: [4]
Worker [3] Starting Heavy Calculations
Main Thread: giving job to worker nr: [5]
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Worker [2] Finished after: 2941
Main Thread - OnCallBack: Worker: [2] Sum of 19 and 58 = 77
Worker [4] Starting Heavy Calculations
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Worker [3] Finished after: 4552
Main Thread - OnCallBack: Worker: [3] Sum of 80 and 68 = 148
Worker [5] Starting Heavy Calculations
Worker [1] Finished after: 4794
Main Thread - OnCallBack: Worker: [1] Sum of 51 and 71 = 122
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Worker [4] Finished after: 5656
Main Thread - OnCallBack: Worker: [4] Sum of 81 and 46 = 127
Main Thread: doing my own stuff.........
Main Thread: doing my own stuff.........
Worker [5] Finished after: 5444
Main Thread - OnCallBack: Worker: [5] Sum of 1 and 93 = 94

Main Thread: End work @: Fri Mar 14 15:47:39 GMT 2014
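For comparison only: on Java 8 or later, the standard library's CompletableFuture offers a similar "call me when you are done" flow out of the box. The sketch below is not the approach used in this project. The class name `CompletableFutureSketch` and the dummy sum are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletableFutureSketch {

    // Schedules the given number of tasks on a pool of 3 threads and collects every callback result.
    static List<String> run(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // Callbacks may run on different pool threads, so the list must be thread-safe.
        List<String> results = Collections.synchronizedList(new ArrayList<String>());
        List<CompletableFuture<Void>> pending = new ArrayList<>();

        for (int i = 1; i <= tasks; i++) {
            final int id = i;
            CompletableFuture<Void> f = CompletableFuture
                    // the "worker": produces a value on a pool thread
                    .supplyAsync(() -> "Worker: [" + id + "] Sum of " + id + " and " + id + " = " + (id + id), pool)
                    // the "callback": invoked with the value once it is ready
                    .thenAccept(results::add);
            pending.add(f);
        }

        // Wait until every task and its callback have finished.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) {
        for (String line : run(5)) {
            System.out.println(line);
        }
    }
}
```

The order of the collected results depends on which task finishes first, just like in the console output above.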

And as usual you can download the entire project here:

Java Async Future

This is going to be my first post, and today I will explain the purpose of Java Future.

The main purpose of a Java Future is to receive a return value from a parallel task at some point in the future. For example, if you start a new Thread you can do some work in the background, but it is difficult to return a value from that thread once the calculation has finished: we would have to use some kind of synchronized intermediate object as storage.
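To show what that "intermediate object" looks like with a plain Thread, here is a small sketch. The class name `ThreadHolderSketch` and the use of AtomicReference as the storage are my assumptions for the illustration; any thread-safe holder would do.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadHolderSketch {

    static String computeOnPlainThread() {
        // Shared, thread-safe storage that the background thread writes its result into.
        final AtomicReference<String> holder = new AtomicReference<>();

        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                // run() cannot return a value, so the result goes into the holder instead
                holder.set("Sum of 2 and 3 = " + (2 + 3));
            }
        });
        worker.start();

        try {
            worker.join(); // wait for the thread to finish before reading the holder
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holder.get();
    }

    public static void main(String[] args) {
        System.out.println(computeOnPlainThread());
    }
}
```

It works, but the holder, the waiting and the error handling all have to be written by hand for every task.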

A task submitted through an executor behaves much like a Thread, but the Future we get back lets us collect its return value later. To demonstrate it we will implement a simple task:

Task Description:

Download a 4K wallpaper image (4096×2160, 9.1 MB) from a URL (my Dropbox) as a parallel task, where the worker returns a String with the total number of KB downloaded once the task is fully complete.

Implementation:

Let's start with the main class:

package implementation;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncMain {
	
	public static void main(String[] args)
	{
		new AsyncMain().startRoutine();
	}
	
	private void startRoutine()
	{
		//creating a thread pool with a single thread (it could be a pool of many threads, I just don't need that for this demonstration)
		ExecutorService executor = Executors.newSingleThreadExecutor();
		//creating a callable object which is expected to return a value of String type
		Callable<String> imageDownloader = new AsyncWorker();
		//submitting the asynchronous task, which should return its value in the future
		Future<String> asyncThread = executor.submit(imageDownloader);
					
		//Waiting until async thread finish its job
		while (!asyncThread.isDone()) {
			
			try
			{
				//writing to the console once per sec
				System.out.println("Main: Worker still Downloading file...");
				Thread.sleep(1000);
				
			}
			catch(Exception e)
			{
				System.out.println("Main thread was interrupted");
			}
		}
				

		String result = "";
		try {
			//getting return value from async thread
			result = asyncThread.get();
			//printing returned value from async thread to console
			System.out.println("Main: [Returned result from AsyncThread is: "+result+"]");
		} catch (Exception e) {
			e.printStackTrace();
		} 
			

		
		//Telling the executor to shut down once its thread has finished
		executor.shutdown();
		
	}

	
}


As you can see, we create a thread pool, define a Callable, submit it and get a Future back.
Future has a nice method, isDone(), which lets us check whether the parallel task has completed before trying to get the value. If you call get() without checking isDone() first, it will block the calling thread until the result is available.
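Besides isDone() and the blocking get(), Future also has get(timeout, unit), which waits only for a limited time. The sketch below is a hedged illustration, not part of the project. The class name `FutureGetSketch` and the 500 ms / 100 ms timings are assumptions, and the lambda needs Java 8 or later.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureGetSketch {

    // Waits for the task in 100 ms slices and returns its value once it is available.
    static String pollWithTimeout() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(500); // pretend to download something
            return "done";
        });

        try {
            while (true) {
                try {
                    // blocks for at most 100 ms, then throws TimeoutException
                    return future.get(100, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    System.out.println("Main: Worker still busy...");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            // the task itself threw; the original exception is the cause
            return "failed: " + e.getCause();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Main: result is " + pollWithTimeout());
    }
}
```

This gives the same "do my own stuff while waiting" behaviour as the isDone() loop, but the waiting and the fetching of the value happen in one call.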

Now let's look at the worker:

package implementation;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.Callable;



public class AsyncWorker implements Callable<String>{

	@Override
	public String call() throws Exception {
		
		//variables used during the progress
        String result = null;
        int totalBytes = 0;
        int size;
        int sizeKb;
        float onePrcnt;
        byte data[] = new byte[1024];
        int count;
        float prevProcent = 0;
        
        //in / out streams
        BufferedInputStream in = null;
        FileOutputStream fout = null;
        
        //URLs to download: from / to
        String urlLink ="https://dl.dropboxusercontent.com/u/15715229/palousecanola.jpg";
        String outputPath = "C:/AsyncTest";
        String filename = "Palousecanola.jpg";

		try
        {
			//Create new url object
			URL url = new URL(urlLink);
			//opening connection
			URLConnection con = url.openConnection();
			//reading size of remote content (-1 if the server does not report it)
			size = con.getContentLength();
			sizeKb = size / 1024;
			//float division, so one percent is not truncated to a whole number of bytes
			onePrcnt = size / 100f;
			//Note: setting the reference to null only drops it, it does not close the connection.
			//We need the size just to show the amount downloaded as a percentage.
			con = null;
			
			//Preparing input stream (this opens a second connection to the same URL)
            in = new BufferedInputStream(url.openStream());
            
            //prepare output file location;
            new File(outputPath).mkdirs();
            //Preparing output file stream
            fout = new FileOutputStream(outputPath+"/"+filename);
            //downloading file
            while ((count = in.read(data, 0, 1024)) != -1)
            {
            	//Calculating total of bytes downloaded
            	totalBytes += count;
            	//writing to the file new bytes
                fout.write(data, 0, count);
                //display console message once per one full percent or more than one
                if(totalBytes >= onePrcnt)
                {
                	//calculation in percentage how much in total was downloaded
                	float totalPrcnt = totalBytes / onePrcnt;
                	//checking if total minus previous total would be more than 1%
                	if(totalPrcnt - prevProcent > 1)
                	{
                		prevProcent = totalPrcnt;
                		//Then display message
                		System.out.println("AsyncWorker: [Downloaded: "+(int)totalPrcnt+"% or "+totalBytes / 1024+"Kb from: "+sizeKb+"]");
                	}
                }
                	
            }
            
            //Setting return statement
            result = "Download of total "+totalBytes / 1024+"KB was complete";
        }
		catch(Exception e)
		{
			//setting return statement to contain exception message 
			result = "Application crashed because of: "+e;
		}
        finally
        {
        	//closing [in] and [out] streams regardless was there exception or not
            if (in != null)
                in.close();
            if (fout != null)
                fout.close();
        }
		//internal message of complete state
		System.out.println("AsyncWorker: [Download Complete!]");
		//returning value
		return result;
	}

}

Our worker just connects to the web (my Dropbox file) and tries to download it.
I connect to the web twice: once to get the file size, so the download progress can be shown nicely as a percentage, and once more to open the stream that is actually downloaded. Note that url.openStream() is only shorthand for url.openConnection().getInputStream(), so the second connection is not strictly required: the size and the data could both be read from the same URLConnection.
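The progress calculation itself does not depend on where the stream comes from, so it can be sketched and tried without any network access. In the example below the names `ProgressCopySketch`, `copyWithProgress` and `demo` are assumptions for the illustration, and an in-memory stream stands in for the download. In a real download the size and the stream would presumably come from URLConnection's getContentLength() and getInputStream().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class ProgressCopySketch {

    // Copies everything from in to out and prints progress once per whole percent.
    // If totalSize is not known (<= 0), the data is copied without progress messages.
    static long copyWithProgress(InputStream in, OutputStream out, long totalSize) throws IOException {
        byte[] buffer = new byte[1024];
        long copied = 0;
        long lastPercent = 0;
        int count;
        while ((count = in.read(buffer)) != -1) {
            out.write(buffer, 0, count);
            copied += count;
            if (totalSize > 0) {
                long percent = copied * 100 / totalSize;
                if (percent > lastPercent) {
                    lastPercent = percent;
                    System.out.println("Downloaded: " + percent + "% or " + copied / 1024 + "Kb");
                }
            }
        }
        return copied;
    }

    // Runs the copy against in-memory streams; returns the number of bytes copied, or -1 on error.
    static long demo(int size) {
        byte[] fake = new byte[size];
        // try-with-resources closes both streams even if the copy fails
        try (InputStream in = new ByteArrayInputStream(fake);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            return copyWithProgress(in, out, fake.length);
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("Copied bytes: " + demo(10 * 1024));
    }
}
```

Using whole-number percentages avoids the float comparison, and try-with-resources replaces the manual close() calls in the finally block.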

The returned String is either the size of the downloaded file or a crash report; either way the async task returns a value of String type to the main thread.

This project source can be downloaded from my: