培训首页  >  软件开发新闻  >  西安尚学堂Java 8中的并行流

西安尚学堂Java 8中的并行流

[2017-03-28 16:18:01] 浏览量:67 来源:

尚学堂

  Java 8 并行流(parallel stream)采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。今天,西安尚学堂的小编就给大家介绍Java8中的并行流。

  Java 8 的并行流可以让我们相对轻松地执行并行任务。

  myList.parallelStream.map(obj -> longRunningOperation())

  但是这样存在一个严重的问题:在 JVM 的后台,使用通用的 fork/join 池来完成上述功能,该池是所有并行流共享的。默认情况,fork/join 池会为每个处理器分配一个线程。假设你有一台16核的机器,这样你就只能创建16个线程。对 CPU 密集型的任务来说,这样是有意义的,因为你的机器确实只能执行16个线程。但是真实情况下,不是所有的任务都是 CPU 密集型的。例如:

  myList.parallelStream  .map(this::retrieveFromA)  .map(this::processUsingB)  .forEach(this::saveToC)  myList.parallelStream  .map(this::retrieveFromD)  .map(this::processUsingE)  .forEach(this::saveToD)

  这两个流很大程度上是受限于IO操作,所以会等待其他系统。但这两个流使用相同的(小)线程池,因此会相互等待而被阻塞。这个可以改进,我们以一个流为例:

  final List firstRange = buildIntRange();  firstRange.parallelStream().forEach((number) -> {  try {  // do something slow  Thread.sleep(5);  } catch (InterruptedException e) { }  });

  在执行期间,获取了一份线程dump的文件。这是相关的线程:

  ForkJoinPool.commonPool-worker-1  ForkJoinPool.commonPool-worker-2  ForkJoinPool.commonPool-worker-3  ForkJoinPool.commonPool-worker-4

  现在,要并行的执行这两个并行流

  Runnable firstTask = () -> {  firstRange.parallelStream().forEach((number) -> {  try {  // do something slow  Thread.sleep(5);  } catch (InterruptedException e) { }  });  };  Runnable secondTask = () -> {  secondRange.parallelStream().forEach((number) -> {  try {  // do something slow  Thread.sleep(5);  } catch (InterruptedException e) { }  });  };  // run threads

  这次我们再看一下线程dump文件:

  ForkJoinPool.commonPool-worker-1  ForkJoinPool.commonPool-worker-2  ForkJoinPool.commonPool-worker-3  ForkJoinPool.commonPool-worker-4

  正如你所见,结果是一样的。我们只使用了4个线程。

  一种变通方案

  正如所提到的,JVM 后台使用 fork/join 池,我们可以看到:

  如果合适,安排一个异步执行的任务到当前正在运行的池中。如果任务不在inForkJoinPool()中,也可以调用ForkJoinPool.commonPool()获取新的池来执行。

  ForkJoinPool forkJoinPool = new ForkJoinPool(3);  forkJoinPool.submit(() -> {  firstRange.parallelStream().forEach((number) -> {  try {  Thread.sleep(5);  } catch (InterruptedException e) { }  });  });  ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);  forkJoinPool2.submit(() -> {  secondRange.parallelStream().forEach((number) -> {  try {  Thread.sleep(5);  } catch (InterruptedException e) {  }  });  });

  现在,我们再次查看线程池:

  ForkJoinPool-1-worker-1  ForkJoinPool-1-worker-2  ForkJoinPool-1-worker-3  ForkJoinPool-1-worker-4  ForkJoinPool-2-worker-1  ForkJoinPool-2-worker-2  ForkJoinPool-2-worker-3  ForkJoinPool-1-worker-4

  因为我们创建自己的线程池,所以可以避免共享线程池,如果有需要,甚至可以分配比处理机数量更多的线程。

  ForkJoinPool forkJoinPool = new ForkJoinPool();

  请联系网站,了解详细的课程信息~

  优质、便捷、省心


文中图片素材来源网络,如有侵权请联系删除
  • 软件开发
  • 软件测试
  • 数据库
  • Web前端
  • 大数据
  • 人工智能
  • 零基础
  • 有HTML基础
  • 有PHP基础
  • 有C语言基础
  • 有JAVA基础
  • 其他计算机语言基础
  • 周末班
  • 全日制白班
  • 随到随学

厚学推荐学校

网上报名

推荐学校

更多>>
热门信息

温馨提示