imap.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. use std::collections::{HashMap, HashSet};
  2. use std::{io};
  3. use std::convert::TryFrom;
  4. use std::fs::File;
  5. use std::future::Future;
  6. use std::path::{PathBuf};
  7. use std::io::{BufReader, BufWriter, ErrorKind, Read, Write};
  8. use std::sync::Arc;
  9. use std::time::Duration;
  10. use anyhow::anyhow;
  11. use futures::{StreamExt, TryStreamExt};
  12. use kuchiki::iter::NodeIterator;
  13. use serde_json::to_writer;
  14. use tokio::net::TcpStream;
  15. use tokio_rustls::client::TlsStream;
  16. use tokio_rustls::rustls::{ClientConfig, RootCertStore};
  17. use tokio_rustls::rustls::pki_types::ServerName;
  18. use crate::config::Config;
  19. use crate::{add_email};
  20. use tokio::task;
  21. use tokio::time::sleep;
  22. #[cfg(not(target_os = "wasi"))]
  23. use async_imap::{Client, Session};
  24. #[cfg(not(target_os = "wasi"))]
  25. use async_imap::extensions::idle::IdleResponse::NewData;
  26. #[cfg(target_os = "wasi")]
  27. use async_imap_wasi::{Client, Session};
  28. #[cfg(target_os = "wasi")]
  29. use async_imap_wasi::extensions::idle::IdleResponse::NewData;
  30. /// create TLS connect with the IMAP server
  31. pub async fn connect_to_imap() -> anyhow::Result<Client<TlsStream<TcpStream>>>{
  32. let mut root_cert_store = RootCertStore::empty();
  33. root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
  34. let config = ClientConfig::builder()
  35. .with_root_certificates(root_cert_store)
  36. .with_no_client_auth();
  37. let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
  38. let dnsname = ServerName::try_from(Config::global().imap_domain.clone()).unwrap();
  39. let stream = TcpStream::connect(format!("{}:{}", Config::global().imap_domain.clone(), Config::global().imap_port.clone())).await?;
  40. let stream = connector.connect(dnsname, stream).await?;
  41. #[cfg(not(target_os = "wasi"))]
  42. let client = async_imap::Client::new(stream);
  43. #[cfg(target_os = "wasi")]
  44. let client = async_imap_wasi::Client::new(stream);
  45. Ok(client)
  46. }
  47. /// download all emails from the server and persist them
  48. pub async fn download_email_from_imap() -> anyhow::Result<Vec<(u32, PathBuf)>>{
  49. let mut client = connect_to_imap().await?;
  50. let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
  51. let mut session = match session_result {
  52. Ok(session) => {session}
  53. Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
  54. };
  55. let mut stored_paths: Vec<(u32, PathBuf)> = vec![];
  56. match list_imap_folders(&mut session).await {
  57. Ok(folders) => {
  58. for folder in folders {
  59. println!("Start downloading {}", folder.clone());
  60. let new_paths = match fetch_and_store_emails(&mut session, folder.clone()).await {
  61. Ok(paths) => {
  62. println!("Emails fetched and stored successfully for {}.", folder);
  63. paths
  64. },
  65. Err(err) => {
  66. eprintln!("Error during email fetching and storing: {}", err);
  67. vec![]
  68. }
  69. };
  70. stored_paths.extend(new_paths);
  71. }
  72. }
  73. Err(_) => {return Err(anyhow!("Unable to retrieve folders from IMAP server"))}
  74. }
  75. // Logout from the IMAP session
  76. session.logout().await.expect("Unable to logout");
  77. Ok(stored_paths)
  78. }
  79. /// return Vec with all mailboxes
  80. pub async fn list_imap_folders(session: &mut Session<TlsStream<TcpStream>>) -> anyhow::Result<Vec<String>> {
  81. let mut folders: Vec<String> = vec![];
  82. let mut folders_stream = session.list(None, Some("*")).await?;
  83. while let Some(folder_result) = folders_stream.next().await {
  84. match folder_result {
  85. Ok(folder) => {
  86. folders.push(folder.name().to_string())
  87. }
  88. Err(_) => {}
  89. }
  90. }
  91. Ok(folders)
  92. }
  93. /// download all emails from one mailbox
  94. pub async fn fetch_and_store_emails(session: &mut Session<TlsStream<TcpStream>>, list: String) -> anyhow::Result<Vec<(u32, PathBuf)>> {
  95. session.select(list.clone()).await?;
  96. // Create directories for maildir
  97. std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("new"))
  98. .expect(&*("Unable to create 'new' directory in ".to_owned() + &*list.clone()));
  99. std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("cur"))
  100. .expect(&*("Unable to create 'cur' directory in ".to_owned() + &*list.clone()));
  101. std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("tmp"))
  102. .expect(&*("Unable to create 'tmp' directory in ".to_owned() + &*list.clone()));
  103. let uids_path = Config::global().maildir.clone().join(".uids.json");
  104. let uids_server = session.uid_search("ALL").await.unwrap();
  105. let uids_local = read_local_uids(uids_path.clone()).unwrap();
  106. let mut stored_paths: Vec<(u32, PathBuf)> = vec![];
  107. for uid in uids_server {
  108. if let Some(uids) = uids_local.get(&list) {
  109. if uids.contains(&uid.to_string()) {
  110. continue;
  111. }
  112. }
  113. let mut messages_stream = session.uid_fetch(uid.to_string(), "(UID BODY[])").await?;
  114. let mut message = None;
  115. while let Some(message_result) = messages_stream.next().await {
  116. match message_result {
  117. Ok(message_ok) => {
  118. message = Some(message_ok);
  119. }
  120. Err(_) => {
  121. return Err(anyhow!("Unable to retrieve message"));
  122. }
  123. }
  124. }
  125. if let Some(msg) = &message {
  126. if let Some(body) = msg.body() {
  127. let mail_file = store(Config::global().maildir.clone().join(list.clone()), uid.clone().to_string(), "new".to_string(), body, "");
  128. match mail_file {
  129. Ok(file) => stored_paths.push((uid.to_string().parse().unwrap(), file)),
  130. Err(e) => eprintln!("Failed to store email: {}", e),
  131. }
  132. } else {
  133. eprintln!("Failed to retrieve email body");
  134. }
  135. if let Some(uid) = msg.uid {
  136. update_local_uids(uid.to_string(), list.clone(), uids_path.clone())
  137. .expect("Cannot write uid to .uids.json");
  138. }
  139. println!("Downloaded message with UID {}", uid.to_string());
  140. } else {
  141. return Err(anyhow!("No message found with the given UID"));
  142. }
  143. }
  144. Ok(stored_paths)
  145. }
  146. /// read uids that have been already downloaded
  147. fn read_local_uids(uids_path: PathBuf) -> anyhow::Result<HashMap<String, HashSet<String>>> {
  148. let file = match File::open(&uids_path) {
  149. Ok(file) => file,
  150. Err(_) => return Ok(HashMap::new()), // Propagate other errors
  151. };
  152. let mut reader = BufReader::new(file);
  153. let mut content = String::new();
  154. reader.read_to_string(&mut content)?;
  155. if content.trim().is_empty() {
  156. // Return an empty HashMap if the file is empty
  157. return Ok(HashMap::new());
  158. }
  159. // Deserialize the JSON into a temporary structure
  160. let data: HashMap<String, Vec<String>> = match serde_json::from_str(&content) {
  161. Ok(data) => data,
  162. Err(e) => return Err(anyhow!("Error deserializing local uids")), // Handle JSON parsing errors
  163. };
  164. // Convert Vec<String> to HashSet<String>
  165. let mut result: HashMap<String, HashSet<String>> = HashMap::new();
  166. for (key, value) in data {
  167. result.insert(key, value.into_iter().collect());
  168. }
  169. Ok(result)
  170. }
  171. /// add uid to the local list of downloaded uids
  172. fn update_local_uids(uid: String, list: String, uids_path: PathBuf) -> anyhow::Result<()> {
  173. let file = std::fs::OpenOptions::new()
  174. .write(true)
  175. .create(true)
  176. .open(uids_path.clone())?;
  177. let mut data = read_local_uids(uids_path.clone())?;
  178. data.entry(list)
  179. .or_insert_with(HashSet::new)
  180. .insert(uid);
  181. let writer = BufWriter::new(file);
  182. // Convert HashSet<String> back to Vec<String> for serialization
  183. let mut temp_data: HashMap<String, Vec<String>> = HashMap::new();
  184. for (key, value) in data {
  185. temp_data.insert(key.clone(), value.iter().cloned().collect());
  186. }
  187. // Serialize the data back to JSON and write to file
  188. to_writer(writer, &temp_data)?;
  189. Ok(())
  190. }
  191. /// delete single message remotely by uid (marks deleted and deletes from the server at the same time)
  192. pub async fn delete_email_by_uid(list: String, uid: u32) -> anyhow::Result<()>{
  193. // TODO split to mark_deleted() and expunge()
  194. let mut client = connect_to_imap().await?;
  195. let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
  196. let mut session = match session_result {
  197. Ok(session) => {session}
  198. Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
  199. };
  200. session.select(list.clone()).await?;
  201. let updates_stream = session.store(format!("{}", uid), "+FLAGS (\\Deleted)").await?;
  202. let _updates: Vec<_> = updates_stream.try_collect().await?;
  203. let _stream = session.expunge().await?;
  204. drop(_stream);
  205. session.logout().await?;
  206. Ok(())
  207. }
  208. /// create a folder remotely
  209. pub async fn create_folder(name: String) -> anyhow::Result<()>{
  210. let mut client = connect_to_imap().await?;
  211. let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
  212. let mut session = match session_result {
  213. Ok(session) => {session}
  214. Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
  215. };
  216. session.create(name.clone()).await?;
  217. session.logout().await?;
  218. Ok(())
  219. }
  220. /// rename a folder remotely
  221. pub async fn rename_folder(name: String, new_name: String) -> anyhow::Result<()>{
  222. let mut client = connect_to_imap().await?;
  223. let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
  224. let mut session = match session_result {
  225. Ok(session) => {session}
  226. Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
  227. };
  228. session.rename(name.clone(), new_name.clone()).await?;
  229. session.logout().await?;
  230. Ok(())
  231. }
  232. /// delete a folder remotely
  233. pub async fn delete_folder(name: String) -> anyhow::Result<()>{
  234. let mut client = connect_to_imap().await?;
  235. let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
  236. let mut session = match session_result {
  237. Ok(session) => {session}
  238. Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
  239. };
  240. session.delete(name.clone()).await?;
  241. session.logout().await?;
  242. Ok(())
  243. }
  244. /// run a async task to monitor any changes in the mailbox
  245. pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()> {
  246. task::spawn(async move {
  247. let mut client = match connect_to_imap().await {
  248. Ok(client) => client,
  249. Err(e) => {
  250. return Err::<(), anyhow::Error>(anyhow!("Failed to connect to IMAP"));
  251. }
  252. };
  253. let session_result = client
  254. .login(Config::global().username.clone(), Config::global().password.clone())
  255. .await;
  256. let mut session = match session_result {
  257. Ok(session) => session,
  258. Err(_) => return Err(anyhow!("Unable to login to IMAP server")),
  259. };
  260. session.select(mailbox.clone()).await?;
  261. loop {
  262. let mut idle = session.idle();
  263. idle.init().await.unwrap();
  264. let (idle_wait, interrupt) = idle.wait();
  265. let idle_handle = task::spawn(async move {
  266. println!("IDLE: waiting for 30s"); // TODO remove debug prints
  267. sleep(Duration::from_secs(30)).await;
  268. println!("IDLE: waited 30 secs, now interrupting idle");
  269. drop(interrupt);
  270. });
  271. match idle_wait.await.unwrap() {
  272. // TODO add more cases like delete, move...
  273. NewData(data) => {
  274. // TODO do not update all emails (IMAP returns * {number} RECENT) and do it only for one mailbox
  275. let new_paths = download_email_from_imap().await.expect("Cannot download new emails");
  276. for (uid, path) in new_paths.clone() {
  277. match add_email(path.clone(), uid.clone()){
  278. Ok(_) => {}
  279. Err(_) => {println!("Error adding email from {:?}", path.clone())}
  280. };
  281. }
  282. }
  283. reason => {
  284. println!("IDLE failed {:?}", reason);
  285. }
  286. }
  287. // Ensure the idle handle is dropped before the next loop iteration
  288. idle_handle.await.unwrap();
  289. // Reassign session to prevent ownership issues in the next loop iteration
  290. session = idle.done().await.unwrap();
  291. }
  292. });
  293. Ok(())
  294. }
  295. /// saves a raw email properly
  296. fn store(path: PathBuf, uid: String, subfolder: String, data: &[u8], info: &str) -> io::Result<PathBuf> {
  297. // loop when conflicting filenames occur, as described at
  298. // http://www.courier-mta.org/maildir.html
  299. // this assumes that pid and hostname don't change.
  300. let mut tmppath = path.clone();
  301. tmppath.push("tmp");
  302. let mut file;
  303. loop {
  304. tmppath.push(format!("email_{uid}"));
  305. match std::fs::OpenOptions::new()
  306. .write(true)
  307. .create_new(true)
  308. .open(&tmppath)
  309. {
  310. Ok(f) => {
  311. file = f;
  312. break;
  313. }
  314. Err(err) => {
  315. if err.kind() != ErrorKind::AlreadyExists {
  316. return Err(err.into());
  317. }
  318. tmppath.pop();
  319. }
  320. }
  321. }
  322. /// At this point, `file` is our new file at `tmppath`.
  323. /// If we leave the scope of this function prior to
  324. /// successfully writing the file to its final location,
  325. /// we need to ensure that we remove the temporary file.
  326. /// This struct takes care of that detail.
  327. struct UnlinkOnError {
  328. path_to_unlink: Option<PathBuf>,
  329. }
  330. impl Drop for UnlinkOnError {
  331. fn drop(&mut self) {
  332. if let Some(path) = self.path_to_unlink.take() {
  333. // Best effort to remove it
  334. std::fs::remove_file(path).ok();
  335. }
  336. }
  337. }
  338. // Ensure that we remove the temporary file on failure
  339. let mut unlink_guard = UnlinkOnError {
  340. path_to_unlink: Some(tmppath.clone()),
  341. };
  342. file.write_all(data)?;
  343. file.sync_all()?;
  344. let mut newpath = path.clone();
  345. newpath.push(subfolder);
  346. let id = format!("email_{uid}");
  347. newpath.push(format!("{}{}", id, info));
  348. std::fs::rename(&tmppath, &newpath)?;
  349. unlink_guard.path_to_unlink.take();
  350. Ok(newpath)
  351. }