flink整合spring boot

2020/03/10 flink

spring boot整合flink,flink版本1.9,spring boot版本2.2.5。第一次用flink1.3的时候可能也是思路没对,没成功,这次看了一篇博客,Spring Boot整合Flink,里面的整体思路还是不错的但是还有有一些问题。

原博主意识到整合过程中的一个问题是flink流无法访问spring容器中的类,从而导致空指针异常,解决思路是在流中进行spring bean的初始化以获得ApplicationContext,进而使用其getBean方法获取类实例。但描述并不够详细。

首先我们想到的方法肯定是利用

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
    private static final long serialVersionUID = -6454872090519042646L;
    private static ApplicationContext applicationContext = null;
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (ApplicationContextUtil.applicationContext == null) {
            ApplicationContextUtil.applicationContext = applicationContext;
        }
    }
 
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
 
    //通过name获取 Bean.
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }
 
    //通过class获取Bean.
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }
 
    //通过name,以及Clazz返回指定的Bean
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}

来获得spring boot环境,在本地测试使用的确也没问题,因为本地测试时,flink的jobmanager和taskmanager都运行在同一个本地jvm里面,所以可以获得到spring boot环境,即,线下代码这么写是可以的

public void open(Configuration parameters) throws Exception {
        logger.info("test environment: async function for mysql java open ...");
        super.open(parameters);
        ApplicationContextUtil contextUtil = new ApplicationContextUtil();
        ruleDao = contextUtil.getBean(RuleDao.class);
        logger.info("ruleDao:" + ruleDao.toString());
    }

但是这种方法放到线上却不行,因为通过调试会发现,在往flink集群提交任务时,其实是在jobmanager上生成了spring boot环境,然后将job提交任务到taskmanager上去执行,由于是flink集群必然是多节点部署的,在taskmanager上并没有启动sring boot环境,因此报错。原博主提出解决办法,在flink任务中直接新建spring boot环境,代码:

public void open(Configuration parameters) throws Exception {
        logger.info("produce environment : async function for mysql java open ...");
        super.open(parameters);
        SpringApplication application = new SpringApplication(AsyncFunctionForRule.class);
        application.setBannerMode(Banner.Mode.OFF);
        ApplicationContext context = application.run(new String[]{});
        ruleDao = context.getBean(RuleDao.class);
        logger.info("ruleDao:" + ruleDao.toString());
    }

并且要注意在上面这种线上使用的时候,class的头上要加上@EnableAutoConfiguration,@EntityScan(“com.free4inno.RealTimeCommon.entity”), @EnableJpaRepositories(basePackages = “com.free4inno.RealTimeCommon.dao”)注解

但是这样造成的问题是,taskManager上第一个提交的任务新建springboot环境后,占用了8080端口,后面的其他任务再去这个taskManager上提交任务的时候,会重复创建spring boot环境,然后又悲惨的发现8080端口已经被占用,最终导致抱错

因此,正确的思路应该是,判断taskManager上是否有springboot环境,如果没有,新建spring boot环境,如果有,获得之前的spring boot context,代码:

@Override
    public void open(Configuration parameters) throws Exception {
        logger.info("Online and offline environment: async function for mysql java open ...");
        super.open(parameters);
        ApplicationContextUtil contextUtil = new ApplicationContextUtil();
        ApplicationContext applicationContext = contextUtil.getApplicationContext();
        String judge = applicationContext + "";
        logger.info("ApplicationContext judge : "+judge);
        if(judge.equals("null")){
            logger.info("*******Didn't detect environment. Initial now!**********");
            SpringApplication application = new SpringApplication(AsyncFunctionForRule.class);
            application.setBannerMode(Banner.Mode.OFF);
            ApplicationContext context = application.run(new String[]{});
            ApplicationContextUtil.setOwnApplicationContext(context);
            ruleDao = context.getBean(RuleDao.class);
        }else {
            logger.info("*******environment has been establish before!**********");
            ruleDao = contextUtil.getBean(RuleDao.class);
        }
        logger.info("ruleDao:" + ruleDao.toString());
    }

Search

    Table of Contents