Ключевые слова:threads, linux, gcc, (найти похожие документы)
From: Владимир Калюжный, Владимир Тарасенко
Newsgroups: Издательский дом "КОМИЗДАТ" htpp://www.comizdat.com/
Date: Mon, 20 Sep 2004 18:21:07 +0000 (UTC)
Subject: Многопотоковое программирование под Linux
Оригинал: http://www.cpp.com.ua/?in=kpp_show_article&kpp_art_ID=15
Многопотоковые вычисления в системе Linux
Последнее время все компьютерные издания так или иначе уделяют
внимание Linux, свободно распространяемой операционной системе. Многие
ведущие производители ПО объявляют о переносе своих продуктов на эту
систему. В интернете постоянно появляются новые сайты посвященные этой
системе. Налицо бум вокруг Linux, который не только не умолкает а
становится все сильнее и мощнее.
Linux и потоки.
Linux, как клон Unix, на данный момент поддерживает многозадачность и
многопотоковость, т.е. в системе одновременно может работать несколько
задач (процессов), и каждая из задач может выполнятся в несколько
потоков. Для начала рассмотрим, что такое поток: поток выполнения -
это элемент кода программы, выполняемый последовательно. Большинство
приложений - однопотоковые программы. Многопотоковая программа в один
момент времени может выполняться в нескольких отдельных потоках. В
случае, если задача выполняется на многопроцессорной машине, то все ее
потоки могут выполняться одновременно, повышая таким образом
производительность выполнения задачи. Производительность
многопотокового приложения можно улучшить даже на однопроцессорной
системе. Например, если один из потоков приложения блокируется
каким-то системным вызовом или ожидает поступления данных, в это время
выполняется другой поток.
Между процессами и потоками существуют различия. Под процессами
понимается программа находящаяся в стадии выполнения. Скажем, shell в
UNIX - это процесс который создается при входе пользователя в систему.
Каждая команда создает новый процесс. Согласно терминологии UNIX - это
порожденный процесс, который выполняет команду от лица пользователя.
Потоки - это часть процесса, и они используют сегменты данных и кода
совместно.
Для многопотокового программирования существует два основных
стандарта: многопотоковые API Solaris (Sun Microsystems) и API
POSIX.1c
В Linux используется API POSIX.1c. Но если быть абсолютно точным, то в
Linux присутствует системный вызов clone(), на основе которого и
построено API для работы с потоками, соответствующие стандарту
POSIX.1c с незначительными исключениями.
Постановка задачи
Довольно часто в инженерных рассчетах необходимо произвети вычисления
матрицы, элементами которой являются функции, а вернее значение
функции с определенными параметрами.
Рассмотрим следующую матрицу A размерностью 4x4:
f(X11) f(X12) f(X13) f(X14)
f(X21) f(X22) f(X23) f(X24)
f(X31) f(X32) f(X33) f(X34)
f(X41) f(X42) f(X43) f(X44)
где
f(x) - вычисляемая функция
Xij - аргумент
Стандартный подход к вычислениям: достоинства и недостатки
Для вычисления элементов данной матрицы, обычно используют следующий
фрагмент кода:
Приняв, что выходные данные (Xij) хранятся в массиве X, а исходные в
S:
int SIZE_I = 4;
int SIZE_J = 4;
double X[SIZE_I][SIZE_J];
double S[SIZE_I][SIZE_J];
.....
double f(double x)
{
..... //какие-то вычисления
}
main_evalution()
{
for (int i=0;i<SIZE_I; ++i)
{
for (int z=0; z<SIZE_J; P ++z)
{
//вычисляем елемент матрицы
X[i][z] = f(S[i][z]);
}
}
}
После выполнения этого кода, матрица X будет заполнена вычисленными
данными. Достоинство данного подхода - простота реализации. Недостаток
- при работе на мощной машине (особенно с несколькими процессорами)
неполное использование вычислительных ресурсов.
Многопотоковые вычисления: достоинства и недостатки
Для преобразования предыдущего примера в многопотоковую задачу,
необходимо произвести маленькие изменения - функция, вычисляющая
значение, должна работать отдельным потоком.
int SIZE_I = 4;
int SIZE_J = 4;
double X[SIZE_I][SIZE_J];
double S[SIZE_I][SIZE_J];
struct DATA_
{
double x;
int i;
int z;
}
typedef struct DATA_ DATA;
double f(double x)
{
//какие-то вычисления
}
void *thread_f(void *arg) //функция для вычисления элемента матрицы
{
DATA* a = (DATA*)arg; //преобразуем данные
X[a->i][a->z] = f([a->x]); //вычисляем
}
main_evalution()
{
pthread_t thread; //идентификатор потока
DATA *arg; //данные для передачи в поток
for (int i=0;i<SIZE_I; ++i)
{
for (int z=0; z<SIZE_J; P ++z)
{
// создаем
arg = new DATA;
//инициализируем данные
arg->i = i; arg->z = z; arg->x = S[i][z];
//создаем поток
pthread_create(&thread, NULL, thread_f, (void *)arg);
//переводим в отсоединенное состояние
pthread_detach(thread);
}
}
}
В результате сделанных изменений, вычисление каждого элемента будет
происходить в отдельном потоке. Недостатком такого метода является
сложность - необходимо всегда учитывать то, что две нити могут
обратиться к одним и тем же даным - одна для чтения, другая для
записи, и в таком случае нельзя гарантировать достоверность данных.
Т.е. необходимо устанавливать/проверять блокировки, обеспечивать
синхронизацию выполнения и т.п. Достоинство - повышение
производительности. Так в нашем примере, процесс не ждет выполнения
всех нитей, т.е. он не ждет когда все елементы матрицы заполняться, а
продолжает свою работу. В случае когда дальнейшая работа программы
зависит от полученных вычислений, можно приостановить основной процесс
до завершения всех нитей.
Функции для работы с потоками
Для работы с потоками существуют следующие основные функции:
* pthread_create(pthread_t *tid, const pthread_attr_t *attr,
void*(*function)(void*), void* arg) - создает поток для выполнения
функции function. В качестве параметра для потоковой функции
передается указатель arg. Индентификатор нового потока
возвращается через tid. Поток создается с параметрами attr.
* pthread_mutex_init(pthread_mutex_t* lock, pthread_mutexattr_t
*attr) - инициализирует взаимоисключающую блокировку. attr -
содержит аттрибуты для взаимоисключающей блокировки. В случае,
если attr == NULL, используются установки по умолчаниию.
* pthread_mutex_destroy(pthread_mutex_t* lock) - удаляет
взаимоисключающую блокировку.
* pthread_mutex_lock(pthread_mutex_t* lock) - устанавливает
блокировку. В случае, если блокировка была установлена другим
процессом, текущий процесс останавливается до снятия блокировки
другим процессом.
* pthread_mutex_unlock(pthread_mutex_t* lock) - снимает блокировку.
* pthread_join(pthread_t tid, void **statusp) - ожидает завершение
неотсоединенного процесса, результат возвращаемый функцией
сохраняется в statusp.
* pthread_detach(pthread_t tid) - отсоединяет процесс. Это же можно
задать при создании процесса, установив аттрибут detachstate
вызовом pthread_attr_setdetachstate.
* pthread_exit(void *status) - завершает процесс, статус передается
вызову pthread_join, подобен exit(). Но вызов exit() в процессе
приведет к завершению всей программы.
Процесс завершается двумя путями - вызовом pthread_exit() или
завершением потоковой функции. В случае, если процесс неотсоединен, то
при его завершении ресурсы, выделенные процессу, не освобождаются до
вызова pthread_join(). Если процесс отсоединенный - ресурсы
освобождаются по ее завершению.
Пример программы
Данная программа запрашивает у пользователя параметры матрицы
аргументов, и используя потоки, заполняет матрицу результатами
вычислений.
Данную программу необходимо компилировать с библиотекой pthread
(именно в ней находятся все функции для работы с потоками) и задав
_REENTRANT:
g++ -D_REENTRANT -o threads threads.c -lpthread
Данный код проверялся на RedHat Linux 6.0
/* threads.c
* simple pthread API demo
* autor: Tarasenko Volodymyr
* e-mail: [18]trsnk@mail.ru
* Компилировать:
* g++ -D_REENTRANT -o threads threads.c -lpthread
*/
#include <PTHREAD.H>
#include <STDIO.H>
#include <UNISTD.H>
#include <MATH.H>
#define SIZE_I 2
#define SIZE_J 2
float X[SIZE_I][SIZE_J];
float S[SIZE_I][SIZE_J];
int all = 0;
struct DATA_
{
double x;
int i;
int z;
};
typedef struct DATA_ DATA;
pthread_mutex_t lock; //Исключающая блокировка
// Функция для вычислений
double f(float x)
{
if (x>0) return log(x);
else return x;
}
// Потоковая функция для вычислений
void *thread_f(void *arg)
{
DATA* a = (DATA*) arg;
X[a->i][a->z] = f(a->x);
// устанавливаем блокировку
pthread_mutex_lock(&lock);
// изменяем глобальную переменную
++all;
// снимаем блокировку
pthread_mutex_unlock(&lock);
delete a; // удаляем свои данные
return NULL;
}
// Потоковая функция для ввода
void *input_thr(void *arg)
{
DATA* a = (DATA*) arg;
//pthread_mutex_lock(&lock);
printf("S[%d][%d]:", a->i, a->z);
scanf("%f", &S[a->i][a->z]);
//pthread_mutex_unlock(&lock);
delete a;
return NULL;
}
int main()
{
//массив идентификаторов потоков
pthread_t thr[ SIZE_I + SIZE_J ];
//инициализация исключающей блокировки
pthread_mutex_init(&lock, NULL);
DATA *arg;
// Ввод
for (int i=0;i<SIZE_I; ++i)
{
for (int z=0; z<SIZE_J; P ++z)
{
arg = new DATA;
arg->i = i; arg->z = z;
//создаем поток для ввода
pthread_create(&thr[i+z], NULL, input_thr, (void *)arg);
}
}
//Ожидаем завершения всех потоков
//идентификаторы потоков хранятся в массиве
for(int i = 0; i<SIZE_I; ++i){
{
pthread_join(thr[i], NULL);
}
//Вычисления
printf("Start calculation\n");
for (int i=0;i<SIZE_I; ++i)
{
for (int z=0; z<SIZE_J; P ++z)
{
arg = new DATA;
arg->i = i; arg->z = z; arg->x = S[i][z];
pthread_t thread;
//создаем поток для вычислений
pthread_create(&thread, NULL, thread_f, (void *)arg);
// переводим в отсоединенный режим
pthread_detach(thread);
}
}
do
{
// Основной процесс "засыпает" на 1с
sleep(1);
// Все-ли завершились?
printf("finished %d threads.\n", all);
}while(all < SIZE_I+SIZE_J);
//Печать результатов
for (int i=0;i<SIZE_I; ++i)
{
for (int z=0; z<SIZE_J; P ++z)
{
printf("X[%d][%d] = %f\t", i, z, X[i][z]);
}
printf("\n");
}
//Удаляем исключающую блокировку
pthread_mutex_destroy(&lock);
return 0;
}
После запуска программа инициализирует исключающую блокировку и
начинает ввод данных. В данном случае, в качестве примера ввод сделан
из потоков, без всяких блокировок ввода/вывода, чтобы показать, что
потоки работают одновременно и когда один останавливается, остальные
продолжают работать.
Основной процесс ожидает завершения всех потоков вызовом
pthread_join().
Только после завершения всех потоков происходит переход ко второй
части программы - вычислениям.
Для вычислений используются отсоединенные потоки, отсоединение
происходит вызовом pthread_detach().
После завершения вычислений в потоке, происходит увеличение переменной
all на единицу, и поток завершает работу. Для гарантирования
правильности изменений применяется исключающая блокировка.
После задержки основного процесса на 1 сек., проверяем количество
завершенных потоков, и если все потоки завершили вычисления, выводим
результат работы.
Показанный пример будет полезен при решении многих задач. Особенно при
расчетах в области обработки металлов давлением, при решении которых,
часто используются методы конечных элементов или методы граничных
элементов. Эти методы характеризируются большими вычислениями,
связанными с матрицами и их заполнением. В большинстве случаев
элементом матрицы является результат сложных вычислений, таких как
решение интегральных уравнений. Применение многопотокового подхода
позволит увеличить скорость и производительность вычислений. Но, как
показано, это приводит к усложнению реализации вычислений.
Литература и ссылки
Литература:
1. Теренс Чан "Системное программирование на С++ для Unix"
2. Андрей Робачевский "Операционная система UNIX"
Ссылки:
1. http://www.linux.org.ru
2. http://www.byte.com
3. http://www.linux.org
У автора есть грубая ошибка!
в main создан указатель
>>DATA *arg;
позднее он в цикле инициализируется объектом (тоже в main)
>> arg = new DATA;
но удаляется объект почемуто из функций потоков
>> delete a; // удаляем свои данные
причем удаление происходит После разблокирования мутекса!
Так делать очень опасно!
можете легко прибить чужой объект или вычитать чужие данные (чтение тоже выполняется вне лока мутекса почемуто)
Безопасные варианты:
1) создать массив указателей
DATA *arg[SIZE_I][SIZE_J]
и отдавать каждому потоку указатель на индивидуальный кусок памяти.
2)выполнять все операции с общей памятью только при заблокированном мутексе (тоесть в потоках чтение, модификация и освобождение общей памяти должно быть внутри общего pthread_mutex_lock(&lock)).
3) удалять объекты только в том потоке который их создал.
Код статьи в качестве примера использовать не рекомендуется ибо в нем автор наступил на те грабли от которых по идее должен уберечь читателей.
>>//создаем поток для ввода
>>pthread_create(&thr[i+z], NULL, input_thr, (void *)arg);
как я понял должны создаваться потоки для ввода каждого элемента матрицы, но
i=1,z=2 -- &thr[i+z] укажет на &thr[3]
i=2,z=1 -- &thr[i+z] укажет на &thr[3]
>>//Ожидаем завершения всех потоков
>>//идентификаторы потоков хранятся в массиве
>>pthread_join(thr[i], NULL);
при
i=3,z=3 -- &thr[i+z] укажет на &thr[6] и этот поток мы не ждем, т.к. size_i=4, правильно? соответственно и расчет данных для таких элементов будет "левым"?
Действительно имеет место несколько косяков.
1) При определении массива thr надо указывать количество элементов как SIZE_I*SIZE_J.
2) В циклах обращаться к элементам как i*SIZE_J+z
3) При переборе элементов массива соответственно должно быть написано for(int i = 0; i<SIZE_I*SIZE_J; ++i)
4) Как уже написал MordorSV действительно лучше создать массив указателей на данные и удалять его в том потоке в котором создавался