自定义阻塞队列和自定义线程池

女爷i 2022-11-03 10:55 247阅读 0赞

自定义阻塞队列

  1. package jucdemo;
  2. import java.util.Collection;
  3. import java.util.Iterator;
  4. import java.util.LinkedList;
  5. import java.util.concurrent.BlockingQueue;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.concurrent.atomic.AtomicInteger;
  /**
   * A bounded FIFO {@link BlockingQueue} built on a {@link LinkedList} and guarded by a single
   * intrinsic lock.
   *
   * <p>Producers and consumers wait on the same monitor, so every state change uses
   * {@code notifyAll()}; a plain {@code notify()} could wake a thread of the wrong kind and
   * leave the right one parked forever. All methods are safe to call from multiple threads.
   * {@code null} elements are rejected, as the {@code BlockingQueue} contract requires.
   *
   * @author wardseptember
   * @create 2021-02-27 17:12
   *
   * @param <E> the type of elements held in this queue
   */
  public class MyBlockingQueue<E> implements BlockingQueue<E> {

    /** Backing storage; only touched while holding {@link #lock}. */
    private final LinkedList<E> queue = new LinkedList<>();

    /** Monitor used both for mutual exclusion and for wait/notify. */
    private final Object lock = new Object();

    /** Maximum number of elements the queue may hold. */
    private final int maxSize;

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param maxSize the capacity of the queue
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     */
    public MyBlockingQueue(int maxSize) {
      if (maxSize <= 0) {
        throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
      }
      this.maxSize = maxSize;
    }

    /** Rejects {@code null} elements with a {@link NullPointerException}. */
    private static <T> T requireElement(T e) {
      if (e == null) {
        throw new NullPointerException("null elements are not permitted");
      }
      return e;
    }

    /** Appends {@code e} and wakes all waiters. Caller must hold {@link #lock}. */
    private void enqueue(E e) {
      queue.addLast(e);
      lock.notifyAll();
    }

    /** Removes the head and wakes all waiters. Caller must hold {@link #lock}; queue non-empty. */
    private E dequeue() {
      E head = queue.removeFirst();
      lock.notifyAll();
      return head;
    }

    /**
     * Waits on {@link #lock} for at most {@code nanos} nanoseconds.
     *
     * @return the remaining budget (may be zero or negative once the time has elapsed)
     */
    private long awaitNanos(long nanos) throws InterruptedException {
      long start = System.nanoTime();
      TimeUnit.NANOSECONDS.timedWait(lock, nanos);
      return nanos - (System.nanoTime() - start);
    }

    /**
     * Inserts the element if capacity allows.
     *
     * @return {@code true} always (failure is reported by exception)
     * @throws IllegalStateException if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean add(E e) {
      if (offer(e)) {
        return true;
      }
      throw new IllegalStateException("Queue full");
    }

    /**
     * Inserts the element without blocking.
     *
     * @return {@code true} if the element was added, {@code false} if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean offer(E e) {
      requireElement(e);
      synchronized (lock) {
        if (queue.size() >= maxSize) {
          return false;
        }
        enqueue(e);
        return true;
      }
    }

    /**
     * Removes and returns the head of the queue.
     *
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    @Override
    public E remove() {
      E head = poll();
      if (head == null) {
        throw new java.util.NoSuchElementException();
      }
      return head;
    }

    /** Removes and returns the head of the queue, or {@code null} if it is empty. */
    @Override
    public E poll() {
      synchronized (lock) {
        return queue.isEmpty() ? null : dequeue();
      }
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    @Override
    public E element() {
      E head = peek();
      if (head == null) {
        throw new java.util.NoSuchElementException();
      }
      return head;
    }

    /** Returns the head of the queue without removing it, or {@code null} if it is empty. */
    @Override
    public E peek() {
      synchronized (lock) {
        return queue.peekFirst();
      }
    }

    /**
     * Inserts the element, waiting for space if necessary.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public void put(E e) throws InterruptedException {
      requireElement(e);
      synchronized (lock) {
        // Loop guards against spurious wake-ups and against another producer winning the race.
        while (queue.size() >= maxSize) {
          lock.wait();
        }
        enqueue(e);
      }
    }

    /**
     * Inserts the element, waiting up to the given time for space.
     *
     * @return {@code true} if added, {@code false} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code e} or {@code unit} is null
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
      requireElement(e);
      long remaining = unit.toNanos(timeout);
      synchronized (lock) {
        while (queue.size() >= maxSize) {
          if (remaining <= 0L) {
            return false;
          }
          remaining = awaitNanos(remaining);
        }
        enqueue(e);
        return true;
      }
    }

    /**
     * Removes and returns the head, waiting for an element if necessary.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
      synchronized (lock) {
        while (queue.isEmpty()) {
          lock.wait();
        }
        return dequeue();
      }
    }

    /**
     * Removes and returns the head, waiting up to the given time for an element.
     *
     * @return the head, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
      long remaining = unit.toNanos(timeout);
      synchronized (lock) {
        while (queue.isEmpty()) {
          if (remaining <= 0L) {
            return null;
          }
          remaining = awaitNanos(remaining);
        }
        return dequeue();
      }
    }

    /** Returns how many more elements can be inserted without blocking. */
    @Override
    public int remainingCapacity() {
      synchronized (lock) {
        return maxSize - queue.size();
      }
    }

    /** Removes the first occurrence of {@code o}; returns whether anything was removed. */
    @Override
    public boolean remove(Object o) {
      if (o == null) {
        return false;
      }
      synchronized (lock) {
        boolean removed = queue.remove(o);
        if (removed) {
          lock.notifyAll(); // space was freed; blocked producers may proceed
        }
        return removed;
      }
    }

    @Override
    public boolean containsAll(Collection<?> c) {
      synchronized (lock) {
        return queue.containsAll(c);
      }
    }

    /**
     * Adds every element of {@code c} via {@link #add(Object)}.
     *
     * @throws IllegalStateException if the queue fills up part-way through
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public boolean addAll(Collection<? extends E> c) {
      if (c == null) {
        throw new NullPointerException();
      }
      if (c == this) {
        throw new IllegalArgumentException();
      }
      boolean modified = false;
      for (E e : c) {
        if (add(e)) {
          modified = true;
        }
      }
      return modified;
    }

    @Override
    public boolean removeAll(Collection<?> c) {
      synchronized (lock) {
        boolean changed = queue.removeAll(c);
        if (changed) {
          lock.notifyAll();
        }
        return changed;
      }
    }

    @Override
    public boolean retainAll(Collection<?> c) {
      synchronized (lock) {
        boolean changed = queue.retainAll(c);
        if (changed) {
          lock.notifyAll();
        }
        return changed;
      }
    }

    /** Removes all elements and wakes any blocked producers. */
    @Override
    public void clear() {
      synchronized (lock) {
        if (!queue.isEmpty()) {
          queue.clear();
          lock.notifyAll();
        }
      }
    }

    @Override
    public int size() {
      synchronized (lock) {
        return queue.size();
      }
    }

    @Override
    public boolean isEmpty() {
      synchronized (lock) {
        return queue.isEmpty();
      }
    }

    @Override
    public boolean contains(Object o) {
      if (o == null) {
        return false;
      }
      synchronized (lock) {
        return queue.contains(o);
      }
    }

    /**
     * Returns an iterator over a snapshot of the queue taken at call time. Later changes to the
     * queue are not reflected; {@code remove()} removes the first equal element from the live
     * queue, if it is still present.
     */
    @Override
    public Iterator<E> iterator() {
      final Iterator<E> snapshot;
      synchronized (lock) {
        snapshot = new LinkedList<E>(queue).iterator();
      }
      return new Iterator<E>() {
        private E last; // null means "nothing to remove"; elements themselves are never null

        @Override
        public boolean hasNext() {
          return snapshot.hasNext();
        }

        @Override
        public E next() {
          last = snapshot.next();
          return last;
        }

        @Override
        public void remove() {
          if (last == null) {
            throw new IllegalStateException();
          }
          MyBlockingQueue.this.remove(last);
          last = null;
        }
      };
    }

    @Override
    public Object[] toArray() {
      synchronized (lock) {
        return queue.toArray();
      }
    }

    @Override
    public <T> T[] toArray(T[] a) {
      synchronized (lock) {
        return queue.toArray(a);
      }
    }

    /** Moves all currently available elements into {@code c}; returns how many were moved. */
    @Override
    public int drainTo(Collection<? super E> c) {
      return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements into {@code c}.
     *
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
      if (c == null) {
        throw new NullPointerException();
      }
      if (c == this) {
        throw new IllegalArgumentException();
      }
      if (maxElements <= 0) {
        return 0;
      }
      synchronized (lock) {
        int limit = Math.min(maxElements, queue.size());
        int moved = 0;
        try {
          while (moved < limit) {
            // Add first, remove second: if c.add throws, the element stays in the queue.
            c.add(queue.peekFirst());
            queue.removeFirst();
            moved++;
          }
          return moved;
        } finally {
          if (moved > 0) {
            lock.notifyAll();
          }
        }
      }
    }
  }

自定义线程池

  1. package jucdemo;
  2. import java.io.IOException;
  3. import java.util.concurrent.*;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. /**
  6. * @author wardseptember
  7. * @create 2021-02-27 16:49
  8. */
  9. public class ThreadPoolExecutorDemo {
  10. public static void main(String[] args) throws IOException {
  11. // ArrayBlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(2);
  12. MyBlockingQueue<Runnable> myBlockingQueue = new MyBlockingQueue<Runnable>(2);
  13. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, myBlockingQueue, new NameThreadFactory(), new MyIgnorePolicy());
  14. for (int i = 1; i < 10; i++) {
  15. MyTask task = new MyTask(String.valueOf(i));
  16. threadPoolExecutor.execute(task);
  17. }
  18. System.in.read();
  19. threadPoolExecutor.shutdown();
  20. }
  21. static class NameThreadFactory implements ThreadFactory {
  22. private final AtomicInteger atomicInteger = new AtomicInteger(1);
  23. @Override
  24. public Thread newThread(Runnable r) {
  25. Thread t = new Thread(r, "my-thread-" + atomicInteger.getAndIncrement());
  26. System.out.println(t.getName() + "被创建");
  27. return t;
  28. }
  29. }
  30. static class MyIgnorePolicy implements RejectedExecutionHandler {
  31. @Override
  32. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  33. doLog(r, executor);
  34. }
  35. private void doLog(Runnable r, ThreadPoolExecutor executor) {
  36. System.out.println(r.toString() + "被拒绝");
  37. }
  38. }
  39. static class MyTask implements Runnable {
  40. private String name;
  41. public MyTask(String name) {
  42. this.name = name;
  43. }
  44. @Override
  45. public void run() {
  46. try {
  47. System.out.println(this.toString() + "正在运行");
  48. Thread.sleep(3000);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. public String getName() {
  54. return name;
  55. }
  56. @Override
  57. public String toString() {
  58. return "MyTask [name=" + name + "]";
  59. }
  60. }
  61. }

推荐阅读

  • 机器学习资料汇总
  • 吴恩达《机器学习》视频、作业、源码
  • 106页《Python进阶》中文版正式发布
  • 李航《统计学习方法》第二版完整课件
  • 机器学习数学全书,1900页PDF下载

欢迎关注我的公众号呦,率先更新内容,并且后续还有一些源码级的免费教程推出。

gzh.jpg

发表评论

表情:
评论列表 (有 0 条评论,247人围观)

还没有评论,来说两句吧...

相关阅读