自定义阻塞队列和自定义线程池
自定义阻塞队列
package jucdemo;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * A bounded FIFO {@link BlockingQueue} backed by a {@link LinkedList} and guarded by a single
 * intrinsic monitor using {@code wait}/{@code notifyAll}.
 *
 * <p>All state is read and written while holding one private lock, so every method is safe to
 * call from multiple threads. {@code null} elements are rejected because {@code null} is the
 * "queue is empty" return value of {@link #poll()} and {@link #peek()}.
 *
 * <p>The iterator works on a snapshot taken when {@link #iterator()} is called; it never throws
 * {@link java.util.ConcurrentModificationException}.
 *
 * @param <E> the type of elements held in this queue
 * @author wardseptember
 * @create 2021-02-27 17:12
 */
public class MyBlockingQueue<E> implements BlockingQueue<E> {

    /** Backing store; every access must hold {@link #lock}. */
    private final LinkedList<E> queue = new LinkedList<>();

    /** Monitor guarding {@link #queue}; producers and consumers both wait on it. */
    private final Object lock = new Object();

    /** Maximum number of elements the queue may hold at once. */
    private final int maxSize;

    /**
     * Creates an empty queue with the given fixed capacity.
     *
     * @param maxSize the maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     */
    public MyBlockingQueue(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
        }
        this.maxSize = maxSize;
    }

    /**
     * Appends {@code e} to the tail and wakes every waiter.
     * The caller must hold {@link #lock} and must already have verified there is room.
     */
    private void enqueue(E e) {
        queue.addLast(e);
        // notifyAll, not notify: producers and consumers share one wait set, so a single
        // notify could wake another producer and the wakeup meant for a consumer would be lost.
        lock.notifyAll();
    }

    /**
     * Removes and returns the head and wakes every waiter.
     * The caller must hold {@link #lock} and must already have verified the queue is non-empty.
     */
    private E dequeue() {
        E head = queue.removeFirst();
        lock.notifyAll();
        return head;
    }

    /** Removes the first element that is the very same object as {@code target}, if present. */
    private void removeIdentity(Object target) {
        synchronized (lock) {
            for (Iterator<E> it = queue.iterator(); it.hasNext();) {
                if (it.next() == target) {
                    it.remove();
                    lock.notifyAll();
                    return;
                }
            }
        }
    }

    /**
     * Inserts {@code e} if there is room.
     *
     * @return {@code true} always (as specified by {@link Collection#add})
     * @throws IllegalStateException if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean add(E e) {
        if (offer(e)) {
            return true;
        }
        throw new IllegalStateException("Queue full");
    }

    /**
     * Inserts {@code e} if there is room, without blocking.
     *
     * @return {@code true} if the element was added, {@code false} if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean offer(E e) {
        Objects.requireNonNull(e, "e");
        synchronized (lock) {
            if (queue.size() >= maxSize) {
                return false;
            }
            enqueue(e);
            return true;
        }
    }

    /**
     * Removes and returns the head of the queue.
     *
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E remove() {
        E head = poll();
        if (head == null) {
            throw new NoSuchElementException();
        }
        return head;
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the head, or {@code null} if the queue is empty
     */
    @Override
    public E poll() {
        synchronized (lock) {
            return queue.isEmpty() ? null : dequeue();
        }
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E element() {
        E head = peek();
        if (head == null) {
            throw new NoSuchElementException();
        }
        return head;
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @return the head, or {@code null} if the queue is empty
     */
    @Override
    public E peek() {
        synchronized (lock) {
            return queue.peekFirst();
        }
    }

    /**
     * Inserts {@code e}, waiting as long as necessary for space to become available.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e, "e");
        synchronized (lock) {
            // Loop guards against spurious wakeups and against another producer winning the slot.
            while (queue.size() >= maxSize) {
                lock.wait();
            }
            enqueue(e);
        }
    }

    /**
     * Inserts {@code e}, waiting up to the given time for space to become available.
     *
     * @return {@code true} if the element was added, {@code false} if the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws NullPointerException if {@code e} or {@code unit} is null
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        Objects.requireNonNull(e, "e");
        // Deadline arithmetic stays correct across nanoTime wrap-around because only the
        // difference (deadline - now) is ever inspected.
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (queue.size() >= maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0L) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remaining);
            }
            enqueue(e);
            return true;
        }
    }

    /**
     * Removes and returns the head, waiting as long as necessary for an element to arrive.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                lock.wait();
            }
            return dequeue();
        }
    }

    /**
     * Removes and returns the head, waiting up to the given time for an element to arrive.
     *
     * @return the head, or {@code null} if the wait timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (queue.isEmpty()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0L) {
                    return null;
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remaining);
            }
            return dequeue();
        }
    }

    /** Returns how many more elements can be inserted without blocking. */
    @Override
    public int remainingCapacity() {
        synchronized (lock) {
            return maxSize - queue.size();
        }
    }

    /**
     * Removes one occurrence of {@code o} (by {@code equals}) if present.
     *
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        synchronized (lock) {
            boolean removed = queue.remove(o);
            if (removed) {
                lock.notifyAll(); // a slot was freed; blocked producers may proceed
            }
            return removed;
        }
    }

    /** Returns {@code true} if every element of {@code c} is present in this queue. */
    @Override
    public boolean containsAll(Collection<?> c) {
        Objects.requireNonNull(c, "c");
        synchronized (lock) {
            return queue.containsAll(c);
        }
    }

    /**
     * Adds every element of {@code c} via {@link #add(Object)}.
     *
     * @return {@code true} if at least one element was added
     * @throws IllegalStateException if the queue fills up before all elements are added
     * @throws IllegalArgumentException if {@code c} is this queue
     * @throws NullPointerException if {@code c} or any of its elements is null
     */
    @Override
    public boolean addAll(Collection<? extends E> c) {
        Objects.requireNonNull(c, "c");
        if (c == this) {
            throw new IllegalArgumentException("cannot add a queue to itself");
        }
        boolean modified = false;
        for (E e : c) {
            add(e);
            modified = true;
        }
        return modified;
    }

    /**
     * Removes every element that is also contained in {@code c}.
     *
     * @return {@code true} if this queue changed
     */
    @Override
    public boolean removeAll(Collection<?> c) {
        Objects.requireNonNull(c, "c");
        synchronized (lock) {
            boolean changed = queue.removeAll(c);
            if (changed) {
                lock.notifyAll();
            }
            return changed;
        }
    }

    /**
     * Keeps only the elements that are also contained in {@code c}.
     *
     * @return {@code true} if this queue changed
     */
    @Override
    public boolean retainAll(Collection<?> c) {
        Objects.requireNonNull(c, "c");
        synchronized (lock) {
            boolean changed = queue.retainAll(c);
            if (changed) {
                lock.notifyAll();
            }
            return changed;
        }
    }

    /** Removes all elements and wakes any blocked producers. */
    @Override
    public void clear() {
        synchronized (lock) {
            queue.clear();
            lock.notifyAll();
        }
    }

    /** Returns the number of elements currently in the queue. */
    @Override
    public int size() {
        synchronized (lock) {
            return queue.size();
        }
    }

    /** Returns {@code true} if the queue currently holds no elements. */
    @Override
    public boolean isEmpty() {
        synchronized (lock) {
            return queue.isEmpty();
        }
    }

    /** Returns {@code true} if the queue contains an element equal to {@code o}. */
    @Override
    public boolean contains(Object o) {
        synchronized (lock) {
            return queue.contains(o);
        }
    }

    /**
     * Returns an iterator over a snapshot of the queue taken at call time, in FIFO order.
     * {@link Iterator#remove()} removes that exact object from the live queue if it is still
     * there.
     */
    @Override
    public Iterator<E> iterator() {
        final Object[] snapshot = toArray();
        return new Iterator<E>() {
            /** Index of the element the next call to {@code next()} will return. */
            private int cursor;
            /** True between a successful {@code next()} and the matching {@code remove()}. */
            private boolean removable;

            @Override
            public boolean hasNext() {
                return cursor < snapshot.length;
            }

            @Override
            @SuppressWarnings("unchecked") // snapshot only ever contains elements of type E
            public E next() {
                if (cursor >= snapshot.length) {
                    throw new NoSuchElementException();
                }
                removable = true;
                return (E) snapshot[cursor++];
            }

            @Override
            public void remove() {
                if (!removable) {
                    throw new IllegalStateException();
                }
                removable = false;
                removeIdentity(snapshot[cursor - 1]);
            }
        };
    }

    /** Returns the elements in FIFO order as a new array. */
    @Override
    public Object[] toArray() {
        synchronized (lock) {
            return queue.toArray();
        }
    }

    /** Returns the elements in FIFO order, using {@code a} if it is large enough. */
    @Override
    public <T> T[] toArray(T[] a) {
        synchronized (lock) {
            return queue.toArray(a);
        }
    }

    /**
     * Moves all available elements into {@code c}.
     *
     * @return the number of elements transferred
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements, in FIFO order, into {@code c}.
     *
     * @return the number of elements transferred
     * @throws IllegalArgumentException if {@code c} is this queue
     * @throws NullPointerException if {@code c} is null
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c, "c");
        if (c == this) {
            throw new IllegalArgumentException("cannot drain a queue into itself");
        }
        if (maxElements <= 0) {
            return 0;
        }
        synchronized (lock) {
            int moved = 0;
            try {
                while (moved < maxElements && !queue.isEmpty()) {
                    // Add first, remove second: if c.add throws, the element stays in the queue.
                    c.add(queue.peekFirst());
                    queue.removeFirst();
                    moved++;
                }
            } finally {
                if (moved > 0) {
                    lock.notifyAll();
                }
            }
            return moved;
        }
    }
}
自定义线程池
package jucdemo;
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo of a {@link ThreadPoolExecutor} wired with a custom work queue, a thread factory that
 * names its threads, and a rejection handler that only logs.
 *
 * @author wardseptember
 * @create 2021-02-27 16:49
 */
public class ThreadPoolExecutorDemo {

    /**
     * Submits nine {@link MyTask}s to a pool with 2 core threads, 4 maximum threads, a 10-second
     * keep-alive and a work queue of capacity 2, then waits for a byte on standard input before
     * shutting the pool down.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        // A JDK ArrayBlockingQueue<Runnable>(2) can be substituted here for comparison.
        MyBlockingQueue<Runnable> myBlockingQueue = new MyBlockingQueue<Runnable>(2);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, 4, 10, TimeUnit.SECONDS, myBlockingQueue, new NameThreadFactory(), new MyIgnorePolicy());
        try {
            for (int i = 1; i < 10; i++) {
                MyTask task = new MyTask(String.valueOf(i));
                threadPoolExecutor.execute(task);
            }
            // Block until the user presses a key so the pool's output can be observed.
            System.in.read();
        } finally {
            // Always shut down, even if reading stdin fails; otherwise the pool's
            // non-daemon worker threads would keep the JVM alive.
            threadPoolExecutor.shutdown();
        }
    }

    /** Thread factory that names threads {@code my-thread-N} and logs each creation. */
    static class NameThreadFactory implements ThreadFactory {

        /** Sequence number for the next thread name, starting at 1. */
        private final AtomicInteger atomicInteger = new AtomicInteger(1);

        /**
         * Creates a named thread for {@code r} and prints a creation message.
         *
         * @param r the runnable the new thread will execute
         * @return the newly created, not yet started thread
         */
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + atomicInteger.getAndIncrement());
            System.out.println(t.getName() + "被创建");
            return t;
        }
    }

    /** Rejection handler that drops the task and only prints a message. */
    static class MyIgnorePolicy implements RejectedExecutionHandler {

        /**
         * Logs the rejected task; the task itself is discarded.
         *
         * @param r the task that could not be accepted
         * @param executor the executor that rejected it
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            doLog(r, executor);
        }

        /** Prints which task was rejected; {@code executor} is currently not used. */
        private void doLog(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(r.toString() + "被拒绝");
        }
    }

    /** A named task that prints a message and then sleeps for three seconds. */
    static class MyTask implements Runnable {

        /** Display name used in log output. */
        private final String name;

        /**
         * @param name the display name of this task
         */
        public MyTask(String name) {
            this.name = name;
        }

        /** Prints that the task is running, then sleeps 3000 ms to simulate work. */
        @Override
        public void run() {
            try {
                System.out.println(this.toString() + "正在运行");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt status so the owning worker thread can observe it.
                Thread.currentThread().interrupt();
            }
        }

        /** Returns the display name of this task. */
        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "MyTask [name=" + name + "]";
        }
    }
}
推荐阅读
- 机器学习资料汇总
- 吴恩达《机器学习》视频、作业、源码
- 106页《Python进阶》中文版正式发布
- 李航《统计学习方法》第二版完整课件
- 机器学习数学全书,1900页PDF下载
欢迎关注我的公众号呦,率先更新内容,并且后续还有一些源码级的免费教程推出。
还没有评论,来说两句吧...