본문 바로가기
spring

[Spring] dynamic Scheduler 다이나믹 스케줄링

by moonsiri 2021. 11. 9.
728x90
반응형

@ImportResource({ "classpath:scheduler/context-scheduler.xml" }) 나 @Scheduled 어노테이션을 사용하지 않고 스케줄을 동적으로 등록하는 두가지 방법을 알아보겠습니다.

 

 

1. ThreadPoolTaskScheduler

꼭 아래 예시와 같이 사용할 필욘 없지만, Map에 scheduler를 저장해놓고 destroy 할 수 있습니다.

@Service
public class SchedulerServiceImpl implements SchedulerService {

	private final Map<String, ThreadPoolTaskScheduler> schedulerMap = new ConcurrentHashMap<>();

	public SchedulerServiceImpl() {
		Set<String> typeList = ...;
		for (String type: typeList) {
			this.startScheduler(type);
		}
	}

	/**
	 * 스케줄 시작
	 *
	 * @param type
	 */
	@Override
	public void startScheduler(String type) {
		ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
		scheduler.initialize();
		scheduler.schedule(() -> this.execute(type), new PeriodicTrigger(1, TimeUnit.SECONDS));
		schedulerMap.put(type, scheduler);
	}

	/**
	 * 스케줄 종료
	 *
	 * @param type
	 */
	@Override
	public void destroyScheduler(String type) {
		ThreadPoolTaskScheduler scheduler = schedulerMap.get(type);
		scheduler.shutdown();
		schedulerMap.remove(type);
	}

	/**
	 * 스케줄
	 *
	 */
	private void execute(String type) {
    	// ...
	}

 

 

 

2. SchedulingConfigurer

2.1. VM Option 사용

 

WAS가 여러 개일 때 WAS 마다 다른 스케줄을 실행시키려고 합니다.

기존에는 context-scheduler.xml를 이용하여 각 WAS마다 다른 xml파일로 스케줄을 실행했었는데,

  • @ImportResource({ "classpath:scheduler/context-scheduler.xml" })

이 방법은 WAS 개수만큼 build 해야 하기 때문에 원빌드 후 배포 시 VM Option으로 서버 번호를 넘겨 그 번호에 해당하는 스케줄 Job만 실행되게 동적 스케줄을 구현해보려고 합니다.

  • -jar -Dserver.number=0 {프로젝트명}.jar

 

 

처음에는 원하는 context-scheduler.xml 파일을 불러오려고 아래와 같이 구현했는데, bean이 컨테이너에 제대로 올라가지 않아서 실패하였습니다.

	@Autowired
	private ResourceLoader resourceLoader;
    
	@PostConstruct
	public void init() {
		ApplicationContext context = new ClassPathXmlApplicationContext("scheduler/context-scheduler.xml");
		for (String beanDefinitionNames : context.getBeanDefinitionNames()) {
			System.out.println(beanDefinitionNames);
		}

		final XmlBeanDefinitionReader xmlBeanDefinitionReader = new XmlBeanDefinitionReader(new GenericApplicationContext());

		Resource resource = resourceLoader.getResource("classpath:scheduler/context-scheduler.xml");
		if (resource.exists()) {
			xmlBeanDefinitionReader.loadBeanDefinitions(resource);
		}
	}

 

 

@Scheduled 어노테이션을 사용하면 모든 WAS에서 스케줄이 실행됩니다.

그리고 @Scheduled 어노테이션은 Spring context startup시 한 번만 resolved와 초기화됩니다.

따라서 Spring에서 @Scheduled 어노테이션을 사용하면 runtime에 fixedDelay 또는 fixedRate, cron 값을 변경할 수 없습니다.

 

그래서 스케줄러를 동적으로 등록하는 다른 방법을 찾아보았습니다.

Spring의 SchedulingConfigurer를 사용하면 fixedDelay 또는 fixedRate, cron 값을 동적으로 설정할 수 있습니다.

 

SchedulingConfigurer 인터페이스를 구현하기에 앞서, 우선 실행할 Job을 일괄 등록할 수 있게 Job Service에서 상속받아 공통으로 사용할 인터페이스(DoExecuteJob.java)를 생성합니다.

public interface DoExecuteJob {
 
    void execute();
}

 

실행할 Job은 DoExecuteJob.java를 상속받아 단일 메서드(execute())에 실행할 로직을 추가합니다.

단, 클래스명은 JobImpl로 끝나야 합니다.

@Service
public class AuditLogJobImpl implements DoExecuteJob {
    // ...
 
    @Override
    public void execute() {
        // 실행 내용
    }
}
@Service
public class DestructionJobImpl implements DoExecuteJob {
    // ...
 
    @Override
    public void execute() {
        // 실행 내용
    }
}
@Service
public class UserJobImpl implements DoExecuteJob {
    // ...
 
    @Override
    public void execute() {
        // 실행 내용
    }
}

 

 

Job의 cron 표현식을 정의할 scheduler.properties를 생성합니다.

key 패턴은 {*jobImpl의 bean name}.{serverNumber}.cron.expression로 이루어지게 합니다.

# 스케줄 실행 cron 정의
# *JobImpl.{serverNumber}.cron.expression={cron}


# 0번 서버에서 실행되는 Job
auditLogJobImpl.0.cron.expression=0 30 1 * * ?

# 1번 서버에서 실행되는 Job
destructionJobImpl.1.cron.expression=0 0 1 * * ?
userJobImpl.1.cron.expression=0 0 7 * * ?
@PropertySource(value = {
    "classpath:properties/scheduler.properties"
}, encoding = "UTF-8")
@EnableTransactionManagement
@SpringBootApplication
public class Application {
 
    public static void main(String[] args) throws Exception {
        // ...
    }
 
}

 

 

DynamicSchedulingConfig.java를 생성하고 SchedulingConfigurer 인터페이스를 구현합니다.

@Slf4j
@Configuration
@EnableScheduling
public class DynamicSchedulingConfig implements SchedulingConfigurer {

	private final GenericWebApplicationContext context;
	private final Environment env;
	private final ThreadPoolTaskScheduler taskScheduler;

	@Value("${server.number:0}")
	private Integer serverNumber;

	public SchedulerConfiguration(GenericWebApplicationContext context, Environment env) {
		this.context = context;
		this.env = env;

		this.taskScheduler = new ThreadPoolTaskScheduler();
		this.taskScheduler.setPoolSize(50);
		this.taskScheduler.setThreadNamePrefix("thread-scheduler-task-");
		this.taskScheduler.initialize();
	}

	@Override
	public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
		log.info("########################################################");
		log.info("##### {}번 서버", serverNumber);
		taskRegistrar.setTaskScheduler(this.taskScheduler);

		String[] beanDefinitionNames = context.getBeanDefinitionNames();
		for (String beanName : beanDefinitionNames) {
			if (beanName.endsWith("JobImpl")) {
				try {
					String cronExpr = env.getRequiredProperty(beanName + SEPERATOR_DOT + serverNumber + ".cron.expression");
					DoExecuteJob scheduleJob = context.getBean(beanName, DoExecuteJob.class);
					taskRegistrar.addTriggerTask(scheduleJob::execute, new CronTrigger(cronExpr));
					log.info("## execute schedule job : {} - {}", beanName, cronExpr);
				} catch (Exception e) {
					log.info("## unexecuted schedule job : {}", beanName);
				}
			}
		}

		log.info("########################################################");
	}

	@PreDestroy
	public void shutdown() {
		if (this.taskScheduler != null) {
			this.taskScheduler.shutdown();
		}
	}
}

(1) JopImpl로 끝나는 bean 이름을 필터링합니다.

(2) properties파일에 정의해둔 cron 표현식을 조회하는데, 존재하지 않으면 실행하지 않는 Job으로 판단합니다.

(3) Job Service에서 상속받아 공통으로 사용 중인 인터페이스(DoExecuteJob.java)로 스케줄을 등록합니다.

 

 

-jar -Dserver.number=1 {프로젝트명}.jar 실행 결과 로그

########################################################
##### 1번 서버
## unexecuted schedule job : auditLogJobImpl
## execute schedule job : destructionJobImpl - 0 0 1 * * ?
## execute schedule job : userJobImpl -0 0 7 * * ?
########################################################

 

 

2.2. Redis 활용

2.1.의 VM Option 사용은 젠킨스를 수정해야하는 번거로움이 있고, 서버가 죽으면 해당하는 서버의 스케줄이 동작하지 않는 문제가 있습니다.

이와같은 문제점을 개선하기 위해 Redis를 활용하여 중복 스케줄을 체크하는 로직을 구현하였습니다.

SchedulingConfigurer 인터페이스를 구현하기에 앞서, 우선 실행할 Job을 일괄 등록할 수 있게 Job Service에서 상속받아 공통으로 사용할 인터페이스(DoExecuteJob.java)를 생성합니다.

public interface DoExecuteJob {
 
    void execute();
 
    default boolean logging() {
        return true;    // default: ScheduleRunnableAspect 에서 로그 출력
    }
}

DoExecuteJob 인터페이스의 execute 메서드가 동작할때 실행할 AOP를 생성합니다.

@Slf4j
@Aspect
@Component
public class ScheduleRunnableAspect {
 
    @Resource
    private RedisService redisService;
 
    public static final String REDIS_KEY_PATTERN = "pts:schedule:execute:%s";
 
    /**
     * 스케줄이 중복실행되지 않게 체크하고 로그를 출력
     *
     * @param joinPoint
     * @return
     * @throws Throwable
     */
    @Around("execution(* com.biz.common.job.DoExecuteJob.execute(..))")
    public Object scheduled(ProceedingJoinPoint joinPoint) throws Throwable {
 
        final String className = this.getClassName(joinPoint);
        final String redisKey = String.format(REDIS_KEY_PATTERN, className);
 
        if (redisService.incrby(redisKey) > 1) {
            return null;
        }
 
        boolean logging = this.logging(joinPoint);
 
        Object proceed;
        try {
            if (logging) {
                log.info("## {} execute start ##", className);
            }
            proceed = joinPoint.proceed();
        } finally {
            redisService.delete(redisKey);
            if (logging) {
                log.info("## {} execute end ##", className);
            }
        }
 
        return proceed;
    }
 
    private String getClassName(ProceedingJoinPoint joinPoint) {
        return joinPoint.getTarget().getClass().getSimpleName();
    }
 
    private boolean logging(ProceedingJoinPoint joinPoint) {
        try {
            DoExecuteJob target = (DoExecuteJob) joinPoint.getTarget();
            return target.logging();
        } catch (Exception e) {
            return true;
        }
    }
}

 

실행할 Job은 DoExecuteJob.java를 상속받아 단일 메서드(execute())에 실행할 로직을 추가합니다.

단, 클래스명은 JobImpl로 끝나야 합니다.

@Service
public class AuditLogJobImpl implements DoExecuteJob {
    // ...
 
    @Override
    public void execute() {
        // 실행 내용
    }
}
@Service
public class DestructionJobImpl implements DoExecuteJob {
    // ...
 
    @Override
    public void execute() {
        // 실행 내용
    }
}
@Service
public class UserJobImpl implements DoExecuteJob {
    // ...
 
    @Override
    public void execute() {
        // 실행 내용
    }
 
    @Override
    public boolean logging() {
        return false; // 로그 미출력
    }
}

 

Job의 cron 표현식을 정의할 scheduler.properties를 생성합니다.

key 패턴은 {Class명(implements DoExecuteJob)}.{serverNumber}.cron.expression로 이루어지게 합니다.

# 스케줄 실행 cron 정의
# {Class명(implements DoExecuteJob)}.cron.expression={cron}


auditLogJobImpl.cron.expression=0 30 1 * * ?
destructionJobImpl.cron.expression=0 0 1 * * ?
userJobImpl.cron.expression=0 0 7 * * ?
@PropertySource(value = {
    "classpath:properties/scheduler.properties"
}, encoding = "UTF-8")
@EnableTransactionManagement
@SpringBootApplication
public class Application {
 
    public static void main(String[] args) throws Exception {
        // ...
    }
 
}

 

 

DynamicSchedulingConfig.java를 생성하고 SchedulingConfigurer 인터페이스를 구현합니다.

@Slf4j
@Configuration
@EnableScheduling
public class SchedulerConfiguration implements SchedulingConfigurer {
 
    private final GenericWebApplicationContext context;
    private final Environment env;
 
    private final ThreadPoolTaskScheduler taskScheduler;
 
    public SchedulerConfiguration(GenericWebApplicationContext context, Environment env) {
        this.context = context;
        this.env = env;
 
        this.taskScheduler = new ThreadPoolTaskScheduler();
        this.taskScheduler.setPoolSize(50);
        this.taskScheduler.setThreadNamePrefix("pts-scheduler-");
        this.taskScheduler.initialize();
    }
 
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        log.info("########################################################");
        taskRegistrar.setTaskScheduler(taskScheduler);
 
        Map<String, DoExecuteJob> beans = context.getBeansOfType(DoExecuteJob.class);
 
        for (Map.Entry<String, DoExecuteJob> jobEntry : beans.entrySet()) {
 
            final String beanName = jobEntry.getKey();
            try {
                String cronExpr = env.getRequiredProperty(beanName + ".cron.expression");
                DoExecuteJob scheduleJob = jobEntry.getValue();
                taskRegistrar.addTriggerTask(scheduleJob::execute, new CronTrigger(cronExpr));
                log.info("## execute schedule job : {} - {}", beanName, cronExpr);
            } catch (Exception e) {
                log.info("## unexecuted schedule job : {}", beanName);
            }
        }
 
        log.info("########################################################");
    }
 
    @PreDestroy
    public void shutdown() {
        if (this.taskScheduler != null) {
            this.taskScheduler.shutdown();
        }
    }
}

(1) DoExecuteJob를 사용하는 클래스들을 조회해옵니다.

(2) properties파일에 정의해둔 cron 표현식을 조회하는데, 존재하지 않으면 실행하지 않는 Job으로 판단합니다.

(3) Job Service에서 상속받아 공통으로 사용 중인 인터페이스(DoExecuteJob.java)로 스케줄을 등록합니다.

 

 

-jar {프로젝트명}.jar 실행 결과 로그

########################################################
## unexecuted schedule job : auditLogJobImpl
## execute schedule job : destructionJobImpl - 0 0 1 * * ?
## execute schedule job : userJobImpl -0 0 7 * * ?
########################################################

 

 

 

 

 

[Reference]

https://www.baeldung.com/spring-scheduled-tasks

https://www.baeldung.com/spring-task-scheduler

728x90
반응형

댓글