wibble  1.1
pipe.h
Go to the documentation of this file.
1 // -*- C++ -*- (c) 2008 Petr Rockai <me@mornfall.net>
2 
3 #include <wibble/sys/macros.h>
4 
5 #ifdef POSIX
6 #include <string.h>
7 #include <fcntl.h>
8 #include <sys/select.h>
9 #endif
10 #include <unistd.h>
11 
12 #include <deque>
13 #include <cerrno>
14 
15 #include <wibble/exception.h>
16 #include <wibble/sys/thread.h>
17 #include <wibble/sys/mutex.h>
18 #include <wibble/sys/exec.h>
19 
20 #ifndef WIBBLE_SYS_PIPE_H
21 #define WIBBLE_SYS_PIPE_H
22 
23 namespace wibble {
24 namespace sys {
25 
26 namespace wexcept = wibble::exception;
27 
28 struct Pipe {
29 
31  int fd;
32  bool close;
33  std::string data;
34  bool running;
35  bool closed;
37 
38  Writer() : fd( -1 ), close( false ), running( false ) {}
39 
40  void *main() {
41  do {
42  int wrote = 0;
43 
44  {
45  wibble::sys::MutexLock __l( mutex );
46  wrote = ::write( fd, data.c_str(), data.length() );
47  if ( wrote > 0 )
48  data.erase( data.begin(), data.begin() + wrote );
49  }
50 
51  if ( wrote == -1 ) {
52  if ( blocking( errno ) )
53 #ifdef POSIX
54  sched_yield();
55 #else
56  ;
57 #endif
58  else
59  throw wexcept::System( "writing to pipe" );
60  }
61  } while ( !done() );
62 
63  wibble::sys::MutexLock __l( mutex );
64  running = false;
65  if ( close )
66  ::close( fd );
67 
68  return 0;
69  }
70 
71  bool done() {
72  wibble::sys::MutexLock __l( mutex );
73  if ( data.empty() )
74  running = false;
75  return !running;
76  }
77 
78  void run( int _fd, std::string what ) {
79  wibble::sys::MutexLock __l( mutex );
80 
81  if ( running )
82  assert_eq( _fd, fd );
83  fd = _fd;
84  assert_neq( fd, -1 );
85 
86  data += what;
87  if ( running )
88  return;
89  running = true;
90  start();
91  }
92  };
93 
94  typedef std::deque< char > Buffer;
95  Buffer buffer;
96  int fd;
97  bool _eof;
99 
100  Pipe( int p ) : fd( p ), _eof( false )
101  {
102  if ( p == -1 )
103  return;
104 #ifdef POSIX
105  if ( fcntl( fd, F_SETFL, O_NONBLOCK ) == -1 )
106  throw wexcept::System( "fcntl on a pipe" );
107 #endif
108  }
109  Pipe() : fd( -1 ), _eof( false ) {}
110 
111  /* Writes data to the pipe, asynchronously. */
112  void write( std::string what ) {
113  writer.run( fd, what );
114  }
115 
116  void close() {
117  wibble::sys::MutexLock __l( writer.mutex );
118  writer.close = true;
119  if ( !writer.running )
120  ::close( fd );
121  }
122 
123  bool valid() {
124  return fd != -1;
125  }
126 
127  bool active() {
128  return valid() && !eof();
129  }
130 
131  bool eof() {
132  return _eof;
133  }
134 
/*
 * Classifies an errno value: true when it means the operation would
 * have blocked. EAGAIN always counts; EWOULDBLOCK is checked as well on
 * POSIX builds.
 */
static bool blocking( int err ) {
    if ( err == EAGAIN )
        return true;
#ifdef POSIX
    if ( err == EWOULDBLOCK )
        return true;
#endif
    return false;
}
142 
143  int readMore() {
144  assert( valid() );
145  char _buffer[1024];
146  int r = ::read( fd, _buffer, 1023 );
147  if ( r == -1 && !blocking( errno ) )
148  throw wexcept::System( "reading from pipe" );
149  else if ( r == -1 )
150  return 0;
151  if ( r == 0 )
152  _eof = true;
153  else
154  std::copy( _buffer, _buffer + r, std::back_inserter( buffer ) );
155  return r;
156  }
157 
158  std::string nextChunk() {
159  std::string line( buffer.begin(), buffer.end() );
160  buffer.clear();
161  return line;
162  }
163 
164  std::string nextLine() {
165  assert( valid() );
166  Buffer::iterator nl =
167  std::find( buffer.begin(), buffer.end(), '\n' );
168  while ( nl == buffer.end() ) {
169  if ( !readMore() )
170  return ""; // would block, so give up
171  nl = std::find( buffer.begin(), buffer.end(), '\n' );
172  }
173  std::string line( buffer.begin(), nl );
174 
175  if ( nl != buffer.end() )
176  ++ nl;
177  buffer.erase( buffer.begin(), nl );
178 
179  return line;
180  }
181 
182  /* Only returns on eof() or when data is buffered. */
183  void wait() {
184  assert( valid() );
185 #ifdef POSIX
186  fd_set fds;
187  FD_ZERO( &fds );
188 #endif
189  while ( buffer.empty() && !eof() ) {
190  if ( readMore() )
191  return;
192  if ( eof() )
193  return;
194 #ifdef POSIX
195 #pragma GCC diagnostic push
196 #pragma GCC diagnostic ignored "-Wold-style-cast"
197  FD_SET( fd, &fds );
198  select( fd + 1, &fds, 0, 0, 0 );
199 #pragma GCC diagnostic pop
200 #else
201  sleep( 1 );
202 #endif
203  }
204  }
205  std::string nextLineBlocking() {
206  assert( valid() );
207  std::string l;
208  while ( !eof() ) {
209  l = nextLine();
210  if ( !l.empty() )
211  return l;
212  if ( eof() )
213  return std::string( buffer.begin(), buffer.end() );
214  wait();
215  }
216  return l;
217  }
218 
219 };
220 
222 {
223  std::string cmd;
224 
225  PipeThrough( const std::string& _cmd ) : cmd( _cmd ) {}
226 
227  std::string run( std::string data ) {
228  int _in, _out;
229 
230 #ifdef _WIN32
231  Exec exec(cmd);
232 #elif defined POSIX
233  ShellCommand exec(cmd);
234 #endif
235 
236  exec.setupRedirects( &_in, &_out, 0 );
237  exec.fork();
238 
239  Pipe in( _in ), out( _out );
240 
241  in.write( data );
242  in.close();
243  std::string ret;
244  while ( !out.eof() ) {
245  out.wait();
246  ret += out.nextChunk();
247  }
248  return ret;
249  }
250 };
251 
252 }
253 }
254 #endif
std::string run(std::string data)
Definition: pipe.h:227
Iterator< typename I::value_type > iterator(I i)
Definition: iterator.h:123
std::deque< char > Buffer
Definition: pipe.h:94
Definition: core.h:11
void start()
Start the thread.
Definition: thread.cpp:70
std::string data
Definition: pipe.h:33
bool _eof
Definition: pipe.h:97
Acquire a mutex lock, RAII-style.
Definition: mutex.h:200
void sleep(int secs)
Portable version of sleep.
Definition: thread.cpp:31
void run(int _fd, std::string what)
Definition: pipe.h:78
Definition: pipe.h:221
#define assert_eq(x, y)
Definition: test.h:33
#define assert(x)
Definition: test.h:30
bool active()
Definition: pipe.h:127
Definition: pipe.h:28
Encapsulates a thread.
Definition: thread.h:83
bool closed
Definition: pipe.h:35
pthread mutex wrapper; WARNING: the class allows copying and assignment, but this is not always safe...
Definition: mutex.h:47
Writer()
Definition: pipe.h:38
bool done()
Definition: pipe.h:71
std::string nextLineBlocking()
Definition: pipe.h:205
void write(std::string what)
Definition: pipe.h:112
int readMore()
Definition: pipe.h:143
Definition: pipe.h:30
void wait()
Definition: pipe.h:183
std::string nextLine()
Definition: pipe.h:164
wibble::sys::Mutex mutex
Definition: pipe.h:36
Pipe(int p)
Definition: pipe.h:100
void * main()
Main thread function, executed in the new thread after creation.
Definition: pipe.h:40
pid_t fork()
Fork a subprocess to run proc.
PipeThrough(const std::string &_cmd)
Definition: pipe.h:225
void setupRedirects(int *stdinfd=0, int *stdoutfd=0, int *stderrfd=0)
Definition: childprocess.cpp:145
int fd
Definition: pipe.h:31
Definition: amorph.h:17
void close()
Definition: pipe.h:116
int fd
Definition: pipe.h:96
#define assert_neq(x, y)
Definition: test.h:36
bool close
Definition: pipe.h:32
std::string nextChunk()
Definition: pipe.h:158
bool running
Definition: pipe.h:34
Pipe()
Definition: pipe.h:109
bool eof()
Definition: pipe.h:131
bool valid()
Definition: pipe.h:123
static bool blocking(int err)
Definition: pipe.h:135
Buffer buffer
Definition: pipe.h:95
Base class for system exceptions.
Definition: exception.h:396
std::string cmd
Definition: pipe.h:223
Writer writer
Definition: pipe.h:98
Execute a shell command using /bin/sh -c.
Definition: exec.h:97
Execute external commands, either forked as a ChildProcess or directly using exec().
Definition: exec.h:33