Preface
For Spring Batch concepts and basic usage, see 《spring batch精选,一文吃透spring batch》. This article covers more advanced material: Spring Batch's scaling options (Multithreaded Step, running a single Step with multiple threads; Parallel Step, running several Steps in parallel; Remote Chunking, processing chunks on remote nodes; Partitioning Step, partitioning the data and processing the partitions separately), focusing on the Partitioning Step. The example built here can run as a master node, a slave node, or a mixed master/slave node, which can greatly improve on the throughput Spring Batch achieves when processing on a single machine.
Source code for this article: https://gitee.com/kailing/partitionjob
How a Spring Batch remotely partitioned Step works
The master node splits the data to be processed into segments according to some rule (an ID range, a hash, and so on) and publishes a message describing each segment, rather than the rows themselves, to a message broker (ActiveMQ, RabbitMQ, ...). Each slave node listens on the queue, picks up a partition message, reads and processes the data slice it describes, and sends the result back. As shown in the figure below.
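As a side note on what actually travels over the queue: for each partition the master sends a small StepExecutionRequest that points a slave at a StepExecution stored in the shared job repository. A minimal standalone illustration (the IDs are hypothetical; in a real run they are assigned by the master when it creates the partitioned step executions):

import org.springframework.batch.integration.partition.StepExecutionRequest;

public class PartitionMessageSketch {
    public static void main(String[] args) {
        // Hypothetical IDs for illustration only; the real values come from the
        // master's step execution splitter when it registers one StepExecution per partition.
        StepExecutionRequest request = new StepExecutionRequest("slaveStep", 1L, 7L);
        System.out.println("step=" + request.getStepName()
                + ", jobExecutionId=" + request.getJobExecutionId()
                + ", stepExecutionId=" + request.getStepExecutionId());
    }
}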
The steps below follow this principle to build a working Spring Batch remote partitioning example.
Step 1: add the required dependencies
See: https://gitee.com/kailing/partitionjob/blob/master/pom.xml
The key dependency for the partitioned job is spring-batch-integration, which provides the remote communication support.
Step 2: the master node distributes the partitions
@Profile({"master", "mixed"})
@Bean
public Job job(@Qualifier("masterStep") Step masterStep) {
    return jobBuilderFactory.get("endOfDayjob")
            .start(masterStep)
            .incrementer(new BatchIncrementer())
            .listener(new JobListener())
            .build();
}

@Bean("masterStep")
public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
                       PartitionHandler partitionHandler,
                       DataSource dataSource) {
    return stepBuilderFactory.get("masterStep")
            .partitioner(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
            .step(slaveStep)
            .partitionHandler(partitionHandler)
            .build();
}
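The masterStep above also injects a PartitionHandler that is not shown in this excerpt (see the project source). With spring-batch-integration this is typically a MessageChannelPartitionHandler that pushes one request per partition through the messaging channels configured in step 3 below. The following is only a rough sketch: the gridSize of 4 and the choice of reply channel are assumptions for illustration, not necessarily the project's exact setup.

@Bean
@Profile({"master", "mixed"})
public PartitionHandler partitionHandler(MessagingTemplate messagingTemplate,
                                         QueueChannel inboundRequests) {
    MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
    handler.setStepName("slaveStep");                   // name of the step the slaves will run
    handler.setGridSize(4);                             // number of partitions to request (example value)
    handler.setMessagingOperations(messagingTemplate);  // sends the partition requests out
    handler.setReplyChannel(inboundRequests);           // waits here for the slaves' replies
    return handler;
}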
The key points on the master side are that its Step must be given the name of the slave Step and a data partitioner. The partitioner implements the Partitioner interface and returns a Map<String, ExecutionContext>; this structure fully describes the slice each slave node has to process. Each ExecutionContext holds the boundaries of the data a slave should handle; which parameters you put into it depends on your business. Here I partition by data ID, so each context carries a minimum and a maximum ID. The Partitioner implementation looks like this:
/**
 * Created by kl on 2018/3/1.
 * Content: partition the data by ID range
 */
public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;

    ColumnRangePartitioner(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = jdbcTemplate.queryForObject("SELECT MIN(arcid) from kl_article", Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(arcid) from kl_article", Integer.class);
        int targetSize = (max - min) / gridSize + 1;
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}
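To make the range arithmetic concrete, here is a small standalone sketch that replays the same loop with hard-coded values. The values min = 1, max = 100 and gridSize = 4 are purely illustrative assumptions; in the real job they come from the kl_article table and the handler's grid size.

public class PartitionRangeDemo {
    public static void main(String[] args) {
        int min = 1, max = 100, gridSize = 4;          // hypothetical values for illustration
        int targetSize = (max - min) / gridSize + 1;   // 25 IDs per slice
        int number = 0, start = min, end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max;
            }
            // prints: partition0 [1-25], partition1 [26-50], partition2 [51-75], partition3 [76-100]
            System.out.println("partition" + number + " [" + start + "-" + end + "]");
            start += targetSize;
            end += targetSize;
            number++;
        }
    }
}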
Step 3: the Integration configuration
Spring Batch Integration provides the communication layer for remote partitioning, and Spring Integration supplies a rich set of channel adapters (for example JMS and AMQP), so remote partitioning can be implemented on top of middleware such as ActiveMQ or RabbitMQ. This article uses RabbitMQ as the messaging middleware. Installing RabbitMQ is outside the scope of this post; the code below shows how to configure the MQ connection as well as the queue, message adapters, and channels used by the partitioned job.
/**
 * Created by kl on 2018/3/1.
 * Content: remote partitioning communication
 */
@Configuration
@ConfigurationProperties(prefix = "spring.rabbit")
public class IntegrationConfiguration {

    private String host;
    private Integer port = 5672;
    private String username;
    private String password;
    private String virtualHost;
    private int connRecvThreads = 5;
    private int channelCacheSize = 10;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(connRecvThreads);
        executor.initialize();
        connectionFactory.setExecutor(executor);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setChannelCacheSize(channelCacheSize);
        return connectionFactory;
    }

    @Bean
    public MessagingTemplate messageTemplate() {
        MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
        messagingTemplate.setReceiveTimeout(60000000L);
        return messagingTemplate;
    }

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "outboundRequests")
    public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
        endpoint.setExpectReply(true);
        endpoint.setOutputChannel(inboundRequests());
        endpoint.setRoutingKey("partition.requests");
        return endpoint;
    }

    @Bean
    public Queue requestQueue() {
        return new Queue("partition.requests", false);
    }

    @Bean
    @Profile({"slave", "mixed"})
    public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inboundRequests());
        adapter.afterPropertiesSet();
        return adapter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("partition.requests");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public PollableChannel outboundStaging() {
        return new NullChannel();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}

Step 4: the slave node receives the partition information and processes it
@Bean
@Profile({"slave", "mixed"})
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
    stepLocator.setBeanFactory(this.applicationContext);
    stepExecutionRequestHandler.setStepLocator(stepLocator);
    stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
    return stepExecutionRequestHandler;
}

@Bean("slaveStep")
public Step slaveStep(MyProcessorItem processorItem, JpaPagingItemReader reader) {
    CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
    List<ItemProcessor> processorList = new ArrayList<>();
    processorList.add(processorItem);
    itemProcessor.setDelegates(processorList);
    return stepBuilderFactory.get("slaveStep")
            .<Article, Article>chunk(1000) // commit interval: one transaction per 1000 items
            .reader(reader)
            .processor(itemProcessor)
            .writer(new PrintWriterItem())
            .build();
}

The most important piece on the slave side is the StepExecutionRequestHandler. It receives the messages from the MQ middleware and, from the partition information, obtains the boundaries of the data to be processed, which are then used by the ItemReader below:
@Bean(destroyMethod = "")
@StepScope
public JpaPagingItemReader<Article> jpaPagingItemReader(
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    System.err.println("Received partition parameters [" + minValue + "->" + maxValue + "]");
    JpaPagingItemReader<Article> reader = new JpaPagingItemReader<>();
    JpaNativeQueryProvider<Article> queryProvider = new JpaNativeQueryProvider<>();
    String sql = "select * from kl_article where arcid >= :minValue and arcid <= :maxValue";
    queryProvider.setSqlQuery(sql);
    queryProvider.setEntityClass(Article.class);
    reader.setQueryProvider(queryProvider);
    Map<String, Object> queryParams = new HashMap<>();
    queryParams.put("minValue", minValue);
    queryParams.put("maxValue", maxValue);
    reader.setParameterValues(queryParams);
    reader.setEntityManagerFactory(entityManagerFactory);
    return reader;
}

The minValue and maxValue injected here from the step execution context are exactly the values the master node set for each partition earlier.
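The slaveStep also wires in a MyProcessorItem and a PrintWriterItem that this post does not show (they live in the project source). As a rough idea of their shape, here is a minimal sketch assuming a pass-through processor and a writer that just prints each item; it is an illustration, not the project's actual code.

// Illustrative assumptions only, not the implementations from the partitionjob repository.
@Component
public class MyProcessorItem implements ItemProcessor<Article, Article> {
    @Override
    public Article process(Article article) {
        // Per-item business logic would go here; this sketch passes items through unchanged.
        return article;
    }
}

public class PrintWriterItem implements ItemWriter<Article> {
    @Override
    public void write(List<? extends Article> items) {
        // Simply print each processed item, matching the "print" writer used in slaveStep.
        items.forEach(item -> System.out.println("written: " + item));
    }
}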
Wrapping up
With the above, the whole Spring Batch remote partitioning example is complete. Note that a single instance can act as master, as slave, or as both, and this is controlled by Spring profiles; a careful reader will have noticed annotations such as @Profile({"master", "mixed"}). So when you test, don't forget to set the appropriate profile in Spring Boot, e.g. spring.profiles.active=master, slave, or mixed, depending on the role that instance should play.