Event service
In event service mode, the Pilot launches a payload and feeds it event ranges (sets of events to be processed) downloaded from the server. The mode itself is set internally, after the Pilot receives a corresponding event service job from the PanDA server. For such a job, the job definition JSON contains the instruction 'eventService' (for normal event service jobs) or 'eventServiceMerge' (for event service merge jobs, in which the pilot merges many small event service files into a bigger file).
Event service mode is selected when the pilot downloads a job carrying one of the event service identifiers ('eventService' or 'eventServiceMerge') in the job description. Seeing one of these identifiers, the pilot internally sets the 'is_eventservice' or 'is_eventservicemerge' booleans that are data members of the JobData class (i.e. they end up in the job object as job.is_eventservice and job.is_eventservicemerge). The pilot later uses them in pilot/control/payload to select the proper payload executor (eventservice.Executor or eventservicemerge.Executor - the executor for running 'normal' jobs is called generic.Executor). The pilot starts the selected executor by calling its run() function. The main pilot keeps running its normal threads in the meantime for monitoring.
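The selection logic described above can be sketched roughly as follows. This is a minimal illustration, not the pilot's actual code: JobSketch, job_from_definition() and select_executor() are hypothetical names, and the way the identifiers are encoded in the job definition (True or a 'true' string) is an assumption.

```python
from dataclasses import dataclass


@dataclass
class JobSketch:
    """Hypothetical stand-in for JobData; only the two flags are modelled."""
    is_eventservice: bool = False
    is_eventservicemerge: bool = False


def _is_set(value) -> bool:
    """Treat True or a 'true' string as set (an assumption about the encoding)."""
    return value is True or str(value).strip().lower() == "true"


def job_from_definition(definition: dict) -> JobSketch:
    """Derive the two booleans from the identifiers in the job definition."""
    return JobSketch(
        is_eventservice=_is_set(definition.get("eventService")),
        is_eventservicemerge=_is_set(definition.get("eventServiceMerge")),
    )


def select_executor(job: JobSketch) -> str:
    """Return the name of the executor that would handle this job."""
    if job.is_eventservice:
        return "eventservice.Executor"
    if job.is_eventservicemerge:
        return "eventservicemerge.Executor"
    # Neither flag set: a 'normal' job.
    return "generic.Executor"
```

A job definition without either identifier falls through to generic.Executor, which matches the description of 'normal' jobs above.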
The eventServiceMerge workflow is very similar to the generic workflow, while the main eventService workflow is very different. The executor type is selected in the run_payload() function of the event service executor (called from its run() function). For a normal event service job to be run on a grid site, the 'generic' executor type is used. For the Raythena workflow, the executor type is 'raythena'. As of April 2020, the executor type has to be set in the pilot config file, although it might be preferable to use AGIS instead. When the payload (job object) has been attached to the selected executor, the executor itself is launched by calling its start() function. Since the GenericExecutor is a thread, starting it causes its run() function to be called. The run() function in turn starts the ESProcess, described in the next section.
pilot.control.payload.execute_payloads()
'-> pilot.control.payloads.eventservice.Executor.run() (generic.Executor.run())
'-> pilot.control.payloads.eventservice.Executor.run_payload()
'-> pilot.eventservice.workexecutor.workexecutor.start() (plugin gets selected using config file, thread starts, run() gets executed)
'-> pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor.start() (basexecutor.BaseExecutor.start())
'-> pilot.eventservice.communicationmanager.communicationmanager.start() (thread starting)
'-> pilot.eventservice.workexecutor.plugins.genericexecutor.run()
'-> pilot.eventservice.esprocess.esprocess.ESProcess.start()
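The "plugin gets selected, thread starts, run() gets executed" part of the chain above can be illustrated with a small sketch. All class and function names here are hypothetical, and the lookup table stands in for whatever the pilot config file provides; only the executor type strings 'generic' and 'raythena' come from the text.

```python
import threading


class BaseExecutorSketch(threading.Thread):
    """Hypothetical base class: a thread that holds a payload."""

    def __init__(self):
        super().__init__()
        self.payload = None
        self.log = []

    def set_payload(self, payload):
        self.payload = payload


class GenericExecutorSketch(BaseExecutorSketch):
    def run(self):
        # Called in the new thread as a consequence of start().
        self.log.append(f"generic run with {self.payload}")


class RaythenaExecutorSketch(BaseExecutorSketch):
    def run(self):
        self.log.append(f"raythena run with {self.payload}")


# Assumption: the executor type string comes from the pilot config file.
PLUGINS = {"generic": GenericExecutorSketch, "raythena": RaythenaExecutorSketch}


def launch(executor_type, payload):
    """Pick the plugin, attach the payload, then start the thread."""
    executor = PLUGINS[executor_type]()
    executor.set_payload(payload)
    executor.start()  # threading.Thread.start() invokes run() in a new thread
    return executor


executor = launch("generic", "job-1")
executor.join()
```

Because the executor is a thread, the caller never invokes run() directly; start() returns immediately and the main pilot threads can continue monitoring.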
ESProcess works as an independent thread with two hooks for communicating with the outside. Callers need to implement these two hooks and link them to the ESProcess when using it.
* get_event_ranges_hook: to get events
* handle_output_message_hook: to handle outputs
pilot.control.payloads.eventservice.py and pilot.eventservice.workexecutor.plugins.genericexecutor.py implement get_event_ranges_hook and handle_output_message_hook and link them to ESProcess. The handle_output_message_hook calls the ESStageOut client to stage out the ES outputs.
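The two-hook pattern can be sketched as below. This is an illustration under stated assumptions, not the real ESProcess: the class ESProcessSketch, the setter names, the hook signatures (a requested count in, a list of ranges out; one message string per output) and the in-memory queue are all hypothetical. A real handle_output_message_hook would hand the output to the stage-out client instead of appending to a list.

```python
import threading
from collections import deque


class ESProcessSketch(threading.Thread):
    """Hypothetical thread that talks to the outside only through two hooks."""

    def __init__(self):
        super().__init__()
        self._get_event_ranges_hook = None
        self._handle_output_message_hook = None

    def set_get_event_ranges_hook(self, hook):
        self._get_event_ranges_hook = hook

    def set_handle_output_message_hook(self, hook):
        self._handle_output_message_hook = hook

    def run(self):
        # Keep asking for event ranges until the caller has none left.
        while True:
            ranges = self._get_event_ranges_hook(2)
            if not ranges:
                break
            for event_range in ranges:
                # Stand-in for the payload processing the range.
                self._handle_output_message_hook(f"done {event_range}")


# Caller side: implement the hooks and link them to the process.
pending = deque(["range-1", "range-2", "range-3"])
outputs = []


def get_event_ranges(count):
    """Hand out up to `count` pending ranges."""
    return [pending.popleft() for _ in range(min(count, len(pending)))]


def handle_output_message(message):
    """Record the output; the real hook would trigger stage-out."""
    outputs.append(message)


process = ESProcessSketch()
process.set_get_event_ranges_hook(get_event_ranges)
process.set_handle_output_message_hook(handle_output_message)
process.start()
process.join()
```

Keeping the hooks as plain callables means the same process class can be driven by different callers, each supplying its own way of fetching ranges and handling outputs.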