Skip to content

Latest commit

 

History

History
288 lines (225 loc) · 11.5 KB

README.md

File metadata and controls

288 lines (225 loc) · 11.5 KB

Threadify:

The purpose of this package is to make using Python threads easier and more user-friendly while providing more flexibility and capability. Threadify extends Python threads to add the ability to cooperatively pause, unpause, and kill threads.

One of the inconvenient aspects of using generic Python threads is that there is no easy way to pause, continue, or kill a thread at a natural unit-of-work boundary. Threadify adds methods to signal a thread to cooperatively perform these actions. The unit-of-work executed by Threadify is the task-body function (the task).

Structure:

Creating a Threadify object is very similar to creating a Python thread.

import time
from threadify import Threadify  

def taskbody(storage):
    print(".")  
    time.sleep(0.25)  
        
t = Threadify(taskbody)
t.start()
The Task:

The task is the work to be done by the Threadify object. The task function can be a simple run-to-completion model or a more complex, long-lasting function. The Threadify object acts as a superloop repeatedly calling the task. Between invocations of task, requests for pause, unpause, or kill are processed. A task can signal its own Threadify object to continue calling it by returning True, or can request to be killed by returning False.

Task Storage Parameter:

The task function takes a single parameter: a dictionary named 'storage'. This dictionary provides two primary features:

  • A vehicle to pass initial values to the task via the Threadify initializer.
  • Mutable storage that persists across invocations of task by the Threadify superloop.

Any values that need to be retained from one call to the next can be fetched from and saved to storage.

Task Suggestions:
  • Incorporate at least some small time.sleep interval before returning. This provides an opportunity for the scheduler to context switch to other tasks and helps prevent 'hogging' the CPU cycles. Example: time.sleep(0.010)
  • While blocking calls (such as pending on a queue) are perfectly acceptable, bear in mind that it reduces the Threadify object's responsiveness to requests to pause, unpause, or kill since these requests are executed between task invocations. For the example of a queue, a queue timeout can be used to enforce a bare-minimum level of responsiveness as needed by the application.

Documentation:

Threadify inherits from Python's threading.Thread, so it makes available all of the methods that threading.Thread provides in-addition-to its own.

class Threadify(threading.Thread):

    VERSION: str = "1.0.0"

    # Set True to enable debug output
    ENABLE_DEBUG_OUTPUT = False

    def __init__((self, task: Optional[Callable] = None, storage: Optional[dict] = None, *, name: str = None,
                 daemon: bool = True, deep_copy: bool = True, ignore_task_exceptions: bool = False,
                 start: bool = False):
        :param task: The callable to be repeatedly executed by the thread in the thread's context.
        :param storage: Dictionary containing data for 'task'. It is persistent and mutable across invocations of 'task'
                        by the thread for the life of the thread. The task can access, modify, and add variables to
                        the dictionary and have them persist across each task invocation (which happens repeatedly).
        :param name:    Name for the thread or None for an autogenerated default name.
        :param daemon:  True - Run as a daemon thread (ie: if main program exits, thread exits);
                        False - Thread continues to run even if the main program that created it exits.
        :param deep_copy: True - Make independent, deep copy of storage for use by thread; False - Shallow copy. A
                        deep copy may require less programmer care since independent copies are made. A shallow
                        copy is potentially faster, but requires the programmer to be careful not to create
                        data contention between various threads of execution. Items that can't be pickled
                        can't be deep-copied.
        :param ignore_task_exceptions: True - Ignore unhandled exceptions raised in task and continue;
                        False - Re-raise task exception thereby terminating the thread.
        :param start:   True - Automatically start thread after construction; False - Thread must be manually started
                        by calling its builtin 'start' method.

    def pause(self, wait_until_paused: bool = False, timeout_secs: Optional[int] = None):
        Use to cooperatively pause the thread. Note that unless 'wait_until_paused' is True, this
        method can return before the pause has taken effect since thread pausing is affected by the responsiveness
        of and the blocking in the user task.
        :param wait_until_paused: True - wait until thread has paused before returning; False - return immediately.
        :param timeout_secs: 'None' or maximum number of seconds to wait for thread to pause when
                            'wait_until_paused' is True; None means ignore timeout and wait as long as required
                            for thread to pause.
        :return: True - Thread paused before return; False - Thread not yet paused before return

    def unpause(self):
        Unpause a paused thread.
        :return: None

    def is_paused(self) -> bool:
        Indicate if the thread is currently paused. This represents the current actual state of the thread - not
        whether or not a pause was requested.
        :return: True - Thread is currently paused; False - Thread is not paused

    def kill(self, wait_until_dead: bool = False, timeout_secs: Optional[int] = None):
        Cooperatively end execution of the thread.
        :param wait_until_dead: True - Wait with timeout for thread to terminate; False - Return immediately
        :param timeout_secs: 'None' or maximum number of seconds to wait for termination when
                            'wait_until_dead' is True; None means ignore timeout and wait as long as required
                            for thread to terminate.
        :return: True - Thread terminated before return; False - Thread not yet terminated before returning


# User-Supplied Task Function
def task(storage: dict) -> bool:
    The periodic work to be done by the thread. Blocking affects the responsiveness to cooperative
    pause and kill signals; however, at least some small sleep delay (ex: time.sleep(0.010) ) or IO blocking
    should be included to allow opportunities for context-switches for other threads. Note that changes made to
    'storage' persist across each invocation of task for the life of the thread.
    :param storage: Dict to provide persistent, mutable task variable storage.
    :returns: True - continue to run; False - kill thread

Examples:

import time
from threadify import Threadify
import queue


def task_symbols(storage):
    """
    Task that prints first character of contents of storage["symbol"] forever.
    """
    sym = storage.get("symbol", ".")
    print(sym[0], sep=" ", end="", flush=True)
    time.sleep(.25)
    return True


def task_run_5s(storage):
    """
    Demonstrate a self-terminating task.
    Use storage to pass in a start time so that task can decide when to self-terminate.
    """
    # Get start time from storage
    start = storage.get("start_time")

    # Compute elapsed time and print
    delta = time.time() - start
    print("Elapsed time: {:4.2f}".format(delta))

    # Time to die?
    if delta >= 5:
        print("Stopping after {:4.2f} seconds".format(delta))

        # Signal thread to terminate
        return False

    # Sleep to control speed of the output
    time.sleep(1)

    # Signal thread to keep running
    return True


def task_checkqueue(storage):
    """
    Task that watches a queue for messages and acts on them when received.
    """
    # Get the queue object from the storage dictionary
    thequeue = storage.get("queue")
    try:
        # Use a timeout so it blocks for at-most 0.5 seconds while waiting for a message. Smaller values can be used to
        # increase the cycling of the task and responsiveness to Threadify control signals (like pause) if desired.
        msg = thequeue.get(block=True, timeout=.5)
    except queue.Empty:
        print("_", end="")
    else:
        if msg == "QUIT":
            return False

        # Print received message
        print("{:s}".format(msg), end="")

    return True


def task_dosomething(storage):
    """
    Task that gets launched to handle something in the background until it is completed and then terminates. Note that
    this task doesn't return until it is finished, so it won't be listening for Threadify pause or kill requests. 
    """    
    # An important task that we want to run in the background.    
    for i in range(10):
        print(i, end="")
        time.sleep(1)

    return False 


# To enable debug on ALL Threadify classes ...
Threadify.ENABLE_DEBUG_OUTPUT = True
# or it can be enabled on an instance-by-instance basis. 


# EXAMPLE 1) Simplest example - built-in task displays '.' to the screen each 0.25 seconds
print("EX 1) Print '.' approximately every 0.25 seconds.")

t1 = Threadify(start=True)
# Main program sleeps here while task continues to run
time.sleep(5)

t1.kill(wait_until_dead=True)
print("\nDone")


# EXAMPLE 2) Demonstrate two tasks running with one being paused/continued
print("EX 2) Starting 2 tasks - one will be paused and continued while the other runs continuously.")

# Pass initial value of "symbol" via the storage dictionary to each task
t1 = Threadify(task_symbols, {"symbol": "X"})
t2 = Threadify(task_symbols, {"symbol": "O"})

# Start tasks manually (could have been automatically started instead via the start parameter)
t1.start()
t2.start()

time.sleep(5.1)
print("\nPausing 'X' for 5 seconds.")
t1.pause(True)
time.sleep(5)

print("\nUnpausing 'X' for 5 seconds.")
t1.unpause()
time.sleep(5)

t1.kill()
t2.kill()
t1.join()
t2.join()
print("\nDone")

 
# EXAMPLE 3) Demonstrate a task that self-terminates after 5 seconds
print("EX 3) Demonstrate a task that self-terminates after 5 seconds.")

t1 = Threadify(task_run_5s, {"start_time": time.time()}, daemon=False, start=True)

# Join instructs main program to wait on t1 to complete before continuing
t1.join()
print("\nDone")


# EXAMPLE 4) Demonstrate communication with a task via a queue passed in through storage.
print("EX 4) Demonstrate communication with a task via a queue passed in through storage.")

# Create a thread-safe queue for message passing
q = queue.Queue()

# This instance REQUIRES deep_copy=FALSE since Queue is not pickleable.
t1 = Threadify(task_checkqueue, {"queue": q}, deep_copy=False, start=True)
# Wait 3 seconds - then send some messages
time.sleep(3)
q.put("HE")
q.put("LLO")
q.put(" WORLD")
time.sleep(2)
q.put("1")
time.sleep(1)
q.put("2")
time.sleep(2)
q.put("3")
time.sleep(3)

# Send the QUIT message to have task kill itself and then wait for it to die
q.put("QUIT")
t1.join()
print("\Done.")


# EXAMPLE 5) Fire and forget. Launch a function in a separate thread and have it run to completion. 
print("EX 5) Fire and forget. Launch a function in a separate thread and have it run to completion.")

t1 = Threadify(task_dosomething, start=True)

# Join instructs main program to wait on t1 to complete before continuing
t1.join()
print("\nDone")

Conclusion:

As these simple examples demonstrate, there are many use-cases where Threadify can be used to simplify the problems that a programmer is trying to solve. Concurrent programming can be hard; Threadify attempts to make it a little easier.