D.ershane D Programlama Dili
Ali Çehreli

çokuzlu: [tuple], bir kaç parçanın diziye benzer şekilde bir araya gelmesinden oluşan yapı
değişmez: [immutable], programın çalışması süresince kesinlikle değişmeyen
eş zamanlı programlama: [concurrency], iş parçacıklarının birbirlerine bağımlı olarak işlemeleri
evrensel: [global], evrensel isim alanında tanımlanmış, erişimi kısıtlanmamış
görev: [task], programın geri kalanıyla koşut işletilebilen işlem birimi
iş parçacığı: [thread], işletim sisteminin program işletme birimi
koşut işlemler: [parallelization], bağımsız işlemlerin aynı anda işletilmeleri
mesajlaşma: [message passing], iş parçacıklarının birbirlerine mesaj göndermeleri
... bütün sözlük

Bölümler
İngilizce Kaynaklar
Diğer



Mesajlaşarak Eş Zamanlı Programlama

Eş zamanlı programlama bir önceki bölümde gördüğümüz koşut işlemlere çok benzer. İkisi de işlemlerin farklı iş parçacıkları üzerinde aynı anda işletilmeleri ile ilgilidir ve aslında koşut işlemler de perde arkasında eş zamanlı programlama ile gerçekleştirilir. Bu iki kavram bu yüzden çok karıştırılır.

Koşut işlemlerle eş zamanlı programlama arasındaki farklar şunlardır:

D hem mesajlaşmaya dayanan hem de veri paylaşımına dayanan eş zamanlı programlamayı destekler. Veri paylaşımı ile hatasız programlar üretmek çok zor olduğundan modern programcılıkta mesajlaşma yöntemi benimsenmiştir. Bu bölümde std.concurrency modülünün sağladığı mesajlaşma olanaklarını, bir sonraki bölümde ise veri paylaşımına dayalı eş zamanlı programlama olanaklarını göreceğiz.

Kavramlar

İş parçacığı (thread): İşletim sistemi bütün programları iş parçacığı adı verilen işlem birimleri ile işletir. Çalıştırılan her D programının main() ile başlayan işlemleri işletim sisteminin o programı çalıştırmak için seçmiş olduğu bir iş parçacığı üzerinde başlatılır. main()'in işlettiği bütün işlemler normalde hep aynı iş parçacığı üzerinde işletilirler. Program, gerektiğinde kendisi başka iş parçacıkları başlatabilir ve böylece aynı anda birden fazla iş yürütebilir. Örneğin bir önceki bölümde gördüğümüz her görev, std.parallelism'in olanakları tarafından başlatılmış olan bir iş parçacığını kullanır.

İşletim sistemi iş parçacıklarını önceden kestirilemeyecek anlarda duraksatır ve tekrar başlatır. Bunun sonucunda örneğin aşağıdaki kadar basit işlemler bile bir süre yarım kalmış olabilirler:

    ++i;

Yukarıdaki işlem aslında üç adımdan oluşur: Değişkenin değerinin okunması, değerin arttırılması ve tekrar değişkene atanması. İşletim sisteminin bu iş parçacığını duraksattığı bir anda bu adımlar sonradan devam edilmek üzere yarım kalmış olabilirler.

Mesaj (message): İş parçacıklarının işleyişleri sırasında birbirlerine gönderdikleri bilgilere mesaj denir. Mesaj her türden ve her sayıda değişkenden oluşabilir.

İş parçacığı kimliği (Tid): Her iş parçacığının bir kimliği vardır. Kimlik, gönderilen mesajın alıcısı olan iş parçacığını belirler.

Sahip (owner): İş parçacığı başlatan her iş parçacığı, başlatılan iş parçacığının sahibi olarak anılır.

İşçi (worker): Başlatılan iş parçacığına işçi denir.

İş parçacıklarını başlatmak

Yeni bir iş parçacığı başlatmak için spawn() kullanılır. spawn() parametre olarak bir işlev alır ve yeni iş parçacığını o işlevden başlatır. O işlevin belki de başka işlevlere de dallanarak devam eden işlemleri artık yeni iş parçacığı üzerinde işletilirler. Bu noktadan sonra sahip ve işçi birbirlerinden bağımsız iki alt program gibi işlemeye başlarlar:

import std.stdio;
import std.concurrency;
import core.thread;

void işçi()
{
    foreach (i; 0 .. 5) {
        Thread.sleep(dur!"msecs"(500));
        writeln(i, " (işçi)");
    }
}

void main()
{
    spawn(&işçi);

    foreach (i; 0 .. 5) {
        Thread.sleep(dur!"msecs"(300));
        writeln(i, " (main)");
    }

    writeln("main tamam");
}

İşlemlerin aynı anda işletildiklerini gösterebilmek için buradaki örneklerde de Thread.sleep'ten yararlanıyorum. Programın çıktısı main()'den ve işçi()'den başlamış olan iki iş parçacığının diğerinden bağımsız olarak işlediğini gösteriyor:

0 (main)
0 (işçi)
1 (main)
2 (main)
1 (işçi)
3 (main)
2 (işçi)
4 (main)
main tamam
3 (işçi)
4 (işçi)

Program bütün iş parçacıklarının tamamlanmasını otomatik olarak bekler. Bunu yukarıdaki çıktıda görüyoruz: main()'in sonundaki "main tamam" yazdırıldığı halde işçi() işlevinin de tamamlanması beklenmiştir.

İş parçacığını başlatan işlevin aldığı parametreler spawn()'a işlev isminden sonra verilirler. Aşağıdaki programdaki iki işçi, çıkışa dörder tane sayı yazdırıyor. Hangi değerden başlayacağını parametre olarak alıyor:

import std.stdio;
import std.concurrency;
import core.thread;

void işçi(int başlangıçDeğeri)
{
    foreach (i; 0 .. 4) {
        Thread.sleep(dur!"msecs"(500));
        writeln(başlangıçDeğeri + i);
    }
}

void main()
{
    foreach (i; 1 .. 3) {
        spawn(&işçi, i * 10);
    }
}

İş parçacıklarından birisinin yazdırdıklarını maviyle belirtiyorum. İşletim sisteminin iş parçacıklarını başlatmasına ve duraklatmasına bağlı olarak bu çıktıdaki satırlar farklı sırada da olabilirler:

10
20
11
21
12
22
13
23
İş parçacıklarının kimlikleri

Modülün isim alanında tanımlanmış olan thisTid, her iş parçacığının kendi kimliğidir. İsmi, "bu iş parçacığının kimliği" anlamına gelen "this thread's identifier"dan türemiştir.

import std.stdio;
import std.concurrency;

void kimlikBilgisi(string açıklama)
{
    writefln("%s: %s, adresi: %s",
             açıklama, thisTid, &thisTid);
}

void işçi()
{
    kimlikBilgisi("işçi ");
}

void main()
{
    spawn(&işçi);
    kimlikBilgisi("sahip");
}

Tid türündeki kimliğin değerinin program açısından bir önemi olmadığı için bu türün toString işlevi bile tanımlanmamıştır. Bu yüzden programın aşağıdaki çıktısında yalnızca türün ismini görüyoruz. Ek olarak, bu iki değerin adresleri bile aynı çıkar:

sahip: Tid(std.concurrency.MessageBox), address: 809C360
işçi : Tid(std.concurrency.MessageBox), address: 809C360

Buna rağmen o iki kimlik farklıdır. Bunun nedeni, D'de modüllerin evrensel alanlarının veri doğruluğunu sağlamak için her iş parçacığında farklı olmasıdır. Her iş parçacığı tarafından aynı adresteymiş gibi algılanmasına rağmen bu değişkenler her iş parçacığında farklıdır.

Sahibin kendi kimliğini yeni başlattığı iş parçacığına parametre olarak göndermesi ve böylece kendisini tanıtması yaygındır:

import std.concurrency;

void işçi(Tid sahibim)
{
    // ...
}

void main()
{
    spawn(&işçi, thisTid);
}

spawn()'ın bu noktaya kadar gözardı etmiş olduğum dönüş değeri de işçinin kimliğini sahibe bildirir:

    Tid işçim = spawn(&işçi, thisTid);

Sahip ve işçi birbirlerinin kimliklerini böylece elde etmiş olurlar.

Mesajlaşma

Mesaj göndermek için send(), belirli türden mesaj beklemek için de receiveOnly() kullanılır. (Çeşitli türlerden mesaj bekleyen receive()'i ve belirli süreye kadar bekleyen receiveTimeout()'u daha aşağıda göstereceğim.)

Aşağıdaki programdaki sahip iş parçacığı işçisine int türünde bir mesaj göndermekte ve ondan double türünde bir mesaj beklemektedir. Bu iş parçacıkları sahip sıfırdan küçük bir değer gönderene kadar mesajlaşmaya devam edecekler. Önce sahip iş parçacığını gösteriyorum:

void main()
{
    Tid işçi = spawn(&işçiİşlevi, thisTid);

    foreach (değer; 1 .. 5) {
        işçi.send(değer);
        double sonuç = receiveOnly!double();
        writefln("gönderilen: %s, alınan: %s", değer, sonuç);
    }

    /* Sonlanmasını sağlamak için işçiye sıfırdan küçük bir
     * değer gönderiyoruz  */
    işçi.send(-1);
}

main(), spawn()'ın döndürdüğü iş parçacığının kimliğini işçi ismiyle saklamakta ve bu kimliği send() ile mesaj gönderirken kullanmaktadır.

İşçi ise kullanacağı int'i bir mesaj olarak alıyor, onu bir hesapta kullanıyor ve ürettiği double'ı yine bir mesaj olarak sahibine gönderiyor:

void işçiİşlevi(Tid sahip)
{
    int değer = 0;

    while (değer >= 0) {
        değer = receiveOnly!int();
        double sonuç = cast(double)değer / 5;
        sahip.send(sonuç);
    }
}

Yukarıdaki iş parçacığı mesajdaki değerin beşte birini hesaplar. Programın çıktısı şöyle:

gönderilen: 1, alınan: 0.2
gönderilen: 2, alınan: 0.4
gönderilen: 3, alınan: 0.6
gönderilen: 4, alınan: 0.8

Birden fazla değer aynı mesajın parçası olarak gönderilebilir:

    sahip.send(thisTid, 42, 1.5);

Aynı mesajın parçası olarak gönderilen değerler alıcı tarafta bir çokuzlunun üyeleri olarak belirirler. receiveOnly()'nin şablon parametrelerinin mesajı oluşturan türlere uymaları şarttır:

    /* Tid, int, ve double türlerinden oluşan bir mesaj
     * bekliyoruz */
    auto mesaj = receiveOnly!(Tid, int, double)();

    /* Mesaj bir çokuzlu olarak alınır */
    auto gönderen = mesaj[0];    // Tid türünde
    auto tamsayı =  mesaj[1];    // int türünde
    auto kesirli =  mesaj[2];    // double türünde

Türler uymadığında "mesaj uyumsuzluğu" anlamına gelen MessageMismatch hatası atılır:

import std.concurrency;

void işçiİşlevi(Tid sahip)
{
    // string gönderiyor
    sahip.send("merhaba");
}

void main()
{
    spawn(&işçiİşlevi, thisTid);

    // double bekliyor
    auto mesaj = receiveOnly!double();
}

Çıktısı:

std.concurrency.MessageMismatch@std/concurrency.d(202):
Unexpected message type
Örnek

Şimdiye kadar gördüğümüz kavramları kullanan basit bir benzetim programı tasarlayalım.

Bu örnek iki boyutlu düzlemdeki robotların birbirlerinden bağımsız ve rasgele hareketlerini belirliyor. Her robotu farklı bir iş parçacığı yönetiyor. Her iş parçacığı başlatılırken dört bilgi alıyor:

Yukarıdaki son üç bilgiyi bir arada tutan bir İş yapısı şöyle tanımlanabilir:

struct İş
{
    int robotNumarası;
    Yer başlangıç;
    Duration dinlenmeSüresi;
}

Bu iş parçacığının yaptığı tek iş, robotun numarasını ve hareketini bir sonsuz döngü içinde sahibine göndermek:

void gezdirici(Tid sahip, İş iş)
{
    Yer nereden = iş.başlangıç;

    while (true) {
        Thread.sleep(iş.dinlenmeSüresi);

        Yer nereye = rasgeleKomşu(nereden);
        Hareket hareket = Hareket(nereden, nereye);
        nereden = nereye;

        sahip.send(HareketMesajı(iş.robotNumarası, hareket));
    }
}

Sahip de sonsuz bir döngü içinde bu mesajları bekliyor. Aldığı mesajların hangi robotla ilgili olduğunu her mesajın parçası olan robot numarasından anlıyor:

    while (true) {
        auto mesaj = receiveOnly!HareketMesajı();

        writefln("%s %s",
                 robotlar[mesaj.robotNumarası],
                 mesaj.hareket);
    }

Bu örnekteki bütün mesajlar işçilerden sahibe gönderiliyor. Daha karmaşık programlarda her iki yönde ve çok çeşitli türlerden mesajlar da gönderilebilir. Programın tamamı şöyle:

import std.stdio;
import std.random;
import std.string;
import std.concurrency;
import core.thread;

struct Yer
{
    int satır;
    int sütun;

    string toString()
    {
        return format("%s,%s", satır, sütun);
    }
}

struct Hareket
{
    Yer nereden;
    Yer nereye;

    string toString()
    {
        return ((nereden == nereye)
                ? format("%s (durgun)", nereden)
                : format("%s -> %s", nereden, nereye));
    }
}

class Robot
{
    string görünüm;
    Duration dinlenmeSüresi;

    this(string görünüm, Duration dinlenmeSüresi)
    {
        this.görünüm = görünüm;
        this.dinlenmeSüresi = dinlenmeSüresi;
    }

    override string toString()
    {
        return format("%s(%s)", görünüm, dinlenmeSüresi);
    }
}

/* 0,0 noktası etrafında rasgele bir yer döndürür */
Yer rasgeleYer()
{
    return Yer(uniform!"[]"(-10, 10), uniform!"[]"(-10, 10));
}

/* Verilen değerin en fazla bir adım ötesinde bir değer
 * döndürür */
int rasgeleAdım(int şimdiki)
{
    return şimdiki + uniform!"[]"(-1, 1);
}

/* Verilen Yer'in komşusu olan bir Yer döndürür; çapraz
 * komşusu olabileceği gibi tesadüfen aynı yer de olabilir. */
Yer rasgeleKomşu(Yer yer)
{
    return Yer(rasgeleAdım(yer.satır),
               rasgeleAdım(yer.sütun));
}

struct İş
{
    int robotNumarası;
    Yer başlangıç;
    Duration dinlenmeSüresi;
}

struct HareketMesajı
{
    int robotNumarası;
    Hareket hareket;
}

void gezdirici(Tid sahip, İş iş)
{
    Yer nereden = iş.başlangıç;

    while (true) {
        Thread.sleep(iş.dinlenmeSüresi);

        Yer nereye = rasgeleKomşu(nereden);
        Hareket hareket = Hareket(nereden, nereye);
        nereden = nereye;

        sahip.send(HareketMesajı(iş.robotNumarası, hareket));
    }
}

void main()
{
    /* Farklı hızlardaki robotlar */
    Robot[] robotlar = [ new Robot("A", dur!"msecs"( 600)),
                         new Robot("B", dur!"msecs"(2000)),
                         new Robot("C", dur!"msecs"(5000)) ];

    /* Her birisi için bir iş parçacığı başlatılıyor */
    foreach (int robotNumarası, robot; robotlar) {
        spawn(&gezdirici, thisTid, İş(robotNumarası,
                                      rasgeleYer(),
                                      robot.dinlenmeSüresi));
    }

    /* Artık hareket bilgilerini işçilerden toplamaya
     * başlayabiliriz */
    while (true) {
        auto mesaj = receiveOnly!HareketMesajı();

        /* Bu robotla ilgili yeni bilgiyi çıkışa
         * yazdırıyoruz */
        writefln("%s %s",
                 robotlar[mesaj.robotNumarası],
                 mesaj.hareket);
    }
}

Program sonlandırılana kadar robotların konumlarını çıkışa yazdırır:

A(600 ms) -3,3 -> -4,4
A(600 ms) -4,4 -> -4,3
A(600 ms) -4,3 -> -3,2
B(2 secs) -6,9 (durgun)
A(600 ms) -3,2 -> -2,2
A(600 ms) -2,2 -> -3,1
A(600 ms) -3,1 -> -2,0
B(2 secs) -6,9 -> -5,9
A(600 ms) -2,0 (durgun)
A(600 ms) -2,0 -> -3,-1
C(5 secs) -6,6 -> -6,7
A(600 ms) -3,-1 -> -4,-1
...

Mesajlaşmaya dayanan eş zamanlı programlamanın yararını bu örnekte görebiliyoruz. Her robotun hareketi aynı anda ve diğerlerinden bağımsız olarak hesaplanıyor. Bu basit örnekteki sahip yalnızca robotların hareketlerini çıkışa yazdırıyor; bütün robotları ilgilendiren başka işlemler de uygulanabilir.

Farklı çeşitlerden mesaj beklemek

receiveOnly() yalnızca belirtilen türden mesaj bekleyebilir. receive() ise farklı çeşitlerden mesajlar beklemek için kullanılır. Parametre olarak belirsiz sayıda mesajcı işlev alır. Gelen mesaj bu mesajcı işlevlere sırayla uydurulmaya çalışılır ve mesaj, mesajın türünün uyduğu ilk işleve gönderilir.

Örneğin aşağıdaki receive() çağrısı ilki int, ikincisi de string bekleyen iki mesajcı işlev kullanmaktadır:

void işçiİşlevi()
{
    bool tamam_mı = false;

    while (!tamam_mı) {
        void intİşleyen(int mesaj)
        {
            writeln("int mesaj: ", mesaj);

            if (mesaj == -1) {
                writeln("çıkıyorum");
                tamam_mı = true;
            }
        }

        void stringİşleyen(string mesaj)
        {
            writeln("string mesaj: ", mesaj);
        }

        receive(&intİşleyen, &stringİşleyen);
    }
}

Gönderilen int mesajlar intİşleyen()'e, string mesajlar da stringİşleyen()'e uyarlar. O iş parçacığını şöyle bir kodla deneyebiliriz:

import std.stdio;
import std.concurrency;

// ...

void main()
{
    auto işçi = spawn(&işçiİşlevi);

    işçi.send(10);
    işçi.send(42);
    işçi.send("merhaba");
    işçi.send(-1);        // ← işçinin sonlanması için
}

Mesajlar alıcı taraftaki uygun mesajcı işlevlere gönderilirler:

int mesaj: 10
int mesaj: 42
string mesaj: merhaba
int mesaj: -1
çıkıyorum

receive(), yukarıdaki normal işlevler yerine isimsiz işlevler veya opCall() üye işlevi tanımlanmış olan türlerin nesnelerini de kullanabilir. Bunun bir örneğini görmek için programı isimsiz işlevler kullanacak şekilde değiştirelim. Ek olarak, işçinin sonlanmasını da -1 gibi özel bir değer yerine ismi açıkça Sonlan olan özel bir türle bildirelim.

Aşağıda receive()'e parametre olarak üç isimsiz işlev gönderildiğine dikkat edin. Bu işlevlerin açma ve kapama parantezlerini sarı ile belirtiyorum:

import std.stdio;
import std.concurrency;

struct Sonlan
{}

void işçiİşlevi()
{
    bool devam_mı = true;

    while (devam_mı) {
        receive(
            (int mesaj) {
                writeln("int mesaj: ", mesaj);
            },

            (string mesaj) {
                writeln("string mesaj: ", mesaj);
            },

            (Sonlan mesaj) {
                writeln("çıkıyorum");
                devam_mı = false;
            });
    }
}

void main()
{
    auto işçi = spawn(&işçiİşlevi);

    işçi.send(10);
    işçi.send(42);
    işçi.send("merhaba");
    işçi.send(Sonlan());
}
Beklenmeyen mesaj almak

std.variant modülünde tanımlanmış olan Variant her türden veriyi sarmalayabilen bir türdür. receive()'e verilen diğer mesajcı işlevlere uymayan mesajlar Variant türünü bekleyen bir mesajcı tarafından yakalanabilirler:

import std.stdio;
import std.concurrency;

void işçiİşlev()
{
    receive(
        (int mesaj) { /* ... */ },

        (double mesaj) { /* ... */ },

        (Variant mesaj) {
            writeln("Beklemediğim bir mesaj aldım: ", mesaj);
        });
}

struct ÖzelMesaj
{
    /* ... */
}

void main()
{
    auto işçi = spawn(&işçiİşlev);
    işçi.send(ÖzelMesaj());
}

Çıktısı:

Beklemediğim bir mesaj aldım: ÖzelMesaj()

Bu bölümün konusu dışında kaldığı için Variant'ın ayrıntılarına girmeyeceğim.

Mesajları belirli süreye kadar beklemek

Mesajların belirli bir süreden daha fazla beklenmesi istenmeyebilir. Gönderen iş parçacığı geçici olarak meşgul olmuş olabilir veya bir hata ile sonlanmış olabilir. Mesaj bekleyen iş parçacığının belki de hiç gelmeyecek olan bir mesajı sonsuza kadar beklemesini önlemek için receiveTimeout() çağrılır.

receiveTimeout()'un ilk parametresi mesajın en fazla ne kadar bekleneceğini bildirir. Dönüş değeri de mesajın o süre içinde gelip gelmediğini belirtir: Mesaj alındığında true, alınmadığında ise false değeridir.

import std.stdio;
import std.concurrency;
import core.thread;

void işçi(Tid sahip)
{
    Thread.sleep(dur!"seconds"(3));
    sahip.send("merhaba");
}

void main()
{
    spawn(&işçi, thisTid);

    writeln("mesaj bekliyorum");
    bool alındı = false;
    while (!alındı) {
        alındı = receiveTimeout(dur!"msecs"(600),
                                (string mesaj) {
                                    writeln("geldi: ", mesaj);
                                });

        if (!alındı) {
            writeln("... şimdilik yok");

            /* ... burada başka işlere devam edilebilir ... */
        }
    }
}

Yukarıdaki sahip, gereken mesajı en fazla 600 milisaniye bekliyor. Mesaj o süre içinde gelmezse başka işlerine devam edebilir:

mesaj bekliyorum
... şimdilik yok
... şimdilik yok
... şimdilik yok
... şimdilik yok
geldi: merhaba

Mesajın belirli süreden uzun sürmesi sonucunda çeşitli durumlarda başka kararlar da verilebilir. Örneğin, mesaj geç geldiğinde artık bir anlamı yoktur.

İşçide atılan hatalar

std.parallelism modülünün çoğu olanağı görevler sırasında atılan hataları yakalar ve görevle ilgili bir sonraki işlem sırasında tekrar atmak üzere saklar. Böylece örneğin bir görevin işlemesi sırasında atılmış olan hata daha sonra yieldForce() çağrıldığında görevi başlatan tarafta yakalanabilir:

    try {
        görev.yieldForce();

    } catch (Exception hata) {
        writefln("görev sırasında bir hata olmuş: '%s'",
                 hata.msg);
    }

Bu kolaylık std.concurrency'de yoktur. Atılan olası bir hatanın sahip iş parçacığına iletilebilmesi için açıkça yakalanması ve bir mesaj halinde gönderilmesi gerekir.

Aşağıdaki hesapçı() işlevi string türünde mesajlar alıyor; onları double türüne çeviriyor, 0.5 değerini ekliyor ve sonucu bir mesaj olarak gönderiyor:

void hesapçı(Tid sahip)
{
    while (true) {
        auto mesaj = receiveOnly!string();
        sahip.send(to!double(mesaj) + 0.5);
    }
}

Yukarıdaki to!double(), "merhaba" gibi double'a dönüştürülemeyen bir dizgi ile çağrıldığında hata atar. Atılan o hata hesapçı()'dan hemen çıkılmasına neden olacağından aşağıdaki üç mesajdan yalnızca birincisinin yanıtı alınabilir:

import std.stdio;
import std.concurrency;
import std.conv;

// ...

void main()
{
    Tid hesapçı = spawn(&hesapçı, thisTid);

    hesapçı.send("1.2");
    hesapçı.send("merhaba");  // ← hatalı veri
    hesapçı.send("3.4");

    foreach (i; 0 .. 3) {
        auto mesaj = receiveOnly!double();
        writefln("sonuç %s: %s", i, mesaj);
    }
}

Bu yüzden, sahip "1.2"nin sonucunu 1.7 olarak alır ama işçi sonlanmış olduğundan bir sonraki mesajı alamaz:

sonuç 0: 1.7
                 ← hiç gelmeyecek olan mesajı bekleyerek takılır

Hesapçı iş parçacığının bu konuda yapabileceği bir şey, kendi işlemleri sırasında atılabilecek olan hatayı try-catch ile yakalamak ve özel bir mesaj olarak iletmektir. Programı hatanın nedenini bir HesapHatası nesnesi olarak gönderecek şekilde aşağıda değiştiriyoruz. Ek olarak, iş parçacığının sonlanması da özel Sonlan türü ile sağlanıyor:

import std.stdio;
import std.concurrency;
import std.conv;

struct HesapHatası
{
    string neden;
}

struct Sonlan
{}

void hesapçı(Tid sahip)
{
    bool devam_mı = true;

    while (devam_mı) {
        receive(
            (string mesaj) {
                try {
                    sahip.send(to!double(mesaj) + 0.5);

                } catch (Exception hata) {
                    sahip.send(HesapHatası(hata.msg));
                }
            },

            (Sonlan mesaj) {
                devam_mı = false;
            });
    }
}

void main()
{
    Tid hesapçı = spawn(&hesapçı, thisTid);

    hesapçı.send("1.2");
    hesapçı.send("merhaba");  // ← hatalı veri
    hesapçı.send("3.4");
    hesapçı.send(Sonlan());

    foreach (i; 0 .. 3) {
        writef("sonuç %s: ", i);

        receive(
            (double mesaj) {
                writeln(mesaj);
            },

            (HesapHatası hata) {
                writefln("HATA! '%s'", hata.neden);
            });
    }
}

Hatanın nedeninin "rakam bulunamadı" anlamına gelen "no digits seen" olduğunu bu sefer görebiliyoruz:

sonuç 0: 1.7
sonuç 1: HATA! 'no digits seen'
sonuç 2: 3.9

Bu konuda diğer bir yöntem, işçinin yakaladığı hatanın olduğu gibi sahibe gönderilmesidir. Aynı hata sahip tarafından kullanılabileceği gibi tekrar atılabilir de:

// ... işçi tarafta ...
                try {
                    // ...

                } catch (shared(Exception) hata) {
                    sahip.send(hata);
                }},

// ... sahip tarafta ...
        receive(
            // ...

            (shared(Exception) hata) {
                throw hata;
            });

Yukarıdaki shared belirteçlerine neden gerek olduğunu bir sonraki bölümde göreceğiz.

İş parçacıklarının sonlandıklarını algılamak

İş parçacıkları alıcı tarafın herhangi bir nedenle sonlanmış olduğunu algılayabilirler.

OwnerTerminated hatası

"Sahip sonlandı" anlamına gelen bu hata işçinin bu durumdan haberinin olmasını sağlar. Aşağıdaki programdaki aracı iş parçası iki mesaj gönderdikten sonra sonlanıyor. Bunun sonucunda işçi tarafta bir OwnerTerminated hatası atılıyor:

import std.stdio;
import std.concurrency;

void main()
{
    spawn(&aracıİşlev);
}

void aracıİşlev()
{
    auto işçi = spawn(&işçiİşlev);
    işçi.send(1);
    işçi.send(2);
}  // ← İki mesajdan sonra sonlanıyor.

void işçiİşlev()
{
    while (true) {
        auto mesaj = receiveOnly!int(); // ← Sahip sonlanmışsa
                                        //   hata atılır.
        writeln("Mesaj: ", mesaj);
    }
}

Çıktısı:

Mesaj: 1
Mesaj: 2
std.concurrency.OwnerTerminated@std/concurrency.d(248):
Owner terminated

İstendiğinde o hata işçi tarafından yakalanabilir ve böylece işçinin de hatasız olarak sonlanması sağlanabilir:

void işçiİşlev()
{
    bool devam_mı = true;

    while (devam_mı) {
        try {
            auto mesaj = receiveOnly!int();
            writeln("Mesaj: ", mesaj);

        } catch (OwnerTerminated hata) {
            writeln("Sahibim sonlanmış.");
            devam_mı = false;
        }
    }
}

Çıktısı:

Mesaj: 1
Mesaj: 2
Sahibim sonlanmış.
LinkTerminated hatası

spawnLinked() ile başlatılmış olan bir iş parçacığı sonlandığında sahibin tarafında LinkTerminated hatası atılır. spawnLinked(), spawn() ile aynı biçimde kullanılır:

import std.stdio;
import std.concurrency;

void main()
{
    auto işçi = spawnLinked(&işçiİşlev, thisTid);

    while (true) {
        auto mesaj = receiveOnly!int(); // ← İşçi sonlanmışsa
                                        //   hata atılır.
        writeln("Mesaj: ", mesaj);
    }
}

void işçiİşlev(Tid sahip)
{
    sahip.send(10);
    sahip.send(20);
}  // ← İki mesajdan sonra sonlanıyor.

İşçi yalnızca iki mesaj gönderdikten sonra sonlanıyor. İşçisini spawnLinked() ile başlatmış olduğu için sahip bu durumu bir LinkTerminated hatası ile öğrenir:

Mesaj: 10
Mesaj: 20
std.concurrency.LinkTerminated@std/concurrency.d(263):
Link terminated

OwnerTerminated hatasında olduğu gibi bu hata da yakalanabilir ve sahip de bu durumda düzenli olarak sonlanabilir:

    bool devam_mı = true;

    while (devam_mı) {
        try {
            auto mesaj = receiveOnly!int();
            writeln("Mesaj: ", mesaj);

        } catch (LinkTerminated hata) {
            writeln("İşçi sonlanmış.");
            devam_mı = false;
        }
    }

Çıktısı:

Mesaj: 10
Mesaj: 20
İşçi sonlanmış.
Posta kutusu yönetimi

İş parçacıklarına gönderilen mesajlar her iş parçacığına özel bir posta kutusunda dururlar. Posta kutusundaki mesajların sayısı alıcının mesajları işleyiş hızına bağlı olarak zamanla artabilir ve azalabilir. Posta kutusunun aşırı büyümesi hem sistem belleğine fazla yük getirir hem de programın tasarımındaki bir hataya işaret eder. Posta kutusunun sürekli olarak büyümesi bazı mesajların hiçbir zaman alınamayacaklarını da gösteriyor olabilir.

Posta kutusunun uzunluğu setMaxMailboxSize() işlevi ile kısıtlanır. Bu işlevin ilk parametresi hangi iş parçacığına ait posta kutusunun kısıtlanmakta olduğunu, ikinci parametresi posta kutusunun en fazla kaç mesaj alabileceğini, üçüncü parametresi de posta kutusu dolu olduğunda ne olacağını belirler. Üçüncü parametre için dört seçenek vardır:

Bunun bir örneğini görmek için önce posta kutusunun sürekli olarak büyümesini sağlayalım. Aşağıdaki programdaki işçi hiç zaman geçirmeden art arda mesaj gönderdiği halde sahip iş parçacığı her mesaj için bir saniye zaman harcamaktadır:

/* UYARI: Bu program çalışırken sisteminiz aşırı derecede
 *        yavaşlayabilir. */
import std.concurrency;
import core.thread;

void işçiİşlev(Tid sahip)
{
    while (true) {
        sahip.send(42);    // ← Sürekli olarak mesaj üretiyor.
    }
}

void main()
{
    spawn(&işçiİşlev, thisTid);

    while (true) {
        receive(
            (int mesaj) {
                // Her mesajda zaman geçiriyor.
                Thread.sleep(dur!"seconds"(1));
            });
    }
}

Mesajları tüketen taraf üreten taraftan yavaş kaldığı için yukarıdaki programın kullandığı bellek sürekli olarak artacaktır. Bunun önüne geçmek için ana iş parçacığının posta kutusu daha işçi başlatılmadan önce belirli bir mesaj sayısı ile kısıtlanabilir:

void main()
{
    setMaxMailboxSize(thisTid, 1000, OnCrowding.block);

    spawn(&işçiİşlev, thisTid);
// ...
}

Yukarıdaki setMaxMailboxSize() çağrısı ana iş parçacığının posta kutusunun uzunluğunu 1000 ile kısıtlamaktadır. OnCrowding.block, gönderen tarafın mesaja yer açılana kadar beklemesine neden olur.

OnCrowding.throwException kullanılan aşağıdaki örnekte ise mesajı gönderen taraf posta kutusunun dolu olduğunu atılan MailboxFull hatasından anlamaktadır:

import std.concurrency;
import core.thread;

void işçiİşlev(Tid sahip)
{
    while (true) {
        try {
            sahip.send(42);

        } catch (MailboxFull hata) {
            /* Gönderemedim; biraz sonra tekrar denerim. */
            Thread.sleep(dur!"msecs"(1));
        }
    }
}

void main()
{
    setMaxMailboxSize(thisTid, 1000, OnCrowding.throwException);

    spawn(&işçiİşlev, thisTid);

    while (true) {
        receive(
            (int mesaj) {
                Thread.sleep(dur!"seconds"(1));
            });
    }
}
Öncelikli mesajlar

prioritySend() ile gönderilen mesajlar önceliklidir. Bu mesajlar posta kutusunda beklemekte olan mesajlardan daha önce alınırlar:

    prioritySend(sahip, ÖnemliMesaj(100));

Alıcı tarafta prioritySend() ile gönderilmiş olan mesajın türünü bekleyen mesajcı işlev yoksa PriorityMessageException hatası atılır:

std.concurrency.PriorityMessageException@std/concurrency.d(280):
Priority message
İş parçacığı isimleri

Şimdiye kadar kullandığımız basit örneklerde sahip ve işçinin birbirlerinin kimliklerini kolayca edindiklerini gördük. Çok sayıda iş parçacığının görev aldığı programlarda ise iş parçacıklarının Tid değerlerini birbirlerini tanısınlar diye elden ele geçirmek karmaşık olabilir. Bunun önüne geçmek için iş parçacıklarına bütün program düzeyinde isimler atanabilir.

Aşağıdaki üç işlev bütün iş parçacıkları tarafından erişilebilen bir eşleme tablosu gibi düşünülebilirler:

Aşağıdaki program birbirlerini isimleriyle bulan iki eş iş parçacığı başlatıyor. Bu iş parçacıkları sonlanmalarını bildiren Sonlan mesajını alana kadar birbirlerine mesaj gönderiyorlar:

import std.stdio;
import std.concurrency;
import core.thread;

struct Sonlan
{}

void main()
{
    // Eşinin ismi "ikinci" olan bir iş parçacığı
    auto birinci = spawn(&oyuncu, "ikinci");
    register("birinci", birinci);
    scope(exit) unregister("birinci");

    // Eşinin ismi "birinci" olan bir iş parçacığı
    auto ikinci = spawn(&oyuncu, "birinci");
    register("ikinci", ikinci);
    scope(exit) unregister("ikinci");

    Thread.sleep(dur!"seconds"(2));

    prioritySend(birinci, Sonlan());
    prioritySend(ikinci, Sonlan());
}

void oyuncu(string eşİsmi)
{
    Tid eş;

    while (eş == Tid.init) {
        Thread.sleep(dur!"msecs"(1));
        eş = locate(eşİsmi);
    }

    bool devam_mı = true;

    while (devam_mı) {
        eş.send("merhaba " ~ eşİsmi);
        receive(
            (string mesaj) {
                writeln("Mesaj: ", mesaj);
                Thread.sleep(dur!"msecs"(500));
            },

            (Sonlan mesaj) {
                writefln("%s, ben çıkıyorum.", eşİsmi);
                devam_mı = false;
            });
    }
}

Çıktısı:

Mesaj: merhaba birinci
Mesaj: merhaba ikinci
Mesaj: merhaba birinci
Mesaj: merhaba ikinci
Mesaj: merhaba birinci
Mesaj: merhaba ikinci
Mesaj: merhaba ikinci
Mesaj: merhaba birinci
birinci, ben çıkıyorum.
ikinci, ben çıkıyorum.
Özet