Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

dox/Parallel/vtkMultiProcessController.h

Go to the documentation of this file.
00001 /*=========================================================================
00002 
00003   Program:   Visualization Toolkit
00004   Module:    $RCSfile: vtkMultiProcessController.h,v $
00005   Language:  C++
00006 
00007   Copyright (c) 1993-2002 Ken Martin, Will Schroeder, Bill Lorensen 
00008   All rights reserved.
00009   See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
00010 
00011      This software is distributed WITHOUT ANY WARRANTY; without even 
00012      the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR 
00013      PURPOSE.  See the above copyright notice for more information.
00014 
00015 =========================================================================*/
00049 #ifndef __vtkMultiProcessController_h
00050 #define __vtkMultiProcessController_h
00051 
00052 #include "vtkObject.h"
00053 
00054 #include "vtkCommunicator.h" // Needed for direct access to communicator
00055 
00056 class vtkDataSet;
00057 class vtkImageData;
00058 class vtkCollection;
00059 class vtkOutputWindow;
00060 class vtkDataObject;
00061 class vtkMultiProcessController;
00062 
00063 //BTX
00064 // The type of function that gets called when new processes are initiated.
00065 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller, 
00066                                        void *userData);
00067 
00068 // The type of function that gets called when an RMI is triggered.
00069 typedef void (*vtkRMIFunctionType)(void *localArg, 
00070                                    void *remoteArg, int remoteArgLength, 
00071                                    int remoteProcessId);
00072 //ETX
00073 
00074 
00075 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00076 {
00077 public:
00078   static vtkMultiProcessController *New();
00079   vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00080   void PrintSelf(ostream& os, vtkIndent indent);
00081 
00085   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00086 
00088 
00091   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00092                           int initializedExternally)=0;
00094 
00097   virtual void Finalize()=0;
00098 
00102   virtual void Finalize(int finalizedExternally)=0;
00103 
00105 
00108   virtual void SetNumberOfProcesses(int num);
00109   vtkGetMacro( NumberOfProcesses, int );
00111 
00112   //BTX
00114 
00117   void SetSingleMethod(vtkProcessFunctionType, void *data);
00118   //ETX
00120 
00124   virtual void SingleMethodExecute() = 0;
00125   
00126   //BTX
00128 
00132   void SetMultipleMethod(int index, vtkProcessFunctionType, void *data); 
00133   //ETX
00135 
00139   virtual void MultipleMethodExecute() = 0;
00140 
00142   virtual int GetLocalProcessId() { return this->LocalProcessId; }
00143 
00148   static vtkMultiProcessController *GetGlobalController();
00149 
00152   virtual void CreateOutputWindow() = 0;
00153   
00155 
00160   vtkSetMacro(ForceDeepCopy, int);
00161   vtkGetMacro(ForceDeepCopy, int);
00162   vtkBooleanMacro(ForceDeepCopy, int);
00164 
00165   
00166   //------------------ RMIs --------------------
00167   //BTX
00173   void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00174   
00176 
00177   void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00178     {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00179   //ETX
00181   
00183   void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00184 
00187   void TriggerBreakRMIs();
00188 
00190 
00191   void TriggerRMI(int remoteProcessId, char *arg, int tag) 
00192     { this->TriggerRMI(remoteProcessId, (void*)arg, 
00193                        static_cast<int>(strlen(arg))+1, tag); }
00195 
00197 
00198   void TriggerRMI(int remoteProcessId, int tag)
00199     { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00201 
00204   void ProcessRMIs();
00205 
00207 
00210   vtkSetMacro(BreakFlag, int);
00211   vtkGetMacro(BreakFlag, int);
00213 
00215   vtkGetObjectMacro(Communicator, vtkCommunicator);
00217   
00218 //BTX
00219 
00220   enum Consts {
00221     MAX_PROCESSES=8192,
00222     ANY_SOURCE=-1,
00223     INVALID_SOURCE=-2,
00224     RMI_TAG=315167,
00225     RMI_ARG_TAG=315168,
00226     BREAK_RMI_TAG=239954
00227   };
00228 
00229 //ETX
00230 
00232   virtual void Barrier() = 0;
00233 
00234   static void SetGlobalController(vtkMultiProcessController *controller);
00235 
00236   //------------------ Communication --------------------
00237   
00239 
00241   int Send(int* data, int length, int remoteProcessId, int tag);
00242   int Send(unsigned long* data, int length, int remoteProcessId, 
00243            int tag);
00244   int Send(char* data, int length, int remoteProcessId, int tag);
00245   int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00246   int Send(float* data, int length, int remoteProcessId, int tag);
00247   int Send(double* data, int length, int remoteProcessId, int tag);
00248 #ifdef VTK_USE_64BIT_IDS
00249   int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00251 #endif
00252   int Send(vtkDataObject *data, int remoteId, int tag);
00253   int Send(vtkDataArray *data, int remoteId, int tag);
00254 
00256 
00259   int Receive(int* data, int length, int remoteProcessId, int tag);
00260   int Receive(unsigned long* data, int length, int remoteProcessId, 
00261               int tag);
00262   int Receive(char* data, int length, int remoteProcessId, int tag);
00263   int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00264   int Receive(float* data, int length, int remoteProcessId, int tag);
00265   int Receive(double* data, int length, int remoteProcessId, int tag);
00266 #ifdef VTK_USE_64BIT_IDS
00267   int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00269 #endif
00270   int Receive(vtkDataObject* data, int remoteId, int tag);
00271   int Receive(vtkDataArray* data, int remoteId, int tag);
00272 
00273 // Internally implemented RMI to break the process loop.
00274 
00275 protected:
00276   vtkMultiProcessController();
00277   ~vtkMultiProcessController();
00278   
00279   int MaximumNumberOfProcesses;
00280   int NumberOfProcesses;
00281 
00282   int LocalProcessId;
00283   
00284   vtkProcessFunctionType      SingleMethod;
00285   void                       *SingleData;
00286   vtkProcessFunctionType      MultipleMethod[MAX_PROCESSES];
00287   void                       *MultipleData[MAX_PROCESSES];  
00288   
00289   vtkCollection *RMIs;
00290   
00291   // This is a flag that can be used by the ports to break
00292   // their update loop. (same as ProcessRMIs)
00293   int BreakFlag;
00294 
00295   void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00296 
00297   // This method implements "GetGlobalController".  
00298   // It needs to be virtual and static.
00299   virtual vtkMultiProcessController *GetLocalController();
00300 
00301   
00302   // This flag can force deep copies during send.
00303   int ForceDeepCopy;
00304 
00305   vtkOutputWindow* OutputWindow;
00306 
00307   // Note that since the communicators can be created differently
00308   // depending on the type of controller, the subclasses are
00309   // responsible of deleting them.
00310   vtkCommunicator* Communicator;
00311 
00312   // Communicator which is a copy of the current user
00313   // level communicator except the context; i.e. even if the tags 
00314   // are the same, the RMI messages will not interfere with user 
00315   // level messages. (This only works with MPI. When using threads,
00316   // the tags have to be unique.)
00317   // Note that since the communicators can be created differently
00318   // depending on the type of controller, the subclasses are
00319   // responsible of deleting them.
00320   vtkCommunicator* RMICommunicator;
00321 
00322 private:
00323   vtkMultiProcessController(const vtkMultiProcessController&);  // Not implemented.
00324   void operator=(const vtkMultiProcessController&);  // Not implemented.
00325 };
00326 
00327 
00328 inline int vtkMultiProcessController::Send(vtkDataObject *data, 
00329                                            int remoteThreadId, int tag)
00330 {
00331   if (this->Communicator)
00332     {
00333     return this->Communicator->Send(data, remoteThreadId, tag);
00334     }
00335   else
00336     {
00337     return 0;
00338     }
00339 }
00340 
00341 inline int vtkMultiProcessController::Send(vtkDataArray *data, 
00342                                            int remoteThreadId, int tag)
00343 {
00344   if (this->Communicator)
00345     {
00346     return this->Communicator->Send(data, remoteThreadId, tag);
00347     }
00348   else
00349     {
00350     return 0;
00351     }
00352 }
00353 
00354 inline int vtkMultiProcessController::Send(int* data, int length, 
00355                                            int remoteThreadId, int tag)
00356 {
00357   if (this->Communicator)
00358     {
00359     return this->Communicator->Send(data, length, remoteThreadId, tag);
00360     }
00361   else
00362     {
00363     return 0;
00364     }
00365 }
00366 
00367 inline int vtkMultiProcessController::Send(unsigned long* data, 
00368                                            int length, int remoteThreadId, 
00369                                            int tag)
00370 {
00371   if (this->Communicator)
00372     {
00373     return this->Communicator->Send(data, length, remoteThreadId, tag);
00374     }
00375   else
00376     {
00377     return 0;
00378     }
00379 }
00380 
00381 inline int vtkMultiProcessController::Send(char* data, int length, 
00382                                            int remoteThreadId, int tag)
00383 {
00384   if (this->Communicator)
00385     {
00386     return this->Communicator->Send(data, length, remoteThreadId, tag);
00387     }
00388   else
00389     {
00390     return 0;
00391     }
00392 }
00393 
00394 inline int vtkMultiProcessController::Send(unsigned char* data, int length, 
00395                                            int remoteThreadId, int tag)
00396 {
00397   if (this->Communicator)
00398     {
00399     return this->Communicator->Send(data, length, remoteThreadId, tag);
00400     }
00401   else
00402     {
00403     return 0;
00404     }
00405 }
00406 
00407 inline int vtkMultiProcessController::Send(float* data, int length, 
00408                                            int remoteThreadId, int tag)
00409 {
00410   if (this->Communicator)
00411     {
00412     return this->Communicator->Send(data, length, remoteThreadId, tag);
00413     }
00414   else
00415     {
00416     return 0;
00417     }
00418 }
00419 
00420 inline int vtkMultiProcessController::Send(double* data, int length, 
00421                                            int remoteThreadId, int tag)
00422 {
00423   if (this->Communicator)
00424     {
00425     return this->Communicator->Send(data, length, remoteThreadId, tag);
00426     }
00427   else
00428     {
00429     return 0;
00430     }
00431 }
00432 
00433 #ifdef VTK_USE_64BIT_IDS
00434 inline int vtkMultiProcessController::Send(vtkIdType* data, int length, 
00435                                            int remoteThreadId, int tag)
00436 {
00437   if (this->Communicator)
00438     {
00439     return this->Communicator->Send(data, length, remoteThreadId, tag);
00440     }
00441   else
00442     {
00443     return 0;
00444     }
00445 }
00446 #endif
00447 
00448 inline int vtkMultiProcessController::Receive(vtkDataObject* data, 
00449                                               int remoteThreadId, int tag)
00450 {
00451   if (this->Communicator)
00452     {
00453     return this->Communicator->Receive(data, remoteThreadId, tag);
00454     }
00455   else
00456     {
00457     return 0;
00458     }
00459 }
00460 
00461 inline int vtkMultiProcessController::Receive(vtkDataArray* data, 
00462                                               int remoteThreadId, int tag)
00463 {
00464   if (this->Communicator)
00465     {
00466     return this->Communicator->Receive(data, remoteThreadId, tag);
00467     }
00468   else
00469     {
00470     return 0;
00471     }
00472 }
00473 
00474 inline int vtkMultiProcessController::Receive(int* data, int length, 
00475                                               int remoteThreadId, int tag)
00476 {
00477   if (this->Communicator)
00478     {
00479     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00480     }
00481   else
00482     {
00483     return 0;
00484     }
00485 }
00486 
00487 inline int vtkMultiProcessController::Receive(unsigned long* data, 
00488                                               int length,int remoteThreadId, 
00489                                               int tag)
00490 {
00491   if (this->Communicator)
00492     {
00493     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00494     }
00495   else
00496     {
00497     return 0;
00498     }
00499 }
00500 
00501 inline int vtkMultiProcessController::Receive(char* data, int length, 
00502                                               int remoteThreadId, int tag)
00503 {
00504   if (this->Communicator)
00505     {
00506     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00507     }
00508   else
00509     {
00510     return 0;
00511     }
00512 }
00513 
00514 inline int vtkMultiProcessController::Receive(unsigned char* data, int length, 
00515                                               int remoteThreadId, int tag)
00516 {
00517   if (this->Communicator)
00518     {
00519     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00520     }
00521   else
00522     {
00523     return 0;
00524     }
00525 }
00526 
00527 inline int vtkMultiProcessController::Receive(float* data, int length, 
00528                                               int remoteThreadId, int tag)
00529 {
00530   if (this->Communicator)
00531     {
00532     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00533     }
00534   else
00535     {
00536     return 0;
00537     }
00538 }
00539 
00540 inline int vtkMultiProcessController::Receive(double* data, int length, 
00541                                               int remoteThreadId, int tag)
00542 {
00543   if (this->Communicator)
00544     {
00545     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00546     }
00547   else
00548     {
00549     return 0;
00550     }
00551 }
00552 
00553 #ifdef VTK_USE_64BIT_IDS
00554 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length, 
00555                                               int remoteThreadId, int tag)
00556 {
00557   if (this->Communicator)
00558     {
00559     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00560     }
00561   else
00562     {
00563     return 0;
00564     }
00565 }
00566 #endif
00567 
00568 #endif