Главная

Популярная публикация

Научная публикация

Случайная публикация

Обратная связь

ТОР 5 статей:

Методические подходы к анализу финансового состояния предприятия

Проблема периодизации русской литературы ХХ века. Краткая характеристика второй половины ХХ века

Ценовые и неценовые факторы

Характеристика шлифовальных кругов и ее маркировка

Служебные части речи. Предлог. Союз. Частицы

КАТЕГОРИИ:






Потребители производители




 

Особенностью паттерна является то, что количество потоков-потребителей и потоков-производителей может быть различной. Длина очереди может расти динамически и фактически является буфером между потоками, создающими какие-то данные и потоками их потребляющими.

Попробуем создать подобную очередь. Она должна быть потокозащищенной, т.е. устойчивой к тому, что сразу большое количество потоков хотят в нее писать и забирать данные. Кроме того, если очередь пуста, то потоки потребители должны ждать до тех пор, пока не появятся данные. Для этого можно использовать условное ожидание (conditional wait). Таким образом, если очередь пуста, все потоки-потребители засыпают, как только появляются данные, кто-то из них просыпается и забирает их.

Еще один важный момент – это длина очереди (она не может быть бесконечно большой иначе будет переполнен heap). Это возможно в том случае, когда производители работаю быстрее потребителей. Следовательно, лучше задать заранее максимально возможные размеры очереди. Тогда если производитель хочет добавить данные в уже полностью заполненную очередь он должен блокироваться. Очередь сама будит производителей, когда появляются свободные ячейки и будит потребителей, когда появляются данные.

Теперь попробуем реализовать на Java подобный буфер, но только для простоты он будет состоять из одной ячейки.

Кого интересует полноценный буфер, смотрите коды в интернете, например, http://www.java2s.com/Code/Java/Threads/ProducerconsumerinJava2.htm.

Сначала реализуем буфер, состоящий из одной ячейки, в которую может быть помещено целое число. В буфере реализовано два блокирующих метода: put() и get().

public class SingleElementBuffer {

private Integer elem = null;

 

public synchronized void put(Integer newElem)

throws InterruptedException {

while (elem!= null) {

this.wait();

}

this.elem = newElem;

this.notifyAll();

}

 

public synchronized Integer get()

throws InterruptedException {

while (elem == null) {

this.wait();

}

int result = this.elem;

this.elem = null;

this.notifyAll();

return result;

}

}

 

Давайте разберем работу методов put() и get() подробнее.

Если производителю нужно положить элемент в буфер, он вызывает синхронизируемый метод put(). Это удобно, т.к. буфер состоит из одной ячейки, следовательно, вполне логично, что в данном методе может находиться не более одного потока. Если ячейка непустая (elem!= null), то поток-производитель переводится в состояние ожидания (попадает в множество wait-set). Чуть выше, рассматривая примеры кодов, мы уже убеждались в том, что единственный способ освободить блокировку синхронизируемой секции – вызвать для потока метод wait(). Значит, как только блокируется текущий поток, зайти в метод put() сможет следующий поток-производитель. Таким образом, внутри синхронизируемой секции могут находиться и спать неограниченное количество потоков.

Теперь смоделируем ситуацию, что появился поток-потребитель, который вызывает метод get(). Для потребителя не выполняется условие (elem == null), поэтому он забирает элемент, устанавливает значение, равное null, делает notifyAll() и возвращает результат. Когда вызывается метод notifyAll(), все потоки, которые были в состоянии ожидания по wait() просыпаются и из множества wait-set переводятся в множество blocked-set. Таким образом, проснутся все спящие потоки-производители. Но активных потоков внутри синхронизируемой секции не может быть больше одного. Поэтому они проснутся все, но продолжать работу сможет только один из них. Этот поток-производитель запишет в ячейку свои данные и вызовет метод notifyAll() для ожидающих данные потоков-потребителей. Потом, когда следующий производитель продолжит работу, ячейка, возможно, окажется опять занятой (elem!= null) и он заснет по this.wait().

Итак, подытожим: при вызове метода notify() один из потоков, выбранный случайным образом, переводится в blocked-set. Если вызван метод notifyAll(), то все потоки, находящиеся в состоянии wait переходят в blocked-состояние. Далее опять же случайным образом (алгоритм нечестный – unfair) JVM выбирается из blocked-set поток, который и заходит в синхронизированный блок, блокируя его.

Вообще же в blocked-set поток может попасть двумя способами: если он ожидает в очереди доступа к блокированному другим потоком синхронизируемому методу и если поток находился в wait-set и был вызван метод notify(), который и перевел его в blocked-set. В wait-set можно оказаться одним единственным способом – поток вызвал метод wait().

Создадим производителя. Класс-производитель (producer), производит последовательно числа начиная со startValue (startValue, startValue+1, startValue+2, startValue+3,...) и помещает их в буфер (buffer.put(...)), спит period миллисекунд, повторяет (while(true) {...}).

 

public class Producer implements Runnable {

private int startValue;

private final int period;

private final SingleElementBuffer buffer;

 

public Producer(int startValue, int period, SingleElementBuffer buffer) {

this.buffer = buffer;

this.period = period;

this.startValue = startValue;

}

 

@Override

public void run() {

while (true) {

try {

System.out.println(startValue + " produced");

buffer.put(startValue++);

Thread.sleep(period);

} catch (InterruptedException e) {

System.out.println(Thread.currentThread().getName() + " stopped.");

return;

}

}

}

}

 

Создадим потребителя. Класс-потребитель (consumer), с максимальной скоростью изымает числа из буфера (buffer.get()), выводит в консоль, повторяет (while(true) {...}).

 

public class Consumer implements Runnable {

private final SingleElementBuffer buffer;

 

public Consumer(SingleElementBuffer buffer) {

this.buffer = buffer;

}

 

@Override

public void run() {

while (true) {

try {

int elem = buffer.get();

System.out.println(elem + " consumed");

} catch (InterruptedException e) {

System.out.println(Thread.currentThread().getName() + " stopped.");

return;

}

}

}

}

 

Система с одним потребителем сразу же блокируется (потребитель висит на очереди ожидая данных):

 

public class ProducerConsumerExample{

public static void main(String[] args) {

SingleElementBuffer buffer = new SingleElementBuffer();

new Thread(new Consumer(buffer)).start();

}

}

 

>>... повисла

 

Система с одним производителем блокируется позже (успевает произвести 2 числа и повисает пытаясь поместить второе число в очередь уже занятую первым числом):

 

public class ProducerConsumerExample{

public static void main(String[] args) {

SingleElementBuffer buffer = new SingleElementBuffer();

new Thread(new Producer(1, 1000, buffer)).start();

}

}

 

>> 1 produced
... задержка 1 секунда
>> 2 produced

... повисла

 

Система из одного потребителя и одного производителя работает стабильно:

 

public class ProducerConsumerExample {

public static void main(String[] args) {

SingleElementBuffer buffer = new SingleElementBuffer();

new Thread(new Producer(1, 1000, buffer)).start();

new Thread(new Consumer(buffer)).start();

}

}

 

>> 1 produced
>> 1 consumed
... задержка 1 секунда
>> 2 produced
>> 2 consumed
... задержка 1 секунда
>> 3 produced
>> 3 consumed
... задержка 1 секунда
>> 4 produced
>> 4 consumed
... и так далее

 

Система с 3-мя производителями (с разной скоростью помещения элементов в буфер - 300, 500 и 700 миллисекунд) и 2-мя потребителями работает с "легкими рывками":

 

public class ProducerConsumerExample{

public static void main(String[] args) {

SingleElementBuffer buffer = new SingleElementBuffer();

Thread[] producers = new Thread[]{

new Thread(new Producer(1, 950, buffer)),

new Thread(new Producer(100, 1550, buffer)),

new Thread(new Producer(1000, 2010, buffer)),

};

for (Thread producer: producers) {

producer.start();

}

Thread[] consumers = new Thread[]{

new Thread(new Consumer(buffer)),

new Thread(new Consumer(buffer)),

};

for (Thread consumer: consumers) {

consumer.start();

}

}

}

 

>>100 produced
>>1000 produced
>>1 produced
>>100 consumed
>>1 consumed
>>1000 consumed
>>2 produced
>>2 consumed
>>101 produced
>>101 consumed
>>3 produced
>>3 consumed
>>1001 produced
>>1001 consumed

... и так далее






Не нашли, что искали? Воспользуйтесь поиском:

vikidalka.ru - 2015-2024 год. Все права принадлежат их авторам! Нарушение авторских прав | Нарушение персональных данных