// $Header$ #include "system.h" #include <stdlib.h> #include <string.h> #include <stream.h> #include <osfcn.h> #include <sys/param.h> #include <sys/types.h> #ifdef HAVE_SIGPROCMASK #include <signal.h> #endif #ifdef HAVE_SYS_SELECT_H #include <sys/select.h> #endif #ifndef MAXHOSTNAMELEN #define MAXHOSTNAMELEN 64 #endif extern "C" { char* getenv( const char* ); int isatty( int fd ); int system( const char* string ); } #include "Reporter.h" #include "Sequencer.h" #include "Frame.h" #include "BuiltIn.h" #include "Task.h" #include "input.h" #include "Channel.h" #include "Select.h" #include "Socket.h" #include "ports.h" #include "version.h" #define GLISH_RC_FILE ".glishrc" // Time to wait until probing a remote daemon, in seconds. #define PROBE_DELAY 5 // Interval between subsequent probes, in seconds. #define PROBE_INTERVAL 5 // A Selectee corresponding to input for a Glish client. class ClientSelectee : public Selectee { public: ClientSelectee( Sequencer* s, Task* t ); int NotifyOfSelection(); protected: Sequencer* sequencer; Task* task; }; // A Selectee used only to intercept the initial "established" event // generated by a local Glish client. Because we communicate with these // clients using pipes instead of sockets, we can't use the AcceptSelectee // (see below) to recognize when they're connecting. class LocalClientSelectee : public Selectee { public: LocalClientSelectee( Sequencer* s, Channel* c ); int NotifyOfSelection(); protected: Sequencer* sequencer; Channel* chan; }; // A Selectee corresponding to a new request of a client to connect // to the sequencer. class AcceptSelectee : public Selectee { public: AcceptSelectee( Sequencer* s, Socket* conn_socket ); int NotifyOfSelection(); protected: Sequencer* sequencer; Socket* connection_socket; }; // A Selectee corresponding to a Glish script's own client. class ScriptSelectee : public Selectee { public: ScriptSelectee( Client* client, Agent* agent, int conn_socket ); int NotifyOfSelection(); protected: Client* script_client; Agent* script_agent; int connection_socket; }; // A Selectee for detecting user input. class UserInputSelectee : public Selectee { public: UserInputSelectee( int user_fd ) : Selectee( user_fd ) { } // Indicate user input available by signalling to end the // select. int NotifyOfSelection() { return 1; } }; // A Selectee for detecting Glish daemon activity. class DaemonSelectee : public Selectee { public: DaemonSelectee( RemoteDaemon* daemon, Selector* sel, Sequencer* s ); int NotifyOfSelection(); protected: RemoteDaemon* daemon; Selector* selector; Sequencer* sequencer; }; // A SelectTimer for handling glishd probes. class ProbeTimer : public SelectTimer { public: ProbeTimer( PDict(RemoteDaemon)* daemons, Sequencer* s ); protected: int DoExpiration(); PDict(RemoteDaemon)* daemons; Sequencer* sequencer; }; // A special type of Client used for script clients. It overrides // FD_Change() to create or delete ScriptSelectee's as needed. class ScriptClient : public Client { public: ScriptClient( int& argc, char** argv ); // Inform the ScriptClient as to which selector and agent // it should use for getting and propagating events. void SetInterface( Selector* selector, Agent* agent ); protected: void FD_Change( int fd, int add_flag ); Selector* selector; Agent* agent; }; // A special type of Agent used for script clients; when it receives // an event, it propagates it via the ScriptClient object. class ScriptAgent : public Agent { public: ScriptAgent( Sequencer* s, Client* c ) : Agent(s) { client = c; } Value* SendEvent( const char* event_name, parameter_list* args, int /* is_request */, int /* log */ ) { Value* event_val = BuildEventValue( args, 1 ); client->PostEvent( event_name, event_val ); Unref( event_val ); return 0; } protected: Client* client; }; // A RemoteDaemon keeps track of a glishd running on a remote host. // This includes the Channel used to communicate with the daemon and // modifiable state indicating whether we're currently waiting for // a probe response from the daemon. // Possible states a daemon can be in. typedef enum { DAEMON_OK, // all is okay DAEMON_REPLY_PENDING, // we're waiting for reply to last probe DAEMON_LOST // we've lost connectivity } daemon_states; class RemoteDaemon { public: RemoteDaemon( const char* daemon_host, Channel* channel ) { host = daemon_host; chan = channel; SetState( DAEMON_OK ); } const char* Host() { return host; } Channel* DaemonChannel() { return chan; } daemon_states State() { return state; } void SetState( daemon_states s ) { state = s; } protected: const char* host; Channel* chan; daemon_states state; }; Notification::Notification( Agent* arg_notifier, const char* arg_field, Value* arg_value, Notifiee* arg_notifiee ) { notifier = arg_notifier; field = strdup( arg_field ); value = arg_value; notifiee = arg_notifiee; Ref( value ); } Notification::~Notification() { delete field; Unref( value ); } void Notification::Describe( ostream& s ) const { s << "notification of "; notifier->DescribeSelf( s ); s << "." << field << " ("; value->DescribeSelf( s ); s << ") for "; notifiee->stmt->DescribeSelf( s ); } Sequencer::Sequencer( int& argc, char**& argv ) { init_reporters(); init_values(); // Create the global scope. scopes.append( new expr_dict ); create_built_ins( this ); null_stmt = new NullStmt; stmts = null_stmt; last_task_id = my_id = 1; await_stmt = except_stmt = 0; await_only_flag = 0; pending_task = 0; maximize_num_fds(); // avoid SIGPIPE's - they can occur if a client's termination // is not detected prior to sending an event to the client #ifdef HAVE_SIGPROCMASK sigset_t sig_mask; sigemptyset( &sig_mask ); sigaddset( &sig_mask, SIGPIPE ); sigprocmask( SIG_BLOCK, &sig_mask, 0 ); #else sigblock( sigmask( SIGPIPE ) ); #endif connection_socket = new AcceptSocket( 0, INTERPRETER_DEFAULT_PORT ); mark_close_on_exec( connection_socket->FD() ); selector = new Selector; selector->AddSelectee( new AcceptSelectee( this, connection_socket ) ); selector->AddTimer( new ProbeTimer( &daemons, this ) ); connection_host = local_host_name(); connection_port = new char[32]; sprintf( connection_port, "%d", connection_socket->Port() ); static const char tag_fmt[] = "*tag-%s.%d*"; int n = strlen( tag_fmt ) + strlen( connection_host ) + /* slop */ 32; interpreter_tag = new char[n]; sprintf( interpreter_tag, tag_fmt, connection_host, int( getpid() ) ); monitor_task = 0; last_notification = 0; last_whenever_executed = 0; num_active_processes = 0; verbose = 0; // Create the "system" global variable. system_agent = new UserAgent( this ); Value* system_val = system_agent->AgentRecord(); Expr* system_expr = InstallID( strdup( "system" ), GLOBAL_SCOPE ); system_expr->Assign( system_val ); system_val->SetField( "version", new Value( GLISH_VERSION ) ); // Create "script" global. script_client = new ScriptClient( argc, argv ); Expr* script_expr = InstallID( strdup( "script" ), GLOBAL_SCOPE ); if ( script_client->HasSequencerConnection() ) { // Set up script agent to deal with incoming and outgoing // events. ScriptAgent* script_agent = new ScriptAgent( this, script_client ); script_client->SetInterface( selector, script_agent ); script_expr->Assign( script_agent->AgentRecord() ); system_val->SetField( "is_script_client", new Value( glish_true ) ); // Include ourselves as an active process; otherwise // we'll exit once our child processes are gone. ++num_active_processes; } else { script_expr->Assign( new Value( glish_false ) ); system_val->SetField( "is_script_client", new Value( glish_false ) ); } name = argv[0]; for ( ++argv, --argc; argc > 0; ++argv, --argc ) { if ( ! strcmp( argv[0], "-v" ) ) ++verbose; else if ( strchr( argv[0], '=' ) ) putenv( argv[0] ); else break; } MakeEnvGlobal(); BuildSuspendList(); char* monitor_client_name = getenv( "glish_monitor" ); if ( monitor_client_name ) ActivateMonitor( monitor_client_name ); Parse( glish_init ); const char* glish_rc; if ( (glish_rc = getenv( "GLISHRC" )) ) Parse( glish_rc ); else { FILE* glish_rc_file = fopen( GLISH_RC_FILE, "r" ); const char* home; if ( glish_rc_file ) { fclose( glish_rc_file ); Parse( GLISH_RC_FILE ); } else if ( (home = getenv( "HOME" )) ) { char glish_rc_filename[256]; sprintf( glish_rc_filename, "%s/%s", home, GLISH_RC_FILE ); if ( (glish_rc_file = fopen( glish_rc_filename, "r")) ) { fclose( glish_rc_file ); Parse( glish_rc_filename ); } } } int do_interactive = 1; if ( argc > 0 && strcmp( argv[0], "--" ) ) { // We have a file to parse. Parse( argv[0] ); do_interactive = 0; ++argv, --argc; } MakeArgvGlobal( argv, argc ); if ( do_interactive ) Parse( stdin ); } Sequencer::~Sequencer() { delete script_client; delete selector; delete connection_socket; delete connection_port; delete interpreter_tag; } void Sequencer::AddBuiltIn( BuiltIn* built_in ) { Expr* id = InstallID( strdup( built_in->Name() ), GLOBAL_SCOPE ); id->Assign( new Value( built_in ) ); } void Sequencer::QueueNotification( Notification* n ) { if ( verbose > 1 ) message->Report( "queueing", n ); notification_queue.EnQueue( n ); } void Sequencer::PushScope() { scopes.append( new expr_dict ); } int Sequencer::PopScope() { int top_scope_pos = scopes.length() - 1; if ( top_scope_pos < 0 ) fatal->Report( "scope underflow in Sequencer::PopScope" ); expr_dict* top_scope = scopes[top_scope_pos]; int frame_size = top_scope->Length(); scopes.remove( top_scope ); delete top_scope; return frame_size; } Expr* Sequencer::InstallID( char* id, scope_type scope ) { int scope_index; if ( scope == LOCAL_SCOPE ) scope_index = scopes.length() - 1; else scope_index = 0; int frame_offset = scopes[scope_index]->Length(); Expr* result = new VarExpr( id, scope, frame_offset, this ); scopes[scope_index]->Insert( id, result ); if ( scope == GLOBAL_SCOPE ) global_frame.append( 0 ); return result; } Expr* Sequencer::LookupID( char* id, scope_type scope, int do_install ) { int scope_index; if ( scope == LOCAL_SCOPE ) scope_index = scopes.length() - 1; else scope_index = 0; Expr* result = (*scopes[scope_index])[id]; if ( ! result && do_install ) { if ( scope == LOCAL_SCOPE ) return LookupID( id, GLOBAL_SCOPE ); else return InstallID( id, GLOBAL_SCOPE ); } delete id; return result; } void Sequencer::PushFrame( Frame* new_frame ) { local_frames.append( new_frame ); } Frame* Sequencer::PopFrame() { int top_frame = local_frames.length() - 1; if ( top_frame < 0 ) fatal->Report( "local frame stack underflow in Sequencer::PopFrame" ); return local_frames.remove_nth( top_frame ); } Frame* Sequencer::CurrentFrame() { int top_frame = local_frames.length() - 1; if ( top_frame < 0 ) return 0; return local_frames[top_frame]; } Value* Sequencer::FrameElement( scope_type scope, int frame_offset ) { if ( scope == LOCAL_SCOPE ) { int top_frame = local_frames.length() - 1; if ( top_frame < 0 ) fatal->Report( "local frame requested but none exist in Sequencer::FrameElement" ); return local_frames[top_frame]->FrameElement( frame_offset ); } else { if ( frame_offset < 0 || frame_offset >= global_frame.length() ) fatal->Report( "bad global frame offset in Sequencer::FrameElement" ); return global_frame[frame_offset]; } } void Sequencer::SetFrameElement( scope_type scope, int frame_offset, Value* value ) { Value* prev_value; if ( scope == LOCAL_SCOPE ) { int top_frame = local_frames.length() - 1; if ( top_frame < 0 ) fatal->Report( "local frame requested but none exist in Sequencer::SetFrameElement" ); Value*& frame_value = local_frames[top_frame]->FrameElement( frame_offset ); prev_value = frame_value; frame_value = value; } else { if ( frame_offset < 0 || frame_offset >= global_frame.length() ) fatal->Report( "bad global frame offset in Sequencer::SetFrameElement" ); prev_value = global_frame.replace( frame_offset, value ); } Unref( prev_value ); } char* Sequencer::RegisterTask( Task* new_task ) { char buf[128]; sprintf( buf, "task%d", ++last_task_id ); char* new_ID = strdup( buf ); ids_to_tasks.Insert( new_ID, new_task ); return new_ID; } void Sequencer::DeleteTask( Task* task ) { (void) ids_to_tasks.Remove( task->TaskID() ); } void Sequencer::AddStmt( Stmt* addl_stmt ) { stmts = merge_stmts( stmts, addl_stmt ); } int Sequencer::RegisterStmt( Stmt* stmt ) { registered_stmts.append( stmt ); return registered_stmts.length(); } Stmt* Sequencer::LookupStmt( int index ) { if ( index <= 0 || index > registered_stmts.length() ) return 0; return registered_stmts[index - 1]; } Channel* Sequencer::GetHostDaemon( const char* host ) { if ( ! host || ! strcmp( host, ConnectionHost() ) || ! strcmp( host, "localhost" ) ) // request is for local host return 0; RemoteDaemon* d = daemons[host]; if ( ! d ) d = CreateDaemon( host ); return d->DaemonChannel(); } void Sequencer::Exec() { if ( interactive ) return; if ( error->Count() > 0 ) { message->Report( "execution aborted" ); return; } stmt_flow_type flow; Unref( stmts->Exec( 0, flow ) ); EventLoop(); } void Sequencer::Await( Stmt* arg_await_stmt, int only_flag, Stmt* arg_except_stmt ) { Stmt* hold_await_stmt = await_stmt; int hold_only_flag = await_only_flag; Stmt* hold_except_stmt = except_stmt; await_stmt = arg_await_stmt; await_only_flag = only_flag; except_stmt = arg_except_stmt; EventLoop(); await_stmt = hold_await_stmt; await_only_flag = only_flag; except_stmt = hold_except_stmt; } Value* Sequencer::AwaitReply( Task* task, const char* event_name, const char* reply_name ) { GlishEvent* reply = recv_event( task->GetChannel()->ReadFD() ); Value* result = 0; if ( ! reply ) { warn->Report( task, " terminated without replying to ", event_name, " request" ); result = error_value(); } else if ( ! strcmp( reply->name, reply_name ) ) { result = reply->value; Ref( result ); } else { warn->Report( "expected reply from ", task, " to ", event_name, " request, instead got \"", reply->name, "\"" ); Ref( reply ); // So NewEvent doesn't throw it away. NewEvent( task, reply ); result = reply->value; Ref( result ); // So following Unref doesn't discard value. } Unref( reply ); return result; } Channel* Sequencer::AddLocalClient( int read_fd, int write_fd ) { Channel* c = new Channel( read_fd, write_fd ); Selectee* s = new LocalClientSelectee( this, c ); selector->AddSelectee( s ); return c; } Channel* Sequencer::WaitForTaskConnection( Task* task ) { Task* t; Channel* chan; // Need to loop because perhaps we'll receive connections // from tasks other than the one we're waiting for. do { int new_conn = accept_connection( connection_socket->FD() ); mark_close_on_exec( new_conn ); chan = new Channel( new_conn, new_conn ); t = NewConnection( chan ); } while ( t && t != task ); if ( t ) return chan; else return 0; } Task* Sequencer::NewConnection( Channel* connection_channel ) { GlishEvent* establish_event = recv_event( connection_channel->ReadFD() ); // It's possible there's already a Selectee for this channel, // due to using a LocalClientSelectee. If so, remove it, so // it doesn't trigger additional activity. RemoveSelectee( connection_channel ); if ( ! establish_event ) { error->Report( "new connection immediately broken" ); return 0; } Value* v = establish_event->value; char* task_id; int protocol; if ( v->Type() == TYPE_STRING ) { task_id = v->StringVal(); protocol = 1; } else if ( ! v->FieldVal( "name", task_id ) || ! v->FieldVal( "protocol", protocol ) ) { error->Report( "bad connection establishment" ); return 0; } // ### Should check for protocol compatibility here. Task* task = ids_to_tasks[task_id]; if ( ! task ) { error->Report( "connection received from non-existent task ", task_id ); Unref( establish_event ); return 0; } else { task->SetProtocol( protocol ); AssociateTaskWithChannel( task, connection_channel ); NewEvent( task, establish_event ); } delete task_id; return task; } void Sequencer::AssociateTaskWithChannel( Task* task, Channel* chan ) { task->SetChannel( chan, selector ); task->SetActive(); selector->AddSelectee( new ClientSelectee( this, task ) ); // empty out buffer so subsequent select()'s will work if ( chan->DataInBuffer() ) EmptyTaskChannel( task ); } void Sequencer::RemoveSelectee( Channel* chan ) { if ( selector->FindSelectee( chan->ReadFD() ) ) selector->DeleteSelectee( chan->ReadFD() ); } int Sequencer::NewEvent( Task* task, GlishEvent* event ) { if ( ! event ) { // task termination task->CloseChannel(); if ( ! task->Active() ) return 0; // Abnormal termination - no "done" message first. event = new GlishEvent( (const char*) "fail", new Value( task->AgentID() ) ); } const char* event_name = event->name; Value* value = event->value; if ( verbose > 0 ) message->Report( name, ": received event ", task->Name(), ".", event_name, " ", value ); if ( monitor_task && task != monitor_task ) LogEvent( task->TaskID(), task->Name(), event_name, value, 1 ); // If true, generate message if no interest in event. int complain_if_no_interest = 0; if ( ! strcmp( event_name, "established" ) ) { // We already did the SetActive() when the channel // was established. } else if ( ! strcmp( event_name, "done" ) ) task->SetDone(); else if ( ! strcmp( event_name, "fail" ) ) { task->SetDone(); complain_if_no_interest = 1; } else if ( ! strcmp( event_name, "*rendezvous*" ) ) Rendezvous( event_name, value ); else if ( ! strcmp( event_name, "*forward*" ) ) ForwardEvent( event_name, value ); else complain_if_no_interest = 1; int ignore_event = 0; int await_finished = 0; if ( await_stmt ) { await_finished = task->HasRegisteredInterest( await_stmt, event_name ); if ( ! await_finished && await_only_flag && ! task->HasRegisteredInterest( except_stmt, event_name ) ) ignore_event = 1; } if ( ignore_event ) warn->Report( "event ", task->Name(), ".", event_name, " ignored due to \"await\"" ); else { // We're going to want to keep the event value as a field // in the task's AgentRecord. Ref( value ); int was_interest = task->CreateEvent( event_name, value ); if ( ! was_interest && complain_if_no_interest ) warn->Report( "event ", task->Name(), ".", event_name, " (", value, ") dropped" ); RunQueue(); // process effects of CreateEvent() } Unref( event ); if ( await_finished ) { pending_task = task; // Make sure the pending task isn't delete'd before // we can exhaust its pending input. Ref( pending_task ); return 1; } else return 0; } void Sequencer::NewClientStarted() { ++num_active_processes; } int Sequencer::ShouldSuspend( const char* task_var_ID ) { if ( task_var_ID ) return suspend_list[task_var_ID]; else // This is an anonymous client - don't suspend. return 0; } int Sequencer::EmptyTaskChannel( Task* task, int force_read ) { int status = 0; if ( task->Active() ) { Channel* c = task->GetChannel(); ChanState old_state = c->ChannelState(); c->ChannelState() = CHAN_IN_USE; if ( force_read ) status = NewEvent( task, recv_event( c->ReadFD() ) ); while ( status == 0 && c->ChannelState() == CHAN_IN_USE && c->DataInBuffer() ) { status = NewEvent( task, recv_event( c->ReadFD() ) ); } if ( c->ChannelState() == CHAN_INVALID ) { // This happens iff the given task has exited selector->DeleteSelectee( c->ReadFD() ); delete c; while ( reap_terminated_process() ) ; --num_active_processes; } else c->ChannelState() = old_state; } return status; } void Sequencer::MakeEnvGlobal() { Value* env_value = create_record(); extern char** environ; for ( char** env_ptr = environ; *env_ptr; ++env_ptr ) { char* delim = strchr( *env_ptr, '=' ); if ( delim ) { *delim = '\0'; env_value->AssignRecordElement( *env_ptr, new Value( delim + 1 ) ); *delim = '='; } else env_value->AssignRecordElement( *env_ptr, new Value( glish_false ) ); } Expr* env_expr = LookupID( strdup( "environ" ), GLOBAL_SCOPE ); env_expr->Assign( env_value ); } void Sequencer::MakeArgvGlobal( char** argv, int argc ) { // If there's an initial "--" argument, remove it, it's a vestige // from when "--" was needed to separate script files from their // arguments. if ( argc > 0 && ! strcmp( argv[0], "--" ) ) ++argv, --argc; Value* argv_value = new Value( (charptr*) argv, argc, COPY_ARRAY ); Expr* argv_expr = LookupID( strdup( "argv" ), GLOBAL_SCOPE ); argv_expr->Assign( argv_value ); } void Sequencer::BuildSuspendList() { char* suspend_env_list = getenv( "suspend" ); if ( ! suspend_env_list ) return; char* suspendee = strtok( suspend_env_list, " " ); while ( suspendee ) { suspend_list.Insert( suspendee, 1 ); suspendee = strtok( 0, " " ); } } void Sequencer::Parse( FILE* file, const char* filename ) { restart_yylex( file ); yyin = file; current_sequencer = this; line_num = 1; input_file_name = filename ? strdup( filename ) : 0; if ( yyin && isatty( fileno( yyin ) ) ) { message->Report( "Glish version ", GLISH_VERSION, "." ); // We're about to enter the "interactive" loop, so // first execute any statements we've seen so far due // to .glishrc files. Exec(); // And add a special Selectee for detecting user input. selector->AddSelectee( new UserInputSelectee( fileno( yyin ) ) ); interactive = 1; } else interactive = 0; if ( yyparse() ) error->Report( "syntax errors parsing input" ); // Don't need to delete input_file_name, yylex() already did // that on <<EOF>>. input_file_name = 0; line_num = 0; } void Sequencer::Parse( const char file[] ) { FILE* f = fopen( file, "r" ); if ( ! f ) error->Report( "can't open file \"", file, "\"" ); else Parse( f, file ); } void Sequencer::Parse( const char* strings[] ) { scan_strings( strings ); Parse( 0, "glish internal initialization" ); } RemoteDaemon* Sequencer::CreateDaemon( const char* host ) { RemoteDaemon* rd = OpenDaemonConnection( host ); if ( rd ) // We're all done, the daemon was already running. return rd; // Have to start up the daemon. message->Report( "activating Glish daemon on ", host ); char daemon_cmd[1024]; sprintf( daemon_cmd, "%s %s -n glishd &", RSH, host ); system( daemon_cmd ); rd = OpenDaemonConnection( host ); while ( ! rd ) { message->Report( "waiting for daemon ..." ); sleep( 1 ); rd = OpenDaemonConnection( host ); } return rd; } RemoteDaemon* Sequencer::OpenDaemonConnection( const char* host ) { int daemon_socket = get_tcp_socket(); if ( remote_connection( daemon_socket, host, DAEMON_PORT ) ) { // Connected. mark_close_on_exec( daemon_socket ); Channel* daemon_channel = new Channel( daemon_socket, daemon_socket ); RemoteDaemon* r = new RemoteDaemon( host, daemon_channel ); daemons.Insert( strdup( host ), r ); // Read and discard daemon's "establish" event. GlishEvent* e = recv_event( daemon_channel->ReadFD() ); Unref( e ); // Tell the daemon which directory we want to work out of. char work_dir[MAXPATHLEN]; if ( ! getcwd( work_dir, sizeof( work_dir ) ) ) fatal->Report( "problems getting cwd:", work_dir ); Value work_dir_value( work_dir ); send_event( daemon_channel->WriteFD(), "setwd", &work_dir_value ); selector->AddSelectee( new DaemonSelectee( r, selector, this ) ); return r; } else { close( daemon_socket ); return 0; } } void Sequencer::ActivateMonitor( char* monitor_client_name ) { TaskAttr* monitor_attrs = new TaskAttr( "*monitor*", "localhost", 0, 0, 0, 0 ); const_args_list monitor_args; monitor_args.append( new Value( monitor_client_name ) ); monitor_task = new ClientTask( &monitor_args, monitor_attrs, this ); if ( monitor_task->TaskError() ) { Unref( monitor_task ); monitor_task = 0; } } void Sequencer::LogEvent( const char* gid, const char* id, const char* event_name, const Value* event_value, int is_inbound ) { if ( ! monitor_task ) return; Value gid_value( gid ); Value id_value( id ); Value name_value( event_name ); parameter_list args; ConstExpr gid_expr( &gid_value ); ConstExpr id_expr( &id_value ); ConstExpr name_expr( &name_value ); ConstExpr value_expr( event_value ); Parameter gid_param( "glish_id", VAL_VAL, &gid_expr, 0 ); Parameter id_param( "id", VAL_VAL, &id_expr, 0 ); Parameter name_param( "name", VAL_VAL, &name_expr, 0 ); Parameter value_param( "value", VAL_VAL, &value_expr, 0 ); args.insert( &name_param ); args.insert( &id_param ); args.insert( &gid_param ); args.insert( &value_param ); const char* monitor_event_name = is_inbound ? "event_in" : "event_out"; monitor_task->SendEvent( monitor_event_name, &args, 0, 0 ); } void Sequencer::LogEvent( const char* gid, const char* id, const GlishEvent* e, int is_inbound ) { LogEvent( gid, id, e->name, e->value, is_inbound ); } void Sequencer::SystemEvent( const char* name, const Value* val ) { system_agent->SendSingleValueEvent( name, val, 1 ); } void Sequencer::Rendezvous( const char* event_name, Value* value ) { char* source_id; char* sink_id; if ( ! value->FieldVal( "source_id", source_id ) || ! value->FieldVal( "sink_id", sink_id ) ) fatal->Report( "bad internal", event_name, "event" ); Task* src = ids_to_tasks[source_id]; Task* snk = ids_to_tasks[sink_id]; if ( ! src || ! snk ) fatal->Report( "no such source or sink ID in internal", event_name, "event:", source_id, sink_id ); // By sending out these two events immediately, before any other // *rendezvous* events can arise, we impose a serial ordering on // all rendezvous. This avoids deadlock. // // Actually, now that we always use sockets (even locally), the // following isn't necessary, since connecting to a socket won't // block (unless the "listen" queue for the remote socket is // full). We could just pass along the *rendezvous-resp* event // to the sink and be done with it. But we retain the protocol // because it was a lot of work getting it right and we don't want // to have to figure it out again if for some reason we don't // always use sockets. src->SendSingleValueEvent( "*rendezvous-orig*", value, 1 ); snk->SendSingleValueEvent( "*rendezvous-resp*", value, 1 ); delete source_id; delete sink_id; } void Sequencer::ForwardEvent( const char* event_name, Value* value ) { char* receipient_id; char* new_event_name; if ( ! value->FieldVal( "receipient", receipient_id ) || ! value->FieldVal( "event", new_event_name ) ) fatal->Report( "bad internal event \"", event_name, "\"" ); Task* task = ids_to_tasks[receipient_id]; if ( ! task ) fatal->Report( "no such receipient ID in ", event_name, "internal event:", receipient_id ); task->SendSingleValueEvent( new_event_name, value, 1 ); delete receipient_id; delete new_event_name; } void Sequencer::EventLoop() { RunQueue(); if ( pending_task ) { EmptyTaskChannel( pending_task ); // We Ref()'d the pending_task when assigning it, to make // sure it didn't go away due to the effects of RunQueue(). Unref( pending_task ); pending_task = 0; } while ( ActiveClients() && ! selector->DoSelection() ) RunQueue(); } void Sequencer::RunQueue() { Notification* n; while ( (n = notification_queue.DeQueue()) ) { if ( verbose > 1 ) message->Report( "doing", n ); if ( n->notifiee->frame ) PushFrame( n->notifiee->frame ); Value* notifier_val = n->notifier->AgentRecord(); if ( notifier_val->Type() == TYPE_RECORD && notifier_val->HasRecordElement( n->field ) != n->value ) // Need to assign the event value. notifier_val->AssignRecordElement( n->field, n->value ); // There are a bunch of Ref's and Unref's here because the // Notify() call below can lead to a recursive call to this // routine (due to an "await" statement), so 'n' might // otherwise be deleted underneath our feet. Unref( last_notification ); last_notification = n; Ref( n ); n->notifiee->stmt->Notify( n->notifier ); if ( n->notifiee->frame ) (void) PopFrame(); Unref( n ); } } ClientSelectee::ClientSelectee( Sequencer* s, Task* t ) : Selectee( t->GetChannel()->ReadFD() ) { sequencer = s; task = t; } int ClientSelectee::NotifyOfSelection() { return sequencer->EmptyTaskChannel( task, 1 ); } LocalClientSelectee::LocalClientSelectee( Sequencer* s, Channel* c ) : Selectee( c->ReadFD() ) { sequencer = s; chan = c; } int LocalClientSelectee::NotifyOfSelection() { (void) sequencer->NewConnection( chan ); return 0; } AcceptSelectee::AcceptSelectee( Sequencer* s, Socket* conn_socket ) : Selectee( conn_socket->FD() ) { sequencer = s; connection_socket = conn_socket; } int AcceptSelectee::NotifyOfSelection() { int new_conn; if ( connection_socket->IsLocal() ) new_conn = accept_local_connection( connection_socket->FD() ); else new_conn = accept_connection( connection_socket->FD() ); mark_close_on_exec( new_conn ); (void) sequencer->NewConnection( new Channel( new_conn, new_conn ) ); return 0; } ScriptSelectee::ScriptSelectee( Client* client, Agent* agent, int conn_socket ) : Selectee( conn_socket ) { script_client = client; script_agent = agent; connection_socket = conn_socket; } int ScriptSelectee::NotifyOfSelection() { fd_set fd_mask; FD_ZERO( &fd_mask ); FD_SET( connection_socket, &fd_mask ); GlishEvent* e = script_client->NextEvent( &fd_mask ); if ( ! e ) { delete script_client; exit( 0 ); } // Ref() the value, since CreateEvent is going to Unref() it, and the // script_client is also going to Unref() it via Unref()'ing the // whole GlishEvent. Ref( e->value ); script_agent->CreateEvent( e->name, e->value ); return 0; } DaemonSelectee::DaemonSelectee( RemoteDaemon* arg_daemon, Selector* sel, Sequencer* s ) : Selectee( arg_daemon->DaemonChannel()->ReadFD() ) { daemon = arg_daemon; selector = sel; sequencer = s; } int DaemonSelectee::NotifyOfSelection() { int fd = daemon->DaemonChannel()->ReadFD(); GlishEvent* e = recv_event( fd ); const char* message_name = 0; if ( e ) { if ( ! strcmp( e->name, "probe-reply" ) ) { if ( daemon->State() == DAEMON_LOST ) { message->Report( "connectivity to daemon @ ", daemon->Host(), " restored" ); message_name = "connection_restored"; } daemon->SetState( DAEMON_OK ); } else { error->Report( "received unsolicited message from daemon @ ", daemon->Host() ); } Unref( e ); } else { error->Report( "Glish daemon @ ", daemon->Host(), " terminated" ); selector->DeleteSelectee( fd ); message_name = "daemon_terminated"; } if ( message_name ) { Value message_val( daemon->Host() ); sequencer->SystemEvent( message_name, &message_val ); } return 0; } ProbeTimer::ProbeTimer( PDict(RemoteDaemon)* arg_daemons, Sequencer* s ) : SelectTimer( PROBE_DELAY, 0, PROBE_INTERVAL, 0 ) { daemons = arg_daemons; sequencer = s; } int ProbeTimer::DoExpiration() { IterCookie* c = daemons->InitForIteration(); RemoteDaemon* r; const char* key; while ( (r = daemons->NextEntry( key, c )) ) { if ( r->State() == DAEMON_REPLY_PENDING ) { // Oops. Haven't gotten a reply from our last probe. warn->Report( "connection to Glish daemon @ ", key, " lost" ); r->SetState( DAEMON_LOST ); Value message_val( r->Host() ); sequencer->SystemEvent( "connection_lost", &message_val ); } // Probe the daemon, regardless of its state. send_event( r->DaemonChannel()->WriteFD(), "probe", false_value ); if ( r->State() == DAEMON_OK ) r->SetState( DAEMON_REPLY_PENDING ); } return 1; } ScriptClient::ScriptClient( int& argc, char** argv ) : Client( argc, argv ) { selector = 0; agent = 0; } void ScriptClient::SetInterface( Selector* s, Agent* a ) { selector = s; agent = a; selector->AddSelectee( new ScriptSelectee( this, agent, read_fd ) ); } void ScriptClient::FD_Change( int fd, int add_flag ) { if ( ! agent ) return; if ( add_flag ) selector->AddSelectee( new ScriptSelectee( this, agent, fd ) ); else selector->DeleteSelectee( fd ); }