Line data Source code
1 : /*
2 : * Unix SMB/CIFS implementation.
3 : * threadpool implementation based on pthreads
4 : * Copyright (C) Volker Lendecke 2009,2011
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "replace.h"
21 : #include "system/filesys.h"
22 : #include "pthreadpool_pipe.h"
23 : #include "pthreadpool.h"
24 :
/*
 * A pthreadpool whose job-completion notifications are delivered
 * through a pipe.
 */
struct pthreadpool_pipe {
	/* Underlying thread pool, created by pthreadpool_init(). */
	struct pthreadpool *pool;

	/*
	 * Jobs added via pthreadpool_pipe_add_job() that have not yet been
	 * collected via pthreadpool_pipe_finished_jobs().
	 */
	int num_jobs;

	/* Process id recorded with getpid(); used to detect a foreign process. */
	pid_t pid;

	/*
	 * Notification pipe: [0] is the read end handed out by
	 * pthreadpool_pipe_signal_fd(), [1] is written by the signal callback.
	 */
	int pipe_fds[2];
};
31 :
/*
 * Forward declaration: completion callback handed to pthreadpool_init().
 * Defined further down in this file.
 */
static int pthreadpool_pipe_signal(
	int jobid,
	void (*job_fn)(void *private_data),
	void *job_private_data,
	void *private_data);
36 :
37 0 : int pthreadpool_pipe_init(unsigned max_threads,
38 : struct pthreadpool_pipe **presult)
39 : {
40 : struct pthreadpool_pipe *pool;
41 : int ret;
42 :
43 0 : pool = calloc(1, sizeof(struct pthreadpool_pipe));
44 0 : if (pool == NULL) {
45 0 : return ENOMEM;
46 : }
47 0 : pool->pid = getpid();
48 :
49 0 : ret = pipe(pool->pipe_fds);
50 0 : if (ret == -1) {
51 0 : int err = errno;
52 0 : free(pool);
53 0 : return err;
54 : }
55 :
56 0 : ret = pthreadpool_init(max_threads, &pool->pool,
57 : pthreadpool_pipe_signal, pool);
58 0 : if (ret != 0) {
59 0 : close(pool->pipe_fds[0]);
60 0 : close(pool->pipe_fds[1]);
61 0 : free(pool);
62 0 : return ret;
63 : }
64 :
65 0 : *presult = pool;
66 0 : return 0;
67 : }
68 :
69 0 : static int pthreadpool_pipe_signal(int jobid,
70 : void (*job_fn)(void *private_data),
71 : void *job_private_data,
72 : void *private_data)
73 : {
74 0 : struct pthreadpool_pipe *pool = private_data;
75 : ssize_t written;
76 :
77 : do {
78 0 : written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
79 0 : } while ((written == -1) && (errno == EINTR));
80 :
81 0 : if (written != sizeof(jobid)) {
82 0 : return errno;
83 : }
84 :
85 0 : return 0;
86 : }
87 :
88 0 : int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
89 : {
90 : int ret;
91 :
92 0 : if (pool->num_jobs != 0) {
93 0 : return EBUSY;
94 : }
95 :
96 0 : ret = pthreadpool_destroy(pool->pool);
97 0 : if (ret != 0) {
98 0 : return ret;
99 : }
100 :
101 0 : close(pool->pipe_fds[0]);
102 0 : pool->pipe_fds[0] = -1;
103 :
104 0 : close(pool->pipe_fds[1]);
105 0 : pool->pipe_fds[1] = -1;
106 :
107 0 : free(pool);
108 0 : return 0;
109 : }
110 :
111 0 : static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
112 : {
113 0 : pid_t pid = getpid();
114 : int signal_fd;
115 : int ret;
116 :
117 0 : if (pid == pool->pid) {
118 0 : return 0;
119 : }
120 :
121 0 : signal_fd = pool->pipe_fds[0];
122 :
123 0 : close(pool->pipe_fds[0]);
124 0 : pool->pipe_fds[0] = -1;
125 :
126 0 : close(pool->pipe_fds[1]);
127 0 : pool->pipe_fds[1] = -1;
128 :
129 0 : ret = pipe(pool->pipe_fds);
130 0 : if (ret != 0) {
131 0 : return errno;
132 : }
133 :
134 0 : ret = dup2(pool->pipe_fds[0], signal_fd);
135 0 : if (ret != 0) {
136 0 : return errno;
137 : }
138 :
139 0 : pool->pipe_fds[0] = signal_fd;
140 0 : pool->num_jobs = 0;
141 :
142 0 : return 0;
143 : }
144 :
145 0 : int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
146 : void (*fn)(void *private_data),
147 : void *private_data)
148 : {
149 : int ret;
150 :
151 0 : ret = pthreadpool_pipe_reinit(pool);
152 0 : if (ret != 0) {
153 0 : return ret;
154 : }
155 :
156 0 : ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
157 0 : if (ret != 0) {
158 0 : return ret;
159 : }
160 :
161 0 : pool->num_jobs += 1;
162 :
163 0 : return 0;
164 : }
165 :
166 0 : int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
167 : {
168 0 : return pool->pipe_fds[0];
169 : }
170 :
171 0 : int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
172 : unsigned num_jobids)
173 : {
174 : ssize_t to_read, nread, num_jobs;
175 0 : pid_t pid = getpid();
176 :
177 0 : if (pool->pid != pid) {
178 0 : return EINVAL;
179 : }
180 :
181 0 : to_read = sizeof(int) * num_jobids;
182 :
183 : do {
184 0 : nread = read(pool->pipe_fds[0], jobids, to_read);
185 0 : } while ((nread == -1) && (errno == EINTR));
186 :
187 0 : if (nread == -1) {
188 0 : return -errno;
189 : }
190 0 : if ((nread % sizeof(int)) != 0) {
191 0 : return -EINVAL;
192 : }
193 :
194 0 : num_jobs = nread / sizeof(int);
195 :
196 0 : if (num_jobs > pool->num_jobs) {
197 0 : return -EINVAL;
198 : }
199 0 : pool->num_jobs -= num_jobs;
200 :
201 0 : return num_jobs;
202 : }
|