English original: http://www.disinterest.org/resource/stackless/2.6.4-docs-html/stackless-python.html

Stackless Python

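Stackless Python is an enhanced version of the Python programming language. It lets programmers reap the benefits of thread-based programming while avoiding the performance and complexity problems of conventional threads. The microthreads that Stackless adds to Python are a convenient, cheap and lightweight tool which, if used properly, not only offers a way to structure an application or framework but can also improve program structure and readability.

If you are reading this in the documentation that came with your Python installation, then what you have installed is Stackless Python rather than standard Python.

1. Overview

Apart from the functionality that Stackless Python adds, it behaves exactly like standard Python and is used in exactly the same way. The additional functionality is accessed through the framework exposed by the stackless module.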

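2. What you need to know

Stackless Python provides only a minimal framework and comes with no supporting functionality. It merely addresses the general needs that tend to arise when building a more targeted framework on top of it.

2.1. Blocking operations

When an operation is invoked that blocks the Python interpreter, be aware that it also blocks every running tasklet. The interpreter, and with it the scheduler, stays blocked on the tasklet that invoked the operation until that tasklet is done with it. Operations that block the interpreter are often related to synchronous IO (file reading and writing, socket operations, interprocess communication and so on), although time.sleep() should also be kept in mind. Users are advised to use asynchronous versions of IO functionality.

Some third-party modules replace parts of the standard library with Stackless-compatible versions. The advantage of this approach is that other modules which use the standard modules also work with the replacements. The Stackless socket module is the most commonly used replacement module.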

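2.2. Exceptions

Certain exceptions that occur within tasklets are expected to propagate up the call stack all the way to the scheduler. This means that naive use of the except clause can cause problems that are hard to track down.

For a description of this problem, see the TaskletExit exception.

2.3. Debugging

The Stackless scheduling mechanism changes the way the Python debugging hooks work. The hooks are set per tasklet rather than per thread. However, very few debuggers (and none of those in the standard library) take this into account. As a result, debugging is unlikely to work without some special handling.

For a detailed description of this problem, see the Stackless debugging documentation.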

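3. External resources

In addition to this documentation, a range of other resources is available:

4. History

Continuations are a feature that requires the programming language they are part of to be implemented in a way conducive to their presence. In order to add them to the Python programming language, Christian Tismer modified it extensively. The primary way in which he modified it was to make it stackless, which is why his fork of Python is called Stackless Python.

Now, storing data on the stack ties an operating system thread to the function it is executing until that function completes and the stack is gradually unwound. In order to add continuations to Python, that data had to be stored on the heap instead, decoupling the execution of functions from the stack. This required extensive changes to Python, which Christian released as Stackless Python.

Maintaining a fork of a programming language is a lot of work, and the work grows sharply when the language itself changes in ways that conflict with the changes in the fork. Over time it became apparent that the changes to Python were too extensive to maintain, so Christian envisaged a rewrite of Stackless. It became clear that a simpler approach was possible, one in which Stackless was no longer stackless and no longer had continuations.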

Following the rewrite, a framework inspired by CSP and the Limbo programming language was designed and added. From this point on, Stackless was in a state where it contained the minimum functionality to give the benefits it aimed to provide, with the minimum amount of work required to keep it maintained.

A few years later in 2004, while sprinting on Stackless in Berlin, Christian and Armin Rigo came up with a way to take the core functionality of Stackless and build an extension module that provided it. This was the creation of greenlets, which are very likely a more popular tool than Stackless itself today. The greenlet source code in practice can be used as the base for green threading functionality not just in Python, but in other programming languages and projects.

With Stackless Python a solid product, Christian’s focus moved onto other projects, PyPy among them. One of his interests in PyPy was a proper implementation of the Stackless functionality, where it could be integrated as a natural part of any Python built.

For a while, Stackless Python languished, with no new versions to match the releases of Python itself. Then in 2006, CCP sent Kristjan Valur Jonsson and Richard Tew to PyCon where they sprinted with the aid of Christian Tismer. The result was an up to date release of Stackless Python. From this point in time, maintaining and releasing Stackless Python has been undertaken by Richard and Kristjan.

stackless - the built-in extension module

The stackless module is the way in which programmers access the enhanced functionality of Stackless Python.

1. Functions

The main scheduling related functions:

1.1. stackless.run(timeout=0, threadblock=False, soft=False, ignore_nesting=False, totaltimeout=False)

When run without arguments, scheduling is cooperative. It is up to you to ensure your tasklets yield, perhaps by calling schedule(), giving other tasklets a turn to run. The scheduler will exit when there are no longer any runnable tasklets left within it. This might be because all the tasklets have exited, whether by completing or erroring, but it also might be because some are blocked on channels. You should not assume that when run() exits, your tasklets have all run to completion, unless you know for sure that is how you structured your application.

The optional argument timeout is primarily used to run the scheduler in a different manner, providing pre-emptive scheduling. A non-zero value indicates that as each tasklet is given a chance to run, it should only be allowed to run as long as the number of Python virtual machine instructions it has executed stays below this value. If a tasklet hits this limit, then it is interrupted and the scheduler exits, returning the now no longer scheduled tasklet to the caller.

Example - run until 1000 opcodes have been executed:

interrupted_tasklet = stackless.run(1000)
# interrupted_tasklet is no longer scheduled, reschedule it.
interrupted_tasklet.insert()
# Now run your custom logic.
...

The optional argument threadblock affects the way Stackless works when channels are used for communication between threads. Normally,

The optional argument soft affects how pre-emptive scheduling behaves. When a pre-emptive interruption would normally occur, instead of interrupting and returning the running tasklet, the scheduler exits at the next convenient scheduling moment.

The optional argument ignore_nesting affects the behaviour of the attribute tasklet.nesting_level on individual tasklets. If set, interrupts are allowed at any interpreter nesting level, causing the tasklet-level attribute to be ignored.

The optional argument totaltimeout affects how pre-emptive scheduling behaves. Normally the scheduler is interrupted when any given tasklet has been running for timeout instructions. If a value is given for totaltimeout, instead the scheduler is interrupted when it has run for totaltimeout instructions.

Note The most common use of this function is to call it either without arguments, or with a value for timeout.

1.2. stackless.schedule(retval=stackless.current)

Yield execution of the currently running tasklet. When called, the tasklet is blocked and moved to the end of the chain of runnable tasklets. The next tasklet in the chain is executed next.

If your application employs cooperative scheduling and you do not use custom yielding mechanisms built around channels, you will most likely call this in your tasklets.

Example - typical usage of schedule():

stackless.schedule()

As illustrated in the example, the typical use of this function ignores both the optional argument retval and the return value. Note that as the variable name retval hints, the return value is the value of the optional argument.

1.3. stackless.schedule_remove(retval=stackless.current)

Yield execution of the currently running tasklet. When called, the tasklet is blocked and removed from the chain of runnable tasklets. The tasklet following the calling tasklet in the chain is executed next.

The most likely reason to use this, rather than schedule(), is to build your own yielding primitive without using channels. This is where the otherwise ignored optional argument retval and the return value are useful.

tasklet.tempval is used to store the value to be returned, and as expected, when this function is called it is set to retval. Custom utility functions can take advantage of this and set a new value for tasklet.tempval before reinserting the tasklet back into the scheduler.

Example - a utility function:

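import stackless

# Tasklets currently waiting for a result.
waiting_tasklets = []

def wait_for_result():
    # Remember the caller, then remove it from the scheduler.
    waiting_tasklets.append(stackless.current)
    return stackless.schedule_remove()

def event_callback(result):
    # Give each waiter its return value and make it runnable again.
    for tasklet in waiting_tasklets:
        tasklet.tempval = result
        tasklet.insert()

    # Clear in place; rebinding the name here would make it a local.
    del waiting_tasklets[:]

def tasklet_function():
    result = wait_for_result()
    print "received result", result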

One drawback of this approach compared with channels is that it bypasses the useful tasklet.block_trap attribute. The ability to guard against a tasklet being blocked on a channel is, in practice, a useful one to have.

Callback related functions:

1.4. stackless.set_channel_callback(callable)

Install a callback for channels. Every send or receive action will result in callable being called. Setting a value of None will result in the callback being disabled.

Example - installing a callback:

def channel_cb(channel, tasklet, sending, willblock):
    pass

stackless.set_channel_callback(channel_cb)

The channel callback argument is the channel on which the action is being performed.

The tasklet callback argument is the tasklet that is performing the action on channel.

The sending callback argument is an integer, a non-zero value of which indicates that the channel action is a send rather than a receive.

The willblock callback argument is an integer, a non-zero value of which indicates that the channel action will result in tasklet being blocked on channel.

1.5. stackless.set_schedule_callback(callable)

Install a callback for scheduling. Every scheduling event, whether explicit or implicit, will result in callable being called.

Example - installing a callback:

def schedule_cb(prev, next):
    pass

stackless.set_schedule_callback(schedule_cb)

The prev callback argument is the tasklet that was just running.

The next callback argument is the tasklet that is going to run now.

Scheduler state introspection related functions:

1.6. stackless.get_thread_info(thread_id)

Return a tuple containing the thread's main tasklet, current tasklet and run-count.

Example:

main_tasklet, current_tasklet, runcount = stackless.get_thread_info(thread_id)

1.7. stackless.getcurrent()

Return the currently executing tasklet of this thread.

1.8. stackless.getmain()

Return the main tasklet of this thread.

1.9. stackless.getruncount()

Return the number of currently runnable tasklets.

Debugging related functions:

1.10. stackless.enable_softswitch(flag)

Control the switching behaviour. Tasklets can be either switched by moving stack slices around or by avoiding stack changes at all. The latter is only possible in the top interpreter level.

Example - safely disabling soft switching:

old_value = stackless.enable_softswitch(False)
# Logic executed without soft switching.
stackless.enable_softswitch(old_value)

Note Disabling soft switching in this manner is exposed for timing and debugging purposes.

2. Attributes

2.1. stackless.current

The currently executing tasklet of this thread.

2.2. stackless.main

The main tasklet of this thread.

2.3. stackless.runcount

The number of currently runnable tasklets.

Example - usage:

>>> stackless.runcount
1

Note The minimum value of runcount will be 1, as the calling tasklet will be included.

2.4. stackless.threads

A list of all thread ids, starting with the id of the main thread.

Example - usage:

>>> stackless.threads
[5148]

3. Exceptions

3.1. exception stackless.TaskletExit

This exception is used to silently kill a tasklet. It should not be caught by your code and, along with other important exceptions like SystemExit, should be propagated up to the scheduler.

The following use of the except clause should be avoided:

try:
    some_function()
except:
    pass

This will catch every exception raised within it, including TaskletExit. Unless you guarantee that you re-raise the exceptions that should reach the scheduler, you are better off using except in the following manner:

try:
    some_function()
except Exception:
    pass

Here only the more common exceptions are caught, as the ones that should not be caught and discarded inherit from BaseException, rather than Exception.

This class is derived from BaseException.
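The difference between the two except clauses can be checked without Stackless. The following is a minimal sketch for a standard Python 3 interpreter. FakeTaskletExit is a hypothetical stand-in that derives from BaseException, as the text above says TaskletExit does; it is not the real stackless.TaskletExit.

```python
class FakeTaskletExit(BaseException):
    """Hypothetical stand-in for stackless.TaskletExit (an assumption)."""


def killed_function():
    # Pretend the tasklet running this function is being killed.
    raise FakeTaskletExit()


def swallow_everything():
    # A bare except also catches classes derived only from BaseException.
    try:
        killed_function()
    except:
        return "swallowed"


def swallow_common_only():
    # except Exception lets BaseException-only classes pass through.
    try:
        killed_function()
    except Exception:
        return "swallowed"


bare_result = swallow_everything()

try:
    swallow_common_only()
    propagated = False
except FakeTaskletExit:
    propagated = True
```

With the bare except the kill request is silently discarded, whereas with except Exception it continues up the call stack, which is where a scheduler would expect to see it.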

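Tasklets - lightweight threads

Tasklets wrap functions, allowing them to be launched as microthreads and run within the scheduler.

Launching a tasklet:

stackless.tasklet(callable)(*args, **kwargs)

This is the most common way of launching a tasklet. It not only creates the tasklet, but also automatically inserts it into the scheduler.

Example - launching a more concrete tasklet: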

>>> def func(*args, **kwargs):
...     print "scheduled with", args, "and", kwargs
...
>>> stackless.tasklet(func)(1, 2, 3, string="test")
<stackless.tasklet object at 0x01C58030>
>>> stackless.run()
scheduled with (1, 2, 3) and {'string': 'test'}

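1. Tasklets, main, current and more

There are two kinds of tasklet that deserve special attention: the main tasklet and the current tasklet.

The main tasklet is fixed, and it is where your application starts executing. What makes it the main tasklet is that it is the one that runs the scheduler.

The current tasklet is the tasklet that is currently running. It may be the main tasklet, if no other tasklets are running. Otherwise, it is the first tasklet in the scheduler's chain of runnable tasklets, which is the one being executed.

Example - is the main tasklet the current tasklet:

stackless.main == stackless.current

Example - is the current tasklet the main tasklet:

stackless.current.is_main == 1

Example - how many tasklets are scheduled:

stackless.runcount

Note The main tasklet is also counted in stackless.runcount. If you check how many tasklets are in the scheduler from your main loop, remember that there is one more tasklet in addition to the ones you created.

2. The tasklet class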

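Channels - communication between tasklets

Channel objects are used for communication between tasklets.

When a tasklet sends on a channel, another tasklet that is waiting to receive on that channel is resumed. If no tasklet is waiting to receive, the sender is suspended.

When a tasklet receives from a channel, another tasklet that is waiting to send on that channel is resumed. If no tasklet is waiting to send, the receiver is suspended.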
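The blocking rules above can be modelled in plain Python. The following is a rough sketch for a standard interpreter, with generators standing in for tasklets. ModelChannel, run and the tuples the generators yield are all hypothetical names made up for this illustration. They are not the stackless.channel API, and this is not how Stackless is implemented.

```python
from collections import deque


class ModelChannel(object):
    """Toy rendezvous channel (hypothetical, not stackless.channel)."""

    def __init__(self):
        self.senders = deque()    # (task, value) pairs suspended in a send
        self.receivers = deque()  # tasks suspended in a receive

    @property
    def balance(self):
        # Positive: senders are waiting. Negative: receivers are waiting.
        return len(self.senders) - len(self.receivers)


def run(tasks):
    """Run generator 'tasklets' until none of them is runnable."""
    runnable = deque((task, None) for task in tasks)
    while runnable:
        task, resume_value = runnable.popleft()
        try:
            request = task.send(resume_value)
        except StopIteration:
            continue  # the task has exited
        kind, channel = request[0], request[1]
        if kind == "send":
            if channel.receivers:
                # A receiver is waiting: resume it with the value.
                receiver = channel.receivers.popleft()
                runnable.append((receiver, request[2]))
                runnable.append((task, None))
            else:
                # Nobody is receiving: the sender is suspended.
                channel.senders.append((task, request[2]))
        else:
            if channel.senders:
                # A sender is waiting: take its value and resume it.
                sender, value = channel.senders.popleft()
                runnable.append((task, value))
                runnable.append((sender, None))
            else:
                # Nobody is sending: the receiver is suspended.
                channel.receivers.append(task)


log = []
channel = ModelChannel()


def producer():
    for item in ("a", "b"):
        yield ("send", channel, item)
        log.append("sent " + item)


def consumer():
    for _ in range(2):
        item = yield ("receive", channel)
        log.append("got " + item)


run([producer(), consumer()])
```

In this model the producer is suspended on its first send because no receiver is waiting yet, and it is resumed only once the consumer asks for a value.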

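1. Channels and threads

Channels are thread-safe. This means that a tasklet running in one thread can use a channel to communicate with a tasklet running in another thread. This is covered in detail in the section on threads and Stackless.

2. The channel class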

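Scheduling - how tasklets are run

The two main ways in which Stackless schedules tasklets are pre-emptive scheduling and cooperative scheduling. There are, however, many ways to apply these two approaches to meet the needs of an application.

1. Cooperative scheduling

The simplest way to run the scheduler is cooperatively. Programmers need to know when their code will block and handle it appropriately. Unlike with pre-emptive scheduling, they know exactly when blocking occurs, which gives them a clear picture of how their code cooperates with whatever else is running in tasklets.

Example - running a simple function within a tasklet: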

>>> def function(n):
...     for i in range(n):
...         print i+1
...         stackless.schedule()
...
>>> stackless.tasklet(function)(3)
>>> stackless.run()
1
2
3

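In the example above, a function named function is defined, a tasklet is created to call it, and then the scheduler is run. The scheduler schedules the tasklet four times. On the fourth, the function exits and therefore so does the tasklet.

The code that runs in the tasklet:

def function(n):
    for i in range(n):
        print i+1
        stackless.schedule()

The first step is the code that runs in the tasklet. Here, function simply loops n times, printing the number of the current iteration each time round and then calling stackless.schedule() to give other tasklets a chance to be scheduled.

Create the tasklet:

stackless.tasklet(function)(3)

A tasklet specifies the function to run and supplies the arguments it needs (here 3, which is assigned to n). When the tasklet is first scheduled, function is called for the first time with those arguments. Creating the tasklet in this way automatically inserts it into the scheduler, so there is no need to keep a reference to it.

Run the scheduler:

stackless.run()

Next the scheduler is run. It returns when no tasklets remain in it. Note that this tasklet is scheduled four times. The first is its initial run, in which function is called with the given arguments. The second and third schedulings produce the second and third prints, and on the last scheduling the function exits and with it the tasklet.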
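The count of four schedulings can be reproduced with a plain-Python model. The following is a minimal sketch for a standard interpreter, in which a generator stands in for the tasklet and a bare yield stands in for stackless.schedule(). The run function and the resumptions list are hypothetical names for this illustration, not part of Stackless.

```python
from collections import deque

output = []        # what the tasklet would have printed
resumptions = []   # one entry per time the task is given a turn


def function(n):
    for i in range(n):
        output.append(i + 1)
        yield  # stands in for stackless.schedule()


def run(tasks):
    """Round-robin over generator 'tasklets' until all have exited."""
    runnable = deque(tasks)
    while runnable:
        task = runnable.popleft()
        resumptions.append(task)
        try:
            next(task)
        except StopIteration:
            continue  # the function returned, so the task is gone
        runnable.append(task)  # it yielded: back of the queue


run([function(3)])
```

The first three turns each record one value and yield. The fourth turn finds the loop finished, so the function returns and the model scheduler drops the task.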

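If the developer knows that there will always be tasklets in the scheduler for as long as the application is running (as demonstrated above), then the application can be driven by a single call to stackless.run(). Things are not always this simple, however. The scheduler may sometimes be empty, in which case stackless.run() needs to be called repeatedly, after new tasklets have been created or old blocked ones reinserted.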

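1.1. Detecting uncooperative tasklets

In practice, it is rare for a tasklet to keep running without giving other tasklets a turn. Tasklets are usually blocked by events and so yield implicitly, without needing to yield explicitly. Occasionally, though, something unexpected happens and the code never reaches the point where it yields, or it enters an infinite loop.

With this in mind, pre-emptive scheduling is often used to detect long-running tasklets. The approach is to set a timeout large enough that only code which fails to yield could ever reach it.

Idiom - detecting uncooperative tasklets: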

import traceback
import stackless

while 1:
    t = stackless.run(1000000)
    if t is not None:
        t.insert()

        print "*** Uncooperative tasklet", t, "detected ***"
        traceback.print_stack(t.frame)

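Cooperative scheduling, but pre-empting uncooperative tasklets:

t = stackless.run(1000000)

Since most tasklets yield long before they have executed 1000000 instructions, only those that do not yield, the uncooperative ones, are interrupted and returned.

Reinsert the interrupted tasklet:

if t is not None:
    t.insert()

An interrupted tasklet does not remain in the scheduler. We do not know what the tasklet was doing, and depending on the application it may be unacceptable not to let it run to completion. Calling tasklet.insert() puts it back into the scheduler at the end of the list of runnable tasklets, ensuring that other tasklets get a chance to run before it is next executed.

If it can be assumed that any tasklet interrupted in this way is faulty, then it can be killed instead. Better still is to record enough information (such as the call stack) before killing it.

Kill an interrupted tasklet: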

if t is not None:
    print "*** Uncooperative tasklet", t, "detected ***"
    traceback.print_stack(t.frame)

    t.kill()

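Note This mechanism does not detect tasklets that call out of Python and run for a long time there. Such calls might be synchronous IO, operations in modules that call into lower-level C libraries, and a range of other things.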

1.2. Pumping the scheduler

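The most obvious way to use Stackless is to put all your logic into tasklets and run the scheduler, expecting the application to have finished by the time the scheduler exits. However, taking this approach means that your application must be built on the Stackless framework and driven entirely by the scheduler.

If you want more control over how your application runs and how it is structured, then this will not do. That would seem to rule out cooperative scheduling and push you towards pre-emptive scheduling, so that your application or framework is the one driving Stackless.

However, there is a way to get the benefits of cooperative scheduling while keeping your application or framework in control. This is called pumping the scheduler.

Idiom - pumping the scheduler: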

def ApplicationMainLoop():
    while 1:
        ProcessMessages()
        ApplicationLoopStuff()
        Etc()

        stackless.run()

        RescheduleBlockedTasklets()

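Pumping the scheduler works by having code yield explicitly onto a channel, and from there hand control back to the scheduler, rather than calling stackless.schedule() directly. That way, once every tasklet in the scheduler has run, the scheduler is empty. One run of the scheduler therefore amounts to running each scheduled tasklet once, and the scheduler can be pumped again and again by the main loop of the application or framework.

Yield out of the scheduler:

def CustomYield():
    customYieldChannel.receive()

Define a custom function which tasklets call to yield out of the scheduler. This can be done simply by waiting on a channel. No tasklet ever sends on this channel, so every tasklet that waits on it is blocked.

Reschedule the blocked tasklets:

def RescheduleBlockedTasklets():
    while customYieldChannel.balance < 0:
        customYieldChannel.send(None)

To put all the blocked tasklets back into the scheduler, we simply send something on the channel for as long as there are receivers waiting on it. There is one thing we need to avoid, though. When we send, we do not want each receiving tasklet to run straight away. To achieve this, we need to make sure that the channel instead inserts the receiving tasklet back into the scheduler.

Configure a channel for yielding:

customYieldChannel = stackless.channel()
customYieldChannel.preference = 1

This is done when the channel is created, by simply changing its channel.preference attribute.

Note If you pump the scheduler, your tasklets must not call stackless.schedule(). Doing so unknowingly leaves a tasklet permanently occupying the scheduler. The call to stackless.run() will then not return until that tasklet yields in some other way (by erroring or exiting).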

2. Pre-emptive scheduling

If you want a lot of the work of using operating system threads without a lot of the benefits, then pre-emptive scheduling is a good choice. Making the scheduler work in a pre-emptive manner, is a simple matter of giving it a timeout value.

Example - running a simple tasklet within the scheduler:

>>> def function():
...     i = 0
...     while True:
...         print i+1
...         i += 1
...
>>> stackless.tasklet(function)()
>>> stackless.run(100)
1
2
3
4
5
6
7
8
9
10
11
<stackless.tasklet object at 0x01BCD0B0>

In this case, the scheduler runs until a maximum of 100 instructions have been executed in the Python virtual machine. At that point, whatever tasklet is currently running is returned when stackless.run() exits. The standard way to employ this is to pump the scheduler, reinserting the interrupted tasklet.

Idiom - pre-emptive scheduling:

while True:
    ProcessMessages()
    ApplicationLoopStuff()
    Etc()

    t = stackless.run(100)
    if t is None:
        break

    t.insert()

Run the scheduler for 100 instructions:

t = stackless.run(100)

There are two things to note here. If t is None, then there are no tasklets in the scheduler to run. If t is not None, then it is an interrupted tasklet that needs to be reinserted into the scheduler.

Detect an empty scheduler:

if t is None:
    break

It may be that an empty scheduler indicates that all the work is done, or it may not. How this work is actually handled depends on the implementation details of your solution.

Reinsert the interrupted tasklet:

t.insert()

Note You are not running the scheduler for 100 instructions, you are running it until any subsequently scheduled tasklet runs for at least that many instructions. If all your tasklets always explicitly yield before this many instructions have been executed, then the stackless.run() call will not exit until for some reason one does not.

2.1. Running the scheduler for n instructions

Running the scheduler until a scheduled tasklet runs for n consecutive instructions is one way pre-emptive scheduling might work. However, if you want to structure your application or framework in such a way that it drives Stackless rather than the other way round, then you need the scheduler to exit instead. The scheduler can be directed to work in this way, by giving it a totaltimeout flag value.

Idiom - pre-emptive scheduler pumping:

while True:
    ProcessMessages()
    ApplicationLoopStuff()
    Etc()

    t = stackless.run(100, totaltimeout=True)
    if t is None:
        break

    t.insert()
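The difference between a per-tasklet timeout and a total timeout can be explored with a plain-Python model. The following is a rough sketch for a standard interpreter. The worker and run functions are hypothetical, each "tick" stands in for one virtual machine instruction, and this run is not stackless.run(). The budgeting rule is only an assumption based on the descriptions above.

```python
from collections import deque


def worker(slices, ticks_per_slice):
    """Model tasklet: bursts of work with a cooperative yield after each."""
    for _ in range(slices):
        for _ in range(ticks_per_slice):
            yield "tick"    # one model instruction
        yield "switch"      # stands in for stackless.schedule()


def run(queue, timeout, totaltimeout=False):
    """Return the interrupted task, or None once the queue is empty."""
    executed = 0
    while queue:
        task = queue[0]
        if not totaltimeout:
            executed = 0    # the budget restarts for each tasklet turn
        for event in task:
            if event == "switch":
                queue.rotate(-1)    # move the task to the back
                break
            executed += 1
            if executed >= timeout:
                queue.popleft()     # interrupted tasks leave the queue
                return task
        else:
            queue.popleft()         # the task ran to completion
    return None


# Two tasks that each yield every 40 ticks never hit a per-task limit of 100.
a, b = worker(3, 40), worker(3, 40)
per_tasklet_result = run(deque([a, b]), 100)

# With a total budget, the run stops after 100 ticks overall (40 + 40 + 20).
c, d = worker(3, 40), worker(3, 40)
total_result = run(deque([c, d]), 100, totaltimeout=True)
```

In the first call the model scheduler only returns once both tasks have finished, which mirrors the note above about stackless.run() not exiting while every tasklet keeps yielding in time. In the second call control comes back to the caller after a bounded amount of work, which is what a main loop that pumps the scheduler needs.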

3. Exceptions

Exceptions that occur within tasklets and are uncaught are raised out of the stackless.run() call, to be handled by its caller.

Example - an exception raised out of the scheduler:

>>> def func_loop():
...     while 1:
...         stackless.schedule()
...
>>> def func_exception():
...     raise Exception("catch this")
...
>>> stackless.tasklet(func_loop)()
<stackless.tasklet object at 0x01C58EB0>
>>> stackless.tasklet(func_exception)()
<stackless.tasklet object at 0x01C58F70>
>>> stackless.run()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in func_exception
Exception: catch this

This may not be the desired behaviour, and a more acceptable one might be that the exception is caught and dealt with in the tasklet it occurred in before that tasklet exits.

3.1. Catching tasklet exceptions

We want to change the behaviour to be:

  1. The tasklet with the uncaught exception exits normally.
  2. The uncaught exception is examined and handled before the tasklet exits.
  3. The scheduler continues running.

There are two ways to accomplish these things. You can either monkey-patch the tasklet creation process, or you can use a custom function for all your tasklet creation.

Example - a custom tasklet creation function:

def new_tasklet(f, *args, **kwargs):
    def safe_tasklet():
        try:
            f(*args, **kwargs)
        except Exception:
            traceback.print_exc()

    return stackless.tasklet(safe_tasklet)()

new_tasklet(some_function, 1, 2, 3, key="value")

Example - monkey-patching the tasklet creation process:

def __call__(self, *args, **kwargs):
     f = self.tempval

     def new_f(old_f, args, kwargs):
         try:
             old_f(*args, **kwargs)
         except Exception:
             traceback.print_exc()

     self.tempval = new_f
     stackless.tasklet.setup(self, f, args, kwargs)

stackless.tasklet.__call__ = __call__

stackless.tasklet(some_function)(1, 2, 3, key="value")

Printing the call stack in the case of an exception is good enough for these examples, but in practice the call stack might instead be recorded in a database.

Note We catch Exception explicitly, rather than catching any exception which might occur. The reason for this is to avoid catching exceptions we should not be catching like SystemExit or TaskletExit, which derive from the lower level BaseException.
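The wrapping technique used in both examples can be tried out without Stackless. The following is a minimal sketch for a standard Python 3 interpreter. make_safe and the recorded list are hypothetical names, with the list standing in for the database mentioned above.

```python
import traceback

recorded = []  # hypothetical stand-in for a database of failures


def make_safe(f, *args, **kwargs):
    """Wrap f so that ordinary exceptions are recorded, not propagated."""
    def safe_call():
        try:
            return f(*args, **kwargs)
        except Exception:
            # Keep the formatted call stack instead of printing it.
            recorded.append(traceback.format_exc())
    return safe_call


def faulty(x):
    raise ValueError("bad value %r" % (x,))


def exits():
    raise SystemExit(1)


# An ordinary exception is caught and recorded by the wrapper.
make_safe(faulty, 42)()

# SystemExit derives from BaseException, so it passes through the wrapper.
try:
    make_safe(exits)()
    system_exit_seen = False
except SystemExit:
    system_exit_seen = True
```

The wrapper records the ValueError and returns normally, while SystemExit still reaches the caller, which is the behaviour the note above asks for.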

Debugging and tracing - how Stackless differs

Debugging tools, like those used for tracing, are implemented through calls to the sys.settrace() function. Now, in normal Python, when this has been called any code that runs within the operating system thread is covered by it. In Stackless however, this function only covers the current tasklet.
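For reference, this is what the hook looks like in isolation on a standard Python 3 interpreter. The following is a minimal sketch of a trace function that records which functions are called. The names trace_calls, helper and work are made up for the illustration, and nothing here is specific to Stackless.

```python
import sys

calls = []


def trace_calls(frame, event, arg):
    # The interpreter reports a "call" event for every new Python frame.
    if event == "call":
        calls.append(frame.f_code.co_name)
    return None  # no line-by-line tracing inside the frame


def helper():
    return 1


def work():
    return helper() + 1


previous = sys.gettrace()
sys.settrace(trace_calls)
try:
    result = work()
finally:
    sys.settrace(previous)  # restore whatever hook was installed before
```

In standard Python the hook installed here applies to the calling thread. According to the paragraph above, under Stackless the same call covers only the current tasklet.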

The debugging related modules, whether in the standard library or not, do not take this difference into account. They are not likely to work, and if they do, are not likely to work in the way you expect. In an ideal world, Stackless Python might include modified versions of these modules, and patches adding them would be most welcome.

If you want working debugging for Stackless Python, at this time your best option is to use the WingWare Python IDE. WingWare have gone out of their way to add and support Stackless Python development.

Note In the past, the possibility of ditching the per-tasklet behaviour for the standard per-thread behaviour has been broached on the mailing list. Given the lack of movement on usability for this part of Stackless, it is not unlikely that this suggested change will be revisited.

1. settrace and tasklets

In order to get debugging support working on a per-tasklet basis, you need to ensure you call sys.settrace() for all tasklets. Vilhelm Saevarsson has an email giving code and a description of the steps required including potentially unforeseen circumstances, in the Stackless mailing list archives.

Vilhelm’s code:

import sys
import stackless

def contextDispatch( prev, next ):
    if not prev: #Creating next
        # I never see this print out
        print "Creating ", next
    elif not next: #Destroying prev
        # I never see this print out either
        print "Destroying ", prev
    else:
        # Prev is being suspended
        # Next is resuming
        # When worker tasklets are resuming and have
        # not been set to trace, we make sure that
        # they are tracing before they run again
        if not next.frame.f_trace:
            # We might already be tracing so ...
            sys.call_tracing(next.settrace, (traceDispatch, ))

stackless.set_schedule_callback(contextDispatch)

def __call__(self, *args, **kwargs):
     f = self.tempval
     def new_f(old_f, args, kwargs):
         sys.settrace(traceDispatch)
         old_f(*args, **kwargs)
         sys.settrace(None)
     self.tempval = new_f
     stackless.tasklet.setup(self, f, args, kwargs)

def settrace( self, tb ):
    self.frame.f_trace = tb
    sys.settrace(tb)

stackless.tasklet.__call__ = __call__
stackless.tasklet.settrace = settrace

The key actions taken by this code:

Threads - threads and Stackless

Stackless is a lightweight threading solution. It works by scheduling its tasklets within the CPU time allocated to the real thread that Python, and therefore the scheduler running within it, is on.

It does not:

But it does allow its functionality to be used flexibly, when you want to make use of more than one thread.

1. A scheduler per thread

The operating system thread that the Python runtime is started in and runs on, is called the main thread. The typical use of Stackless, is to run the scheduler in this thread. But there is nothing that prevents a different scheduler, and therefore a different set of tasklets, from running in every Python thread you care to start.

Note Remember that tasklets are in essence part of the thread they were created in, and there is no way to move tasklets between threads.

Example - scheduler per thread:

import threading
import stackless

def secondary_thread_func():
    print "THREAD(2): Has", stackless.runcount, "tasklets in its scheduler"

def main_thread_func():
    print "THREAD(1): Waiting for death of THREAD(2)"
    while thread.is_alive():
        stackless.schedule()
    print "THREAD(1): Death of THREAD(2) detected"

mainThreadTasklet = stackless.tasklet(main_thread_func)()

thread = threading.Thread(target=secondary_thread_func)
thread.start()

stackless.run()

Output:

THREAD(2): HasTHREAD(1): Waiting for death of THREAD(2)
 1 tasklets in its scheduler
THREAD(1): Death of THREAD(2) detected

This example demonstrates that there actually are two independent schedulers present, one in each participating Python thread. We know that the main thread has one manually created tasklet running, in addition to its main tasklet which is running the scheduler. If the secondary thread is truly independent, then when it runs it should have a tasklet count of 1 representing its own main tasklet. And this is indeed what we see.

See also:

2. Channels are thread-safe

Whether or not you are running a scheduler on multiple threads, you can still communicate with a thread that is running a scheduler using a channel object.

Example - interthread channel usage:

import threading
import stackless

commandChannel = stackless.channel()

def master_func():
    commandChannel.send("ECHO 1")
    commandChannel.send("ECHO 2")
    commandChannel.send("ECHO 3")
    commandChannel.send("QUIT")

def slave_func():
    print "SLAVE STARTING"
    while 1:
        command = commandChannel.receive()
        print "SLAVE:", command
        if command == "QUIT":
            break
    print "SLAVE ENDING"

def scheduler_run(tasklet_func):
    t = stackless.tasklet(tasklet_func)()
    while t.alive:
        stackless.run()

thread = threading.Thread(target=scheduler_run, args=(master_func,))
thread.start()

scheduler_run(slave_func)

Output:

SLAVE STARTING
SLAVE: ECHO 1
SLAVE: ECHO 2
SLAVE: ECHO 3
SLAVE: QUIT
SLAVE ENDING

This example runs slave_func as a tasklet on the main thread, and master_func as a tasklet on a secondary thread that is manually created. The idea is that the master thread tells the slave thread what to do, with a QUIT message meaning that it should exit.

Note The reason the scheduler is repeatedly run in a loop, is because when a scheduler has no remaining tasklets scheduled within it, it will exit. As there is only one tasklet in each thread, as each channel operation in the thread blocks the calling tasklet, the scheduler will exit. Linking how long the scheduler is driven to the lifetime of all tasklets that it handles, ensures correct behaviour.

Pickling - serialisation of running tasklets

One of the most impressive features of Stackless, is the ability to pickle tasklets. This allows you to take a tasklet mid-execution, serialise it to a chunk of data and then unserialise that data at a later point, creating a new tasklet from it that resumes where the last left off.

What makes this particularly impressive is the fact that the Python pickle structure is platform independent. Code can for instance initially be run on a x86 Windows machine, then interrupted, pickled and sent over the network to be resumed on an ARM Linux machine.

Example - pickling a tasklet:

>>> def func():
...    busy_count = 0
...    while 1:
...        busy_count += 1
...        if busy_count % 10 == 0:
...            print busy_count
...
>>> stackless.tasklet(func)()
<stackless.tasklet object at 0x01BD16B0>
>>> t1 = stackless.run(100)
10
20
>>> s = pickle.dumps(t1)
>>> t1.kill()
>>> t2 = pickle.loads(s)
>>> t2.insert()
>>> stackless.run(100)
30
40
50

In the above example, a tasklet is created that increments the counter busy_count and outputs the value when it is a multiple of 10.

Run the tasklet for a while:

>>> t1 = stackless.run(100)
10
20

The tasklet has been interrupted at some point in its execution. If it were to be resumed, we would expect its output to be the values following those previously displayed.

Serialise the tasklet:

>>> s = pickle.dumps(t1)

As any other object is pickled, so are tasklets. In this case, the serialised representation of the tasklet is a string, stored in s.

Destroy the tasklet:

>>> t1.kill()

We want to show that the old code cannot be resumed, and in order to do so, we destroy the tasklet it was running within.

Unserialise the stored representation:

>>> t2 = pickle.loads(s)

As any other object is unpickled, so are tasklets. We take the string and by unpickling it, get a new tasklet object back.

Schedule the new tasklet:

>>> t2.insert()

Now the newly recreated tasklet is inserted into the scheduler, so that when the scheduler is next run, the tasklet is resumed.

Run the scheduler:

>>> stackless.run(100)
30
40
50
<stackless.tasklet object at 0x01BD1D30>

When the scheduler is run, the values displayed are indeed the ones that follow those displayed by the original tasklet. The value returned by stackless.run() is not stored in a variable this time, so the interpreter displays the recreated tasklet. You can see that it has a different address than t1, which was displayed earlier.

Note It should be possible to pickle any tasklets that you might want to. However, not all tasklets can be unpickled. One of the cases in which this is true, is where not all the functions called by the code within the tasklet are Python functions. The Stackless pickling mechanism has no ability to deal with C functions that may have been called.

StacklessPython (last edited 2010-01-24 11:31:07 by 125)
