[ABAP] Parallel processing / multitasking

SAP can multitask, too. To do so, you call an RFC-enabled function module in a new task within a so-called task group. The call is then processed asynchronously, and the result is delivered to a handler method.
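
Stripped of all management code, the mechanism boils down to three statements: CALL FUNCTION ... STARTING NEW TASK, RECEIVE RESULTS inside the callback, and WAIT FOR ASYNCHRONOUS TASKS in the caller. The following is only a minimal sketch. It assumes the standard RFC-enabled function module RFC_SYSTEM_INFO and the default server group; the report name zarfc_minimal, the class lcl_receiver and the task ID 'TASK_1' are made up for illustration.

```abap
REPORT zarfc_minimal.

* Hypothetical helper class: receives the result of the asynchronous call
CLASS lcl_receiver DEFINITION FINAL.
  PUBLIC SECTION.
    CLASS-DATA: gv_done TYPE abap_bool.
    CLASS-DATA: gs_info TYPE rfcsi.

* Callback method: needs exactly one importing parameter p_task
    CLASS-METHODS:
      on_end_of_task
        IMPORTING
          p_task TYPE clike.
ENDCLASS.

CLASS lcl_receiver IMPLEMENTATION.
  METHOD on_end_of_task.
* Fetch the result of the asynchronous call
    RECEIVE RESULTS FROM FUNCTION 'RFC_SYSTEM_INFO'
      IMPORTING
        rfcsi_export          = gs_info
      EXCEPTIONS
        system_failure        = 1
        communication_failure = 2
        OTHERS                = 3.

    gv_done = abap_true.
  ENDMETHOD.
ENDCLASS.

START-OF-SELECTION.
* Start the function module in a new task of the default server group
  CALL FUNCTION 'RFC_SYSTEM_INFO'
    STARTING NEW TASK 'TASK_1'
    DESTINATION IN GROUP DEFAULT
    CALLING lcl_receiver=>on_end_of_task ON END OF TASK
    EXCEPTIONS
      system_failure        = 1
      communication_failure = 2
      resource_failure      = 3
      OTHERS                = 4.

  IF sy-subrc = 0.
* Block until the callback has run, but for 10 seconds at most
    WAIT FOR ASYNCHRONOUS TASKS UNTIL lcl_receiver=>gv_done = abap_true UP TO 10 SECONDS.
    IF sy-subrc = 0.
      WRITE: / 'System ID:', lcl_receiver=>gs_info-rfcsysid.
    ENDIF.
  ENDIF.
```

The callback only runs while the calling program is in a wait state such as WAIT FOR ASYNCHRONOUS TASKS, which is why the flag gv_done is checked there rather than polled in a loop.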

In the working example below, the driver program "zparallel" creates the work packages (it_input) for the tasks and an object of class "lcl_task_manager". The latter distributes the input data (it_input) across several tasks. The task objects are modeled by the class "lcl_task". Each task asynchronously calls the RFC-enabled function module "BAPI_BILLINGDOC_GETLIST" with its work package.

Once all tasks have finished successfully, the results of the work packages are merged (it_result) and returned to the driver program.

Further information: Link, Link, Link and Link

Driver program: REPORT zparallel (starts the parallel processing)

REPORT zparallel.

* Classes for task management
INCLUDE ztask_manager.

* Internal tables for input and output data
DATA: it_input TYPE lcl_task=>ty_it_input_data.
DATA: it_result TYPE lcl_task_manager=>ty_it_final_result.
DATA: it_statistic TYPE lcl_task_manager=>ty_it_statistic.

PARAMETERS: p_rows TYPE i DEFAULT 1000. " number of test elements
PARAMETERS: p_wait TYPE i DEFAULT 5.    " wait time in [s]
PARAMETERS: p_max_rt TYPE i DEFAULT 3.  " max. number of attempts
PARAMETERS: p_size TYPE i DEFAULT 100.  " interval size

START-OF-SELECTION.

  DATA: it_vbelv TYPE STANDARD TABLE OF vbelv WITH DEFAULT KEY.

* Build the intervals (work packages)
  SELECT DISTINCT vbelv FROM vbfa INTO TABLE @it_vbelv UP TO @p_rows ROWS.

  IF sy-subrc = 0.
    SORT: it_vbelv BY table_line.

    DATA(lv_lines) = lines( it_vbelv ).
    DATA(lv_parts) = lv_lines DIV p_size.
    DATA(lv_mod)   = lv_lines MOD p_size.
    IF lv_mod <> 0.
      lv_parts = lv_parts + 1.
    ENDIF.

    DATA(lv_idx_l) = 0.
    DATA(lv_idx_h) = 0.

    DO lv_parts TIMES.
      lv_idx_l = lv_idx_h + 1.
      lv_idx_h = lv_idx_l - 1 + p_size.

      IF lv_idx_h > lv_lines.
        lv_idx_h = lv_lines.
      ENDIF.

      APPEND VALUE #( sign         = 'I'
                      option       = 'BT'
                      ref_doc_low  = it_vbelv[ lv_idx_l ]
                      ref_doc_high = it_vbelv[ lv_idx_h ] ) TO it_input.
    ENDDO.

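* Reference to the task manager for parallel processing
    DATA(o_task_manager) = lcl_task_manager=>get_instance( i_task_group = lcl_task_manager=>co_default_task_group ).

* get_instance returns an initial reference if the RFC group could not be initialized
    IF o_task_manager IS NOT BOUND.
      WRITE: / 'Task manager could not be initialized (check the RFC group in RZ12).'.
      RETURN.
    ENDIF.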

    DATA: lv_started TYPE i.
    DATA: lv_ended TYPE i.
    DATA: lv_missed TYPE i.

* Start the timer
    DATA(o_timer) = cl_abap_runtime=>create_hr_timer( ).
    DATA(usec_start) = o_timer->get_runtime( ).

* Start the parallel processing of the input data
    IF abap_true = o_task_manager->start_parallel_working( EXPORTING
                                                             i_it_input_data = it_input
                                                             i_wait_time_sec = p_wait
                                                             i_max_retries   = p_max_rt
                                                           IMPORTING
                                                             e_it_final_result = it_result
                                                             e_it_statistic    = it_statistic
                                                             e_tasks_started   = lv_started
                                                             e_tasks_ended     = lv_ended
                                                             e_tasks_missed    = lv_missed ).

* Stop the timer
      DATA(usec_end) = o_timer->get_runtime( ).
      DATA(usec) = CONV decfloat16( usec_end - usec_start ).
      DATA(sec) = usec / 1000000.

      WRITE: / '    Time [µs]:', usec.
      WRITE: / '     Time [s]:', sec.
      WRITE: / '   max. Tasks:', o_task_manager->get_max_tasks( ).
      WRITE: / '   free Tasks:', o_task_manager->get_free_tasks( ).
      WRITE: / 'Tasks started:', lv_started.
      WRITE: / '  Tasks ended:', lv_ended.
      WRITE: / ' Tasks missed:', lv_missed.

      WRITE: / '  Input lines:', lines( it_input ).
      WRITE: / ' Result lines:', lines( it_result ).
      ULINE.
      WRITE: / | Task #     \| Retries     \| Time [µs]  \| S \| IN: LOW    \| IN: HIGH   \| SUBRC  \| SIZE|.
      ULINE.
      LOOP AT it_statistic ASSIGNING FIELD-SYMBOL(<fs_stat>).
        WRITE: / <fs_stat>-task_number, '|', <fs_stat>-retries, '|', <fs_stat>-runtime, '|', <fs_stat>-successful, '|', <fs_stat>-input_data-ref_doc_low, '|', <fs_stat>-input_data-ref_doc_high, '|', <fs_stat>-subrc, '|', <fs_stat>-output_part_size.
      ENDLOOP.

      ULINE.
      WRITE: / | ITEM #     \| IN: LOW    \| IN: HIGH   \| OUT: REF_DOC|.
      ULINE.
* Output the results
      LOOP AT it_result ASSIGNING FIELD-SYMBOL(<fs_res>).
        WRITE: / sy-tabix, '|', <fs_res>-input_data-ref_doc_low, '|', <fs_res>-input_data-ref_doc_high, '|', <fs_res>-output_data-ref_doc.
      ENDLOOP.
    ENDIF.
  ENDIF.

Task classes: INCLUDE ZTASK_MANAGER

*--------------------------------------------------------------------*
* Task class (worker)
*--------------------------------------------------------------------*
* Asynchronously calls an RFC-enabled function module
* and processes the result asynchronously (event)
*--------------------------------------------------------------------*
CLASS lcl_task DEFINITION FINAL.
  PUBLIC SECTION.

* Input data of the task
    TYPES: ty_input_data TYPE bapi_ref_doc_range.
* Output data of the task
    TYPES: ty_output_data TYPE bapivbrksuccess.

    TYPES: ty_it_input_data TYPE STANDARD TABLE OF ty_input_data WITH DEFAULT KEY.
    TYPES: ty_it_output_data TYPE STANDARD TABLE OF ty_output_data WITH DEFAULT KEY.

* Name of the RFC-enabled function module for start_async_call and the task_receive handler
    CONSTANTS: co_fb_name TYPE string VALUE 'BAPI_BILLINGDOC_GETLIST'.

* Event that is raised when the asynchronous method task_receive is called
    EVENTS:
      data_received
        EXPORTING
          VALUE(e_subrc) TYPE sy-subrc
          VALUE(e_runtime) TYPE int4
          VALUE(e_retries) TYPE i
          VALUE(e_input_data) TYPE ty_input_data
          VALUE(e_output_data) TYPE ty_it_output_data.

* Constructor
    METHODS:
      constructor
        IMPORTING
          i_task_group  TYPE rzllitab-classname
          i_task_number TYPE i.

* Starts the asynchronous call of the RFC function module
    METHODS:
      start_async_call
        IMPORTING
                  i_input_data              TYPE ty_input_data
                  i_started_after_n_retries TYPE i
        RETURNING VALUE(rv_subrc)           TYPE sy-subrc.

* Handler method, called asynchronously
    METHODS:
      task_receive
        IMPORTING
          p_task TYPE clike.

* Helper methods
    METHODS:
      is_running
        RETURNING VALUE(rv_running) TYPE boolean.

    METHODS:
      get_group
        RETURNING VALUE(rv_group) TYPE rzllitab-classname.

    METHODS:
      get_task_number
        RETURNING VALUE(rv_number) TYPE i.

  PRIVATE SECTION.

    DATA: lv_group TYPE rzllitab-classname.
    DATA: lv_number TYPE i.
    DATA: lv_running TYPE boolean.
    DATA: lv_started_after_n_retries TYPE i.

    DATA: lv_start_time TYPE int4.
    DATA: lv_end_time TYPE int4.

* Buffer for the parameter passed to start_async_call
    DATA: lv_input_data TYPE ty_input_data.
ENDCLASS.

CLASS lcl_task IMPLEMENTATION.
*--------------------------------------------------------------------*
* Constructor
*--------------------------------------------------------------------*
* -> i_task_group  - group name
* -> i_task_number - task number
*--------------------------------------------------------------------*
  METHOD constructor.
    lv_group                   = i_task_group.
    lv_number                  = i_task_number.
    lv_running                 = abap_false.
    lv_start_time              = 0.
    lv_end_time                = 0.
    lv_started_after_n_retries = 0.
  ENDMETHOD.
*--------------------------------------------------------------------*
* Starts the asynchronous call of the RFC function module
*--------------------------------------------------------------------*
* -> i_input_data              - input data for processing
* -> i_started_after_n_retries - statistics value: number of attempts
*                                after which the task could be started
* <- rv_subrc                  - status of the asynchronous call
*--------------------------------------------------------------------*
  METHOD start_async_call.

    lv_running                 = abap_false.
    lv_started_after_n_retries = i_started_after_n_retries.

    GET RUN TIME FIELD lv_start_time.

* Start a new asynchronous call of the function module,
* passing group, task number and handler method
* The function module must be RFC-enabled
* Available RFC groups for parallel processing: transaction RZ12
* Mind the names and types of the EXPORTING parameters:
* -> wrong names or types are not detected by the compiler and are not reported via sy-subrc either
    CALL FUNCTION co_fb_name
      STARTING NEW TASK lv_number
      DESTINATION IN GROUP lv_group
      CALLING task_receive ON END OF TASK
      EXPORTING
        refdocrange           = i_input_data
      EXCEPTIONS
        system_failure        = 1
        communication_failure = 2
        resource_failure      = 3
        OTHERS                = 4.

* Return the error status
    rv_subrc = sy-subrc.

    IF rv_subrc = 0.
      lv_input_data      = i_input_data.
      lv_running         = abap_true.
    ENDIF.

  ENDMETHOD.
*--------------------------------------------------------------------*
* Receive method, triggered asynchronously as soon as the
* task has finished
* No WRITE statements are executed here!
*--------------------------------------------------------------------*
* -> p_task            - task number
*--------------------------------------------------------------------*
  METHOD task_receive.

    DATA: it_task_data TYPE ty_it_output_data.

* Fetch the results of the asynchronous RFC call from the function module
    RECEIVE RESULTS FROM FUNCTION co_fb_name
      TABLES
        success               = it_task_data
      EXCEPTIONS
        system_failure        = 1
        communication_failure = 2
        resource_failure      = 3
        OTHERS                = 4.

* Save sy-subrc right away so that no later statement can overwrite it
    DATA(lv_subrc) = sy-subrc.

    GET RUN TIME FIELD lv_end_time.

    DATA(lv_runtime) = lv_end_time - lv_start_time.

* Raise the event that returns the data
    RAISE EVENT data_received
      EXPORTING
        e_subrc       = lv_subrc
        e_runtime     = lv_runtime
        e_retries     = lv_started_after_n_retries
        e_input_data  = lv_input_data
        e_output_data = it_task_data.

    lv_running = abap_false.

  ENDMETHOD.
*--------------------------------------------------------------------*
* Returns the status of the task
*--------------------------------------------------------------------*
* <- rv_running - status of the task
*--------------------------------------------------------------------*
  METHOD is_running.
    rv_running = lv_running.
  ENDMETHOD.
*--------------------------------------------------------------------*
* Returns the name of the group in which the task runs
*--------------------------------------------------------------------*
* <- rv_group - group name
*--------------------------------------------------------------------*
  METHOD get_group.
    rv_group = lv_group.
  ENDMETHOD.
*--------------------------------------------------------------------*
* Returns the task number
*--------------------------------------------------------------------*
* <- rv_number - task number
*--------------------------------------------------------------------*
  METHOD get_task_number.
    rv_number = lv_number.
  ENDMETHOD.
ENDCLASS.
*--------------------------------------------------------------------*
* Management class for tasks
*--------------------------------------------------------------------*
* 1. Processes the input data (work items)
* 2. Fetches a free task for each work item
* 3. Starts one task per work item (asynchronously)
* 4. Evaluates the asynchronously delivered result in a handler method
*--------------------------------------------------------------------*
* The implementation only makes sense if the work processes
* run long enough for parallel processing to be
* worthwhile at all
*--------------------------------------------------------------------*
CLASS lcl_task_manager DEFINITION FINAL CREATE PRIVATE.
  PUBLIC SECTION.

* Data type for the final output once all work processes have finished
    TYPES: BEGIN OF ty_final_result,
             input_data  TYPE lcl_task=>ty_input_data,
             output_data TYPE lcl_task=>ty_output_data,
           END OF ty_final_result.

    TYPES: ty_it_final_result TYPE STANDARD TABLE OF ty_final_result WITH DEFAULT KEY.

    TYPES: BEGIN OF ty_statistic,
             task_number      TYPE i,
             runtime          TYPE int4,
             retries          TYPE i,
             input_data       TYPE lcl_task=>ty_input_data,
             output_part_size TYPE i,
             subrc            TYPE sy-subrc,
             successful       TYPE boolean,
           END OF ty_statistic.

    TYPES: ty_it_statistic TYPE STANDARD TABLE OF ty_statistic WITH DEFAULT KEY.

* RFC group name
* The group with classname = 'parallel_generators' is set up by admins by default
* Groups are stored in table RZLLITAB, grouptype = 'S' (server)
* Transaction RZ12
    CONSTANTS: co_default_task_group TYPE rzllitab-classname VALUE 'parallel_generators'.

* Factory method
    CLASS-METHODS:
      get_instance
        IMPORTING
                  i_task_group               TYPE rzllitab-classname DEFAULT co_default_task_group
        RETURNING VALUE(rv_ref_task_manager) TYPE REF TO lcl_task_manager.

* Starts the parallel processing of the input data
    METHODS:
      start_parallel_working
        IMPORTING
                  i_it_input_data         TYPE lcl_task=>ty_it_input_data
                  i_wait_time_sec         TYPE i DEFAULT 10
                  i_max_retries           TYPE i DEFAULT 3
        EXPORTING
                  e_it_final_result       TYPE ty_it_final_result
                  e_it_statistic          TYPE ty_it_statistic
                  e_tasks_started         TYPE i
                  e_tasks_ended           TYPE i
                  e_tasks_missed          TYPE i
        RETURNING VALUE(rv_work_complete) TYPE boolean.

    METHODS:
      get_max_tasks
        RETURNING VALUE(rv_max_tasks) TYPE i.

    METHODS:
      get_free_tasks
        RETURNING VALUE(rv_free_tasks) TYPE i.

  PRIVATE SECTION.

    TYPES: BEGIN OF ty_task,
             task_number TYPE i,
             o_task      TYPE REF TO lcl_task,
           END OF ty_task.

* Null reference
    CONSTANTS: null TYPE REF TO lcl_task_manager VALUE IS INITIAL.

* Reference to the single task manager instance (singleton)
    CLASS-DATA: o_task_manager TYPE REF TO lcl_task_manager.

    CLASS-DATA: lv_max_tasks TYPE i.
    CLASS-DATA: lv_free_tasks TYPE i.

* Counters
    CLASS-DATA: lv_tasks_started TYPE i.
    CLASS-DATA: lv_tasks_ended TYPE i.
    CLASS-DATA: lv_active_tasks TYPE i.

* Global task list
    CLASS-DATA: it_tasks TYPE SORTED TABLE OF ty_task WITH UNIQUE KEY task_number.

* Result list
    CLASS-DATA: it_final_result TYPE ty_it_final_result.

* Statistics list
    CLASS-DATA: it_statistic TYPE ty_it_statistic.

* Handler for the data_received event of the tasks
    METHODS:
      on_task_completed FOR EVENT data_received OF lcl_task
        IMPORTING
            e_subrc
            e_runtime
            e_retries
            e_input_data
            e_output_data
            sender.

* Helper methods
    METHODS:
      get_free_task
        RETURNING VALUE(rv_task_number) TYPE i.

    METHODS:
      get_task
        IMPORTING
                  i_task_number      TYPE i
        RETURNING VALUE(rv_ref_task) TYPE REF TO lcl_task.

    METHODS:
      start_task
        IMPORTING
                  i_input_data    TYPE lcl_task=>ty_input_data
                  i_wait_time_sec TYPE i DEFAULT 10
                  i_max_retries   TYPE i DEFAULT 3
        RETURNING VALUE(rv_ok)    TYPE boolean.
ENDCLASS.
*--------------------------------------------------------------------*
CLASS lcl_task_manager IMPLEMENTATION.
*--------------------------------------------------------------------*
* Factory method for the singleton: returns the one and only
* reference to the task manager
*--------------------------------------------------------------------*
* -> i_task_group        - name of the RFC task group in which the tasks
*                          are to run
*                          Default is 'parallel_generators', which is
*                          set up by admins by default
*                          Groups are stored in table RZLLITAB,
*                          grouptype = 'S' (server)
*                          Transaction RZ12
* <- rv_ref_task_manager - reference to the task manager object
*--------------------------------------------------------------------*
  METHOD get_instance.

    rv_ref_task_manager = null.

    IF o_task_manager IS INITIAL.

      DATA(lv_task_group) = i_task_group.

      lv_max_tasks = 0.
      lv_free_tasks = 0.

      IF lv_task_group IS INITIAL.
* If no task group was passed, pick one
* Available RFC groups for parallel processing: transaction RZ12
        SELECT SINGLE classname FROM rzllitab INTO @lv_task_group WHERE grouptype = 'S'.
      ENDIF.

      IF NOT lv_task_group IS INITIAL.

* Start a new LUW
        COMMIT WORK.

* Parallel background tasks: initialization of the PBT environment
* Should be called only once, otherwise exception pbt_env_already_initialized is raised
        CALL FUNCTION 'SPBT_INITIALIZE'
          EXPORTING
            group_name                     = lv_task_group
          IMPORTING
            max_pbt_wps                    = lv_max_tasks
            free_pbt_wps                   = lv_free_tasks
          EXCEPTIONS
            invalid_group_name             = 1
            internal_error                 = 2
            pbt_env_already_initialized    = 3
            currently_no_resources_avail   = 4
            no_pbt_resources_found         = 5
            cant_init_different_pbt_groups = 6
            OTHERS                         = 7.

* If the initialization was successful
        IF sy-subrc = 0.
* If at least one free task is available
          IF lv_free_tasks >= 1.

            o_task_manager = NEW #( ).

            lv_tasks_started = 0.
            lv_tasks_ended   = 0.
            lv_active_tasks  = 0.

            CLEAR: it_tasks.

* Add one task object per free task to the list
            DO lv_free_tasks TIMES.
              DATA(o_task) = NEW lcl_task( i_task_number = sy-index
                                           i_task_group  = lv_task_group ).

* Event handler for finished tasks
              SET HANDLER o_task_manager->on_task_completed FOR o_task.

* it_tasks is a sorted table, so insert by key instead of using APPEND
              INSERT VALUE #( task_number = sy-index
                              o_task      = o_task ) INTO TABLE it_tasks.
            ENDDO.

            rv_ref_task_manager = o_task_manager.
          ENDIF.

        ENDIF.

      ENDIF.

    ELSE.
      rv_ref_task_manager = o_task_manager.
    ENDIF.

  ENDMETHOD.
*--------------------------------------------------------------------*
* Kick off the parallel / asynchronous processing
*--------------------------------------------------------------------*
* -> i_it_input_data   - input data
* -> i_wait_time_sec   - wait time in seconds for all tasks to finish
* -> i_max_retries     - max. number of attempts
* <- e_it_final_result - result set of the parallel processing
* <- e_it_statistic    - one statistics line per work item
* <- e_tasks_started   - number of tasks started
* <- e_tasks_ended     - number of tasks ended (ideally equal to
*                        e_tasks_started)
* <- e_tasks_missed    - number of tasks that could not be
*                        started
* <- rv_work_complete  - true if all tasks finished within the wait time
*--------------------------------------------------------------------*
  METHOD start_parallel_working.

    rv_work_complete = abap_false.

    lv_tasks_started = 0.
    lv_tasks_ended   = 0.
    lv_active_tasks  = 0.

    CLEAR: it_final_result.
    CLEAR: it_statistic.

    DATA(lv_tasks_missed) = 0.

* Process the work items that were passed in
    LOOP AT i_it_input_data ASSIGNING FIELD-SYMBOL(<fs_input_data>).
* Start one task (asynchronously) per work item;
* the wait time is handed on so that start_task does not fall back to its default
      IF me->start_task( i_input_data    = <fs_input_data>
                         i_wait_time_sec = i_wait_time_sec
                         i_max_retries   = i_max_retries ) = abap_false.
* Error: no task available for the work item
        lv_tasks_missed = lv_tasks_missed + 1.

        APPEND VALUE #( task_number      = -1
                        runtime          = 0
                        retries          = i_max_retries
                        input_data       = <fs_input_data>
                        output_part_size = 0
                        subrc            = -1
                        successful       = abap_false ) TO it_statistic.
      ENDIF.
    ENDLOOP.
    ENDLOOP.

* Wait until all tasks have finished
* sy-subrc: 0 = condition met, 4 = condition not met and no callbacks pending, 8 = timeout
    WAIT FOR ASYNCHRONOUS TASKS UNTIL ( lv_active_tasks = 0 ) UP TO i_wait_time_sec SECONDS.
    IF sy-subrc = 0.
      rv_work_complete = abap_true.
    ENDIF.

* Return the results
    e_it_final_result = it_final_result.
    e_it_statistic    = it_statistic.
    e_tasks_started   = lv_tasks_started.
    e_tasks_ended     = lv_tasks_ended.
    e_tasks_missed    = lv_tasks_missed.

  ENDMETHOD.
*--------------------------------------------------------------------*
* Returns the max. number of tasks
*--------------------------------------------------------------------*
* <- rv_max_tasks - max. number of tasks
*--------------------------------------------------------------------*
  METHOD get_max_tasks.
    rv_max_tasks = lv_max_tasks.
  ENDMETHOD.
*--------------------------------------------------------------------*
* Returns the number of free tasks
*--------------------------------------------------------------------*
* <- rv_free_tasks - number of free tasks
*--------------------------------------------------------------------*
  METHOD get_free_tasks.
    rv_free_tasks = lv_free_tasks.
  ENDMETHOD.
*--------------------------------------------------------------------*
* Start a single task
*--------------------------------------------------------------------*
* -> i_input_data    - input data for processing
* -> i_wait_time_sec - wait time in seconds for running tasks to finish
* -> i_max_retries   - max. number of attempts while the system is busy
*                      before giving up
* <- rv_ok           - true if the task could be started
*--------------------------------------------------------------------*
  METHOD start_task.

    rv_ok = abap_false.

    DATA(lv_retries) = 0.

* Up to n attempts to get a free task
    WHILE lv_retries < i_max_retries.
      DATA(lv_tasknumber) = me->get_free_task( ).

      IF lv_tasknumber <> -1.
        DATA(o_task) = me->get_task( lv_tasknumber ).

        IF o_task IS BOUND.

          DATA(lv_subrc) = o_task->start_async_call( i_input_data              = i_input_data
                                                     i_started_after_n_retries = lv_retries ).

          CASE lv_subrc.
            WHEN 0.
* Ok
              lv_tasks_started = lv_tasks_started + 1.
              lv_active_tasks  = lv_active_tasks + 1.
              lv_retries       = i_max_retries.

              rv_ok = abap_true.
            WHEN 1 OR 2.
* Failure
              lv_retries = i_max_retries.
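            WHEN 3.
* Busy: wait for running tasks (if any), then try again
              IF lv_tasks_started > 0.
                WAIT FOR ASYNCHRONOUS TASKS UNTIL ( lv_tasks_ended >= lv_tasks_started ) UP TO i_wait_time_sec SECONDS.

* Evaluate sy-subrc only directly after the WAIT that set it
                IF sy-subrc = 0.
                  lv_retries = lv_retries + 1.
                ELSE.
                  lv_retries = i_max_retries.
                ENDIF.
              ELSE.
* Nothing is running yet, so there is nothing to wait for
                lv_retries = lv_retries + 1.
              ENDIF.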
            WHEN OTHERS.
* Should not occur
              lv_retries = i_max_retries.
          ENDCASE.
        ELSE.
* Task not found (should not happen)
          lv_retries = lv_retries + 1.
        ENDIF.
      ELSE.
* No free task available -> wait
        WAIT FOR ASYNCHRONOUS TASKS UNTIL ( lv_tasks_ended >= lv_tasks_started ) UP TO i_wait_time_sec SECONDS.
        IF sy-subrc = 0.
          lv_retries = lv_retries + 1.
        ELSE.
          lv_retries = i_max_retries.
        ENDIF.
      ENDIF.
    ENDWHILE.

  ENDMETHOD.
*--------------------------------------------------------------------*
* Find a free task in the list of available tasks
*--------------------------------------------------------------------*
* <- rv_task_number - task number of the task found (-1 if none is free)
*--------------------------------------------------------------------*
  METHOD get_free_task.

    rv_task_number = -1.

    LOOP AT it_tasks ASSIGNING FIELD-SYMBOL(<fs_task>).
      IF <fs_task>-o_task->is_running( ) = abap_false.
        rv_task_number = <fs_task>-task_number.
        EXIT.
      ENDIF.
    ENDLOOP.

  ENDMETHOD.
*--------------------------------------------------------------------*
* Get the reference to a task by its task number
*--------------------------------------------------------------------*
* -> i_task_number - task number
* <- rv_ref_task   - reference to the task
*--------------------------------------------------------------------*
  METHOD get_task.
    IF line_exists( it_tasks[ task_number = i_task_number ] ).

      ASSIGN it_tasks[ task_number = i_task_number ] TO FIELD-SYMBOL(<fs_task>).

      IF <fs_task> IS ASSIGNED.
        rv_ref_task = <fs_task>-o_task.
      ENDIF.

    ENDIF.
  ENDMETHOD.
*--------------------------------------------------------------------*
* Event handler, called when a task has finished
*--------------------------------------------------------------------*
* -> e_input_data  - input data of the task
* -> e_output_data - output data of the task
* -> sender        - reference to the task
*--------------------------------------------------------------------*
  METHOD on_task_completed.

* Update the counters
    lv_tasks_ended = lv_tasks_ended + 1.
    lv_active_tasks = lv_active_tasks - 1.

* A task counts as successful only if RECEIVE RESULTS returned sy-subrc = 0
    APPEND VALUE #( task_number      = sender->get_task_number( )
                    runtime          = e_runtime
                    retries          = e_retries
                    input_data       = e_input_data
                    output_part_size = lines( e_output_data )
                    subrc            = e_subrc
                    successful       = xsdbool( e_subrc = 0 ) ) TO it_statistic.

* Simply append the data to the result table here
    LOOP AT e_output_data ASSIGNING FIELD-SYMBOL(<fs_output_data>).
      APPEND VALUE #( input_data  = e_input_data
                      output_data = <fs_output_data> ) TO it_final_result.
    ENDLOOP.

  ENDMETHOD.
ENDCLASS.

[ABAP] OO: Raising events, event handling

* Demo object that defines and raises an event
CLASS lcl_demo DEFINITION.
  PUBLIC SECTION.
* The event that is to be raised
    EVENTS:
      my_event
        EXPORTING
          VALUE(txt) TYPE string
          VALUE(val) TYPE i.

* Instance constructor with an importing parameter
    METHODS:
      constructor
        IMPORTING
          iv_name TYPE string.

* The method that raises the event
    METHODS: write.

* Returns the name
    METHODS: get_name
      RETURNING VALUE(rv_name) TYPE string.

* Variable for storing the name
  PRIVATE SECTION.
    DATA: gv_name TYPE string.
ENDCLASS.

CLASS lcl_demo IMPLEMENTATION.
* Instance constructor with an importing parameter
  METHOD constructor.
    gv_name = iv_name.
  ENDMETHOD.
* The method that raises the event
  METHOD write.
    WRITE: / '1. lcl_demo raises event "my_event"'.
* Raise the event
    RAISE EVENT my_event
      EXPORTING
        txt = 'Text passed along'
        val = 1.
  ENDMETHOD.
* Returns the name
  METHOD get_name.
    rv_name = gv_name.
  ENDMETHOD.
ENDCLASS.

* Event handler class that provides the methods for event handling
CLASS lcl_eventhandler DEFINITION.
  PUBLIC SECTION.
* The event handler method that is linked to the event via SET HANDLER
    CLASS-METHODS: on_my_event FOR EVENT my_event OF lcl_demo
      IMPORTING
          txt
          val
          sender.
ENDCLASS.

CLASS lcl_eventhandler IMPLEMENTATION.
* The event handler method that is linked to the event
  METHOD on_my_event.
    WRITE: / '2. lcl_eventhandler=>on_my_event:'.
    WRITE: / '   Name:', sender->get_name( ).
    WRITE: / '    txt:', txt.
    WRITE: / '    val:', val.
  ENDMETHOD.
ENDCLASS.

START-OF-SELECTION.
* Create the object
  DATA(o_evt) = NEW lcl_demo( 'Caller object' ).

* Register the handler: link event handler on_my_event to the demo object that raises my_event
  SET HANDLER lcl_eventhandler=>on_my_event FOR o_evt.

* Method call that raises the event
  o_evt->write( ).
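
* Sketch, not part of the original listing: the ACTIVATION addition of
* SET HANDLER deregisters the handler again. A further call of write( )
* still raises my_event, but on_my_event is no longer executed, so only
* the line written by lcl_demo itself appears in the list.
  SET HANDLER lcl_eventhandler=>on_my_event FOR o_evt ACTIVATION abap_false.
  o_evt->write( ).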

Further information: Link and Link