国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術(shù)文章
文章詳情頁

Python 使用生成器代替線程的方法

瀏覽:88日期:2022-07-15 09:58:30

問題

你想使用生成器(協(xié)程)替代系統(tǒng)線程來實(shí)現(xiàn)并發(fā)。這個有時又被稱為用戶級線程或綠色線程。

解決方案

要使用生成器實(shí)現(xiàn)自己的并發(fā),你首先要對生成器函數(shù)和 yield 語句有深刻理解。 yield 語句會讓一個生成器掛起它的執(zhí)行,這樣就可以編寫一個調(diào)度器, 將生成器當(dāng)做某種“任務(wù)”并使用任務(wù)協(xié)作切換來替換它們的執(zhí)行。 要演示這種思想,考慮下面兩個使用簡單的 yield 語句的生成器函數(shù):

# Two simple generator functionsdef countdown(n): while n > 0: print(’T-minus’, n) yield n -= 1 print(’Blastoff!’)def countup(n): x = 0 while x < n: print(’Counting up’, x) yield x += 1

這些函數(shù)在內(nèi)部使用yield語句,下面是一個實(shí)現(xiàn)了簡單任務(wù)調(diào)度器的代碼:

from collections import dequeclass TaskScheduler: def __init__(self): self._task_queue = deque() def new_task(self, task): ’’’ Admit a newly started task to the scheduler ’’’ self._task_queue.append(task) def run(self): ’’’ Run until there are no more tasks ’’’ while self._task_queue: task = self._task_queue.popleft() try:# Run until the next yield statementnext(task)self._task_queue.append(task) except StopIteration:# Generator is no longer executingpass# Example usesched = TaskScheduler()sched.new_task(countdown(10))sched.new_task(countdown(5))sched.new_task(countup(15))sched.run()

TaskScheduler 類在一個循環(huán)中運(yùn)行生成器集合——每個都運(yùn)行到碰到y(tǒng)ield語句為止。 運(yùn)行這個例子,輸出如下:

T-minus 10T-minus 5Counting up 0T-minus 9T-minus 4Counting up 1T-minus 8T-minus 3Counting up 2T-minus 7T-minus 2...

到此為止,我們實(shí)際上已經(jīng)實(shí)現(xiàn)了一個“操作系統(tǒng)”的最小核心部分。 生成器函數(shù)就是任務(wù),而yield語句是任務(wù)掛起的信號。 調(diào)度器循環(huán)檢查任務(wù)列表直到?jīng)]有任務(wù)要執(zhí)行為止。

實(shí)際上,你可能想要使用生成器來實(shí)現(xiàn)簡單的并發(fā)。 那么,在實(shí)現(xiàn)actor或網(wǎng)絡(luò)服務(wù)器的時候你可以使用生成器來替代線程的使用。

下面的代碼演示了使用生成器來實(shí)現(xiàn)一個不依賴線程的actor:

from collections import dequeclass ActorScheduler: def __init__(self): self._actors = {} # Mapping of names to actors self._msg_queue = deque() # Message queue def new_actor(self, name, actor): ’’’ Admit a newly started actor to the scheduler and give it a name ’’’ self._msg_queue.append((actor,None)) self._actors[name] = actor def send(self, name, msg): ’’’ Send a message to a named actor ’’’ actor = self._actors.get(name) if actor: self._msg_queue.append((actor,msg)) def run(self): ’’’ Run as long as there are pending messages. ’’’ while self._msg_queue: actor, msg = self._msg_queue.popleft() try: actor.send(msg) except StopIteration: pass# Example useif __name__ == ’__main__’: def printer(): while True: msg = yield print(’Got:’, msg) def counter(sched): while True: # Receive the current count n = yield if n == 0:break # Send to the printer task sched.send(’printer’, n) # Send the next count to the counter task (recursive) sched.send(’counter’, n-1) sched = ActorScheduler() # Create the initial actors sched.new_actor(’printer’, printer()) sched.new_actor(’counter’, counter(sched)) # Send an initial message to the counter to initiate sched.send(’counter’, 10000) sched.run()

完全弄懂這段代碼需要更深入的學(xué)習(xí),但是關(guān)鍵點(diǎn)在于收集消息的隊(duì)列。 本質(zhì)上,調(diào)度器在有需要發(fā)送的消息時會一直運(yùn)行著。 計數(shù)生成器會給自己發(fā)送消息并在一個遞歸循環(huán)中結(jié)束。

下面是一個更加高級的例子,演示了使用生成器來實(shí)現(xiàn)一個并發(fā)網(wǎng)絡(luò)應(yīng)用程序:

from collections import dequefrom select import select# This class represents a generic yield event in the schedulerclass YieldEvent: def handle_yield(self, sched, task): pass def handle_resume(self, sched, task): pass# Task Schedulerclass Scheduler: def __init__(self): self._numtasks = 0 # Total num of tasks self._ready = deque() # Tasks ready to run self._read_waiting = {} # Tasks waiting to read self._write_waiting = {} # Tasks waiting to write # Poll for I/O events and restart waiting tasks def _iopoll(self): rset,wset,eset = select(self._read_waiting,self._write_waiting,[]) for r in rset: evt, task = self._read_waiting.pop(r) evt.handle_resume(self, task) for w in wset: evt, task = self._write_waiting.pop(w) evt.handle_resume(self, task) def new(self,task): ’’’ Add a newly started task to the scheduler ’’’ self._ready.append((task, None)) self._numtasks += 1 def add_ready(self, task, msg=None): ’’’ Append an already started task to the ready queue. msg is what to send into the task when it resumes. ’’’ self._ready.append((task, msg)) # Add a task to the reading set def _read_wait(self, fileno, evt, task): self._read_waiting[fileno] = (evt, task) # Add a task to the write set def _write_wait(self, fileno, evt, task): self._write_waiting[fileno] = (evt, task) def run(self): ’’’ Run the task scheduler until there are no tasks ’’’ while self._numtasks: if not self._ready: self._iopoll() task, msg = self._ready.popleft() try: # Run the coroutine to the next yield r = task.send(msg) if isinstance(r, YieldEvent): r.handle_yield(self, task) else: raise RuntimeError(’unrecognized yield event’) except StopIteration: self._numtasks -= 1# Example implementation of coroutine-based socket I/Oclass ReadSocket(YieldEvent): def __init__(self, sock, nbytes): self.sock = sock self.nbytes = nbytes def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): data = self.sock.recv(self.nbytes) sched.add_ready(task, data)class WriteSocket(YieldEvent): def __init__(self, sock, data): self.sock = sock self.data = data def handle_yield(self, sched, task): sched._write_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): nsent = self.sock.send(self.data) sched.add_ready(task, nsent)class AcceptSocket(YieldEvent): def __init__(self, sock): self.sock = sock def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): r = self.sock.accept() sched.add_ready(task, r)# Wrapper around a socket object for use with yieldclass Socket(object): def __init__(self, sock): self._sock = sock def recv(self, maxbytes): return ReadSocket(self._sock, maxbytes) def send(self, data): return WriteSocket(self._sock, data) def accept(self): return AcceptSocket(self._sock) def __getattr__(self, name): return getattr(self._sock, name)if __name__ == ’__main__’: from socket import socket, AF_INET, SOCK_STREAM import time # Example of a function involving generators. This should # be called using line = yield from readline(sock) def readline(sock): chars = [] while True: c = yield sock.recv(1) if not c:break chars.append(c) if c == b’n’:break return b’’.join(chars) # Echo server using generators class EchoServer: def __init__(self,addr,sched): self.sched = sched sched.new(self.server_loop(addr)) def server_loop(self,addr): s = Socket(socket(AF_INET,SOCK_STREAM)) s.bind(addr) s.listen(5) while True:c,a = yield s.accept()print(’Got connection from ’, a)self.sched.new(self.client_handler(Socket(c))) def client_handler(self,client): while True:line = yield from readline(client)if not line: breakline = b’GOT:’ + linewhile line: nsent = yield client.send(line) line = line[nsent:] client.close() print(’Client closed’) sched = Scheduler() EchoServer((’’,16000),sched) sched.run()

這段代碼有點(diǎn)復(fù)雜。不過,它實(shí)現(xiàn)了一個小型的操作系統(tǒng)。 有一個就緒的任務(wù)隊(duì)列,并且還有因I/O休眠的任務(wù)等待區(qū)域。 還有很多調(diào)度器負(fù)責(zé)在就緒隊(duì)列和I/O等待區(qū)域之間移動任務(wù)。

討論

在構(gòu)建基于生成器的并發(fā)框架時,通常會使用更常見的yield形式:

def some_generator(): ... result = yield data ...

使用這種形式的yield語句的函數(shù)通常被稱為“協(xié)程”。 通過調(diào)度器,yield語句在一個循環(huán)中被處理,如下:

f = some_generator()# Initial result. Is None to start since nothing has been computedresult = Nonewhile True: try: data = f.send(result) result = ... do some calculation ... except StopIteration: break

這里的邏輯稍微有點(diǎn)復(fù)雜。不過,被傳給 send() 的值定義了在yield語句醒來時的返回值。 因此,如果一個yield準(zhǔn)備在對之前yield數(shù)據(jù)的回應(yīng)中返回結(jié)果時,會在下一次 send() 操作返回。 如果一個生成器函數(shù)剛開始運(yùn)行,發(fā)送一個None值會讓它排在第一個yield語句前面。

除了發(fā)送值外,還可以在一個生成器上面執(zhí)行一個 close() 方法。 它會導(dǎo)致在執(zhí)行yield語句時拋出一個 GeneratorExit 異常,從而終止執(zhí)行。 如果進(jìn)一步設(shè)計,一個生成器可以捕獲這個異常并執(zhí)行清理操作。 同樣還可以使用生成器的 throw() 方法在yield語句執(zhí)行時生成一個任意的執(zhí)行指令。 一個任務(wù)調(diào)度器可利用它來在運(yùn)行的生成器中處理錯誤。

最后一個例子中使用的 yield from 語句被用來實(shí)現(xiàn)協(xié)程,可以被其它生成器作為子程序或過程來調(diào)用。 本質(zhì)上就是將控制權(quán)透明的傳輸給新的函數(shù)。 不像普通的生成器,一個使用 yield from 被調(diào)用的函數(shù)可以返回一個作為 yield from 語句結(jié)果的值。 關(guān)于 yield from 的更多信息可以在 PEP 380 中找到。

最后,如果使用生成器編程,要提醒你的是它還是有很多缺點(diǎn)的。 特別是,你得不到任何線程可以提供的好處。例如,如果你執(zhí)行CPU依賴或I/O阻塞程序, 它會將整個任務(wù)掛起直到操作完成。為了解決這個問題, 你只能選擇將操作委派給另外一個可以獨(dú)立運(yùn)行的線程或進(jìn)程。 另外一個限制是大部分Python庫并不能很好的兼容基于生成器的線程。 如果你選擇這個方案,你會發(fā)現(xiàn)你需要自己改寫很多標(biāo)準(zhǔn)庫函數(shù)。 作為本節(jié)提到的協(xié)程和相關(guān)技術(shù)的一個基礎(chǔ)背景,可以查看 PEP 342 和 “協(xié)程和并發(fā)的一門有趣課程”

PEP 3156 同樣有一個關(guān)于使用協(xié)程的異步I/O模型。 特別的,你不可能自己去實(shí)現(xiàn)一個底層的協(xié)程調(diào)度器。 不過,關(guān)于協(xié)程的思想是很多流行庫的基礎(chǔ), 包括 gevent, greenlet, Stackless Python 以及其他類似工程。

以上就是Python 使用生成器代替線程的方法的詳細(xì)內(nèi)容,更多關(guān)于Python 生成器代替線程的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 国产日韩欧美精品在线 | 欧美一区二区三区精品国产 | 99色视频在线 | 99国产精品视频免费观看 | 久久久久久一级毛片免费野外 | 国产成人亚洲精品无广告 | 午夜啪啪福利视频 | 亚洲国产夜色在线观看 | 美女视频黄在线观看 | 亚洲一区欧美二区 | 在线观看精品视频一区二区三区 | 国产成人资源 | 欧美高清一区二区 | 国内精品线在线观看 | 国产欧美综合一区二区 | 被老外玩爽的中国美女视频 | 久久国产一区二区 | 三级欧美 | 高颜值美女啪啪 | 一级片在线免费看 | 亚洲国产精品自在现线让你爽 | 久久精品在线观看 | 手机在线毛片免费播放 | 九草网| 曰批美女免费视频播放 | 欧美xxx高清 | 韩国一级性生活片 | 永久免费毛片手机版在线看 | 亚洲国产精品久久久久久网站 | 夜色1网站| 九九久久精品国产 | 国产精品黄页网站在线播放免费 | 亚洲综合爱久久影院 | 国产精选91热在线观看 | 成人黄色免费观看 | 色偷偷亚洲偷自拍 | 欧美va在线播放免费观看 | 国产精品免费大片 | 欧美日韩精品在线视频 | 91久久精品国产91性色tv | 国产精品久久久久久久久福利 |