
    ciVB                     f    d dl Z d dlZd dlZd dlmZmZmZ  G d d          Zde j        defdZ	dS )    N)AnyCallable	Coroutinec                   
   e Zd ZdZd Zd Zd ZdedefdZ	dedefd	Z
ded
edeeeef         f         fdZddd
edeeeef         f         dedz  defdZd
edeeeef         f         ddfdZddefdZd Zdej        ded
edeeeef         f         fdZdej        ded
edeeeef         f         fdZded
edeeeef         f         fdZdej        dedefdZdej        dedefdZdej        dedefdZdS )BackgroundSchedulerzf
    Schedules background tasks execution either in separate thread or in the running event loop.
    c                     d | _         g | _        t          j                    | _        d| _        d | _        d | _        t          j                    | _	        d S )NF)
_next_timer_event_loops	threadingLock_lock_stopped_health_check_loop_health_check_threadEvent_health_check_loop_readyselfs    k/var/www/html/web/mlink/mlink_AI_Server/mlink-backend/venv/lib/python3.11/site-packages/redis/background.py__init__zBackgroundScheduler.__init__   sN    ^%%
DH=A!(1(9(9%%%    c                 .    |                                   d S N)stopr   s    r   __del__zBackgroundScheduler.__del__   s    		r   c                 n   | j         5  | j        r	 ddd           dS d| _        | j        r | j                                         d| _        | j        D ]0}|                                r|                    |j                   1| j                                         ddd           dS # 1 swxY w Y   dS )zB
        Stop all scheduled tasks and clean up resources.
        NT)	r   r   r	   cancelr
   
is_runningcall_soon_threadsafer   clear)r   loops     r   r   zBackgroundScheduler.stop   s(    Z 	& 	&} 	& 	& 	& 	& 	& 	& 	& 	& !DM ( '')))#'  ) 9 9??$$ 9--di888##%%%	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	&s   	B*A?B**B.1B.delaycallbackc                    | j         5  | j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   t          j                    }| j         5  | j                            |           ddd           n# 1 swxY w Y   t          j        t          || j	        ||g|R d          }|
                                 dS )zI
        Runs callable task once after certain delay in seconds.
        NTtargetargsdaemon)r   r   asyncionew_event_loopr
   appendr   Thread_start_event_loop_in_thread_call_laterstart)r   r"   r#   r'   r!   threads         r   run_oncezBackgroundScheduler.run_once.   sh    Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	
 %''Z 	+ 	+$$T***	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ !.(%ADAA
 
 

 	   	+//A77A;>A;intervalc                    | j         5  | j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   t          j                    }| j         5  | j                            |           ddd           n# 1 swxY w Y   t          j        t          || j	        ||g|R d          }|
                                 dS )zN
        Runs recurring callable task with given interval in seconds.
        NTr%   )r   r   r)   r*   r
   r+   r   r,   r-   _call_later_recurringr/   )r   r3   r#   r'   r!   r0   s         r   run_recurringz!BackgroundScheduler.run_recurringC   sh    Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	
 %''Z 	+ 	+$$T***	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ !.2HhNNN
 
 

 	r2   coro.c                    | j         5  | j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   |                                  | j         5  | j        }ddd           n# 1 swxY w Y    |j        | j        |||g|R   dS )a  
        Runs recurring coroutine with given interval in seconds in a background thread.
        Uses a shared event loop to ensure connection pools remain valid across calls.

        This is useful for sync code that needs to run async health checks.
        N)r   r   _ensure_health_check_loopr   r   _call_later_recurring_coro)r   r3   r7   r'   r!   s        r   run_recurring_coroz&BackgroundScheduler.run_recurring_coroX   sP    Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	
 	&&(((Z 	+ 	+*D	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	"!+T8T	
DH	
 	
 	
 	
 	
 	
   	+//A%%A),A)g      $@timeoutr>   Nreturnc                   | j         5  | j        rt          d          	 ddd           n# 1 swxY w Y   |                                  | j         5  | j        }ddd           n# 1 swxY w Y   t          j         || |          }	 |                    |          S # t          $ r |	                                  w xY w)ag  
        Runs a coroutine synchronously and returns its result.
        Uses the shared health check event loop to ensure connection pools
        created here remain valid for subsequent recurring health checks.

        This is useful for running the initial health check before starting
        recurring checks.

        Args:
            coro: Coroutine function to execute
            *args: Arguments to pass to the coroutine
            timeout: Maximum seconds to wait for the result. None means wait
                forever. Default is 10 seconds to avoid blocking indefinitely
                if the event loop is busy with long-running health checks.

        Returns:
            The result of the coroutine

        Raises:
            TimeoutError: If the coroutine doesn't complete within timeout
            Any exception raised by the coroutine
        zScheduler is stoppedNr=   )
r   r   RuntimeErrorr9   r   r)   run_coroutine_threadsaferesultTimeoutErrorr   )r   r7   r>   r'   r!   futures         r   run_coro_syncz!BackgroundScheduler.run_coro_syncp   sV   : Z 	; 	;} ;"#9:::;	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	;
 	&&(((Z 	+ 	+*D	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 1$$+tDD	===111 	 	 	MMOOO	s*   ,00A&&A*-A*
B    C c                 ^   | j         5  | j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   |                                  | j         5  | j        }ddd           n# 1 swxY w Y   dt          j        fd}t	          j         || |          }|                    |           dS )a  
        Schedule a coroutine for execution on the shared health check loop
        without waiting for the result. Exceptions are logged but not raised.

        This is useful for HALF_OPEN recovery health checks that need to run
        on the same event loop where connection pools were created.

        Args:
            coro: Coroutine function to execute
            *args: Arguments to pass to the coroutine
        NrE   c                 6   |                                  r.t          j        t                                        d           dS |                                 Bt          j        t                                        d|                                            dS dS )z&Log any exceptions from the coroutine.z#Fire-and-forget coroutine cancelledNz*Fire-and-forget coroutine raised exceptionexc_info)	cancelledlogging	getLogger__name__debug	exception)rE   s    r   on_completezABackgroundScheduler.run_coro_fire_and_forget.<locals>.on_complete   s    !! !(++112WXXXXX!!##/!(++11@#--// 2      0/r   )r   r   r9   r   r)   FuturerB   add_done_callback)r   r7   r'   r!   rQ   rE   s         r   run_coro_fire_and_forgetz,BackgroundScheduler.run_coro_fire_and_forget   se    Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	
 	&&(((Z 	+ 	+*D	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+	 	 	 	 	 1$$+tDD  -----r<         @c                    | j                                         rM| j        5  | j        '| j                                        r	 ddd           dS ddd           n# 1 swxY w Y   | j        5  | j        '| j                                        r	 ddd           dS | j                                          t          j                    | _        | j        	                    | j                   t          j        | j        d          | _        | j                                         | j                             |          sj| j        }d| _        || j        v r| j                            |           	 |                                 n# t$          $ r Y nw xY wt'          d| d          	 ddd           dS # 1 swxY w Y   dS )z
        Ensure the shared health check loop and thread are running.

        Args:
            timeout: Maximum seconds to wait for the loop to start.

        Raises:
            RuntimeError: If the loop fails to start within the timeout.
        NT)r&   r(   r=   z/Health check event loop failed to start within z seconds)r   is_setr   r   r   r    r)   r*   r
   r+   r   r,   _run_health_check_loopr   r/   waitremoveclose	ExceptionrA   )r   r>   failed_loops      r   r9   z-BackgroundScheduler._ensure_health_check_loop   s    (//11 	  +7/::<< 8                        Z '	 '	 '3+6688 4 '	 '	 '	 '	 '	 '	 '	 '	 )//111 '.&<&>&>D#$$T%<=== )2(82) ) )D% %++--- 055g5FF  #5*.'$"333%,,[999%%''''    D"WgWWW  5'	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	 '	sN   "AA #A ."F9CF93FF9
FF9FF99F= F=c           	      N   t          j        | j                   | j                            | j        j                   	 | j                                         	 t          j        | j                  }|D ]}|                                 | j        	                    t          j
        |ddi           n# t          $ r Y nw xY w| j                                         dS # | j                                         w xY w# 	 t          j        | j                  }|D ]}|                                 | j        	                    t          j
        |ddi           n# t          $ r Y nw xY w| j                                         w # | j                                         w xY wxY w)z'Run the shared health check event loop.return_exceptionsTN)r)   set_event_loopr   	call_soonr   setrun_forever	all_tasksr   run_until_completegatherr\   r[   )r   pendingtasks      r   rX   z*BackgroundScheduler._run_health_check_loop  s   t6777 	))$*G*KLLL	0#//111
0!+D,CDD# " "DKKMMMM'::NGDtDD        '--/////'--////
0!+D,CDD# " "DKKMMMM'::NGDtDD        '--////'--////sm   C> AB5 4C  5
C?C  CC   C;>F$ AEF
E)&F(E))F,F$F!!F$r!   c                     | j         5  | j        r	 ddd           dS 	 ddd           n# 1 swxY w Y    | j        ||| j        |||g|R   dS )z0Schedule first execution of recurring coroutine.N)r   r   r.   _execute_recurring_coro)r   r!   r3   r7   r'   s        r   r:   z.BackgroundScheduler._call_later_recurring_coro  s     Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	(D8$$	
QU	
 	
 	
 	
 	
 	
   	+//c           	           j         5   j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   dt          j        f fd}	 t          j                    }|                    |           dS # t          $ rR  j         5   j        r	 ddd           Y dS 	 ddd           n# 1 swxY w Y     j         j        gR   Y dS w xY w)z
        Executes recurring coroutine with given interval in seconds.
        Schedules next execution only after current one completes to prevent overlap.
        Nrh   c                 t   |                                  rnT|                                 @t          j        t                                        d|                                            j        5  j        r	 ddd           dS 	 ddd           n# 1 swxY w Y    j        j	        gR   dS )<Callback when coroutine completes - schedule next execution.Nz%Background coroutine raised exceptionrI   )
rK   rP   rL   rM   rN   rO   r   r   r.   rj   )rh   r'   r7   r3   r!   r   s    r   rQ   z@BackgroundScheduler._execute_recurring_coro.<locals>.on_complete7  sB    ~~ !!-!(++11;!^^-- 2      =                       D,      s   2	BBB)	r   r   r)   Taskensure_futurerS   r\   r.   rj   )r   r!   r3   r7   r'   rQ   rh   s   `````  r   rj   z+BackgroundScheduler._execute_recurring_coro(  s    Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 		gl 	 	 	 	 	 	 	 	 	 	6	(t55D"";///// 	 	 	  =                        D,       	sH   	044,B   C	B5C)C5B9	9C<B9	=CCc                     K    j         5   j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   t          j                     fd fd                               _        dS )a  
        Runs recurring coroutine with given interval in seconds in the current event loop.
        To be used only from an async context. No additional threads are created.

        Prevents overlapping executions by scheduling the next run only after
        the current one completes.

        Raises:
            RuntimeError: If called without a running event loop (programming error)
        Nc                      j         5  j        r	 ddd           dS 	 ddd           n# 1 swxY w Y                                  _        dS )z<Schedule the next execution after the current one completes.Nr   r   
call_laterr	   )execute_and_rescheduler3   r!   r   s   r   schedule_nextz>BackgroundScheduler.run_recurring_async.<locals>.schedule_next{  s      =                        $x9OPPDs   	,00c                     j         5  j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   dt          j        ffd} 	 t          j                    }|                    |            dS # t          $ r< t          j        t                    
                    dd                         Y dS w xY w)z=Execute the coroutine and schedule next run after completion.Nrh   c                     |                                  rnT|                                 @t          j        t                                        d|                                                          dS )rn   Nz*Recurring async coroutine raised exceptionrI   )rK   rP   rL   rM   rN   rO   )rh   rv   s    r   rQ   z\BackgroundScheduler.run_recurring_async.<locals>.execute_and_reschedule.<locals>.on_complete  ss     >>## ^^%%1%h//55D!%!1!1 6   
 r   z,Failed to schedule recurring async coroutineTrI   )r   r   r)   ro   rp   rS   r\   rL   rM   rN   rO   )rQ   rh   r'   r7   rv   r   s     r   ru   zGBackgroundScheduler.run_recurring_async.<locals>.execute_and_reschedule  sQ     =                       ',             ,TT4[99&&{33333      !(++11BT 2     s   	,00
,A8 8AB>=B>)r   r   r)   get_running_looprt   r	   )r   r3   r7   r'   ru   r!   rv   s   ````@@@r   run_recurring_asyncz'BackgroundScheduler.run_recurring_asynce  s-      Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 '))	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q	  	  	  	  	  	  	  	 <  ??85KLLs   	488c                     | j         5  | j        r	 d d d            d S 	 d d d            n# 1 swxY w Y    |j        ||g|R  | _        d S r   rs   )r   r!   r"   r#   r'   s        r   r.   zBackgroundScheduler._call_later  s     Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 +4?5(BTBBBrk   c                     | j         5  | j        r	 d d d            d S 	 d d d            n# 1 swxY w Y    | j        ||| j        |||g|R   d S r   )r   r   r.   _execute_recurringr   r!   r3   r#   r'   s        r   r5   z)BackgroundScheduler._call_later_recurring  s     Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	(D3T8X	
PT	
 	
 	
 	
 	
 	
rk   c                 8   | j         5  | j        r	 ddd           dS 	 ddd           n# 1 swxY w Y   	  ||  n# t          $ r Y nw xY w| j         5  | j        r	 ddd           dS 	 ddd           n# 1 swxY w Y    | j        ||| j        |||g|R   dS )zR
        Executes recurring callable task with given interval in seconds.
        N)r   r   r\   r.   r}   r~   s        r   r}   z&BackgroundScheduler._execute_recurring  s    Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 		HdOOO 	 	 	D	 Z 	 	} 	 	 	 	 	 	 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	(D3T8X	
PT	
 	
 	
 	
 	
 	
s-   	+//= 
A
	A
	A77A;>A;)rU   )rN   
__module____qualname____doc__r   r   r   floatr   r1   r6   r   r   r;   rF   rT   r9   rX   r)   AbstractEventLoopr:   rj   rz   r.   r5   r}    r   r   r   r      s        	: 	: 	:  & & &(e x    *e x    *

%-c9S#s]3K.K%L
 
 
 
8 !%	. . .sIc3m445. 	.
 
. . . .`$.S)CcM"::;$.	$. $. $. $.L: : : : : :x0 0 0.
'
 
 sIc3m445	
 
 
 
;'; ; sIc3m445	; ; ; ;z;M;M%-c9S#s]3K.K%L;M ;M ;M ;MzC-C6;CGOC C C C
'
 
 	
 
 
 

'
 
 	
 
 
 
 
 
r   r   
event_loopcall_soon_cbc           	         t          j        |             | j        || g|R   	 |                                  	 t          j        |           }|D ]}|                                 |                     t          j        |ddi           n# t          $ r Y nw xY w| 	                                 dS # | 	                                 w xY w# 	 t          j        |           }|D ]}|                                 |                     t          j        |ddi           n# t          $ r Y nw xY w| 	                                 w # | 	                                 w xY wxY w)z
    Starts event loop in a thread and schedule callback as soon as event loop is ready.
    Used to be able to schedule tasks using loop.call_later.

    :param event_loop:
    :return:
    r_   TN)
r)   r`   ra   rc   rd   r   re   rf   r\   r[   )r   r   r'   rg   rh   s        r   r-   r-     s    :&&&Jz9D9999   	'
33G  ))@4@@     	 	 	D	 J	'
33G  ))@4@@     	 	 	D	 Jsl   C AB B2 
BB2 BB2 2CEADE
D,)E+D,,E/EEE)
r)   rL   r   typingr   r   r   r   r   r-   r   r   r   <module>r      s          + + + + + + + + + +K
 K
 K
 K
 K
 K
 K
 K
\)9A     r   