Skip to content

Latest commit

 

History

History
136 lines (109 loc) · 4.28 KB

README.md

File metadata and controls

136 lines (109 loc) · 4.28 KB

Describtion

This is a simple therad pool manager . But it's very useful , you can add a new task job in dynamic to this manager and it will dispatch and schedue your task (support interrupt ) . It will create several threads (based on your cpu numer ) and every thread has a 10 tasks queue by default .

Work Flow

How To Use

You need to extend the BasicTaskDispatchPooland overwrite runTask , getFinishedCallback and getItemTaskCallback.

  • runTask : detail job will do in this method
  • getFinishedCallback : callback of all task finished
  • getItemTaskCallback : callback of item task job finished

You can find detail in DemoAsyncPool

public class DemoAsyncPool extends BasicTaskDispatchPool<DemoAsyncPool.InputArgs, DemoAsyncPool.Response> {


    @Override
    protected Response runTask(InputArgs arg) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // interrupt test
        if (Double.compare(Math.random(), 0.5f) < 0) {
            Thread.currentThread().interrupt();
        }
        return new Response(Thread.currentThread().toString());

    }

    @Override
    protected ThreadDisPatchManager.ThreadTaskFinished<InputArgs, Response> getFinishedCallback() {
        return new ThreadDisPatchManager.ThreadTaskFinished<InputArgs, Response>() {
            @Override
            public void onFinished(List<Pair<InputArgs, Response>> finishedList,
                                   List<Pair<InputArgs, Response>> interruptedList) {
                System.out.println(finishedList.size() + "'s task has been finished and " + interruptedList.size() +
                        "'s " + "task has been interrupted");
            }

        };
    }

    @Override
    protected ThreadDisPatchManager.JobTaskFinished<InputArgs, Response> getItemTaskCallback() {
        return new ThreadDisPatchManager.JobTaskFinished<InputArgs, Response>() {
            @Override
            public void onInterrupted(InputArgs inputArgs) {
                System.out.println(inputArgs.arg + " has been interrupted");
            }

            @Override
            public void onFinished(Pair<InputArgs, Response> res) {
                StringBuilder itemTaskFinishMsg = new StringBuilder();
                if (res == null || res.first == null) {
                    System.out.println("error");
                }
                itemTaskFinishMsg
                        .append("input :" + res.first.getArg())
                        .append(" res: " + res.second);

                System.out.println("itemTaskFinishMsg==>" + itemTaskFinishMsg.toString());

            }
        };
    }


    public static class InputArgs {
        private String arg;

        public InputArgs(String arg) {
            this.arg = arg;
        }

        public String getArg() {
            return arg;
        }
    }

    public static class Response {
        private String res;

        public Response(String res) {
            this.res = res;
        }

        public String getRes() {
            return res;
        }

        @Override
        public String toString() {
            return res;
        }
    }
}

Test in Foo.java

public class Foo {
    public static void main(String[] args) {
        DemoAsyncPool demoAsyncPool = new DemoAsyncPool();
        for (int i = 0; i < 10; i++) {
            demoAsyncPool.addTask(new DemoAsyncPool.InputArgs("job:" + i));
        }
    }
}

Support back pressure

Test in BackPressureDemoAsyncPool

 BackPressureDemoAsyncPool demoAsyncPool = new BackPressureDemoAsyncPool();
        int i = 0;
        while (true) {
            if (i >= 30) {
                break;
            }
            if (demoAsyncPool.isBusying()) {
                Thread.sleep(10000);
            }
            demoAsyncPool.addTask(new BackPressureDemoAsyncPool.InputArgs("job:" + i));
            i++;
        }