Skip to content

Latest commit

 

History

History
1950 lines (1721 loc) · 138 KB

c24.md

File metadata and controls

1950 lines (1721 loc) · 138 KB

ГЛАВА 24. Многопоточное программирование. Часть вторая: библиотека TPL

Вероятно, самым главным среди новых средств, вне­ дренных в версию 4.0 среды .NET Framework, является библиотека распараллеливания задач (TPL). Эта биб­ лиотека усовершенствует многопоточное программирова­ ние двумя основными способами. Во-первых, она упрощает создание и применение многих потоков. И во-вторых, она позволяет автоматически использовать несколько процес­ соров. Иными словами, TPL открывает возможности для автоматического масштабирования приложений с целью эффективного использования ряда доступных процессо­ ров. Благодаря этим двух особенностям библиотеки TPL она рекомендуется в большинстве случаев к применению для организации многопоточной обработки.

Еще одним средством параллельного программиро­ вания, внедренным в версию 4.0 среды .NET Framework, является параллельный язык интегрированных запросов (PLINQ). Язык PLINQ дает возможность составлять запро­ сы, для обработки которых автоматически используется несколько процессоров, а также принцип параллелизма, когда это уместно. Как станет ясно из дальнейшего, запро­ сить параллельную обработку запроса очень просто. Сле­ довательно, с помощью PLINQ можно без особого труда внедрить параллелизм в запрос.

Главной причиной появления таких важных новшеств, как TPL и PLINQ, служит возросшее значение параллелиз­ ма в современном программировании. В настоящее время многоядерные процессоры уже стали обычным явлением. Кроме того, постоянно растет потребность в повышении производительности программ. Все это, в свою очередь, вызвало растущую потребность в механизме, который позволял бы с выгодой использовать несколько процессов для повышения произво­ дительности программного обеспечения. Но дело в том, что в прошлом это было не так-то просто сделать ясным и допускающим масштабирование способом. Изменить это положение, собственно, и призваны TPL и PLINQ. Ведь они дают возможность лег­ че (и безопаснее) использовать системные ресурсы.

Библиотека TPL определена в пространстве имен System.Threading.Tasks. Но для работы с ней обычно требуется также включать в программу класс System. Threading, поскольку он поддерживает синхронизацию и другие средства многопо­ точной обработки, в том числе и те, что входят в класс Interlocked.

В этой главе рассматривается и TPL, и PLINQ. Следует, однако, иметь в виду, что и та и другая тема довольно обширны. Поэтому в этой главе даются самые основы и рассматриваются некоторые простейшие способы применения TPL и PLINQ. Таким образом, материал этой главы послужит вам в качестве удобной отправной точки для дальнейшего изучения TPL и PLINQ. Если параллельное программирование входит в сферу ваших интересов, то именно эти средства .NET Framework вам придется изучить более основательно.

ПРИМЕЧАНИЕ Несмотря на то что применение TPL и PLINQ рекомендуется теперь для разработки боль­ шинства многопоточных приложений, организация многопоточной обработки на основе класса Thread, представленного в главе 23, по-прежнему находит широкое распростране­ ние. Кроме того, многое из того, что пояснялось в главе 23, применимо и к TPL. Поэтому усвоение материала главы 23 все еще необходимо для полного овладения особенностями организации многопоточной обработки на С#.

Два подхода к параллельному программированию

Применяя TPL, параллелизм в программу можно ввести двумя основными способа­ ми. Первый из них называется параллелизмом данных. При таком подходе одна опера­ ция над совокупностью данных разбивается на два параллельно выполняемых потока или больше, в каждом из которых обрабатывается часть данных. Так, если изменяется каждый элемент массива, то, применяя параллелизм данных, можно организовать па­ раллельную обработку разных областей массива в двух или больше потоках. Нетрудно догадаться, что такие параллельно выполняющиеся действия могут привести к значи­ тельному ускорению обработки данных по сравнению с последовательным подходом. Несмотря на то что параллелизм данных был всегда возможен и с помощью класса Thread, построение масштабируемых решений средствами этого класса требовало не­ мало усилий и времени. Это положение изменилось с появлением библиотеки TPL, с помощью которой масштабируемый параллелизм данных без особого труда вводится в программу.

Второй способ ввода параллелизм называется параллелизмом задач. При таком под­ ходе две операции или больше выполняются параллельно. Следовательно, паралле­ лизм задач представляет собой разновидность параллелизма, который достигался в прошлом средствами класса Thread. А к преимуществам, которые сулит применение TPL, относится простота применения и возможность автоматически масштабировать исполнение кода на несколько процессоров.

Класс Task

В основу TPL положен класс Task. Элементарная единица исполнения инкапсу­ лируется в TPL средствами класса Task, а не Thread. Класс Task отличается от класса Thread тем, что он является абстракцией, представляющей асинхронную операцию. А в классе Thread инкапсулируется поток исполнения. Разумеется, на системном уров­ не поток по-прежнему остается элементарной единицей исполнения, которую можно планировать средствами операционной системы. Но соответствие экземпляра объекта класса Task и потока исполнения не обязательно оказывается взаимно-однозначным. Кроме того, исполнением задач управляет планировщик задач, который работает с пу­ дом потоков. Это, например, означает, что несколько задач могут разделять один и тот же поток. Класс Task (и вся остальная библиотека TPL) определены в пространстве имен System.Threading.Tasks.

Создание задачи

Создать новую задачу в виде объекта класса Task и начать ее исполнение можно самыми разными способами. Для начала создадим объект типа Task с помощью кон­ структора и запустим его, вызвав метод Start(). Для этой цели в классе Task опреде­ лено несколько конструкторов. Ниже приведен тот конструктор, которым мы собира­ емся воспользоваться:

public Task(Action действие)

где действие обозначает точку входа в код, представляющий задачу, тогда как Action — делегат, определенный в пространстве имен System. Форма делегата Action, которой мы собираемся воспользоваться, выглядит следующим образом.

public delegate void Action()

Таким образом, точкой входа должен служить метод, не принимающий никаких параметров и не возвращающий никаких значений. (Как будет показано далее, делега­ ту Action можно также передать аргумент.)

Как только задача будет создана, ее можно запустить на исполнение, вызвав метод Start(). Ниже приведена одна из его форм.

public void Start()

После вызова метода Start() планировщик задач запланирует исполнение задачи. В приведенной ниже программе все изложенное выше демонстрируется на прак­ тике. В этой программе отдельная задача создается на основе метода MyTask(). После того как начнет выполняться метод Main(), задача фактически создается и запускается на исполнение. Оба метода MyTask() и Main() выполняются параллельно.

// Создать и запустить задачу на исполнение.
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {
    // Метод выполняемый в качестве задачи.
    static void MyTask() {
        Console.WriteLine("MyTask() запущен");
        for(int count = 0; count < 10; count++) {
            Thread.Sleep(500);
            Console.WriteLine ("В методе MyTask(), подсчет равен " + count);
        }
        Console.WriteLine("MyTask завершен");
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Сконструировать объект задачи.
        Task tsk = new Task(MyTask);

        // Запустить задачу на исполнение.
        tsk.Start();

        // метод Main() активным до завершения метода MyTask().
        for(int i = 0; i < 60; i++) {
            Console.Write(".");
            Thread.Sleep(100);
        }
        Console.WriteLine("Основной поток завершен.");
    }
}

Ниже приведен результат выполнения этой программы. (У вас он может несколько отличаться в зависимости от загрузки задач, операционной системы и прочих факторов.)

Основной поток запущен.
.MyTask() запущен
.....В методе MyTask(), подсчет равен 0
.....В методе MyTask(), подсчет равен 1
.....В методе MyTask(), подсчет равен 2
.....В методе MyTask(), подсчет равен 3
.....В методе MyTask(), подсчет равен 4
.....В методе MyTask(), подсчет равен 5
.....В методе MyTask(), подсчет равен 6
.....В методе MyTask(), подсчет равен 7
.....В методе MyTask(), подсчет равен 8
.....В методе MyTask(), подсчет равен 9
MyTask завершен
.........Основной поток завершен.

Следует иметь в виду, что по умолчанию задача исполняется в фоновом потоке. Следовательно, при завершении создающего потока завершается и сама задача. Имен­ но поэтому в рассматриваемой здесь программе метод Thread.Sleep() использован для сохранения активным основного потока до тех пор, пока не завершится выполне­ ние метода MyTask(). Как и следовало ожидать, организовать ожидание завершения задачи можно и более совершенными способами, что и будет показано далее.

В приведенном выше примере программы задача, предназначавшаяся для парал­ лельного исполнения, обозначалась в виде статического метода. Но такое требование к задаче не является обязательным. Например, в приведенной ниже программе, которая является переработанным вариантом предыдущей, метод MyTask(), выполняющий роль задачи, инкапсулирован внутри класса.

// Использовать метод экземпляра в качестве задачи.
using System;
using System.Threading;
using System.Threading.Tasks;

class MyClass {
    // Метод выполняемый в качестве задачи.
    public void MyTask() {
        Console.WriteLine("MyTask() запущен");
        for (int count = 0; count < 10; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В методе MyTask(), подсчет равен " + count);
        }
        Console.WriteLine("MyTask завершен ");
    }
}

class DemoTask {
    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Сконструировать объект типа MyClass.
        MyClass mc = new MyClass();

        // Сконструировать объект задачи для метода me.MyTask().
        Task tsk = new Task(mc.MyTask);

        // Запустить задачу на исполнение.
        tsk.Start();

        // Сохранить метод Main() активным до завершения метода MyTask().
        for(int i = 0; i < 60; i++) {
            Console.Write(".");
            Thread.Sleep(100);
        }

        Console.WriteLine("Основной поток завершен.");
    }
}

Результат выполнения этой программы получается таким же, как и прежде. Един­ ственное отличие состоит в том, что метод MyTask() вызывается теперь для экземпляра объекта класса MyClass.

В отношении задач необходимо также иметь в виду следующее: после того, как за­ дача завершена, она не может быть перезапущена. Следовательно, иного способа по­ вторного запуска задачи на исполнение, кроме создания ее снова, не существует.

Применение идентификатора задачи

В отличие от класса Thread; в классе Task отсутствует свойство Name для хранения имени задачи. Но вместо этого в нем имеется свойство Id для хранения идентификато­ ра задачи, по которому можно распознавать задачи. Свойство Id доступно только для чтения и относится к типу int. Оно объявляется следующим образом.

public int Id { get; }

Каждая задача получает идентификатор, когда она создается. Значения идентифи­ каторов уникальны, но не упорядочены. Поэтому один идентификатор задачи может появиться перед другим, хотя он может и не иметь меньшее значение.

Идентификатор исполняемой в настоящий момент задачи можно выявить с помо­ щью свойства CurrentId. Это свойство доступно только для чтения, относится к типу static и объявляется следующим образом.

public static Nullable<int> CurrentID { get; }

Оно возвращает исполняемую в настоящий момент задачу или же пустое значение, если вызывающий код не является задачей.

В приведенном ниже примере программы создаются две задачи и показывается, какая из них исполняется.

// Продемонстрировать применение свойств Id и CurrentId.
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {
    // Метод, исполняемый как задача.
    static void MyTask() {
        Console.WriteLine("MyTask() №" + Task.CurrentId + " запущен");

        for(int count = 0; count < 10; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В методе MyTask() #" + Task.CurrentId +
                            ", подсчет равен " + count );
        }
        Console.WriteLine("MyTask №" + Task.CurrentId + " завершен");
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Сконструировать объекты двух задач.
        Task tsk = new Task(MyTask);
        Task tsk2 = new Task(MyTask);

        // Запустить задачи на исполнение,
        tsk.Start();
        tsk2.Start();

        Console.WriteLine("Идентификатор задачи tsk: " + tsk.Id);
        Console.WriteLine("Идентификатор задачи tsk2: " + tsk2.Id);

        // Сохранить метод Main() активным до завершения остальных задач.
        for(int i = 0; i < 60; i++) {
            Console.Write (".");
            Thread.Sleep(100);
        }
        Console.WriteLine("Основной поток завершен.");
    }
}

Выполнение этой программы приводит к следующему результату.

Основной поток запущен
Идентификатор задачи tsk: 1
Идентификатор задачи tsk2: 2
.MyTask() №1 запущен
MyTask() №2 запущен
.....В методе MyTask() №1, подсчет равен 0
В методе MyTask() №2, подсчет равен 0
.....В методе MyTask() №2, подсчет равен 1
В методе MyTask() №1, подсчет равен 1
.....В методе MyTask() №1, подсчет равен 2
В методе MyTask() №2, подсчет равен 2
.....В методе MyTask() №2, подсчет равен 3
В методе MyTask() №1, подсчет равен 3
.....В методе MyTask() №1, подсчет равен 4
В методе MyTask() №2, подсчет равен 4
.....В методе MyTask() №1, подсчет равен 5
В методе MyTask() №2, подсчет равен 5
.....В методе MyTask() №2, подсчет равен 6
В методе MyTask() №1, подсчет равен 6
.....В методе MyTask() №2, подсчет равен 7
В методе MyTask() №1, подсчет равен 7
.....В методе MyTask() №1, подсчет равен 8
В методе MyTask() №2, подсчет равен 8
.....В методе MyTask() №1, подсчет равен 9
MyTask №1 завершен
В методе MyTask() №2, подсчет равен 9
MyTask №2 завершен
.........Основной поток завершен.

Применение методов ожидания

В приведенных выше примерах основной поток исполнения, а по существу, ме­ тод Main(), завершался потому, что такой результат гарантировали вызовы мето­ да Thread.Sleep(). Но подобный подход нельзя считать удовлетворительным. Организовать ожидание завершения задач можно и более совершенным способом, применяя методы ожидания, специально предоставляемые в классе Task. Самым про­ стым из них считается метод Wait(), приостанавливающий исполнение вызывающего потока до тех пор, пока не завершится вызываемая задача. Ниже приведена простей­ шая форма объявления этого метода.

public void Wait()

При выполнении этого метода могут быть сгенерированы два исключения. Первым из них является исключение ObjectDisposedException. Оно генерируется в том случае, если задача освобождена посредством вызова метода Dispose(). А второе ис­ ключение, AggregateException, генерируется в том случае, если задача сама генери­ рует исключение или же отменяется. Как правило, отслеживается и обрабатывается именно это исключение. В связи с тем что задача может сгенерировать не одно ис­ ключение, если, например, у нее имеются порожденные задачи, все подобные исклю­ чения собираются в единое исключение типа AggregateException. Для того чтобы выяснить, что же произошло на самом деле, достаточно проанализировать внутренние исключения, связанные с этим совокупным исключением. А до тех пор в приведенных далее примерах любые исключения, генерируемые задачами, будут обрабатываться во время выполнения.

Ниже приведен вариант предыдущей программы, измененный с целью продемон­ стрировать применение метода Wait() на практике. Этот метод используется внутри метода Main(), чтобы приостановить его выполнение до тех пор, пока не завершатся обе задачи tsk и tsk2.

// Применить метод Wait().
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {
    // Метод, исполняемый как задача.
    static void MyTask() {
        Console.WriteLine("MyTask() №" + Task.CurrentId + " запущен");
        for(int count = 0; count < 10; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В методе MyTask() #" + Task.CurrentId +
                            ", подсчет равен " + count );
        }
        Console.WriteLine("MyTask №" + Task.CurrentId + " завершен");
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Сконструировать объекты двух задач.
        Task tsk = new Task(MyTask);
        Task tsk2 = new Task(MyTask);

        // Запустить задачи на исполнение.
        tsk.Start();
        tsk2.Start();

        Console.WriteLine("Идентификатор задачи tsk: " + tsk.Id);
        Console.WriteLine("Идентификатор задачи tsk2: " + tsk2.Id);

        // Приостановить выполнение метода Main() до тех пор,
        // пока не завершатся обе задачи tsk и tsk2
        tsk.Wait();
        tsk2.Wait();
        Console.WriteLine("Основной поток завершен.");
    }
}

При выполнении этой программы получается следующий результат.

Основной поток запущен
Идентификатор задачи tsk: 1
Идентификатор задачи tsk2: 2
MyTask() №1 запущен
MyTask() №2 запущен
В методе MyTask() №1, подсчет равен 0
В методе MyTask() №2, подсчет равен 0
В методе MyTask() №1, подсчет равен 1
В методе MyTask() №2, подсчет равен 1
В методе MyTask() №1, подсчет равен 2
В методе MyTask() №2, подсчет равен 2
В методе MyTask() №1, подсчет равен 3
В методе MyTask() №2, подсчет равен 3
В методе MyTask() №1, подсчет равен 4
В методе MyTask() №2, подсчет равен 4
В методе MyTask() №1, подсчет равен 5
В методе MyTask() №2, подсчет равен 5
В методе MyTask() №1, подсчет равен 6
В методе MyTask() №2, подсчет равен 6
В методе MyTask() №1, подсчет равен 7
В методе MyTask() №2, подсчет равен 7
В методе MyTask() №1, подсчет равен 8
В методе MyTask() №2, подсчет равен 8
В методе MyTask() №1, подсчет равен 9
MyTask №1 завершен
В методе MyTask() №2, подсчет равен 9
MyTask №2 завершен
Основной поток завершен.

Как следует из приведенного выше результата, выполнение метода Main() приоста­ навливается до тех пор, пока не завершатся обе задачи tsk и tsk2. Следует, однако, иметь в виду, что в рассматриваемой здесь программе последовательность завершения задач tsk и tsk2 не имеет особого значения для вызовов метода Wait(). Так, если первой за­ вершается задача tsk2, то в вызове метода tsk.Wait() будет по-прежнему ожидаться завершение задачи tsk. В таком случае вызов метода tsk2.Wait() приведет к выполне­ нию и немедленному возврату из него, поскольку задача tsk2 уже завершена.

В данном случае оказывается достаточно двух вызовов метода Wait(), но того же результата можно добиться и более простым способом, воспользовавшись методом WaitAll(). Этот метод организует ожидание завершения группы задач. Возврата из него не произойдет до тех пор, пока не завершатся все задачи. Ниже приведена про­ стейшая форма объявления этого метода.

public static void WaitAll(params Task[] tasks)

Задачи, завершения которых требуется ожидать, передаются с помощью пара­ метра в виде массива tasks. А поскольку этот параметр относится к типу params, то данному методу можно отдельно передать массив объектов типа Task или спи­ сок задач. При этом могут быть сгенерированы различные исключения, включая и AggregateException.

Для того чтобы посмотреть, как метод WaitAll() действует на практике, замените в приведенной выше программе следующую последовательность вызовов.

tsk.Wait();
tsk2.Wait();

на

Task.WaitAll(tsk, tsk2);

Программа будет работать точно так же, но логика ее выполнения станет более понятной.

Организуя ожидание завершения нескольких задач, следует быть особенно внима­ тельным, чтобы избежать взаимоблокировок. Так, если две задачи ожидают заверше­ ния друг друга, то вызов метода WaitAll() вообще не приведет к возврату из него. Разумеется, условия для взаимоблокировок возникают в результате ошибок програм­ мирования, которых следует избегать. Следовательно, если вызов метода WaitAll() не приводит к возврату из него, то следует внимательно проанализировать, могут ли две задачи или больше взаимно блокироваться. (Вызов метода Wait(), который не приводит к возврату из него, также может стать причиной взаимоблокировок.) Иногда требуется организовать ожидание до тех пор, пока не завершится любая из группы задач. Для этой цели служит метод WaitAny(). Ниже приведена простейшая форма его объявления.

public static int WaitAny(params Task[] tasks)

Задачи, завершения которых требуется ожидать, передаются с помощью параме­ тра в виде массива tasks объектов типа Task или отдельного списка аргументов типа Task. Этот метод возвращает индекс задачи, которая завершается первой. При этом могут быть сгенерированы различные исключения.

Попробуйте применить метод WaitAny() на практике, подставив в предыдущей программе следующий вызов.

Task.WaitAny(tsk, tsk2);

Теперь, выполнение метода Main() возобновится, а программа завершится, как только завершится одна из двух задач.

Помимо рассматривавшихся здесь форм методов Wait(), WaitAll() и WaitAny(), имеются и другие их варианты, в которых можно указывать период про­ стоя или отслеживать признак отмены. (Подробнее об отмене задач речь пойдет да­ лее в этой главе.)

Вызов метода Dispose()

В классе Task реализуется интерфейс IDisposable, в котором определяется метод Dispose(). Ниже приведена форма его объявления.

public void Dispose()

Метод Dispose() реализуется в классе Task, освобождая ресурсы, используемые этим классом. Как правило, ресурсы, связанные с классом Task, освобождаются авто­ матически во время "сборки мусора" (или по завершении программы). Но если эти ресурсы требуется освободить еще раньше, то для этой цели служит метод Dispose(). Это особенно важно в тех программах, где создается большое число задач, оставляемых на произвол судьбы.

Следует, однако, иметь в виду, что метод Dispose() можно вызывать для отдель­ ной задачи только после ее завершения. Следовательно, для выяснения факта завер­ шения отдельной задачи, прежде чем вызывать метод Dispose(), потребуется неко­ торый механизм, например, вызов метода Wait(). Именно поэтому так важно было рассмотреть метод Wait(), перед тем как обсуждать метод Dispose(). Ели же по­ пытаться вызвать Dispose() для все еще активной задачи, то будет сгенерировано ис­ ключение InvalidOperationException.

Во всех примерах, приведенных в этой главе, создаются довольно короткие задачи, которые фазу же завершаются, и поэтому применение метода Dispose() в этих приме­ рах не дает никаких преимуществ. (Именно по этой причине вызывать метод Dispose() в приведенных выше программах не было никакой необходимости. Ведь все они заверша­ лись, как только завершалась задача, что в конечном итоге приводило к освобождению от остальных задач.) Но в целях демонстрации возможностей данного метода и во избежа­ ние каких-либо недоразумений метод Dispose() будет вызываться явным образом при непосредственном обращении с экземплярами объектов типа Task во всех последующих примерах программ. Если вы обнаружите отсутствие вызовов метода Dispose() в ис­ ходном коде, полученном из других источников, то не удивляйтесь этому. Опять же, если программа завершается, как только завершится задача, то вызывать метод Dispose() нет никакого смысла — разве что в целях демонстрации его применения.

Применение класса TaskFactory для запуска задачи

Приведенные выше примеры программы были составлены не так эффективно, как следовало бы, поскольку задачу можно создать и сразу же начать ее исполнение, вы­ звав метод StartNew(), определенный в классе TaskFactory. В классе TaskFactory предоставляются различные методы, упрощающие создание задач и управление ими. По умолчанию объект класса TaskFactory может быть получен из свойства Factory, доступного только для чтения в классе Task. Используя это свойство, можно вызвать любые методы класса TaskFactory. Метод StartNew() существует во множестве форм. Ниже приведена самая простая форма его объявления:

public Task StartNew(Action action)

где action — точка входа в исполняемую задачу. Сначала в методе StartNew() авто­ матически создается экземпляр объекта типа Task для действия, определяемого па­ раметром action, а затем планируется запуск задачи на исполнение. Следовательно, необходимость в вызове метода Start() теперь отпадает.

Например, следующий вызов метода StartNew() в рассматривавшихся ранее про­ граммах приведет к созданию и запуску задачи tsk одним действием.

Task tsk = Task.Factory.StartNew(MyTask);

После этого оператора сразу же начнет выполняться метод MyTask().

Метод StartNew() оказывается более эффективным в тех случаях, когда задача соз­ дается и сразу же запускается на исполнение. Поэтому именно такой подход и при­ меняется в последующих примерах программ.

Применение лямбда-выражения в качестве задачи

Кроме использования обычного метода в качестве задачи, существует и другой, бо­ лее рациональный подход: указать лямбда-выражение как отдельно решаемую задачу. Напомним, что лямбда-выражения являются особой формой анонимных функций. По­ этому они могут исполняться как отдельные задачи. Лямбда-выражения оказываются особенно полезными в тех случаях, когда единственным назначением метода является решение одноразовой задачи. Лямбда-выражения могут составлять отдельную задачу иди же вызывать другие методы. Так или иначе, применение лямбда-выражения в каче­ стве задачи может стать привлекательной альтернативой именованному методу.

В приведенном ниже примере программы демонстрируется применение лямбда- выражения в качестве задачи. В этой программе код метода MyTask() из предыдущих примеров программ преобразуется в лямбда-выражение.

// Применить лямбда-выражение в качестве задачи.
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoLambdaTask {
    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Далее лямбда-выражение используется для определения задачи.
        Task tsk = Task.Factory.StartNew( () => {
            Console.WriteLine("Задача запущена");

            for (int count = 0; count < 10; count++) {
                Thread.Sleep(500);
                Console.WriteLine("Подсчет в задаче равен " + count );
            }

            Console.WriteLine("Задача завершена");
        } );

        // Ожидать завершения задачи tsk.
        tsk.Wait();

        // Освободить задачу tsk.
        tsk.Dispose();
        Console.WriteLine("Основной поток завершен.");
    }
}

Ниже приведен результат выполнения этой программы.

Основной поток запущен.
Задача запущена
Подсчет в задаче равен 0
Подсчет в задаче равен 1
Подсчет в задаче равен 2
Подсчет в задаче равен 3
Подсчет в задаче равен 4
Подсчет в задаче равен 5
Подсчет в задаче равен 6
Подсчет в задаче равен 7
Подсчет в задаче равен 8
Подсчет в задаче равен 9
Задача завершена
Основной поток завершен.

Помимо применения лямбда-выражения для описания задачи, обратите также внимание в данной программе на то, что вызов метода tsk.Dispose() не делается до тех пор, пока не произойдет возврат из метода tsk.Wait(). Как пояснялось в пред­ ыдущем разделе, метод Dispose() можно вызывать только по завершении задачи. Для того чтобы убедиться в этом, попробуйте поставить вызов метода tsk.Dispose() в рассматриваемой здесь программе перед вызовом метода tsk.Wait(). Вы сразу же заметите, что это приведет к исключительной ситуации.

Создание продолжения задачи

Одной из новаторских и очень удобных особенностей библиотеки TPL является воз­ можность создавать продолжение задачи. Продолжение — это одна задача, которая ав­ томатически начинается после завершения другой задачи. Создать продолжение мож­ но, в частности, с помощью метода ContinueWith(), определенного в классе Task. Ниже приведена простейшая форма его объявления:

public Task ContinueWith(Action<Task> действие_продолжения)

где действие_продолжения обозначает задачу, которая будет запущена на исполне­ ние по завершении вызывающей задачи. У делегата Action имеется единственный па­ раметр типа Task. Следовательно, вариант делегата Action, применяемого в данном методе, выглядит следующим образом.

public delegate void Action<in T>(T obj)

В данном случае обобщенный параметр T обозначает класс Task. Продолжение задачи демонстрируется на примере следующей программы.

// Продемонстрировать продолжение задачи.
using System;
using System.Threading;
using System.Threading.Tasks;

class ContinuationDemo {
    // Метод, исполняемый как задача.
    static void MyTask() {
        Console.WriteLine("MyTask() запущен");

        for(int count = 0; count < 5; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В методе MyTask() подсчет равен " + count );
        }

        Console.WriteLine("MyTask завершен");
    }

    // Метод, исполняемый как продолжение задачи.
    static void ContTask(Task t) {
        Console.WriteLine("Продолжение запущено");

        for(int count = 0; count < 5; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В продолжении подсчет равен " + count );
        }

        Console.WriteLine("Продолжение завершено");
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Сконструировать объект первой задачи.
        Task tsk = new Task(MyTask);

        // А теперь создать продолжение задачи.
        Task taskCont = tsk.ContinueWith(ContTask);

        // Начать последовательность задач.
        tsk.Start();

        // Ожидать завершения продолжения.
        taskCont.Wait();

        tsk.Dispose();
        taskCont.Dispose();

        Console.WriteLine("Основной поток завершен.");
    }
}

Ниже приведен результата выполнения данной программы.

Основной поток запущен.
MyTask() запущен
В методе MyTask() подсчет равен 0
В методе MyTask() подсчет равен 1
В методе MyTask() подсчет равен 2
В методе MyTask() подсчет равен 3
В методе MyTask() подсчет равен 4
MyTask() завершен
Продолжение запущено
В продолжении подсчет равен 0
В продолжении подсчет равен 1
В продолжении подсчет равен 2
В продолжении подсчет равен 3
В продолжении подсчет равен 4
Продолжение завершено
Основной поток завершен.

Как следует из приведенного выше результата, вторая задача не начинается до тех пор, пока не завершится первая. Обратите также внимание на то, что в методе Main() пришлось ожидать окончания только продолжения задачи. Дело в том, что метод MyTask() как задача завершается еще до начала метода ContTask как продолжения задачи. Следовательно, ожидать завершения метода MyTask() нет никакой надобно­ сти, хотя если и организовать такое ожидание, то в этом будет ничего плохого.

Любопытно, что в качестве продолжения задачи нередко применяется лямбда- выражение. Для примера ниже приведен еще один способ организации продолжения задачи из предыдущего примера программы.

// В данном случае в качестве продолжения задачи применяется лямбда-выражение.
Task taskCont = tsk.ContinueWith((first) =>
    {
        Console.WriteLine("Продолжение запущено");
        for(int count = 0; count < 5; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В продолжении подсчет равен " + count );
        }
        Console.WriteLine("Продолжение завершено");
    }
};

В этом фрагменте кода параметр first принимает предыдущую задачу (в данном случае — tsk).

Помимо метода ContinueWith(), в классе Task предоставляются и другие методы, поддерживающие продолжение задачи, обеспечиваемое классом TaskFactory. К их чис­ лу относятся различные формы методов ContinueWhenAny() и ContinueWhenAll(), которые продолжают задачу, если завершится любая или все указанные задачи соот­ ветственно.

Возврат значения из задачи

Задача может возвращать значение. Это очень удобно по двум причинам. Во-первых, это означает, что с помощью задачи можно вычислить некоторый резуль­ тат. Подобным образом поддерживаются параллельные вычисления. И во-вторых, вы­ зывающий процесс окажется блокированным до тех пор, пока не будет получен ре­ зультат. Это означает, что для организации ожидания результата не требуется никакой особой синхронизации.

Для того чтобы возвратить результат из задачи, достаточно создать эту задачу, ис­ пользуя обобщенную форму Task класса Task. Ниже приведены два кон­ структора этой формы класса Task:

public Task(Func<TResult> функция)
public Task(Func<Object, TResult> функция, Object состояние)

где функция обозначает выполняемый делегат. Обратите внимание на то, что он дол­ жен быть типа Func, а не Action. Тип Func используется именно в тех случаях, когда задача возвращает результат. В первом конструкторе создается задача без аргументов, а во втором конструкторе — задача, принимающая аргумент типа Object, передавае­ мый как состояние. Имеются также другие конструкторы данного класса.

Как и следовало ожидать, имеются также другие варианты метода StartNew(), доступные в обобщенной форме класса TaskFactory и поддерживающие возврат результата из задачи. Ниже приведены те варианты данного метода, которые применяются параллельно с только что рассмотренными конструкторами класса Task.

public Task<TResult> StartNew(Func<TResult> функция)
public Task<TResult> StartNew(Func<Object,TResult> функция, Object состояние)

В любом случае значение, возвращаемое задачей, подучается из свойства Result в классе Task, которое определяется следующим образом.

public TResult Result { get; internal set; }

Аксессор set является внутренним для данного свойства, и поэтому оно оказывает­ ся доступным во внешнем коде, по существу, только для чтения. Следовательно, задача получения результата блокирует вызывающий код до тех пор, пока результат не будет вычислен.

В приведенном ниже примере программы демонстрируется возврат задачей значе­ ний. В этой программе создаются два метода. Первый из них, MyTask(), не принимает параметров, а просто возвращает логическое значение true типа bool. Второй метод, SumIt(), принимает единственный параметр, который приводится к типу int, и воз­ вращает сумму из значения, передаваемого в качестве этого параметра.

// Возвратить значение из задачи.
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {
    // Простейший метод, возвращающий результат и не принимающий аргументов.
    static bool MyTask() {
        return true;
    }

    // Этот метод возвращает сумму из положительного целого значения,
    // которое ему передается в качестве единственного параметра
    static int Sumlt(object v) {
        int x = (int) v;
        int sum = 0;

        for(; x > 0; x--)
            sum += x;

        return sum;
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Сконструировать объект первой задачи.
        Task<bool> tsk = Task<bool>.Factory.StartNew(MyTask);

        Console.WriteLine("Результат после выполнения задачи MyTask: " +
                        tsk.Result);

        // Сконструировать объект второй задачи.
        Task<int> tsk2 = Task<int>.Factory.StartNew(Sumlt, 3);

        Console.WriteLine("Результат после выполнения задачи Sumlt: " +
                        tsk2.Result);

        tsk.Dispose();
        tsk2.Dispose();

        Console.WriteLine("Основной поток завершен.");
    }
}

Выполнение этой программы приводит к следующему результату.

Основной поток запущен.
Результат после выполнения задачи MyTask: True
Результат после выполнения Sumlt: 6
Основной поток завершен.

Помимо упомянутых выше форм класса Task<TResult> и метода StartNew<TResult>, имеются также другие формы. Они позволяют указывать другие дополнительные параметры.

Отмена задачи и обработка исключения AggregateException

В версии 4.0 среды .NET Framework внедрена новая подсистема, обеспечивающая структурированный, хотя и очень удобный способ отмены задачи. Эта новая подсисте­ ма основывается на понятии признака отмены. Признаки отмены поддерживаются в классе Task, среди прочего, с помощью фабричного метода StartNew().

ПРИМЕЧАНИЕ Новую подсистему отмены можно применять и для отмены потоков, рассматривавшихся в предыдущей главе, но она полностью интегрирована в TPL и PLINQ. Именно поэтому эта подсистема рассматривается в этой главе.

Отмена задачи, как правило, выполняется следующим образом. Сначала полу­ чается признак отмены из источника признаков отмены. Затем этот признак пере­ дается задаче, после чего она должна контролировать его на предмет получения за­ проса на отмену. (Этот запрос может поступить только из источника признаков отмены.) Если получен запрос на отмену, задача должна завершиться. В одних слу­ чаях этого оказывается достаточно для простого прекращения задачи без каких- либо дополнительных действий, а в других — из задачи должен быть вызван метод ThrowIfCancellationRequested() для признака отмены. Благодаря этому в отме­ няющем коде становится известно, что задача отменена. А теперь рассмотрим процесс отмены задачи более подробно.

Признак отмены является экземпляром объекта типа CancellationToken, т.е. структуры, определенной в пространстве имен System.Threading. В струк­ туре CancellationToken определено несколько свойств и методов, но мы вос­ пользуемся двумя из них. Во-первых, это доступное только для чтения свойство IsCancellationRequested, которое объявляется следующим образом.

public bool IsCancellationRequested { get; }

Оно возвращает логическое значение true, если отмена задачи была запрошена для вызывающего признака, а иначе — логическое значение false. И во-вторых, это метод ThrowIfCancellationRequested(), который объявляется следующим образом.

public void ThrowIfCancellationRequested()

Если признак отмены, для которого вызывается этот метод, получил запрос на от­ мену, то в данном методе генерируется исключение OperationCanceledException. В противном случае никаких действий не выполняется. В отменяющем коде можно организовать отслеживание упомянутого исключения с целью убедиться в том, что отмена задачи действительно произошла. Как правило, с этой целью сначала пере­ хватывается исключение AggregateException, а затем его внутреннее исключение анализируется с помощью свойства InnerException или InnerExceptions. (Свой­ ство InnerExceptions представляет собой коллекцию исключений. Подробнее о кол­ лекциях речь пойдет в главе 25.)

Признак отмены получается из источника признаков отмены, который пред­ ставляет собой объект класса CancellationTokenSource, определенного в про­ странстве имен System. Threading. Для того чтобы получить данный признак, нуж­ но создать сначала экземпляр объекта типа CancellationTokenSource. (С этой целью можно воспользоваться вызываемым по умолчанию конструктором класса CancellationTokenSource.) Признак отмены, связанный с данным источником, ока­ зывается доступным через используемое только для чтения свойство Token, которое объявляется следующим образом.

public CancellationToken Token { get; }

Это и есть тот признак, который должен быть передан отменяемой задаче.

Для отмены в задаче должна быть получена копия признака отмены и организо­ ван контроль этого признака с целью отслеживать саму отмену. Такое отслеживание можно организовать тремя способами: опросом, методом обратного вызова и с по­ мощью дескриптора ожидания. Проще всего организовать опрос, и поэтому здесь бу­ дет рассмотрен именно этот способ. С целью опроса в задаче проверяется упомянутое выше свойство IsCancellationRequested признака отмены. Если это свойство со­ держит логическое значение true, значит, отмена была запрошена, и задача долж­ на быть завершена. Опрос может оказаться весьма эффективным, если организовать его правильно. Так, если задача содержит вложенные циклы, то проверка свойства IsCancellationRequested во внешнем цикле зачастую дает лучший результат, чем его проверка на каждом шаге внутреннего цикла.

Для создания задачи, из которой вызывается метод ThrowIfCancellationRequested(), когда она отменяется, обычно требуется передать признак отмены как самой задаче, так и конструктору класса Task, будь то непосредственно или же косвенно через метод StartNew(). Передача признака отмены самой задаче позволяет изменить состояние от­ меняемой задачи в запросе на отмену из внешнего кода. Далее будет использована сле­ дующая форма метода StartNew().

public Task StartNew(Action<Object> action, Object состояние,
                    CancellationToken признак_отмены)

В этой форме признак отмены передается через параметры, обозначаемые как состояние и признак_отмены. Это означает, что признак отмены будет передан как делегату, реализующему задачу, так и самому экземпляру объекта типа Task. Ниже приведена форма, поддерживающая делегат Action.

public delegate void Action<in T>(T obj)

В данном случае обобщенный параметр Т обозначает тип Object. В силу этого объект obj должен быть приведен внутри задачи к типу CancellationToken.

И еще одно замечание: по завершении работы с источником признаков отмены следует освободить его ресурсы, вызвав метод Dispose().

Факт отмены задачи может быть проверен самыми разными способами. Здесь при­ меняется следующий подход: проверка значения свойства IsCanceled для экземпля­ ра объекта типа Task. Если это логическое значение true, то задача была отменена.

В приведенной ниже программе демонстрируется отмена задачи. В ней при­ меняется опрос для контроля состояния признака отмены. Обратите внимание на то, что метод ThrowIfCancellationRequested() вызывается после входа в метод MyTask(). Это дает возможность завершить задачу, если она была отмена еще до ее запуска. Внутри цикла проверяется свойство IsCancellationRequested. Если это свойство содержит логическое значение true, а оно устанавливается после вызова ме­ тода Cancel() для экземпляра источника признаков отмены, то на экран выводится сообщение об отмене и далее вызывается метод ThrowIfCancellationRequested() для отмены задачи.

// Простой пример отмены задачи с использованием опроса.
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoCancelTask {
    // Метод, исполняемый как задача.
    static void MyTask(Object ct) {
        CancellationToken cancelTok = (CancellationToken) ct;

        // Проверить, отменена ли задача, прежде чем запускать ее.
        cancelTok.ThrowIfCancellationRequested();
        Console.WriteLine("MyTask() запущен");

        for(int count = 0; count < 10; count++) {
            // В данном примере для отслеживания отмены задачи применяется опрос.
            if(cancelTok.IsCancellationRequested) {
                Console.WriteLine("Получен запрос на отмену задачи.");
                cancelTok.ThrowIfCancellationRequested();
            }
            Thread.Sleep(500);
            Console.WriteLine("В методе MyTask() подсчет равен " + count );
        }
        Console.WriteLine("MyTask завершен");
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Создать объект источника признаков отмены.
        CancellationTokenSource cancelTokSrc = new CancellationTokenSource();

        // Запустить задачу, передав признак отмены ей самой и делегату.
        Task tsk = Task.Factory.StartNew(MyTask, cancelTokSrc.Token,
                                        cancelTokSrc.Token);

        // Дать задаче возможность исполняться вплоть до ее отмены.
        Thread.Sleep(2000);
        try {
            // Отменить задачу.
            cancelTokSrc.Cancel();

            // Приостановить выполнение метода Main() до тех пор,
            // пока не завершится задача tsk.
            tsk.Wait();
        } catch (AggregateException exc) {
            if(tsk.IsCanceled)
                Console.WriteLine("\nЗадача tsk отменена\n");

            // Для просмотра исключения снять комментарии со следующей строки кода:
            // Console.WriteLine(ехе);
        } finally {
            tsk.Dispose();
            cancelTokSrc.Dispose();
        }
        Console.WriteLine("Основной поток завершен.");
    }
}

Ниже приведен результат выполнения этой программы. Обратите внимание на то что задача отменяется через 2 секунды.

Основной поток запущен.
MyTask() запущен
В методе MyTask() подсчет равен 0
В методе MyTask() подсчет равен 1
В методе MyTask() подсчет равен 2
В методе MyTask() подсчет равен 3
Получен запрос, на отмену задачи.
Задача tsk отменена
Основной поток завершен.

Как следует из приведенного выше результата, выполнение метода MyTask() отменяется в методе Main() лишь две секунды спустя. Следовательно, в методе MyTask() выполняются четыре шага цикла. Когда же перехватывается исключение AggregateException, проверяется состояние задачи. Если задача tsk отменена, что и должно произойти в данном примере, то об этом выводится соответствующее со­ общение. Следует, однако, иметь в виду, что когда сообщение AggregateException генерируется в ответ на отмену задачи, то это еще не свидетельствует об ошибке, а про­ сто означает, что задача была отменена.

Выше были изложены лишь самые основные принципы, положенные в основу от­ мены задачи и генерирования исключения AggregateException. Тем не менее эта тема намного обширнее и требует от вас самостоятельного и углубленного изучения, если вы действительно хотите создавать высокопроизводительные, масштабируемые приложения.

Другие средства организации задач

В предыдущих разделах был описан ряд понятий и основных способов организа­ ции и исполнения задач. Но имеются и другие полезные средства. В частности, задачи можно делать вложенными, когда одни задачи способны создавать другие, или же по­ рожденными, когда вложенные задачи оказываются тесно связанными с создающей их задачей.

В предыдущем разделе было дано краткое описание исключения AggregateException, но у него имеются также другие особенности, которые могут оказаться весьма полезными. К их числу относится метод Flatten(), применяемый для преоб­ разования любых внутренних исключений типа AggregateException в единственное исключение AggregateException. Другой метод, Handle(), служит для обработки исключения, составляющего совокупное исключение AggregateException.

При создании задачи имеется возможность указать различные дополнительные параметры, оказывающие влияние на особенности ее исполнения. Для этой цели ука­ зывается экземпляр объекта типа TaskCreationOptions в конструкторе класса Task или же в фабричном методе StartNew(). Кроме того, в классе TaskFactory доступно целое семейство методов FromAsync(), поддерживающих модель асинхронного про­ граммирования (АРМ — Asynchronous Programming Model).

Как упоминалось ранее в этой главе, задачи планируются на исполнение экзем­ пляром объекта класса TaskScheduler. Как правило, для этой цели предоставляется планировщик, используемый по умолчанию в среде .NET Framework. Но этот плани­ ровщик может быть настроен под конкретные потребности разработчика. Кроме того, допускается применение специализированных планировщиков задач.

Класс Parallel

В примерах, приведенных до сих пор в этой главе, демонстрировались ситуации, в которых библиотека TPL использовалась таким же образом, как и класс Thread. Но это было лишь самое элементарное ее применение, поскольку в TPL имеются и другие средства. К их числу относится класс Parallel, который упрощает параллельное ис­ полнение кода и предоставляет методы, рационализирующие оба вида параллелизма: данных и задач.

Класс Parallel является статическим, и в нем определены методы For(), ForEach() и Invoke(). У каждого из этих методов имеются различные формы. В част­ ности, метод For() выполняет распараллеливаемый цикл for, а метод ForEach() — распараллеливаемый цикл foreach, и оба метода поддерживают параллелизм дан­ ных. А метод Invoke() поддерживает параллельное выполнение двух методов иди больше. Как станет ясно дальше, эти методы дают преимущество реализации на практике распространенных методик параллельного программирования, не прибегая к управлению задачами иди потоками явным образом. В последующих разделах каж­ дый из этих методов будет рассмотрен более подробно.

Распараллеливание задач методом Invoke()

Метод Invoke(), определенный в классе Parallel, позволяет выполнять один иди несколько методов, указываемых в виде его аргументов. Он также масштабирует исполнение кода, используя доступные процессоры, если имеется такая возможность. Ниже приведена простейшая форма его объявления.

public static void Invoke(params Action[] actions)

Выполняемые методы должны быть совместимы с описанным ранее делегатом Action. Напомним, что делегат Action объявляется следующим образом.

public delegate void Action()

Следовательно, каждый метод, передаваемый методу Invoke() в качестве аргумен­ та, не должен ни принимать параметров, ни возвращать значение. Благодаря тому что параметр actions данного метода относится к типу params, выполняемые методы могут быть указаны в виде переменного списка аргументов. Для этой цели можно так­ же воспользоваться массивом объектов типа Action, но зачастую оказывается проще указать список аргументов.

Метод Invoke() сначала инициирует выполнение, а затем ожидает завершения всех передаваемых ему методов. Это, в частности, избавляет от необходимости (да и не позволяет) вызывать метод Wait(). Все функции параллельного выполнения метод Wait() берет на себя. И хотя это не гарантирует, что методы будут действительно вы­ полняться параллельно, тем не менее, именно такое их выполнение предполагается, если система поддерживает несколько процессоров. Кроме того, отсутствует возмож­ ность указать порядок выполнения методов от первого и до последнего, и этот поря­ док не может быть таким же, как и в списке аргументов.

В приведенном ниже примере программы демонстрируется применение метода Invoke() на практике. В этой программе два метода MyMeth() и MyMeth2() выпол­ няются параллельно посредством вызова метода Invoke(). Обратите внимание на простоту организации данного процесса.

// Применить метод Parallel.Invoke() для параллельного выполнения двух методов.
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoParallel {
    // Метод, исполняемый как задача.
    static void MyMeth() {
        Console.WriteLine("MyMeth запущен");

        for(int count = 0; count < 5; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В методе MyMeth подсчет равен " + count );
        }

        Console.WriteLine("MyMeth завершен");
    }

    // Метод, исполняемый как задача.
    static void MyMeth2() {
        Console.WriteLine("MyMeth2 запущен");

        for(int count = 0; count < 5; count++) {
            Thread.Sleep(500);
            Console.WriteLine("В методе MyMeth2, подсчет равен " + count );
        }

        Console.WriteLine("MyMeth2 завершен");
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Выполнить параллельно два именованных метода.
        Parallel.Invoke(MyMeth, MyMeth2);

        Console.WriteLine("Основной поток завершен.");
    }
}

Выполнение этой программы может привести к следующему результату.

Основной поток запущен.
MyMeth() запущен
MyMeth2() запущен
В методе MyMeth() подсчет равен 0
В методе MyMeth2() подсчет равен 0
В методе MyMeth() подсчет равен 1
В методе MyMeth2() подсчет равен 1
В методе MyMeth() подсчет равен 2
В методе MyMeth2() подсчет равен 2
В методе MyMeth() подсчет равен 3
В методе MyMeth2() подсчет равен 3
В методе MyMeth() подсчет равен 4
MyMeth() завершен
В методе MyMeth2() подсчет равен 4
MyMeth2() завершен
Основной поток завершен.

В данном примере особое внимание обращает на себя следующее обстоятель­ ство: выполнение метода Main() приостанавливается до тех пор, пока не произой­ дет возврат из метода Invoke(). Следовательно, метод Main(), в отличие от мето­ дов MyMeth() и MyMeth2(), не выполняется параллельно. Поэтому применять метод Invoke() показанным здесь способом нельзя в том случае, если требуется, чтобы ис­ полнение вызывающего потока продолжалось.

В приведенном выше примере использовались именованные методы, но для вы­ зова метода Invoke() это условие не является обязательным. Ниже приведен переде­ ланный вариант той же самой программы, где в качестве аргументов в вызове метода Invoke() применяются лямбда-выражения.

// Применить метод Parallel.Invoke() для параллельного выполнения двух методов.
// В этой версии программы применяются лямбда-выражения.
using System;
using System.Threading;
using System.Threading.Tasks;

class DemoParallel {
    static void Main() {
        Console.WriteLine("Основной поток запущен.");
        // Выполнить два анонимных метода, указываемых в лямбда-выражениях.
        Parallel.Invoke( () => {
                Console.WriteLine("Выражение #1 запущено");

                for (int count = 0; count < 5; count++) {
                    Thread.Sleep(500);
                    Console.WriteLine("В выражении #1 подсчет равен " + count );
                }

                Console.WriteLine("Выражение #1 завершено");
            },
            () => {
                Console.WriteLine("Выражение #2 запущено");

                for(int count = 0; count < 5; count++) {
                    Thread.Sleep(500);
                    Console.WriteLine("В выражении #2 подсчет равен " + count );
                }

                Console.WriteLine("Выражение #2 завершено");
            }
        );
        Console.WriteLine("Основной поток завершен.");
    }
}

Эта программа дает результат, похожий на результат выполнения предыдущей программы.

Применение метода For()

В TPL параллелизм данных поддерживается, в частности, с помощью метода For(), определенного в классе Parallel. Этот метод существует в нескольких формах. Его рассмотрение мы начнем с самой простой формы, приведенной ниже:

public static ParallelLoopResult
            For (int fromInclusive, int toExclusive, Action<int> body)

где fromInclusive обозначает начальное значение того, что соответствует перемен­ ной управления циклом; оно называется также итерационным, или индексным, зна­ чением; a toExclusive — значение, на единицу больше конечного. На каждом шаге цикла переменная управления циклом увеличивается на единицу. Следовательно, цикл постепенно продвигается от начального значения fromInclusive к конечному значению toExclusive минус единица. Циклически выполняемый код указывается методом, передаваемым через параметр body. Этот метод должен быть совместим с делегатом Action, объявляемым следующим образом.

public delegate void Action<in T>(T obj)

Для метода For() обобщенный параметр T должен быть, конечно, типа int. Значение, передаваемое через параметр obj, будет следующим значением перемен­ ной управления циклом. А метод, передаваемый через параметр body, может быть именованным или анонимным. Метод For() возвращает экземпляр объекта типа ParallelLoopResult, описывающий состояние завершения цикла. Для простых ци­ клов этим значением можно пренебречь. (Более подробно это значение будет рассмо­ трено несколько ниже.)

Главная особенность метода For() состоит в том, что он позволяет, когда такая воз­ можность имеется, распараллелить исполнение кода в цикле. А это, в свою очередь, мо­ жет привести к повышению производительности. Например, процесс преобразования массива в цикле может быть разделен на части таким образом, чтобы разные части мас­ сива преобразовывались одновременно. Следует, однако, иметь в виду, что повышение производительности не гарантируется из-за отличий в количестве доступных процес­ соров в разных средах выполнения, а также из-за того, что распараллеливание мелких циклов может составить издержки, которые превышают сэкономленное время.

В приведенном ниже примере программы демонстрируется применение метода For() на практике. В начале этой программы создается массив data, состоящий из 1000000000 целых значений. Затем вызывается метод For(), которому в качестве "тела" цикла передается метод MyTransform(). Этот метод состоит из ряда операторов, вы­ полняющих произвольные преобразования в массиве data. Его назначение — сыми­ тировать конкретную операцию. Как будет подробнее пояснено несколько ниже, вы­ полняемая операция должна быть нетривиальной, чтобы параллелизм данных принес какой-то положительный эффект. В противном случае последовательное выполнение цикла может завершиться быстрее.

// Применить метод Parallel.For() для организации параллельно
// выполняемого цикла обработки данных.
using System;
using System.Threading.Tasks;

class DemoParallelFor {
    static int[] data;

    // Метод, служащий в качестве тела параллельно выполняемого цикла.
    // Операторы этого цикла просто расходуют время ЦП для целей демонстрации.
    static void MyTrknsform(int i) {
        data[i] = data[i] / 10;

        if(data[i] < 10000) data[i] = 0;
        if(data[i] > 10000 & data[i] < 20000) data[i] = 100;
        if(data[i] > 20000 & data[i] < 30000) data[i] = 200;
        if(data[i] > 30000) data[i] = 300;
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        data = new int[100000000];

        // Инициализировать данные в обычном цикле for.
        for(int i=0; i < data.Length; i++) data[i] = i;

        // Распараллелить цикл методом For().
        Parallel.For(0, data.Length, MyTransform);

        Console.WriteLine("Основной поток завершен.");
    }
}

Эта программа состоит из двух циклов. В первом, стандартном, цикле for ини­ циализируется массив data. А во втором цикле, выполняемом параллельно методом For(), над каждым элементом массива data производится преобразование. Как упо­ миналось выше, это преобразование носит произвольный характер и выбрано лишь для целей демонстрации. Метод For() автоматически разбивает вызовы метода MyTransform() на части для параллельной обработки отдельных порций данных, хранящихся в массиве. Следовательно, если запустить данную программу на компью­ тере с двумя доступными процессорами или больше, то цикл преобразования данных в массиве может быть выполнен методом For() параллельно.

Следует, однако, иметь в виду, что далеко не все циклы могут выполняться эффек­ тивно, когда они распараллеливаются. Как правило, мелкие циклы, а также циклы, состоящие из очень простых операций, выполняются быстрее последовательным спо­ собом, чем параллельным. Именно поэтому цикл for инициализации массива данных не распараллеливается методом For() в рассматриваемой здесь программе. Распарал­ леливание мелких и очень простых циклов может оказаться неэффективным потому, что время, требующееся для организации параллельных задач, а также время, расхо­ дуемое на переключение контекста, превышает время, экономящееся благодаря па­ раллелизму. В подтверждение этого факта в приведенном ниже примере программы создаются последовательный и параллельный варианты цикла for, а для сравнения на экран выводится время выполнения каждого из них.

// Продемонстрировать отличия во времени последовательного
// и параллельного выполнения цикла for.
using System;
using System.Threading.Tasks;
using System.Diagnostics;

class DemoParallelFor {
    static int[] data;

    // Метод, служащий в качестве тела параллельно выполняемого цикла.
    // Операторы этого цикла просто расходуют время ЦП для целей демонстрации.
    static void MyTransform(int i) {
        data[i] = data[i] / 10;

        if(data[i] < 1000) data[i] = 0;
        if(data[i] > 1000 & data[i] < 2000) data[i] = 100;
        if(data[i] > 2000 & data[i] < 3000) data[i] = 200;
        if(data[i] > 3000) data[i] = 300;
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        // Create экземпляр объекта типа Stopwatch
        // для хранения времени выполнения цикла.
        Stopwatch sw = new Stopwatch();
        data = new int[100000000];

        // Инициализировать данные.
        sw.Start();

        // Параллельный вариант инициализации массива в цикле.
        Parallel.For(0, data.Length, (i) => data[i] = i );

        sw.Stop();
        Console.WriteLine("Параллельно выполняемый цикл инициализации: " +
                        "{0} секунд", sw.Elapsed.TotalSeconds);

        sw.Reset();

        sw.Start();

        // Последовательный вариант инициализации массива в цикле.
        for(int i=0; i < data.Length; i++) data[i] = i;

        sw.Stop();

        Console.WriteLine("Последовательно выполняемый цикл инициализации: " +
                        "{0} секунд", sw.Elapsed.TotalSeconds);
        Console.WriteLine();

        // Выполнить преобразования.
        sw.Start();

        // Параллельный вариант преобразования данных в цикле.
        Parallel.For(0, data.Length, MyTransform);

        sw.Stop();

        Console.WriteLine("Параллельно выполняемый цикл преобразования: " +
                        "{0} секунд", sw.Elapsed.TotalSeconds);
        sw.Reset();

        sw.Start();

        // Последовательный вариант преобразования данных в цикле.
        for(int i=0; i < data.Length; i++) MyTransform(i);

        sw.Stop();

        Console.WriteLine("Последовательно выполняемый цикл преобразования: " +
                        "(0) секунд", sw.Elapsed.TotalSeconds);

        Console.WriteLine("Основной поток завершен.");
    }
}

При выполнении этой программы на двухъядерном компьютере получается сле­ дующий результат.

Основной поток запущен.
Параллельно выполняемый цикл инициализации: 1.0537757 секунд
Последовательно выполняемый цикл инициализации: 0.3457628 секунд

Параллельно выполняемый цикл преобразования: 4.2246675 секунд
Последовательно выполняемый цикл преобразования: 5.3849959 секунд
Основной поток завершен.

Прежде всего, обратите внимание на то, что параллельный вариант цикла ини­ циализации массива данных выполняется приблизительно в три раза медленнее, чем последовательный. Дело в том, что в данном случае на операцию присваивания рас­ ходуется так мало времени, что издержки на дополнительно организуемое распарал­ леливание превышают экономию, которую оно дает. Обратите далее внимание на то, что параллельный вариант цикла преобразования данных выполняется быстрее, чем последовательный. В данном случае экономия от распараллеливания с лихвой возме­ щает издержки на его дополнительную организацию.

ПРИМЕЧАНИЕ Как правило, в отношении преимуществ, которые дает распараллеливание различных видов циклов, следует руководствоваться текущими рекомендациями корпорации Microsoft. Кроме того, необходимо убедиться в том, что распараллеливание цикла действительно при­ водит к повышению производительности, прежде чем использовать такой цикл в окончатель­ но выпускаемом прикладном коде.

Что касается приведенной выше программы, то необходимо упомянуть о двух других ее особенностях. Во-первых, обратите внимание на то, что в параллельно вы­ полняемом цикле для инициализации данных применяется лямбда-выражение, как показано ниже.

Parallel.For(0, data.Length, (i) => data[i] = i );

Здесь "тело" цикла указывается в лямбда-выражении. (Напомним, что в лямбда- выражении создается анонимный метод.) Следовательно, для параллельного выполне­ ния методом For() совсем не обязательно указывать именованный метод.

И во-вторых, обратите внимание на применение класса Stopwatch для вычисле­ ния времени выполнения цикла. Этот класс находится в пространстве имен System. Diagnostics. Для того чтобы воспользоваться им, достаточно создать экземпляр его объекта, а затем вызвать метод Start(), начинающий отчет времени, и далее — метод Stop(), завершающий отсчет времени. А с помощью метода Reset() отсчет времени сбрасывается в исходное состояние. Продолжительность выполнения можно получить различными способами. В рассматриваемой здесь программе для этой цели исполь­ зовано свойство Elapsed, возвращающее объект типа TimeSpan. С помощью этого объекта и свойства TotalSeconds время отображается в секундах, включая и доли се­ кунды. Как показывает пример рассматриваемой здесь программы, класс Stopwatch оказывается весьма полезным при разработке параллельно исполняемого кода.

Как упоминалось выше, метод For() возвращает экземпляр объекта типа ParallelLoopResult. Эго структура, в которой определяются два следующих свойства.

public bool IsCompleted { get; }
public Nullable<long> LowestBreakIteration { get; }

Свойство IsCompleted будет иметь логическое значение true, если выполнены все шаги цикла. Иными словами, при нормальном завершении цикла это свойство будет содержать логическое значение true. Если же выполнение цикла прервется раньше времени, то данное свойство будет содержать логическое значение false. Свойство LowestBreakIteration будет содержать наименьшее значение пере­ менной управления циклом, если цикл прервется раньше времени вызовом метода ParallelLoopState.Break().

Для доступа к объекту типа ParallelLoopState следует использовать форму ме­ тода For(), делегат которого принимает в качестве второго параметра текущее состоя­ ние цикла. Ниже эта форма метода For() приведена в простейшем виде.

public static ParallelLoopResult For(int fromInclusive, int toExclusive,
                                ActionCint, ParallelLoopState> body)

В данной форме делегат Action, описывающий тело цикла, определяется следую­ щим образом.

public delegate void Action<in T1, in T2>(T argl, T2 arg2)

Для метода For() обобщенный параметр T1 должен быть типа int, а обобщенный параметр Т2 — типа ParallelLoopState. Всякий раз, когда делегат Action вызыва­ ется, текущее состояние цикла передается в качестве аргумента аrg2.

Для преждевременного завершения цикла следует воспользоваться методом Break(), вызываемым для экземпляра объекта типа ParallelLoopState внутри тела цикла, определяемого параметром body. Метод Break() объявляется следующим образом.

public void Break()

Вызов метода Break() формирует запрос на как можно более раннее прекращение параллельно выполняемого цикла, что может произойти через несколько шагов цикла после вызова метода Break(). Но все шаги цикла до вызова метода Break() все же вы­ полняются. Следует, также иметь в виду, что отдельные части цикла могут и не выпол­ няться параллельно. Так, если выполнено 10 шагов цикла, то это еще не означает, что все эти 10 шагов представляют 10 первых значений переменной управления циклом.

Прерывание цикла, параллельно выполняемого методом For(), нередко оказыва­ ется полезным при поиске данных. Так, если искомое значение найдено, то продол­ жать выполнение цикла нет никакой надобности. Прерывание цикла может оказаться полезным и в том случае, если во время очередной операции встретились недостовер­ ные данные.

В приведенном ниже примере программы демонстрируется применение мето­ да Break() для прерывания цикла, параллельно выполняемого методом For(). Это вариант предыдущего примера, переработанный таким образом, чтобы метод MyTransform() принимал теперь объект типа ParallelLoopState в качестве своего параметра, а метод Break() вызывался при обнаружении отрицательного значения в массиве данных. Отрицательное значение, по которому прерывается выполнение цикла, вводится в массив data внутри метода Main(). Далее проверяется состояние завершения цикла преобразования данных. Свойство IsCompleted будет содержать логическое значение false, поскольку в массиве data обнаруживается отрицательное значение. При этом на экран выводится номер шага, на котором цикл был прерван. (В этой программе исключены все избыточные циклы, применявшиеся в ее предыду­ щей версии, а оставлены только самые эффективные из них: последовательно выпол­ няемый цикл инициализации и параллельно выполняемый цикл преобразования.)

// Использовать объекты типа ParallelLoopResult и ParallelLoopState, а также
// метод Break() вместе с методом For() для параллельного выполнения цикла.
using System;
using System.Threading.Tasks;

class DemoParallelForWithLoopResult {
    static int[] data;

    // Метод, служащий в качестве тела параллельно выполняемого цикла.
    // Операторы этого цикла просто расходуют время ЦП для целей демонстрации.
    static void MyTransform(int i, ParallelLoopState pls) {
        // Прервать цикл при обнаружении отрицательного значения.
        if(data[i] < 0) pls.Break();

        data[i] = data[i] / 10;

        if(data[i] < 1000) data[i] = 0;
        if(data[i] > 1000 & data[i] < 2000) data[i] = 100;
        if(data[i] > 2000 s data[i] < 3000) data[i] = 200;
        if(data[i] > 3000) data[i] = 300;
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");
        data = new int[100000000];

        // Инициализировать данные.
        for(int i=0; i < data.Length; i++) data[i] = i;

        // Поместить отрицательное значение в массив data.
        data[1000] = -10;

        // Параллельный вариант инициализации массива в цикле.
        ParallelLoopResult loopResult = Parallel.For(0, data.Length, MyTransform);

        // Проверить, завершился ли цикл.
        if(!loopResult.IsCompleted)
        Console.WriteLine("\nЦикл завершился преждевременно из-за того, " +
                        "что обнаружено отрицательное значение\n" +
                        "на шаге цикла номер " +
                        loopResult.LowestBreakIteration + "\n");
        Console.WriteLine("Основной поток завершен.");
    }
}

Выполнение этой программы может привести, например, к следующему резуль­ тату.

Основной поток запущен.

Цикл завершился преждевременно из-за того, что обнаружено отрицательное значение
на шаге цикла номер 1000

Основной поток завершен.

Как следует из приведенного выше результата, цикл преобразования данных пре­ ждевременно завершается после 1000 шагов. Дело в том, что метод Break() вызывает­ ся внутри метода MyTransform() при обнаружении в массиве данных отрицательного значения.

Помимо двух описанных выше форм метода For() существует и ряд других его форм. В одних из этих форм допускается указывать различные дополнительные пара­ метры, а в других — использовать параметры типа long вместо int для пошагового выполнения цикла. Имеются также формы метода For(), предоставляющие такие до­ полнительные преимущества, как, например, возможность указывать метод, вызывае­ мый по завершении потока каждого цикла.

И еще одно, последнее замечание: если требуется остановить цикл, параллельно выполняемый методом For(), не обращая особого внимания на любые шаги цикла, которые еще могут быть в нем выполнены, то для этой цели лучше воспользоваться методом Stop(), чем методом Break().

Применение метода ForEach()

Используя метод ForEach(), можно создать распараллеливаемый вариант цикла foreach. Существует несколько форм метода ForEach(). Ниже приведена простей­ шая форма его объявления:

public static ParallelLoopResult
            ForEach<TSource>(IEnumerable<TSource> source,
                            Action<TSource> body)

где source обозначает коллекцию данных, обрабатываемых в цикле, a body — метод, который будет выполняться на каждом шаге цикла. Как пояснялось ранее в этой кни­ ге, во всех массивах, коллекциях (описываемых в главе 25) и других источниках данных поддерживается интерфейс IEnumerable. Метод, передаваемый через параметр body, принимает в качестве своего аргумента значение или ссылку на каждый обраба­ тываемый в цикле элемент массива, но не его индекс. А в итоге возвращаются сведения о состоянии цикла.

Аналогично методу For(), параллельное выполнение цикла методом ForEach() можно остановить, вызвав метод Break() для экземпляра объекта типа ParallelLoopState, передаваемого через параметр body, при условии, что исполь­ зуется приведенная ниже форма метода ForEach().

public static ParallelLoopResult
            ForEach<TSource>(IEnumerable<TSource> source,
                            Actior<TSource, ParallelLoopState> body)

В приведенном ниже примере программы демонстрируется применение метода ForEach() на практике. Как и прежде, в данном примере создается крупный массив целых значений. А отличается данный пример от предыдущих тем, что метод, выпол­ няющийся на каждом шаге цикла, просто выводит на консоль значения из массива. Как правило, метод WriteLine() в распараллеливаемом цикле не применяется, пото­ му что ввод-вывод на консоль осуществляется настолько медленно, что цикл оказывает­ ся полностью привязанным к вводу-выводу. Но в данном примере метод WriteLine() применяется исключительно в целях демонстрации возможностей метода ForEach(). При обнаружении отрицательного значения выполнение цикла прерывается вызовом метода Break(). Несмотря на то что метод Break() вызывается в одной задаче, другая задача может по-прежнему выполняться в течение нескольких шагов цикла, прежде чем он будет прерван, хотя это зависит от конкретных условий работы среды выпол­ нения.

// Использовать объекты типа ParallelLoopResult и ParallelLoopState, а также
// метод Break() вместе с методом ForEach() для параллельного выполнения цикла.
using System;
using System.Threading.Tasks;

class DemoParallelForWithLoopResult {
    static int[] data;

    // Метод, служащий в качестве тела параллельно выполняемого цикла.
    // В данном примере переменной v передается значение элемента массива
    // данных, а не индекс этого элемента.
    static void DisplayData(int v, ParallelLoopState pls) {
        // Прервать цикл при обнаружении отрицательного значения.
        if(v < 0) pls.Break();

        Console.WriteLine("Значение: " + v);
    }

    static void Main() {
        Console.WriteLine("Основной поток запущен.");

        data = new int[100000000];

        // Инициализировать данные.
        for(int i=0; i < data.Length; i++) data[i] = i;

        // Поместить отрицательное значение в массив data,
        data[100000] = -10;

        // Использовать цикл, параллельно выполняемый методом ForEach(),
        // для отображения данных на экране.
        ParallelLoopResult loopResult = Parallel.ForEach(data, DisplayData);

        // Проверить, завершился ли цикл.
        if(!loopResult.IsCompleted)
            Console.WriteLine("\nЦикл завершился преждевременно из-за того, " +
                            "что обнаружено отрицательное значение\n" +
                            "на шаге цикла номер " +
                            loopResult.LowestBreakIteration + ".\n");

        Console.WriteLine("Основной поток завершен.");
    }
}

В приведенной выше программе именованный метод применяется в качестве деле­ гата, представляющего "тело" цикла. Но иногда удобнее применять анонимный метод. В качестве примера ниже приведено реализуемое в виде лямбда-выражения "тело" цикла, параллельно выполняемого методом ForEach().

// Использовать цикл, параллельно выполняемый методом ForEach(),
// для отображения данных на экране.
ParallelLoopResult loopResult =
    Parallel.ForEach(data, (v, pis) => {
        Console.WriteLine("Значение: " + v);
        if (v < 0) pls.Breakf);
    });

Исследование возможностей PLINQ

PLINQ представляет собой параллельный вариант языка интегрированных запросов LINQ и тесно связан с библиотекой TPL. PLINQ применяется, главным образом, для достижения параллелизма данных внутри запроса. Как станет ясно из дальнейшего, сделать это совсем не трудно. Как и TPL, тема PLINQ довольно обширна и многогранна, поэтому в этой главе представлены лишь самые основные понятия данного языка.

Класс ParallelEnumerable

Основу PLINQ составляет класс ParallelEnumerable, определенный в про­ странстве имен System.Linq. Это статический класс, в котором определены многие методы расширения, поддерживающие параллельное выполнение операций. По су­ ществу, он представляет собой параллельный вариант стандартного для LINQ класса Enumerable. Многие его методы являются расширением класса ParallelQuery, а не­ которые из них возвращают объект типа ParallelQuery. В классе ParallelQuery инкапсулируется последовательность операций, поддерживающая параллельное вы­ полнение. Имеются как обобщенный, так и необобщенный варианты данного класса. Мы не будем обращаться к классу ParallelQuery непосредственно, а воспользуемся несколькими методами класса ParallelEnumerable. Самый главный из них, метод AsParallel(), описывается в следующем разделе.

Распараллеливание запроса методом AsParallel()

Едва ли не самым удобным средством PLINQ является возможность просто созда­ вать параллельный запрос. Нужно лишь вызвать метод AsParallel() для источника данных. Метод AsParallel() определен в классе ParallelEnumerable и возвращает источник данных, инкапсулированный в экземпляре объекта типа ParallelQuery. Это дает возможность поддерживать методы расширения параллельных запросов. По­ сле вызова данного метода запрос разделяет источник данных на части и оперирует с каждой из них таким образом, чтобы извлечь максимальную выгоду из распаралле­ ливания. (Если распараллеливание оказывается невозможным или неприемлемым, то запрос, как обычно, выполняется последовательно.) Таким образом, добавления в ис­ ходный код единственного вызова метода AsParallel() оказывается достаточно для того, чтобы превратить последовательный запрос LINQ в параллельный запрос LINQ. Для простых запросов это единственное необходимое условие.

Существуют как обобщенные, так и необобщенные формы метода AsParallel(). Ниже приведена простейшая обобщенная его форма:

public static ParallelQuery AsParallel(this IEnumerable source)
public static ParallelQuery<TSource>
            AsParallel<TSource> (this IEnumerable<TSource> source)

где TSource обозначает тип элементов в последовательном источнике данных source.

Ниже приведен пример, демонстрирующий простой запрос PLINQ.

// Простой запрос PLINQ.
using System;
using System.Linq;

class PLINQDemo {
    static void Main() {
        int[] data = new int[10000000];

        // Инициализировать массив данных положительными значениями.
        for(int i=0; i < data.Length; i++) data[i] = i;

        // А теперь ввести в массив данных ряд отрицательных значений.
        data[1000] = -1;
        data[14000] = -2;
        data[15000] = -3;
        data[676000] = -4;
        data[8024540] = -5;
        data[9908000] = -6;

        // Использовать запрос PLINQ для поиска отрицательных значений.
        var negatives = from val in data.AsParallel()
                        where val < 0
                        select val;

        foreach(var v in negatives)
            Console.Write(v + " ");

        Console.WriteLine();
    }
}

Эта программа начинается с создания крупного массива data, инициализируемо­ го целыми положительными значениями. Затем в него вводится ряд отрицательных значений. А далее формируется запрос на возврат последовательности отрицательных значений. Ниже приведен этот запрос.

var negatives = from val in data.AsParallel()
                where val < 0
                select val;

В этом запросе метод AsParallel() вызывается для источника данных, в качестве которого служит массив data. Благодаря этому разрешается параллельное выполне­ ние операций над массивом data, а именно: поиск отрицательных значений парал­ лельно в нескольких потоках. По мере обнаружения отрицательных значений они до­ бавляются в последовательность вывода. Эго означает, что порядок формирования по­ следовательности вывода может и не отражать порядок расположения отрицательных значений в массиве data. В качестве примера ниже приведен результат выполнения приведенного выше кода в двухъядерной системе.

-5 -6 -1 -2 -3 -4

Как видите, в том потоке, где поиск выполнялся в верхней части массива, отрица­ тельные значения -5 и -6 были обнаружены раньше, чем значение -1 в том потоке, где поиск происходил в нижней части массива. Следует, однако, иметь в виду, что из-за от­ личий в степени загрузки задачами, количества доступных процессоров и прочих фак­ торов системного характера могут быть получены разные результаты. А самое главное, что результирующая последовательность совсем не обязательно будет отражать по­ рядок формирования исходной последовательности.

Применение метода AsOrdered()

Как отмечалось в предыдущем разделе, по умолчанию порядок формирования результирующей последовательности в параллельном запросе совсем не обязательно должен отражать порядок формирования исходной последовательности. Более того, результирующую последовательность следует рассматривать как практически неупо­ рядоченную. Если же результат должен отражать порядок организации источника данных, то его нужно запросить специально с помощью метода AsOrdered(), опреде­ ленного в классе ParallelEnumerable. Ниже приведены обобщенная и необобщен­ ная формы этого метода:

public static ParallelQuery AsOrdered(this ParallelQuery source)
public static ParallelQuery<TSource>
                AsOrdered<TSource>(this ParallelQuery<TSource> source)

где TSource обозначает тип элементов в источнике данных source. Метод AsOrdered() можно вызывать только для объекта типа ParallelQuery, поскольку он является методом расширения класса ParallelQuery.

Для того чтобы посмотреть, к какому результату может привести применение ме­ тода AsOrdered(), подставьте его вызов в приведенный ниже запрос из предыдущего примера программы.

// Использовать метод AsOrdered() для сохранения порядка
// в результирующей последовательности.
var negatives = from val in data.AsParallel().AsOrdered()
                where val < 0
                select val;

После выполнения программы порядок следования элементов в результирующей последовательности будет отражать порядок их расположения в исходной последова­ тельности.

Отмена параллельного запроса

Параллельный запрос отменяется таким же образом, как и задача. И в том и в дру­ гом случае отмена опирается на структуру CancellationToken, получаемую из класса CancellationTokenSource. Получаемый в итоге признак отмены передается запросу с помощью метода WithCancellation(). Отмена параллельного запроса производится методом Cancel(), который вызывается для источника признаков отмены. Главное отли­ чие отмены параллельного запроса от отмены задачи состоит в следующем: когда парал­ лельный запрос отменяется, он генерирует исключение OperationCanceledException, а не AggregateException. Но в тех случаях, когда запрос способен сгенерировать не­ сколько исключений, исключение OperationCanceledException может быть объеди­ нено в совокупное исключение AggregateException. Поэтому отслеживать лучше оба вида исключений.

Ниже приведена форма объявления метода WithCancellation():

public static ParallelQuery<TSource>
            WithCancellation<TSource> (
                this ParallelQuery<TSource> source,
            CancellationToken CancellationToken)

где source обозначает вызывающий запрос, a CancellationToken — признак отме­ ны. Этот метод возвращает запрос, поддерживающий указанный признак отмены.

В приведенном ниже примере программы демонстрируется порядок отмены параллельного запроса, сформированного в программе из предыдущего примера. В данной программе организуется отдельная задача, которая ожидает в течение 100 миллисекунд, а затем отменяет запрос. Отдельная задача требуется потому, что цикл foreach, в котором выполняется запрос, блокирует выполнение метода Main() до завершения цикла.

// Отменить параллельный запрос.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class PLINQCancelDemo {
    static void Main() {
        CancellationTokenSource cancelTokSrc = new CancellationTokenSource();
        int[] data = new int[10000000];

        // Инициализировать массив данных положительными значениями.
        for(int i=0; i < data.Length; i++) data[i] = i;

        // А теперь ввести в массив данных ряд отрицательных значений.
        data[1000] = -1;
        data[14000] = -2;
        data[15000] = -3;
        data[676000] = -4;
        data[8024540] = -5;
        data[9908000] = -6;

        // Использовать запрос PLINQ для поиска отрицательных значений.
        var negatives = from val in data.AsParallel().
                            WithCancellation(cancelTokSrc.Token)
                        where val < 0
                        select val;

        // Создать задачу для отмены запроса по истечении 100 миллисекунд.
        Task cancelTsk = Task.Factory.StartNew( () => {
                Thread.Sleep(100);
                cancelTokSrc.Cancel();
            });

        try {
            foreach(var v in negatives)
                Console.Write(v + " ");
        } catch(OperationCanceledException exc) {
            Console.WriteLine(exc.Message);
        } catch(AggregateException exc) {
            Console.WriteLine(exc);
        } finally {
            cancelTsk.Wait();
            cancelTokSrc.Dispose();
            cancelTsk.Dispose();
        }

        Console.WriteLine();
    }
}

Ниже приведен результат выполнения этой программы. Если запрос отменяется до его завершения, то на экран выводится только сообщение об исключительной си­ туации.

Запрос отменен с помощью маркера, переданного в метод WithCancellation.

Другие средства PLINQ

Как упоминалось ранее, PLINQ представляет собой довольно крупную подсистему. Это объясняется отчасти той гибкостью, которой обладает PLINQ. В PLINQ доступ­ ны и многие другие средства, помогающие подстраивать параллельные запросы под конкретную ситуацию. Так, при вызове метода WithDegreeOfParallelism() можно указать максимальное количество процессоров, выделяемых для обработки запроса, а при вызове метода AsSequential() — запросить последовательное выполнение части параллельного запроса. Если вызывающий поток, ожидающий результатов от цикла foreach, не требуется блокировать, то для этой цели можно воспользовать­ ся методом ForAll(). Все эти методы определены в классе ParallelEnumerable. А в тех случаях, когда PLINQ должен по умолчанию поддерживать последовательное выполнение, можно воспользоваться методом WithExecutionMode(), передав ему в качестве параметра признак ParallelExecutionMode.ForceParallelism.

Вопросы эффективности PLINQ

Далеко не все запросы выполняются быстрее только потому, что они распарал­ лелены. Как пояснялось ранее в отношении TPL, издержки, связанные с созданием параллельных потоков и управлением их исполнением, могут "перекрыть" все пре­ имущества, которые дает распараллеливание. Вообще говоря, если источник данных оказывается довольно мелким, а требующаяся обработка данных — очень короткой, то внедрение параллелизма может и не привести к ускорению обработки запроса. Поэтому за рекомендациями по данному вопросу следует обращаться к информации корпорации Microsoft.