首頁?學習  »   正文

不用多進程的Python十倍速并行技巧(上)

不用多進程的Python十倍速并行技巧(上)

雖然python的多處理庫已經成功地廣泛的用于應用程序,但在本文中,我們發現它在缺少一些重要的應用程序類中依然存在不足,包括數值數據處理、狀態計算和具有昂貴初始化的計算。主要有兩個原因:

  • 數字數據處理效率低下。
  • 缺少狀態計算的抽象(即無法在單獨的“任務”之間共享變量)。

Ray是一個快速、簡單的框架,用于構建和運行解決這些問題的分布式應用程序。Ray利用ApacheArrow進行高效的數據處理,并為分布式計算提供任務和參與者抽象。

本文對三種不易用Python多處理表示的工作負載進行了基準測試,并比較了Ray、Python多處理和串行Python代碼。請注意,務必與優化的單線程代碼進行比較。

在這些基準測試中,Ray比串行Python快10-30倍,比多處理快5-25倍,比大型機器上這兩種方法快5-15倍。

不用多進程的Python十倍速并行技巧(上)

在48個物理內核的機器上,Ray比Python多處理快9倍,比單線程Python快28倍。錯誤條被描繪出來,但在某些情況下太小,看不見。下面提供了復制這些數字的代碼。工作負載被擴展到核心的數量,所以更多的核心需要做更多的工作(這就是為什么serial python在更多的核心上花費更長的時間)。

使用M5實例類型(M5.large用于1個物理內核,M5.24XLarge用于48個物理內核)在EC2上運行基準測試。這里提供了運行所有基準的代碼。這篇文章中包含了縮寫的代碼片段。主要的區別在于,完整的基準包括1)計時和打印代碼,2)預熱Ray對象存儲的代碼,以及3)使基準適應小型機器的代碼。

基準1:數字數據

許多機器學習、科學計算和數據分析工作負載大量使用大型數據陣列。例如,一個數組可以表示一個大的圖像或數據集,應用程序可能希望有多個任務分析該圖像。有效處理數字數據至關重要。

通過下面的for循環,每一個使用Ray需要0.84秒,使用python多處理需要7.5秒,使用串行python需要24秒(在48個物理核上)。這一性能差異解釋了為什么可以在Ray上構建類似Modin的庫,而不是在其他庫之上構建。

代碼如下:

import numpy as np
import psutil
import ray
import scipy.signal

num_cpus = psutil.cpu_count(logical=False)

ray.init(num_cpus=num_cpus)

@ray.remote
def f(image, random_filter):
    # Do some image processing.
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

# Time the code below.

for _ in range(10):
    image = np.zeros((3000, 3000))
    image_id = ray.put(image)
    ray.get([f.remote(image_id, filters[i]) for i in range(num_cpus)])

使用ray的玩具圖像處理示例的代碼。

通過調用ray.put(image),大型數組存儲在共享內存中,所有工作進程都可以訪問它,而不需要創建副本。這不僅適用于數組,還適用于包含數組的對象(如數組列表)。

當工作人員執行f任務時,結果再次存儲在共享內存中。然后,當腳本調用ray.get([…])時,它創建由共享內存支持的numpy數組,而無需反序列化或復制值。

這些優化是由于Ray使用Apache Arrow作為底層數據布局和序列化格式以及等離子共享內存對象存儲而實現的。

使用Python多處理,代碼如下所示:

from multiprocessing import Pool
import numpy as np
import psutil
import scipy.signal

num_cpus = psutil.cpu_count(logical=False)

def f(args):
    image, random_filter = args
    # Do some image processing.
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

pool = Pool(num_cpus)

filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

# Time the code below.

for _ in range(10):
    image = np.zeros((3000, 3000))
    pool.map(f, zip(num_cpus * [image], filters))

使用多處理的玩具圖像處理示例的代碼。

這里的不同之處在于,Python multiprocessing在進程之間傳遞大型對象時使用pickle來序列化它們。這種方法要求每個進程創建自己的數據副本,這增加了大量的內存使用,以及昂貴的反序列化開銷,Ray通過使用Apache Arrow數據布局實現零拷貝序列化和Plasma store來避免這種開銷。

基準2:有狀態計算

需要在許多小工作單元之間共享大量“狀態”的工作負載是對Python多處理構成挑戰的另一類工作負載。這種模式非常常見,我用一個玩具流處理應用程序來說明它。

不用多進程的Python十倍速并行技巧(上)

在擁有48個物理內核的機器上,Ray比Python多處理速度快6倍,比單線程Python快17倍。在少于24個內核上,Python多處理并不比單線程Python表現得更好。工作負載被擴展到核心的數量,所以更多的核心需要做更多的工作(這就是為什么serial python在更多的核心上花費更長的時間)。

狀態通常封裝在Python類中,Ray提供了一個參與者抽象,這樣類就可以在并行和分布式設置中使用。相反,Python multiprocessing并沒有提供一種自然的方法來并行化Python類,因此用戶經常需要在map調用之間傳遞相關的狀態。這種策略在實踐中很難實現(許多Python變量不容易序列化),而且當它實際工作時可能很慢。

下面是一個有趣的示例,它使用并行任務一次處理一個文檔,提取每個單詞的前綴,并在末尾返回最常見的前綴。前綴計數存儲在actor狀態中,并由不同的任務進行更改。

本例使用Ray使用3.2秒,使用Python多處理使用21秒,使用串行Python使用54秒(在48個物理核心上)。

Ray版本如下所示:

from collections import defaultdict
import numpy as np
import psutil
import ray

num_cpus = psutil.cpu_count(logical=False)

ray.init(num_cpus=num_cpus)

@ray.remote
class StreamingPrefixCount(object):
    def __init__(self):
        self.prefix_count = defaultdict(int)
        self.popular_prefixes = set()

    def add_document(self, document):
        for word in document:
            for i in range(1, len(word)):
                prefix = word[:i]
                self.prefix_count[prefix] += 1
                if self.prefix_count[prefix] > 3:
                    self.popular_prefixes.add(prefix)

    def get_popular(self):
        return self.popular_prefixes

streaming_actors = [StreamingPrefixCount.remote() for _ in range(num_cpus)]

# Time the code below.

for i in range(num_cpus * 10):
    document = [np.random.bytes(20) for _ in range(10000)]
    streaming_actors[i % num_cpus].add_document.remote(document)

# Aggregate all of the results.
results = ray.get([actor.get_popular.remote() for actor in streaming_actors])
popular_prefixes = set()
for prefixes in results:
    popular_prefixes |= prefixes

使用Ray的玩具流處理示例的代碼。

Ray在這里表現得很好,因為Ray的抽象符合當前的問題。這個應用程序需要一種在分布式設置中封裝和修改狀態的方法,而actor正好符合這個要求。

多處理版本如下。

from collections import defaultdict
from multiprocessing import Pool
import numpy as np
import psutil

num_cpus = psutil.cpu_count(logical=False)

def accumulate_prefixes(args):
    running_prefix_count, running_popular_prefixes, document = args
    for word in document:
        for i in range(1, len(word)):
            prefix = word[:i]
            running_prefix_count[prefix] += 1
            if running_prefix_count[prefix] > 3:
                running_popular_prefixes.add(prefix)
    return running_prefix_count, running_popular_prefixes

pool = Pool(num_cpus)

running_prefix_counts = [defaultdict(int) for _ in range(4)]
running_popular_prefixes = [set() for _ in range(4)]

for i in range(10):
    documents = [[np.random.bytes(20) for _ in range(10000)]
                 for _ in range(num_cpus)]
    results = pool.map(
        accumulate_prefixes,
        zip(running_prefix_counts, running_popular_prefixes, documents))
    running_prefix_counts = [result[0] for result in results]
    running_popular_prefixes = [result[1] for result in results]

popular_prefixes = set()
for prefixes in running_popular_prefixes:
    popular_prefixes |= prefixes

使用多處理的玩具流處理示例的代碼。

這里的挑戰是pool.map執行無狀態函數,這意味著要在另一個pool.map調用中使用的pool.map調用中生成的任何變量都需要從第一個調用返回并傳遞到第二個調用。對于小對象來說,這種方法是可以接受的,但是當需要共享大的中間結果時,傳遞它們的成本是很高的(注意,如果變量在線程之間共享,這是不可能的,但是因為它們是跨進程邊界共享的,必須使用類似pickle的庫將變量序列化為一個字節字符串)。

因為它必須傳遞如此多的狀態,所以多處理版本看起來非常笨拙,最終只在串行Python上實現了很小的加速。實際上,您不會編寫這樣的代碼,因為您只是不會使用Python多處理進行流處理。相反,您可能會使用專用的流處理框架。這個例子表明,Ray非常適合構建這樣的框架或應用程序。

這里有一個警告是,有很多方法可以使用Python多處理。在本例中,我們將pool.map進行比較,因為它提供了最接近的API比較。在本例中,應該可以通過啟動不同的進程并在它們之間設置多個多進程隊列來獲得更好的性能,但是這會導致復雜而脆弱的設計。

好了,今天的內容已經很多了,余下的內容我們明天繼續。

歡迎關注ATYUN官方公眾號,商務合作及內容投稿請聯系郵箱:[email protected]

發表評論