pipeline

管道就是我们生活中看到的净水,它有两个水口,一个连接着进水管,一个连接着出水管,通过这个管道,我们就可以把水流一步步过滤处理,最终输出我们想要的净水。

linux中的管道也是同样的道理,它使用|表示。

比如我们经常看到统计排序的例子

ls /usr/bin | sort | uniq | wc -l

创建管道

int pipe(int pipefd[2]); 成功:0;失败:-1,设置errno

为了避免死锁并利用并行性,通常,带有一个或多个新管道的Unix进程将调用fork()创建新进程。然后,每个过程将在产生或使用任何数据之前关闭将不使用的管道末端。或者,进程可以创建一个新线程并使用管道在它们之间进行通信。

也可以使用mkfifo()或创建命名管道mknod(),然后在调用它们时将它们作为输入或输出文件呈现给程序。它们允许创建多路径管道,并且在与标准错误重定向或结合使用时特别有效。

c程序管道

    #include <sys/types.h>

    #include <sys/stat.h>

   int mkfifo(const char *pathname, mode_t mode);
#include <unistd.h>

#include <string.h>

#include <stdlib.h>

#include <stdio.h>

#include <sys/wait.h>

void sys_err(const char *str)

{

perror(str);

exit(1);

}

int main(void)

{

pid_t pid;

char buf[1024];

int fd[2];

char *p = "test for pipe\n";

if (pipe(fd) == -1)

sys_err("pipe");

pid = fork();

if (pid < 0) {

sys_err("fork err");

} else if (pid == 0) {

close(fd[1]);

int len = read(fd[0], buf, sizeof(buf));

write(STDOUT_FILENO, buf, len);

close(fd[0]);

} else {

close(fd[0]);

write(fd[1], p, strlen(p));

wait(NULL);

close(fd[1]);

}

return 0;

}

管道作用

  • 对数据进行过滤处理
  • 进程间通信
/*
 * FIFO server
 */

#include "all.h"

int
main(void)
{
  int fdw, fdw2;
  int fdr;
  char clt_path[PATH_LEN] = {'
/*
* FIFO server
*/
#include "all.h"
int
main(void)
{
int fdw, fdw2;
int fdr;
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char *p;
int n;
if (mkfifo(FIFO_SVR, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");  
if ((fdr = open(FIFO_SVR, O_RDONLY)) < 0)  
perr_exit("open()");
/* 
* 根据fifo的创建规则, 若从一个空管道或fifo读, 
* 而在读之前管道或fifo有打开来写的操作, 那么读操作将会阻塞 
* 直到管道或fifo不打开来读, 或管道或fifo中有数据为止. 
*
* 这里,我们的fifo本来是打开用来读的,但是为了,read不返回0,
* 让每次client端读完都阻塞在fifo上,我们又打开一次来读.
* 见unpv2 charper 4.7
*/
if ((fdw2 = open(FIFO_SVR, O_WRONLY)) < 0)  
fprintf(stderr, "open()");
while (1) {
/* read client fifo path from FIFO_SVR */
/* 这里由于FIFO_SVR有打开来写的操作,所以当管道没有数据时, 
* read会阻塞,而不是返回0. 
*/
if (read(fdr, clt_path, PATH_LEN) < 0) {
fprintf(stderr, "read fifo client path error : %s\n", strerror(errno));  
break;
}
if ((p = strstr(clt_path, "\r\n")) == NULL) {
fprintf(stderr, "clt_path error: %s\n", clt_path);
break;
}
*p = '\0';
DBG("clt_path", clt_path);
if (access(clt_path, W_OK) == -1) { // client fifo ok, but no permission
perror("access()");  
continue;
}
/* open client fifo for write */
if ((fdw = open(clt_path, O_WRONLY)) < 0) {
perror("open()");  
continue;
}
if ((n = read(fdr, buf, WORDS_LEN)) > 0) { /* read server words is ok */
printf("server read words : %s\n", buf);
buf[n] = '\0';
write(fdw, buf, strlen(buf));  
}
}
close(fdw);  
unlink(FIFO_SVR);
exit(0);
}
'}; char buf[MAX_LINE] = {'
/*
* FIFO server
*/
#include "all.h"
int
main(void)
{
int fdw, fdw2;
int fdr;
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char *p;
int n;
if (mkfifo(FIFO_SVR, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");  
if ((fdr = open(FIFO_SVR, O_RDONLY)) < 0)  
perr_exit("open()");
/* 
* 根据fifo的创建规则, 若从一个空管道或fifo读, 
* 而在读之前管道或fifo有打开来写的操作, 那么读操作将会阻塞 
* 直到管道或fifo不打开来读, 或管道或fifo中有数据为止. 
*
* 这里,我们的fifo本来是打开用来读的,但是为了,read不返回0,
* 让每次client端读完都阻塞在fifo上,我们又打开一次来读.
* 见unpv2 charper 4.7
*/
if ((fdw2 = open(FIFO_SVR, O_WRONLY)) < 0)  
fprintf(stderr, "open()");
while (1) {
/* read client fifo path from FIFO_SVR */
/* 这里由于FIFO_SVR有打开来写的操作,所以当管道没有数据时, 
* read会阻塞,而不是返回0. 
*/
if (read(fdr, clt_path, PATH_LEN) < 0) {
fprintf(stderr, "read fifo client path error : %s\n", strerror(errno));  
break;
}
if ((p = strstr(clt_path, "\r\n")) == NULL) {
fprintf(stderr, "clt_path error: %s\n", clt_path);
break;
}
*p = '\0';
DBG("clt_path", clt_path);
if (access(clt_path, W_OK) == -1) { // client fifo ok, but no permission
perror("access()");  
continue;
}
/* open client fifo for write */
if ((fdw = open(clt_path, O_WRONLY)) < 0) {
perror("open()");  
continue;
}
if ((n = read(fdr, buf, WORDS_LEN)) > 0) { /* read server words is ok */
printf("server read words : %s\n", buf);
buf[n] = '\0';
write(fdw, buf, strlen(buf));  
}
}
close(fdw);  
unlink(FIFO_SVR);
exit(0);
}
'}; char *p; int n; if (mkfifo(FIFO_SVR, FILE_MODE) == -1 && errno != EEXIST) perr_exit("mkfifo()"); if ((fdr = open(FIFO_SVR, O_RDONLY)) < 0) perr_exit("open()"); /* * 根据fifo的创建规则, 若从一个空管道或fifo读, * 而在读之前管道或fifo有打开来写的操作, 那么读操作将会阻塞 * 直到管道或fifo不打开来读, 或管道或fifo中有数据为止. * * 这里,我们的fifo本来是打开用来读的,但是为了,read不返回0, * 让每次client端读完都阻塞在fifo上,我们又打开一次来读. * 见unpv2 charper 4.7 */ if ((fdw2 = open(FIFO_SVR, O_WRONLY)) < 0) fprintf(stderr, "open()"); while (1) { /* read client fifo path from FIFO_SVR */ /* 这里由于FIFO_SVR有打开来写的操作,所以当管道没有数据时, * read会阻塞,而不是返回0. */ if (read(fdr, clt_path, PATH_LEN) < 0) { fprintf(stderr, "read fifo client path error : %s\n", strerror(errno)); break; } if ((p = strstr(clt_path, "\r\n")) == NULL) { fprintf(stderr, "clt_path error: %s\n", clt_path); break; } *p = '
/*
* FIFO server
*/
#include "all.h"
int
main(void)
{
int fdw, fdw2;
int fdr;
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char *p;
int n;
if (mkfifo(FIFO_SVR, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");  
if ((fdr = open(FIFO_SVR, O_RDONLY)) < 0)  
perr_exit("open()");
/* 
* 根据fifo的创建规则, 若从一个空管道或fifo读, 
* 而在读之前管道或fifo有打开来写的操作, 那么读操作将会阻塞 
* 直到管道或fifo不打开来读, 或管道或fifo中有数据为止. 
*
* 这里,我们的fifo本来是打开用来读的,但是为了,read不返回0,
* 让每次client端读完都阻塞在fifo上,我们又打开一次来读.
* 见unpv2 charper 4.7
*/
if ((fdw2 = open(FIFO_SVR, O_WRONLY)) < 0)  
fprintf(stderr, "open()");
while (1) {
/* read client fifo path from FIFO_SVR */
/* 这里由于FIFO_SVR有打开来写的操作,所以当管道没有数据时, 
* read会阻塞,而不是返回0. 
*/
if (read(fdr, clt_path, PATH_LEN) < 0) {
fprintf(stderr, "read fifo client path error : %s\n", strerror(errno));  
break;
}
if ((p = strstr(clt_path, "\r\n")) == NULL) {
fprintf(stderr, "clt_path error: %s\n", clt_path);
break;
}
*p = '\0';
DBG("clt_path", clt_path);
if (access(clt_path, W_OK) == -1) { // client fifo ok, but no permission
perror("access()");  
continue;
}
/* open client fifo for write */
if ((fdw = open(clt_path, O_WRONLY)) < 0) {
perror("open()");  
continue;
}
if ((n = read(fdr, buf, WORDS_LEN)) > 0) { /* read server words is ok */
printf("server read words : %s\n", buf);
buf[n] = '\0';
write(fdw, buf, strlen(buf));  
}
}
close(fdw);  
unlink(FIFO_SVR);
exit(0);
}
'; DBG("clt_path", clt_path); if (access(clt_path, W_OK) == -1) { // client fifo ok, but no permission perror("access()"); continue; } /* open client fifo for write */ if ((fdw = open(clt_path, O_WRONLY)) < 0) { perror("open()"); continue; } if ((n = read(fdr, buf, WORDS_LEN)) > 0) { /* read server words is ok */ printf("server read words : %s\n", buf); buf[n] = '
/*
* FIFO server
*/
#include "all.h"
int
main(void)
{
int fdw, fdw2;
int fdr;
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char *p;
int n;
if (mkfifo(FIFO_SVR, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");  
if ((fdr = open(FIFO_SVR, O_RDONLY)) < 0)  
perr_exit("open()");
/* 
* 根据fifo的创建规则, 若从一个空管道或fifo读, 
* 而在读之前管道或fifo有打开来写的操作, 那么读操作将会阻塞 
* 直到管道或fifo不打开来读, 或管道或fifo中有数据为止. 
*
* 这里,我们的fifo本来是打开用来读的,但是为了,read不返回0,
* 让每次client端读完都阻塞在fifo上,我们又打开一次来读.
* 见unpv2 charper 4.7
*/
if ((fdw2 = open(FIFO_SVR, O_WRONLY)) < 0)  
fprintf(stderr, "open()");
while (1) {
/* read client fifo path from FIFO_SVR */
/* 这里由于FIFO_SVR有打开来写的操作,所以当管道没有数据时, 
* read会阻塞,而不是返回0. 
*/
if (read(fdr, clt_path, PATH_LEN) < 0) {
fprintf(stderr, "read fifo client path error : %s\n", strerror(errno));  
break;
}
if ((p = strstr(clt_path, "\r\n")) == NULL) {
fprintf(stderr, "clt_path error: %s\n", clt_path);
break;
}
*p = '\0';
DBG("clt_path", clt_path);
if (access(clt_path, W_OK) == -1) { // client fifo ok, but no permission
perror("access()");  
continue;
}
/* open client fifo for write */
if ((fdw = open(clt_path, O_WRONLY)) < 0) {
perror("open()");  
continue;
}
if ((n = read(fdr, buf, WORDS_LEN)) > 0) { /* read server words is ok */
printf("server read words : %s\n", buf);
buf[n] = '\0';
write(fdw, buf, strlen(buf));  
}
}
close(fdw);  
unlink(FIFO_SVR);
exit(0);
}
'; write(fdw, buf, strlen(buf)); } } close(fdw); unlink(FIFO_SVR); exit(0); }
*
 * Fifo client
 *
 */
#include "all.h"

int
main(void)
{
  int fdr, fdw;
  pid_t pid;  
  char clt_path[PATH_LEN] = {'
*
* Fifo client
*
*/
#include "all.h"
int
main(void)
{
int fdr, fdw;
pid_t pid;  
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char buf_path[MAX_LINE] = {'\0'};
snprintf(clt_path, PATH_LEN, FIFO_CLT_FMT, (long)getpid());    
DBG("clt_path1 = ", clt_path);
snprintf(buf_path, PATH_LEN, "%s\r\n", clt_path);
if (mkfifo(clt_path, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");
/* client open clt_path for read
* open server for write 
*/
if ((fdw = open(FIFO_SVR, O_WRONLY)) < 0) 
perr_exit("open()");
/* write my fifo path to server */ 
if (write(fdw, buf_path, PATH_LEN) != PATH_LEN)    
perr_exit("write()");
if (write(fdw, WORDS, WORDS_LEN) < 0)  /* write words to fifo server */
perr_exit("error");
if ((fdr = open(clt_path, O_RDONLY)) < 0)  
perr_exit("open()");
if (read(fdr, buf, WORDS_LEN) > 0) {   /* read reply from fifo server */
buf[WORDS_LEN] = '\0';
printf("server said : %s\n", buf);
}
close(fdr);
unlink(clt_path);
exit(0);
}
'}; char buf[MAX_LINE] = {'
*
* Fifo client
*
*/
#include "all.h"
int
main(void)
{
int fdr, fdw;
pid_t pid;  
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char buf_path[MAX_LINE] = {'\0'};
snprintf(clt_path, PATH_LEN, FIFO_CLT_FMT, (long)getpid());    
DBG("clt_path1 = ", clt_path);
snprintf(buf_path, PATH_LEN, "%s\r\n", clt_path);
if (mkfifo(clt_path, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");
/* client open clt_path for read
* open server for write 
*/
if ((fdw = open(FIFO_SVR, O_WRONLY)) < 0) 
perr_exit("open()");
/* write my fifo path to server */ 
if (write(fdw, buf_path, PATH_LEN) != PATH_LEN)    
perr_exit("write()");
if (write(fdw, WORDS, WORDS_LEN) < 0)  /* write words to fifo server */
perr_exit("error");
if ((fdr = open(clt_path, O_RDONLY)) < 0)  
perr_exit("open()");
if (read(fdr, buf, WORDS_LEN) > 0) {   /* read reply from fifo server */
buf[WORDS_LEN] = '\0';
printf("server said : %s\n", buf);
}
close(fdr);
unlink(clt_path);
exit(0);
}
'}; char buf_path[MAX_LINE] = {'
*
* Fifo client
*
*/
#include "all.h"
int
main(void)
{
int fdr, fdw;
pid_t pid;  
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char buf_path[MAX_LINE] = {'\0'};
snprintf(clt_path, PATH_LEN, FIFO_CLT_FMT, (long)getpid());    
DBG("clt_path1 = ", clt_path);
snprintf(buf_path, PATH_LEN, "%s\r\n", clt_path);
if (mkfifo(clt_path, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");
/* client open clt_path for read
* open server for write 
*/
if ((fdw = open(FIFO_SVR, O_WRONLY)) < 0) 
perr_exit("open()");
/* write my fifo path to server */ 
if (write(fdw, buf_path, PATH_LEN) != PATH_LEN)    
perr_exit("write()");
if (write(fdw, WORDS, WORDS_LEN) < 0)  /* write words to fifo server */
perr_exit("error");
if ((fdr = open(clt_path, O_RDONLY)) < 0)  
perr_exit("open()");
if (read(fdr, buf, WORDS_LEN) > 0) {   /* read reply from fifo server */
buf[WORDS_LEN] = '\0';
printf("server said : %s\n", buf);
}
close(fdr);
unlink(clt_path);
exit(0);
}
'}; snprintf(clt_path, PATH_LEN, FIFO_CLT_FMT, (long)getpid()); DBG("clt_path1 = ", clt_path); snprintf(buf_path, PATH_LEN, "%s\r\n", clt_path); if (mkfifo(clt_path, FILE_MODE) == -1 && errno != EEXIST) perr_exit("mkfifo()"); /* client open clt_path for read * open server for write */ if ((fdw = open(FIFO_SVR, O_WRONLY)) < 0) perr_exit("open()"); /* write my fifo path to server */ if (write(fdw, buf_path, PATH_LEN) != PATH_LEN) perr_exit("write()"); if (write(fdw, WORDS, WORDS_LEN) < 0) /* write words to fifo server */ perr_exit("error"); if ((fdr = open(clt_path, O_RDONLY)) < 0) perr_exit("open()"); if (read(fdr, buf, WORDS_LEN) > 0) { /* read reply from fifo server */ buf[WORDS_LEN] = '
*
* Fifo client
*
*/
#include "all.h"
int
main(void)
{
int fdr, fdw;
pid_t pid;  
char clt_path[PATH_LEN] = {'\0'};
char buf[MAX_LINE] = {'\0'};
char buf_path[MAX_LINE] = {'\0'};
snprintf(clt_path, PATH_LEN, FIFO_CLT_FMT, (long)getpid());    
DBG("clt_path1 = ", clt_path);
snprintf(buf_path, PATH_LEN, "%s\r\n", clt_path);
if (mkfifo(clt_path, FILE_MODE) == -1 && errno != EEXIST)  
perr_exit("mkfifo()");
/* client open clt_path for read
* open server for write 
*/
if ((fdw = open(FIFO_SVR, O_WRONLY)) < 0) 
perr_exit("open()");
/* write my fifo path to server */ 
if (write(fdw, buf_path, PATH_LEN) != PATH_LEN)    
perr_exit("write()");
if (write(fdw, WORDS, WORDS_LEN) < 0)  /* write words to fifo server */
perr_exit("error");
if ((fdr = open(clt_path, O_RDONLY)) < 0)  
perr_exit("open()");
if (read(fdr, buf, WORDS_LEN) > 0) {   /* read reply from fifo server */
buf[WORDS_LEN] = '\0';
printf("server said : %s\n", buf);
}
close(fdr);
unlink(clt_path);
exit(0);
}
'; printf("server said : %s\n", buf); } close(fdr); unlink(clt_path); exit(0); }

Comments are closed.